elvish/pkg/eval/builtin_fn_flow.go

217 lines
4.3 KiB
Go
Raw Normal View History

2017-12-17 13:20:03 +08:00
package eval
import (
2022-01-05 08:12:35 +08:00
"errors"
"math"
2023-05-07 06:22:20 +08:00
"math/big"
2017-12-17 13:20:03 +08:00
"sync"
"sync/atomic"
"golang.org/x/sync/semaphore"
"src.elv.sh/pkg/errutil"
"src.elv.sh/pkg/eval/errs"
"src.elv.sh/pkg/eval/vals"
2017-12-17 13:20:03 +08:00
)
// Flow control.
// TODO(xiaq): Document "multi-error".
func init() {
addBuiltinFns(map[string]any{
"run-parallel": runParallel,
// Exception and control
"fail": fail,
"multi-error": multiErrorFn,
"return": returnFn,
"break": breakFn,
"continue": continueFn,
2022-01-05 08:12:35 +08:00
"defer": deferFn,
// Iterations.
"each": each,
"peach": peach,
})
}
// runParallel calls every function concurrently, each in its own goroutine on
// its own fork of fm and with no arguments or options, then waits for all of
// them to finish.
//
// The outcome of the i-th function is stored in the i-th slot of a slice of
// Exception values, which is handed to MakePipelineError to build the result.
// A non-nil error from Call is type-asserted to Exception without a check, so
// a Callable returning any other error type makes this panic.
func runParallel(fm *Frame, functions ...Callable) error {
	excs := make([]Exception, len(functions))

	var pending sync.WaitGroup
	pending.Add(len(functions))

	for idx := range functions {
		// Everything the goroutine captures is declared per iteration, so no
		// two goroutines share a variable.
		fn := functions[idx]
		forked := fm.Fork("[run-parallel function]")
		slot := &excs[idx]
		go func() {
			if err := fn.Call(forked, NoArgs, NoOpts); err != nil {
				*slot = err.(Exception)
			}
			pending.Done()
		}()
	}

	pending.Wait()
	return MakePipelineError(excs)
}
func each(fm *Frame, f Callable, inputs Inputs) error {
2017-12-17 13:20:03 +08:00
broken := false
var err error
inputs(func(v any) {
2017-12-17 13:20:03 +08:00
if broken {
return
}
newFm := fm.Fork("closure of each")
ex := f.Call(newFm, []any{v}, NoOpts)
newFm.Close()
2017-12-17 13:20:03 +08:00
if ex != nil {
2020-09-05 04:04:58 +08:00
switch Reason(ex) {
2017-12-17 13:20:03 +08:00
case nil, Continue:
// nop
case Break:
broken = true
default:
broken = true
err = ex
2017-12-17 13:20:03 +08:00
}
}
})
return err
2017-12-17 13:20:03 +08:00
}
2023-05-07 06:22:20 +08:00
type peachOpt struct{ NumWorkers vals.Num }
2023-05-07 06:22:20 +08:00
func (o *peachOpt) SetDefaultOptions() { o.NumWorkers = math.Inf(1) }
func peach(fm *Frame, opts peachOpt, f Callable, inputs Inputs) error {
var wg sync.WaitGroup
2021-06-01 05:13:24 +08:00
var broken int32
var errMu sync.Mutex
2017-12-17 13:20:03 +08:00
var err error
2021-06-01 05:13:24 +08:00
var workerSema *semaphore.Weighted
2023-05-07 06:22:20 +08:00
numWorkers, limited, err := parseNumWorkers(opts.NumWorkers)
if err != nil {
return err
}
if limited {
workerSema = semaphore.NewWeighted(int64(numWorkers))
}
2023-05-07 06:22:20 +08:00
ctx := fm.Context()
inputs(func(v any) {
2021-06-01 05:13:24 +08:00
if atomic.LoadInt32(&broken) != 0 {
2017-12-17 13:20:03 +08:00
return
}
2023-05-07 06:22:20 +08:00
if workerSema != nil {
workerSema.Acquire(ctx, 1)
}
wg.Add(1)
2017-12-17 13:20:03 +08:00
go func() {
newFm := fm.Fork("closure of peach")
2021-01-05 12:07:35 +08:00
newFm.ports[0] = DummyInputPort
ex := f.Call(newFm, []any{v}, NoOpts)
newFm.Close()
2017-12-17 13:20:03 +08:00
if ex != nil {
2020-09-05 04:04:58 +08:00
switch Reason(ex) {
2017-12-17 13:20:03 +08:00
case nil, Continue:
// nop
case Break:
2021-06-01 05:13:24 +08:00
atomic.StoreInt32(&broken, 1)
2017-12-17 13:20:03 +08:00
default:
2021-06-01 05:13:24 +08:00
errMu.Lock()
err = errutil.Multi(err, ex)
2021-06-01 05:13:24 +08:00
defer errMu.Unlock()
atomic.StoreInt32(&broken, 1)
2017-12-17 13:20:03 +08:00
}
}
wg.Done()
2023-05-07 06:22:20 +08:00
if workerSema != nil {
workerSema.Release(1)
}
2017-12-17 13:20:03 +08:00
}()
})
wg.Wait()
return err
2017-12-17 13:20:03 +08:00
}
2023-05-07 06:22:20 +08:00
// parseNumWorkers validates the value of peach's num-workers option.
//
// It returns (n, true, nil) when the value is an int of at least 1, and
// (0, false, nil) - meaning no limit - when the value is positive infinity or
// a positive *big.Int. Every other value, including zero, negative numbers of
// any representation, and non-infinite floats, yields an errs.BadValue.
func parseNumWorkers(n vals.Num) (int, bool, error) {
	switch n := n.(type) {
	case int:
		if n >= 1 {
			return n, true, nil
		}
	case *big.Int:
		// A limit larger than MaxInt is equivalent to no limit. Only positive
		// values qualify: a negative big integer is not a valid worker count
		// and must be rejected like any other non-positive number.
		// NOTE(review): this assumes a *big.Int only appears for values that
		// do not fit in an int - confirm against vals.Num.
		if n.Sign() > 0 {
			return 0, false, nil
		}
	case float64:
		if math.IsInf(n, 1) {
			return 0, false, nil
		}
	}
	return 0, false, errs.BadValue{
		What:   "peach &num-workers",
		Valid:  "exact positive integer or +inf",
		Actual: vals.ToString(n),
	}
}
// FailError is an error returned by the "fail" command. Content holds the
// value the error wraps.
type FailError struct {
	Content any
}

