Detect and suppress SIGPIPE caused by the next command in a pipeline exiting early.

This addresses #952.
This commit is contained in:
Qi Xiao 2021-05-20 00:10:17 +01:00
parent fd70a5a274
commit 641f0ebf04
6 changed files with 66 additions and 14 deletions

View File

@ -4,6 +4,7 @@ import (
"fmt"
"os"
"sync"
"sync/atomic"
"src.elv.sh/pkg/diag"
"src.elv.sh/pkg/eval/errs"
@ -87,12 +88,13 @@ func (op *pipelineOp) exec(fm *Frame) Exception {
// For each form, create a dedicated evalCtx and run asynchronously
for i, formOp := range op.subops {
hasChanInput := i > 0
newFm := fm.fork("[form op]")
if i > 0 {
inputIsPipe := i > 0
outputIsPipe := i < nforms-1
if inputIsPipe {
newFm.ports[0] = nextIn
}
if i < nforms-1 {
if outputIsPipe {
// Each internal port pair consists of a (byte) pipe pair and a
// channel.
// os.Pipe sets O_CLOEXEC, which is what we want.
@ -101,21 +103,25 @@ func (op *pipelineOp) exec(fm *Frame) Exception {
return fm.errorpf(op, "failed to create pipe: %s", e)
}
ch := make(chan interface{}, pipelineChanBufferSize)
readerGone := new(int32)
newFm.ports[1] = &Port{
File: writer, Chan: ch, closeFile: true, closeChan: true}
File: writer, Chan: ch,
closeFile: true, closeChan: true, readerGone: readerGone}
nextIn = &Port{
File: reader, Chan: ch, closeFile: true, closeChan: false}
File: reader, Chan: ch,
closeFile: true, closeChan: false, readerGone: readerGone}
}
thisOp := formOp
thisExc := &excs[i]
go func() {
exc := thisOp.exec(newFm)
newFm.Close()
if exc != nil {
if exc != nil && !(outputIsPipe && isReaderGone(exc)) {
*thisExc = exc
}
wg.Done()
if hasChanInput {
if inputIsPipe {
input := newFm.ports[0]
atomic.StoreInt32(input.readerGone, 1)
// If the command has channel input, drain it. This
// mitigates the effect of erroneous pipelines like
// "range 100 | cat"; without draining the pipeline will
@ -123,6 +129,7 @@ func (op *pipelineOp) exec(fm *Frame) Exception {
for range newFm.InputChan() {
}
}
wg.Done()
}()
}
@ -146,6 +153,11 @@ func (op *pipelineOp) exec(fm *Frame) Exception {
return fm.errorp(op, MakePipelineError(excs))
}
func isReaderGone(exc Exception) bool {
_, ok := exc.Reason().(errs.ReaderGone)
return ok
}
func (cp *compiler) formOp(n *parse.Form) effectOp {
var tempLValues []lvalue
var assignmentOps []effectOp

View File

@ -11,7 +11,20 @@ import (
"src.elv.sh/pkg/testutil"
)
func TestCompileEffectUnix(t *testing.T) {
func TestPipeline_Unix(t *testing.T) {
Test(t,
// External commands terminated by SIGPIPE due to reader exiting early
// raise ReaderGone, which is then suppressed.
That("yes | nop").DoesNothing(),
That(
"var reached = $false",
"{ yes; reached = $true } | nop",
"put $reached",
).Puts(false),
)
}
func TestCommand_Unix(t *testing.T) {
_, cleanup := testutil.InTestDir()
defer cleanup()

View File

@ -4,8 +4,10 @@ import (
"errors"
"os"
"os/exec"
"sync/atomic"
"syscall"
"src.elv.sh/pkg/eval/errs"
"src.elv.sh/pkg/eval/vals"
"src.elv.sh/pkg/fsutil"
"src.elv.sh/pkg/parse"
@ -102,5 +104,12 @@ func (e externalCmd) Call(fm *Frame, argVals []interface{}, opts map[string]inte
// calling `Wait` twice on a particular process object.
return err
}
ws := state.Sys().(syscall.WaitStatus)
if ws.Signaled() && isSIGPIPE(ws.Signal()) {
readerGone := fm.ports[1].readerGone
if readerGone != nil && atomic.LoadInt32(readerGone) == 1 {
return errs.ReaderGone{}
}
}
return NewExternalCmdExit(e.Name, state.Sys().(syscall.WaitStatus), proc.Pid)
}

View File

@ -0,0 +1,9 @@
// +build !windows,!plan9,!js
package eval
import "syscall"
func isSIGPIPE(s syscall.Signal) bool {
return s == syscall.SIGPIPE
}

View File

@ -0,0 +1,8 @@
package eval
import "syscall"
func isSIGPIPE(s syscall.Signal) bool {
// Windows doesn't have SIGPIPE.
return false
}

View File

@ -13,15 +13,16 @@ import (
// Port conveys data stream. It always consists of a byte band and a channel band.
type Port struct {
File *os.File
Chan chan interface{}
closeFile bool
closeChan bool
File *os.File
Chan chan interface{}
closeFile bool
closeChan bool
readerGone *int32
}
// Returns a copy of the Port with the Close* flags unset.
func (p *Port) fork() *Port {
return &Port{p.File, p.Chan, false, false}
return &Port{p.File, p.Chan, false, false, p.readerGone}
}
// Closes a Port.