diff --git a/util/async-reader.go b/util/async-reader.go index 64cadabd..e5a96724 100644 --- a/util/async-reader.go +++ b/util/async-reader.go @@ -13,38 +13,51 @@ const ( asyncReaderChanSize int = 128 ) +const ( + asyncReaderStop byte = 's' + asyncReaderContinue = 'c' + asyncReaderQuit = 'q' +) + // AsyncReader delivers a Unix fd stream to a channel of runes. type AsyncReader struct { rd *os.File bufrd *bufio.Reader - rQuit, wQuit *os.File + rCtrl, wCtrl *os.File + ch chan rune } func NewAsyncReader(rd *os.File) *AsyncReader { ar := &AsyncReader{ rd: rd, bufrd: bufio.NewReaderSize(rd, 0), + ch: make(chan rune, asyncReaderChanSize), } r, w, err := os.Pipe() if err != nil { panic(err) } - ar.rQuit, ar.wQuit = r, w + ar.rCtrl, ar.wCtrl = r, w + go ar.run() return ar } -func (ar *AsyncReader) run(ch chan<- rune) { - fd := int(ar.rd.Fd()) - qfd := int(ar.rQuit.Fd()) - maxfd := MaxInt(fd, qfd) - fs := sys.NewFdSet() - var qBuf [1]byte +func (ar *AsyncReader) Chan() <-chan rune { + return ar.ch +} - defer close(ch) +func (ar *AsyncReader) run() { + fd := int(ar.rd.Fd()) + cfd := int(ar.rCtrl.Fd()) + maxfd := MaxInt(fd, cfd) + fs := sys.NewFdSet() + var cBuf [1]byte + + defer close(ar.ch) for { - fs.Set(fd, qfd) + fs.Set(fd, cfd) _, err := sys.Select(maxfd+1, fs, nil, nil, nil) if err != nil { switch err { @@ -54,17 +67,31 @@ func (ar *AsyncReader) run(ch chan<- rune) { panic(err) } } - if fs.IsSet(qfd) { + if fs.IsSet(cfd) { // Consume the written byte - ar.rQuit.Read(qBuf[:]) - return + ar.rCtrl.Read(cBuf[:]) + switch cBuf[0] { + case asyncReaderQuit: + return + case asyncReaderStop: + Stop: + for { + ar.rCtrl.Read(cBuf[:]) + switch cBuf[0] { + case asyncReaderQuit: + return + case asyncReaderContinue: + break Stop + } + } + } } else { ReadRune: for { r, _, err := ar.bufrd.ReadRune() switch err { case nil: - ch <- r + ar.ch <- r case io.EOF: return default: @@ -82,15 +109,21 @@ func (ar *AsyncReader) run(ch chan<- rune) { } } -func (ar *AsyncReader) Start() <-chan rune { - ch := make(chan rune, asyncReaderChanSize) - go ar.run(ch) - return ch -} - -func (ar *AsyncReader) Stop() { - _, err := ar.wQuit.Write([]byte("x")) +func (ar *AsyncReader) ctrl(r byte) { + _, err := ar.wCtrl.Write([]byte{r}) if err != nil { panic(err) } } + +func (ar *AsyncReader) Stop() { + ar.ctrl(asyncReaderStop) +} + +func (ar *AsyncReader) Continue() { + ar.ctrl(asyncReaderContinue) +} + +func (ar *AsyncReader) Quit() { + ar.ctrl(asyncReaderQuit) +}