// Error returns Content converted to a string with vals.ToString.
func (e FailError) Error() string {
	return vals.ToString(e.Content)
}

// Fields returns a structmap, backed by a copy of e, for accessing fields
// from Elvish.
func (e FailError) Fields() vals.StructMap {
	return failFields{e}
}
// failFields is the structmap view of a FailError, as returned by
// FailError.Fields.
type failFields struct {
	e FailError
}

// IsStructMap is an empty marker method.
func (failFields) IsStructMap() {}

// Type always returns the string "fail".
func (failFields) Type() string {
	return "fail"
}

// Content returns the Content of the wrapped FailError.
func (f failFields) Content() any {
	return f.e.Content
}
func fail(v any) error {
if e, ok := v.(error); ok {
// MAYBE TODO: if v is an exception, attach a "rethrown" stack trace,
// like Java
return e
}
return FailError{v}
2017-12-17 13:20:03 +08:00
}
func multiErrorFn(excs ...Exception) error {
return PipelineError{excs}
2017-12-17 13:20:03 +08:00
}
func returnFn() error {
return Return
2017-12-17 13:20:03 +08:00
}
func breakFn() error {
return Break
2017-12-17 13:20:03 +08:00
}
func continueFn() error {
return Continue
2017-12-17 13:20:03 +08:00
}
2022-01-05 08:12:35 +08:00
// errDeferNotInClosure is returned by deferFn when the calling frame has no
// list of deferred callbacks.
var errDeferNotInClosure = errors.New("defer must be called from within a closure")

// deferFn implements the "defer" builtin. It registers fn on fm, via
// fm.addDefer, as a callback that will call fn with no arguments or options.
//
// It returns errDeferNotInClosure when fm.defers is nil, and nil otherwise.
//
// The registered callback returns nil when fn succeeds. When fn fails with an
// Exception, that exception is returned as is; any other error is wrapped in
// an exception carrying the traceback captured at the time defer was called.
func deferFn(fm *Frame, fn Callable) error {
	if fm.defers == nil {
		return errDeferNotInClosure
	}
	deferTraceback := fm.traceback
	fm.addDefer(func(fm *Frame) Exception {
		err := fn.Call(fm, NoArgs, NoOpts)
		if err == nil {
			// A nil error fails the type assertion below and would otherwise
			// be wrapped into a non-nil Exception, making a successful
			// deferred call look like a failure.
			return nil
		}
		if exc, ok := err.(Exception); ok {
			return exc
		}
		return &exception{err, deferTraceback}
	})
	return nil
}