diff options
Diffstat (limited to 'src/pkg/runtime/chan.goc')
-rw-r--r-- | src/pkg/runtime/chan.goc | 1155 |
1 files changed, 1155 insertions, 0 deletions
diff --git a/src/pkg/runtime/chan.goc b/src/pkg/runtime/chan.goc new file mode 100644 index 000000000..7a584717b --- /dev/null +++ b/src/pkg/runtime/chan.goc @@ -0,0 +1,1155 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package runtime +#include "runtime.h" +#include "arch_GOARCH.h" +#include "type.h" +#include "race.h" +#include "malloc.h" +#include "chan.h" +#include "../../cmd/ld/textflag.h" + +uint32 runtime·Hchansize = sizeof(Hchan); + +static void dequeueg(WaitQ*); +static SudoG* dequeue(WaitQ*); +static void enqueue(WaitQ*, SudoG*); +static void destroychan(Hchan*); +static void racesync(Hchan*, SudoG*); + +static Hchan* +makechan(ChanType *t, int64 hint) +{ + Hchan *c; + Type *elem; + + elem = t->elem; + + // compiler checks this but be safe. + if(elem->size >= (1<<16)) + runtime·throw("makechan: invalid channel element type"); + if((sizeof(*c)%MAXALIGN) != 0 || elem->align > MAXALIGN) + runtime·throw("makechan: bad alignment"); + + if(hint < 0 || (intgo)hint != hint || (elem->size > 0 && hint > (MaxMem - sizeof(*c)) / elem->size)) + runtime·panicstring("makechan: size out of range"); + + // allocate memory in one call + c = (Hchan*)runtime·mallocgc(sizeof(*c) + hint*elem->size, (uintptr)t | TypeInfo_Chan, 0); + c->elemsize = elem->size; + c->elemtype = elem; + c->dataqsiz = hint; + + if(debug) + runtime·printf("makechan: chan=%p; elemsize=%D; elemalg=%p; dataqsiz=%D\n", + c, (int64)elem->size, elem->alg, (int64)c->dataqsiz); + + return c; +} + +func reflect·makechan(t *ChanType, size uint64) (c *Hchan) { + c = makechan(t, size); +} + +func makechan(t *ChanType, size int64) (c *Hchan) { + c = makechan(t, size); +} + +/* + * generic single channel send/recv + * if the bool pointer is nil, + * then the full exchange will + * occur. if pres is not nil, + * then the protocol will not + * sleep but return if it could + * not complete. + * + * sleep can wake up with g->param == nil + * when a channel involved in the sleep has + * been closed. it is easiest to loop and re-run + * the operation; we'll see that it's now closed. + */ +static bool +chansend(ChanType *t, Hchan *c, byte *ep, bool block, void *pc) +{ + SudoG *sg; + SudoG mysg; + G* gp; + int64 t0; + + if(raceenabled) + runtime·racereadobjectpc(ep, t->elem, runtime·getcallerpc(&t), chansend); + + if(c == nil) { + USED(t); + if(!block) + return false; + runtime·park(nil, nil, "chan send (nil chan)"); + return false; // not reached + } + + if(debug) { + runtime·printf("chansend: chan=%p; elem=", c); + c->elemtype->alg->print(c->elemsize, ep); + runtime·prints("\n"); + } + + t0 = 0; + mysg.releasetime = 0; + if(runtime·blockprofilerate > 0) { + t0 = runtime·cputicks(); + mysg.releasetime = -1; + } + + runtime·lock(c); + if(raceenabled) + runtime·racereadpc(c, pc, chansend); + if(c->closed) + goto closed; + + if(c->dataqsiz > 0) + goto asynch; + + sg = dequeue(&c->recvq); + if(sg != nil) { + if(raceenabled) + racesync(c, sg); + runtime·unlock(c); + + gp = sg->g; + gp->param = sg; + if(sg->elem != nil) + c->elemtype->alg->copy(c->elemsize, sg->elem, ep); + if(sg->releasetime) + sg->releasetime = runtime·cputicks(); + runtime·ready(gp); + return true; + } + + if(!block) { + runtime·unlock(c); + return false; + } + + mysg.elem = ep; + mysg.g = g; + mysg.selectdone = nil; + g->param = nil; + enqueue(&c->sendq, &mysg); + runtime·parkunlock(c, "chan send"); + + if(g->param == nil) { + runtime·lock(c); + if(!c->closed) + runtime·throw("chansend: spurious wakeup"); + goto closed; + } + + if(mysg.releasetime > 0) + runtime·blockevent(mysg.releasetime - t0, 2); + + return true; + +asynch: + if(c->closed) + goto closed; + + if(c->qcount >= c->dataqsiz) { + if(!block) { + runtime·unlock(c); + return false; + } + mysg.g = g; + mysg.elem = nil; + mysg.selectdone = nil; + enqueue(&c->sendq, &mysg); + runtime·parkunlock(c, "chan send"); + + runtime·lock(c); + goto asynch; + } + + if(raceenabled) { + runtime·raceacquire(chanbuf(c, c->sendx)); + runtime·racerelease(chanbuf(c, c->sendx)); + } + + c->elemtype->alg->copy(c->elemsize, chanbuf(c, c->sendx), ep); + if(++c->sendx == c->dataqsiz) + c->sendx = 0; + c->qcount++; + + sg = dequeue(&c->recvq); + if(sg != nil) { + gp = sg->g; + runtime·unlock(c); + if(sg->releasetime) + sg->releasetime = runtime·cputicks(); + runtime·ready(gp); + } else + runtime·unlock(c); + if(mysg.releasetime > 0) + runtime·blockevent(mysg.releasetime - t0, 2); + return true; + +closed: + runtime·unlock(c); + runtime·panicstring("send on closed channel"); + return false; // not reached +} + + +static bool +chanrecv(ChanType *t, Hchan* c, byte *ep, bool block, bool *received) +{ + SudoG *sg; + SudoG mysg; + G *gp; + int64 t0; + + // raceenabled: don't need to check ep, as it is always on the stack. + + if(debug) + runtime·printf("chanrecv: chan=%p\n", c); + + if(c == nil) { + USED(t); + if(!block) + return false; + runtime·park(nil, nil, "chan receive (nil chan)"); + return false; // not reached + } + + t0 = 0; + mysg.releasetime = 0; + if(runtime·blockprofilerate > 0) { + t0 = runtime·cputicks(); + mysg.releasetime = -1; + } + + runtime·lock(c); + if(c->dataqsiz > 0) + goto asynch; + + if(c->closed) + goto closed; + + sg = dequeue(&c->sendq); + if(sg != nil) { + if(raceenabled) + racesync(c, sg); + runtime·unlock(c); + + if(ep != nil) + c->elemtype->alg->copy(c->elemsize, ep, sg->elem); + gp = sg->g; + gp->param = sg; + if(sg->releasetime) + sg->releasetime = runtime·cputicks(); + runtime·ready(gp); + + if(received != nil) + *received = true; + return true; + } + + if(!block) { + runtime·unlock(c); + return false; + } + + mysg.elem = ep; + mysg.g = g; + mysg.selectdone = nil; + g->param = nil; + enqueue(&c->recvq, &mysg); + runtime·parkunlock(c, "chan receive"); + + if(g->param == nil) { + runtime·lock(c); + if(!c->closed) + runtime·throw("chanrecv: spurious wakeup"); + goto closed; + } + + if(received != nil) + *received = true; + if(mysg.releasetime > 0) + runtime·blockevent(mysg.releasetime - t0, 2); + return true; + +asynch: + if(c->qcount <= 0) { + if(c->closed) + goto closed; + + if(!block) { + runtime·unlock(c); + if(received != nil) + *received = false; + return false; + } + mysg.g = g; + mysg.elem = nil; + mysg.selectdone = nil; + enqueue(&c->recvq, &mysg); + runtime·parkunlock(c, "chan receive"); + + runtime·lock(c); + goto asynch; + } + + if(raceenabled) { + runtime·raceacquire(chanbuf(c, c->recvx)); + runtime·racerelease(chanbuf(c, c->recvx)); + } + + if(ep != nil) + c->elemtype->alg->copy(c->elemsize, ep, chanbuf(c, c->recvx)); + c->elemtype->alg->copy(c->elemsize, chanbuf(c, c->recvx), nil); + if(++c->recvx == c->dataqsiz) + c->recvx = 0; + c->qcount--; + + sg = dequeue(&c->sendq); + if(sg != nil) { + gp = sg->g; + runtime·unlock(c); + if(sg->releasetime) + sg->releasetime = runtime·cputicks(); + runtime·ready(gp); + } else + runtime·unlock(c); + + if(received != nil) + *received = true; + if(mysg.releasetime > 0) + runtime·blockevent(mysg.releasetime - t0, 2); + return true; + +closed: + if(ep != nil) + c->elemtype->alg->copy(c->elemsize, ep, nil); + if(received != nil) + *received = false; + if(raceenabled) + runtime·raceacquire(c); + runtime·unlock(c); + if(mysg.releasetime > 0) + runtime·blockevent(mysg.releasetime - t0, 2); + return true; +} + +#pragma textflag NOSPLIT +func chansend1(t *ChanType, c *Hchan, elem *byte) { + chansend(t, c, elem, true, runtime·getcallerpc(&t)); +} + +#pragma textflag NOSPLIT +func chanrecv1(t *ChanType, c *Hchan, elem *byte) { + chanrecv(t, c, elem, true, nil); +} + +// chanrecv2(hchan *chan any, elem *any) (received bool); +#pragma textflag NOSPLIT +func chanrecv2(t *ChanType, c *Hchan, elem *byte) (received bool) { + chanrecv(t, c, elem, true, &received); +} + +// compiler implements +// +// select { +// case c <- v: +// ... foo +// default: +// ... bar +// } +// +// as +// +// if selectnbsend(c, v) { +// ... foo +// } else { +// ... bar +// } +// +#pragma textflag NOSPLIT +func selectnbsend(t *ChanType, c *Hchan, elem *byte) (selected bool) { + selected = chansend(t, c, elem, false, runtime·getcallerpc(&t)); +} + +// compiler implements +// +// select { +// case v = <-c: +// ... foo +// default: +// ... bar +// } +// +// as +// +// if selectnbrecv(&v, c) { +// ... foo +// } else { +// ... bar +// } +// +#pragma textflag NOSPLIT +func selectnbrecv(t *ChanType, elem *byte, c *Hchan) (selected bool) { + selected = chanrecv(t, c, elem, false, nil); +} + +// compiler implements +// +// select { +// case v, ok = <-c: +// ... foo +// default: +// ... bar +// } +// +// as +// +// if c != nil && selectnbrecv2(&v, &ok, c) { +// ... foo +// } else { +// ... bar +// } +// +#pragma textflag NOSPLIT +func selectnbrecv2(t *ChanType, elem *byte, received *bool, c *Hchan) (selected bool) { + selected = chanrecv(t, c, elem, false, received); +} + +#pragma textflag NOSPLIT +func reflect·chansend(t *ChanType, c *Hchan, elem *byte, nb bool) (selected bool) { + selected = chansend(t, c, elem, !nb, runtime·getcallerpc(&t)); +} + +func reflect·chanrecv(t *ChanType, c *Hchan, nb bool, elem *byte) (selected bool, received bool) { + received = false; + selected = chanrecv(t, c, elem, !nb, &received); +} + +static Select* newselect(int32); + +#pragma textflag NOSPLIT +func newselect(size int32) (sel *byte) { + sel = (byte*)newselect(size); +} + +static Select* +newselect(int32 size) +{ + int32 n; + Select *sel; + + n = 0; + if(size > 1) + n = size-1; + + // allocate all the memory we need in a single allocation + // start with Select with size cases + // then lockorder with size entries + // then pollorder with size entries + sel = runtime·mal(sizeof(*sel) + + n*sizeof(sel->scase[0]) + + size*sizeof(sel->lockorder[0]) + + size*sizeof(sel->pollorder[0])); + + sel->tcase = size; + sel->ncase = 0; + sel->lockorder = (void*)(sel->scase + size); + sel->pollorder = (void*)(sel->lockorder + size); + + if(debug) + runtime·printf("newselect s=%p size=%d\n", sel, size); + return sel; +} + +// cut in half to give stack a chance to split +static void selectsend(Select *sel, Hchan *c, void *pc, void *elem, int32 so); + +#pragma textflag NOSPLIT +func selectsend(sel *Select, c *Hchan, elem *byte) (selected bool) { + selected = false; + + // nil cases do not compete + if(c != nil) + selectsend(sel, c, runtime·getcallerpc(&sel), elem, (byte*)&selected - (byte*)&sel); +} + +static void +selectsend(Select *sel, Hchan *c, void *pc, void *elem, int32 so) +{ + int32 i; + Scase *cas; + + i = sel->ncase; + if(i >= sel->tcase) + runtime·throw("selectsend: too many cases"); + sel->ncase = i+1; + cas = &sel->scase[i]; + + cas->pc = pc; + cas->chan = c; + cas->so = so; + cas->kind = CaseSend; + cas->sg.elem = elem; + + if(debug) + runtime·printf("selectsend s=%p pc=%p chan=%p so=%d\n", + sel, cas->pc, cas->chan, cas->so); +} + +// cut in half to give stack a chance to split +static void selectrecv(Select *sel, Hchan *c, void *pc, void *elem, bool*, int32 so); + +#pragma textflag NOSPLIT +func selectrecv(sel *Select, c *Hchan, elem *byte) (selected bool) { + selected = false; + + // nil cases do not compete + if(c != nil) + selectrecv(sel, c, runtime·getcallerpc(&sel), elem, nil, (byte*)&selected - (byte*)&sel); +} + +#pragma textflag NOSPLIT +func selectrecv2(sel *Select, c *Hchan, elem *byte, received *bool) (selected bool) { + selected = false; + + // nil cases do not compete + if(c != nil) + selectrecv(sel, c, runtime·getcallerpc(&sel), elem, received, (byte*)&selected - (byte*)&sel); +} + +static void +selectrecv(Select *sel, Hchan *c, void *pc, void *elem, bool *received, int32 so) +{ + int32 i; + Scase *cas; + + i = sel->ncase; + if(i >= sel->tcase) + runtime·throw("selectrecv: too many cases"); + sel->ncase = i+1; + cas = &sel->scase[i]; + cas->pc = pc; + cas->chan = c; + + cas->so = so; + cas->kind = CaseRecv; + cas->sg.elem = elem; + cas->receivedp = received; + + if(debug) + runtime·printf("selectrecv s=%p pc=%p chan=%p so=%d\n", + sel, cas->pc, cas->chan, cas->so); +} + +// cut in half to give stack a chance to split +static void selectdefault(Select*, void*, int32); + +#pragma textflag NOSPLIT +func selectdefault(sel *Select) (selected bool) { + selected = false; + selectdefault(sel, runtime·getcallerpc(&sel), (byte*)&selected - (byte*)&sel); +} + +static void +selectdefault(Select *sel, void *callerpc, int32 so) +{ + int32 i; + Scase *cas; + + i = sel->ncase; + if(i >= sel->tcase) + runtime·throw("selectdefault: too many cases"); + sel->ncase = i+1; + cas = &sel->scase[i]; + cas->pc = callerpc; + cas->chan = nil; + + cas->so = so; + cas->kind = CaseDefault; + + if(debug) + runtime·printf("selectdefault s=%p pc=%p so=%d\n", + sel, cas->pc, cas->so); +} + +static void +sellock(Select *sel) +{ + uint32 i; + Hchan *c, *c0; + + c = nil; + for(i=0; i<sel->ncase; i++) { + c0 = sel->lockorder[i]; + if(c0 && c0 != c) { + c = sel->lockorder[i]; + runtime·lock(c); + } + } +} + +static void +selunlock(Select *sel) +{ + int32 i, n, r; + Hchan *c; + + // We must be very careful here to not touch sel after we have unlocked + // the last lock, because sel can be freed right after the last unlock. + // Consider the following situation. + // First M calls runtime·park() in runtime·selectgo() passing the sel. + // Once runtime·park() has unlocked the last lock, another M makes + // the G that calls select runnable again and schedules it for execution. + // When the G runs on another M, it locks all the locks and frees sel. + // Now if the first M touches sel, it will access freed memory. + n = (int32)sel->ncase; + r = 0; + // skip the default case + if(n>0 && sel->lockorder[0] == nil) + r = 1; + for(i = n-1; i >= r; i--) { + c = sel->lockorder[i]; + if(i>0 && sel->lockorder[i-1] == c) + continue; // will unlock it on the next iteration + runtime·unlock(c); + } +} + +static bool +selparkcommit(G *gp, void *sel) +{ + USED(gp); + selunlock(sel); + return true; +} + +func block() { + runtime·park(nil, nil, "select (no cases)"); // forever +} + +static void* selectgo(Select**); + +// selectgo(sel *byte); +// +// overwrites return pc on stack to signal which case of the select +// to run, so cannot appear at the top of a split stack. +#pragma textflag NOSPLIT +func selectgo(sel *Select) { + runtime·setcallerpc(&sel, selectgo(&sel)); +} + +static void* +selectgo(Select **selp) +{ + Select *sel; + uint32 o, i, j, k, done; + int64 t0; + Scase *cas, *dfl; + Hchan *c; + SudoG *sg; + G *gp; + byte *as; + void *pc; + + sel = *selp; + + if(debug) + runtime·printf("select: sel=%p\n", sel); + + t0 = 0; + if(runtime·blockprofilerate > 0) { + t0 = runtime·cputicks(); + for(i=0; i<sel->ncase; i++) + sel->scase[i].sg.releasetime = -1; + } + + // The compiler rewrites selects that statically have + // only 0 or 1 cases plus default into simpler constructs. + // The only way we can end up with such small sel->ncase + // values here is for a larger select in which most channels + // have been nilled out. The general code handles those + // cases correctly, and they are rare enough not to bother + // optimizing (and needing to test). + + // generate permuted order + for(i=0; i<sel->ncase; i++) + sel->pollorder[i] = i; + for(i=1; i<sel->ncase; i++) { + o = sel->pollorder[i]; + j = runtime·fastrand1()%(i+1); + sel->pollorder[i] = sel->pollorder[j]; + sel->pollorder[j] = o; + } + + // sort the cases by Hchan address to get the locking order. + // simple heap sort, to guarantee n log n time and constant stack footprint. + for(i=0; i<sel->ncase; i++) { + j = i; + c = sel->scase[j].chan; + while(j > 0 && sel->lockorder[k=(j-1)/2] < c) { + sel->lockorder[j] = sel->lockorder[k]; + j = k; + } + sel->lockorder[j] = c; + } + for(i=sel->ncase; i-->0; ) { + c = sel->lockorder[i]; + sel->lockorder[i] = sel->lockorder[0]; + j = 0; + for(;;) { + k = j*2+1; + if(k >= i) + break; + if(k+1 < i && sel->lockorder[k] < sel->lockorder[k+1]) + k++; + if(c < sel->lockorder[k]) { + sel->lockorder[j] = sel->lockorder[k]; + j = k; + continue; + } + break; + } + sel->lockorder[j] = c; + } + /* + for(i=0; i+1<sel->ncase; i++) + if(sel->lockorder[i] > sel->lockorder[i+1]) { + runtime·printf("i=%d %p %p\n", i, sel->lockorder[i], sel->lockorder[i+1]); + runtime·throw("select: broken sort"); + } + */ + sellock(sel); + +loop: + // pass 1 - look for something already waiting + dfl = nil; + for(i=0; i<sel->ncase; i++) { + o = sel->pollorder[i]; + cas = &sel->scase[o]; + c = cas->chan; + + switch(cas->kind) { + case CaseRecv: + if(c->dataqsiz > 0) { + if(c->qcount > 0) + goto asyncrecv; + } else { + sg = dequeue(&c->sendq); + if(sg != nil) + goto syncrecv; + } + if(c->closed) + goto rclose; + break; + + case CaseSend: + if(raceenabled) + runtime·racereadpc(c, cas->pc, chansend); + if(c->closed) + goto sclose; + if(c->dataqsiz > 0) { + if(c->qcount < c->dataqsiz) + goto asyncsend; + } else { + sg = dequeue(&c->recvq); + if(sg != nil) + goto syncsend; + } + break; + + case CaseDefault: + dfl = cas; + break; + } + } + + if(dfl != nil) { + selunlock(sel); + cas = dfl; + goto retc; + } + + + // pass 2 - enqueue on all chans + done = 0; + for(i=0; i<sel->ncase; i++) { + o = sel->pollorder[i]; + cas = &sel->scase[o]; + c = cas->chan; + sg = &cas->sg; + sg->g = g; + sg->selectdone = &done; + + switch(cas->kind) { + case CaseRecv: + enqueue(&c->recvq, sg); + break; + + case CaseSend: + enqueue(&c->sendq, sg); + break; + } + } + + g->param = nil; + runtime·park(selparkcommit, sel, "select"); + + sellock(sel); + sg = g->param; + + // pass 3 - dequeue from unsuccessful chans + // otherwise they stack up on quiet channels + for(i=0; i<sel->ncase; i++) { + cas = &sel->scase[i]; + if(cas != (Scase*)sg) { + c = cas->chan; + if(cas->kind == CaseSend) + dequeueg(&c->sendq); + else + dequeueg(&c->recvq); + } + } + + if(sg == nil) + goto loop; + + cas = (Scase*)sg; + c = cas->chan; + + if(c->dataqsiz > 0) + runtime·throw("selectgo: shouldn't happen"); + + if(debug) + runtime·printf("wait-return: sel=%p c=%p cas=%p kind=%d\n", + sel, c, cas, cas->kind); + + if(cas->kind == CaseRecv) { + if(cas->receivedp != nil) + *cas->receivedp = true; + } + + if(raceenabled) { + if(cas->kind == CaseRecv && cas->sg.elem != nil) + runtime·racewriteobjectpc(cas->sg.elem, c->elemtype, cas->pc, chanrecv); + else if(cas->kind == CaseSend) + runtime·racereadobjectpc(cas->sg.elem, c->elemtype, cas->pc, chansend); + } + + selunlock(sel); + goto retc; + +asyncrecv: + // can receive from buffer + if(raceenabled) { + if(cas->sg.elem != nil) + runtime·racewriteobjectpc(cas->sg.elem, c->elemtype, cas->pc, chanrecv); + runtime·raceacquire(chanbuf(c, c->recvx)); + runtime·racerelease(chanbuf(c, c->recvx)); + } + if(cas->receivedp != nil) + *cas->receivedp = true; + if(cas->sg.elem != nil) + c->elemtype->alg->copy(c->elemsize, cas->sg.elem, chanbuf(c, c->recvx)); + c->elemtype->alg->copy(c->elemsize, chanbuf(c, c->recvx), nil); + if(++c->recvx == c->dataqsiz) + c->recvx = 0; + c->qcount--; + sg = dequeue(&c->sendq); + if(sg != nil) { + gp = sg->g; + selunlock(sel); + if(sg->releasetime) + sg->releasetime = runtime·cputicks(); + runtime·ready(gp); + } else { + selunlock(sel); + } + goto retc; + +asyncsend: + // can send to buffer + if(raceenabled) { + runtime·raceacquire(chanbuf(c, c->sendx)); + runtime·racerelease(chanbuf(c, c->sendx)); + runtime·racereadobjectpc(cas->sg.elem, c->elemtype, cas->pc, chansend); + } + c->elemtype->alg->copy(c->elemsize, chanbuf(c, c->sendx), cas->sg.elem); + if(++c->sendx == c->dataqsiz) + c->sendx = 0; + c->qcount++; + sg = dequeue(&c->recvq); + if(sg != nil) { + gp = sg->g; + selunlock(sel); + if(sg->releasetime) + sg->releasetime = runtime·cputicks(); + runtime·ready(gp); + } else { + selunlock(sel); + } + goto retc; + +syncrecv: + // can receive from sleeping sender (sg) + if(raceenabled) { + if(cas->sg.elem != nil) + runtime·racewriteobjectpc(cas->sg.elem, c->elemtype, cas->pc, chanrecv); + racesync(c, sg); + } + selunlock(sel); + if(debug) + runtime·printf("syncrecv: sel=%p c=%p o=%d\n", sel, c, o); + if(cas->receivedp != nil) + *cas->receivedp = true; + if(cas->sg.elem != nil) + c->elemtype->alg->copy(c->elemsize, cas->sg.elem, sg->elem); + gp = sg->g; + gp->param = sg; + if(sg->releasetime) + sg->releasetime = runtime·cputicks(); + runtime·ready(gp); + goto retc; + +rclose: + // read at end of closed channel + selunlock(sel); + if(cas->receivedp != nil) + *cas->receivedp = false; + if(cas->sg.elem != nil) + c->elemtype->alg->copy(c->elemsize, cas->sg.elem, nil); + if(raceenabled) + runtime·raceacquire(c); + goto retc; + +syncsend: + // can send to sleeping receiver (sg) + if(raceenabled) { + runtime·racereadobjectpc(cas->sg.elem, c->elemtype, cas->pc, chansend); + racesync(c, sg); + } + selunlock(sel); + if(debug) + runtime·printf("syncsend: sel=%p c=%p o=%d\n", sel, c, o); + if(sg->elem != nil) + c->elemtype->alg->copy(c->elemsize, sg->elem, cas->sg.elem); + gp = sg->g; + gp->param = sg; + if(sg->releasetime) + sg->releasetime = runtime·cputicks(); + runtime·ready(gp); + +retc: + // return pc corresponding to chosen case. + // Set boolean passed during select creation + // (at offset selp + cas->so) to true. + // If cas->so == 0, this is a reflect-driven select and we + // don't need to update the boolean. + pc = cas->pc; + if(cas->so > 0) { + as = (byte*)selp + cas->so; + *as = true; + } + if(cas->sg.releasetime > 0) + runtime·blockevent(cas->sg.releasetime - t0, 2); + runtime·free(sel); + return pc; + +sclose: + // send on closed channel + selunlock(sel); + runtime·panicstring("send on closed channel"); + return nil; // not reached +} + +// This struct must match ../reflect/value.go:/runtimeSelect. +typedef struct runtimeSelect runtimeSelect; +struct runtimeSelect +{ + uintptr dir; + ChanType *typ; + Hchan *ch; + byte *val; +}; + +// This enum must match ../reflect/value.go:/SelectDir. +enum SelectDir { + SelectSend = 1, + SelectRecv, + SelectDefault, +}; + +func reflect·rselect(cases Slice) (chosen int, recvOK bool) { + int32 i; + Select *sel; + runtimeSelect* rcase, *rc; + + chosen = -1; + recvOK = false; + + rcase = (runtimeSelect*)cases.array; + + sel = newselect(cases.len); + for(i=0; i<cases.len; i++) { + rc = &rcase[i]; + switch(rc->dir) { + case SelectDefault: + selectdefault(sel, (void*)i, 0); + break; + case SelectSend: + if(rc->ch == nil) + break; + selectsend(sel, rc->ch, (void*)i, rc->val, 0); + break; + case SelectRecv: + if(rc->ch == nil) + break; + selectrecv(sel, rc->ch, (void*)i, rc->val, &recvOK, 0); + break; + } + } + + chosen = (intgo)(uintptr)selectgo(&sel); +} + +static void closechan(Hchan *c, void *pc); + +#pragma textflag NOSPLIT +func closechan(c *Hchan) { + closechan(c, runtime·getcallerpc(&c)); +} + +#pragma textflag NOSPLIT +func reflect·chanclose(c *Hchan) { + closechan(c, runtime·getcallerpc(&c)); +} + +static void +closechan(Hchan *c, void *pc) +{ + SudoG *sg; + G* gp; + + if(c == nil) + runtime·panicstring("close of nil channel"); + + runtime·lock(c); + if(c->closed) { + runtime·unlock(c); + runtime·panicstring("close of closed channel"); + } + + if(raceenabled) { + runtime·racewritepc(c, pc, runtime·closechan); + runtime·racerelease(c); + } + + c->closed = true; + + // release all readers + for(;;) { + sg = dequeue(&c->recvq); + if(sg == nil) + break; + gp = sg->g; + gp->param = nil; + if(sg->releasetime) + sg->releasetime = runtime·cputicks(); + runtime·ready(gp); + } + + // release all writers + for(;;) { + sg = dequeue(&c->sendq); + if(sg == nil) + break; + gp = sg->g; + gp->param = nil; + if(sg->releasetime) + sg->releasetime = runtime·cputicks(); + runtime·ready(gp); + } + + runtime·unlock(c); +} + +func reflect·chanlen(c *Hchan) (len int) { + if(c == nil) + len = 0; + else + len = c->qcount; +} + +func reflect·chancap(c *Hchan) (cap int) { + if(c == nil) + cap = 0; + else + cap = c->dataqsiz; +} + +static SudoG* +dequeue(WaitQ *q) +{ + SudoG *sgp; + +loop: + sgp = q->first; + if(sgp == nil) + return nil; + q->first = sgp->link; + + // if sgp participates in a select and is already signaled, ignore it + if(sgp->selectdone != nil) { + // claim the right to signal + if(*sgp->selectdone != 0 || !runtime·cas(sgp->selectdone, 0, 1)) + goto loop; + } + + return sgp; +} + +static void +dequeueg(WaitQ *q) +{ + SudoG **l, *sgp, *prevsgp; + + prevsgp = nil; + for(l=&q->first; (sgp=*l) != nil; l=&sgp->link, prevsgp=sgp) { + if(sgp->g == g) { + *l = sgp->link; + if(q->last == sgp) + q->last = prevsgp; + break; + } + } +} + +static void +enqueue(WaitQ *q, SudoG *sgp) +{ + sgp->link = nil; + if(q->first == nil) { + q->first = sgp; + q->last = sgp; + return; + } + q->last->link = sgp; + q->last = sgp; +} + +static void +racesync(Hchan *c, SudoG *sg) +{ + runtime·racerelease(chanbuf(c, 0)); + runtime·raceacquireg(sg->g, chanbuf(c, 0)); + runtime·racereleaseg(sg->g, chanbuf(c, 0)); + runtime·raceacquire(chanbuf(c, 0)); +} |