pkg/eval: Simplify API for output capture.

This commit is contained in:
Qi Xiao 2020-04-10 01:12:25 +01:00
parent af6b523e02
commit 096ee354d6
10 changed files with 72 additions and 97 deletions

View File

@ -402,7 +402,9 @@ func adaptMatcherMap(nt notifier, ev *eval.Evaler, m vals.Map) complete.Filterer
{File: os.Stderr},
}
fm := eval.NewTopFrame(ev, eval.NewInternalGoSource("[editor matcher]"), ports)
outputs, err := fm.CaptureOutput(matcher, []interface{}{seed}, eval.NoOpts)
outputs, err := fm.CaptureOutput(func(fm *eval.Frame) error {
return matcher.Call(fm, []interface{}{seed}, eval.NoOpts)
})
if err != nil {
nt.Notify(fmt.Sprintf("[matcher error] %s", err))
// Continue with whatever values have been output
@ -472,7 +474,10 @@ func adaptArgGeneratorMap(ev *eval.Evaler, m vals.Map) complete.ArgGenerator {
}
}
fm := eval.NewTopFrame(ev, eval.NewInternalGoSource("[editor arg generator]"), ports)
err := fm.CallWithOutputCallback(gen, argValues, eval.NoOpts, valueCb, bytesCb)
f := func(fm *eval.Frame) error {
return gen.Call(fm, argValues, eval.NoOpts)
}
err := fm.PipeOutput(f, valueCb, bytesCb)
return output, err
}
}

View File

@ -79,7 +79,8 @@ func instantStart(app cli.App, ev *eval.Evaler, binding cli.Handler) {
addLine(strings.TrimSuffix(line, "\n"))
}
}
err = fm.ExecWithOutputCallback(op, valuesCb, bytesCb)
err = fm.PipeOutput(
func(fm *eval.Frame) error { return fm.Eval(op) }, valuesCb, bytesCb)
return output, err
}
instant.Start(app, instant.Config{Binding: binding, Execute: execute})

View File

@ -63,8 +63,8 @@ func listingStartCustom(app cli.App, fm *eval.Frame, opts customListingOpts, ite
}
}
}
err := fm.CallWithOutputCallback(
fn, []interface{}{q}, eval.NoOpts, valuesCb, bytesCb)
f := func(fm *eval.Frame) error { return fn.Call(fm, []interface{}{q}, eval.NoOpts) }
err := fm.PipeOutput(f, valuesCb, bytesCb)
// TODO(xiaq): Report the error.
_ = err
return items

View File

