summaryrefslogtreecommitdiff
path: root/src/pkg/runtime/chan.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/pkg/runtime/chan.c')
-rw-r--r--src/pkg/runtime/chan.c346
1 files changed, 201 insertions, 145 deletions
diff --git a/src/pkg/runtime/chan.c b/src/pkg/runtime/chan.c
index 3177c2295..8c45b076d 100644
--- a/src/pkg/runtime/chan.c
+++ b/src/pkg/runtime/chan.c
@@ -5,13 +5,9 @@
#include "runtime.h"
#include "type.h"
-static int32 debug = 0;
+#define MAXALIGN 7
-enum
-{
- Wclosed = 0x0001, // writer has closed
- Rclosed = 0x0002, // reader has seen close
-};
+static int32 debug = 0;
typedef struct Link Link;
typedef struct WaitQ WaitQ;
@@ -40,32 +36,47 @@ struct Hchan
uint32 qcount; // total data in the q
uint32 dataqsiz; // size of the circular q
uint16 elemsize;
- uint16 closed; // Wclosed Rclosed errorcount
+ bool closed;
uint8 elemalign;
Alg* elemalg; // interface for element type
- Link* senddataq; // pointer for sender
- Link* recvdataq; // pointer for receiver
+ uint32 sendx; // send index
+ uint32 recvx; // receive index
WaitQ recvq; // list of recv waiters
WaitQ sendq; // list of send waiters
SudoG* free; // freelist
Lock;
};
+// Buffer follows Hchan immediately in memory.
+// chanbuf(c, i) is pointer to the i'th slot in the buffer.
+#define chanbuf(c, i) ((byte*)((c)+1)+(uintptr)(c)->elemsize*(i))
+
struct Link
{
Link* link; // asynch queue circular linked list
byte elem[8]; // asynch queue data element (+ more)
};
+enum
+{
+ // Scase.kind
+ CaseRecv,
+ CaseSend,
+ CaseDefault,
+};
+
struct Scase
{
Hchan* chan; // chan
byte* pc; // return pc
- uint16 send; // 0-recv 1-send 2-default
+ uint16 kind;
uint16 so; // vararg of selected bool
union {
- byte elem[8]; // element (send)
- byte* elemp; // pointer to element (recv)
+ byte elem[2*sizeof(void*)]; // element (send)
+ struct {
+ byte* elemp; // pointer to element (recv)
+ bool* receivedp; // pointer to received bool (recv2)
+ } recv;
} u;
};
@@ -90,7 +101,8 @@ Hchan*
runtime·makechan_c(Type *elem, int64 hint)
{
Hchan *c;
- int32 i;
+ int32 n;
+ byte *by;
if(hint < 0 || (int32)hint != hint || hint > ((uintptr)-1) / elem->size)
runtime·panicstring("makechan: size out of range");
@@ -100,32 +112,22 @@ runtime·makechan_c(Type *elem, int64 hint)
runtime·throw("runtime.makechan: unsupported elem type");
}
- c = runtime·mal(sizeof(*c));
+ // calculate rounded size of Hchan
+ n = sizeof(*c);
+ while(n & MAXALIGN)
+ n++;
+
+ // allocate memory in one call
+ by = runtime·mal(n + hint*elem->size);
+
+ c = (Hchan*)by;
+ by += n;
runtime·addfinalizer(c, destroychan, 0);
c->elemsize = elem->size;
c->elemalg = &runtime·algarray[elem->alg];
c->elemalign = elem->align;
-
- if(hint > 0) {
- Link *d, *b, *e;
-
- // make a circular q
- b = nil;
- e = nil;
- for(i=0; i<hint; i++) {
- d = runtime·mal(sizeof(*d) + c->elemsize - sizeof(d->elem));
- if(e == nil)
- e = d;
- d->link = b;
- b = d;
- }
- e->link = b;
- c->recvdataq = b;
- c->senddataq = b;
- c->qcount = 0;
- c->dataqsiz = hint;
- }
+ c->dataqsiz = hint;
if(debug)
runtime·printf("makechan: chan=%p; elemsize=%D; elemalg=%d; elemalign=%d; dataqsiz=%d\n",
@@ -183,7 +185,7 @@ runtime·chansend(Hchan *c, byte *ep, bool *pres)
runtime·lock(c);
loop:
- if(c->closed & Wclosed)
+ if(c->closed)
goto closed;
if(c->dataqsiz > 0)
@@ -228,7 +230,7 @@ loop:
return;
asynch:
- if(c->closed & Wclosed)
+ if(c->closed)
goto closed;
if(c->qcount >= c->dataqsiz) {
@@ -247,8 +249,9 @@ asynch:
goto asynch;
}
if(ep != nil)
- c->elemalg->copy(c->elemsize, c->senddataq->elem, ep);
- c->senddataq = c->senddataq->link;
+ c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), ep);
+ if(++c->sendx == c->dataqsiz)
+ c->sendx = 0;
c->qcount++;
sg = dequeue(&c->recvq, c);
@@ -269,7 +272,7 @@ closed:
}
void
-runtime·chanrecv(Hchan* c, byte *ep, bool *pres, bool *closed)
+runtime·chanrecv(Hchan* c, byte *ep, bool *selected, bool *received)
{
SudoG *sg;
G *gp;
@@ -284,14 +287,12 @@ runtime·chanrecv(Hchan* c, byte *ep, bool *pres, bool *closed)
runtime·printf("chanrecv: chan=%p\n", c);
runtime·lock(c);
- if(closed != nil)
- *closed = false;
loop:
if(c->dataqsiz > 0)
goto asynch;
- if(c->closed & Wclosed)
+ if(c->closed)
goto closed;
sg = dequeue(&c->sendq, c);
@@ -305,14 +306,16 @@ loop:
runtime·unlock(c);
runtime·ready(gp);
- if(pres != nil)
- *pres = true;
+ if(selected != nil)
+ *selected = true;
+ if(received != nil)
+ *received = true;
return;
}
- if(pres != nil) {
+ if(selected != nil) {
runtime·unlock(c);
- *pres = false;
+ *selected = false;
return;
}
@@ -331,18 +334,22 @@ loop:
if(ep != nil)
c->elemalg->copy(c->elemsize, ep, sg->elem);
c->elemalg->copy(c->elemsize, sg->elem, nil);
+ if(received != nil)
+ *received = true;
freesg(c, sg);
runtime·unlock(c);
return;
asynch:
if(c->qcount <= 0) {
- if(c->closed & Wclosed)
+ if(c->closed)
goto closed;
- if(pres != nil) {
+ if(selected != nil) {
runtime·unlock(c);
- *pres = false;
+ *selected = false;
+ if(received != nil)
+ *received = false;
return;
}
sg = allocsg(c);
@@ -355,9 +362,10 @@ asynch:
goto asynch;
}
if(ep != nil)
- c->elemalg->copy(c->elemsize, ep, c->recvdataq->elem);
- c->elemalg->copy(c->elemsize, c->recvdataq->elem, nil);
- c->recvdataq = c->recvdataq->link;
+ c->elemalg->copy(c->elemsize, ep, chanbuf(c, c->recvx));
+ c->elemalg->copy(c->elemsize, chanbuf(c, c->recvx), nil);
+ if(++c->recvx == c->dataqsiz)
+ c->recvx = 0;
c->qcount--;
sg = dequeue(&c->sendq, c);
if(sg != nil) {
@@ -365,24 +373,22 @@ asynch:
freesg(c, sg);
runtime·unlock(c);
runtime·ready(gp);
- if(pres != nil)
- *pres = true;
- return;
- }
+ } else
+ runtime·unlock(c);
- runtime·unlock(c);
- if(pres != nil)
- *pres = true;
+ if(selected != nil)
+ *selected = true;
+ if(received != nil)
+ *received = true;
return;
closed:
- if(closed != nil)
- *closed = true;
if(ep != nil)
c->elemalg->copy(c->elemsize, ep, nil);
- c->closed |= Rclosed;
- if(pres != nil)
- *pres = true;
+ if(selected != nil)
+ *selected = true;
+ if(received != nil)
+ *received = false;
runtime·unlock(c);
}
@@ -416,16 +422,16 @@ runtime·chanrecv1(Hchan* c, ...)
runtime·chanrecv(c, ae, nil, nil);
}
-// chanrecv3(hchan *chan any) (elem any, closed bool);
+// chanrecv2(hchan *chan any) (elem any, received bool);
#pragma textflag 7
void
-runtime·chanrecv3(Hchan* c, ...)
+runtime·chanrecv2(Hchan* c, ...)
{
int32 o;
byte *ae, *ac;
if(c == nil)
- runtime·panicstring("range over nil channel");
+ runtime·panicstring("receive from nil channel");
o = runtime·rnd(sizeof(c), Structrnd);
ae = (byte*)&c + o;
@@ -490,9 +496,35 @@ runtime·selectnbsend(Hchan *c, ...)
//
#pragma textflag 7
void
-runtime·selectnbrecv(byte *v, Hchan *c, bool ok)
+runtime·selectnbrecv(byte *v, Hchan *c, bool selected)
{
- runtime·chanrecv(c, v, &ok, nil);
+ runtime·chanrecv(c, v, &selected, nil);
+}
+
+// func selectnbrecv2(elem *any, ok *bool, c chan any) bool
+//
+// compiler implements
+//
+// select {
+// case v, ok = <-c:
+// ... foo
+// default:
+// ... bar
+// }
+//
+// as
+//
+// if c != nil && selectnbrecv2(&v, &ok, c) {
+// ... foo
+// } else {
+// ... bar
+// }
+//
+#pragma textflag 7
+void
+runtime·selectnbrecv2(byte *v, bool *received, Hchan *c, bool selected)
+{
+ runtime·chanrecv(c, v, &selected, received);
}
static void newselect(int32, Select**);
@@ -530,19 +562,30 @@ newselect(int32 size, Select **selp)
runtime·printf("newselect s=%p size=%d\n", sel, size);
}
+// cut in half to give stack a chance to split
+static void selectsend(Select **selp, Hchan *c, void *pc);
+
// selectsend(sel *byte, hchan *chan any, elem any) (selected bool);
#pragma textflag 7
void
runtime·selectsend(Select *sel, Hchan *c, ...)
{
- int32 i, eo;
- Scase *cas;
- byte *ae;
-
// nil cases do not compete
if(c == nil)
return;
+
+ selectsend(&sel, c, runtime·getcallerpc(&sel));
+}
+static void
+selectsend(Select **selp, Hchan *c, void *pc)
+{
+ int32 i, eo;
+ Scase *cas;
+ byte *ae;
+ Select *sel;
+
+ sel = *selp;
i = sel->ncase;
if(i >= sel->tcase)
runtime·throw("selectsend: too many cases");
@@ -550,67 +593,88 @@ runtime·selectsend(Select *sel, Hchan *c, ...)
cas = runtime·mal(sizeof *cas + c->elemsize - sizeof(cas->u.elem));
sel->scase[i] = cas;
- cas->pc = runtime·getcallerpc(&sel);
+ cas->pc = pc;
cas->chan = c;
eo = runtime·rnd(sizeof(sel), sizeof(c));
eo = runtime·rnd(eo+sizeof(c), c->elemsize);
cas->so = runtime·rnd(eo+c->elemsize, Structrnd);
- cas->send = 1;
+ cas->kind = CaseSend;
- ae = (byte*)&sel + eo;
+ ae = (byte*)selp + eo;
c->elemalg->copy(c->elemsize, cas->u.elem, ae);
if(debug)
- runtime·printf("selectsend s=%p pc=%p chan=%p so=%d send=%d\n",
- sel, cas->pc, cas->chan, cas->so, cas->send);
+ runtime·printf("selectsend s=%p pc=%p chan=%p so=%d\n",
+ sel, cas->pc, cas->chan, cas->so);
}
+// cut in half to give stack a chance to split
+static void selectrecv(Select *sel, Hchan *c, void *pc, void *elem, bool*, int32 so);
+
// selectrecv(sel *byte, hchan *chan any, elem *any) (selected bool);
#pragma textflag 7
void
-runtime·selectrecv(Select *sel, Hchan *c, ...)
+runtime·selectrecv(Select *sel, Hchan *c, void *elem, bool selected)
{
- int32 i, eo;
- Scase *cas;
+ // nil cases do not compete
+ if(c == nil)
+ return;
+
+ selectrecv(sel, c, runtime·getcallerpc(&sel), elem, nil, (byte*)&selected - (byte*)&sel);
+}
+// selectrecv2(sel *byte, hchan *chan any, elem *any, received *bool) (selected bool);
+#pragma textflag 7
+void
+runtime·selectrecv2(Select *sel, Hchan *c, void *elem, bool *received, bool selected)
+{
// nil cases do not compete
if(c == nil)
return;
+ selectrecv(sel, c, runtime·getcallerpc(&sel), elem, received, (byte*)&selected - (byte*)&sel);
+}
+
+static void
+selectrecv(Select *sel, Hchan *c, void *pc, void *elem, bool *received, int32 so)
+{
+ int32 i;
+ Scase *cas;
+
i = sel->ncase;
if(i >= sel->tcase)
runtime·throw("selectrecv: too many cases");
sel->ncase = i+1;
cas = runtime·mal(sizeof *cas);
sel->scase[i] = cas;
- cas->pc = runtime·getcallerpc(&sel);
+ cas->pc = pc;
cas->chan = c;
- eo = runtime·rnd(sizeof(sel), sizeof(c));
- eo = runtime·rnd(eo+sizeof(c), sizeof(byte*));
- cas->so = runtime·rnd(eo+sizeof(byte*), Structrnd);
- cas->send = 0;
- cas->u.elemp = *(byte**)((byte*)&sel + eo);
+ cas->so = so;
+ cas->kind = CaseRecv;
+ cas->u.recv.elemp = elem;
+ cas->u.recv.receivedp = nil;
+ cas->u.recv.receivedp = received;
if(debug)
- runtime·printf("selectrecv s=%p pc=%p chan=%p so=%d send=%d\n",
- sel, cas->pc, cas->chan, cas->so, cas->send);
+ runtime·printf("selectrecv s=%p pc=%p chan=%p so=%d\n",
+ sel, cas->pc, cas->chan, cas->so);
}
-
-static void selectdefault(Select*, void*);
+// cut in half to give stack a chance to split
+static void selectdefault(Select*, void*, int32);
// selectdefault(sel *byte) (selected bool);
#pragma textflag 7
void
-runtime·selectdefault(Select *sel, ...)
+runtime·selectdefault(Select *sel, bool selected)
{
- selectdefault(sel, runtime·getcallerpc(&sel));
+ selectdefault(sel, runtime·getcallerpc(&sel), (byte*)&selected - (byte*)&sel);
}
static void
-selectdefault(Select *sel, void *callerpc)
+selectdefault(Select *sel, void *callerpc, int32 so)
{
int32 i;
Scase *cas;
@@ -624,13 +688,12 @@ selectdefault(Select *sel, void *callerpc)
cas->pc = callerpc;
cas->chan = nil;
- cas->so = runtime·rnd(sizeof(sel), Structrnd);
- cas->send = 2;
- cas->u.elemp = nil;
+ cas->so = so;
+ cas->kind = CaseDefault;
if(debug)
- runtime·printf("selectdefault s=%p pc=%p so=%d send=%d\n",
- sel, cas->pc, cas->so, cas->send);
+ runtime·printf("selectdefault s=%p pc=%p so=%d\n",
+ sel, cas->pc, cas->so);
}
static void
@@ -747,8 +810,8 @@ loop:
cas = sel->scase[o];
c = cas->chan;
- switch(cas->send) {
- case 0: // recv
+ switch(cas->kind) {
+ case CaseRecv:
if(c->dataqsiz > 0) {
if(c->qcount > 0)
goto asyncrecv;
@@ -757,12 +820,12 @@ loop:
if(sg != nil)
goto syncrecv;
}
- if(c->closed & Wclosed)
+ if(c->closed)
goto rclose;
break;
- case 1: // send
- if(c->closed & Wclosed)
+ case CaseSend:
+ if(c->closed)
goto sclose;
if(c->dataqsiz > 0) {
if(c->qcount < c->dataqsiz)
@@ -774,7 +837,7 @@ loop:
}
break;
- case 2: // default
+ case CaseDefault:
dfl = cas;
break;
}
@@ -794,12 +857,12 @@ loop:
sg = allocsg(c);
sg->offset = o;
- switch(cas->send) {
- case 0: // recv
+ switch(cas->kind) {
+ case CaseRecv:
enqueue(&c->recvq, sg);
break;
- case 1: // send
+ case CaseSend:
if(c->dataqsiz == 0)
c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem);
enqueue(&c->sendq, sg);
@@ -821,7 +884,7 @@ loop:
if(sg == nil || i != sg->offset) {
cas = sel->scase[i];
c = cas->chan;
- if(cas->send)
+ if(cas->kind == CaseSend)
dequeueg(&c->sendq, c);
else
dequeueg(&c->recvq, c);
@@ -841,12 +904,14 @@ loop:
}
if(debug)
- runtime·printf("wait-return: sel=%p c=%p cas=%p send=%d o=%d\n",
- sel, c, cas, cas->send, o);
-
- if(!cas->send) {
- if(cas->u.elemp != nil)
- c->elemalg->copy(c->elemsize, cas->u.elemp, sg->elem);
+ runtime·printf("wait-return: sel=%p c=%p cas=%p kind=%d o=%d\n",
+ sel, c, cas, cas->kind, o);
+
+ if(cas->kind == CaseRecv) {
+ if(cas->u.recv.receivedp != nil)
+ *cas->u.recv.receivedp = true;
+ if(cas->u.recv.elemp != nil)
+ c->elemalg->copy(c->elemsize, cas->u.recv.elemp, sg->elem);
c->elemalg->copy(c->elemsize, sg->elem, nil);
}
@@ -855,10 +920,13 @@ loop:
asyncrecv:
// can receive from buffer
- if(cas->u.elemp != nil)
- c->elemalg->copy(c->elemsize, cas->u.elemp, c->recvdataq->elem);
- c->elemalg->copy(c->elemsize, c->recvdataq->elem, nil);
- c->recvdataq = c->recvdataq->link;
+ if(cas->u.recv.receivedp != nil)
+ *cas->u.recv.receivedp = true;
+ if(cas->u.recv.elemp != nil)
+ c->elemalg->copy(c->elemsize, cas->u.recv.elemp, chanbuf(c, c->recvx));
+ c->elemalg->copy(c->elemsize, chanbuf(c, c->recvx), nil);
+ if(++c->recvx == c->dataqsiz)
+ c->recvx = 0;
c->qcount--;
sg = dequeue(&c->sendq, c);
if(sg != nil) {
@@ -871,8 +939,9 @@ asyncrecv:
asyncsend:
// can send to buffer
if(cas->u.elem != nil)
- c->elemalg->copy(c->elemsize, c->senddataq->elem, cas->u.elem);
- c->senddataq = c->senddataq->link;
+ c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), cas->u.elem);
+ if(++c->sendx == c->dataqsiz)
+ c->sendx = 0;
c->qcount++;
sg = dequeue(&c->recvq, c);
if(sg != nil) {
@@ -886,8 +955,10 @@ syncrecv:
// can receive from sleeping sender (sg)
if(debug)
runtime·printf("syncrecv: sel=%p c=%p o=%d\n", sel, c, o);
- if(cas->u.elemp != nil)
- c->elemalg->copy(c->elemsize, cas->u.elemp, sg->elem);
+ if(cas->u.recv.receivedp != nil)
+ *cas->u.recv.receivedp = true;
+ if(cas->u.recv.elemp != nil)
+ c->elemalg->copy(c->elemsize, cas->u.recv.elemp, sg->elem);
c->elemalg->copy(c->elemsize, sg->elem, nil);
gp = sg->g;
gp->param = sg;
@@ -896,16 +967,17 @@ syncrecv:
rclose:
// read at end of closed channel
- if(cas->u.elemp != nil)
- c->elemalg->copy(c->elemsize, cas->u.elemp, nil);
- c->closed |= Rclosed;
+ if(cas->u.recv.receivedp != nil)
+ *cas->u.recv.receivedp = false;
+ if(cas->u.recv.elemp != nil)
+ c->elemalg->copy(c->elemsize, cas->u.recv.elemp, nil);
goto retc;
syncsend:
// can send to sleeping receiver (sg)
if(debug)
runtime·printf("syncsend: sel=%p c=%p o=%d\n", sel, c, o);
- if(c->closed & Wclosed)
+ if(c->closed)
goto sclose;
c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem);
gp = sg->g;
@@ -916,7 +988,6 @@ retc:
selunlock(sel);
// return to pc corresponding to chosen case
-
pc = cas->pc;
as = (byte*)selp + cas->so;
freesel(sel);
@@ -941,12 +1012,12 @@ runtime·closechan(Hchan *c)
runtime·gosched();
runtime·lock(c);
- if(c->closed & Wclosed) {
+ if(c->closed) {
runtime·unlock(c);
runtime·panicstring("close of closed channel");
}
- c->closed |= Wclosed;
+ c->closed = true;
// release all readers
for(;;) {
@@ -979,12 +1050,6 @@ runtime·chanclose(Hchan *c)
runtime·closechan(c);
}
-bool
-runtime·chanclosed(Hchan *c)
-{
- return (c->closed & Rclosed) != 0;
-}
-
int32
runtime·chanlen(Hchan *c)
{
@@ -997,15 +1062,6 @@ runtime·chancap(Hchan *c)
return c->dataqsiz;
}
-
-// closedchan(sel *byte) bool;
-void
-runtime·closedchan(Hchan *c, bool closed)
-{
- closed = runtime·chanclosed(c);
- FLUSH(&closed);
-}
-
static SudoG*
dequeue(WaitQ *q, Hchan *c)
{