Throw exception when writing values to a port without a value component.

This is done by generalizing the mechanism used to detect that the reader of
value output has terminated.
This commit is contained in:
Qi Xiao 2021-06-22 00:33:41 +01:00
parent f4cb00e618
commit ce08c4a7d5
4 changed files with 68 additions and 36 deletions

View File

@ -103,16 +103,18 @@ func (op *pipelineOp) exec(fm *Frame) Exception {
return fm.errorpf(op, "failed to create pipe: %s", e)
}
ch := make(chan interface{}, pipelineChanBufferSize)
sendStop := make(chan struct{})
sendError := new(error)
readerGone := new(int32)
readerGoneCh := make(chan struct{})
newFm.ports[1] = &Port{
File: writer, Chan: ch,
closeFile: true, closeChan: true,
readerGone: readerGone, readerGoneCh: readerGoneCh}
sendStop: sendStop, sendError: sendError, readerGone: readerGone}
nextIn = &Port{
File: reader, Chan: ch,
closeFile: true, closeChan: false,
readerGone: readerGone, readerGoneCh: readerGoneCh}
// Store in input port for ease of retrieval later
sendStop: sendStop, sendError: sendError, readerGone: readerGone}
}
thisOp := formOp
thisExc := &excs[i]
@ -124,7 +126,8 @@ func (op *pipelineOp) exec(fm *Frame) Exception {
}
if inputIsPipe {
input := newFm.ports[0]
close(input.readerGoneCh)
*input.sendError = errs.ReaderGone{}
close(input.sendStop)
atomic.StoreInt32(input.readerGone, 1)
}
wg.Done()
@ -472,21 +475,6 @@ type invalidFD struct{ fd int }
func (err invalidFD) Error() string { return fmt.Sprintf("invalid fd: %d", err.fd) }
// Returns a suitable dummy value for the channel part of the port when
// redirecting from or to a file, so that the read and write attempts fail
// silently (instead of blocking or panicking).
//
// TODO: Instead of letting read and write attempts fail silently, consider
// raising an exception instead.
func chanForFileRedir(mode parse.RedirMode) chan interface{} {
if mode == parse.Read {
// ClosedChan produces no values when reading.
return ClosedChan
}
// BlackholeChan discards all values written to it.
return BlackholeChan
}
func (op *redirOp) exec(fm *Frame) Exception {
var dst int
if op.dstOp == nil {
@ -519,7 +507,9 @@ func (op *redirOp) exec(fm *Frame) Exception {
switch {
case src == -1:
// close
fm.ports[dst] = &Port{}
fm.ports[dst] = &Port{
// Ensure that writing to value output throws an exception
sendStop: closedSendStop, sendError: &ErrNoValueOutput}
case src >= len(fm.ports) || fm.ports[src] == nil:
return fm.errorp(op, invalidFD{src})
default:
@ -537,9 +527,9 @@ func (op *redirOp) exec(fm *Frame) Exception {
if err != nil {
return fm.errorpf(op, "failed to open file %s: %s", vals.Repr(src, vals.NoPretty), err)
}
fm.ports[dst] = &Port{File: f, closeFile: true, Chan: chanForFileRedir(op.mode)}
fm.ports[dst] = fileRedirPort(op.mode, f, true)
case vals.File:
fm.ports[dst] = &Port{File: src, closeFile: false, Chan: chanForFileRedir(op.mode)}
fm.ports[dst] = fileRedirPort(op.mode, src, false)
case vals.Pipe:
var f *os.File
switch op.mode {
@ -550,7 +540,7 @@ func (op *redirOp) exec(fm *Frame) Exception {
default:
return fm.errorpf(op, "can only use < or > with pipes")
}
fm.ports[dst] = &Port{File: f, closeFile: false, Chan: chanForFileRedir(op.mode)}
fm.ports[dst] = fileRedirPort(op.mode, f, false)
default:
return fm.errorp(op.srcOp, errs.BadValue{
What: "redirection source",
@ -559,6 +549,24 @@ func (op *redirOp) exec(fm *Frame) Exception {
return nil
}
// Creates a port that only have a file component, populating the
// channel-related fields with suitable values depending on the redirection
// mode.
func fileRedirPort(mode parse.RedirMode, f *os.File, closeFile bool) *Port {
if mode == parse.Read {
return &Port{
File: f, closeFile: closeFile,
// ClosedChan produces no values when reading.
Chan: ClosedChan,
}
}
return &Port{
File: f, closeFile: closeFile,
// Throws errValueOutputIsClosed when writing.
Chan: nil, sendStop: closedSendStop, sendError: &ErrNoValueOutput,
}
}
// Makes the size of *ports at least n, adding nil's if necessary.
func growPorts(ports *[]*Port, n int) {
if len(*ports) >= n {

View File

@ -218,6 +218,11 @@ func TestCommand_Redir(t *testing.T) {
That("echo abc > bytes", "each $echo~ < bytes").Prints("abc\n"),
That("echo def > bytes", "only-values < bytes | count").Puts(0),
// Writing value output to file throws an exception.
That("put foo >a").Throws(ErrNoValueOutput, "put foo >a"),
// Writing value output to closed port throws an exception too.
That("put foo >&-").Throws(ErrNoValueOutput, "put foo >&-"),
// Invalid redirection destination.
That("echo []> test").Throws(
errs.BadValue{

View File

@ -97,7 +97,8 @@ func (fm *Frame) InputFile() *os.File {
// ValueOutput returns a handle for writing value outputs.
func (fm *Frame) ValueOutput() ValueOutput {
return valueOutput{fm.ports[1].Chan, fm.ports[1].readerGoneCh}
p := fm.ports[1]
return valueOutput{p.Chan, p.sendStop, p.sendError}
}
// ByteOutput returns a handle for writing byte outputs.

View File

@ -2,6 +2,7 @@ package eval
import (
"bufio"
"errors"
"fmt"
"io"
"os"
@ -19,18 +20,34 @@ type Port struct {
closeFile bool
closeChan bool
// The following fields are only populated when the Port is connected to a
// pipe.
//
// When the reading end of File and Chan exits, it closes readerGoneCh,
// stores 1 in readerGone, before closing the reading end of File.
readerGoneCh chan struct{}
readerGone *int32
// The following two fields are populated as an additional control
// mechanism for output ports. When no more value should be send on Chan,
// chanSendError is populated and chanSendStop is closed. This is used for
// both detection of reader termination (see readerGone below) and closed
// ports.
sendStop chan struct{}
sendError *error
// Only populated in output ports writing to another command in a pipeline.
// When the reading end of the pipe exits, it stores 1 in readerGone. This
// is used to check if an external command killed by SIGPIPE is caused by
// the termination of the reader of the pipe.
readerGone *int32
}
// ErrNoValueOutput is thrown when writing to a pipe without a value output
// component.
var ErrNoValueOutput = errors.New("port has no value output")
// A closed channel, suitable as a value for Port.sendStop when there is no
// reader to start with.
var closedSendStop = make(chan struct{})
func init() { close(closedSendStop) }
// Returns a copy of the Port with the Close* flags unset.
func (p *Port) fork() *Port {
return &Port{p.File, p.Chan, false, false, p.readerGoneCh, p.readerGone}
return &Port{p.File, p.Chan, false, false, p.sendStop, p.sendError, p.readerGone}
}
// Closes a Port.
@ -251,16 +268,17 @@ type ValueOutput interface {
}
type valueOutput struct {
data chan<- interface{}
readerGone <-chan struct{}
data chan<- interface{}
sendStop <-chan struct{}
sendError *error
}
func (vo valueOutput) Put(v interface{}) error {
select {
case vo.data <- v:
return nil
case <-vo.readerGone:
return errs.ReaderGone{}
case <-vo.sendStop:
return *vo.sendError
}
}