// Copyright 2009 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. // Semaphore implementation exposed to Go. // Intended use is provide a sleep and wakeup // primitive that can be used in the contended case // of other synchronization primitives. // Thus it targets the same goal as Linux's futex, // but it has much simpler semantics. // // That is, don't think of these as semaphores. // Think of them as a way to implement sleep and wakeup // such that every sleep is paired with a single wakeup, // even if, due to races, the wakeup happens before the sleep. // // See Mullender and Cox, ``Semaphores in Plan 9,'' // http://swtch.com/semaphore.pdf package sync #include "runtime.h" #include "arch_GOARCH.h" #include "../../cmd/ld/textflag.h" typedef struct SemaWaiter SemaWaiter; struct SemaWaiter { uint32 volatile* addr; G* g; int64 releasetime; int32 nrelease; // -1 for acquire SemaWaiter* prev; SemaWaiter* next; }; typedef struct SemaRoot SemaRoot; struct SemaRoot { Lock; SemaWaiter* head; SemaWaiter* tail; // Number of waiters. Read w/o the lock. uint32 volatile nwait; }; // Prime to not correlate with any user patterns. 
#define SEMTABLESZ 251

// One table slot: a SemaRoot followed by padding that rounds the slot up
// to CacheLineSize bytes (presumably so that adjacent roots never share a
// cache line -- confirm against CacheLineSize's definition).
struct semtable
{
	SemaRoot;
	uint8 pad[CacheLineSize-sizeof(SemaRoot)];
};
#pragma dataflag NOPTR /* mark semtable as 'no pointers', hiding from garbage collector */
static struct semtable semtable[SEMTABLESZ];

// semroot maps a semaphore address to its bucket in semtable by dropping
// the low 3 address bits and reducing modulo the table size.
// Distinct addresses can land in the same bucket, which is why waiters
// record their own addr.
static SemaRoot*
semroot(uint32 *addr)
{
	return &semtable[((uintptr)addr >> 3) % SEMTABLESZ];
}

// semqueue fills in s for the current goroutine and addr, then appends it
// to the tail of root's list. The only caller in this file (semacquire)
// holds root's lock while calling.
static void
semqueue(SemaRoot *root, uint32 volatile *addr, SemaWaiter *s)
{
	s->g = g;
	s->addr = addr;
	s->next = nil;
	s->prev = root->tail;
	if(root->tail)
		root->tail->next = s;
	else
		root->head = s;
	root->tail = s;
}

// semdequeue unlinks s from root's doubly linked list, fixing up head/tail
// when s is at either end, and clears s's own links.
static void
semdequeue(SemaRoot *root, SemaWaiter *s)
{
	if(s->next)
		s->next->prev = s->prev;
	else
		root->tail = s->prev;
	if(s->prev)
		s->prev->next = s->next;
	else
		root->head = s->next;
	s->prev = nil;
	s->next = nil;
}

// cansemacquire tries to decrement *addr without blocking.
// It returns 1 if it managed to CAS a positive value v down to v-1, and 0
// as soon as it observes the value to be 0. A failed CAS simply retries
// with a freshly loaded value.
static int32
cansemacquire(uint32 *addr)
{
	uint32 v;

	while((v = runtime·atomicload(addr)) > 0)
		if(runtime·cas(addr, v, v-1))
			return 1;
	return 0;
}

// runtime·semacquire blocks the calling goroutine until it has decremented
// *addr from a positive value.
// When profile is true and runtime·blockprofilerate is positive, the time
// spent blocked is reported through runtime·blockevent.
void
runtime·semacquire(uint32 volatile *addr, bool profile)
{
	SemaWaiter s;	// Needs to be allocated on stack, otherwise garbage collector could deallocate it
	SemaRoot *root;
	int64 t0;

	// Easy case.
	if(cansemacquire(addr))
		return;

	// Harder case:
	//	increment waiter count
	//	try cansemacquire one more time, return if succeeded
	//	enqueue itself as a waiter
	//	sleep
	//	(waiter descriptor is dequeued by signaler)
	root = semroot(addr);
	t0 = 0;
	s.releasetime = 0;
	if(profile && runtime·blockprofilerate > 0) {
		t0 = runtime·cputicks();
		// Non-zero marker: asks the waker in semrelease to record the
		// wakeup time into s.releasetime.
		s.releasetime = -1;
	}
	for(;;) {
		runtime·lock(root);
		// Add ourselves to nwait to disable "easy case" in semrelease.
		runtime·xadd(&root->nwait, 1);
		// Check cansemacquire to avoid missed wakeup.
		if(cansemacquire(addr)) {
			runtime·xadd(&root->nwait, -1);
			runtime·unlock(root);
			return;
		}
		// Any semrelease after the cansemacquire knows we're waiting
		// (we set nwait above), so go to sleep.
		semqueue(root, addr, &s);
		// NOTE(review): root is still locked here; presumably park drops it
		// via the runtime·unlock callback once this goroutine is parked --
		// confirm against runtime·park.
		runtime·park(runtime·unlock, root, "semacquire");
		// Being woken does not guarantee the count is still available, so
		// try again and, on failure, loop to re-register and sleep again.
		if(cansemacquire(addr)) {
			if(t0)
				runtime·blockevent(s.releasetime - t0, 3);
			return;
		}
	}
}

