summaryrefslogtreecommitdiff
path: root/src/sync/waitgroup.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/waitgroup.go')
-rw-r--r--src/sync/waitgroup.go137
1 files changed, 137 insertions, 0 deletions
diff --git a/src/sync/waitgroup.go b/src/sync/waitgroup.go
new file mode 100644
index 000000000..92cc57d2c
--- /dev/null
+++ b/src/sync/waitgroup.go
@@ -0,0 +1,137 @@
+// Copyright 2011 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package sync
+
+import (
+ "sync/atomic"
+ "unsafe"
+)
+
+// A WaitGroup waits for a collection of goroutines to finish.
+// The main goroutine calls Add to set the number of
+// goroutines to wait for. Then each of the goroutines
+// runs and calls Done when finished. At the same time,
+// Wait can be used to block until all goroutines have finished.
+type WaitGroup struct {
+ m Mutex
+ counter int32
+ waiters int32
+ sema *uint32
+}
+
+// WaitGroup creates a new semaphore each time the old semaphore
+// is released. This is to avoid the following race:
+//
+// G1: Add(1)
+// G1: go G2()
+// G1: Wait() // Context switch after Unlock() and before Semacquire().
+// G2: Done() // Release semaphore: sema == 1, waiters == 0. G1 doesn't run yet.
+// G3: Wait() // Finds counter == 0, waiters == 0, doesn't block.
+// G3: Add(1) // Makes counter == 1, waiters == 0.
+// G3: go G4()
+// G3: Wait() // G1 still hasn't run, G3 finds sema == 1, unblocked! Bug.
+
+// Add adds delta, which may be negative, to the WaitGroup counter.
+// If the counter becomes zero, all goroutines blocked on Wait are released.
+// If the counter goes negative, Add panics.
+//
+// Note that calls with a positive delta that occur when the counter is zero
+// must happen before a Wait. Calls with a negative delta, or calls with a
+// positive delta that start when the counter is greater than zero, may happen
+// at any time.
+// Typically this means the calls to Add should execute before the statement
+// creating the goroutine or other event to be waited for.
+// See the WaitGroup example.
+func (wg *WaitGroup) Add(delta int) {
+ if raceenabled {
+ _ = wg.m.state // trigger nil deref early
+ if delta < 0 {
+ // Synchronize decrements with Wait.
+ raceReleaseMerge(unsafe.Pointer(wg))
+ }
+ raceDisable()
+ defer raceEnable()
+ }
+ v := atomic.AddInt32(&wg.counter, int32(delta))
+ if raceenabled {
+ if delta > 0 && v == int32(delta) {
+ // The first increment must be synchronized with Wait.
+ // Need to model this as a read, because there can be
+ // several concurrent wg.counter transitions from 0.
+ raceRead(unsafe.Pointer(&wg.sema))
+ }
+ }
+ if v < 0 {
+ panic("sync: negative WaitGroup counter")
+ }
+ if v > 0 || atomic.LoadInt32(&wg.waiters) == 0 {
+ return
+ }
+ wg.m.Lock()
+ if atomic.LoadInt32(&wg.counter) == 0 {
+ for i := int32(0); i < wg.waiters; i++ {
+ runtime_Semrelease(wg.sema)
+ }
+ wg.waiters = 0
+ wg.sema = nil
+ }
+ wg.m.Unlock()
+}
+
+// Done decrements the WaitGroup counter.
+func (wg *WaitGroup) Done() {
+ wg.Add(-1)
+}
+
+// Wait blocks until the WaitGroup counter is zero.
+func (wg *WaitGroup) Wait() {
+ if raceenabled {
+ _ = wg.m.state // trigger nil deref early
+ raceDisable()
+ }
+ if atomic.LoadInt32(&wg.counter) == 0 {
+ if raceenabled {
+ raceEnable()
+ raceAcquire(unsafe.Pointer(wg))
+ }
+ return
+ }
+ wg.m.Lock()
+ w := atomic.AddInt32(&wg.waiters, 1)
+ // This code is racing with the unlocked path in Add above.
+ // The code above modifies counter and then reads waiters.
+ // We must modify waiters and then read counter (the opposite order)
+ // to avoid missing an Add.
+ if atomic.LoadInt32(&wg.counter) == 0 {
+ atomic.AddInt32(&wg.waiters, -1)
+ if raceenabled {
+ raceEnable()
+ raceAcquire(unsafe.Pointer(wg))
+ raceDisable()
+ }
+ wg.m.Unlock()
+ if raceenabled {
+ raceEnable()
+ }
+ return
+ }
+ if raceenabled && w == 1 {
+ // Wait must be synchronized with the first Add.
+ // Need to model this is as a write to race with the read in Add.
+ // As a consequence, can do the write only for the first waiter,
+ // otherwise concurrent Waits will race with each other.
+ raceWrite(unsafe.Pointer(&wg.sema))
+ }
+ if wg.sema == nil {
+ wg.sema = new(uint32)
+ }
+ s := wg.sema
+ wg.m.Unlock()
+ runtime_Semacquire(s)
+ if raceenabled {
+ raceEnable()
+ raceAcquire(unsafe.Pointer(wg))
+ }
+}