Fix AsyncReader for go 1.9.

With the introduction of the builtin poller, the standard library manages
nonblockingness of fds itself. As a result, a Read on a nonblocking fd can
still block (presumably, since the builtin poller when make fds nonblocking on
its own, it will emulate blocking IO even if the fd itself is nonblocking).

The solution is to simply use the read syscall to read from a nonblocking fd,
thus avoiding the blocking IO emulation behavior of the standard library.
This commit is contained in:
Qi Xiao 2017-09-18 21:31:39 +01:00
parent e7a8b96d7d
commit 817bbe4117

View File

@ -1,8 +1,6 @@
package tty
import (
"bufio"
"io"
"os"
"syscall"
@ -16,7 +14,6 @@ const (
// AsyncReader delivers a Unix fd stream to a channel of runes.
type AsyncReader struct {
rd *os.File
bufrd *bufio.Reader
rCtrl, wCtrl *os.File
ctrlCh chan struct{}
ch chan rune
@ -31,7 +28,6 @@ func NewAsyncReader(rd *os.File) *AsyncReader {
}
return &AsyncReader{
rd,
bufio.NewReaderSize(rd, 0),
rCtrl, wCtrl,
make(chan struct{}),
make(chan rune, asyncReaderChanSize),
@ -66,7 +62,7 @@ func (ar *AsyncReader) Run() {
if err := poller.Init([]uintptr{fd, cfd}, []uintptr{}); err != nil {
// fatal error, unable to initialize poller
// TODO show erorr
ar.waitForQuit(err)
return
}
@ -77,7 +73,7 @@ func (ar *AsyncReader) Run() {
case syscall.EINTR:
continue
default:
ar.errCh <- err
ar.waitForQuit(err)
return
}
}
@ -89,39 +85,56 @@ func (ar *AsyncReader) Run() {
return
}
}
ReadRune:
bytes := make([]byte, 0, 32)
ReadRunes:
for {
r, _, err := ar.bufrd.ReadRune()
switch err {
case nil:
// Logger.Printf("read rune: %q", r)
buf := make([]byte, 32)
nr, err := syscall.Read(int(fd), buf[:])
if err == nil {
bytes = append(bytes, buf[:nr]...)
} else {
if err == syscall.EAGAIN || err == syscall.EWOULDBLOCK {
// All input read, break the loop.
break ReadRunes
}
// Write error to errCh, unless termination is requested.
select {
case ar.ch <- r:
case ar.errCh <- err:
case <-ar.ctrlCh:
ar.rCtrl.Read(cBuf[:])
return
}
case io.EOF:
}
}
// TODO(xiaq): Invalid UTF-8 will result in a bunch of \ufffd, which is
// not helpful for debugging.
for _, r := range string(bytes) {
// Write error to ch, unless termination is requested.
select {
case ar.ch <- r:
case <-ar.ctrlCh:
ar.rCtrl.Read(cBuf[:])
return
default:
// BUG(xiaq): AsyncReader relies on the undocumented fact
// that (*os.File).Read returns an *os.File.PathError
patherr, ok := err.(*os.PathError) //.Err
if ok && patherr.Err == syscall.EWOULDBLOCK || patherr.Err == syscall.EAGAIN {
break ReadRune
} else {
select {
case ar.errCh <- err:
case <-ar.ctrlCh:
ar.rCtrl.Read(cBuf[:])
return
}
}
}
}
}
}
func (ar *AsyncReader) waitForQuit(err error) {
var cBuf [1]byte
select {
case ar.errCh <- err:
case <-ar.ctrlCh:
ar.rCtrl.Read(cBuf[:])
return
}
<-ar.ctrlCh
ar.rCtrl.Read(cBuf[:])
}
// Quit terminates the loop of Run.
func (ar *AsyncReader) Quit() {
_, err := ar.wCtrl.Write([]byte{'q'})