// runtime·semrelease increments *addr and, if some goroutine is waiting on
// that address, dequeues one such waiter and makes it runnable.
void
runtime·semrelease(uint32 volatile *addr)
{
	SemaWaiter *s;
	SemaRoot *root;

	root = semroot(addr);
	runtime·xadd(addr, 1);

	// Easy case: no waiters?
	// This check must happen after the xadd, to avoid a missed wakeup
	// (see loop in semacquire).
	if(runtime·atomicload(&root->nwait) == 0)
		return;

	// Harder case: search for a waiter and wake it.
	runtime·lock(root);
	if(runtime·atomicload(&root->nwait) == 0) {
		// The count is already consumed by another goroutine,
		// so no need to wake up another goroutine.
		runtime·unlock(root);
		return;
	}
	// The bucket may hold waiters for other addresses too, so pick the
	// first one queued on this addr. If none matches, the loop ends with
	// s == nil and nobody is woken.
	for(s = root->head; s; s = s->next) {
		if(s->addr == addr) {
			runtime·xadd(&root->nwait, -1);
			semdequeue(root, s);
			break;
		}
	}
	runtime·unlock(root);
	if(s) {
		// Record the wakeup time only for waiters that asked for it
		// (non-zero releasetime).
		if(s->releasetime)
			s->releasetime = runtime·cputicks();
		runtime·ready(s->g);
	}
}

// TODO(dvyukov): move to netpoll.goc once it's used by all OSes.
// Entry point under the net package's symbol prefix; forwards to
// runtime·semacquire with profiling requested.
void net·runtime_Semacquire(uint32 *addr)
{
	runtime·semacquire(addr, true);
}

// Entry point under the net package's symbol prefix; forwards to
// runtime·semrelease.
void net·runtime_Semrelease(uint32 *addr)
{
	runtime·semrelease(addr);
}

// runtime_Semacquire is this package's wrapper around runtime·semacquire,
// with profiling requested.
func runtime_Semacquire(addr *uint32) {
	runtime·semacquire(addr, true);
}

// runtime_Semrelease is this package's wrapper around runtime·semrelease.
func runtime_Semrelease(addr *uint32) {
	runtime·semrelease(addr);
}

// SyncSema is a lock-protected, singly linked queue of SemaWaiters used by
// runtime_Syncsemacquire and runtime_Syncsemrelease.
typedef struct SyncSema SyncSema;
struct SyncSema
{
	Lock;
	SemaWaiter*	head;
	SemaWaiter*	tail;
};

// runtime_Syncsemcheck prints both sizes and throws if size, the value
// passed in by the caller (labelled "sync" in the message), differs from
// sizeof(SyncSema) as seen by this file.
func runtime_Syncsemcheck(size uintptr) {
	if(size != sizeof(SyncSema)) {
		runtime·printf("bad SyncSema size: sync:%D runtime:%D\n", (int64)size, (int64)sizeof(SyncSema));
		runtime·throw("bad SyncSema size");
	}
}

// Syncsemacquire waits for a pairing Syncsemrelease on the same semaphore s.
func runtime_Syncsemacquire(s *SyncSema) {
	SemaWaiter w, *wake;
	int64 t0;

	// Prepare our own descriptor up front; it is only linked into the
	// queue if no release is pending.
	w.g = g;
	w.nrelease = -1;
	w.next = nil;
	w.releasetime = 0;
	t0 = 0;
	if(runtime·blockprofilerate > 0) {
		t0 = runtime·cputicks();
		// Non-zero marker: asks the releaser to record the wakeup time.
		w.releasetime = -1;
	}

	runtime·lock(s);
	if(s->head && s->head->nrelease > 0) {
		// have pending release, consume it
		wake = nil;
		s->head->nrelease--;
		// Once the queued releaser has been paired with all of its
		// acquires, take it off the queue and wake it (after unlocking).
		if(s->head->nrelease == 0) {
			wake = s->head;
			s->head = wake->next;
			if(s->head == nil)
				s->tail = nil;
		}
		runtime·unlock(s);
		if(wake)
			runtime·ready(wake->g);
	} else {
		// enqueue itself
		if(s->tail == nil)
			s->head = &w;
		else
			s->tail->next = &w;
		s->tail = &w;
		// NOTE(review): s is still locked here; presumably park drops it
		// via the runtime·unlock callback -- confirm against runtime·park.
		runtime·park(runtime·unlock, s, "semacquire");
		// Only the blocking path reports a block-profile event.
		if(t0)
			runtime·blockevent(w.releasetime - t0, 2);
	}
}

// Syncsemrelease waits for n pairing Syncsemacquire on the same semaphore s.
// Acquirers already queued are woken immediately; if fewer than n are
// queued, the caller enqueues itself with the remaining count and parks.
func runtime_Syncsemrelease(s *SyncSema, n uint32) {
	SemaWaiter w, *wake;

	w.g = g;
	w.nrelease = (int32)n;
	w.next = nil;
	// Stays 0: the release side records no block-profile event.
	w.releasetime = 0;

	runtime·lock(s);
	// A negative nrelease at the head marks a queued acquirer.
	while(w.nrelease > 0 && s->head && s->head->nrelease < 0) {
		// have pending acquire, satisfy it
		wake = s->head;
		s->head = wake->next;
		if(s->head == nil)
			s->tail = nil;
		if(wake->releasetime)
			wake->releasetime = runtime·cputicks();
		// Note: the acquirer is readied while s is still locked.
		runtime·ready(wake->g);
		w.nrelease--;
	}
	if(w.nrelease > 0) {
		// enqueue itself
		if(s->tail == nil)
			s->head = &w;
		else
			s->tail->next = &w;
		s->tail = &w;
		runtime·park(runtime·unlock, s, "semarelease");
	} else
		runtime·unlock(s);
}