pkg/eval: Clean up Port related code.

This commit is contained in:
Qi Xiao 2021-01-05 04:07:35 +00:00
parent 6e8ecc8bd5
commit 2033600881
11 changed files with 82 additions and 93 deletions

View File

@ -33,7 +33,7 @@ func initInstant(ed *Editor, ev *eval.Evaler, nb eval.NsBuilder) {
func instantStart(app cli.App, ev *eval.Evaler, binding cli.Handler) {
execute := func(code string) ([]string, error) {
outPort, collect, err := eval.CaptureStringPort()
outPort, collect, err := eval.StringCapturePort()
if err != nil {
return nil, err
}

View File

@ -103,11 +103,13 @@ func makeNotifyPort(nt notifier) (*eval.Port, func()) {
}
nt.notifyf("[bytes out] %s", line[:len(line)-1])
}
r.Close()
wg.Done()
}()
port := &eval.Port{Chan: ch, File: w, CloseChan: true, CloseFile: true}
port := &eval.Port{Chan: ch, File: w}
cleanup := func() {
port.Close()
close(ch)
w.Close()
wg.Wait()
}
return port, cleanup

View File

@ -40,7 +40,7 @@ func minibufSubmit(ed *Editor, ev *eval.Evaler) {
src := parse.Source{Name: "[minibuf]", Code: code}
notifyPort, cleanup := makeNotifyPort(ed)
defer cleanup()
ports := []*eval.Port{eval.DevNullClosedChan, notifyPort, notifyPort}
ports := []*eval.Port{eval.DummyInputPort, notifyPort, notifyPort}
err := ev.Eval(src, eval.EvalCfg{Ports: ports})
if err != nil {
app.Notify(err.Error())

View File

@ -170,7 +170,7 @@ func peach(fm *Frame, f Callable, inputs Inputs) error {
w.Add(1)
go func() {
newFm := fm.fork("closure of peach")
newFm.ports[0] = DevNullClosedChan
newFm.ports[0] = DummyInputPort
ex := f.Call(newFm, []interface{}{v}, NoOpts)
newFm.Close()

View File

@ -129,9 +129,9 @@ func (op *pipelineOp) exec(fm *Frame) error {
}
ch := make(chan interface{}, pipelineChanBufferSize)
newFm.ports[1] = &Port{
File: writer, Chan: ch, CloseFile: true, CloseChan: true}
File: writer, Chan: ch, closeFile: true, closeChan: true}
nextIn = &Port{
File: reader, Chan: ch, CloseFile: true, CloseChan: false}
File: reader, Chan: ch, closeFile: true, closeChan: false}
}
thisOp := formOp
thisError := &errors[i]
@ -519,7 +519,7 @@ func (op *redirOp) exec(fm *Frame) error {
}
growPorts(&fm.ports, dst+1)
fm.ports[dst].Close()
fm.ports[dst].close()
if op.srcIsFd {
src, err := evalForFd(fm, op.srcOp, true, "redirection source")
@ -533,7 +533,7 @@ func (op *redirOp) exec(fm *Frame) error {
case src >= len(fm.ports) || fm.ports[src] == nil:
return fm.errorp(op, invalidFD{src})
default:
fm.ports[dst] = fm.ports[src].Fork()
fm.ports[dst] = fm.ports[src].fork()
}
return nil
}
@ -547,9 +547,9 @@ func (op *redirOp) exec(fm *Frame) error {
if err != nil {
return fm.errorpf(op, "failed to open file %s: %s", vals.Repr(src, vals.NoPretty), err)
}
fm.ports[dst] = &Port{File: f, CloseFile: true, Chan: chanForFileRedir(op.mode)}
fm.ports[dst] = &Port{File: f, closeFile: true, Chan: chanForFileRedir(op.mode)}
case vals.File:
fm.ports[dst] = &Port{File: src, CloseFile: false, Chan: chanForFileRedir(op.mode)}
fm.ports[dst] = &Port{File: src, closeFile: false, Chan: chanForFileRedir(op.mode)}
case vals.Pipe:
var f *os.File
switch op.mode {
@ -560,7 +560,7 @@ func (op *redirOp) exec(fm *Frame) error {
default:
return fm.errorpf(op, "can only use < or > with pipes")
}
fm.ports[dst] = &Port{File: f, CloseFile: false, Chan: chanForFileRedir(op.mode)}
fm.ports[dst] = &Port{File: f, closeFile: false, Chan: chanForFileRedir(op.mode)}
default:
return fm.errorp(op.srcOp, errs.BadValue{
What: "redirection source",

View File

@ -391,8 +391,8 @@ func (ev *Evaler) Chdir(path string) error {
type EvalCfg struct {
// Ports to use in evaluation. The first 3 elements, if not specified
// (either being nil or Ports containing fewer than 3 elements),
// will be filled with DevNullClosedChan, DevNullBlackholeChan and
// DevNullBlackholeChan respectively.
// will be filled with DummyInputPort, DummyOutputPort and
// DummyOutputPort respectively.
Ports []*Port
// Callback to get a channel of interrupt signals and a function to call
// when the channel is no longer needed.
@ -409,13 +409,13 @@ func (cfg *EvalCfg) fillDefaults(ev *Evaler) {
cfg.Ports = append(cfg.Ports, make([]*Port, 3-len(cfg.Ports))...)
}
if cfg.Ports[0] == nil {
cfg.Ports[0] = DevNullClosedChan
cfg.Ports[0] = DummyInputPort
}
if cfg.Ports[1] == nil {
cfg.Ports[1] = DevNullBlackholeChan
cfg.Ports[1] = DummyOutputPort
}
if cfg.Ports[2] == nil {
cfg.Ports[2] = DevNullBlackholeChan
cfg.Ports[2] = DummyOutputPort
}
if cfg.Global == nil {

View File

@ -170,7 +170,7 @@ func EvalAndCollect(t *testing.T, ev *eval.Evaler, texts []string) Result {
wg.Done()
}()
ports := []*eval.Port{
eval.DevNullClosedChan,
eval.DummyInputPort,
{File: stdout, Chan: outCh},
{File: stderr, Chan: eval.BlackholeChan},
}

View File

@ -35,7 +35,7 @@ type Frame struct {
// error. It may be called only once.
func (fm *Frame) Close() error {
for _, port := range fm.ports {
port.Close()
port.close()
}
return nil
}
@ -113,7 +113,7 @@ func (fm *Frame) fork(name string) *Frame {
newPorts := make([]*Port, len(fm.ports))
for i, p := range fm.ports {
if p != nil {
newPorts[i] = p.Fork()
newPorts[i] = p.fork()
}
}
return &Frame{

View File

@ -15,24 +15,24 @@ import (
type Port struct {
File *os.File
Chan chan interface{}
CloseFile bool
CloseChan bool
closeFile bool
closeChan bool
}
// Fork returns a copy of a Port with the Close* flags unset.
func (p *Port) Fork() *Port {
// Returns a copy of the Port with the Close* flags unset.
func (p *Port) fork() *Port {
return &Port{p.File, p.Chan, false, false}
}
// Close closes a Port.
func (p *Port) Close() {
// Closes a Port.
func (p *Port) close() {
if p == nil {
return
}
if p.CloseFile {
if p.closeFile {
p.File.Close()
}
if p.CloseChan {
if p.closeChan {
close(p.Chan)
}
}
@ -47,12 +47,12 @@ var (
// output.
DevNull = getDevNull()
// DevNullClosedChan is a port made up from DevNull and ClosedChan, suitable
// as a placeholder input port.
DevNullClosedChan = &Port{File: DevNull, Chan: ClosedChan}
// DevNullBlackholeChan is a port made up from DevNull and BlackholeChan,
// DummyInputPort is a port made up from DevNull and ClosedChan, suitable as
// a placeholder input port.
DummyInputPort = &Port{File: DevNull, Chan: ClosedChan}
// DummyOutputPort is a port made up from DevNull and BlackholeChan,
// suitable as a placeholder output port.
DevNullBlackholeChan = &Port{File: DevNull, Chan: BlackholeChan}
DummyOutputPort = &Port{File: DevNull, Chan: BlackholeChan}
)
func getClosedChan() chan interface{} {
@ -102,9 +102,9 @@ func PipePort(vCb func(<-chan interface{}), bCb func(*os.File)) (*Port, func(),
bCb(r)
}()
port := &Port{Chan: ch, CloseChan: true, File: w, CloseFile: true}
port := &Port{Chan: ch, closeChan: true, File: w, closeFile: true}
done := func() {
port.Close()
port.close()
wg.Wait()
}
return port, done, nil
@ -151,9 +151,9 @@ func CapturePort() (*Port, func() []interface{}, error) {
}, nil
}
// CaptureStringPort is like CapturePort, but processes value outputs by
// StringCapturePort is like CapturePort, but processes value outputs by
// stringifying them and prepending an output marker.
func CaptureStringPort() (*Port, func() []string, error) {
func StringCapturePort() (*Port, func() []string, error) {
var lines []string
var mu sync.Mutex
addLine := func(line string) {
@ -188,3 +188,45 @@ func CaptureStringPort() (*Port, func() []string, error) {
return lines
}, nil
}
// Buffer size for the channel to use in FilePort. The value has been chosen
// arbitrarily.
const filePortChanSize = 32
// FilePort returns an output *Port where the byte component is the file itself,
// and the value component is converted to an internal channel that writes
// each value to the file, prepending with a prefix. It also returns a cleanup
// function, which should be called when the *Port is no longer needed.
func FilePort(f *os.File, valuePrefix string) (*Port, func()) {
ch := make(chan interface{}, filePortChanSize)
relayDone := make(chan struct{})
go func() {
for v := range ch {
f.WriteString(valuePrefix)
f.WriteString(vals.Repr(v, vals.NoPretty))
f.WriteString("\n")
}
close(relayDone)
}()
return &Port{File: f, Chan: ch}, func() {
close(ch)
<-relayDone
}
}
// PortsFromStdFiles is a shorthand for calling PortsFromFiles with os.Stdin,
// os.Stdout and os.Stderr.
func PortsFromStdFiles(prefix string) ([]*Port, func()) {
return PortsFromFiles([3]*os.File{os.Stdin, os.Stdout, os.Stderr}, prefix)
}
// PortsFromFiles builds 3 ports from 3 files. It also returns a function that
// should be called when the ports are no longer needed.
func PortsFromFiles(files [3]*os.File, prefix string) ([]*Port, func()) {
port1, cleanup1 := FilePort(files[1], prefix)
port2, cleanup2 := FilePort(files[2], prefix)
return []*Port{{File: files[0], Chan: ClosedChan}, port1, port2}, func() {
cleanup1()
cleanup2()
}
}

View File

@ -1,55 +0,0 @@
package eval
import (
"os"
"sync"
"github.com/elves/elvish/pkg/eval/vals"
)
const (
stdoutChanSize = 32
stderrChanSize = 32
)
// DevNullPorts is 3 placeholder ports.
var DevNullPorts = [3]*Port{
DevNullClosedChan, DevNullBlackholeChan, DevNullBlackholeChan}
// PortsFromStdFiles is a shorthand for calling PortsFromFiles with os.Stdin,
// os.Stdout and os.Stderr.
func PortsFromStdFiles(prefix string) ([]*Port, func()) {
return PortsFromFiles([3]*os.File{os.Stdin, os.Stdout, os.Stderr}, prefix)
}
// PortsFromFiles builds 3 ports from 3 files. It also returns a function that
// should be called when the ports are no longer needed.
func PortsFromFiles(files [3]*os.File, prefix string) ([]*Port, func()) {
stdoutChan := make(chan interface{}, stdoutChanSize)
stderrChan := make(chan interface{}, stderrChanSize)
relayerWait := new(sync.WaitGroup)
relayerWait.Add(2)
go relayChanToFile(stdoutChan, files[1], prefix, relayerWait)
go relayChanToFile(stderrChan, files[2], prefix, relayerWait)
ports := [3]*Port{
{File: files[0], Chan: ClosedChan},
{File: files[1], Chan: stdoutChan, CloseChan: true},
{File: files[2], Chan: stderrChan, CloseChan: true},
}
return ports[:], func() {
close(stdoutChan)
close(stderrChan)
relayerWait.Wait()
}
}
func relayChanToFile(ch <-chan interface{}, file *os.File, prefix string, w *sync.WaitGroup) {
for v := range ch {
file.WriteString(prefix)
file.WriteString(vals.Repr(v, initIndent))
file.WriteString("\n")
}
w.Done()
}

View File

@ -120,7 +120,7 @@ func evalAndCollect(ev *eval.Evaler, code string) (
errFile, chanErrBytes := makeBytesWriterAndCollect()
ports := []*eval.Port{
eval.DevNullClosedChan,
eval.DummyInputPort,
{File: outFile, Chan: outChan},
{File: errFile, Chan: eval.BlackholeChan},
}