diff options
Diffstat (limited to 'src/pkg/net/fd_windows.go')
-rw-r--r-- | src/pkg/net/fd_windows.go | 139 |
1 files changed, 124 insertions, 15 deletions
diff --git a/src/pkg/net/fd_windows.go b/src/pkg/net/fd_windows.go index 72685d612..9b91eb398 100644 --- a/src/pkg/net/fd_windows.go +++ b/src/pkg/net/fd_windows.go @@ -6,14 +6,13 @@ package net import ( "os" + "runtime" "sync" "syscall" + "time" "unsafe" - "runtime" ) -// BUG(brainman): The Windows implementation does not implement SetTimeout. - // IO completion result parameters. type ioResult struct { key uint32 @@ -79,6 +78,8 @@ type ioPacket struct { // Link to the io owner. c chan *ioResult + + w *syscall.WSABuf } func (s *pollServer) getCompletedIO() (ov *syscall.Overlapped, result *ioResult, err os.Error) { @@ -126,6 +127,8 @@ func startServer() { panic("Start pollServer: " + err.String() + "\n") } pollserver = p + + go timeoutIO() } var initErr os.Error @@ -143,8 +146,8 @@ func newFD(fd, family, proto int, net string, laddr, raddr Addr) (f *netFD, err sysfd: fd, family: family, proto: proto, - cr: make(chan *ioResult), - cw: make(chan *ioResult), + cr: make(chan *ioResult, 1), + cw: make(chan *ioResult, 1), net: net, laddr: laddr, raddr: raddr, @@ -199,6 +202,80 @@ func newWSABuf(p []byte) *syscall.WSABuf { return &syscall.WSABuf{uint32(len(p)), p0} } +func waitPacket(fd *netFD, pckt *ioPacket, mode int) (r *ioResult) { + var delta int64 + if mode == 'r' { + delta = fd.rdeadline_delta + } + if mode == 'w' { + delta = fd.wdeadline_delta + } + if delta <= 0 { + return <-pckt.c + } + + select { + case r = <-pckt.c: + case <-time.After(delta): + a := &arg{f: cancel, fd: fd, pckt: pckt, c: make(chan int)} + ioChan <- a + <-a.c + r = <-pckt.c + if r.errno == 995 { // IO Canceled + r.errno = syscall.EWOULDBLOCK + } + } + return r +} + +const ( + read = iota + readfrom + write + writeto + cancel +) + +type arg struct { + f int + fd *netFD + pckt *ioPacket + done *uint32 + flags *uint32 + rsa *syscall.RawSockaddrAny + size *int32 + sa *syscall.Sockaddr + c chan int +} + +var ioChan chan *arg = make(chan *arg) + +func timeoutIO() { + // CancelIO only cancels all pending input and output (I/O) operations that are + // issued by the calling thread for the specified file, does not cancel I/O + // operations that other threads issue for a file handle. So we need do all timeout + // I/O in single OS thread. + runtime.LockOSThread() + defer runtime.UnlockOSThread() + for { + o := <-ioChan + var e int + switch o.f { + case read: + e = syscall.WSARecv(uint32(o.fd.sysfd), o.pckt.w, 1, o.done, o.flags, &o.pckt.o, nil) + case readfrom: + e = syscall.WSARecvFrom(uint32(o.fd.sysfd), o.pckt.w, 1, o.done, o.flags, o.rsa, o.size, &o.pckt.o, nil) + case write: + e = syscall.WSASend(uint32(o.fd.sysfd), o.pckt.w, 1, o.done, uint32(0), &o.pckt.o, nil) + case writeto: + e = syscall.WSASendto(uint32(o.fd.sysfd), o.pckt.w, 1, o.done, 0, *o.sa, &o.pckt.o, nil) + case cancel: + _, e = syscall.CancelIo(uint32(o.fd.sysfd)) + } + o.c <- e + } +} + func (fd *netFD) Read(p []byte) (n int, err os.Error) { if fd == nil { return 0, os.EINVAL @@ -213,9 +290,17 @@ func (fd *netFD) Read(p []byte) (n int, err os.Error) { // Submit receive request. var pckt ioPacket pckt.c = fd.cr + pckt.w = newWSABuf(p) var done uint32 flags := uint32(0) - e := syscall.WSARecv(uint32(fd.sysfd), newWSABuf(p), 1, &done, &flags, &pckt.o, nil) + var e int + if fd.rdeadline_delta > 0 { + a := &arg{f: read, fd: fd, pckt: &pckt, done: &done, flags: &flags, c: make(chan int)} + ioChan <- a + e = <-a.c + } else { + e = syscall.WSARecv(uint32(fd.sysfd), pckt.w, 1, &done, &flags, &pckt.o, nil) + } switch e { case 0: // IO completed immediately, but we need to get our completion message anyway. @@ -225,7 +310,7 @@ func (fd *netFD) Read(p []byte) (n int, err os.Error) { return 0, &OpError{"WSARecv", fd.net, fd.laddr, os.Errno(e)} } // Wait for our request to complete. - r := <-pckt.c + r := waitPacket(fd, &pckt, 'r') if r.errno != 0 { err = &OpError{"WSARecv", fd.net, fd.laddr, os.Errno(r.errno)} } @@ -253,11 +338,19 @@ func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err os.Error) { // Submit receive request. var pckt ioPacket pckt.c = fd.cr + pckt.w = newWSABuf(p) var done uint32 flags := uint32(0) var rsa syscall.RawSockaddrAny l := int32(unsafe.Sizeof(rsa)) - e := syscall.WSARecvFrom(uint32(fd.sysfd), newWSABuf(p), 1, &done, &flags, &rsa, &l, &pckt.o, nil) + var e int + if fd.rdeadline_delta > 0 { + a := &arg{f: readfrom, fd: fd, pckt: &pckt, done: &done, flags: &flags, rsa: &rsa, size: &l, c: make(chan int)} + ioChan <- a + e = <-a.c + } else { + e = syscall.WSARecvFrom(uint32(fd.sysfd), pckt.w, 1, &done, &flags, &rsa, &l, &pckt.o, nil) + } switch e { case 0: // IO completed immediately, but we need to get our completion message anyway. @@ -267,7 +360,7 @@ func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err os.Error) { return 0, nil, &OpError{"WSARecvFrom", fd.net, fd.laddr, os.Errno(e)} } // Wait for our request to complete. - r := <-pckt.c + r := waitPacket(fd, &pckt, 'r') if r.errno != 0 { err = &OpError{"WSARecvFrom", fd.net, fd.laddr, os.Errno(r.errno)} } @@ -290,8 +383,16 @@ func (fd *netFD) Write(p []byte) (n int, err os.Error) { // Submit send request. var pckt ioPacket pckt.c = fd.cw + pckt.w = newWSABuf(p) var done uint32 - e := syscall.WSASend(uint32(fd.sysfd), newWSABuf(p), 1, &done, uint32(0), &pckt.o, nil) + var e int + if fd.wdeadline_delta > 0 { + a := &arg{f: write, fd: fd, pckt: &pckt, done: &done, c: make(chan int)} + ioChan <- a + e = <-a.c + } else { + e = syscall.WSASend(uint32(fd.sysfd), pckt.w, 1, &done, uint32(0), &pckt.o, nil) + } switch e { case 0: // IO completed immediately, but we need to get our completion message anyway. @@ -301,7 +402,7 @@ func (fd *netFD) Write(p []byte) (n int, err os.Error) { return 0, &OpError{"WSASend", fd.net, fd.laddr, os.Errno(e)} } // Wait for our request to complete. - r := <-pckt.c + r := waitPacket(fd, &pckt, 'w') if r.errno != 0 { err = &OpError{"WSASend", fd.net, fd.laddr, os.Errno(r.errno)} } @@ -326,8 +427,16 @@ func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err os.Error) { // Submit send request. var pckt ioPacket pckt.c = fd.cw + pckt.w = newWSABuf(p) var done uint32 - e := syscall.WSASendto(uint32(fd.sysfd), newWSABuf(p), 1, &done, 0, sa, &pckt.o, nil) + var e int + if fd.wdeadline_delta > 0 { + a := &arg{f: writeto, fd: fd, pckt: &pckt, done: &done, sa: &sa, c: make(chan int)} + ioChan <- a + e = <-a.c + } else { + e = syscall.WSASendto(uint32(fd.sysfd), pckt.w, 1, &done, 0, sa, &pckt.o, nil) + } switch e { case 0: // IO completed immediately, but we need to get our completion message anyway. @@ -337,7 +446,7 @@ func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err os.Error) { return 0, &OpError{"WSASendTo", fd.net, fd.laddr, os.Errno(e)} } // Wait for our request to complete. - r := <-pckt.c + r := waitPacket(fd, &pckt, 'w') if r.errno != 0 { err = &OpError{"WSASendTo", fd.net, fd.laddr, os.Errno(r.errno)} } @@ -410,8 +519,8 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (nfd *netFD, err os. sysfd: s, family: fd.family, proto: fd.proto, - cr: make(chan *ioResult), - cw: make(chan *ioResult), + cr: make(chan *ioResult, 1), + cw: make(chan *ioResult, 1), net: fd.net, laddr: laddr, raddr: raddr, |