diff options
Diffstat (limited to 'src/lib/net/fd.go')
-rw-r--r-- | src/lib/net/fd.go | 114 |
1 files changed, 65 insertions, 49 deletions
diff --git a/src/lib/net/fd.go b/src/lib/net/fd.go index 45269c81a..1ec0d8af9 100644 --- a/src/lib/net/fd.go +++ b/src/lib/net/fd.go @@ -13,21 +13,23 @@ import ( "syscall"; ) -// Network file descriptor. Only intended to be used internally, -// but have to export to make it available in other files implementing package net. -type FD struct { +// Network file descriptor. +type netFD struct { // immutable until Close fd int64; osfd *os.FD; - cr chan *FD; - cw chan *FD; + cr chan *netFD; + cw chan *netFD; + net string; + laddr string; + raddr string; // owned by fd wait server ncr, ncw int; } // Make reads and writes on fd return EAGAIN instead of blocking. -func _SetNonblock(fd int64) *os.Error { +func setNonblock(fd int64) *os.Error { flags, e := syscall.Fcntl(fd, syscall.F_GETFL, 0); if e != 0 { return os.ErrnoToError(e) @@ -40,11 +42,11 @@ func _SetNonblock(fd int64) *os.Error { } -// A _PollServer helps FDs determine when to retry a non-blocking +// A pollServer helps FDs determine when to retry a non-blocking // read or write after they get EAGAIN. When an FD needs to wait, // send the fd on s.cr (for a read) or s.cw (for a write) to pass the // request to the poll server. Then receive on fd.cr/fd.cw. -// When the _PollServer finds that i/o on FD should be possible +// When the pollServer finds that i/o on FD should be possible // again, it will send fd on fd.cr/fd.cw to wake any waiting processes. // This protocol is implemented as s.WaitRead() and s.WaitWrite(). // @@ -54,8 +56,8 @@ func _SetNonblock(fd int64) *os.Error { // To resolve this, the poll server waits not just on the FDs it has // been given but also its own pipe. After sending on the // buffered channel s.cr/s.cw, WaitRead/WaitWrite writes a -// byte to the pipe, causing the _PollServer's poll system call to -// return. In response to the pipe being readable, the _PollServer +// byte to the pipe, causing the pollServer's poll system call to +// return. In response to the pipe being readable, the pollServer // re-polls its request channels. // // Note that the ordering is "send request" and then "wake up server". @@ -65,32 +67,32 @@ func _SetNonblock(fd int64) *os.Error { // to send the request. Because the send must complete before the wakeup, // the request channel must be buffered. A buffer of size 1 is sufficient // for any request load. If many processes are trying to submit requests, -// one will succeed, the _PollServer will read the request, and then the +// one will succeed, the pollServer will read the request, and then the // channel will be empty for the next process's request. A larger buffer // might help batch requests. -type _PollServer struct { - cr, cw chan *FD; // buffered >= 1 +type pollServer struct { + cr, cw chan *netFD; // buffered >= 1 pr, pw *os.FD; - pending map[int64] *FD; + pending map[int64] *netFD; poll *Pollster; // low-level OS hooks } -func (s *_PollServer) Run(); +func (s *pollServer) Run(); -func _NewPollServer() (s *_PollServer, err *os.Error) { - s = new(_PollServer); - s.cr = make(chan *FD, 1); - s.cw = make(chan *FD, 1); +func newPollServer() (s *pollServer, err *os.Error) { + s = new(pollServer); + s.cr = make(chan *netFD, 1); + s.cw = make(chan *netFD, 1); if s.pr, s.pw, err = os.Pipe(); err != nil { return nil, err } - if err = _SetNonblock(s.pr.Fd()); err != nil { + if err = setNonblock(s.pr.Fd()); err != nil { Error: s.pr.Close(); s.pw.Close(); return nil, err } - if err = _SetNonblock(s.pw.Fd()); err != nil { + if err = setNonblock(s.pw.Fd()); err != nil { goto Error } if s.poll, err = NewPollster(); err != nil { @@ -100,14 +102,14 @@ func _NewPollServer() (s *_PollServer, err *os.Error) { s.poll.Close(); goto Error } - s.pending = make(map[int64] *FD); + s.pending = make(map[int64] *netFD); go s.Run(); return s, nil } -func (s *_PollServer) AddFD(fd *FD, mode int) { +func (s *pollServer) AddFD(fd *netFD, mode int) { if err := s.poll.AddFD(fd.fd, mode, false); err != nil { - print("_PollServer AddFD: ", err.String(), "\n"); + print("pollServer AddFD: ", err.String(), "\n"); return } @@ -121,7 +123,7 @@ func (s *_PollServer) AddFD(fd *FD, mode int) { s.pending[key] = fd } -func (s *_PollServer) LookupFD(fd int64, mode int) *FD { +func (s *pollServer) LookupFD(fd int64, mode int) *netFD { key := fd << 1; if mode == 'w' { key++; @@ -134,12 +136,12 @@ func (s *_PollServer) LookupFD(fd int64, mode int) *FD { return netfd } -func (s *_PollServer) Run() { +func (s *pollServer) Run() { var scratch [100]byte; for { fd, mode, err := s.poll.WaitFD(); if err != nil { - print("_PollServer WaitFD: ", err.String(), "\n"); + print("pollServer WaitFD: ", err.String(), "\n"); return } if fd == s.pr.Fd() { @@ -158,7 +160,7 @@ func (s *_PollServer) Run() { } else { netfd := s.LookupFD(fd, mode); if netfd == nil { - print("_PollServer: unexpected wakeup for fd=", netfd, " mode=", string(mode), "\n"); + print("pollServer: unexpected wakeup for fd=", netfd, " mode=", string(mode), "\n"); continue } if mode == 'r' { @@ -176,18 +178,18 @@ func (s *_PollServer) Run() { } } -func (s *_PollServer) Wakeup() { +func (s *pollServer) Wakeup() { var b [1]byte; s.pw.Write(b) } -func (s *_PollServer) WaitRead(fd *FD) { +func (s *pollServer) WaitRead(fd *netFD) { s.cr <- fd; s.Wakeup(); <-fd.cr } -func (s *_PollServer) WaitWrite(fd *FD) { +func (s *pollServer) WaitWrite(fd *netFD) { s.cr <- fd; s.Wakeup(); <-fd.cr @@ -195,34 +197,37 @@ func (s *_PollServer) WaitWrite(fd *FD) { // Network FD methods. -// All the network FDs use a single _PollServer. +// All the network FDs use a single pollServer. -var pollserver *_PollServer +var pollserver *pollServer func _StartServer() { - p, err := _NewPollServer(); + p, err := newPollServer(); if err != nil { - print("Start _PollServer: ", err.String(), "\n") + print("Start pollServer: ", err.String(), "\n") } pollserver = p } -func NewFD(fd int64) (f *FD, err *os.Error) { +func newFD(fd int64, net, laddr, raddr string) (f *netFD, err *os.Error) { if pollserver == nil { once.Do(_StartServer); } - if err = _SetNonblock(fd); err != nil { + if err = setNonblock(fd); err != nil { return nil, err } - f = new(FD); + f = new(netFD); f.fd = fd; - f.osfd = os.NewFD(fd, "socket"); - f.cr = make(chan *FD, 1); - f.cw = make(chan *FD, 1); + f.net = net; + f.laddr = laddr; + f.raddr = raddr; + f.osfd = os.NewFD(fd, "net: " + net + " " + laddr + " " + raddr); + f.cr = make(chan *netFD, 1); + f.cw = make(chan *netFD, 1); return f, nil } -func (fd *FD) Close() *os.Error { +func (fd *netFD) Close() *os.Error { if fd == nil || fd.osfd == nil { return os.EINVAL } @@ -232,7 +237,7 @@ func (fd *FD) Close() *os.Error { return e } -func (fd *FD) Read(p []byte) (n int, err *os.Error) { +func (fd *netFD) Read(p []byte) (n int, err *os.Error) { if fd == nil || fd.osfd == nil { return -1, os.EINVAL } @@ -244,7 +249,7 @@ func (fd *FD) Read(p []byte) (n int, err *os.Error) { return n, err } -func (fd *FD) Write(p []byte) (n int, err *os.Error) { +func (fd *netFD) Write(p []byte) (n int, err *os.Error) { if fd == nil || fd.osfd == nil { return -1, os.EINVAL } @@ -268,19 +273,30 @@ func (fd *FD) Write(p []byte) (n int, err *os.Error) { return nn, err } -func (fd *FD) Accept(sa *syscall.Sockaddr) (nfd *FD, err *os.Error) { +func sockaddrToHostPort(sa *syscall.Sockaddr) (hostport string, err *os.Error) + +func (fd *netFD) Accept(sa *syscall.Sockaddr) (nfd *netFD, err *os.Error) { if fd == nil || fd.osfd == nil { return nil, os.EINVAL } - s, e := syscall.Accept(fd.fd, sa); - for e == syscall.EAGAIN { + + var s, e int64; + for { + s, e = syscall.Accept(fd.fd, sa); + if e != syscall.EAGAIN { + break; + } pollserver.WaitRead(fd); - s, e = syscall.Accept(fd.fd, sa) } if e != 0 { return nil, os.ErrnoToError(e) } - if nfd, err = NewFD(s); err != nil { + + raddr, err1 := sockaddrToHostPort(sa); + if err1 != nil { + raddr = "invalid-address"; + } + if nfd, err = newFD(s, fd.net, fd.laddr, raddr); err != nil { syscall.Close(s); return nil, err } |