diff options
Diffstat (limited to 'src/pkg/runtime/chan.c')
-rw-r--r-- | src/pkg/runtime/chan.c | 316 |
1 files changed, 252 insertions, 64 deletions
diff --git a/src/pkg/runtime/chan.c b/src/pkg/runtime/chan.c index ef27144ef..32995c6dd 100644 --- a/src/pkg/runtime/chan.c +++ b/src/pkg/runtime/chan.c @@ -3,7 +3,10 @@ // license that can be found in the LICENSE file. #include "runtime.h" +#include "arch_GOARCH.h" #include "type.h" +#include "race.h" +#include "malloc.h" #define MAXALIGN 7 #define NOSELGEN 1 @@ -20,6 +23,7 @@ struct SudoG G* g; // g and selgen constitute uint32 selgen; // a weak pointer to g SudoG* link; + int64 releasetime; byte* elem; // data element }; @@ -29,21 +33,25 @@ struct WaitQ SudoG* last; }; +// The garbage collector is assuming that Hchan can only contain pointers into the stack +// and cannot contain pointers into the heap. struct Hchan { - uint32 qcount; // total data in the q - uint32 dataqsiz; // size of the circular q + uintgo qcount; // total data in the q + uintgo dataqsiz; // size of the circular q uint16 elemsize; bool closed; uint8 elemalign; Alg* elemalg; // interface for element type - uint32 sendx; // send index - uint32 recvx; // receive index + uintgo sendx; // send index + uintgo recvx; // receive index WaitQ recvq; // list of recv waiters WaitQ sendq; // list of send waiters Lock; }; +uint32 runtime·Hchansize = sizeof(Hchan); + // Buffer follows Hchan immediately in memory. // chanbuf(c, i) is pointer to the i'th slot in the buffer. #define chanbuf(c, i) ((byte*)((c)+1)+(uintptr)(c)->elemsize*(i)) @@ -79,17 +87,22 @@ static void dequeueg(WaitQ*); static SudoG* dequeue(WaitQ*); static void enqueue(WaitQ*, SudoG*); static void destroychan(Hchan*); +static void racesync(Hchan*, SudoG*); Hchan* runtime·makechan_c(ChanType *t, int64 hint) { Hchan *c; - int32 n; + uintptr n; Type *elem; - + elem = t->elem; - if(hint < 0 || (int32)hint != hint || (elem->size > 0 && hint > ((uintptr)-1) / elem->size)) + // compiler checks this but be safe. + if(elem->size >= (1<<16)) + runtime·throw("makechan: invalid channel element type"); + + if(hint < 0 || (intgo)hint != hint || (elem->size > 0 && hint > MaxMem / elem->size)) runtime·panicstring("makechan: size out of range"); // calculate rounded size of Hchan @@ -103,18 +116,19 @@ runtime·makechan_c(ChanType *t, int64 hint) c->elemalg = elem->alg; c->elemalign = elem->align; c->dataqsiz = hint; + runtime·settype(c, (uintptr)t | TypeInfo_Chan); if(debug) - runtime·printf("makechan: chan=%p; elemsize=%D; elemalg=%p; elemalign=%d; dataqsiz=%d\n", - c, (int64)elem->size, elem->alg, elem->align, c->dataqsiz); + runtime·printf("makechan: chan=%p; elemsize=%D; elemalg=%p; elemalign=%d; dataqsiz=%D\n", + c, (int64)elem->size, elem->alg, elem->align, (int64)c->dataqsiz); return c; } // For reflect -// func makechan(typ *ChanType, size uint32) (chan) +// func makechan(typ *ChanType, size uint64) (chan) void -reflect·makechan(ChanType *t, uint32 size, Hchan *c) +reflect·makechan(ChanType *t, uint64 size, Hchan *c) { c = runtime·makechan_c(t, size); FLUSH(&c); @@ -143,11 +157,12 @@ runtime·makechan(ChanType *t, int64 hint, Hchan *ret) * the operation; we'll see that it's now closed. */ void -runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres) +runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres, void *pc) { SudoG *sg; SudoG mysg; G* gp; + int64 t0; if(c == nil) { USED(t); @@ -155,9 +170,7 @@ runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres) *pres = false; return; } - g->status = Gwaiting; - g->waitreason = "chan send (nil chan)"; - runtime·gosched(); + runtime·park(nil, nil, "chan send (nil chan)"); return; // not reached } @@ -170,7 +183,17 @@ runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres) runtime·prints("\n"); } + t0 = 0; + mysg.releasetime = 0; + if(runtime·blockprofilerate > 0) { + t0 = runtime·cputicks(); + mysg.releasetime = -1; + } + runtime·lock(c); + // TODO(dvyukov): add similar instrumentation to select. + if(raceenabled) + runtime·racereadpc(c, pc, runtime·chansend); if(c->closed) goto closed; @@ -179,12 +202,16 @@ runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres) sg = dequeue(&c->recvq); if(sg != nil) { + if(raceenabled) + racesync(c, sg); runtime·unlock(c); - + gp = sg->g; gp->param = sg; if(sg->elem != nil) c->elemalg->copy(c->elemsize, sg->elem, ep); + if(sg->releasetime) + sg->releasetime = runtime·cputicks(); runtime·ready(gp); if(pres != nil) @@ -202,11 +229,8 @@ runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres) mysg.g = g; mysg.selgen = NOSELGEN; g->param = nil; - g->status = Gwaiting; - g->waitreason = "chan send"; enqueue(&c->sendq, &mysg); - runtime·unlock(c); - runtime·gosched(); + runtime·park(runtime·unlock, c, "chan send"); if(g->param == nil) { runtime·lock(c); @@ -215,6 +239,9 @@ runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres) goto closed; } + if(mysg.releasetime > 0) + runtime·blockevent(mysg.releasetime - t0, 2); + return; asynch: @@ -230,15 +257,16 @@ asynch: mysg.g = g; mysg.elem = nil; mysg.selgen = NOSELGEN; - g->status = Gwaiting; - g->waitreason = "chan send"; enqueue(&c->sendq, &mysg); - runtime·unlock(c); - runtime·gosched(); + runtime·park(runtime·unlock, c, "chan send"); runtime·lock(c); goto asynch; } + + if(raceenabled) + runtime·racerelease(chanbuf(c, c->sendx)); + c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), ep); if(++c->sendx == c->dataqsiz) c->sendx = 0; @@ -248,11 +276,15 @@ asynch: if(sg != nil) { gp = sg->g; runtime·unlock(c); + if(sg->releasetime) + sg->releasetime = runtime·cputicks(); runtime·ready(gp); } else runtime·unlock(c); if(pres != nil) *pres = true; + if(mysg.releasetime > 0) + runtime·blockevent(mysg.releasetime - t0, 2); return; closed: @@ -267,6 +299,7 @@ runtime·chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *receive SudoG *sg; SudoG mysg; G *gp; + int64 t0; if(runtime·gcwaiting) runtime·gosched(); @@ -280,12 +313,17 @@ runtime·chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *receive *selected = false; return; } - g->status = Gwaiting; - g->waitreason = "chan receive (nil chan)"; - runtime·gosched(); + runtime·park(nil, nil, "chan receive (nil chan)"); return; // not reached } + t0 = 0; + mysg.releasetime = 0; + if(runtime·blockprofilerate > 0) { + t0 = runtime·cputicks(); + mysg.releasetime = -1; + } + runtime·lock(c); if(c->dataqsiz > 0) goto asynch; @@ -295,12 +333,16 @@ runtime·chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *receive sg = dequeue(&c->sendq); if(sg != nil) { + if(raceenabled) + racesync(c, sg); runtime·unlock(c); if(ep != nil) c->elemalg->copy(c->elemsize, ep, sg->elem); gp = sg->g; gp->param = sg; + if(sg->releasetime) + sg->releasetime = runtime·cputicks(); runtime·ready(gp); if(selected != nil) @@ -320,11 +362,8 @@ runtime·chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *receive mysg.g = g; mysg.selgen = NOSELGEN; g->param = nil; - g->status = Gwaiting; - g->waitreason = "chan receive"; enqueue(&c->recvq, &mysg); - runtime·unlock(c); - runtime·gosched(); + runtime·park(runtime·unlock, c, "chan receive"); if(g->param == nil) { runtime·lock(c); @@ -335,6 +374,8 @@ runtime·chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *receive if(received != nil) *received = true; + if(mysg.releasetime > 0) + runtime·blockevent(mysg.releasetime - t0, 2); return; asynch: @@ -352,15 +393,16 @@ asynch: mysg.g = g; mysg.elem = nil; mysg.selgen = NOSELGEN; - g->status = Gwaiting; - g->waitreason = "chan receive"; enqueue(&c->recvq, &mysg); - runtime·unlock(c); - runtime·gosched(); + runtime·park(runtime·unlock, c, "chan receive"); runtime·lock(c); goto asynch; } + + if(raceenabled) + runtime·raceacquire(chanbuf(c, c->recvx)); + if(ep != nil) c->elemalg->copy(c->elemsize, ep, chanbuf(c, c->recvx)); c->elemalg->copy(c->elemsize, chanbuf(c, c->recvx), nil); @@ -372,6 +414,8 @@ asynch: if(sg != nil) { gp = sg->g; runtime·unlock(c); + if(sg->releasetime) + sg->releasetime = runtime·cputicks(); runtime·ready(gp); } else runtime·unlock(c); @@ -380,6 +424,8 @@ asynch: *selected = true; if(received != nil) *received = true; + if(mysg.releasetime > 0) + runtime·blockevent(mysg.releasetime - t0, 2); return; closed: @@ -389,7 +435,11 @@ closed: *selected = true; if(received != nil) *received = false; + if(raceenabled) + runtime·raceacquire(c); runtime·unlock(c); + if(mysg.releasetime > 0) + runtime·blockevent(mysg.releasetime - t0, 2); } // chansend1(hchan *chan any, elem any); @@ -397,7 +447,7 @@ closed: void runtime·chansend1(ChanType *t, Hchan* c, ...) { - runtime·chansend(t, c, (byte*)(&c+1), nil); + runtime·chansend(t, c, (byte*)(&c+1), nil, runtime·getcallerpc(&t)); } // chanrecv1(hchan *chan any) (elem any); @@ -446,8 +496,8 @@ runtime·selectnbsend(ChanType *t, Hchan *c, ...) byte *ae, *ap; ae = (byte*)(&c + 1); - ap = ae + runtime·rnd(t->elem->size, Structrnd); - runtime·chansend(t, c, ae, ap); + ap = ae + ROUND(t->elem->size, Structrnd); + runtime·chansend(t, c, ae, ap, runtime·getcallerpc(&t)); } // func selectnbrecv(elem *any, c chan any) bool @@ -474,7 +524,7 @@ void runtime·selectnbrecv(ChanType *t, byte *v, Hchan *c, bool selected) { runtime·chanrecv(t, c, v, &selected, nil); -} +} // func selectnbrecv2(elem *any, ok *bool, c chan any) bool // @@ -500,7 +550,7 @@ void runtime·selectnbrecv2(ChanType *t, byte *v, bool *received, Hchan *c, bool selected) { runtime·chanrecv(t, c, v, &selected, received); -} +} // For reflect: // func chansend(c chan, val iword, nb bool) (selected bool) @@ -509,12 +559,13 @@ runtime·selectnbrecv2(ChanType *t, byte *v, bool *received, Hchan *c, bool sele // // The "uintptr selected" is really "bool selected" but saying // uintptr gets us the right alignment for the output parameter block. +#pragma textflag 7 void reflect·chansend(ChanType *t, Hchan *c, uintptr val, bool nb, uintptr selected) { bool *sp; byte *vp; - + if(nb) { selected = false; sp = (bool*)&selected; @@ -527,7 +578,7 @@ reflect·chansend(ChanType *t, Hchan *c, uintptr val, bool nb, uintptr selected) vp = (byte*)&val; else vp = (byte*)val; - runtime·chansend(t, c, vp, sp); + runtime·chansend(t, c, vp, sp, runtime·getcallerpc(&t)); } // For reflect: @@ -571,7 +622,7 @@ runtime·newselect(int32 size, ...) int32 o; Select **selp; - o = runtime·rnd(sizeof(size), Structrnd); + o = ROUND(sizeof(size), Structrnd); selp = (Select**)((byte*)&size + o); newselect(size, selp); } @@ -619,7 +670,7 @@ runtime·selectsend(Select *sel, Hchan *c, void *elem, bool selected) // nil cases do not compete if(c == nil) return; - + selectsend(sel, c, runtime·getcallerpc(&sel), elem, (byte*)&selected - (byte*)&sel); } @@ -628,7 +679,7 @@ selectsend(Select *sel, Hchan *c, void *pc, void *elem, int32 so) { int32 i; Scase *cas; - + i = sel->ncase; if(i >= sel->tcase) runtime·throw("selectsend: too many cases"); @@ -774,9 +825,7 @@ selunlock(Select *sel) void runtime·block(void) { - g->status = Gwaiting; // forever - g->waitreason = "select (no cases)"; - runtime·gosched(); + runtime·park(nil, nil, "select (no cases)"); // forever } static void* selectgo(Select**); @@ -796,7 +845,7 @@ static void* selectgo(Select **selp) { Select *sel; - uint32 o, i, j; + uint32 o, i, j, k; Scase *cas, *dfl; Hchan *c; SudoG *sg; @@ -830,12 +879,42 @@ selectgo(Select **selp) } // sort the cases by Hchan address to get the locking order. + // simple heap sort, to guarantee n log n time and constant stack footprint. for(i=0; i<sel->ncase; i++) { - c = sel->scase[i].chan; - for(j=i; j>0 && sel->lockorder[j-1] >= c; j--) - sel->lockorder[j] = sel->lockorder[j-1]; + j = i; + c = sel->scase[j].chan; + while(j > 0 && sel->lockorder[k=(j-1)/2] < c) { + sel->lockorder[j] = sel->lockorder[k]; + j = k; + } + sel->lockorder[j] = c; + } + for(i=sel->ncase; i-->0; ) { + c = sel->lockorder[i]; + sel->lockorder[i] = sel->lockorder[0]; + j = 0; + for(;;) { + k = j*2+1; + if(k >= i) + break; + if(k+1 < i && sel->lockorder[k] < sel->lockorder[k+1]) + k++; + if(c < sel->lockorder[k]) { + sel->lockorder[j] = sel->lockorder[k]; + j = k; + continue; + } + break; + } sel->lockorder[j] = c; } + /* + for(i=0; i+1<sel->ncase; i++) + if(sel->lockorder[i] > sel->lockorder[i+1]) { + runtime·printf("i=%d %p %p\n", i, sel->lockorder[i], sel->lockorder[i+1]); + runtime·throw("select: broken sort"); + } + */ sellock(sel); loop: @@ -899,7 +978,7 @@ loop: case CaseRecv: enqueue(&c->recvq, sg); break; - + case CaseSend: enqueue(&c->sendq, sg); break; @@ -907,10 +986,7 @@ loop: } g->param = nil; - g->status = Gwaiting; - g->waitreason = "select"; - selunlock(sel); - runtime·gosched(); + runtime·park((void(*)(Lock*))selunlock, (Lock*)sel, "select"); sellock(sel); sg = g->param; @@ -951,6 +1027,8 @@ loop: asyncrecv: // can receive from buffer + if(raceenabled) + runtime·raceacquire(chanbuf(c, c->recvx)); if(cas->receivedp != nil) *cas->receivedp = true; if(cas->sg.elem != nil) @@ -971,6 +1049,8 @@ asyncrecv: asyncsend: // can send to buffer + if(raceenabled) + runtime·racerelease(chanbuf(c, c->sendx)); c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), cas->sg.elem); if(++c->sendx == c->dataqsiz) c->sendx = 0; @@ -987,6 +1067,8 @@ asyncsend: syncrecv: // can receive from sleeping sender (sg) + if(raceenabled) + racesync(c, sg); selunlock(sel); if(debug) runtime·printf("syncrecv: sel=%p c=%p o=%d\n", sel, c, o); @@ -1006,10 +1088,14 @@ rclose: *cas->receivedp = false; if(cas->sg.elem != nil) c->elemalg->copy(c->elemsize, cas->sg.elem, nil); + if(raceenabled) + runtime·raceacquire(c); goto retc; syncsend: // can send to sleeping receiver (sg) + if(raceenabled) + racesync(c, sg); selunlock(sel); if(debug) runtime·printf("syncsend: sel=%p c=%p o=%d\n", sel, c, o); @@ -1020,11 +1106,17 @@ syncsend: runtime·ready(gp); retc: - // return to pc corresponding to chosen case + // return pc corresponding to chosen case. + // Set boolean passed during select creation + // (at offset selp + cas->so) to true. + // If cas->so == 0, this is a reflect-driven select and we + // don't need to update the boolean. pc = cas->pc; - as = (byte*)selp + cas->so; + if(cas->so > 0) { + as = (byte*)selp + cas->so; + *as = true; + } runtime·free(sel); - *as = true; return pc; sclose: @@ -1034,7 +1126,89 @@ sclose: return nil; // not reached } +// This struct must match ../reflect/value.go:/runtimeSelect. +typedef struct runtimeSelect runtimeSelect; +struct runtimeSelect +{ + uintptr dir; + ChanType *typ; + Hchan *ch; + uintptr val; +}; + +// This enum must match ../reflect/value.go:/SelectDir. +enum SelectDir { + SelectSend = 1, + SelectRecv, + SelectDefault, +}; + +// func rselect(cases []runtimeSelect) (chosen int, word uintptr, recvOK bool) +void +reflect·rselect(Slice cases, intgo chosen, uintptr word, bool recvOK) +{ + int32 i; + Select *sel; + runtimeSelect* rcase, *rc; + void *elem; + void *recvptr; + uintptr maxsize; + + chosen = -1; + word = 0; + recvOK = false; + + maxsize = 0; + rcase = (runtimeSelect*)cases.array; + for(i=0; i<cases.len; i++) { + rc = &rcase[i]; + if(rc->dir == SelectRecv && rc->ch != nil && maxsize < rc->typ->elem->size) + maxsize = rc->typ->elem->size; + } + + recvptr = nil; + if(maxsize > sizeof(void*)) + recvptr = runtime·mal(maxsize); + + newselect(cases.len, &sel); + for(i=0; i<cases.len; i++) { + rc = &rcase[i]; + switch(rc->dir) { + case SelectDefault: + selectdefault(sel, (void*)i, 0); + break; + case SelectSend: + if(rc->ch == nil) + break; + if(rc->typ->elem->size > sizeof(void*)) + elem = (void*)rc->val; + else + elem = (void*)&rc->val; + selectsend(sel, rc->ch, (void*)i, elem, 0); + break; + case SelectRecv: + if(rc->ch == nil) + break; + if(rc->typ->elem->size > sizeof(void*)) + elem = recvptr; + else + elem = &word; + selectrecv(sel, rc->ch, (void*)i, elem, &recvOK, 0); + break; + } + } + + chosen = (intgo)(uintptr)selectgo(&sel); + if(rcase[chosen].dir == SelectRecv && rcase[chosen].typ->elem->size > sizeof(void*)) + word = (uintptr)recvptr; + + FLUSH(&chosen); + FLUSH(&word); + FLUSH(&recvOK); +} + // closechan(sel *byte); +#pragma textflag 7 void runtime·closechan(Hchan *c) { @@ -1053,6 +1227,11 @@ runtime·closechan(Hchan *c) runtime·panicstring("close of closed channel"); } + if(raceenabled) { + runtime·racewritepc(c, runtime·getcallerpc(&c), runtime·closechan); + runtime·racerelease(c); + } + c->closed = true; // release all readers @@ -1087,9 +1266,9 @@ reflect·chanclose(Hchan *c) } // For reflect -// func chanlen(c chan) (len int32) +// func chanlen(c chan) (len int) void -reflect·chanlen(Hchan *c, int32 len) +reflect·chanlen(Hchan *c, intgo len) { if(c == nil) len = 0; @@ -1099,9 +1278,9 @@ reflect·chanlen(Hchan *c, int32 len) } // For reflect -// func chancap(c chan) (cap int32) +// func chancap(c chan) int void -reflect·chancap(Hchan *c, int32 cap) +reflect·chancap(Hchan *c, intgo cap) { if(c == nil) cap = 0; @@ -1160,3 +1339,12 @@ enqueue(WaitQ *q, SudoG *sgp) q->last->link = sgp; q->last = sgp; } + +static void +racesync(Hchan *c, SudoG *sg) +{ + runtime·racerelease(chanbuf(c, 0)); + runtime·raceacquireg(sg->g, chanbuf(c, 0)); + runtime·racereleaseg(sg->g, chanbuf(c, 0)); + runtime·raceacquire(chanbuf(c, 0)); +} |