diff options
Diffstat (limited to 'src/pkg/net/fd.go')
-rw-r--r-- | src/pkg/net/fd.go | 530 |
1 files changed, 274 insertions, 256 deletions
diff --git a/src/pkg/net/fd.go b/src/pkg/net/fd.go index 707dccaa4..76c953b9b 100644 --- a/src/pkg/net/fd.go +++ b/src/pkg/net/fd.go @@ -2,9 +2,12 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. +// +build darwin freebsd linux netbsd openbsd + package net import ( + "errors" "io" "os" "sync" @@ -15,39 +18,35 @@ import ( // Network file descriptor. type netFD struct { // locking/lifetime of sysfd - sysmu sync.Mutex - sysref int + sysmu sync.Mutex + sysref int + + // must lock both sysmu and pollserver to write + // can lock either to read closing bool // immutable until Close - sysfd int - family int - proto int - sysfile *os.File - cr chan bool - cw chan bool - net string - laddr Addr - raddr Addr + sysfd int + family int + sotype int + isConnected bool + sysfile *os.File + cr chan error + cw chan error + net string + laddr Addr + raddr Addr // owned by client - rdeadline_delta int64 - rdeadline int64 - rio sync.Mutex - wdeadline_delta int64 - wdeadline int64 - wio sync.Mutex + rdeadline int64 + rio sync.Mutex + wdeadline int64 + wio sync.Mutex // owned by fd wait server ncr, ncw int } -type InvalidConnError struct{} - -func (e *InvalidConnError) String() string { return "invalid net.Conn" } -func (e *InvalidConnError) Temporary() bool { return false } -func (e *InvalidConnError) Timeout() bool { return false } - // A pollServer helps FDs determine when to retry a non-blocking // read or write after they get EAGAIN. When an FD needs to wait, // send the fd on s.cr (for a read) or s.cw (for a write) to pass the @@ -91,20 +90,15 @@ type pollServer struct { deadline int64 // next deadline (nsec since 1970) } -func (s *pollServer) AddFD(fd *netFD, mode int) { +func (s *pollServer) AddFD(fd *netFD, mode int) error { + s.Lock() intfd := fd.sysfd - if intfd < 0 { + if intfd < 0 || fd.closing { // fd closed underfoot - if mode == 'r' { - fd.cr <- true - } else { - fd.cw <- true - } - return + s.Unlock() + return errClosing } - s.Lock() - var t int64 key := intfd << 1 if mode == 'r' { @@ -124,17 +118,33 @@ func (s *pollServer) AddFD(fd *netFD, mode int) { wake, err := s.poll.AddFD(intfd, mode, false) if err != nil { - panic("pollServer AddFD " + err.String()) + panic("pollServer AddFD " + err.Error()) } if wake { doWakeup = true } - s.Unlock() if doWakeup { s.Wakeup() } + return nil +} + +// Evict evicts fd from the pending list, unblocking +// any I/O running on fd. The caller must have locked +// pollserver. +func (s *pollServer) Evict(fd *netFD) { + if s.pending[fd.sysfd<<1] == fd { + s.WakeFD(fd, 'r', errClosing) + s.poll.DelFD(fd.sysfd, 'r') + delete(s.pending, fd.sysfd<<1) + } + if s.pending[fd.sysfd<<1|1] == fd { + s.WakeFD(fd, 'w', errClosing) + s.poll.DelFD(fd.sysfd, 'w') + delete(s.pending, fd.sysfd<<1|1) + } } var wakeupbuf [1]byte @@ -150,26 +160,26 @@ func (s *pollServer) LookupFD(fd int, mode int) *netFD { if !ok { return nil } - s.pending[key] = nil, false + delete(s.pending, key) return netfd } -func (s *pollServer) WakeFD(fd *netFD, mode int) { +func (s *pollServer) WakeFD(fd *netFD, mode int, err error) { if mode == 'r' { for fd.ncr > 0 { fd.ncr-- - fd.cr <- true + fd.cr <- err } } else { for fd.ncw > 0 { fd.ncw-- - fd.cw <- true + fd.cw <- err } } } func (s *pollServer) Now() int64 { - return time.Nanoseconds() + return time.Now().UnixNano() } func (s *pollServer) CheckDeadlines() { @@ -193,7 +203,7 @@ func (s *pollServer) CheckDeadlines() { } if t > 0 { if t <= now { - s.pending[key] = nil, false + delete(s.pending, key) if mode == 'r' { s.poll.DelFD(fd.sysfd, mode) fd.rdeadline = -1 @@ -201,7 +211,7 @@ func (s *pollServer) CheckDeadlines() { s.poll.DelFD(fd.sysfd, mode) fd.wdeadline = -1 } - s.WakeFD(fd, mode) + s.WakeFD(fd, mode, nil) } else if next_deadline == 0 || t < next_deadline { next_deadline = t } @@ -225,7 +235,7 @@ func (s *pollServer) Run() { } fd, mode, err := s.poll.WaitFD(s, t) if err != nil { - print("pollServer WaitFD: ", err.String(), "\n") + print("pollServer WaitFD: ", err.Error(), "\n") return } if fd < 0 { @@ -233,7 +243,7 @@ func (s *pollServer) Run() { s.CheckDeadlines() continue } - if fd == s.pr.Fd() { + if fd == int(s.pr.Fd()) { // Drain our wakeup pipe (we could loop here, // but it's unlikely that there are more than // len(scratch) wakeup calls). @@ -242,22 +252,30 @@ func (s *pollServer) Run() { } else { netfd := s.LookupFD(fd, mode) if netfd == nil { - print("pollServer: unexpected wakeup for fd=", fd, " mode=", string(mode), "\n") + // This can happen because the WaitFD runs without + // holding s's lock, so there might be a pending wakeup + // for an fd that has been evicted. No harm done. continue } - s.WakeFD(netfd, mode) + s.WakeFD(netfd, mode, nil) } } } -func (s *pollServer) WaitRead(fd *netFD) { - s.AddFD(fd, 'r') - <-fd.cr +func (s *pollServer) WaitRead(fd *netFD) error { + err := s.AddFD(fd, 'r') + if err == nil { + err = <-fd.cr + } + return err } -func (s *pollServer) WaitWrite(fd *netFD) { - s.AddFD(fd, 'w') - <-fd.cw +func (s *pollServer) WaitWrite(fd *netFD) error { + err := s.AddFD(fd, 'w') + if err == nil { + err = <-fd.cw + } + return err } // Network FD methods. @@ -269,25 +287,25 @@ var onceStartServer sync.Once func startServer() { p, err := newPollServer() if err != nil { - print("Start pollServer: ", err.String(), "\n") + print("Start pollServer: ", err.Error(), "\n") } pollserver = p } -func newFD(fd, family, proto int, net string) (f *netFD, err os.Error) { +func newFD(fd, family, sotype int, net string) (*netFD, error) { onceStartServer.Do(startServer) - if e := syscall.SetNonblock(fd, true); e != 0 { - return nil, os.Errno(e) + if err := syscall.SetNonblock(fd, true); err != nil { + return nil, err } - f = &netFD{ + netfd := &netFD{ sysfd: fd, family: family, - proto: proto, + sotype: sotype, net: net, } - f.cr = make(chan bool, 1) - f.cw = make(chan bool, 1) - return f, nil + netfd.cr = make(chan error, 1) + netfd.cw = make(chan error, 1) + return netfd, nil } func (fd *netFD) setAddr(laddr, raddr Addr) { @@ -300,43 +318,58 @@ func (fd *netFD) setAddr(laddr, raddr Addr) { if raddr != nil { rs = raddr.String() } - fd.sysfile = os.NewFile(fd.sysfd, fd.net+":"+ls+"->"+rs) + fd.sysfile = os.NewFile(uintptr(fd.sysfd), fd.net+":"+ls+"->"+rs) } -func (fd *netFD) connect(ra syscall.Sockaddr) (err os.Error) { - e := syscall.Connect(fd.sysfd, ra) - if e == syscall.EINPROGRESS { - var errno int - pollserver.WaitWrite(fd) - e, errno = syscall.GetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_ERROR) - if errno != 0 { - return os.NewSyscallError("getsockopt", errno) +func (fd *netFD) connect(ra syscall.Sockaddr) error { + err := syscall.Connect(fd.sysfd, ra) + if err == syscall.EINPROGRESS { + if err = pollserver.WaitWrite(fd); err != nil { + return err + } + var e int + e, err = syscall.GetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_ERROR) + if err != nil { + return os.NewSyscallError("getsockopt", err) + } + if e != 0 { + err = syscall.Errno(e) } } - if e != 0 { - return os.Errno(e) - } - return nil + return err } +var errClosing = errors.New("use of closed network connection") + // Add a reference to this fd. -func (fd *netFD) incref() { +// If closing==true, pollserver must be locked; mark the fd as closing. +// Returns an error if the fd cannot be used. +func (fd *netFD) incref(closing bool) error { + if fd == nil { + return errClosing + } fd.sysmu.Lock() + if fd.closing { + fd.sysmu.Unlock() + return errClosing + } fd.sysref++ + if closing { + fd.closing = true + } fd.sysmu.Unlock() + return nil } // Remove a reference to this FD and close if we've been asked to do so (and // there are no references left. func (fd *netFD) decref() { + if fd == nil { + return + } fd.sysmu.Lock() fd.sysref-- - if fd.closing && fd.sysref == 0 && fd.sysfd >= 0 { - // In case the user has set linger, switch to blocking mode so - // the close blocks. As long as this doesn't happen often, we - // can handle the extra OS processes. Otherwise we'll need to - // use the pollserver for Close too. Sigh. - syscall.SetNonblock(fd.sysfd, false) + if fd.closing && fd.sysref == 0 && fd.sysfile != nil { fd.sysfile.Close() fd.sysfile = nil fd.sysfd = -1 @@ -344,302 +377,287 @@ func (fd *netFD) decref() { fd.sysmu.Unlock() } -func (fd *netFD) Close() os.Error { - if fd == nil || fd.sysfile == nil { - return os.EINVAL - } - - fd.incref() - syscall.Shutdown(fd.sysfd, syscall.SHUT_RDWR) - fd.closing = true +func (fd *netFD) Close() error { + pollserver.Lock() // needed for both fd.incref(true) and pollserver.Evict + defer pollserver.Unlock() + if err := fd.incref(true); err != nil { + return err + } + // Unblock any I/O. Once it all unblocks and returns, + // so that it cannot be referring to fd.sysfd anymore, + // the final decref will close fd.sysfd. This should happen + // fairly quickly, since all the I/O is non-blocking, and any + // attempts to block in the pollserver will return errClosing. + pollserver.Evict(fd) fd.decref() return nil } -func (fd *netFD) Read(p []byte) (n int, err os.Error) { - if fd == nil { - return 0, os.EINVAL +func (fd *netFD) shutdown(how int) error { + if err := fd.incref(false); err != nil { + return err } - fd.rio.Lock() - defer fd.rio.Unlock() - fd.incref() defer fd.decref() - if fd.sysfile == nil { - return 0, os.EINVAL + err := syscall.Shutdown(fd.sysfd, how) + if err != nil { + return &OpError{"shutdown", fd.net, fd.laddr, err} } - if fd.rdeadline_delta > 0 { - fd.rdeadline = pollserver.Now() + fd.rdeadline_delta - } else { - fd.rdeadline = 0 + return nil +} + +func (fd *netFD) CloseRead() error { + return fd.shutdown(syscall.SHUT_RD) +} + +func (fd *netFD) CloseWrite() error { + return fd.shutdown(syscall.SHUT_WR) +} + +func (fd *netFD) Read(p []byte) (n int, err error) { + fd.rio.Lock() + defer fd.rio.Unlock() + if err := fd.incref(false); err != nil { + return 0, err } - var oserr os.Error + defer fd.decref() for { - var errno int - n, errno = syscall.Read(fd.sysfile.Fd(), p) - if errno == syscall.EAGAIN && fd.rdeadline >= 0 { - pollserver.WaitRead(fd) - continue + n, err = syscall.Read(int(fd.sysfd), p) + if err == syscall.EAGAIN { + err = errTimeout + if fd.rdeadline >= 0 { + if err = pollserver.WaitRead(fd); err == nil { + continue + } + } } - if errno != 0 { + if err != nil { n = 0 - oserr = os.Errno(errno) - } else if n == 0 && errno == 0 && fd.proto != syscall.SOCK_DGRAM { - err = os.EOF + } else if n == 0 && err == nil && fd.sotype != syscall.SOCK_DGRAM { + err = io.EOF } break } - if oserr != nil { - err = &OpError{"read", fd.net, fd.raddr, oserr} + if err != nil && err != io.EOF { + err = &OpError{"read", fd.net, fd.raddr, err} } return } -func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err os.Error) { - if fd == nil || fd.sysfile == nil { - return 0, nil, os.EINVAL - } +func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err error) { fd.rio.Lock() defer fd.rio.Unlock() - fd.incref() - defer fd.decref() - if fd.rdeadline_delta > 0 { - fd.rdeadline = pollserver.Now() + fd.rdeadline_delta - } else { - fd.rdeadline = 0 + if err := fd.incref(false); err != nil { + return 0, nil, err } - var oserr os.Error + defer fd.decref() for { - var errno int - n, sa, errno = syscall.Recvfrom(fd.sysfd, p, 0) - if errno == syscall.EAGAIN && fd.rdeadline >= 0 { - pollserver.WaitRead(fd) - continue + n, sa, err = syscall.Recvfrom(fd.sysfd, p, 0) + if err == syscall.EAGAIN { + err = errTimeout + if fd.rdeadline >= 0 { + if err = pollserver.WaitRead(fd); err == nil { + continue + } + } } - if errno != 0 { + if err != nil { n = 0 - oserr = os.Errno(errno) } break } - if oserr != nil { - err = &OpError{"read", fd.net, fd.laddr, oserr} + if err != nil && err != io.EOF { + err = &OpError{"read", fd.net, fd.laddr, err} } return } -func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.Sockaddr, err os.Error) { - if fd == nil || fd.sysfile == nil { - return 0, 0, 0, nil, os.EINVAL - } +func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.Sockaddr, err error) { fd.rio.Lock() defer fd.rio.Unlock() - fd.incref() - defer fd.decref() - if fd.rdeadline_delta > 0 { - fd.rdeadline = pollserver.Now() + fd.rdeadline_delta - } else { - fd.rdeadline = 0 + if err := fd.incref(false); err != nil { + return 0, 0, 0, nil, err } - var oserr os.Error + defer fd.decref() for { - var errno int - n, oobn, flags, sa, errno = syscall.Recvmsg(fd.sysfd, p, oob, 0) - if errno == syscall.EAGAIN && fd.rdeadline >= 0 { - pollserver.WaitRead(fd) - continue - } - if errno != 0 { - oserr = os.Errno(errno) + n, oobn, flags, sa, err = syscall.Recvmsg(fd.sysfd, p, oob, 0) + if err == syscall.EAGAIN { + err = errTimeout + if fd.rdeadline >= 0 { + if err = pollserver.WaitRead(fd); err == nil { + continue + } + } } - if n == 0 { - oserr = os.EOF + if err == nil && n == 0 { + err = io.EOF } break } - if oserr != nil { - err = &OpError{"read", fd.net, fd.laddr, oserr} + if err != nil && err != io.EOF { + err = &OpError{"read", fd.net, fd.laddr, err} return } return } -func (fd *netFD) Write(p []byte) (n int, err os.Error) { - if fd == nil { - return 0, os.EINVAL - } +func (fd *netFD) Write(p []byte) (int, error) { fd.wio.Lock() defer fd.wio.Unlock() - fd.incref() + if err := fd.incref(false); err != nil { + return 0, err + } defer fd.decref() if fd.sysfile == nil { - return 0, os.EINVAL + return 0, syscall.EINVAL } - if fd.wdeadline_delta > 0 { - fd.wdeadline = pollserver.Now() + fd.wdeadline_delta - } else { - fd.wdeadline = 0 - } - nn := 0 - var oserr os.Error + var err error + nn := 0 for { - n, errno := syscall.Write(fd.sysfile.Fd(), p[nn:]) + var n int + n, err = syscall.Write(int(fd.sysfd), p[nn:]) if n > 0 { nn += n } if nn == len(p) { break } - if errno == syscall.EAGAIN && fd.wdeadline >= 0 { - pollserver.WaitWrite(fd) - continue + if err == syscall.EAGAIN { + err = errTimeout + if fd.wdeadline >= 0 { + if err = pollserver.WaitWrite(fd); err == nil { + continue + } + } } - if errno != 0 { + if err != nil { n = 0 - oserr = os.Errno(errno) break } if n == 0 { - oserr = io.ErrUnexpectedEOF + err = io.ErrUnexpectedEOF break } } - if oserr != nil { - err = &OpError{"write", fd.net, fd.raddr, oserr} + if err != nil { + err = &OpError{"write", fd.net, fd.raddr, err} } return nn, err } -func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err os.Error) { - if fd == nil || fd.sysfile == nil { - return 0, os.EINVAL - } +func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err error) { fd.wio.Lock() defer fd.wio.Unlock() - fd.incref() - defer fd.decref() - if fd.wdeadline_delta > 0 { - fd.wdeadline = pollserver.Now() + fd.wdeadline_delta - } else { - fd.wdeadline = 0 + if err := fd.incref(false); err != nil { + return 0, err } - var oserr os.Error + defer fd.decref() for { - errno := syscall.Sendto(fd.sysfd, p, 0, sa) - if errno == syscall.EAGAIN && fd.wdeadline >= 0 { - pollserver.WaitWrite(fd) - continue - } - if errno != 0 { - oserr = os.Errno(errno) + err = syscall.Sendto(fd.sysfd, p, 0, sa) + if err == syscall.EAGAIN { + err = errTimeout + if fd.wdeadline >= 0 { + if err = pollserver.WaitWrite(fd); err == nil { + continue + } + } } break } - if oserr == nil { + if err == nil { n = len(p) } else { - err = &OpError{"write", fd.net, fd.raddr, oserr} + err = &OpError{"write", fd.net, fd.raddr, err} } return } -func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oobn int, err os.Error) { - if fd == nil || fd.sysfile == nil { - return 0, 0, os.EINVAL - } +func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oobn int, err error) { fd.wio.Lock() defer fd.wio.Unlock() - fd.incref() - defer fd.decref() - if fd.wdeadline_delta > 0 { - fd.wdeadline = pollserver.Now() + fd.wdeadline_delta - } else { - fd.wdeadline = 0 + if err := fd.incref(false); err != nil { + return 0, 0, err } - var oserr os.Error + defer fd.decref() for { - var errno int - errno = syscall.Sendmsg(fd.sysfd, p, oob, sa, 0) - if errno == syscall.EAGAIN && fd.wdeadline >= 0 { - pollserver.WaitWrite(fd) - continue - } - if errno != 0 { - oserr = os.Errno(errno) + err = syscall.Sendmsg(fd.sysfd, p, oob, sa, 0) + if err == syscall.EAGAIN { + err = errTimeout + if fd.wdeadline >= 0 { + if err = pollserver.WaitWrite(fd); err == nil { + continue + } + } } break } - if oserr == nil { + if err == nil { n = len(p) oobn = len(oob) } else { - err = &OpError{"write", fd.net, fd.raddr, oserr} + err = &OpError{"write", fd.net, fd.raddr, err} } return } -func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (nfd *netFD, err os.Error) { - if fd == nil || fd.sysfile == nil { - return nil, os.EINVAL +func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (netfd *netFD, err error) { + if err := fd.incref(false); err != nil { + return nil, err } - - fd.incref() defer fd.decref() - if fd.rdeadline_delta > 0 { - fd.rdeadline = pollserver.Now() + fd.rdeadline_delta - } else { - fd.rdeadline = 0 - } // See ../syscall/exec.go for description of ForkLock. // It is okay to hold the lock across syscall.Accept // because we have put fd.sysfd into non-blocking mode. - syscall.ForkLock.RLock() - var s, e int + var s int var rsa syscall.Sockaddr for { - if fd.closing { + syscall.ForkLock.RLock() + s, rsa, err = syscall.Accept(fd.sysfd) + if err != nil { syscall.ForkLock.RUnlock() - return nil, os.EINVAL - } - s, rsa, e = syscall.Accept(fd.sysfd) - if e != syscall.EAGAIN || fd.rdeadline < 0 { - break + if err == syscall.EAGAIN { + err = errTimeout + if fd.rdeadline >= 0 { + if err = pollserver.WaitRead(fd); err == nil { + continue + } + } + } else if err == syscall.ECONNABORTED { + // This means that a socket on the listen queue was closed + // before we Accept()ed it; it's a silly error, so try again. + continue + } + return nil, &OpError{"accept", fd.net, fd.laddr, err} } - syscall.ForkLock.RUnlock() - pollserver.WaitRead(fd) - syscall.ForkLock.RLock() - } - if e != 0 { - syscall.ForkLock.RUnlock() - return nil, &OpError{"accept", fd.net, fd.laddr, os.Errno(e)} + break } syscall.CloseOnExec(s) syscall.ForkLock.RUnlock() - if nfd, err = newFD(s, fd.family, fd.proto, fd.net); err != nil { + if netfd, err = newFD(s, fd.family, fd.sotype, fd.net); err != nil { syscall.Close(s) return nil, err } - lsa, _ := syscall.Getsockname(nfd.sysfd) - nfd.setAddr(toAddr(lsa), toAddr(rsa)) - return nfd, nil + lsa, _ := syscall.Getsockname(netfd.sysfd) + netfd.setAddr(toAddr(lsa), toAddr(rsa)) + return netfd, nil } -func (fd *netFD) dup() (f *os.File, err os.Error) { - ns, e := syscall.Dup(fd.sysfd) - if e != 0 { - return nil, &OpError{"dup", fd.net, fd.laddr, os.Errno(e)} +func (fd *netFD) dup() (f *os.File, err error) { + ns, err := syscall.Dup(fd.sysfd) + if err != nil { + return nil, &OpError{"dup", fd.net, fd.laddr, err} } // We want blocking mode for the new fd, hence the double negative. - if e = syscall.SetNonblock(ns, false); e != 0 { - return nil, &OpError{"setnonblock", fd.net, fd.laddr, os.Errno(e)} + if err = syscall.SetNonblock(ns, false); err != nil { + return nil, &OpError{"setnonblock", fd.net, fd.laddr, err} } - return os.NewFile(ns, fd.sysfile.Name()), nil + return os.NewFile(uintptr(ns), fd.sysfile.Name()), nil } -func closesocket(s int) (errno int) { +func closesocket(s int) error { return syscall.Close(s) } |