diff options
author | Tianon Gravi <admwiggin@gmail.com> | 2015-01-15 11:54:00 -0700 |
---|---|---|
committer | Tianon Gravi <admwiggin@gmail.com> | 2015-01-15 11:54:00 -0700 |
commit | f154da9e12608589e8d5f0508f908a0c3e88a1bb (patch) | |
tree | f8255d51e10c6f1e0ed69702200b966c9556a431 /src/sync/waitgroup.go | |
parent | 8d8329ed5dfb9622c82a9fbec6fd99a580f9c9f6 (diff) | |
download | golang-upstream/1.4.tar.gz |
Imported Upstream version 1.4upstream/1.4
Diffstat (limited to 'src/sync/waitgroup.go')
-rw-r--r-- | src/sync/waitgroup.go | 137 |
1 files changed, 137 insertions, 0 deletions
diff --git a/src/sync/waitgroup.go b/src/sync/waitgroup.go new file mode 100644 index 000000000..92cc57d2c --- /dev/null +++ b/src/sync/waitgroup.go @@ -0,0 +1,137 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync + +import ( + "sync/atomic" + "unsafe" +) + +// A WaitGroup waits for a collection of goroutines to finish. +// The main goroutine calls Add to set the number of +// goroutines to wait for. Then each of the goroutines +// runs and calls Done when finished. At the same time, +// Wait can be used to block until all goroutines have finished. +type WaitGroup struct { + m Mutex + counter int32 + waiters int32 + sema *uint32 +} + +// WaitGroup creates a new semaphore each time the old semaphore +// is released. This is to avoid the following race: +// +// G1: Add(1) +// G1: go G2() +// G1: Wait() // Context switch after Unlock() and before Semacquire(). +// G2: Done() // Release semaphore: sema == 1, waiters == 0. G1 doesn't run yet. +// G3: Wait() // Finds counter == 0, waiters == 0, doesn't block. +// G3: Add(1) // Makes counter == 1, waiters == 0. +// G3: go G4() +// G3: Wait() // G1 still hasn't run, G3 finds sema == 1, unblocked! Bug. + +// Add adds delta, which may be negative, to the WaitGroup counter. +// If the counter becomes zero, all goroutines blocked on Wait are released. +// If the counter goes negative, Add panics. +// +// Note that calls with a positive delta that occur when the counter is zero +// must happen before a Wait. Calls with a negative delta, or calls with a +// positive delta that start when the counter is greater than zero, may happen +// at any time. +// Typically this means the calls to Add should execute before the statement +// creating the goroutine or other event to be waited for. +// See the WaitGroup example. +func (wg *WaitGroup) Add(delta int) { + if raceenabled { + _ = wg.m.state // trigger nil deref early + if delta < 0 { + // Synchronize decrements with Wait. + raceReleaseMerge(unsafe.Pointer(wg)) + } + raceDisable() + defer raceEnable() + } + v := atomic.AddInt32(&wg.counter, int32(delta)) + if raceenabled { + if delta > 0 && v == int32(delta) { + // The first increment must be synchronized with Wait. + // Need to model this as a read, because there can be + // several concurrent wg.counter transitions from 0. + raceRead(unsafe.Pointer(&wg.sema)) + } + } + if v < 0 { + panic("sync: negative WaitGroup counter") + } + if v > 0 || atomic.LoadInt32(&wg.waiters) == 0 { + return + } + wg.m.Lock() + if atomic.LoadInt32(&wg.counter) == 0 { + for i := int32(0); i < wg.waiters; i++ { + runtime_Semrelease(wg.sema) + } + wg.waiters = 0 + wg.sema = nil + } + wg.m.Unlock() +} + +// Done decrements the WaitGroup counter. +func (wg *WaitGroup) Done() { + wg.Add(-1) +} + +// Wait blocks until the WaitGroup counter is zero. +func (wg *WaitGroup) Wait() { + if raceenabled { + _ = wg.m.state // trigger nil deref early + raceDisable() + } + if atomic.LoadInt32(&wg.counter) == 0 { + if raceenabled { + raceEnable() + raceAcquire(unsafe.Pointer(wg)) + } + return + } + wg.m.Lock() + w := atomic.AddInt32(&wg.waiters, 1) + // This code is racing with the unlocked path in Add above. + // The code above modifies counter and then reads waiters. + // We must modify waiters and then read counter (the opposite order) + // to avoid missing an Add. + if atomic.LoadInt32(&wg.counter) == 0 { + atomic.AddInt32(&wg.waiters, -1) + if raceenabled { + raceEnable() + raceAcquire(unsafe.Pointer(wg)) + raceDisable() + } + wg.m.Unlock() + if raceenabled { + raceEnable() + } + return + } + if raceenabled && w == 1 { + // Wait must be synchronized with the first Add. + // Need to model this is as a write to race with the read in Add. + // As a consequence, can do the write only for the first waiter, + // otherwise concurrent Waits will race with each other. + raceWrite(unsafe.Pointer(&wg.sema)) + } + if wg.sema == nil { + wg.sema = new(uint32) + } + s := wg.sema + wg.m.Unlock() + runtime_Semacquire(s) + if raceenabled { + raceEnable() + raceAcquire(unsafe.Pointer(wg)) + } +} |