diff options
Diffstat (limited to 'src/pkg/runtime/chan.c')
| -rw-r--r-- | src/pkg/runtime/chan.c | 346 |
1 files changed, 201 insertions, 145 deletions
diff --git a/src/pkg/runtime/chan.c b/src/pkg/runtime/chan.c index 3177c2295..8c45b076d 100644 --- a/src/pkg/runtime/chan.c +++ b/src/pkg/runtime/chan.c @@ -5,13 +5,9 @@ #include "runtime.h" #include "type.h" -static int32 debug = 0; +#define MAXALIGN 7 -enum -{ - Wclosed = 0x0001, // writer has closed - Rclosed = 0x0002, // reader has seen close -}; +static int32 debug = 0; typedef struct Link Link; typedef struct WaitQ WaitQ; @@ -40,32 +36,47 @@ struct Hchan uint32 qcount; // total data in the q uint32 dataqsiz; // size of the circular q uint16 elemsize; - uint16 closed; // Wclosed Rclosed errorcount + bool closed; uint8 elemalign; Alg* elemalg; // interface for element type - Link* senddataq; // pointer for sender - Link* recvdataq; // pointer for receiver + uint32 sendx; // send index + uint32 recvx; // receive index WaitQ recvq; // list of recv waiters WaitQ sendq; // list of send waiters SudoG* free; // freelist Lock; }; +// Buffer follows Hchan immediately in memory. +// chanbuf(c, i) is pointer to the i'th slot in the buffer. +#define chanbuf(c, i) ((byte*)((c)+1)+(uintptr)(c)->elemsize*(i)) + struct Link { Link* link; // asynch queue circular linked list byte elem[8]; // asynch queue data element (+ more) }; +enum +{ + // Scase.kind + CaseRecv, + CaseSend, + CaseDefault, +}; + struct Scase { Hchan* chan; // chan byte* pc; // return pc - uint16 send; // 0-recv 1-send 2-default + uint16 kind; uint16 so; // vararg of selected bool union { - byte elem[8]; // element (send) - byte* elemp; // pointer to element (recv) + byte elem[2*sizeof(void*)]; // element (send) + struct { + byte* elemp; // pointer to element (recv) + bool* receivedp; // pointer to received bool (recv2) + } recv; } u; }; @@ -90,7 +101,8 @@ Hchan* runtime·makechan_c(Type *elem, int64 hint) { Hchan *c; - int32 i; + int32 n; + byte *by; if(hint < 0 || (int32)hint != hint || hint > ((uintptr)-1) / elem->size) runtime·panicstring("makechan: size out of range"); @@ -100,32 +112,22 @@ runtime·makechan_c(Type *elem, int64 hint) runtime·throw("runtime.makechan: unsupported elem type"); } - c = runtime·mal(sizeof(*c)); + // calculate rounded size of Hchan + n = sizeof(*c); + while(n & MAXALIGN) + n++; + + // allocate memory in one call + by = runtime·mal(n + hint*elem->size); + + c = (Hchan*)by; + by += n; runtime·addfinalizer(c, destroychan, 0); c->elemsize = elem->size; c->elemalg = &runtime·algarray[elem->alg]; c->elemalign = elem->align; - - if(hint > 0) { - Link *d, *b, *e; - - // make a circular q - b = nil; - e = nil; - for(i=0; i<hint; i++) { - d = runtime·mal(sizeof(*d) + c->elemsize - sizeof(d->elem)); - if(e == nil) - e = d; - d->link = b; - b = d; - } - e->link = b; - c->recvdataq = b; - c->senddataq = b; - c->qcount = 0; - c->dataqsiz = hint; - } + c->dataqsiz = hint; if(debug) runtime·printf("makechan: chan=%p; elemsize=%D; elemalg=%d; elemalign=%d; dataqsiz=%d\n", @@ -183,7 +185,7 @@ runtime·chansend(Hchan *c, byte *ep, bool *pres) runtime·lock(c); loop: - if(c->closed & Wclosed) + if(c->closed) goto closed; if(c->dataqsiz > 0) @@ -228,7 +230,7 @@ loop: return; asynch: - if(c->closed & Wclosed) + if(c->closed) goto closed; if(c->qcount >= c->dataqsiz) { @@ -247,8 +249,9 @@ asynch: goto asynch; } if(ep != nil) - c->elemalg->copy(c->elemsize, c->senddataq->elem, ep); - c->senddataq = c->senddataq->link; + c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), ep); + if(++c->sendx == c->dataqsiz) + c->sendx = 0; c->qcount++; sg = dequeue(&c->recvq, c); @@ -269,7 +272,7 @@ closed: } void -runtime·chanrecv(Hchan* c, byte *ep, bool *pres, bool *closed) +runtime·chanrecv(Hchan* c, byte *ep, bool *selected, bool *received) { SudoG *sg; G *gp; @@ -284,14 +287,12 @@ runtime·chanrecv(Hchan* c, byte *ep, bool *pres, bool *closed) runtime·printf("chanrecv: chan=%p\n", c); runtime·lock(c); - if(closed != nil) - *closed = false; loop: if(c->dataqsiz > 0) goto asynch; - if(c->closed & Wclosed) + if(c->closed) goto closed; sg = dequeue(&c->sendq, c); @@ -305,14 +306,16 @@ loop: runtime·unlock(c); runtime·ready(gp); - if(pres != nil) - *pres = true; + if(selected != nil) + *selected = true; + if(received != nil) + *received = true; return; } - if(pres != nil) { + if(selected != nil) { runtime·unlock(c); - *pres = false; + *selected = false; return; } @@ -331,18 +334,22 @@ loop: if(ep != nil) c->elemalg->copy(c->elemsize, ep, sg->elem); c->elemalg->copy(c->elemsize, sg->elem, nil); + if(received != nil) + *received = true; freesg(c, sg); runtime·unlock(c); return; asynch: if(c->qcount <= 0) { - if(c->closed & Wclosed) + if(c->closed) goto closed; - if(pres != nil) { + if(selected != nil) { runtime·unlock(c); - *pres = false; + *selected = false; + if(received != nil) + *received = false; return; } sg = allocsg(c); @@ -355,9 +362,10 @@ asynch: goto asynch; } if(ep != nil) - c->elemalg->copy(c->elemsize, ep, c->recvdataq->elem); - c->elemalg->copy(c->elemsize, c->recvdataq->elem, nil); - c->recvdataq = c->recvdataq->link; + c->elemalg->copy(c->elemsize, ep, chanbuf(c, c->recvx)); + c->elemalg->copy(c->elemsize, chanbuf(c, c->recvx), nil); + if(++c->recvx == c->dataqsiz) + c->recvx = 0; c->qcount--; sg = dequeue(&c->sendq, c); if(sg != nil) { @@ -365,24 +373,22 @@ asynch: freesg(c, sg); runtime·unlock(c); runtime·ready(gp); - if(pres != nil) - *pres = true; - return; - } + } else + runtime·unlock(c); - runtime·unlock(c); - if(pres != nil) - *pres = true; + if(selected != nil) + *selected = true; + if(received != nil) + *received = true; return; closed: - if(closed != nil) - *closed = true; if(ep != nil) c->elemalg->copy(c->elemsize, ep, nil); - c->closed |= Rclosed; - if(pres != nil) - *pres = true; + if(selected != nil) + *selected = true; + if(received != nil) + *received = false; runtime·unlock(c); } @@ -416,16 +422,16 @@ runtime·chanrecv1(Hchan* c, ...) runtime·chanrecv(c, ae, nil, nil); } -// chanrecv3(hchan *chan any) (elem any, closed bool); +// chanrecv2(hchan *chan any) (elem any, received bool); #pragma textflag 7 void -runtime·chanrecv3(Hchan* c, ...) +runtime·chanrecv2(Hchan* c, ...) { int32 o; byte *ae, *ac; if(c == nil) - runtime·panicstring("range over nil channel"); + runtime·panicstring("receive from nil channel"); o = runtime·rnd(sizeof(c), Structrnd); ae = (byte*)&c + o; @@ -490,9 +496,35 @@ runtime·selectnbsend(Hchan *c, ...) // #pragma textflag 7 void -runtime·selectnbrecv(byte *v, Hchan *c, bool ok) +runtime·selectnbrecv(byte *v, Hchan *c, bool selected) { - runtime·chanrecv(c, v, &ok, nil); + runtime·chanrecv(c, v, &selected, nil); +} + +// func selectnbrecv2(elem *any, ok *bool, c chan any) bool +// +// compiler implements +// +// select { +// case v, ok = <-c: +// ... foo +// default: +// ... bar +// } +// +// as +// +// if c != nil && selectnbrecv2(&v, &ok, c) { +// ... foo +// } else { +// ... bar +// } +// +#pragma textflag 7 +void +runtime·selectnbrecv2(byte *v, bool *received, Hchan *c, bool selected) +{ + runtime·chanrecv(c, v, &selected, received); } static void newselect(int32, Select**); @@ -530,19 +562,30 @@ newselect(int32 size, Select **selp) runtime·printf("newselect s=%p size=%d\n", sel, size); } +// cut in half to give stack a chance to split +static void selectsend(Select **selp, Hchan *c, void *pc); + // selectsend(sel *byte, hchan *chan any, elem any) (selected bool); #pragma textflag 7 void runtime·selectsend(Select *sel, Hchan *c, ...) { - int32 i, eo; - Scase *cas; - byte *ae; - // nil cases do not compete if(c == nil) return; + + selectsend(&sel, c, runtime·getcallerpc(&sel)); +} +static void +selectsend(Select **selp, Hchan *c, void *pc) +{ + int32 i, eo; + Scase *cas; + byte *ae; + Select *sel; + + sel = *selp; i = sel->ncase; if(i >= sel->tcase) runtime·throw("selectsend: too many cases"); @@ -550,67 +593,88 @@ runtime·selectsend(Select *sel, Hchan *c, ...) cas = runtime·mal(sizeof *cas + c->elemsize - sizeof(cas->u.elem)); sel->scase[i] = cas; - cas->pc = runtime·getcallerpc(&sel); + cas->pc = pc; cas->chan = c; eo = runtime·rnd(sizeof(sel), sizeof(c)); eo = runtime·rnd(eo+sizeof(c), c->elemsize); cas->so = runtime·rnd(eo+c->elemsize, Structrnd); - cas->send = 1; + cas->kind = CaseSend; - ae = (byte*)&sel + eo; + ae = (byte*)selp + eo; c->elemalg->copy(c->elemsize, cas->u.elem, ae); if(debug) - runtime·printf("selectsend s=%p pc=%p chan=%p so=%d send=%d\n", - sel, cas->pc, cas->chan, cas->so, cas->send); + runtime·printf("selectsend s=%p pc=%p chan=%p so=%d\n", + sel, cas->pc, cas->chan, cas->so); } +// cut in half to give stack a chance to split +static void selectrecv(Select *sel, Hchan *c, void *pc, void *elem, bool*, int32 so); + // selectrecv(sel *byte, hchan *chan any, elem *any) (selected bool); #pragma textflag 7 void -runtime·selectrecv(Select *sel, Hchan *c, ...) +runtime·selectrecv(Select *sel, Hchan *c, void *elem, bool selected) { - int32 i, eo; - Scase *cas; + // nil cases do not compete + if(c == nil) + return; + + selectrecv(sel, c, runtime·getcallerpc(&sel), elem, nil, (byte*)&selected - (byte*)&sel); +} +// selectrecv2(sel *byte, hchan *chan any, elem *any, received *bool) (selected bool); +#pragma textflag 7 +void +runtime·selectrecv2(Select *sel, Hchan *c, void *elem, bool *received, bool selected) +{ // nil cases do not compete if(c == nil) return; + selectrecv(sel, c, runtime·getcallerpc(&sel), elem, received, (byte*)&selected - (byte*)&sel); +} + +static void +selectrecv(Select *sel, Hchan *c, void *pc, void *elem, bool *received, int32 so) +{ + int32 i; + Scase *cas; + i = sel->ncase; if(i >= sel->tcase) runtime·throw("selectrecv: too many cases"); sel->ncase = i+1; cas = runtime·mal(sizeof *cas); sel->scase[i] = cas; - cas->pc = runtime·getcallerpc(&sel); + cas->pc = pc; cas->chan = c; - eo = runtime·rnd(sizeof(sel), sizeof(c)); - eo = runtime·rnd(eo+sizeof(c), sizeof(byte*)); - cas->so = runtime·rnd(eo+sizeof(byte*), Structrnd); - cas->send = 0; - cas->u.elemp = *(byte**)((byte*)&sel + eo); + cas->so = so; + cas->kind = CaseRecv; + cas->u.recv.elemp = elem; + cas->u.recv.receivedp = nil; + cas->u.recv.receivedp = received; if(debug) - runtime·printf("selectrecv s=%p pc=%p chan=%p so=%d send=%d\n", - sel, cas->pc, cas->chan, cas->so, cas->send); + runtime·printf("selectrecv s=%p pc=%p chan=%p so=%d\n", + sel, cas->pc, cas->chan, cas->so); } - -static void selectdefault(Select*, void*); +// cut in half to give stack a chance to split +static void selectdefault(Select*, void*, int32); // selectdefault(sel *byte) (selected bool); #pragma textflag 7 void -runtime·selectdefault(Select *sel, ...) +runtime·selectdefault(Select *sel, bool selected) { - selectdefault(sel, runtime·getcallerpc(&sel)); + selectdefault(sel, runtime·getcallerpc(&sel), (byte*)&selected - (byte*)&sel); } static void -selectdefault(Select *sel, void *callerpc) +selectdefault(Select *sel, void *callerpc, int32 so) { int32 i; Scase *cas; @@ -624,13 +688,12 @@ selectdefault(Select *sel, void *callerpc) cas->pc = callerpc; cas->chan = nil; - cas->so = runtime·rnd(sizeof(sel), Structrnd); - cas->send = 2; - cas->u.elemp = nil; + cas->so = so; + cas->kind = CaseDefault; if(debug) - runtime·printf("selectdefault s=%p pc=%p so=%d send=%d\n", - sel, cas->pc, cas->so, cas->send); + runtime·printf("selectdefault s=%p pc=%p so=%d\n", + sel, cas->pc, cas->so); } static void @@ -747,8 +810,8 @@ loop: cas = sel->scase[o]; c = cas->chan; - switch(cas->send) { - case 0: // recv + switch(cas->kind) { + case CaseRecv: if(c->dataqsiz > 0) { if(c->qcount > 0) goto asyncrecv; @@ -757,12 +820,12 @@ loop: if(sg != nil) goto syncrecv; } - if(c->closed & Wclosed) + if(c->closed) goto rclose; break; - case 1: // send - if(c->closed & Wclosed) + case CaseSend: + if(c->closed) goto sclose; if(c->dataqsiz > 0) { if(c->qcount < c->dataqsiz) @@ -774,7 +837,7 @@ loop: } break; - case 2: // default + case CaseDefault: dfl = cas; break; } @@ -794,12 +857,12 @@ loop: sg = allocsg(c); sg->offset = o; - switch(cas->send) { - case 0: // recv + switch(cas->kind) { + case CaseRecv: enqueue(&c->recvq, sg); break; - case 1: // send + case CaseSend: if(c->dataqsiz == 0) c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem); enqueue(&c->sendq, sg); @@ -821,7 +884,7 @@ loop: if(sg == nil || i != sg->offset) { cas = sel->scase[i]; c = cas->chan; - if(cas->send) + if(cas->kind == CaseSend) dequeueg(&c->sendq, c); else dequeueg(&c->recvq, c); @@ -841,12 +904,14 @@ loop: } if(debug) - runtime·printf("wait-return: sel=%p c=%p cas=%p send=%d o=%d\n", - sel, c, cas, cas->send, o); - - if(!cas->send) { - if(cas->u.elemp != nil) - c->elemalg->copy(c->elemsize, cas->u.elemp, sg->elem); + runtime·printf("wait-return: sel=%p c=%p cas=%p kind=%d o=%d\n", + sel, c, cas, cas->kind, o); + + if(cas->kind == CaseRecv) { + if(cas->u.recv.receivedp != nil) + *cas->u.recv.receivedp = true; + if(cas->u.recv.elemp != nil) + c->elemalg->copy(c->elemsize, cas->u.recv.elemp, sg->elem); c->elemalg->copy(c->elemsize, sg->elem, nil); } @@ -855,10 +920,13 @@ loop: asyncrecv: // can receive from buffer - if(cas->u.elemp != nil) - c->elemalg->copy(c->elemsize, cas->u.elemp, c->recvdataq->elem); - c->elemalg->copy(c->elemsize, c->recvdataq->elem, nil); - c->recvdataq = c->recvdataq->link; + if(cas->u.recv.receivedp != nil) + *cas->u.recv.receivedp = true; + if(cas->u.recv.elemp != nil) + c->elemalg->copy(c->elemsize, cas->u.recv.elemp, chanbuf(c, c->recvx)); + c->elemalg->copy(c->elemsize, chanbuf(c, c->recvx), nil); + if(++c->recvx == c->dataqsiz) + c->recvx = 0; c->qcount--; sg = dequeue(&c->sendq, c); if(sg != nil) { @@ -871,8 +939,9 @@ asyncrecv: asyncsend: // can send to buffer if(cas->u.elem != nil) - c->elemalg->copy(c->elemsize, c->senddataq->elem, cas->u.elem); - c->senddataq = c->senddataq->link; + c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), cas->u.elem); + if(++c->sendx == c->dataqsiz) + c->sendx = 0; c->qcount++; sg = dequeue(&c->recvq, c); if(sg != nil) { @@ -886,8 +955,10 @@ syncrecv: // can receive from sleeping sender (sg) if(debug) runtime·printf("syncrecv: sel=%p c=%p o=%d\n", sel, c, o); - if(cas->u.elemp != nil) - c->elemalg->copy(c->elemsize, cas->u.elemp, sg->elem); + if(cas->u.recv.receivedp != nil) + *cas->u.recv.receivedp = true; + if(cas->u.recv.elemp != nil) + c->elemalg->copy(c->elemsize, cas->u.recv.elemp, sg->elem); c->elemalg->copy(c->elemsize, sg->elem, nil); gp = sg->g; gp->param = sg; @@ -896,16 +967,17 @@ syncrecv: rclose: // read at end of closed channel - if(cas->u.elemp != nil) - c->elemalg->copy(c->elemsize, cas->u.elemp, nil); - c->closed |= Rclosed; + if(cas->u.recv.receivedp != nil) + *cas->u.recv.receivedp = false; + if(cas->u.recv.elemp != nil) + c->elemalg->copy(c->elemsize, cas->u.recv.elemp, nil); goto retc; syncsend: // can send to sleeping receiver (sg) if(debug) runtime·printf("syncsend: sel=%p c=%p o=%d\n", sel, c, o); - if(c->closed & Wclosed) + if(c->closed) goto sclose; c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem); gp = sg->g; @@ -916,7 +988,6 @@ retc: selunlock(sel); // return to pc corresponding to chosen case - pc = cas->pc; as = (byte*)selp + cas->so; freesel(sel); @@ -941,12 +1012,12 @@ runtime·closechan(Hchan *c) runtime·gosched(); runtime·lock(c); - if(c->closed & Wclosed) { + if(c->closed) { runtime·unlock(c); runtime·panicstring("close of closed channel"); } - c->closed |= Wclosed; + c->closed = true; // release all readers for(;;) { @@ -979,12 +1050,6 @@ runtime·chanclose(Hchan *c) runtime·closechan(c); } -bool -runtime·chanclosed(Hchan *c) -{ - return (c->closed & Rclosed) != 0; -} - int32 runtime·chanlen(Hchan *c) { @@ -997,15 +1062,6 @@ runtime·chancap(Hchan *c) return c->dataqsiz; } - -// closedchan(sel *byte) bool; -void -runtime·closedchan(Hchan *c, bool closed) -{ - closed = runtime·chanclosed(c); - FLUSH(&closed); -} - static SudoG* dequeue(WaitQ *q, Hchan *c) { |
