util/async-reader.go: Fix race condition by only spawning a single .run()

This commit is contained in:
Cheer Xiao 2014-03-10 21:04:25 +08:00
parent 1e4cd9dea0
commit dddfda192f

View File

@ -13,38 +13,51 @@ const (
asyncReaderChanSize int = 128
)
const (
asyncReaderStop byte = 's'
asyncReaderContinue = 'c'
asyncReaderQuit = 'q'
)
// AsyncReader delivers a Unix fd stream to a channel of runes.
type AsyncReader struct {
rd *os.File
bufrd *bufio.Reader
rQuit, wQuit *os.File
rCtrl, wCtrl *os.File
ch chan rune
}
func NewAsyncReader(rd *os.File) *AsyncReader {
ar := &AsyncReader{
rd: rd,
bufrd: bufio.NewReaderSize(rd, 0),
ch: make(chan rune, asyncReaderChanSize),
}
r, w, err := os.Pipe()
if err != nil {
panic(err)
}
ar.rQuit, ar.wQuit = r, w
ar.rCtrl, ar.wCtrl = r, w
go ar.run()
return ar
}
func (ar *AsyncReader) run(ch chan<- rune) {
fd := int(ar.rd.Fd())
qfd := int(ar.rQuit.Fd())
maxfd := MaxInt(fd, qfd)
fs := sys.NewFdSet()
var qBuf [1]byte
func (ar *AsyncReader) Chan() <-chan rune {
return ar.ch
}
defer close(ch)
func (ar *AsyncReader) run() {
fd := int(ar.rd.Fd())
cfd := int(ar.rCtrl.Fd())
maxfd := MaxInt(fd, cfd)
fs := sys.NewFdSet()
var cBuf [1]byte
defer close(ar.ch)
for {
fs.Set(fd, qfd)
fs.Set(fd, cfd)
_, err := sys.Select(maxfd+1, fs, nil, nil, nil)
if err != nil {
switch err {
@ -54,17 +67,31 @@ func (ar *AsyncReader) run(ch chan<- rune) {
panic(err)
}
}
if fs.IsSet(qfd) {
if fs.IsSet(cfd) {
// Consume the written byte
ar.rQuit.Read(qBuf[:])
ar.rCtrl.Read(cBuf[:])
switch cBuf[0] {
case asyncReaderQuit:
return
case asyncReaderStop:
Stop:
for {
ar.rCtrl.Read(cBuf[:])
switch cBuf[0] {
case asyncReaderQuit:
return
case asyncReaderContinue:
break Stop
}
}
}
} else {
ReadRune:
for {
r, _, err := ar.bufrd.ReadRune()
switch err {
case nil:
ch <- r
ar.ch <- r
case io.EOF:
return
default:
@ -82,15 +109,21 @@ func (ar *AsyncReader) run(ch chan<- rune) {
}
}
func (ar *AsyncReader) Start() <-chan rune {
ch := make(chan rune, asyncReaderChanSize)
go ar.run(ch)
return ch
}
func (ar *AsyncReader) Stop() {
_, err := ar.wQuit.Write([]byte("x"))
func (ar *AsyncReader) ctrl(r byte) {
_, err := ar.wCtrl.Write([]byte{r})
if err != nil {
panic(err)
}
}
func (ar *AsyncReader) Stop() {
ar.ctrl(asyncReaderStop)
}
func (ar *AsyncReader) Continue() {
ar.ctrl(asyncReaderContinue)
}
func (ar *AsyncReader) Quit() {
ar.ctrl(asyncReaderQuit)
}