diff options
author | Russ Cox <rsc@golang.org> | 2008-09-26 14:46:28 -0700 |
---|---|---|
committer | Russ Cox <rsc@golang.org> | 2008-09-26 14:46:28 -0700 |
commit | 53fad1c26901af67373c1dfb322689dcb2440b2d (patch) | |
tree | 649129f2a758df2cf41bf66f3c59a2cfdef92fae | |
parent | 6611baa49ce42a3fa6c52815de98333655012f9f (diff) | |
download | golang-53fad1c26901af67373c1dfb322689dcb2440b2d.tar.gz |
Darwin kqueue/kevent-based network FDs
R=r
OCL=15998
CL=16004
-rw-r--r-- | src/lib/net/fd_darwin.go | 333 | ||||
-rw-r--r-- | test/dialgoogle.go | 19 |
2 files changed, 311 insertions, 41 deletions
diff --git a/src/lib/net/fd_darwin.go b/src/lib/net/fd_darwin.go index 28b24d9ba..23d085ccc 100644 --- a/src/lib/net/fd_darwin.go +++ b/src/lib/net/fd_darwin.go @@ -2,78 +2,333 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -// Network file descriptors. +// TODO(rsc): All the prints in this file should go to standard error. package net import ( + "net"; + "once"; "os"; "syscall"; - "net" ) -/* BUG 6g has trouble with this. +const Debug = false -export type FD os.FD; +// Network file descriptor. Only intended to be used internally, +// but have to export to make it available in other files implementing package net. +export type FD struct { + fd int64; + cr *chan *FD; + cw *chan *FD; + + // owned by fd wait server + ncr, ncw int; + next *FD; +} + +func WaitRead(fd *FD); +func WaitWrite(fd *FD); +func StartServer(); -export func NewFD(fd int64) (nfd *FD, err *os.Error) { - ofd := os.NewFD(fd) - return ofd, nil +func MakeNonblocking(fd int64) *os.Error { + if Debug { print("MakeNonBlocking ", fd, "\n") } + flags, e := syscall.fcntl(fd, syscall.F_GETFL, 0) + if e != 0 { + return os.ErrnoToError(e) + } + flags, e = syscall.fcntl(fd, syscall.F_SETFL, flags | syscall.O_NONBLOCK) + if e != 0 { + return os.ErrnoToError(e) + } + return nil +} + +export func NewFD(fd int64) (f *FD, err *os.Error) { + once.Do(&StartServer); + if err = MakeNonblocking(fd); err != nil { + return nil, err + } + f = new(FD); + f.fd = fd; + f.cr = new(chan *FD); + f.cw = new(chan *FD); + return f, nil } func (fd *FD) Close() *os.Error { - var ofd *os.FD = fd - return ofd.Close() + if fd == nil { + return os.EINVAL + } + r1, e := syscall.close(fd.fd); + if e != 0 { + return os.ErrnoToError(e) + } + return nil } func (fd *FD) Read(p *[]byte) (n int, err *os.Error) { - var ofd *os.FD = fd; - n, err = ofd.Read(p) - return n, err + if fd == nil { + return -1, os.EINVAL + } +L: nn, e := syscall.read(fd.fd, &p[0], int64(len(p))) + switch { + case e == syscall.EAGAIN: + WaitRead(fd) + goto L + case e != 0: + return -1, os.ErrnoToError(e) + } + return int(nn), nil } func (fd *FD) Write(p *[]byte) (n int, err *os.Error) { - var ofd *os.FD = fd; - n, err = ofd.Write(p) - return n, err + if fd == nil { + return -1, os.EINVAL + } + total := len(p) + for len(p) > 0 { + L: nn, e := syscall.write(fd.fd, &p[0], int64(len(p))) + switch { + case e == syscall.EAGAIN: + WaitWrite(fd) + goto L + case e != 0: + return total - len(p), os.ErrnoToError(e) + } + p = p[nn:len(p)] + } + return total, nil } -*/ +func (fd *FD) Accept(sa *syscall.Sockaddr) (nfd *FD, err *os.Error) { + if fd == nil { + return nil, os.EINVAL + } +L: s, e := syscall.accept(fd.fd, sa) + switch { + case e == syscall.EAGAIN: + WaitRead(fd) + goto L + case e != 0: + return nil, os.ErrnoToError(e) + } + nfd, err = NewFD(s) + if err != nil { + syscall.close(s) + return nil, err + } + return nfd, nil +} -// TODO: Replace with kqueue/kevent. -export type FD struct { - fd int64; - osfd *os.FD; +// Waiting for FDs via kqueue(2). +type Kstate struct { + cr *chan *FD; + cw *chan *FD; + pr *os.FD; + pw *os.FD; + pend *map[int64] *FD; + kq int64; } -export func NewFD(fd int64) (nfd *FD, err *os.Error) { - nfd = new(FD); - nfd.osfd = os.NewFD(fd); - nfd.fd = fd - return nfd, nil +var kstate Kstate; + +func KqueueAdd(fd int64, mode byte, repeat bool) *os.Error { + if Debug { print("Kqueue add ", fd, " ", mode, " ", repeat, "\n") } + var kmode int16; + if mode == 'r' { + kmode = syscall.EVFILT_READ + } else { + kmode = syscall.EVFILT_WRITE + } + + var events [1]syscall.Kevent; + ev := &events[0]; + ev.ident = fd; + ev.filter = kmode; + + // EV_ADD - add event to kqueue list + // EV_RECEIPT - generate fake EV_ERROR as result of add + // EV_ONESHOT - delete the event the first time it triggers + ev.flags = syscall.EV_ADD | syscall.EV_RECEIPT + if !repeat { + ev.flags |= syscall.EV_ONESHOT + } + + n, e := syscall.kevent(kstate.kq, &events, &events, nil); + if e != 0 { + return os.ErrnoToError(e) + } + if n != 1 || (ev.flags & syscall.EV_ERROR) == 0 || ev.ident != fd || ev.filter != kmode { + return os.NewError("kqueue phase error") + } + if ev.data != 0 { + return os.ErrnoToError(ev.data) + } + return nil } -func (fd *FD) Close() *os.Error { - return fd.osfd.Close() +func KqueueAddFD(fd *FD, mode byte) *os.Error { + if e := KqueueAdd(fd.fd, 'r', false); e != nil { + return e + } + id := fd.fd << 1 + if mode == 'r' { + fd.ncr++ + } else { + id++ + fd.ncw++ + } + kstate.pend[id] = fd + return nil } -func (fd *FD) Read(p *[]byte) (n int, err *os.Error) { - n, err = fd.osfd.Read(p) - return n, err +func KqueueGet(events *[]syscall.Kevent) (n int, err *os.Error) { + var nn, e int64; + if nn, e = syscall.kevent(kstate.kq, nil, events, nil); e != 0 { + return -1, os.ErrnoToError(e) + } + return int(nn), nil } -func (fd *FD) Write(p *[]byte) (n int, err *os.Error) { - n, err = fd.osfd.Write(p) - return n, err +func KqueueLookup(ev *syscall.Kevent) (fd *FD, mode byte) { + id := ev.ident << 1 + if ev.filter == syscall.EVFILT_READ { + mode = 'r' + } else { + id++ + mode = 'w' + } + var ok bool + if fd, ok = kstate.pend[id]; !ok { + return nil, 0 + } + kstate.pend[id] = nil, false + return fd, mode } -func (fd *FD) Accept(sa *syscall.Sockaddr) (nfd *FD, err *os.Error) { - s, e := syscall.accept(fd.fd, sa); - if e != 0 { - return nil, os.ErrnoToError(e) +func Serve() { + var r, e int64; + k := &kstate; + + if Debug { print("Kqueue server running\n") } + var events [10]syscall.Kevent; + var scratch [100]byte; + for { + var n int + var err *os.Error; + if n, err = KqueueGet(&events); err != nil { + print("kqueue get: ", err.String(), "\n") + return + } + if Debug { print("Kqueue server get ", n, "\n") } + for i := 0; i < n; i++ { + ev := &events[i] + if ev.ident == k.pr.fd { + if Debug { print("Kqueue server wakeup\n") } + // Drain our wakeup pipe + for { + nn, e := k.pr.Read(&scratch) + if Debug { print("Read ", k.pr.fd, " ", nn, " ", e.String(), "\n") } + if nn <= 0 { + break + } + } + + if Debug { print("Kqueue server drain channels\n") } + // Then read from channels. + for { + fd, ok := <-k.cr + if !ok { + break + } + KqueueAddFD(fd, 'r') + } + for { + fd, ok := <-k.cw + if !ok { + break + } + KqueueAddFD(fd, 'w') + } + if Debug { print("Kqueue server awake\n") } + continue + } + + // Otherwise, wakeup the right FD. + fd, mode := KqueueLookup(ev); + if fd == nil { + print("kqueue: unexpected wakeup for fd=", ev.ident, " filter=", ev.filter, "\n") + continue + } + if mode == 'r' { + if Debug { print("Kqueue server r fd=", fd.fd, " ncr=", fd.ncr, "\n") } + for fd.ncr > 0 { + fd.ncr-- + fd.cr <- fd + } + } else { + if Debug { print("Kqueue server w fd=", fd.fd, " ncw=", fd.ncw, "\n") } + for fd.ncw > 0 { + fd.ncw-- + fd.cw <- fd + } + } + } } - nfd, err = NewFD(s) - return nfd, err } +func StartServer() { + k := &kstate; + + k.cr = new(chan *FD, 1); + k.cw = new(chan *FD, 1); + k.pend = new(map[int64] *FD) + + var err *os.Error + if k.pr, k.pw, err = os.Pipe(); err != nil { + print("kqueue pipe: ", err.String(), "\n") + return + } + + if err := MakeNonblocking(k.pr.fd); err != nil { + print("make nonblocking pr: ", err.String(), "\n") + return + } + if err := MakeNonblocking(k.pw.fd); err != nil { + print("make nonblocking pw: ", err.String(), "\n") + return + } + + var e int64 + if k.kq, e = syscall.kqueue(); e != 0 { + err := os.ErrnoToError(e); + print("kqueue: ", err.String(), "\n") + return + } + + if err := KqueueAdd(k.pr.fd, 'r', true); err != nil { + print("kqueue add pipe: ", err.String(), "\n") + return + } + + go Serve() +} + +func WakeupServer() { + var b [1]byte; + kstate.pw.Write(&b); +} + +func WaitRead(fd *FD) { + kstate.cr <- fd; + WakeupServer(); + <-fd.cr +} + +func WaitWrite(fd *FD) { + kstate.cw <- fd; + WakeupServer(); + <-fd.cw +} diff --git a/test/dialgoogle.go b/test/dialgoogle.go index 56ef2dea0..58dc0af53 100644 --- a/test/dialgoogle.go +++ b/test/dialgoogle.go @@ -2,13 +2,14 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -// $G $F.go && $L $F.$A && ./$A.out +// $G $F.go && $L $F.$A && GOMAXPROCS=2 ./$A.out package main import ( "net"; "flag"; + "io"; "os"; "syscall" ) @@ -27,6 +28,20 @@ func StringToBuf(s string) *[]byte return b; } +func Readn(fd io.Read, buf *[]byte) (n int, err *os.Error) { + n = 0; + for n < len(buf) { + nn, e := fd.Read(buf[n:len(buf)]); + if nn > 0 { + n += nn + } + if e != nil { + return n, e + } + } + return n, nil +} + // fd is already connected to www.google.com port 80. // Run an HTTP request to fetch the main page. @@ -35,7 +50,7 @@ func FetchGoogle(fd net.Conn) { n, errno := fd.Write(req); buf := new([1000]byte); - n, errno = fd.Read(buf); + n, errno = Readn(fd, buf); fd.Close(); if n < 1000 { |