pkg/eval: Implement output capture in terms of simpler primitives.

Introduces two functions, PipePort and CapturePort, and implement output capture
in terms of them. These two functions return *Port instances, which can also be
used in (*Evaler).Eval calls.
This commit is contained in:
Qi Xiao 2021-01-02 01:40:04 +00:00
parent dc59cbfeb4
commit 4f037aeb2a
4 changed files with 109 additions and 79 deletions

View File

@ -77,7 +77,7 @@ func benchmarkOutputCapture(n int, f func(*Frame)) {
defer ev.Close()
fm := NewTopFrame(ev, parse.Source{Name: "[benchmark]"}, []*Port{{}, {}, {}})
for i := 0; i < n; i++ {
captureOutput(fm, func(fm *Frame) error {
fm.CaptureOutput(func(fm *Frame) error {
f(fm)
return nil
})

View File

@ -1,13 +1,9 @@
package eval
import (
"bufio"
"errors"
"fmt"
"io"
"os"
"strings"
"sync"
"github.com/elves/elvish/pkg/diag"
"github.com/elves/elvish/pkg/eval/vals"
@ -15,7 +11,6 @@ import (
"github.com/elves/elvish/pkg/fsutil"
"github.com/elves/elvish/pkg/glob"
"github.com/elves/elvish/pkg/parse"
"github.com/elves/elvish/pkg/strutil"
)
// An operation that produces values.
@ -367,69 +362,7 @@ type outputCaptureOp struct {
}
func (op outputCaptureOp) exec(fm *Frame) ([]interface{}, error) {
return captureOutput(fm, op.subop.exec)
}
func captureOutput(fm *Frame, f func(*Frame) error) ([]interface{}, error) {
vs := []interface{}{}
var m sync.Mutex
err := pipeOutput(fm, f,
func(ch <-chan interface{}) {
for v := range ch {
m.Lock()
vs = append(vs, v)
m.Unlock()
}
},
func(r *os.File) {
buffered := bufio.NewReader(r)
for {
line, err := buffered.ReadString('\n')
if line != "" {
v := strutil.ChopLineEnding(line)
m.Lock()
vs = append(vs, v)
m.Unlock()
}
if err != nil {
if err != io.EOF {
logger.Println("error on reading:", err)
}
break
}
}
})
return vs, err
}
func pipeOutput(fm *Frame, f func(*Frame) error, valuesCb func(<-chan interface{}), bytesCb func(*os.File)) error {
newFm := fm.fork("[output capture]")
ch := make(chan interface{}, outputCaptureBufferSize)
r, w, err := os.Pipe()
if err != nil {
return err
}
newFm.ports[1] = &Port{
Chan: ch, CloseChan: true,
File: w, CloseFile: true,
}
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
valuesCb(ch)
}()
go func() {
defer wg.Done()
defer r.Close()
bytesCb(r)
}()
err = f(newFm)
newFm.Close()
wg.Wait()
return err
return fm.CaptureOutput(op.subop.exec)
}
func (cp *compiler) lambda(n *parse.Primary) valuesOp {

View File

@ -141,6 +141,13 @@ func (fm *Frame) fork(name string) *Frame {
}
}
// A shorthand for forking a frame and setting the output port.
func (fm *Frame) forkWithOutput(name string, p *Port) *Frame {
newFm := fm.fork(name)
newFm.ports[1] = p
return newFm
}
// Eval evaluates an Op. It is like eval except that it sets fm.srcMeta
// temporarily to op.src during the evaluation.
func (fm *Frame) Eval(op Op) error {
@ -154,12 +161,23 @@ func (fm *Frame) Eval(op Op) error {
// CaptureOutput captures the output of a given callback that operates on a Frame.
func (fm *Frame) CaptureOutput(f func(*Frame) error) ([]interface{}, error) {
return captureOutput(fm, f)
outPort, collect, err := CapturePort()
if err != nil {
return nil, err
}
err = f(fm.forkWithOutput("[output capture]", outPort))
return collect(), err
}
// PipeOutput calls a callback with output piped to the given output handlers.
func (fm *Frame) PipeOutput(f func(*Frame) error, valuesCb func(<-chan interface{}), bytesCb func(*os.File)) error {
return pipeOutput(fm, f, valuesCb, bytesCb)
func (fm *Frame) PipeOutput(f func(*Frame) error, vCb func(<-chan interface{}), bCb func(*os.File)) error {
outPort, done, err := PipePort(vCb, bCb)
if err != nil {
return err
}
err = f(fm.forkWithOutput("[output pipe]", outPort))
done()
return err
}
func (fm *Frame) addTraceback(r diag.Ranger) *StackTrace {

View File

@ -1,8 +1,13 @@
package eval
import (
"bufio"
"fmt"
"io"
"os"
"sync"
"github.com/elves/elvish/pkg/strutil"
)
// Port conveys data stream. It always consists of a byte band and a channel band.
@ -32,18 +37,20 @@ func (p *Port) Close() {
}
var (
// ClosedChan is a closed channel, suitable for use as placeholder channel input.
// ClosedChan is a closed channel, suitable as a placeholder input channel.
ClosedChan = getClosedChan()
// BlackholeChan is channel writes onto which disappear, suitable for use as
// placeholder channel output.
// BlackholeChan is a channel that absorbs all values written to it,
// suitable as a placeholder output channel.
BlackholeChan = getBlackholeChan()
// DevNull is /dev/null.
// DevNull is /dev/null, suitable as a placeholder file for either input or
// output.
DevNull = getDevNull()
// DevNullClosedChan is a port made up from DevNull and ClosedChan,
// suitable as placeholder input port.
// DevNullClosedChan is a port made up from DevNull and ClosedChan, suitable
// as a placeholder input port.
DevNullClosedChan = &Port{File: DevNull, Chan: ClosedChan}
// DevNullBlackholeChan is a port made up from DevNull and BlackholeChan,
// suitable as placeholder output port.
// suitable as a placeholder output port.
DevNullBlackholeChan = &Port{File: DevNull, Chan: BlackholeChan}
)
@ -70,3 +77,75 @@ func getDevNull() *os.File {
}
return f
}
// PipePort returns an output *Port whose value and byte components are both
// piped. The supplied functions are called on a separate goroutine with the
// read ends of the value and byte components of the port. It also returns a
// function to clean up the port and wait for the callbacks to finish.
func PipePort(vCb func(<-chan interface{}), bCb func(*os.File)) (*Port, func(), error) {
r, w, err := os.Pipe()
if err != nil {
return nil, nil, err
}
ch := make(chan interface{}, outputCaptureBufferSize)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
vCb(ch)
}()
go func() {
defer wg.Done()
defer r.Close()
bCb(r)
}()
port := &Port{Chan: ch, CloseChan: true, File: w, CloseFile: true}
done := func() {
port.Close()
wg.Wait()
}
return port, done, nil
}
// CapturePort returns an output *Port whose value and byte components are
// both connected to an internal pipe that saves the output. It also returns a
// function to call to obtain the captured output.
func CapturePort() (*Port, func() []interface{}, error) {
vs := []interface{}{}
var m sync.Mutex
port, done, err := PipePort(
func(ch <-chan interface{}) {
for v := range ch {
m.Lock()
vs = append(vs, v)
m.Unlock()
}
},
func(r *os.File) {
buffered := bufio.NewReader(r)
for {
line, err := buffered.ReadString('\n')
if line != "" {
v := strutil.ChopLineEnding(line)
m.Lock()
vs = append(vs, v)
m.Unlock()
}
if err != nil {
if err != io.EOF {
logger.Println("error on reading:", err)
}
break
}
}
})
if err != nil {
return nil, nil, err
}
return port, func() []interface{} {
done()
return vs
}, nil
}