summaryrefslogtreecommitdiff
path: root/src/pkg/runtime/chan.c
diff options
context:
space:
mode:
authorOndřej Surý <ondrej@sury.org>2011-02-14 13:23:51 +0100
committerOndřej Surý <ondrej@sury.org>2011-02-14 13:23:51 +0100
commit758ff64c69e34965f8af5b2d6ffd65e8d7ab2150 (patch)
tree6d6b34f8c678862fe9b56c945a7b63f68502c245 /src/pkg/runtime/chan.c
parent3e45412327a2654a77944249962b3652e6142299 (diff)
downloadgolang-upstream/2011-02-01.1.tar.gz
Imported Upstream version 2011-02-01.1upstream/2011-02-01.1
Diffstat (limited to 'src/pkg/runtime/chan.c')
-rw-r--r--src/pkg/runtime/chan.c260
1 files changed, 134 insertions, 126 deletions
diff --git a/src/pkg/runtime/chan.c b/src/pkg/runtime/chan.c
index 94ea513e7..8d3ac2ca4 100644
--- a/src/pkg/runtime/chan.c
+++ b/src/pkg/runtime/chan.c
@@ -11,8 +11,6 @@ enum
{
Wclosed = 0x0001, // writer has closed
Rclosed = 0x0002, // reader has seen close
- Eincr = 0x0004, // increment errors
- Emax = 0x0800, // error limit before throw
};
typedef struct Link Link;
@@ -76,6 +74,7 @@ struct Select
uint16 tcase; // total count of scase[]
uint16 ncase; // currently filled scase[]
Select* link; // for freelist
+ uint16* order;
Scase* scase[1]; // one per case
};
@@ -84,9 +83,7 @@ static SudoG* dequeue(WaitQ*, Hchan*);
static void enqueue(WaitQ*, SudoG*);
static SudoG* allocsg(Hchan*);
static void freesg(Hchan*, SudoG*);
-static uint32 gcd(uint32, uint32);
-static uint32 fastrand1(void);
-static uint32 fastrand2(void);
+static uint32 fastrandn(uint32);
static void destroychan(Hchan*);
Hchan*
@@ -152,16 +149,6 @@ runtime·makechan(Type *elem, int64 hint, Hchan *ret)
FLUSH(&ret);
}
-static void
-incerr(Hchan* c)
-{
- c->closed += Eincr;
- if(c->closed & Emax) {
- // Note that channel locks may still be held at this point.
- runtime·throw("too many operations on a closed channel");
- }
-}
-
/*
* generic single channel send/recv
* if the bool pointer is nil,
@@ -277,14 +264,12 @@ asynch:
return;
closed:
- incerr(c);
- if(pres != nil)
- *pres = true;
runtime·unlock(c);
+ runtime·panicstring("send on closed channel");
}
void
-runtime·chanrecv(Hchan* c, byte *ep, bool* pres)
+runtime·chanrecv(Hchan* c, byte *ep, bool *pres, bool *closed)
{
SudoG *sg;
G *gp;
@@ -299,6 +284,9 @@ runtime·chanrecv(Hchan* c, byte *ep, bool* pres)
runtime·printf("chanrecv: chan=%p\n", c);
runtime·lock(c);
+ if(closed != nil)
+ *closed = false;
+
loop:
if(c->dataqsiz > 0)
goto asynch;
@@ -308,7 +296,8 @@ loop:
sg = dequeue(&c->sendq, c);
if(sg != nil) {
- c->elemalg->copy(c->elemsize, ep, sg->elem);
+ if(ep != nil)
+ c->elemalg->copy(c->elemsize, ep, sg->elem);
c->elemalg->copy(c->elemsize, sg->elem, nil);
gp = sg->g;
@@ -323,7 +312,6 @@ loop:
if(pres != nil) {
runtime·unlock(c);
- c->elemalg->copy(c->elemsize, ep, nil);
*pres = false;
return;
}
@@ -340,7 +328,8 @@ loop:
if(sg == nil)
goto loop;
- c->elemalg->copy(c->elemsize, ep, sg->elem);
+ if(ep != nil)
+ c->elemalg->copy(c->elemsize, ep, sg->elem);
c->elemalg->copy(c->elemsize, sg->elem, nil);
freesg(c, sg);
runtime·unlock(c);
@@ -353,7 +342,6 @@ asynch:
if(pres != nil) {
runtime·unlock(c);
- c->elemalg->copy(c->elemsize, ep, nil);
*pres = false;
return;
}
@@ -366,7 +354,8 @@ asynch:
runtime·lock(c);
goto asynch;
}
- c->elemalg->copy(c->elemsize, ep, c->recvdataq->elem);
+ if(ep != nil)
+ c->elemalg->copy(c->elemsize, ep, c->recvdataq->elem);
c->elemalg->copy(c->elemsize, c->recvdataq->elem, nil);
c->recvdataq = c->recvdataq->link;
c->qcount--;
@@ -387,9 +376,11 @@ asynch:
return;
closed:
- c->elemalg->copy(c->elemsize, ep, nil);
+ if(closed != nil)
+ *closed = true;
+ if(ep != nil)
+ c->elemalg->copy(c->elemsize, ep, nil);
c->closed |= Rclosed;
- incerr(c);
if(pres != nil)
*pres = true;
runtime·unlock(c);
@@ -411,55 +402,99 @@ runtime·chansend1(Hchan* c, ...)
runtime·chansend(c, ae, nil);
}
-// chansend2(hchan *chan any, elem any) (pres bool);
+// chanrecv1(hchan *chan any) (elem any);
#pragma textflag 7
void
-runtime·chansend2(Hchan* c, ...)
+runtime·chanrecv1(Hchan* c, ...)
{
int32 o;
- byte *ae, *ap;
-
- if(c == nil)
- runtime·panicstring("send to nil channel");
+ byte *ae;
- o = runtime·rnd(sizeof(c), c->elemalign);
+ o = runtime·rnd(sizeof(c), Structrnd);
ae = (byte*)&c + o;
- o = runtime·rnd(o+c->elemsize, Structrnd);
- ap = (byte*)&c + o;
- runtime·chansend(c, ae, ap);
+ runtime·chanrecv(c, ae, nil, nil);
}
-// chanrecv1(hchan *chan any) (elem any);
+// chanrecv3(hchan *chan any) (elem any, closed bool);
#pragma textflag 7
void
-runtime·chanrecv1(Hchan* c, ...)
+runtime·chanrecv3(Hchan* c, ...)
{
int32 o;
- byte *ae;
+ byte *ae, *ac;
+
+ if(c == nil)
+ runtime·panicstring("range over nil channel");
o = runtime·rnd(sizeof(c), Structrnd);
ae = (byte*)&c + o;
+ o = runtime·rnd(o+c->elemsize, 1);
+ ac = (byte*)&c + o;
- runtime·chanrecv(c, ae, nil);
+ runtime·chanrecv(c, ae, nil, ac);
}
-// chanrecv2(hchan *chan any) (elem any, pres bool);
+// func selectnbsend(c chan any, elem any) bool
+//
+// compiler implements
+//
+// select {
+// case c <- v:
+// ... foo
+// default:
+// ... bar
+// }
+//
+// as
+//
+// if c != nil && selectnbsend(c, v) {
+// ... foo
+// } else {
+// ... bar
+// }
+//
#pragma textflag 7
void
-runtime·chanrecv2(Hchan* c, ...)
+runtime·selectnbsend(Hchan *c, ...)
{
int32 o;
byte *ae, *ap;
- o = runtime·rnd(sizeof(c), Structrnd);
+ o = runtime·rnd(sizeof(c), c->elemalign);
ae = (byte*)&c + o;
- o = runtime·rnd(o+c->elemsize, 1);
+ o = runtime·rnd(o+c->elemsize, Structrnd);
ap = (byte*)&c + o;
- runtime·chanrecv(c, ae, ap);
+ runtime·chansend(c, ae, ap);
}
+// func selectnbrecv(elem *any, c chan any) bool
+//
+// compiler implements
+//
+// select {
+// case v = <-c:
+// ... foo
+// default:
+// ... bar
+// }
+//
+// as
+//
+// if c != nil && selectnbrecv(&v, c) {
+// ... foo
+// } else {
+// ... bar
+// }
+//
+#pragma textflag 7
+void
+runtime·selectnbrecv(byte *v, Hchan *c, bool ok)
+{
+ runtime·chanrecv(c, v, &ok, nil);
+}
+
// newselect(size uint32) (sel *byte);
#pragma textflag 7
void
@@ -475,10 +510,11 @@ runtime·newselect(int32 size, ...)
if(size > 1)
n = size-1;
- sel = runtime·mal(sizeof(*sel) + n*sizeof(sel->scase[0]));
+ sel = runtime·mal(sizeof(*sel) + n*sizeof(sel->scase[0]) + size*sizeof(sel->order[0]));
sel->tcase = size;
sel->ncase = 0;
+ sel->order = (void*)(sel->scase + size);
*selp = sel;
if(debug)
runtime·printf("newselect s=%p size=%d\n", sel, size);
@@ -619,6 +655,13 @@ selunlock(Select *sel)
}
}
+void
+runtime·block(void)
+{
+ g->status = Gwaiting; // forever
+ runtime·gosched();
+}
+
// selectgo(sel *byte);
//
// overwrites return pc on stack to signal which case of the select
@@ -629,7 +672,7 @@ selunlock(Select *sel)
void
runtime·selectgo(Select *sel)
{
- uint32 p, o, i, j;
+ uint32 o, i, j;
Scase *cas, *dfl;
Hchan *c;
SudoG *sg;
@@ -642,29 +685,24 @@ runtime·selectgo(Select *sel)
if(debug)
runtime·printf("select: sel=%p\n", sel);
- if(sel->ncase < 2) {
- if(sel->ncase < 1) {
- g->status = Gwaiting; // forever
- runtime·gosched();
- }
- // TODO: make special case of one.
- }
+ // The compiler rewrites selects that statically have
+ // only 0 or 1 cases plus default into simpler constructs.
+ // The only way we can end up with such small sel->ncase
+ // values here is for a larger select in which most channels
+ // have been nilled out. The general code handles those
+ // cases correctly, and they are rare enough not to bother
+ // optimizing (and needing to test).
- // select a (relative) prime
- for(i=0;; i++) {
- p = fastrand1();
- if(gcd(p, sel->ncase) == 1)
- break;
- if(i > 1000)
- runtime·throw("select: failed to select prime");
+ // generate permuted order
+ for(i=0; i<sel->ncase; i++)
+ sel->order[i] = i;
+ for(i=1; i<sel->ncase; i++) {
+ o = sel->order[i];
+ j = fastrandn(i+1);
+ sel->order[i] = sel->order[j];
+ sel->order[j] = o;
}
- // select an initial offset
- o = fastrand2();
-
- p %= sel->ncase;
- o %= sel->ncase;
-
// sort the cases by Hchan address to get the locking order.
for(i=1; i<sel->ncase; i++) {
cas = sel->scase[i];
@@ -672,13 +710,13 @@ runtime·selectgo(Select *sel)
sel->scase[j] = sel->scase[j-1];
sel->scase[j] = cas;
}
-
sellock(sel);
loop:
// pass 1 - look for something already waiting
dfl = nil;
for(i=0; i<sel->ncase; i++) {
+ o = sel->order[i];
cas = sel->scase[o];
c = cas->chan;
@@ -713,10 +751,6 @@ loop:
dfl = cas;
break;
}
-
- o += p;
- if(o >= sel->ncase)
- o -= sel->ncase;
}
if(dfl != nil) {
@@ -727,6 +761,7 @@ loop:
// pass 2 - enqueue on all chans
for(i=0; i<sel->ncase; i++) {
+ o = sel->order[i];
cas = sel->scase[o];
c = cas->chan;
sg = allocsg(c);
@@ -734,32 +769,15 @@ loop:
switch(cas->send) {
case 0: // recv
- if(c->dataqsiz > 0) {
- if(c->qcount > 0)
- runtime·throw("select: pass 2 async recv");
- } else {
- if(dequeue(&c->sendq, c))
- runtime·throw("select: pass 2 sync recv");
- }
enqueue(&c->recvq, sg);
break;
case 1: // send
- if(c->dataqsiz > 0) {
- if(c->qcount < c->dataqsiz)
- runtime·throw("select: pass 2 async send");
- } else {
- if(dequeue(&c->recvq, c))
- runtime·throw("select: pass 2 sync send");
+ if(c->dataqsiz == 0)
c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem);
- }
enqueue(&c->sendq, sg);
break;
}
-
- o += p;
- if(o >= sel->ncase)
- o -= sel->ncase;
}
g->param = nil;
@@ -773,18 +791,14 @@ loop:
// pass 3 - dequeue from unsuccessful chans
// otherwise they stack up on quiet channels
for(i=0; i<sel->ncase; i++) {
- if(sg == nil || o != sg->offset) {
- cas = sel->scase[o];
+ if(sg == nil || i != sg->offset) {
+ cas = sel->scase[i];
c = cas->chan;
if(cas->send)
dequeueg(&c->sendq, c);
else
dequeueg(&c->recvq, c);
}
-
- o += p;
- if(o >= sel->ncase)
- o -= sel->ncase;
}
if(sg == nil)
@@ -858,7 +872,6 @@ rclose:
if(cas->u.elemp != nil)
c->elemalg->copy(c->elemsize, cas->u.elemp, nil);
c->closed |= Rclosed;
- incerr(c);
goto retc;
syncsend:
@@ -871,12 +884,6 @@ syncsend:
gp = sg->g;
gp->param = sg;
runtime·ready(gp);
- goto retc;
-
-sclose:
- // send on closed channel
- incerr(c);
- goto retc;
retc:
selunlock(sel);
@@ -886,6 +893,12 @@ retc:
as = (byte*)&sel + cas->so;
freesel(sel);
*as = true;
+ return;
+
+sclose:
+ // send on closed channel
+ selunlock(sel);
+ runtime·panicstring("send on closed channel");
}
// closechan(sel *byte);
@@ -899,7 +912,11 @@ runtime·closechan(Hchan *c)
runtime·gosched();
runtime·lock(c);
- incerr(c);
+ if(c->closed & Wclosed) {
+ runtime·unlock(c);
+ runtime·panicstring("close of closed channel");
+ }
+
c->closed |= Wclosed;
// release all readers
@@ -1039,22 +1056,6 @@ freesg(Hchan *c, SudoG *sg)
}
static uint32
-gcd(uint32 u, uint32 v)
-{
- for(;;) {
- if(u > v) {
- if(v == 0)
- return u;
- u = u%v;
- continue;
- }
- if(u == 0)
- return v;
- v = v%u;
- }
-}
-
-static uint32
fastrand1(void)
{
static uint32 x = 0x49f6428aUL;
@@ -1066,12 +1067,19 @@ fastrand1(void)
}
static uint32
-fastrand2(void)
+fastrandn(uint32 n)
{
- static uint32 x = 0x49f6428aUL;
+ uint32 max, r;
- x += x;
- if(x & 0x80000000L)
- x ^= 0xfafd871bUL;
- return x;
+ if(n <= 1)
+ return 0;
+
+ r = fastrand1();
+ if(r < (1ULL<<31)-n) // avoid computing max in common case
+ return r%n;
+
+ max = (1ULL<<31)/n * n;
+ while(r >= max)
+ r = fastrand1();
+ return r%n;
}