summaryrefslogtreecommitdiff
path: root/src/pkg/runtime/chan.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/pkg/runtime/chan.c')
-rw-r--r--src/pkg/runtime/chan.c316
1 files changed, 252 insertions, 64 deletions
diff --git a/src/pkg/runtime/chan.c b/src/pkg/runtime/chan.c
index ef27144ef..32995c6dd 100644
--- a/src/pkg/runtime/chan.c
+++ b/src/pkg/runtime/chan.c
@@ -3,7 +3,10 @@
// license that can be found in the LICENSE file.
#include "runtime.h"
+#include "arch_GOARCH.h"
#include "type.h"
+#include "race.h"
+#include "malloc.h"
#define MAXALIGN 7
#define NOSELGEN 1
@@ -20,6 +23,7 @@ struct SudoG
G* g; // g and selgen constitute
uint32 selgen; // a weak pointer to g
SudoG* link;
+ int64 releasetime;
byte* elem; // data element
};
@@ -29,21 +33,25 @@ struct WaitQ
SudoG* last;
};
+// The garbage collector is assuming that Hchan can only contain pointers into the stack
+// and cannot contain pointers into the heap.
struct Hchan
{
- uint32 qcount; // total data in the q
- uint32 dataqsiz; // size of the circular q
+ uintgo qcount; // total data in the q
+ uintgo dataqsiz; // size of the circular q
uint16 elemsize;
bool closed;
uint8 elemalign;
Alg* elemalg; // interface for element type
- uint32 sendx; // send index
- uint32 recvx; // receive index
+ uintgo sendx; // send index
+ uintgo recvx; // receive index
WaitQ recvq; // list of recv waiters
WaitQ sendq; // list of send waiters
Lock;
};
+uint32 runtime·Hchansize = sizeof(Hchan);
+
// Buffer follows Hchan immediately in memory.
// chanbuf(c, i) is pointer to the i'th slot in the buffer.
#define chanbuf(c, i) ((byte*)((c)+1)+(uintptr)(c)->elemsize*(i))
@@ -79,17 +87,22 @@ static void dequeueg(WaitQ*);
static SudoG* dequeue(WaitQ*);
static void enqueue(WaitQ*, SudoG*);
static void destroychan(Hchan*);
+static void racesync(Hchan*, SudoG*);
Hchan*
runtime·makechan_c(ChanType *t, int64 hint)
{
Hchan *c;
- int32 n;
+ uintptr n;
Type *elem;
-
+
elem = t->elem;
- if(hint < 0 || (int32)hint != hint || (elem->size > 0 && hint > ((uintptr)-1) / elem->size))
+ // compiler checks this but be safe.
+ if(elem->size >= (1<<16))
+ runtime·throw("makechan: invalid channel element type");
+
+ if(hint < 0 || (intgo)hint != hint || (elem->size > 0 && hint > MaxMem / elem->size))
runtime·panicstring("makechan: size out of range");
// calculate rounded size of Hchan
@@ -103,18 +116,19 @@ runtime·makechan_c(ChanType *t, int64 hint)
c->elemalg = elem->alg;
c->elemalign = elem->align;
c->dataqsiz = hint;
+ runtime·settype(c, (uintptr)t | TypeInfo_Chan);
if(debug)
- runtime·printf("makechan: chan=%p; elemsize=%D; elemalg=%p; elemalign=%d; dataqsiz=%d\n",
- c, (int64)elem->size, elem->alg, elem->align, c->dataqsiz);
+ runtime·printf("makechan: chan=%p; elemsize=%D; elemalg=%p; elemalign=%d; dataqsiz=%D\n",
+ c, (int64)elem->size, elem->alg, elem->align, (int64)c->dataqsiz);
return c;
}
// For reflect
-// func makechan(typ *ChanType, size uint32) (chan)
+// func makechan(typ *ChanType, size uint64) (chan)
void
-reflect·makechan(ChanType *t, uint32 size, Hchan *c)
+reflect·makechan(ChanType *t, uint64 size, Hchan *c)
{
c = runtime·makechan_c(t, size);
FLUSH(&c);
@@ -143,11 +157,12 @@ runtime·makechan(ChanType *t, int64 hint, Hchan *ret)
* the operation; we'll see that it's now closed.
*/
void
-runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres)
+runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres, void *pc)
{
SudoG *sg;
SudoG mysg;
G* gp;
+ int64 t0;
if(c == nil) {
USED(t);
@@ -155,9 +170,7 @@ runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres)
*pres = false;
return;
}
- g->status = Gwaiting;
- g->waitreason = "chan send (nil chan)";
- runtime·gosched();
+ runtime·park(nil, nil, "chan send (nil chan)");
return; // not reached
}
@@ -170,7 +183,17 @@ runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres)
runtime·prints("\n");
}
+ t0 = 0;
+ mysg.releasetime = 0;
+ if(runtime·blockprofilerate > 0) {
+ t0 = runtime·cputicks();
+ mysg.releasetime = -1;
+ }
+
runtime·lock(c);
+ // TODO(dvyukov): add similar instrumentation to select.
+ if(raceenabled)
+ runtime·racereadpc(c, pc, runtime·chansend);
if(c->closed)
goto closed;
@@ -179,12 +202,16 @@ runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres)
sg = dequeue(&c->recvq);
if(sg != nil) {
+ if(raceenabled)
+ racesync(c, sg);
runtime·unlock(c);
-
+
gp = sg->g;
gp->param = sg;
if(sg->elem != nil)
c->elemalg->copy(c->elemsize, sg->elem, ep);
+ if(sg->releasetime)
+ sg->releasetime = runtime·cputicks();
runtime·ready(gp);
if(pres != nil)
@@ -202,11 +229,8 @@ runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres)
mysg.g = g;
mysg.selgen = NOSELGEN;
g->param = nil;
- g->status = Gwaiting;
- g->waitreason = "chan send";
enqueue(&c->sendq, &mysg);
- runtime·unlock(c);
- runtime·gosched();
+ runtime·park(runtime·unlock, c, "chan send");
if(g->param == nil) {
runtime·lock(c);
@@ -215,6 +239,9 @@ runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres)
goto closed;
}
+ if(mysg.releasetime > 0)
+ runtime·blockevent(mysg.releasetime - t0, 2);
+
return;
asynch:
@@ -230,15 +257,16 @@ asynch:
mysg.g = g;
mysg.elem = nil;
mysg.selgen = NOSELGEN;
- g->status = Gwaiting;
- g->waitreason = "chan send";
enqueue(&c->sendq, &mysg);
- runtime·unlock(c);
- runtime·gosched();
+ runtime·park(runtime·unlock, c, "chan send");
runtime·lock(c);
goto asynch;
}
+
+ if(raceenabled)
+ runtime·racerelease(chanbuf(c, c->sendx));
+
c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), ep);
if(++c->sendx == c->dataqsiz)
c->sendx = 0;
@@ -248,11 +276,15 @@ asynch:
if(sg != nil) {
gp = sg->g;
runtime·unlock(c);
+ if(sg->releasetime)
+ sg->releasetime = runtime·cputicks();
runtime·ready(gp);
} else
runtime·unlock(c);
if(pres != nil)
*pres = true;
+ if(mysg.releasetime > 0)
+ runtime·blockevent(mysg.releasetime - t0, 2);
return;
closed:
@@ -267,6 +299,7 @@ runtime·chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *receive
SudoG *sg;
SudoG mysg;
G *gp;
+ int64 t0;
if(runtime·gcwaiting)
runtime·gosched();
@@ -280,12 +313,17 @@ runtime·chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *receive
*selected = false;
return;
}
- g->status = Gwaiting;
- g->waitreason = "chan receive (nil chan)";
- runtime·gosched();
+ runtime·park(nil, nil, "chan receive (nil chan)");
return; // not reached
}
+ t0 = 0;
+ mysg.releasetime = 0;
+ if(runtime·blockprofilerate > 0) {
+ t0 = runtime·cputicks();
+ mysg.releasetime = -1;
+ }
+
runtime·lock(c);
if(c->dataqsiz > 0)
goto asynch;
@@ -295,12 +333,16 @@ runtime·chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *receive
sg = dequeue(&c->sendq);
if(sg != nil) {
+ if(raceenabled)
+ racesync(c, sg);
runtime·unlock(c);
if(ep != nil)
c->elemalg->copy(c->elemsize, ep, sg->elem);
gp = sg->g;
gp->param = sg;
+ if(sg->releasetime)
+ sg->releasetime = runtime·cputicks();
runtime·ready(gp);
if(selected != nil)
@@ -320,11 +362,8 @@ runtime·chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *receive
mysg.g = g;
mysg.selgen = NOSELGEN;
g->param = nil;
- g->status = Gwaiting;
- g->waitreason = "chan receive";
enqueue(&c->recvq, &mysg);
- runtime·unlock(c);
- runtime·gosched();
+ runtime·park(runtime·unlock, c, "chan receive");
if(g->param == nil) {
runtime·lock(c);
@@ -335,6 +374,8 @@ runtime·chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *receive
if(received != nil)
*received = true;
+ if(mysg.releasetime > 0)
+ runtime·blockevent(mysg.releasetime - t0, 2);
return;
asynch:
@@ -352,15 +393,16 @@ asynch:
mysg.g = g;
mysg.elem = nil;
mysg.selgen = NOSELGEN;
- g->status = Gwaiting;
- g->waitreason = "chan receive";
enqueue(&c->recvq, &mysg);
- runtime·unlock(c);
- runtime·gosched();
+ runtime·park(runtime·unlock, c, "chan receive");
runtime·lock(c);
goto asynch;
}
+
+ if(raceenabled)
+ runtime·raceacquire(chanbuf(c, c->recvx));
+
if(ep != nil)
c->elemalg->copy(c->elemsize, ep, chanbuf(c, c->recvx));
c->elemalg->copy(c->elemsize, chanbuf(c, c->recvx), nil);
@@ -372,6 +414,8 @@ asynch:
if(sg != nil) {
gp = sg->g;
runtime·unlock(c);
+ if(sg->releasetime)
+ sg->releasetime = runtime·cputicks();
runtime·ready(gp);
} else
runtime·unlock(c);
@@ -380,6 +424,8 @@ asynch:
*selected = true;
if(received != nil)
*received = true;
+ if(mysg.releasetime > 0)
+ runtime·blockevent(mysg.releasetime - t0, 2);
return;
closed:
@@ -389,7 +435,11 @@ closed:
*selected = true;
if(received != nil)
*received = false;
+ if(raceenabled)
+ runtime·raceacquire(c);
runtime·unlock(c);
+ if(mysg.releasetime > 0)
+ runtime·blockevent(mysg.releasetime - t0, 2);
}
// chansend1(hchan *chan any, elem any);
@@ -397,7 +447,7 @@ closed:
void
runtime·chansend1(ChanType *t, Hchan* c, ...)
{
- runtime·chansend(t, c, (byte*)(&c+1), nil);
+ runtime·chansend(t, c, (byte*)(&c+1), nil, runtime·getcallerpc(&t));
}
// chanrecv1(hchan *chan any) (elem any);
@@ -446,8 +496,8 @@ runtime·selectnbsend(ChanType *t, Hchan *c, ...)
byte *ae, *ap;
ae = (byte*)(&c + 1);
- ap = ae + runtime·rnd(t->elem->size, Structrnd);
- runtime·chansend(t, c, ae, ap);
+ ap = ae + ROUND(t->elem->size, Structrnd);
+ runtime·chansend(t, c, ae, ap, runtime·getcallerpc(&t));
}
// func selectnbrecv(elem *any, c chan any) bool
@@ -474,7 +524,7 @@ void
runtime·selectnbrecv(ChanType *t, byte *v, Hchan *c, bool selected)
{
runtime·chanrecv(t, c, v, &selected, nil);
-}
+}
// func selectnbrecv2(elem *any, ok *bool, c chan any) bool
//
@@ -500,7 +550,7 @@ void
runtime·selectnbrecv2(ChanType *t, byte *v, bool *received, Hchan *c, bool selected)
{
runtime·chanrecv(t, c, v, &selected, received);
-}
+}
// For reflect:
// func chansend(c chan, val iword, nb bool) (selected bool)
@@ -509,12 +559,13 @@ runtime·selectnbrecv2(ChanType *t, byte *v, bool *received, Hchan *c, bool sele
//
// The "uintptr selected" is really "bool selected" but saying
// uintptr gets us the right alignment for the output parameter block.
+#pragma textflag 7
void
reflect·chansend(ChanType *t, Hchan *c, uintptr val, bool nb, uintptr selected)
{
bool *sp;
byte *vp;
-
+
if(nb) {
selected = false;
sp = (bool*)&selected;
@@ -527,7 +578,7 @@ reflect·chansend(ChanType *t, Hchan *c, uintptr val, bool nb, uintptr selected)
vp = (byte*)&val;
else
vp = (byte*)val;
- runtime·chansend(t, c, vp, sp);
+ runtime·chansend(t, c, vp, sp, runtime·getcallerpc(&t));
}
// For reflect:
@@ -571,7 +622,7 @@ runtime·newselect(int32 size, ...)
int32 o;
Select **selp;
- o = runtime·rnd(sizeof(size), Structrnd);
+ o = ROUND(sizeof(size), Structrnd);
selp = (Select**)((byte*)&size + o);
newselect(size, selp);
}
@@ -619,7 +670,7 @@ runtime·selectsend(Select *sel, Hchan *c, void *elem, bool selected)
// nil cases do not compete
if(c == nil)
return;
-
+
selectsend(sel, c, runtime·getcallerpc(&sel), elem, (byte*)&selected - (byte*)&sel);
}
@@ -628,7 +679,7 @@ selectsend(Select *sel, Hchan *c, void *pc, void *elem, int32 so)
{
int32 i;
Scase *cas;
-
+
i = sel->ncase;
if(i >= sel->tcase)
runtime·throw("selectsend: too many cases");
@@ -774,9 +825,7 @@ selunlock(Select *sel)
void
runtime·block(void)
{
- g->status = Gwaiting; // forever
- g->waitreason = "select (no cases)";
- runtime·gosched();
+ runtime·park(nil, nil, "select (no cases)"); // forever
}
static void* selectgo(Select**);
@@ -796,7 +845,7 @@ static void*
selectgo(Select **selp)
{
Select *sel;
- uint32 o, i, j;
+ uint32 o, i, j, k;
Scase *cas, *dfl;
Hchan *c;
SudoG *sg;
@@ -830,12 +879,42 @@ selectgo(Select **selp)
}
// sort the cases by Hchan address to get the locking order.
+ // simple heap sort, to guarantee n log n time and constant stack footprint.
for(i=0; i<sel->ncase; i++) {
- c = sel->scase[i].chan;
- for(j=i; j>0 && sel->lockorder[j-1] >= c; j--)
- sel->lockorder[j] = sel->lockorder[j-1];
+ j = i;
+ c = sel->scase[j].chan;
+ while(j > 0 && sel->lockorder[k=(j-1)/2] < c) {
+ sel->lockorder[j] = sel->lockorder[k];
+ j = k;
+ }
+ sel->lockorder[j] = c;
+ }
+ for(i=sel->ncase; i-->0; ) {
+ c = sel->lockorder[i];
+ sel->lockorder[i] = sel->lockorder[0];
+ j = 0;
+ for(;;) {
+ k = j*2+1;
+ if(k >= i)
+ break;
+ if(k+1 < i && sel->lockorder[k] < sel->lockorder[k+1])
+ k++;
+ if(c < sel->lockorder[k]) {
+ sel->lockorder[j] = sel->lockorder[k];
+ j = k;
+ continue;
+ }
+ break;
+ }
sel->lockorder[j] = c;
}
+ /*
+ for(i=0; i+1<sel->ncase; i++)
+ if(sel->lockorder[i] > sel->lockorder[i+1]) {
+ runtime·printf("i=%d %p %p\n", i, sel->lockorder[i], sel->lockorder[i+1]);
+ runtime·throw("select: broken sort");
+ }
+ */
sellock(sel);
loop:
@@ -899,7 +978,7 @@ loop:
case CaseRecv:
enqueue(&c->recvq, sg);
break;
-
+
case CaseSend:
enqueue(&c->sendq, sg);
break;
@@ -907,10 +986,7 @@ loop:
}
g->param = nil;
- g->status = Gwaiting;
- g->waitreason = "select";
- selunlock(sel);
- runtime·gosched();
+ runtime·park((void(*)(Lock*))selunlock, (Lock*)sel, "select");
sellock(sel);
sg = g->param;
@@ -951,6 +1027,8 @@ loop:
asyncrecv:
// can receive from buffer
+ if(raceenabled)
+ runtime·raceacquire(chanbuf(c, c->recvx));
if(cas->receivedp != nil)
*cas->receivedp = true;
if(cas->sg.elem != nil)
@@ -971,6 +1049,8 @@ asyncrecv:
asyncsend:
// can send to buffer
+ if(raceenabled)
+ runtime·racerelease(chanbuf(c, c->sendx));
c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), cas->sg.elem);
if(++c->sendx == c->dataqsiz)
c->sendx = 0;
@@ -987,6 +1067,8 @@ asyncsend:
syncrecv:
// can receive from sleeping sender (sg)
+ if(raceenabled)
+ racesync(c, sg);
selunlock(sel);
if(debug)
runtime·printf("syncrecv: sel=%p c=%p o=%d\n", sel, c, o);
@@ -1006,10 +1088,14 @@ rclose:
*cas->receivedp = false;
if(cas->sg.elem != nil)
c->elemalg->copy(c->elemsize, cas->sg.elem, nil);
+ if(raceenabled)
+ runtime·raceacquire(c);
goto retc;
syncsend:
// can send to sleeping receiver (sg)
+ if(raceenabled)
+ racesync(c, sg);
selunlock(sel);
if(debug)
runtime·printf("syncsend: sel=%p c=%p o=%d\n", sel, c, o);
@@ -1020,11 +1106,17 @@ syncsend:
runtime·ready(gp);
retc:
- // return to pc corresponding to chosen case
+ // return pc corresponding to chosen case.
+ // Set boolean passed during select creation
+ // (at offset selp + cas->so) to true.
+ // If cas->so == 0, this is a reflect-driven select and we
+ // don't need to update the boolean.
pc = cas->pc;
- as = (byte*)selp + cas->so;
+ if(cas->so > 0) {
+ as = (byte*)selp + cas->so;
+ *as = true;
+ }
runtime·free(sel);
- *as = true;
return pc;
sclose:
@@ -1034,7 +1126,89 @@ sclose:
return nil; // not reached
}
+// This struct must match ../reflect/value.go:/runtimeSelect.
+typedef struct runtimeSelect runtimeSelect;
+struct runtimeSelect
+{
+ uintptr dir;
+ ChanType *typ;
+ Hchan *ch;
+ uintptr val;
+};
+
+// This enum must match ../reflect/value.go:/SelectDir.
+enum SelectDir {
+ SelectSend = 1,
+ SelectRecv,
+ SelectDefault,
+};
+
+// func rselect(cases []runtimeSelect) (chosen int, word uintptr, recvOK bool)
+void
+reflect·rselect(Slice cases, intgo chosen, uintptr word, bool recvOK)
+{
+ int32 i;
+ Select *sel;
+ runtimeSelect* rcase, *rc;
+ void *elem;
+ void *recvptr;
+ uintptr maxsize;
+
+ chosen = -1;
+ word = 0;
+ recvOK = false;
+
+ maxsize = 0;
+ rcase = (runtimeSelect*)cases.array;
+ for(i=0; i<cases.len; i++) {
+ rc = &rcase[i];
+ if(rc->dir == SelectRecv && rc->ch != nil && maxsize < rc->typ->elem->size)
+ maxsize = rc->typ->elem->size;
+ }
+
+ recvptr = nil;
+ if(maxsize > sizeof(void*))
+ recvptr = runtime·mal(maxsize);
+
+ newselect(cases.len, &sel);
+ for(i=0; i<cases.len; i++) {
+ rc = &rcase[i];
+ switch(rc->dir) {
+ case SelectDefault:
+ selectdefault(sel, (void*)i, 0);
+ break;
+ case SelectSend:
+ if(rc->ch == nil)
+ break;
+ if(rc->typ->elem->size > sizeof(void*))
+ elem = (void*)rc->val;
+ else
+ elem = (void*)&rc->val;
+ selectsend(sel, rc->ch, (void*)i, elem, 0);
+ break;
+ case SelectRecv:
+ if(rc->ch == nil)
+ break;
+ if(rc->typ->elem->size > sizeof(void*))
+ elem = recvptr;
+ else
+ elem = &word;
+ selectrecv(sel, rc->ch, (void*)i, elem, &recvOK, 0);
+ break;
+ }
+ }
+
+ chosen = (intgo)(uintptr)selectgo(&sel);
+ if(rcase[chosen].dir == SelectRecv && rcase[chosen].typ->elem->size > sizeof(void*))
+ word = (uintptr)recvptr;
+
+ FLUSH(&chosen);
+ FLUSH(&word);
+ FLUSH(&recvOK);
+}
+
// closechan(sel *byte);
+#pragma textflag 7
void
runtime·closechan(Hchan *c)
{
@@ -1053,6 +1227,11 @@ runtime·closechan(Hchan *c)
runtime·panicstring("close of closed channel");
}
+ if(raceenabled) {
+ runtime·racewritepc(c, runtime·getcallerpc(&c), runtime·closechan);
+ runtime·racerelease(c);
+ }
+
c->closed = true;
// release all readers
@@ -1087,9 +1266,9 @@ reflect·chanclose(Hchan *c)
}
// For reflect
-// func chanlen(c chan) (len int32)
+// func chanlen(c chan) (len int)
void
-reflect·chanlen(Hchan *c, int32 len)
+reflect·chanlen(Hchan *c, intgo len)
{
if(c == nil)
len = 0;
@@ -1099,9 +1278,9 @@ reflect·chanlen(Hchan *c, int32 len)
}
// For reflect
-// func chancap(c chan) (cap int32)
+// func chancap(c chan) int
void
-reflect·chancap(Hchan *c, int32 cap)
+reflect·chancap(Hchan *c, intgo cap)
{
if(c == nil)
cap = 0;
@@ -1160,3 +1339,12 @@ enqueue(WaitQ *q, SudoG *sgp)
q->last->link = sgp;
q->last = sgp;
}
+
+static void
+racesync(Hchan *c, SudoG *sg)
+{
+ runtime·racerelease(chanbuf(c, 0));
+ runtime·raceacquireg(sg->g, chanbuf(c, 0));
+ runtime·racereleaseg(sg->g, chanbuf(c, 0));
+ runtime·raceacquire(chanbuf(c, 0));
+}