diff options
Diffstat (limited to 'src/pkg/runtime/chan.c')
-rw-r--r-- | src/pkg/runtime/chan.c | 260 |
1 files changed, 134 insertions, 126 deletions
diff --git a/src/pkg/runtime/chan.c b/src/pkg/runtime/chan.c index 94ea513e7..8d3ac2ca4 100644 --- a/src/pkg/runtime/chan.c +++ b/src/pkg/runtime/chan.c @@ -11,8 +11,6 @@ enum { Wclosed = 0x0001, // writer has closed Rclosed = 0x0002, // reader has seen close - Eincr = 0x0004, // increment errors - Emax = 0x0800, // error limit before throw }; typedef struct Link Link; @@ -76,6 +74,7 @@ struct Select uint16 tcase; // total count of scase[] uint16 ncase; // currently filled scase[] Select* link; // for freelist + uint16* order; Scase* scase[1]; // one per case }; @@ -84,9 +83,7 @@ static SudoG* dequeue(WaitQ*, Hchan*); static void enqueue(WaitQ*, SudoG*); static SudoG* allocsg(Hchan*); static void freesg(Hchan*, SudoG*); -static uint32 gcd(uint32, uint32); -static uint32 fastrand1(void); -static uint32 fastrand2(void); +static uint32 fastrandn(uint32); static void destroychan(Hchan*); Hchan* @@ -152,16 +149,6 @@ runtime·makechan(Type *elem, int64 hint, Hchan *ret) FLUSH(&ret); } -static void -incerr(Hchan* c) -{ - c->closed += Eincr; - if(c->closed & Emax) { - // Note that channel locks may still be held at this point. - runtime·throw("too many operations on a closed channel"); - } -} - /* * generic single channel send/recv * if the bool pointer is nil, @@ -277,14 +264,12 @@ asynch: return; closed: - incerr(c); - if(pres != nil) - *pres = true; runtime·unlock(c); + runtime·panicstring("send on closed channel"); } void -runtime·chanrecv(Hchan* c, byte *ep, bool* pres) +runtime·chanrecv(Hchan* c, byte *ep, bool *pres, bool *closed) { SudoG *sg; G *gp; @@ -299,6 +284,9 @@ runtime·chanrecv(Hchan* c, byte *ep, bool* pres) runtime·printf("chanrecv: chan=%p\n", c); runtime·lock(c); + if(closed != nil) + *closed = false; + loop: if(c->dataqsiz > 0) goto asynch; @@ -308,7 +296,8 @@ loop: sg = dequeue(&c->sendq, c); if(sg != nil) { - c->elemalg->copy(c->elemsize, ep, sg->elem); + if(ep != nil) + c->elemalg->copy(c->elemsize, ep, sg->elem); c->elemalg->copy(c->elemsize, sg->elem, nil); gp = sg->g; @@ -323,7 +312,6 @@ loop: if(pres != nil) { runtime·unlock(c); - c->elemalg->copy(c->elemsize, ep, nil); *pres = false; return; } @@ -340,7 +328,8 @@ loop: if(sg == nil) goto loop; - c->elemalg->copy(c->elemsize, ep, sg->elem); + if(ep != nil) + c->elemalg->copy(c->elemsize, ep, sg->elem); c->elemalg->copy(c->elemsize, sg->elem, nil); freesg(c, sg); runtime·unlock(c); @@ -353,7 +342,6 @@ asynch: if(pres != nil) { runtime·unlock(c); - c->elemalg->copy(c->elemsize, ep, nil); *pres = false; return; } @@ -366,7 +354,8 @@ asynch: runtime·lock(c); goto asynch; } - c->elemalg->copy(c->elemsize, ep, c->recvdataq->elem); + if(ep != nil) + c->elemalg->copy(c->elemsize, ep, c->recvdataq->elem); c->elemalg->copy(c->elemsize, c->recvdataq->elem, nil); c->recvdataq = c->recvdataq->link; c->qcount--; @@ -387,9 +376,11 @@ asynch: return; closed: - c->elemalg->copy(c->elemsize, ep, nil); + if(closed != nil) + *closed = true; + if(ep != nil) + c->elemalg->copy(c->elemsize, ep, nil); c->closed |= Rclosed; - incerr(c); if(pres != nil) *pres = true; runtime·unlock(c); @@ -411,55 +402,99 @@ runtime·chansend1(Hchan* c, ...) runtime·chansend(c, ae, nil); } -// chansend2(hchan *chan any, elem any) (pres bool); +// chanrecv1(hchan *chan any) (elem any); #pragma textflag 7 void -runtime·chansend2(Hchan* c, ...) +runtime·chanrecv1(Hchan* c, ...) { int32 o; - byte *ae, *ap; - - if(c == nil) - runtime·panicstring("send to nil channel"); + byte *ae; - o = runtime·rnd(sizeof(c), c->elemalign); + o = runtime·rnd(sizeof(c), Structrnd); ae = (byte*)&c + o; - o = runtime·rnd(o+c->elemsize, Structrnd); - ap = (byte*)&c + o; - runtime·chansend(c, ae, ap); + runtime·chanrecv(c, ae, nil, nil); } -// chanrecv1(hchan *chan any) (elem any); +// chanrecv3(hchan *chan any) (elem any, closed bool); #pragma textflag 7 void -runtime·chanrecv1(Hchan* c, ...) +runtime·chanrecv3(Hchan* c, ...) { int32 o; - byte *ae; + byte *ae, *ac; + + if(c == nil) + runtime·panicstring("range over nil channel"); o = runtime·rnd(sizeof(c), Structrnd); ae = (byte*)&c + o; + o = runtime·rnd(o+c->elemsize, 1); + ac = (byte*)&c + o; - runtime·chanrecv(c, ae, nil); + runtime·chanrecv(c, ae, nil, ac); } -// chanrecv2(hchan *chan any) (elem any, pres bool); +// func selectnbsend(c chan any, elem any) bool +// +// compiler implements +// +// select { +// case c <- v: +// ... foo +// default: +// ... bar +// } +// +// as +// +// if c != nil && selectnbsend(c, v) { +// ... foo +// } else { +// ... bar +// } +// #pragma textflag 7 void -runtime·chanrecv2(Hchan* c, ...) +runtime·selectnbsend(Hchan *c, ...) { int32 o; byte *ae, *ap; - o = runtime·rnd(sizeof(c), Structrnd); + o = runtime·rnd(sizeof(c), c->elemalign); ae = (byte*)&c + o; - o = runtime·rnd(o+c->elemsize, 1); + o = runtime·rnd(o+c->elemsize, Structrnd); ap = (byte*)&c + o; - runtime·chanrecv(c, ae, ap); + runtime·chansend(c, ae, ap); } +// func selectnbrecv(elem *any, c chan any) bool +// +// compiler implements +// +// select { +// case v = <-c: +// ... foo +// default: +// ... bar +// } +// +// as +// +// if c != nil && selectnbrecv(&v, c) { +// ... foo +// } else { +// ... bar +// } +// +#pragma textflag 7 +void +runtime·selectnbrecv(byte *v, Hchan *c, bool ok) +{ + runtime·chanrecv(c, v, &ok, nil); +} + // newselect(size uint32) (sel *byte); #pragma textflag 7 void @@ -475,10 +510,11 @@ runtime·newselect(int32 size, ...) if(size > 1) n = size-1; - sel = runtime·mal(sizeof(*sel) + n*sizeof(sel->scase[0])); + sel = runtime·mal(sizeof(*sel) + n*sizeof(sel->scase[0]) + size*sizeof(sel->order[0])); sel->tcase = size; sel->ncase = 0; + sel->order = (void*)(sel->scase + size); *selp = sel; if(debug) runtime·printf("newselect s=%p size=%d\n", sel, size); @@ -619,6 +655,13 @@ selunlock(Select *sel) } } +void +runtime·block(void) +{ + g->status = Gwaiting; // forever + runtime·gosched(); +} + // selectgo(sel *byte); // // overwrites return pc on stack to signal which case of the select @@ -629,7 +672,7 @@ selunlock(Select *sel) void runtime·selectgo(Select *sel) { - uint32 p, o, i, j; + uint32 o, i, j; Scase *cas, *dfl; Hchan *c; SudoG *sg; @@ -642,29 +685,24 @@ runtime·selectgo(Select *sel) if(debug) runtime·printf("select: sel=%p\n", sel); - if(sel->ncase < 2) { - if(sel->ncase < 1) { - g->status = Gwaiting; // forever - runtime·gosched(); - } - // TODO: make special case of one. - } + // The compiler rewrites selects that statically have + // only 0 or 1 cases plus default into simpler constructs. + // The only way we can end up with such small sel->ncase + // values here is for a larger select in which most channels + // have been nilled out. The general code handles those + // cases correctly, and they are rare enough not to bother + // optimizing (and needing to test). - // select a (relative) prime - for(i=0;; i++) { - p = fastrand1(); - if(gcd(p, sel->ncase) == 1) - break; - if(i > 1000) - runtime·throw("select: failed to select prime"); + // generate permuted order + for(i=0; i<sel->ncase; i++) + sel->order[i] = i; + for(i=1; i<sel->ncase; i++) { + o = sel->order[i]; + j = fastrandn(i+1); + sel->order[i] = sel->order[j]; + sel->order[j] = o; } - // select an initial offset - o = fastrand2(); - - p %= sel->ncase; - o %= sel->ncase; - // sort the cases by Hchan address to get the locking order. for(i=1; i<sel->ncase; i++) { cas = sel->scase[i]; @@ -672,13 +710,13 @@ runtime·selectgo(Select *sel) sel->scase[j] = sel->scase[j-1]; sel->scase[j] = cas; } - sellock(sel); loop: // pass 1 - look for something already waiting dfl = nil; for(i=0; i<sel->ncase; i++) { + o = sel->order[i]; cas = sel->scase[o]; c = cas->chan; @@ -713,10 +751,6 @@ loop: dfl = cas; break; } - - o += p; - if(o >= sel->ncase) - o -= sel->ncase; } if(dfl != nil) { @@ -727,6 +761,7 @@ loop: // pass 2 - enqueue on all chans for(i=0; i<sel->ncase; i++) { + o = sel->order[i]; cas = sel->scase[o]; c = cas->chan; sg = allocsg(c); @@ -734,32 +769,15 @@ loop: switch(cas->send) { case 0: // recv - if(c->dataqsiz > 0) { - if(c->qcount > 0) - runtime·throw("select: pass 2 async recv"); - } else { - if(dequeue(&c->sendq, c)) - runtime·throw("select: pass 2 sync recv"); - } enqueue(&c->recvq, sg); break; case 1: // send - if(c->dataqsiz > 0) { - if(c->qcount < c->dataqsiz) - runtime·throw("select: pass 2 async send"); - } else { - if(dequeue(&c->recvq, c)) - runtime·throw("select: pass 2 sync send"); + if(c->dataqsiz == 0) c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem); - } enqueue(&c->sendq, sg); break; } - - o += p; - if(o >= sel->ncase) - o -= sel->ncase; } g->param = nil; @@ -773,18 +791,14 @@ loop: // pass 3 - dequeue from unsuccessful chans // otherwise they stack up on quiet channels for(i=0; i<sel->ncase; i++) { - if(sg == nil || o != sg->offset) { - cas = sel->scase[o]; + if(sg == nil || i != sg->offset) { + cas = sel->scase[i]; c = cas->chan; if(cas->send) dequeueg(&c->sendq, c); else dequeueg(&c->recvq, c); } - - o += p; - if(o >= sel->ncase) - o -= sel->ncase; } if(sg == nil) @@ -858,7 +872,6 @@ rclose: if(cas->u.elemp != nil) c->elemalg->copy(c->elemsize, cas->u.elemp, nil); c->closed |= Rclosed; - incerr(c); goto retc; syncsend: @@ -871,12 +884,6 @@ syncsend: gp = sg->g; gp->param = sg; runtime·ready(gp); - goto retc; - -sclose: - // send on closed channel - incerr(c); - goto retc; retc: selunlock(sel); @@ -886,6 +893,12 @@ retc: as = (byte*)&sel + cas->so; freesel(sel); *as = true; + return; + +sclose: + // send on closed channel + selunlock(sel); + runtime·panicstring("send on closed channel"); } // closechan(sel *byte); @@ -899,7 +912,11 @@ runtime·closechan(Hchan *c) runtime·gosched(); runtime·lock(c); - incerr(c); + if(c->closed & Wclosed) { + runtime·unlock(c); + runtime·panicstring("close of closed channel"); + } + c->closed |= Wclosed; // release all readers @@ -1039,22 +1056,6 @@ freesg(Hchan *c, SudoG *sg) } static uint32 -gcd(uint32 u, uint32 v) -{ - for(;;) { - if(u > v) { - if(v == 0) - return u; - u = u%v; - continue; - } - if(u == 0) - return v; - v = v%u; - } -} - -static uint32 fastrand1(void) { static uint32 x = 0x49f6428aUL; @@ -1066,12 +1067,19 @@ fastrand1(void) } static uint32 -fastrand2(void) +fastrandn(uint32 n) { - static uint32 x = 0x49f6428aUL; + uint32 max, r; - x += x; - if(x & 0x80000000L) - x ^= 0xfafd871bUL; - return x; + if(n <= 1) + return 0; + + r = fastrand1(); + if(r < (1ULL<<31)-n) // avoid computing max in common case + return r%n; + + max = (1ULL<<31)/n * n; + while(r >= max) + r = fastrand1(); + return r%n; } |