eval: Connect pipes in ExecPipeline; support channel pipes

This commit is contained in:
Cheer Xiao 2013-10-02 22:40:49 +08:00
parent 13049b3f34
commit 376fece703

View File

@ -110,7 +110,7 @@ func evalTermList(ln *parse.ListNode) ([]string, error) {
return ss, nil
}
func evalCommand(n *parse.CommandNode, in, out *io) (cmd *command, files []*os.File, err error) {
func evalCommand(n *parse.CommandNode) (cmd *command, ioTypes [3]ioType, files []*os.File, err error) {
if len(n.Nodes) == 0 {
err = fmt.Errorf("command is emtpy")
return
@ -126,9 +126,7 @@ func evalCommand(n *parse.CommandNode, in, out *io) (cmd *command, files []*os.F
// Save unresolved args[0] as name.
name := args[0]
// IO compatibility list, defaulting to all fileIO.
var ioTypes [3]ioType
// Resolve command name.
bi, isBuiltin := builtins[name]
if isBuiltin {
ioTypes = bi.ioTypes
@ -139,19 +137,11 @@ func evalCommand(n *parse.CommandNode, in, out *io) (cmd *command, files []*os.F
err = fmt.Errorf("can't resolve: %s", e)
return
}
}
if !in.compatible(ioTypes[0]) {
err = fmt.Errorf("Incompatible input pipe")
return
}
if !out.compatible(ioTypes[1]) {
err = fmt.Errorf("Incompatible output pipe")
return
// Use zero value (fileIO) for ioTypes
}
// IO list.
ios := [3]*io{in, out}
ios := [3]*io{}
defaultErrIO := &io{f: os.Stderr}
if defaultErrIO.compatible(ioTypes[2]) {
ios[2] = defaultErrIO
@ -241,33 +231,64 @@ func ExecPipeline(pl *parse.ListNode) (updates []<-chan *StateUpdate, err error)
var nextIn *io
for i, n := range pl.Nodes {
// Create pipes.
var in, out *io
if i == 0 {
in = &io{f: os.Stdin}
} else {
in = nextIn
}
if i == ncmds - 1 {
out = &io{f: os.Stdout}
} else {
// os.Pipe sets O_CLOEXEC, which is what we want.
// XXX Assumes fileIO now
reader, writer, e := os.Pipe()
if e != nil {
return nil, fmt.Errorf("failed to create pipe: %s", e)
}
filesToClose = append(filesToClose, reader, writer)
nextIn = &io{f: reader}
out = &io{f: writer}
}
cmd, files, err := evalCommand(n.(*parse.CommandNode), in, out)
cmd, ioTypes, files, err := evalCommand(n.(*parse.CommandNode))
filesToClose = append(filesToClose, files...)
if err != nil {
return nil, fmt.Errorf("error with command #%d: %s", i, err)
}
// Create and connect pipes.
if i == 0 {
// First command. Only connect input when no input redirection is
// present.
if cmd.ios[0] == nil {
if ioTypes[0] == chanIO {
return nil, fmt.Errorf("channel input from user not yet supported")
}
cmd.ios[0] = &io{f: os.Stdin}
}
} else {
if cmd.ios[0] != nil {
return nil, fmt.Errorf("command #%d has both pipe input and input redirection")
} else if !nextIn.compatible(ioTypes[0]) {
return nil, fmt.Errorf("command #%d has incompatible input pipe")
}
cmd.ios[0] = nextIn
}
if i == ncmds - 1 {
if cmd.ios[1] == nil {
if ioTypes[1] == chanIO {
return nil, fmt.Errorf("channel output to user not yet supported")
}
cmd.ios[1] = &io{f: os.Stdout}
}
} else {
if cmd.ios[1] != nil {
return nil, fmt.Errorf("command #%d has both pipe output and output redirection", i)
}
switch ioTypes[1] {
case unusedIO:
return nil, fmt.Errorf("command #%d has unused output connected in pipeline", i)
case fileIO:
// os.Pipe sets O_CLOEXEC, which is what we want.
reader, writer, e := os.Pipe()
if e != nil {
return nil, fmt.Errorf("failed to create pipe: %s", e)
}
filesToClose = append(filesToClose, reader, writer)
nextIn = &io{f: reader}
cmd.ios[1] = &io{f: writer}
case chanIO:
// TODO Buffered channel?
ch := make(chan string)
chansToClose = append(chansToClose, ch)
nextIn = &io{ch: ch}
cmd.ios[1] = &io{ch: ch}
default:
panic("unreachable")
}
}
cmds = append(cmds, cmd)
}