eval: make all "exec" funcs synchronous

This commit is contained in:
Qi Xiao 2016-01-28 22:00:50 +01:00
parent 4c4313eeb0
commit d0a88e4961
6 changed files with 101 additions and 146 deletions

View File

@ -18,8 +18,8 @@ func (cp *compiler) pipelines(ns []*parse.Pipeline) []valuesOp {
return ops
}
func (cp *compiler) forms(ns []*parse.Form) []exitusChOp {
ops := make([]exitusChOp, len(ns))
func (cp *compiler) forms(ns []*parse.Form) []exitusOp {
ops := make([]exitusOp, len(ns))
for i, n := range ns {
ops[i] = cp.form(n)
}

View File

@ -297,9 +297,7 @@ func each(ec *evalCtx, f *closure) exitus {
in := ec.ports[0].ch
in:
for v := range in {
su := f.Exec(ec.copy("closure of each"), []Value{v})
// F.Exec will put exactly one stateUpdate on the channel
e := <-su
e := f.Exec(ec.copy("closure of each"), []Value{v})
switch e.Sort {
case Ok, Continue:
// nop

View File

@ -75,8 +75,11 @@ func (cp *compiler) pipeline(n *parse.Pipeline) valuesOp {
return func(ec *evalCtx) []Value {
var nextIn *port
updates := make([]<-chan exitus, len(ops))
// For each form, create a dedicated evalCtx and run
exituses := make([]Value, len(ops))
finished := make(chan bool, len(ops))
// For each form, create a dedicated evalCtx and run asynchronously
for i, op := range ops {
newEc := ec.copy(fmt.Sprintf("form op %v", op))
if i > 0 {
@ -96,25 +99,29 @@ func (cp *compiler) pipeline(n *parse.Pipeline) valuesOp {
nextIn = &port{
f: reader, ch: ch, closeF: true, closeCh: false}
}
updates[i] = op(newEc)
thisOp := op
thisExitus := &exituses[i]
go func() {
*thisExitus = thisOp(newEc)
finished <- true
}()
}
// Collect exit values
exits := make([]Value, len(ops))
for i, update := range updates {
exits[i] = <-update
// Wait for all forms to finish
for i := 0; i < len(ops); i++ {
<-finished
}
return exits
return exituses
}
}
func (cp *compiler) form(n *parse.Form) exitusChOp {
func (cp *compiler) form(n *parse.Form) exitusOp {
headStr, ok := oneString(n.Head)
if ok {
compileForm, ok := builtinSpecials[headStr]
if ok {
// special form
op := compileForm(cp, n)
return func(ec *evalCtx) <-chan exitus {
return func(ec *evalCtx) exitus {
return ec.execSpecial(op)
}
} else {
@ -131,7 +138,7 @@ func (cp *compiler) form(n *parse.Form) exitusChOp {
p := n.Begin()
// ec here is always a subevaler created in compiler.pipeline, so it can
// be safely modified.
return func(ec *evalCtx) <-chan exitus {
return func(ec *evalCtx) exitus {
// head
headValues := headOp(ec)
headMust := ec.must(headValues, "the head of command", p)

View File

@ -126,7 +126,7 @@ const (
// newTopEvalCtx creates a top-level evalCtx.
func newTopEvalCtx(ev *Evaler, name, text string) (*evalCtx, chan bool) {
ch := make(chan Value, outChanSize)
done := make(chan bool)
done := make(chan bool, 1)
go func() {
for v := range ch {
fmt.Printf("%s%s\n", outChanLeader, v.Repr())

View File

@ -14,21 +14,24 @@ const (
fdNil uintptr = ^uintptr(0)
)
var (
arityMismatch = newFailure("arity mismatch")
cdNoArg = newFailure("implicit cd accepts no arguments")
)
// execSpecial executes a builtin special form.
//
// NOTE(xiaq): execSpecial and execNonSpecial are always called on an
// intermediate "form redir" where only the form-local ports are marked
// shouldClose. ec.closePorts should be called at appropriate moments.
func (ec *evalCtx) execSpecial(op exitusOp) <-chan exitus {
update := make(chan exitus)
go func() {
ex := op(ec)
// Ports are closed after executaion of builtin is complete.
ec.closePorts()
update <- ex
close(update)
}()
return update
func (ec *evalCtx) execSpecial(op exitusOp) exitus {
defer ec.closePorts()
return op(ec)
}
// execNonSpecial executes a form that is not a special form.
func (ec *evalCtx) execNonSpecial(cmd Value, args []Value) exitus {
return ec.resolveNonSpecial(cmd).Exec(ec, args)
}
func (ec *evalCtx) resolveNonSpecial(cmd Value) callable {
@ -51,38 +54,20 @@ func (ec *evalCtx) resolveNonSpecial(cmd Value) callable {
return externalCmd{cmdStr}
}
// execNonSpecial executes a form that is not a special form.
func (ec *evalCtx) execNonSpecial(cmd Value, args []Value) <-chan exitus {
return ec.resolveNonSpecial(cmd).Exec(ec, args)
}
// Exec executes a builtin function.
func (b *builtinFn) Exec(ec *evalCtx, args []Value) <-chan exitus {
update := make(chan exitus)
go func() {
ex := b.Impl(ec, args)
// Ports are closed after executaion of builtin is complete.
ec.closePorts()
update <- ex
close(update)
}()
return update
func (b *builtinFn) Exec(ec *evalCtx, args []Value) exitus {
defer ec.closePorts()
return b.Impl(ec, args)
}
var (
arityMismatch = newFailure("arity mismatch")
)
// Exec executes a closure.
func (c *closure) Exec(ec *evalCtx, args []Value) <-chan exitus {
update := make(chan exitus, 1)
func (c *closure) Exec(ec *evalCtx, args []Value) exitus {
defer ec.closePorts()
// TODO Support optional/rest argument
if len(args) != len(c.ArgNames) {
// TODO Check arity before exec'ing
update <- arityMismatch
close(update)
return update
return arityMismatch
}
// This evalCtx is dedicated to the current form, so we modify it in place.
@ -103,35 +88,31 @@ func (c *closure) Exec(ec *evalCtx, args []Value) <-chan exitus {
// TODO(xiaq): Also change ec.name and ec.text since the closure being
// called can come from another source.
go func() {
vs, err := ec.eval(c.Op)
if err != nil {
fmt.Print(err.(*errutil.ContextualError).Pprint())
vs, err := ec.eval(c.Op)
if err != nil {
fmt.Print(err.(*errutil.ContextualError).Pprint())
// XXX should return failure
}
// Ports are closed after executaion of closure is complete.
if HasFailure(vs) {
var flow exitusSort
es := make([]exitus, len(vs))
// NOTE(xiaq): If there is a flow exitus, the last one is
// re-returned. Maybe we could use a more elegant semantics.
for i, v := range vs {
es[i] = v.(exitus)
if es[i].Sort >= FlowSortLower {
flow = es[i].Sort
}
}
// Ports are closed after executaion of closure is complete.
ec.closePorts()
if HasFailure(vs) {
var flow exitusSort
es := make([]exitus, len(vs))
// NOTE(xiaq): If there is a flow exitus, the last one is
// re-returned. Maybe we could use a more elegant semantics.
for i, v := range vs {
es[i] = v.(exitus)
if es[i].Sort >= FlowSortLower {
flow = es[i].Sort
}
}
if flow != 0 {
update <- newFlowExitus(flow)
} else {
update <- newTraceback(es)
}
if flow != 0 {
return newFlowExitus(flow)
} else {
update <- ok
return newTraceback(es)
}
close(update)
}()
return update
} else {
return ok
}
}
// waitStatusToExitus converts syscall.WaitStatus to an exitus.
@ -165,38 +146,15 @@ func waitStatusToExitus(ws syscall.WaitStatus) exitus {
}
}
// waitStateUpdate wait(2)s for pid, and feeds the WaitStatus into an
// exitus channel after proper conversion.
func waitStateUpdate(pid int, update chan<- exitus) {
var ws syscall.WaitStatus
_, err := syscall.Wait4(pid, &ws, 0, nil)
if err != nil {
update <- newFailure(fmt.Sprintf("wait:", err.Error()))
} else {
update <- waitStatusToExitus(ws)
}
close(update)
}
var (
cdNoArg = newFailure("implicit cd accepts no arguments")
)
// Exec executes an external command.
func (e externalCmd) Exec(ec *evalCtx, argVals []Value) <-chan exitus {
update := make(chan exitus, 1)
func (e externalCmd) Exec(ec *evalCtx, argVals []Value) exitus {
defer ec.closePorts()
if DontSearch(e.Name) {
stat, err := os.Stat(e.Name)
if err == nil && stat.IsDir() {
// implicit cd
ex := cdInner(e.Name, ec)
ec.closePorts()
update <- ex
close(update)
return update
return cdInner(e.Name, ec)
}
}
@ -220,54 +178,46 @@ func (e externalCmd) Exec(ec *evalCtx, argVals []Value) <-chan exitus {
attr := syscall.ProcAttr{Env: os.Environ(), Files: files[:], Sys: &sys}
path, err := ec.Search(e.Name)
var pid int
if err == nil {
args[0] = path
pid, err = syscall.ForkExec(path, args, &attr)
}
// Ports are closed after fork-exec of external is complete.
ec.closePorts()
if err != nil {
update <- newFailure(err.Error())
close(update)
} else {
go waitStateUpdate(pid, update)
return newFailure("search: " + err.Error())
}
return update
args[0] = path
pid, err := syscall.ForkExec(path, args, &attr)
if err != nil {
return newFailure("forkExec: " + err.Error())
}
var ws syscall.WaitStatus
_, err = syscall.Wait4(pid, &ws, 0, nil)
if err != nil {
return newFailure(fmt.Sprintf("wait:", err.Error()))
} else {
return waitStatusToExitus(ws)
}
}
func (t *list) Exec(ec *evalCtx, argVals []Value) <-chan exitus {
update := make(chan exitus)
go func() {
var v Value = t
for _, idx := range argVals {
// XXX the positions are obviously wrong.
v = evalSubscript(ec, v, idx, 0, 0)
}
ec.ports[1].ch <- v
ec.closePorts()
update <- ok
close(update)
}()
return update
func (t *list) Exec(ec *evalCtx, argVals []Value) exitus {
defer ec.closePorts()
var v Value = t
for _, idx := range argVals {
// XXX the positions are obviously wrong.
v = evalSubscript(ec, v, idx, 0, 0)
}
ec.ports[1].ch <- v
return ok
}
// XXX duplicate
func (t map_) Exec(ec *evalCtx, argVals []Value) <-chan exitus {
update := make(chan exitus)
go func() {
var v Value = t
for _, idx := range argVals {
// XXX the positions are obviously wrong.
v = evalSubscript(ec, v, idx, 0, 0)
}
ec.ports[1].ch <- v
ec.closePorts()
update <- ok
close(update)
}()
return update
func (t map_) Exec(ec *evalCtx, argVals []Value) exitus {
defer ec.closePorts()
var v Value = t
for _, idx := range argVals {
// XXX the positions are obviously wrong.
v = evalSubscript(ec, v, idx, 0, 0)
}
ec.ports[1].ch <- v
return ok
}

View File

@ -282,7 +282,7 @@ type callable interface {
// Exec executes a callable asynchronously on an Evaler. It assumes that
// it is the last callable to be executed on that Evaler and thus
// responsible for cleaning up the ports.
Exec(ec *evalCtx, args []Value) <-chan exitus
Exec(ec *evalCtx, args []Value) exitus
}
// closure is a closure.