diff options
Diffstat (limited to 'src/pkg/net/fd_windows.go')
-rw-r--r-- | src/pkg/net/fd_windows.go | 532 |
1 files changed, 532 insertions, 0 deletions
diff --git a/src/pkg/net/fd_windows.go b/src/pkg/net/fd_windows.go new file mode 100644 index 000000000..3757e143d --- /dev/null +++ b/src/pkg/net/fd_windows.go @@ -0,0 +1,532 @@ +// Copyright 2010 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package net + +import ( + "os" + "runtime" + "sync" + "syscall" + "time" + "unsafe" +) + +type InvalidConnError struct{} + +func (e *InvalidConnError) String() string { return "invalid net.Conn" } +func (e *InvalidConnError) Temporary() bool { return false } +func (e *InvalidConnError) Timeout() bool { return false } + +var initErr os.Error + +func init() { + var d syscall.WSAData + e := syscall.WSAStartup(uint32(0x101), &d) + if e != 0 { + initErr = os.NewSyscallError("WSAStartup", e) + } +} + +func closesocket(s syscall.Handle) (errno int) { + return syscall.Closesocket(s) +} + +// Interface for all io operations. +type anOpIface interface { + Op() *anOp + Name() string + Submit() (errno int) +} + +// IO completion result parameters. +type ioResult struct { + qty uint32 + err int +} + +// anOp implements functionality common to all io operations. +type anOp struct { + // Used by IOCP interface, it must be first field + // of the struct, as our code rely on it. + o syscall.Overlapped + + resultc chan ioResult // io completion results + errnoc chan int // io submit / cancel operation errors + fd *netFD +} + +func (o *anOp) Init(fd *netFD) { + o.fd = fd + o.resultc = make(chan ioResult, 1) + o.errnoc = make(chan int) +} + +func (o *anOp) Op() *anOp { + return o +} + +// bufOp is used by io operations that read / write +// data from / to client buffer. +type bufOp struct { + anOp + buf syscall.WSABuf +} + +func (o *bufOp) Init(fd *netFD, buf []byte) { + o.anOp.Init(fd) + o.buf.Len = uint32(len(buf)) + if len(buf) == 0 { + o.buf.Buf = nil + } else { + o.buf.Buf = (*byte)(unsafe.Pointer(&buf[0])) + } +} + +// resultSrv will retrieve all io completion results from +// iocp and send them to the correspondent waiting client +// goroutine via channel supplied in the request. +type resultSrv struct { + iocp syscall.Handle +} + +func (s *resultSrv) Run() { + var o *syscall.Overlapped + var key uint32 + var r ioResult + for { + r.err = syscall.GetQueuedCompletionStatus(s.iocp, &(r.qty), &key, &o, syscall.INFINITE) + switch { + case r.err == 0: + // Dequeued successfully completed io packet. + case r.err == syscall.WAIT_TIMEOUT && o == nil: + // Wait has timed out (should not happen now, but might be used in the future). + panic("GetQueuedCompletionStatus timed out") + case o == nil: + // Failed to dequeue anything -> report the error. + panic("GetQueuedCompletionStatus failed " + syscall.Errstr(r.err)) + default: + // Dequeued failed io packet. + } + (*anOp)(unsafe.Pointer(o)).resultc <- r + } +} + +// ioSrv executes net io requests. +type ioSrv struct { + submchan chan anOpIface // submit io requests + canchan chan anOpIface // cancel io requests +} + +// ProcessRemoteIO will execute submit io requests on behalf +// of other goroutines, all on a single os thread, so it can +// cancel them later. Results of all operations will be sent +// back to their requesters via channel supplied in request. +func (s *ioSrv) ProcessRemoteIO() { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + for { + select { + case o := <-s.submchan: + o.Op().errnoc <- o.Submit() + case o := <-s.canchan: + o.Op().errnoc <- syscall.CancelIo(syscall.Handle(o.Op().fd.sysfd)) + } + } +} + +// ExecIO executes a single io operation. It either executes it +// inline, or, if timeouts are employed, passes the request onto +// a special goroutine and waits for completion or cancels request. +func (s *ioSrv) ExecIO(oi anOpIface, deadline_delta int64) (n int, err os.Error) { + var e int + o := oi.Op() + if deadline_delta > 0 { + // Send request to a special dedicated thread, + // so it can stop the io with CancelIO later. + s.submchan <- oi + e = <-o.errnoc + } else { + e = oi.Submit() + } + switch e { + case 0: + // IO completed immediately, but we need to get our completion message anyway. + case syscall.ERROR_IO_PENDING: + // IO started, and we have to wait for its completion. + default: + return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, os.Errno(e)} + } + // Wait for our request to complete. + var r ioResult + if deadline_delta > 0 { + select { + case r = <-o.resultc: + case <-time.After(deadline_delta): + s.canchan <- oi + <-o.errnoc + r = <-o.resultc + if r.err == syscall.ERROR_OPERATION_ABORTED { // IO Canceled + r.err = syscall.EWOULDBLOCK + } + } + } else { + r = <-o.resultc + } + if r.err != 0 { + err = &OpError{oi.Name(), o.fd.net, o.fd.laddr, os.Errno(r.err)} + } + return int(r.qty), err +} + +// Start helper goroutines. +var resultsrv *resultSrv +var iosrv *ioSrv +var onceStartServer sync.Once + +func startServer() { + resultsrv = new(resultSrv) + var errno int + resultsrv.iocp, errno = syscall.CreateIoCompletionPort(syscall.InvalidHandle, 0, 0, 1) + if errno != 0 { + panic("CreateIoCompletionPort failed " + syscall.Errstr(errno)) + } + go resultsrv.Run() + + iosrv = new(ioSrv) + iosrv.submchan = make(chan anOpIface) + iosrv.canchan = make(chan anOpIface) + go iosrv.ProcessRemoteIO() +} + +// Network file descriptor. +type netFD struct { + // locking/lifetime of sysfd + sysmu sync.Mutex + sysref int + closing bool + + // immutable until Close + sysfd syscall.Handle + family int + proto int + net string + laddr Addr + raddr Addr + + // owned by client + rdeadline_delta int64 + rdeadline int64 + rio sync.Mutex + wdeadline_delta int64 + wdeadline int64 + wio sync.Mutex +} + +func allocFD(fd syscall.Handle, family, proto int, net string) (f *netFD) { + f = &netFD{ + sysfd: fd, + family: family, + proto: proto, + net: net, + } + runtime.SetFinalizer(f, (*netFD).Close) + return f +} + +func newFD(fd syscall.Handle, family, proto int, net string) (f *netFD, err os.Error) { + if initErr != nil { + return nil, initErr + } + onceStartServer.Do(startServer) + // Associate our socket with resultsrv.iocp. + if _, e := syscall.CreateIoCompletionPort(syscall.Handle(fd), resultsrv.iocp, 0, 0); e != 0 { + return nil, os.Errno(e) + } + return allocFD(fd, family, proto, net), nil +} + +func (fd *netFD) setAddr(laddr, raddr Addr) { + fd.laddr = laddr + fd.raddr = raddr +} + +func (fd *netFD) connect(ra syscall.Sockaddr) (err os.Error) { + e := syscall.Connect(fd.sysfd, ra) + if e != 0 { + return os.Errno(e) + } + return nil +} + +// Add a reference to this fd. +func (fd *netFD) incref() { + fd.sysmu.Lock() + fd.sysref++ + fd.sysmu.Unlock() +} + +// Remove a reference to this FD and close if we've been asked to do so (and +// there are no references left. +func (fd *netFD) decref() { + fd.sysmu.Lock() + fd.sysref-- + if fd.closing && fd.sysref == 0 && fd.sysfd != syscall.InvalidHandle { + // In case the user has set linger, switch to blocking mode so + // the close blocks. As long as this doesn't happen often, we + // can handle the extra OS processes. Otherwise we'll need to + // use the resultsrv for Close too. Sigh. + syscall.SetNonblock(fd.sysfd, false) + closesocket(fd.sysfd) + fd.sysfd = syscall.InvalidHandle + // no need for a finalizer anymore + runtime.SetFinalizer(fd, nil) + } + fd.sysmu.Unlock() +} + +func (fd *netFD) Close() os.Error { + if fd == nil || fd.sysfd == syscall.InvalidHandle { + return os.EINVAL + } + + fd.incref() + syscall.Shutdown(fd.sysfd, syscall.SHUT_RDWR) + fd.closing = true + fd.decref() + return nil +} + +// Read from network. + +type readOp struct { + bufOp +} + +func (o *readOp) Submit() (errno int) { + var d, f uint32 + return syscall.WSARecv(syscall.Handle(o.fd.sysfd), &o.buf, 1, &d, &f, &o.o, nil) +} + +func (o *readOp) Name() string { + return "WSARecv" +} + +func (fd *netFD) Read(buf []byte) (n int, err os.Error) { + if fd == nil { + return 0, os.EINVAL + } + fd.rio.Lock() + defer fd.rio.Unlock() + fd.incref() + defer fd.decref() + if fd.sysfd == syscall.InvalidHandle { + return 0, os.EINVAL + } + var o readOp + o.Init(fd, buf) + n, err = iosrv.ExecIO(&o, fd.rdeadline_delta) + if err == nil && n == 0 { + err = os.EOF + } + return +} + +// ReadFrom from network. + +type readFromOp struct { + bufOp + rsa syscall.RawSockaddrAny + rsan int32 +} + +func (o *readFromOp) Submit() (errno int) { + var d, f uint32 + return syscall.WSARecvFrom(o.fd.sysfd, &o.buf, 1, &d, &f, &o.rsa, &o.rsan, &o.o, nil) +} + +func (o *readFromOp) Name() string { + return "WSARecvFrom" +} + +func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err os.Error) { + if fd == nil { + return 0, nil, os.EINVAL + } + if len(buf) == 0 { + return 0, nil, nil + } + fd.rio.Lock() + defer fd.rio.Unlock() + fd.incref() + defer fd.decref() + if fd.sysfd == syscall.InvalidHandle { + return 0, nil, os.EINVAL + } + var o readFromOp + o.Init(fd, buf) + o.rsan = int32(unsafe.Sizeof(o.rsa)) + n, err = iosrv.ExecIO(&o, fd.rdeadline_delta) + if err != nil { + return 0, nil, err + } + sa, _ = o.rsa.Sockaddr() + return +} + +// Write to network. + +type writeOp struct { + bufOp +} + +func (o *writeOp) Submit() (errno int) { + var d uint32 + return syscall.WSASend(o.fd.sysfd, &o.buf, 1, &d, 0, &o.o, nil) +} + +func (o *writeOp) Name() string { + return "WSASend" +} + +func (fd *netFD) Write(buf []byte) (n int, err os.Error) { + if fd == nil { + return 0, os.EINVAL + } + fd.wio.Lock() + defer fd.wio.Unlock() + fd.incref() + defer fd.decref() + if fd.sysfd == syscall.InvalidHandle { + return 0, os.EINVAL + } + var o writeOp + o.Init(fd, buf) + return iosrv.ExecIO(&o, fd.wdeadline_delta) +} + +// WriteTo to network. + +type writeToOp struct { + bufOp + sa syscall.Sockaddr +} + +func (o *writeToOp) Submit() (errno int) { + var d uint32 + return syscall.WSASendto(o.fd.sysfd, &o.buf, 1, &d, 0, o.sa, &o.o, nil) +} + +func (o *writeToOp) Name() string { + return "WSASendto" +} + +func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (n int, err os.Error) { + if fd == nil { + return 0, os.EINVAL + } + if len(buf) == 0 { + return 0, nil + } + fd.wio.Lock() + defer fd.wio.Unlock() + fd.incref() + defer fd.decref() + if fd.sysfd == syscall.InvalidHandle { + return 0, os.EINVAL + } + var o writeToOp + o.Init(fd, buf) + o.sa = sa + return iosrv.ExecIO(&o, fd.wdeadline_delta) +} + +// Accept new network connections. + +type acceptOp struct { + anOp + newsock syscall.Handle + attrs [2]syscall.RawSockaddrAny // space for local and remote address only +} + +func (o *acceptOp) Submit() (errno int) { + var d uint32 + l := uint32(unsafe.Sizeof(o.attrs[0])) + return syscall.AcceptEx(o.fd.sysfd, o.newsock, + (*byte)(unsafe.Pointer(&o.attrs[0])), 0, l, l, &d, &o.o) +} + +func (o *acceptOp) Name() string { + return "AcceptEx" +} + +func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (nfd *netFD, err os.Error) { + if fd == nil || fd.sysfd == syscall.InvalidHandle { + return nil, os.EINVAL + } + fd.incref() + defer fd.decref() + + // Get new socket. + // See ../syscall/exec.go for description of ForkLock. + syscall.ForkLock.RLock() + s, e := syscall.Socket(fd.family, fd.proto, 0) + if e != 0 { + syscall.ForkLock.RUnlock() + return nil, os.Errno(e) + } + syscall.CloseOnExec(s) + syscall.ForkLock.RUnlock() + + // Associate our new socket with IOCP. + onceStartServer.Do(startServer) + if _, e = syscall.CreateIoCompletionPort(s, resultsrv.iocp, 0, 0); e != 0 { + return nil, &OpError{"CreateIoCompletionPort", fd.net, fd.laddr, os.Errno(e)} + } + + // Submit accept request. + var o acceptOp + o.Init(fd) + o.newsock = s + _, err = iosrv.ExecIO(&o, 0) + if err != nil { + closesocket(s) + return nil, err + } + + // Inherit properties of the listening socket. + e = syscall.Setsockopt(s, syscall.SOL_SOCKET, syscall.SO_UPDATE_ACCEPT_CONTEXT, (*byte)(unsafe.Pointer(&fd.sysfd)), int32(unsafe.Sizeof(fd.sysfd))) + if e != 0 { + closesocket(s) + return nil, err + } + + // Get local and peer addr out of AcceptEx buffer. + var lrsa, rrsa *syscall.RawSockaddrAny + var llen, rlen int32 + l := uint32(unsafe.Sizeof(*lrsa)) + syscall.GetAcceptExSockaddrs((*byte)(unsafe.Pointer(&o.attrs[0])), + 0, l, l, &lrsa, &llen, &rrsa, &rlen) + lsa, _ := lrsa.Sockaddr() + rsa, _ := rrsa.Sockaddr() + + nfd = allocFD(s, fd.family, fd.proto, fd.net) + nfd.setAddr(toAddr(lsa), toAddr(rsa)) + return nfd, nil +} + +// Unimplemented functions. + +func (fd *netFD) dup() (f *os.File, err os.Error) { + // TODO: Implement this + return nil, os.NewSyscallError("dup", syscall.EWINDOWS) +} + +func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.Sockaddr, err os.Error) { + return 0, 0, 0, nil, os.EAFNOSUPPORT +} + +func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oobn int, err os.Error) { + return 0, 0, os.EAFNOSUPPORT +} |