Add util.AsyncReader, to supersede util.TimedReader

This commit is contained in:
Cheer Xiao 2014-03-09 15:16:56 +08:00
parent 1e788f8161
commit e1b98bfc83
6 changed files with 199 additions and 1 deletions

View File

@ -1,5 +1,5 @@
EXE := elvish
PKGS := edit eval parse util service elvishd
PKGS := edit eval parse util service elvishd sys
PKG_PATHS := $(addprefix ./,$(PKGS)) # go tools want an explicit ./
PKG_COVERAGES := $(addprefix coverage/,$(PKGS))

25
sys/fcntl.go Normal file
View File

@ -0,0 +1,25 @@
package sys
import (
"syscall"
)
func Fcntl(fd int, cmd int, arg int) (val int, err error) {
r, _, e := syscall.Syscall(syscall.SYS_FCNTL, uintptr(fd), uintptr(cmd),
uintptr(arg))
val = int(r)
if e != 0 {
err = e
}
return
}
func GetNonblock(fd int) (bool, error) {
r, err := Fcntl(fd, syscall.F_GETFL, 0)
return r&syscall.O_NONBLOCK != 0, err
}
func SetNonblock(fd int) error {
_, err := Fcntl(fd, syscall.F_SETFL, syscall.O_NONBLOCK)
return err
}

67
sys/select.go Normal file
View File

@ -0,0 +1,67 @@
package sys
/*
#include <sys/select.h>
void fdclr(int fd, fd_set *set) {
FD_CLR(fd, set);
}
int fdisset(int fd, fd_set *set) {
return FD_ISSET(fd, set);
}
void fdset(int fd, fd_set *set) {
FD_SET(fd, set);
}
void fdzero(fd_set *set) {
FD_ZERO(set);
}
*/
import "C"
import (
"syscall"
"unsafe"
)
type FdSet syscall.FdSet
func (fs *FdSet) c() *C.fd_set {
return (*C.fd_set)(unsafe.Pointer(fs))
}
func (fs *FdSet) s() *syscall.FdSet {
return (*syscall.FdSet)(fs)
}
func NewFdSet(fds ...int) *FdSet {
fs := &FdSet{}
fs.Set(fds...)
return fs
}
func (fs *FdSet) Clear(fds ...int) {
for _, fd := range fds {
C.fdclr(C.int(fd), fs.c())
}
}
func (fs *FdSet) IsSet(fd int) bool {
return C.fdisset(C.int(fd), fs.c()) != 0
}
func (fs *FdSet) Set(fds ...int) {
for _, fd := range fds {
C.fdset(C.int(fd), fs.c())
}
}
func (fs *FdSet) Zero() {
C.fdzero(fs.c())
}
func Select(nfd int, r *FdSet, w *FdSet, e *FdSet, timeout *syscall.Timeval) (n int, err error) {
return syscall.Select(nfd, r.s(), w.s(), e.s(), timeout)
}

2
sys/sys.go Normal file
View File

@ -0,0 +1,2 @@
// Package sys wraps the stdlib syscall package.
package sys

93
util/async-reader.go Normal file
View File

@ -0,0 +1,93 @@
package util
import (
"bufio"
"io"
"os"
"syscall"
"github.com/xiaq/elvish/sys"
)
const (
asyncReaderChanSize int = 128
)
// AsyncReader delivers a Unix fd stream to a channel of runes.
type AsyncReader struct {
rd *os.File
bufrd *bufio.Reader
rQuit, wQuit *os.File
}
func NewAsyncReader(rd *os.File) *AsyncReader {
ar := &AsyncReader{
rd: rd,
bufrd: bufio.NewReaderSize(rd, 0),
}
r, w, err := os.Pipe()
if err != nil {
panic(err)
}
ar.rQuit, ar.wQuit = r, w
return ar
}
func (ar *AsyncReader) run(ch chan<- rune) {
fd := int(ar.rd.Fd())
qfd := int(ar.rQuit.Fd())
maxfd := MaxInt(fd, qfd)
fs := sys.NewFdSet()
var qBuf [1]byte
defer close(ch)
for {
fs.Set(fd, qfd)
_, err := sys.Select(maxfd+1, fs, nil, nil, nil)
if err != nil {
switch err {
case syscall.EINTR:
continue
default:
panic(err)
}
}
if fs.IsSet(qfd) {
// Consume the written byte
ar.rQuit.Read(qBuf[:])
return
} else {
ReadRune:
for {
r, _, err := ar.bufrd.ReadRune()
switch err {
case nil:
ch <- r
case io.EOF:
return
default:
// BUG(xiaq): AsyncReader relies on the undocumented fact
// that (*os.File).Read returns an *os.File.PathError
e := err.(*os.PathError).Err
if e == syscall.EWOULDBLOCK || e == syscall.EAGAIN {
break ReadRune
} else {
panic(err)
}
}
}
}
}
}
func (ar *AsyncReader) Start() <-chan rune {
ch := make(chan rune, asyncReaderChanSize)
go ar.run(ch)
return ch
}
func (ar *AsyncReader) Stop() {
ar.wQuit.Write([]byte("x"))
}

11
util/max.go Normal file
View File

@ -0,0 +1,11 @@
package util
func MaxInt(x0 int, xs ...int) int {
m := x0
for _, x := range xs {
if m < x {
m = x
}
}
return m
}