diff options
Diffstat (limited to 'src/pkg/runtime/netpoll.goc')
-rw-r--r-- | src/pkg/runtime/netpoll.goc | 169 |
1 files changed, 122 insertions, 47 deletions
diff --git a/src/pkg/runtime/netpoll.goc b/src/pkg/runtime/netpoll.goc index d27bef167..7b3d16d02 100644 --- a/src/pkg/runtime/netpoll.goc +++ b/src/pkg/runtime/netpoll.goc @@ -2,7 +2,7 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -// +build darwin dragonfly freebsd linux netbsd openbsd windows +// +build darwin dragonfly freebsd linux nacl netbsd openbsd solaris windows package net @@ -19,21 +19,46 @@ package net // An implementation must call the following function to denote that the pd is ready. // void runtime·netpollready(G **gpp, PollDesc *pd, int32 mode); +// PollDesc contains 2 binary semaphores, rg and wg, to park reader and writer +// goroutines respectively. The semaphore can be in the following states: +// READY - io readiness notification is pending; +// a goroutine consumes the notification by changing the state to nil. +// WAIT - a goroutine prepares to park on the semaphore, but not yet parked; +// the goroutine commits to park by changing the state to G pointer, +// or, alternatively, concurrent io notification changes the state to READY, +// or, alternatively, concurrent timeout/close changes the state to nil. +// G pointer - the goroutine is blocked on the semaphore; +// io notification or timeout/close changes the state to READY or nil respectively +// and unparks the goroutine. +// nil - nothing of the above. #define READY ((G*)1) +#define WAIT ((G*)2) + +enum +{ + PollBlockSize = 4*1024, +}; struct PollDesc { PollDesc* link; // in pollcache, protected by pollcache.Lock + + // The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations. + // This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime. + // pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO rediness notification) + // proceed w/o taking the lock. So closing, rg, rd, wg and wd are manipulated + // in a lock-free way by all operations. Lock; // protectes the following fields uintptr fd; bool closing; uintptr seq; // protects from stale timers and ready notifications - G* rg; // G waiting for read or READY (binary semaphore) + G* rg; // READY, WAIT, G waiting for read or nil Timer rt; // read deadline timer (set if rt.fv != nil) int64 rd; // read deadline - G* wg; // the same for writes - Timer wt; - int64 wd; + G* wg; // READY, WAIT, G waiting for write or nil + Timer wt; // write deadline timer + int64 wd; // write deadline + void* user; // user settable cookie }; static struct @@ -47,7 +72,7 @@ static struct // seq is incremented when deadlines are changed or descriptor is reused. } pollcache; -static bool netpollblock(PollDesc*, int32); +static bool netpollblock(PollDesc*, int32, bool); static G* netpollunblock(PollDesc*, int32, bool); static void deadline(int64, Eface); static void readDeadline(int64, Eface); @@ -59,6 +84,11 @@ static FuncVal deadlineFn = {(void(*)(void))deadline}; static FuncVal readDeadlineFn = {(void(*)(void))readDeadline}; static FuncVal writeDeadlineFn = {(void(*)(void))writeDeadline}; +// runtimeNano returns the current value of the runtime clock in nanoseconds. +func runtimeNano() (ns int64) { + ns = runtime·nanotime(); +} + func runtime_pollServerInit() { runtime·netpollinit(); } @@ -97,7 +127,6 @@ func runtime_pollClose(pd *PollDesc) { } func runtime_pollReset(pd *PollDesc, mode int) (err int) { - runtime·lock(pd); err = checkerr(pd, mode); if(err) goto ret; @@ -106,14 +135,15 @@ func runtime_pollReset(pd *PollDesc, mode int) (err int) { else if(mode == 'w') pd->wg = nil; ret: - runtime·unlock(pd); } func runtime_pollWait(pd *PollDesc, mode int) (err int) { - runtime·lock(pd); err = checkerr(pd, mode); if(err == 0) { - while(!netpollblock(pd, mode)) { + // As for now only Solaris uses level-triggered IO. + if(Solaris) + runtime·netpollarm(pd, mode); + while(!netpollblock(pd, mode, false)) { err = checkerr(pd, mode); if(err != 0) break; @@ -122,15 +152,13 @@ func runtime_pollWait(pd *PollDesc, mode int) (err int) { // Pretend it has not happened and retry. } } - runtime·unlock(pd); } func runtime_pollWaitCanceled(pd *PollDesc, mode int) { - runtime·lock(pd); - // wait for ioready, ignore closing or timeouts. - while(!netpollblock(pd, mode)) + // This function is used only on windows after a failed attempt to cancel + // a pending async IO operation. Wait for ioready, ignore closing or timeouts. + while(!netpollblock(pd, mode, true)) ; - runtime·unlock(pd); } func runtime_pollSetDeadline(pd *PollDesc, d int64, mode int) { @@ -185,7 +213,7 @@ func runtime_pollSetDeadline(pd *PollDesc, d int64, mode int) { } // If we set the new deadline in the past, unblock currently pending IO if any. rg = nil; - wg = nil; + runtime·atomicstorep(&wg, nil); // full memory barrier between stores to rd/wd and load of rg/wg in netpollunblock if(pd->rd < 0) rg = netpollunblock(pd, 'r', false); if(pd->wd < 0) @@ -205,6 +233,7 @@ func runtime_pollUnblock(pd *PollDesc) { runtime·throw("runtime_pollUnblock: already closing"); pd->closing = true; pd->seq++; + runtime·atomicstorep(&rg, nil); // full memory barrier between store to closing and read of rg/wg in netpollunblock rg = netpollunblock(pd, 'r', false); wg = netpollunblock(pd, 'w', false); if(pd->rt.fv) { @@ -228,6 +257,30 @@ runtime·netpollfd(PollDesc *pd) return pd->fd; } +void** +runtime·netpolluser(PollDesc *pd) +{ + return &pd->user; +} + +bool +runtime·netpollclosing(PollDesc *pd) +{ + return pd->closing; +} + +void +runtime·netpolllock(PollDesc *pd) +{ + runtime·lock(pd); +} + +void +runtime·netpollunlock(PollDesc *pd) +{ + runtime·unlock(pd); +} + // make pd ready, newly runnable goroutines (if any) are enqueued info gpp list void runtime·netpollready(G **gpp, PollDesc *pd, int32 mode) @@ -235,12 +288,10 @@ runtime·netpollready(G **gpp, PollDesc *pd, int32 mode) G *rg, *wg; rg = wg = nil; - runtime·lock(pd); if(mode == 'r' || mode == 'r'+'w') rg = netpollunblock(pd, 'r', true); if(mode == 'w' || mode == 'r'+'w') wg = netpollunblock(pd, 'w', true); - runtime·unlock(pd); if(rg) { rg->schedlink = *gpp; *gpp = rg; @@ -261,51 +312,75 @@ checkerr(PollDesc *pd, int32 mode) return 0; } +static bool +blockcommit(G *gp, G **gpp) +{ + return runtime·casp(gpp, WAIT, gp); +} + // returns true if IO is ready, or false if timedout or closed +// waitio - wait only for completed IO, ignore errors static bool -netpollblock(PollDesc *pd, int32 mode) +netpollblock(PollDesc *pd, int32 mode, bool waitio) { - G **gpp; + G **gpp, *old; gpp = &pd->rg; if(mode == 'w') gpp = &pd->wg; - if(*gpp == READY) { - *gpp = nil; - return true; + + // set the gpp semaphore to WAIT + for(;;) { + old = *gpp; + if(old == READY) { + *gpp = nil; + return true; + } + if(old != nil) + runtime·throw("netpollblock: double wait"); + if(runtime·casp(gpp, nil, WAIT)) + break; } - if(*gpp != nil) - runtime·throw("netpollblock: double wait"); - *gpp = g; - runtime·park(runtime·unlock, &pd->Lock, "IO wait"); - runtime·lock(pd); - if(g->param) - return true; - return false; + + // need to recheck error states after setting gpp to WAIT + // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl + // do the opposite: store to closing/rd/wd, membarrier, load of rg/wg + if(waitio || checkerr(pd, mode) == 0) + runtime·park((bool(*)(G*, void*))blockcommit, gpp, "IO wait"); + // be careful to not lose concurrent READY notification + old = runtime·xchgp(gpp, nil); + if(old > WAIT) + runtime·throw("netpollblock: corrupted state"); + return old == READY; } static G* netpollunblock(PollDesc *pd, int32 mode, bool ioready) { - G **gpp, *old; + G **gpp, *old, *new; gpp = &pd->rg; if(mode == 'w') gpp = &pd->wg; - if(*gpp == READY) - return nil; - if(*gpp == nil) { - // Only set READY for ioready. runtime_pollWait - // will check for timeout/cancel before waiting. + + for(;;) { + old = *gpp; + if(old == READY) + return nil; + if(old == nil && !ioready) { + // Only set READY for ioready. runtime_pollWait + // will check for timeout/cancel before waiting. + return nil; + } + new = nil; if(ioready) - *gpp = READY; - return nil; + new = READY; + if(runtime·casp(gpp, old, new)) + break; } - old = *gpp; - // pass unblock reason onto blocked g - old->param = (void*)ioready; - *gpp = nil; - return old; + if(old > WAIT) + return old; // must be G* + return nil; } static void @@ -331,14 +406,14 @@ deadlineimpl(int64 now, Eface arg, bool read, bool write) if(pd->rd <= 0 || pd->rt.fv == nil) runtime·throw("deadlineimpl: inconsistent read deadline"); pd->rd = -1; - pd->rt.fv = nil; + runtime·atomicstorep(&pd->rt.fv, nil); // full memory barrier between store to rd and load of rg in netpollunblock rg = netpollunblock(pd, 'r', false); } if(write) { if(pd->wd <= 0 || (pd->wt.fv == nil && !read)) runtime·throw("deadlineimpl: inconsistent write deadline"); pd->wd = -1; - pd->wt.fv = nil; + runtime·atomicstorep(&pd->wt.fv, nil); // full memory barrier between store to wd and load of wg in netpollunblock wg = netpollunblock(pd, 'w', false); } runtime·unlock(pd); @@ -374,7 +449,7 @@ allocPollDesc(void) runtime·lock(&pollcache); if(pollcache.first == nil) { - n = PageSize/sizeof(*pd); + n = PollBlockSize/sizeof(*pd); if(n == 0) n = 1; // Must be in non-GC memory because can be referenced |