From 376fece7039745068124f1d6f9c9468d854a78db Mon Sep 17 00:00:00 2001 From: Cheer Xiao Date: Wed, 2 Oct 2013 22:40:49 +0800 Subject: [PATCH] eval: Connect pipes in ExecPipeline; support channel pipes --- eval/exec.go | 93 ++++++++++++++++++++++++++++++++-------------------- 1 file changed, 57 insertions(+), 36 deletions(-) diff --git a/eval/exec.go b/eval/exec.go index d697031e..29aad808 100644 --- a/eval/exec.go +++ b/eval/exec.go @@ -110,7 +110,7 @@ func evalTermList(ln *parse.ListNode) ([]string, error) { return ss, nil } -func evalCommand(n *parse.CommandNode, in, out *io) (cmd *command, files []*os.File, err error) { +func evalCommand(n *parse.CommandNode) (cmd *command, ioTypes [3]ioType, files []*os.File, err error) { if len(n.Nodes) == 0 { err = fmt.Errorf("command is emtpy") return @@ -126,9 +126,7 @@ func evalCommand(n *parse.CommandNode, in, out *io) (cmd *command, files []*os.F // Save unresolved args[0] as name. name := args[0] - // IO compatibility list, defaulting to all fileIO. - var ioTypes [3]ioType - + // Resolve command name. bi, isBuiltin := builtins[name] if isBuiltin { ioTypes = bi.ioTypes @@ -139,19 +137,11 @@ func evalCommand(n *parse.CommandNode, in, out *io) (cmd *command, files []*os.F err = fmt.Errorf("can't resolve: %s", e) return } - } - - if !in.compatible(ioTypes[0]) { - err = fmt.Errorf("Incompatible input pipe") - return - } - if !out.compatible(ioTypes[1]) { - err = fmt.Errorf("Incompatible output pipe") - return + // Use zero value (fileIO) for ioTypes } // IO list. - ios := [3]*io{in, out} + ios := [3]*io{} defaultErrIO := &io{f: os.Stderr} if defaultErrIO.compatible(ioTypes[2]) { ios[2] = defaultErrIO @@ -241,33 +231,64 @@ func ExecPipeline(pl *parse.ListNode) (updates []<-chan *StateUpdate, err error) var nextIn *io for i, n := range pl.Nodes { - // Create pipes. - var in, out *io - if i == 0 { - in = &io{f: os.Stdin} - } else { - in = nextIn - } - if i == ncmds - 1 { - out = &io{f: os.Stdout} - } else { - // os.Pipe sets O_CLOEXEC, which is what we want. - // XXX Assumes fileIO now - reader, writer, e := os.Pipe() - if e != nil { - return nil, fmt.Errorf("failed to create pipe: %s", e) - } - filesToClose = append(filesToClose, reader, writer) - nextIn = &io{f: reader} - out = &io{f: writer} - } - - cmd, files, err := evalCommand(n.(*parse.CommandNode), in, out) + cmd, ioTypes, files, err := evalCommand(n.(*parse.CommandNode)) filesToClose = append(filesToClose, files...) if err != nil { return nil, fmt.Errorf("error with command #%d: %s", i, err) } + // Create and connect pipes. + if i == 0 { + // First command. Only connect input when no input redirection is + // present. + if cmd.ios[0] == nil { + if ioTypes[0] == chanIO { + return nil, fmt.Errorf("channel input from user not yet supported") + } + cmd.ios[0] = &io{f: os.Stdin} + } + } else { + if cmd.ios[0] != nil { + return nil, fmt.Errorf("command #%d has both pipe input and input redirection") + } else if !nextIn.compatible(ioTypes[0]) { + return nil, fmt.Errorf("command #%d has incompatible input pipe") + } + cmd.ios[0] = nextIn + } + if i == ncmds - 1 { + if cmd.ios[1] == nil { + if ioTypes[1] == chanIO { + return nil, fmt.Errorf("channel output to user not yet supported") + } + cmd.ios[1] = &io{f: os.Stdout} + } + } else { + if cmd.ios[1] != nil { + return nil, fmt.Errorf("command #%d has both pipe output and output redirection", i) + } + switch ioTypes[1] { + case unusedIO: + return nil, fmt.Errorf("command #%d has unused output connected in pipeline", i) + case fileIO: + // os.Pipe sets O_CLOEXEC, which is what we want. + reader, writer, e := os.Pipe() + if e != nil { + return nil, fmt.Errorf("failed to create pipe: %s", e) + } + filesToClose = append(filesToClose, reader, writer) + nextIn = &io{f: reader} + cmd.ios[1] = &io{f: writer} + case chanIO: + // TODO Buffered channel? + ch := make(chan string) + chansToClose = append(chansToClose, ch) + nextIn = &io{ch: ch} + cmd.ios[1] = &io{ch: ch} + default: + panic("unreachable") + } + } + cmds = append(cmds, cmd) }