summaryrefslogtreecommitdiff
path: root/src/pkg/runtime/netpoll.goc
diff options
context:
space:
mode:
Diffstat (limited to 'src/pkg/runtime/netpoll.goc')
-rw-r--r--src/pkg/runtime/netpoll.goc169
1 files changed, 122 insertions, 47 deletions
diff --git a/src/pkg/runtime/netpoll.goc b/src/pkg/runtime/netpoll.goc
index d27bef167..7b3d16d02 100644
--- a/src/pkg/runtime/netpoll.goc
+++ b/src/pkg/runtime/netpoll.goc
@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-// +build darwin dragonfly freebsd linux netbsd openbsd windows
+// +build darwin dragonfly freebsd linux nacl netbsd openbsd solaris windows
package net
@@ -19,21 +19,46 @@ package net
// An implementation must call the following function to denote that the pd is ready.
// void runtime·netpollready(G **gpp, PollDesc *pd, int32 mode);
+// PollDesc contains 2 binary semaphores, rg and wg, to park reader and writer
+// goroutines respectively. The semaphore can be in the following states:
+// READY - io readiness notification is pending;
+// a goroutine consumes the notification by changing the state to nil.
+// WAIT - a goroutine prepares to park on the semaphore, but not yet parked;
+// the goroutine commits to park by changing the state to G pointer,
+// or, alternatively, concurrent io notification changes the state to READY,
+// or, alternatively, concurrent timeout/close changes the state to nil.
+// G pointer - the goroutine is blocked on the semaphore;
+// io notification or timeout/close changes the state to READY or nil respectively
+// and unparks the goroutine.
+// nil - nothing of the above.
#define READY ((G*)1)
+#define WAIT ((G*)2)
+
+enum
+{
+ PollBlockSize = 4*1024,
+};
struct PollDesc
{
PollDesc* link; // in pollcache, protected by pollcache.Lock
+
+ // The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
+ // This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
+ // pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO rediness notification)
+ // proceed w/o taking the lock. So closing, rg, rd, wg and wd are manipulated
+ // in a lock-free way by all operations.
Lock; // protectes the following fields
uintptr fd;
bool closing;
uintptr seq; // protects from stale timers and ready notifications
- G* rg; // G waiting for read or READY (binary semaphore)
+ G* rg; // READY, WAIT, G waiting for read or nil
Timer rt; // read deadline timer (set if rt.fv != nil)
int64 rd; // read deadline
- G* wg; // the same for writes
- Timer wt;
- int64 wd;
+ G* wg; // READY, WAIT, G waiting for write or nil
+ Timer wt; // write deadline timer
+ int64 wd; // write deadline
+ void* user; // user settable cookie
};
static struct
@@ -47,7 +72,7 @@ static struct
// seq is incremented when deadlines are changed or descriptor is reused.
} pollcache;
-static bool netpollblock(PollDesc*, int32);
+static bool netpollblock(PollDesc*, int32, bool);
static G* netpollunblock(PollDesc*, int32, bool);
static void deadline(int64, Eface);
static void readDeadline(int64, Eface);
@@ -59,6 +84,11 @@ static FuncVal deadlineFn = {(void(*)(void))deadline};
static FuncVal readDeadlineFn = {(void(*)(void))readDeadline};
static FuncVal writeDeadlineFn = {(void(*)(void))writeDeadline};
+// runtimeNano returns the current value of the runtime clock in nanoseconds.
+func runtimeNano() (ns int64) {
+ ns = runtime·nanotime();
+}
+
func runtime_pollServerInit() {
runtime·netpollinit();
}
@@ -97,7 +127,6 @@ func runtime_pollClose(pd *PollDesc) {
}
func runtime_pollReset(pd *PollDesc, mode int) (err int) {
- runtime·lock(pd);
err = checkerr(pd, mode);
if(err)
goto ret;
@@ -106,14 +135,15 @@ func runtime_pollReset(pd *PollDesc, mode int) (err int) {
else if(mode == 'w')
pd->wg = nil;
ret:
- runtime·unlock(pd);
}
func runtime_pollWait(pd *PollDesc, mode int) (err int) {
- runtime·lock(pd);
err = checkerr(pd, mode);
if(err == 0) {
- while(!netpollblock(pd, mode)) {
+ // As for now only Solaris uses level-triggered IO.
+ if(Solaris)
+ runtime·netpollarm(pd, mode);
+ while(!netpollblock(pd, mode, false)) {
err = checkerr(pd, mode);
if(err != 0)
break;
@@ -122,15 +152,13 @@ func runtime_pollWait(pd *PollDesc, mode int) (err int) {
// Pretend it has not happened and retry.
}
}
- runtime·unlock(pd);
}
func runtime_pollWaitCanceled(pd *PollDesc, mode int) {
- runtime·lock(pd);
- // wait for ioready, ignore closing or timeouts.
- while(!netpollblock(pd, mode))
+ // This function is used only on windows after a failed attempt to cancel
+ // a pending async IO operation. Wait for ioready, ignore closing or timeouts.
+ while(!netpollblock(pd, mode, true))
;
- runtime·unlock(pd);
}
func runtime_pollSetDeadline(pd *PollDesc, d int64, mode int) {
@@ -185,7 +213,7 @@ func runtime_pollSetDeadline(pd *PollDesc, d int64, mode int) {
}
// If we set the new deadline in the past, unblock currently pending IO if any.
rg = nil;
- wg = nil;
+ runtime·atomicstorep(&wg, nil); // full memory barrier between stores to rd/wd and load of rg/wg in netpollunblock
if(pd->rd < 0)
rg = netpollunblock(pd, 'r', false);
if(pd->wd < 0)
@@ -205,6 +233,7 @@ func runtime_pollUnblock(pd *PollDesc) {
runtime·throw("runtime_pollUnblock: already closing");
pd->closing = true;
pd->seq++;
+ runtime·atomicstorep(&rg, nil); // full memory barrier between store to closing and read of rg/wg in netpollunblock
rg = netpollunblock(pd, 'r', false);
wg = netpollunblock(pd, 'w', false);
if(pd->rt.fv) {
@@ -228,6 +257,30 @@ runtime·netpollfd(PollDesc *pd)
return pd->fd;
}
+void**
+runtime·netpolluser(PollDesc *pd)
+{
+ return &pd->user;
+}
+
+bool
+runtime·netpollclosing(PollDesc *pd)
+{
+ return pd->closing;
+}
+
+void
+runtime·netpolllock(PollDesc *pd)
+{
+ runtime·lock(pd);
+}
+
+void
+runtime·netpollunlock(PollDesc *pd)
+{
+ runtime·unlock(pd);
+}
+
// make pd ready, newly runnable goroutines (if any) are enqueued info gpp list
void
runtime·netpollready(G **gpp, PollDesc *pd, int32 mode)
@@ -235,12 +288,10 @@ runtime·netpollready(G **gpp, PollDesc *pd, int32 mode)
G *rg, *wg;
rg = wg = nil;
- runtime·lock(pd);
if(mode == 'r' || mode == 'r'+'w')
rg = netpollunblock(pd, 'r', true);
if(mode == 'w' || mode == 'r'+'w')
wg = netpollunblock(pd, 'w', true);
- runtime·unlock(pd);
if(rg) {
rg->schedlink = *gpp;
*gpp = rg;
@@ -261,51 +312,75 @@ checkerr(PollDesc *pd, int32 mode)
return 0;
}
+static bool
+blockcommit(G *gp, G **gpp)
+{
+ return runtime·casp(gpp, WAIT, gp);
+}
+
// returns true if IO is ready, or false if timedout or closed
+// waitio - wait only for completed IO, ignore errors
static bool
-netpollblock(PollDesc *pd, int32 mode)
+netpollblock(PollDesc *pd, int32 mode, bool waitio)
{
- G **gpp;
+ G **gpp, *old;
gpp = &pd->rg;
if(mode == 'w')
gpp = &pd->wg;
- if(*gpp == READY) {
- *gpp = nil;
- return true;
+
+ // set the gpp semaphore to WAIT
+ for(;;) {
+ old = *gpp;
+ if(old == READY) {
+ *gpp = nil;
+ return true;
+ }
+ if(old != nil)
+ runtime·throw("netpollblock: double wait");
+ if(runtime·casp(gpp, nil, WAIT))
+ break;
}
- if(*gpp != nil)
- runtime·throw("netpollblock: double wait");
- *gpp = g;
- runtime·park(runtime·unlock, &pd->Lock, "IO wait");
- runtime·lock(pd);
- if(g->param)
- return true;
- return false;
+
+ // need to recheck error states after setting gpp to WAIT
+ // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
+ // do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
+ if(waitio || checkerr(pd, mode) == 0)
+ runtime·park((bool(*)(G*, void*))blockcommit, gpp, "IO wait");
+ // be careful to not lose concurrent READY notification
+ old = runtime·xchgp(gpp, nil);
+ if(old > WAIT)
+ runtime·throw("netpollblock: corrupted state");
+ return old == READY;
}
static G*
netpollunblock(PollDesc *pd, int32 mode, bool ioready)
{
- G **gpp, *old;
+ G **gpp, *old, *new;
gpp = &pd->rg;
if(mode == 'w')
gpp = &pd->wg;
- if(*gpp == READY)
- return nil;
- if(*gpp == nil) {
- // Only set READY for ioready. runtime_pollWait
- // will check for timeout/cancel before waiting.
+
+ for(;;) {
+ old = *gpp;
+ if(old == READY)
+ return nil;
+ if(old == nil && !ioready) {
+ // Only set READY for ioready. runtime_pollWait
+ // will check for timeout/cancel before waiting.
+ return nil;
+ }
+ new = nil;
if(ioready)
- *gpp = READY;
- return nil;
+ new = READY;
+ if(runtime·casp(gpp, old, new))
+ break;
}
- old = *gpp;
- // pass unblock reason onto blocked g
- old->param = (void*)ioready;
- *gpp = nil;
- return old;
+ if(old > WAIT)
+ return old; // must be G*
+ return nil;
}
static void
@@ -331,14 +406,14 @@ deadlineimpl(int64 now, Eface arg, bool read, bool write)
if(pd->rd <= 0 || pd->rt.fv == nil)
runtime·throw("deadlineimpl: inconsistent read deadline");
pd->rd = -1;
- pd->rt.fv = nil;
+ runtime·atomicstorep(&pd->rt.fv, nil); // full memory barrier between store to rd and load of rg in netpollunblock
rg = netpollunblock(pd, 'r', false);
}
if(write) {
if(pd->wd <= 0 || (pd->wt.fv == nil && !read))
runtime·throw("deadlineimpl: inconsistent write deadline");
pd->wd = -1;
- pd->wt.fv = nil;
+ runtime·atomicstorep(&pd->wt.fv, nil); // full memory barrier between store to wd and load of wg in netpollunblock
wg = netpollunblock(pd, 'w', false);
}
runtime·unlock(pd);
@@ -374,7 +449,7 @@ allocPollDesc(void)
runtime·lock(&pollcache);
if(pollcache.first == nil) {
- n = PageSize/sizeof(*pd);
+ n = PollBlockSize/sizeof(*pd);
if(n == 0)
n = 1;
// Must be in non-GC memory because can be referenced