summaryrefslogtreecommitdiff
path: root/src/pkg/runtime/chan.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/pkg/runtime/chan.c')
-rw-r--r--src/pkg/runtime/chan.c544
1 files changed, 227 insertions, 317 deletions
diff --git a/src/pkg/runtime/chan.c b/src/pkg/runtime/chan.c
index f94c3ef40..ef5342353 100644
--- a/src/pkg/runtime/chan.c
+++ b/src/pkg/runtime/chan.c
@@ -6,6 +6,7 @@
#include "type.h"
#define MAXALIGN 7
+#define NOSELGEN 1
static int32 debug = 0;
@@ -18,10 +19,8 @@ struct SudoG
{
G* g; // g and selgen constitute
uint32 selgen; // a weak pointer to g
- int16 offset; // offset of case number
- int8 isfree; // offset of case number
SudoG* link;
- byte elem[8]; // synch data element (+ more)
+ byte* elem; // data element
};
struct WaitQ
@@ -38,11 +37,10 @@ struct Hchan
bool closed;
uint8 elemalign;
Alg* elemalg; // interface for element type
- uint32 sendx; // send index
- uint32 recvx; // receive index
+ uint32 sendx; // send index
+ uint32 recvx; // receive index
WaitQ recvq; // list of recv waiters
WaitQ sendq; // list of send waiters
- SudoG* free; // freelist
Lock;
};
@@ -60,44 +58,38 @@ enum
struct Scase
{
+ SudoG sg; // must be first member (cast to Scase)
Hchan* chan; // chan
byte* pc; // return pc
uint16 kind;
uint16 so; // vararg of selected bool
- union {
- byte elem[2*sizeof(void*)]; // element (send)
- struct {
- byte* elemp; // pointer to element (recv)
- bool* receivedp; // pointer to received bool (recv2)
- } recv;
- } u;
+ bool* receivedp; // pointer to received bool (recv2)
};
struct Select
{
uint16 tcase; // total count of scase[]
uint16 ncase; // currently filled scase[]
- Select* link; // for freelist
- uint16* order;
- Scase* scase[1]; // one per case
+ uint16* pollorder; // case poll order
+ Hchan** lockorder; // channel lock order
+ Scase scase[1]; // one per case (in order of appearance)
};
-static void dequeueg(WaitQ*, Hchan*);
-static SudoG* dequeue(WaitQ*, Hchan*);
+static void dequeueg(WaitQ*);
+static SudoG* dequeue(WaitQ*);
static void enqueue(WaitQ*, SudoG*);
-static SudoG* allocsg(Hchan*);
-static void freesg(Hchan*, SudoG*);
-static uint32 fastrandn(uint32);
static void destroychan(Hchan*);
Hchan*
-runtime·makechan_c(Type *elem, int64 hint)
+runtime·makechan_c(ChanType *t, int64 hint)
{
Hchan *c;
int32 n;
- byte *by;
+ Type *elem;
+
+ elem = t->elem;
- if(hint < 0 || (int32)hint != hint || hint > ((uintptr)-1) / elem->size)
+ if(hint < 0 || (int32)hint != hint || (elem->size > 0 && hint > ((uintptr)-1) / elem->size))
runtime·panicstring("makechan: size out of range");
if(elem->alg >= nelem(runtime·algarray)) {
@@ -111,10 +103,9 @@ runtime·makechan_c(Type *elem, int64 hint)
n++;
// allocate memory in one call
- by = runtime·mal(n + hint*elem->size);
-
- c = (Hchan*)by;
- runtime·addfinalizer(c, destroychan, 0);
+ c = (Hchan*)runtime·mal(n + hint*elem->size);
+ if(runtime·destroylock)
+ runtime·addfinalizer(c, destroychan, 0);
c->elemsize = elem->size;
c->elemalg = &runtime·algarray[elem->alg];
@@ -133,7 +124,7 @@ runtime·makechan_c(Type *elem, int64 hint)
void
reflect·makechan(ChanType *t, uint32 size, Hchan *c)
{
- c = runtime·makechan_c(t->elem, size);
+ c = runtime·makechan_c(t, size);
FLUSH(&c);
}
@@ -144,11 +135,11 @@ destroychan(Hchan *c)
}
-// makechan(elem *Type, hint int64) (hchan *chan any);
+// makechan(t *ChanType, hint int64) (hchan *chan any);
void
-runtime·makechan(Type *elem, int64 hint, Hchan *ret)
+runtime·makechan(ChanType *t, int64 hint, Hchan *ret)
{
- ret = runtime·makechan_c(elem, hint);
+ ret = runtime·makechan_c(t, hint);
FLUSH(&ret);
}
@@ -167,13 +158,22 @@ runtime·makechan(Type *elem, int64 hint, Hchan *ret)
* the operation; we'll see that it's now closed.
*/
void
-runtime·chansend(Hchan *c, byte *ep, bool *pres)
+runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres)
{
SudoG *sg;
+ SudoG mysg;
G* gp;
- if(c == nil)
- runtime·panicstring("send to nil channel");
+ if(c == nil) {
+ USED(t);
+ if(pres != nil) {
+ *pres = false;
+ return;
+ }
+ g->status = Gwaiting;
+ runtime·gosched();
+ return; // not reached
+ }
if(runtime·gcwaiting)
runtime·gosched();
@@ -185,21 +185,20 @@ runtime·chansend(Hchan *c, byte *ep, bool *pres)
}
runtime·lock(c);
-loop:
if(c->closed)
goto closed;
if(c->dataqsiz > 0)
goto asynch;
- sg = dequeue(&c->recvq, c);
+ sg = dequeue(&c->recvq);
if(sg != nil) {
- if(ep != nil)
- c->elemalg->copy(c->elemsize, sg->elem, ep);
-
+ runtime·unlock(c);
+
gp = sg->g;
gp->param = sg;
- runtime·unlock(c);
+ if(sg->elem != nil)
+ c->elemalg->copy(c->elemsize, sg->elem, ep);
runtime·ready(gp);
if(pres != nil)
@@ -213,21 +212,22 @@ loop:
return;
}
- sg = allocsg(c);
- if(ep != nil)
- c->elemalg->copy(c->elemsize, sg->elem, ep);
+ mysg.elem = ep;
+ mysg.g = g;
+ mysg.selgen = NOSELGEN;
g->param = nil;
g->status = Gwaiting;
- enqueue(&c->sendq, sg);
+ enqueue(&c->sendq, &mysg);
runtime·unlock(c);
runtime·gosched();
- runtime·lock(c);
- sg = g->param;
- if(sg == nil)
- goto loop;
- freesg(c, sg);
- runtime·unlock(c);
+ if(g->param == nil) {
+ runtime·lock(c);
+ if(!c->closed)
+ runtime·throw("chansend: spurious wakeup");
+ goto closed;
+ }
+
return;
asynch:
@@ -240,25 +240,25 @@ asynch:
*pres = false;
return;
}
- sg = allocsg(c);
+ mysg.g = g;
+ mysg.elem = nil;
+ mysg.selgen = NOSELGEN;
g->status = Gwaiting;
- enqueue(&c->sendq, sg);
+ enqueue(&c->sendq, &mysg);
runtime·unlock(c);
runtime·gosched();
runtime·lock(c);
goto asynch;
}
- if(ep != nil)
- c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), ep);
+ c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), ep);
if(++c->sendx == c->dataqsiz)
c->sendx = 0;
c->qcount++;
- sg = dequeue(&c->recvq, c);
+ sg = dequeue(&c->recvq);
if(sg != nil) {
gp = sg->g;
- freesg(c, sg);
runtime·unlock(c);
runtime·ready(gp);
} else
@@ -274,38 +274,44 @@ closed:
void
-runtime·chanrecv(Hchan* c, byte *ep, bool *selected, bool *received)
+runtime·chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *received)
{
SudoG *sg;
+ SudoG mysg;
G *gp;
- if(c == nil)
- runtime·panicstring("receive from nil channel");
-
if(runtime·gcwaiting)
runtime·gosched();
if(debug)
runtime·printf("chanrecv: chan=%p\n", c);
- runtime·lock(c);
+ if(c == nil) {
+ USED(t);
+ if(selected != nil) {
+ *selected = false;
+ return;
+ }
+ g->status = Gwaiting;
+ runtime·gosched();
+ return; // not reached
+ }
-loop:
+ runtime·lock(c);
if(c->dataqsiz > 0)
goto asynch;
if(c->closed)
goto closed;
- sg = dequeue(&c->sendq, c);
+ sg = dequeue(&c->sendq);
if(sg != nil) {
+ runtime·unlock(c);
+
if(ep != nil)
c->elemalg->copy(c->elemsize, ep, sg->elem);
- c->elemalg->copy(c->elemsize, sg->elem, nil);
-
gp = sg->g;
gp->param = sg;
- runtime·unlock(c);
runtime·ready(gp);
if(selected != nil)
@@ -321,25 +327,24 @@ loop:
return;
}
- sg = allocsg(c);
+ mysg.elem = ep;
+ mysg.g = g;
+ mysg.selgen = NOSELGEN;
g->param = nil;
g->status = Gwaiting;
- enqueue(&c->recvq, sg);
+ enqueue(&c->recvq, &mysg);
runtime·unlock(c);
runtime·gosched();
- runtime·lock(c);
- sg = g->param;
- if(sg == nil)
- goto loop;
+ if(g->param == nil) {
+ runtime·lock(c);
+ if(!c->closed)
+ runtime·throw("chanrecv: spurious wakeup");
+ goto closed;
+ }
- if(ep != nil)
- c->elemalg->copy(c->elemsize, ep, sg->elem);
- c->elemalg->copy(c->elemsize, sg->elem, nil);
if(received != nil)
*received = true;
- freesg(c, sg);
- runtime·unlock(c);
return;
asynch:
@@ -354,9 +359,11 @@ asynch:
*received = false;
return;
}
- sg = allocsg(c);
+ mysg.g = g;
+ mysg.elem = nil;
+ mysg.selgen = NOSELGEN;
g->status = Gwaiting;
- enqueue(&c->recvq, sg);
+ enqueue(&c->recvq, &mysg);
runtime·unlock(c);
runtime·gosched();
@@ -369,10 +376,10 @@ asynch:
if(++c->recvx == c->dataqsiz)
c->recvx = 0;
c->qcount--;
- sg = dequeue(&c->sendq, c);
+
+ sg = dequeue(&c->sendq);
if(sg != nil) {
gp = sg->g;
- freesg(c, sg);
runtime·unlock(c);
runtime·ready(gp);
} else
@@ -397,50 +404,29 @@ closed:
// chansend1(hchan *chan any, elem any);
#pragma textflag 7
void
-runtime·chansend1(Hchan* c, ...)
+runtime·chansend1(ChanType *t, Hchan* c, ...)
{
- int32 o;
- byte *ae;
-
- if(c == nil)
- runtime·panicstring("send to nil channel");
-
- o = runtime·rnd(sizeof(c), c->elemalign);
- ae = (byte*)&c + o;
- runtime·chansend(c, ae, nil);
+ runtime·chansend(t, c, (byte*)(&c+1), nil);
}
// chanrecv1(hchan *chan any) (elem any);
#pragma textflag 7
void
-runtime·chanrecv1(Hchan* c, ...)
+runtime·chanrecv1(ChanType *t, Hchan* c, ...)
{
- int32 o;
- byte *ae;
-
- o = runtime·rnd(sizeof(c), Structrnd);
- ae = (byte*)&c + o;
-
- runtime·chanrecv(c, ae, nil, nil);
+ runtime·chanrecv(t, c, (byte*)(&c+1), nil, nil);
}
// chanrecv2(hchan *chan any) (elem any, received bool);
#pragma textflag 7
void
-runtime·chanrecv2(Hchan* c, ...)
+runtime·chanrecv2(ChanType *t, Hchan* c, ...)
{
- int32 o;
- byte *ae, *ac;
-
- if(c == nil)
- runtime·panicstring("receive from nil channel");
-
- o = runtime·rnd(sizeof(c), Structrnd);
- ae = (byte*)&c + o;
- o = runtime·rnd(o+c->elemsize, 1);
- ac = (byte*)&c + o;
+ byte *ae, *ap;
- runtime·chanrecv(c, ae, nil, ac);
+ ae = (byte*)(&c+1);
+ ap = ae + t->elem->size;
+ runtime·chanrecv(t, c, ae, nil, ap);
}
// func selectnbsend(c chan any, elem any) bool
@@ -456,7 +442,7 @@ runtime·chanrecv2(Hchan* c, ...)
//
// as
//
-// if c != nil && selectnbsend(c, v) {
+// if selectnbsend(c, v) {
// ... foo
// } else {
// ... bar
@@ -464,17 +450,13 @@ runtime·chanrecv2(Hchan* c, ...)
//
#pragma textflag 7
void
-runtime·selectnbsend(Hchan *c, ...)
+runtime·selectnbsend(ChanType *t, Hchan *c, ...)
{
- int32 o;
byte *ae, *ap;
- o = runtime·rnd(sizeof(c), c->elemalign);
- ae = (byte*)&c + o;
- o = runtime·rnd(o+c->elemsize, Structrnd);
- ap = (byte*)&c + o;
-
- runtime·chansend(c, ae, ap);
+ ae = (byte*)(&c + 1);
+ ap = ae + runtime·rnd(t->elem->size, Structrnd);
+ runtime·chansend(t, c, ae, ap);
}
// func selectnbrecv(elem *any, c chan any) bool
@@ -490,7 +472,7 @@ runtime·selectnbsend(Hchan *c, ...)
//
// as
//
-// if c != nil && selectnbrecv(&v, c) {
+// if selectnbrecv(&v, c) {
// ... foo
// } else {
// ... bar
@@ -498,9 +480,9 @@ runtime·selectnbsend(Hchan *c, ...)
//
#pragma textflag 7
void
-runtime·selectnbrecv(byte *v, Hchan *c, bool selected)
+runtime·selectnbrecv(ChanType *t, byte *v, Hchan *c, bool selected)
{
- runtime·chanrecv(c, v, &selected, nil);
+ runtime·chanrecv(t, c, v, &selected, nil);
}
// func selectnbrecv2(elem *any, ok *bool, c chan any) bool
@@ -524,9 +506,9 @@ runtime·selectnbrecv(byte *v, Hchan *c, bool selected)
//
#pragma textflag 7
void
-runtime·selectnbrecv2(byte *v, bool *received, Hchan *c, bool selected)
+runtime·selectnbrecv2(ChanType *t, byte *v, bool *received, Hchan *c, bool selected)
{
- runtime·chanrecv(c, v, &selected, received);
+ runtime·chanrecv(t, c, v, &selected, received);
}
// For reflect:
@@ -537,14 +519,11 @@ runtime·selectnbrecv2(byte *v, bool *received, Hchan *c, bool selected)
// The "uintptr selected" is really "bool selected" but saying
// uintptr gets us the right alignment for the output parameter block.
void
-reflect·chansend(Hchan *c, uintptr val, bool nb, uintptr selected)
+reflect·chansend(ChanType *t, Hchan *c, uintptr val, bool nb, uintptr selected)
{
bool *sp;
byte *vp;
- if(c == nil)
- runtime·panicstring("send to nil channel");
-
if(nb) {
selected = false;
sp = (bool*)&selected;
@@ -553,11 +532,11 @@ reflect·chansend(Hchan *c, uintptr val, bool nb, uintptr selected)
FLUSH(&selected);
sp = nil;
}
- if(c->elemsize <= sizeof(val))
+ if(t->elem->size <= sizeof(val))
vp = (byte*)&val;
else
vp = (byte*)val;
- runtime·chansend(c, vp, sp);
+ runtime·chansend(t, c, vp, sp);
}
// For reflect:
@@ -565,13 +544,10 @@ reflect·chansend(Hchan *c, uintptr val, bool nb, uintptr selected)
// where an iword is the same word an interface value would use:
// the actual data if it fits, or else a pointer to the data.
void
-reflect·chanrecv(Hchan *c, bool nb, uintptr val, bool selected, bool received)
+reflect·chanrecv(ChanType *t, Hchan *c, bool nb, uintptr val, bool selected, bool received)
{
byte *vp;
bool *sp;
-
- if(c == nil)
- runtime·panicstring("receive from nil channel");
if(nb) {
selected = false;
@@ -583,15 +559,15 @@ reflect·chanrecv(Hchan *c, bool nb, uintptr val, bool selected, bool received)
}
received = false;
FLUSH(&received);
- if(c->elemsize <= sizeof(val)) {
+ if(t->elem->size <= sizeof(val)) {
val = 0;
vp = (byte*)&val;
} else {
- vp = runtime·mal(c->elemsize);
+ vp = runtime·mal(t->elem->size);
val = (uintptr)vp;
FLUSH(&val);
}
- runtime·chanrecv(c, vp, sp, &received);
+ runtime·chanrecv(t, c, vp, sp, &received);
}
static void newselect(int32, Select**);
@@ -619,57 +595,56 @@ newselect(int32 size, Select **selp)
if(size > 1)
n = size-1;
- sel = runtime·mal(sizeof(*sel) + n*sizeof(sel->scase[0]) + size*sizeof(sel->order[0]));
+ sel = runtime·mal(sizeof(*sel) +
+ n*sizeof(sel->scase[0]) +
+ size*sizeof(sel->lockorder[0]) +
+ size*sizeof(sel->pollorder[0]));
sel->tcase = size;
sel->ncase = 0;
- sel->order = (void*)(sel->scase + size);
+ sel->pollorder = (void*)(sel->scase + size);
+ sel->lockorder = (void*)(sel->pollorder + size);
*selp = sel;
+
if(debug)
runtime·printf("newselect s=%p size=%d\n", sel, size);
}
// cut in half to give stack a chance to split
-static void selectsend(Select **selp, Hchan *c, void *pc);
+static void selectsend(Select *sel, Hchan *c, void *pc, void *elem, int32 so);
-// selectsend(sel *byte, hchan *chan any, elem any) (selected bool);
+// selectsend(sel *byte, hchan *chan any, elem *any) (selected bool);
#pragma textflag 7
void
-runtime·selectsend(Select *sel, Hchan *c, ...)
+runtime·selectsend(Select *sel, Hchan *c, void *elem, bool selected)
{
+ selected = false;
+ FLUSH(&selected);
+
// nil cases do not compete
if(c == nil)
return;
- selectsend(&sel, c, runtime·getcallerpc(&sel));
+ selectsend(sel, c, runtime·getcallerpc(&sel), elem, (byte*)&selected - (byte*)&sel);
}
static void
-selectsend(Select **selp, Hchan *c, void *pc)
+selectsend(Select *sel, Hchan *c, void *pc, void *elem, int32 so)
{
- int32 i, eo;
+ int32 i;
Scase *cas;
- byte *ae;
- Select *sel;
- sel = *selp;
i = sel->ncase;
if(i >= sel->tcase)
runtime·throw("selectsend: too many cases");
sel->ncase = i+1;
- cas = runtime·mal(sizeof *cas + c->elemsize - sizeof(cas->u.elem));
- sel->scase[i] = cas;
+ cas = &sel->scase[i];
cas->pc = pc;
cas->chan = c;
-
- eo = runtime·rnd(sizeof(sel), sizeof(c));
- eo = runtime·rnd(eo+sizeof(c), c->elemsize);
- cas->so = runtime·rnd(eo+c->elemsize, Structrnd);
+ cas->so = so;
cas->kind = CaseSend;
-
- ae = (byte*)selp + eo;
- c->elemalg->copy(c->elemsize, cas->u.elem, ae);
+ cas->sg.elem = elem;
if(debug)
runtime·printf("selectsend s=%p pc=%p chan=%p so=%d\n",
@@ -684,6 +659,9 @@ static void selectrecv(Select *sel, Hchan *c, void *pc, void *elem, bool*, int32
void
runtime·selectrecv(Select *sel, Hchan *c, void *elem, bool selected)
{
+ selected = false;
+ FLUSH(&selected);
+
// nil cases do not compete
if(c == nil)
return;
@@ -696,6 +674,9 @@ runtime·selectrecv(Select *sel, Hchan *c, void *elem, bool selected)
void
runtime·selectrecv2(Select *sel, Hchan *c, void *elem, bool *received, bool selected)
{
+ selected = false;
+ FLUSH(&selected);
+
// nil cases do not compete
if(c == nil)
return;
@@ -713,16 +694,14 @@ selectrecv(Select *sel, Hchan *c, void *pc, void *elem, bool *received, int32 so
if(i >= sel->tcase)
runtime·throw("selectrecv: too many cases");
sel->ncase = i+1;
- cas = runtime·mal(sizeof *cas);
- sel->scase[i] = cas;
+ cas = &sel->scase[i];
cas->pc = pc;
cas->chan = c;
cas->so = so;
cas->kind = CaseRecv;
- cas->u.recv.elemp = elem;
- cas->u.recv.receivedp = nil;
- cas->u.recv.receivedp = received;
+ cas->sg.elem = elem;
+ cas->receivedp = received;
if(debug)
runtime·printf("selectrecv s=%p pc=%p chan=%p so=%d\n",
@@ -737,6 +716,9 @@ static void selectdefault(Select*, void*, int32);
void
runtime·selectdefault(Select *sel, bool selected)
{
+ selected = false;
+ FLUSH(&selected);
+
selectdefault(sel, runtime·getcallerpc(&sel), (byte*)&selected - (byte*)&sel);
}
@@ -750,8 +732,7 @@ selectdefault(Select *sel, void *callerpc, int32 so)
if(i >= sel->tcase)
runtime·throw("selectdefault: too many cases");
sel->ncase = i+1;
- cas = runtime·mal(sizeof *cas);
- sel->scase[i] = cas;
+ cas = &sel->scase[i];
cas->pc = callerpc;
cas->chan = nil;
@@ -764,25 +745,16 @@ selectdefault(Select *sel, void *callerpc, int32 so)
}
static void
-freesel(Select *sel)
-{
- uint32 i;
-
- for(i=0; i<sel->ncase; i++)
- runtime·free(sel->scase[i]);
- runtime·free(sel);
-}
-
-static void
sellock(Select *sel)
{
uint32 i;
- Hchan *c;
+ Hchan *c, *c0;
c = nil;
for(i=0; i<sel->ncase; i++) {
- if(sel->scase[i]->chan != c) {
- c = sel->scase[i]->chan;
+ c0 = sel->lockorder[i];
+ if(c0 && c0 != c) {
+ c = sel->lockorder[i];
runtime·lock(c);
}
}
@@ -792,12 +764,13 @@ static void
selunlock(Select *sel)
{
uint32 i;
- Hchan *c;
+ Hchan *c, *c0;
c = nil;
- for(i=sel->ncase; i>0; i--) {
- if(sel->scase[i-1]->chan && sel->scase[i-1]->chan != c) {
- c = sel->scase[i-1]->chan;
+ for(i=sel->ncase; i-->0;) {
+ c0 = sel->lockorder[i];
+ if(c0 && c0 != c) {
+ c = c0;
runtime·unlock(c);
}
}
@@ -852,20 +825,20 @@ selectgo(Select **selp)
// generate permuted order
for(i=0; i<sel->ncase; i++)
- sel->order[i] = i;
+ sel->pollorder[i] = i;
for(i=1; i<sel->ncase; i++) {
- o = sel->order[i];
- j = fastrandn(i+1);
- sel->order[i] = sel->order[j];
- sel->order[j] = o;
+ o = sel->pollorder[i];
+ j = runtime·fastrand1()%(i+1);
+ sel->pollorder[i] = sel->pollorder[j];
+ sel->pollorder[j] = o;
}
// sort the cases by Hchan address to get the locking order.
- for(i=1; i<sel->ncase; i++) {
- cas = sel->scase[i];
- for(j=i; j>0 && sel->scase[j-1]->chan >= cas->chan; j--)
- sel->scase[j] = sel->scase[j-1];
- sel->scase[j] = cas;
+ for(i=0; i<sel->ncase; i++) {
+ c = sel->scase[i].chan;
+ for(j=i; j>0 && sel->lockorder[j-1] >= c; j--)
+ sel->lockorder[j] = sel->lockorder[j-1];
+ sel->lockorder[j] = c;
}
sellock(sel);
@@ -873,8 +846,8 @@ loop:
// pass 1 - look for something already waiting
dfl = nil;
for(i=0; i<sel->ncase; i++) {
- o = sel->order[i];
- cas = sel->scase[o];
+ o = sel->pollorder[i];
+ cas = &sel->scase[o];
c = cas->chan;
switch(cas->kind) {
@@ -883,7 +856,7 @@ loop:
if(c->qcount > 0)
goto asyncrecv;
} else {
- sg = dequeue(&c->sendq, c);
+ sg = dequeue(&c->sendq);
if(sg != nil)
goto syncrecv;
}
@@ -898,7 +871,7 @@ loop:
if(c->qcount < c->dataqsiz)
goto asyncsend;
} else {
- sg = dequeue(&c->recvq, c);
+ sg = dequeue(&c->recvq);
if(sg != nil)
goto syncsend;
}
@@ -911,6 +884,7 @@ loop:
}
if(dfl != nil) {
+ selunlock(sel);
cas = dfl;
goto retc;
}
@@ -918,11 +892,12 @@ loop:
// pass 2 - enqueue on all chans
for(i=0; i<sel->ncase; i++) {
- o = sel->order[i];
- cas = sel->scase[o];
+ o = sel->pollorder[i];
+ cas = &sel->scase[o];
c = cas->chan;
- sg = allocsg(c);
- sg->offset = o;
+ sg = &cas->sg;
+ sg->g = g;
+ sg->selgen = g->selgen;
switch(cas->kind) {
case CaseRecv:
@@ -930,8 +905,6 @@ loop:
break;
case CaseSend:
- if(c->dataqsiz == 0)
- c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem);
enqueue(&c->sendq, sg);
break;
}
@@ -948,85 +921,82 @@ loop:
// pass 3 - dequeue from unsuccessful chans
// otherwise they stack up on quiet channels
for(i=0; i<sel->ncase; i++) {
- if(sg == nil || i != sg->offset) {
- cas = sel->scase[i];
+ cas = &sel->scase[i];
+ if(cas != (Scase*)sg) {
c = cas->chan;
if(cas->kind == CaseSend)
- dequeueg(&c->sendq, c);
+ dequeueg(&c->sendq);
else
- dequeueg(&c->recvq, c);
+ dequeueg(&c->recvq);
}
}
if(sg == nil)
goto loop;
- o = sg->offset;
- cas = sel->scase[o];
+ cas = (Scase*)sg;
c = cas->chan;
- if(c->dataqsiz > 0) {
-// prints("shouldnt happen\n");
- goto loop;
- }
+ if(c->dataqsiz > 0)
+ runtime·throw("selectgo: shouldnt happen");
if(debug)
- runtime·printf("wait-return: sel=%p c=%p cas=%p kind=%d o=%d\n",
- sel, c, cas, cas->kind, o);
+ runtime·printf("wait-return: sel=%p c=%p cas=%p kind=%d\n",
+ sel, c, cas, cas->kind);
if(cas->kind == CaseRecv) {
- if(cas->u.recv.receivedp != nil)
- *cas->u.recv.receivedp = true;
- if(cas->u.recv.elemp != nil)
- c->elemalg->copy(c->elemsize, cas->u.recv.elemp, sg->elem);
- c->elemalg->copy(c->elemsize, sg->elem, nil);
+ if(cas->receivedp != nil)
+ *cas->receivedp = true;
}
- freesg(c, sg);
+ selunlock(sel);
goto retc;
asyncrecv:
// can receive from buffer
- if(cas->u.recv.receivedp != nil)
- *cas->u.recv.receivedp = true;
- if(cas->u.recv.elemp != nil)
- c->elemalg->copy(c->elemsize, cas->u.recv.elemp, chanbuf(c, c->recvx));
+ if(cas->receivedp != nil)
+ *cas->receivedp = true;
+ if(cas->sg.elem != nil)
+ c->elemalg->copy(c->elemsize, cas->sg.elem, chanbuf(c, c->recvx));
c->elemalg->copy(c->elemsize, chanbuf(c, c->recvx), nil);
if(++c->recvx == c->dataqsiz)
c->recvx = 0;
c->qcount--;
- sg = dequeue(&c->sendq, c);
+ sg = dequeue(&c->sendq);
if(sg != nil) {
gp = sg->g;
- freesg(c, sg);
+ selunlock(sel);
runtime·ready(gp);
+ } else {
+ selunlock(sel);
}
goto retc;
asyncsend:
// can send to buffer
- if(cas->u.elem != nil)
- c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), cas->u.elem);
+ c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), cas->sg.elem);
if(++c->sendx == c->dataqsiz)
c->sendx = 0;
c->qcount++;
- sg = dequeue(&c->recvq, c);
+ sg = dequeue(&c->recvq);
if(sg != nil) {
gp = sg->g;
- freesg(c, sg);
+ selunlock(sel);
runtime·ready(gp);
+ } else {
+ selunlock(sel);
}
goto retc;
syncrecv:
// can receive from sleeping sender (sg)
+ selunlock(sel);
if(debug)
runtime·printf("syncrecv: sel=%p c=%p o=%d\n", sel, c, o);
- if(cas->u.recv.receivedp != nil)
- *cas->u.recv.receivedp = true;
- if(cas->u.recv.elemp != nil)
- c->elemalg->copy(c->elemsize, cas->u.recv.elemp, sg->elem);
- c->elemalg->copy(c->elemsize, sg->elem, nil);
+ if(cas->receivedp != nil)
+ *cas->receivedp = true;
+ if(cas->sg.elem != nil)
+ c->elemalg->copy(c->elemsize, cas->sg.elem, sg->elem);
gp = sg->g;
gp->param = sg;
runtime·ready(gp);
@@ -1034,30 +1004,28 @@ syncrecv:
rclose:
// read at end of closed channel
- if(cas->u.recv.receivedp != nil)
- *cas->u.recv.receivedp = false;
- if(cas->u.recv.elemp != nil)
- c->elemalg->copy(c->elemsize, cas->u.recv.elemp, nil);
+ selunlock(sel);
+ if(cas->receivedp != nil)
+ *cas->receivedp = false;
+ if(cas->sg.elem != nil)
+ c->elemalg->copy(c->elemsize, cas->sg.elem, nil);
goto retc;
syncsend:
// can send to sleeping receiver (sg)
+ selunlock(sel);
if(debug)
runtime·printf("syncsend: sel=%p c=%p o=%d\n", sel, c, o);
- if(c->closed)
- goto sclose;
- c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem);
+ c->elemalg->copy(c->elemsize, sg->elem, cas->sg.elem);
gp = sg->g;
gp->param = sg;
runtime·ready(gp);
retc:
- selunlock(sel);
-
// return to pc corresponding to chosen case
pc = cas->pc;
as = (byte*)selp + cas->so;
- freesel(sel);
+ runtime·free(sel);
*as = true;
return pc;
@@ -1088,23 +1056,21 @@ runtime·closechan(Hchan *c)
// release all readers
for(;;) {
- sg = dequeue(&c->recvq, c);
+ sg = dequeue(&c->recvq);
if(sg == nil)
break;
gp = sg->g;
gp->param = nil;
- freesg(c, sg);
runtime·ready(gp);
}
// release all writers
for(;;) {
- sg = dequeue(&c->sendq, c);
+ sg = dequeue(&c->sendq);
if(sg == nil)
break;
gp = sg->g;
gp->param = nil;
- freesg(c, sg);
runtime·ready(gp);
}
@@ -1144,7 +1110,7 @@ reflect·chancap(Hchan *c, int32 cap)
}
static SudoG*
-dequeue(WaitQ *q, Hchan *c)
+dequeue(WaitQ *q)
{
SudoG *sgp;
@@ -1155,9 +1121,10 @@ loop:
q->first = sgp->link;
// if sgp is stale, ignore it
- if(!runtime·cas(&sgp->g->selgen, sgp->selgen, sgp->selgen + 1)) {
+ if(sgp->selgen != NOSELGEN &&
+ (sgp->selgen != sgp->g->selgen ||
+ !runtime·cas(&sgp->g->selgen, sgp->selgen, sgp->selgen + 2))) {
//prints("INVALID PSEUDOG POINTER\n");
- freesg(c, sgp);
goto loop;
}
@@ -1165,14 +1132,16 @@ loop:
}
static void
-dequeueg(WaitQ *q, Hchan *c)
+dequeueg(WaitQ *q)
{
- SudoG **l, *sgp;
-
- for(l=&q->first; (sgp=*l) != nil; l=&sgp->link) {
+ SudoG **l, *sgp, *prevsgp;
+
+ prevsgp = nil;
+ for(l=&q->first; (sgp=*l) != nil; l=&sgp->link, prevsgp=sgp) {
if(sgp->g == g) {
*l = sgp->link;
- freesg(c, sgp);
+ if(q->last == sgp)
+ q->last = prevsgp;
break;
}
}
@@ -1190,62 +1159,3 @@ enqueue(WaitQ *q, SudoG *sgp)
q->last->link = sgp;
q->last = sgp;
}
-
-static SudoG*
-allocsg(Hchan *c)
-{
- SudoG* sg;
-
- sg = c->free;
- if(sg != nil) {
- c->free = sg->link;
- } else
- sg = runtime·mal(sizeof(*sg) + c->elemsize - sizeof(sg->elem));
- sg->selgen = g->selgen;
- sg->g = g;
- sg->offset = 0;
- sg->isfree = 0;
-
- return sg;
-}
-
-static void
-freesg(Hchan *c, SudoG *sg)
-{
- if(sg != nil) {
- if(sg->isfree)
- runtime·throw("chan.freesg: already free");
- sg->isfree = 1;
- sg->link = c->free;
- c->free = sg;
- }
-}
-
-static uint32
-fastrand1(void)
-{
- static uint32 x = 0x49f6428aUL;
-
- x += x;
- if(x & 0x80000000L)
- x ^= 0x88888eefUL;
- return x;
-}
-
-static uint32
-fastrandn(uint32 n)
-{
- uint32 max, r;
-
- if(n <= 1)
- return 0;
-
- r = fastrand1();
- if(r < (1ULL<<31)-n) // avoid computing max in common case
- return r%n;
-
- max = (1ULL<<31)/n * n;
- while(r >= max)
- r = fastrand1();
- return r%n;
-}