@ -170,7 +170,8 @@ func callForStyledText(nt notifier, ev *eval.Evaler, fn eval.Callable, args ...i
}
// XXX There is no source to pass to NewTopEvalCtx.
fm := eval.NewTopFrame(ev, eval.NewInternalGoSource("[prompt]"), ports)
err := fm.CallWithOutputCallback(fn, args, eval.NoOpts, valuesCb, bytesCb)
f := func(fm *eval.Frame) error { return fn.Call(fm, args, eval.NoOpts) }
err := fm.PipeOutput(f, valuesCb, bytesCb)
if err != nil {
nt.Notify(fmt.Sprintf("prompt function error: %v", err))

View File

@ -153,7 +153,9 @@ func Styled(fm *Frame, input interface{}, stylings ...interface{}) (ui.Text, err
text = ui.StyleText(text, parsedStyling)
case Callable:
for i, seg := range text {
vs, err := fm.CaptureOutput(styling, []interface{}{seg}, NoOpts)
vs, err := fm.CaptureOutput(func(fm *Frame) error {
return styling.Call(fm, []interface{}{seg}, NoOpts)
})
if err != nil {
return nil, err
}

View File

@ -624,9 +624,3 @@ func (op seqOp) invoke(fm *Frame) error {
}
return nil
}
type funcOp func(*Frame) error
func (op funcOp) invoke(fm *Frame) error {
return op(fm)
}

View File

@ -342,75 +342,68 @@ func (op exceptionCaptureOp) invoke(fm *Frame) ([]interface{}, error) {
type outputCaptureOp struct{ subop effectOp }
func (op outputCaptureOp) invoke(fm *Frame) ([]interface{}, error) {
return pcaptureOutput(fm, op.subop)
return captureOutput(fm, op.subop.exec)
}
func pcaptureOutput(fm *Frame, op effectOp) ([]interface{}, error) {
func captureOutput(fm *Frame, f func(*Frame) error) ([]interface{}, error) {
vs := []interface{}{}
var m sync.Mutex
valueCb := func(ch <-chan interface{}) {
for v := range ch {
m.Lock()
vs = append(vs, v)
m.Unlock()
}
}
bytesCb := func(r *os.File) {
buffered := bufio.NewReader(r)
for {
line, err := buffered.ReadString('\n')
if line != "" {
v := strings.TrimSuffix(line, "\n")
err := pipeOutput(fm, f,
func(ch <-chan interface{}) {
for v := range ch {
m.Lock()
vs = append(vs, v)
m.Unlock()
}
if err != nil {
if err != io.EOF {
logger.Println("error on reading:", err)
},
func(r *os.File) {
buffered := bufio.NewReader(r)
for {
line, err := buffered.ReadString('\n')
if line != "" {
v := strings.TrimSuffix(line, "\n")
m.Lock()
vs = append(vs, v)
m.Unlock()
}
if err != nil {
if err != io.EOF {
logger.Println("error on reading:", err)
}
break
}
break
}
}
}
err := pcaptureOutputInner(fm, op, valueCb, bytesCb)
})
return vs, err
}
func pcaptureOutputInner(fm *Frame, op effectOp, valuesCb func(<-chan interface{}), bytesCb func(*os.File)) error {
func pipeOutput(fm *Frame, f func(*Frame) error, valuesCb func(<-chan interface{}), bytesCb func(*os.File)) error {
newFm := fm.fork("[output capture]")
ch := make(chan interface{}, outputCaptureBufferSize)
pipeRead, pipeWrite, err := os.Pipe()
r, w, err := os.Pipe()
if err != nil {
return fmt.Errorf("failed to create pipe: %v", err)
return err
}
newFm.ports[1] = &Port{
Chan: ch, CloseChan: true,
File: pipeWrite, CloseFile: true,
File: w, CloseFile: true,
}
bytesCollected := make(chan struct{})
chCollected := make(chan struct{})
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
valuesCb(ch)
close(chCollected)
}()
go func() {
bytesCb(pipeRead)
pipeRead.Close()
close(bytesCollected)
defer wg.Done()
defer r.Close()
bytesCb(r)
}()
err = op.exec(newFm)
err = f(newFm)
newFm.Close()
<-bytesCollected
<-chCollected
wg.Wait()
return err
}

View File

@ -7,7 +7,6 @@ import (
"syscall"
"testing"
"github.com/elves/elvish/pkg/diag"
"github.com/elves/elvish/pkg/eval/vals"
)
@ -66,42 +65,38 @@ func TestConcurrentEval(t *testing.T) {
}
func BenchmarkOutputCaptureOverhead(b *testing.B) {
op := effectOp{funcOp(func(*Frame) error { return nil }), diag.Ranging{}}
benchmarkOutputCapture(op, b.N)
benchmarkOutputCapture(b.N, func(fm *Frame) {})
}
func BenchmarkOutputCaptureValues(b *testing.B) {
op := effectOp{funcOp(func(fm *Frame) error {
benchmarkOutputCapture(b.N, func(fm *Frame) {
fm.ports[1].Chan <- "test"
return nil
}), diag.Ranging{}}
benchmarkOutputCapture(op, b.N)
})
}
func BenchmarkOutputCaptureBytes(b *testing.B) {
bytesToWrite := []byte("test")
op := effectOp{funcOp(func(fm *Frame) error {
benchmarkOutputCapture(b.N, func(fm *Frame) {
fm.ports[1].File.Write(bytesToWrite)
return nil
}), diag.Ranging{}}
benchmarkOutputCapture(op, b.N)
})
}
func BenchmarkOutputCaptureMixed(b *testing.B) {
bytesToWrite := []byte("test")
op := effectOp{funcOp(func(fm *Frame) error {
benchmarkOutputCapture(b.N, func(fm *Frame) {
fm.ports[1].Chan <- false
fm.ports[1].File.Write(bytesToWrite)
return nil
}), diag.Ranging{}}
benchmarkOutputCapture(op, b.N)
})
}
func benchmarkOutputCapture(op effectOp, n int) {
func benchmarkOutputCapture(n int, f func(*Frame)) {
ev := NewEvaler()
defer ev.Close()
ec := NewTopFrame(ev, NewInternalGoSource("[benchmark]"), []*Port{{}, {}, {}})
fm := NewTopFrame(ev, NewInternalGoSource("[benchmark]"), []*Port{{}, {}, {}})
for i := 0; i < n; i++ {
pcaptureOutput(ec, op)
captureOutput(fm, func(fm *Frame) error {
f(fm)
return nil
})
}
}

View File

@ -145,32 +145,14 @@ func (fm *Frame) Eval(op Op) error {
return op.Inner.exec(fm)
}
// CaptureOutput calls a function with the given arguments and options,
// capturing and returning the output. It does so in a protected environment so
// that exceptions thrown are wrapped in an Error.
func (fm *Frame) CaptureOutput(fn Callable, args []interface{}, opts map[string]interface{}) (vs []interface{}, err error) {
// XXX There is no source.
opFunc := func(f *Frame) error {
return fn.Call(f, args, opts)
}
return pcaptureOutput(fm, effectOp{funcOp(opFunc), diag.Ranging{From: -1, To: -1}})
// CaptureOutput captures the output of a given callback that operates on a Frame.
func (fm *Frame) CaptureOutput(f func(*Frame) error) ([]interface{}, error) {
return captureOutput(fm, f)
}
// CallWithOutputCallback calls a function with the given arguments and options,
// feeding the outputs to the given callbacks. It does so in a protected
// environment so that exceptions thrown are wrapped in an Error.
func (fm *Frame) CallWithOutputCallback(fn Callable, args []interface{}, opts map[string]interface{}, valuesCb func(<-chan interface{}), bytesCb func(*os.File)) error {
// XXX There is no source.
opFunc := func(f *Frame) error {
return fn.Call(f, args, opts)
}
return pcaptureOutputInner(fm, effectOp{funcOp(opFunc), diag.Ranging{From: -1, To: -1}}, valuesCb, bytesCb)
}
// ExecWithOutputCallback executes an Op, feeding the outputs to the given
// callbacks.
func (fm *Frame) ExecWithOutputCallback(op Op, valuesCb func(<-chan interface{}), bytesCb func(*os.File)) error {
return pcaptureOutputInner(fm, op.Inner, valuesCb, bytesCb)
// PipeOutput calls a callback with output piped to the given output handlers.
func (fm *Frame) PipeOutput(f func(*Frame) error, valuesCb func(<-chan interface{}), bytesCb func(*os.File)) error {
return pipeOutput(fm, f, valuesCb, bytesCb)
}
func (fm *Frame) addTraceback(r diag.Ranger) *stackTrace {

View File

@ -102,7 +102,9 @@ func replace(fm *eval.Frame, opts replaceOpts, argPattern string, argRepl interf
if errReplace != nil {
return ""
}
values, err := fm.CaptureOutput(repl, []interface{}{s}, eval.NoOpts)
values, err := fm.CaptureOutput(func(fm *eval.Frame) error {
return repl.Call(fm, []interface{}{s}, eval.NoOpts)
})
if err != nil {
errReplace = err
return ""