path: root/src/pkg/sync/waitgroup.go
blob: ca38837833e8613ee08c481224b0b9e912e51895
// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package sync

import (
	"sync/atomic"
	"unsafe"
)

// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for.  Then each of the goroutines
// runs and calls Done when finished.  At the same time,
// Wait can be used to block until all goroutines have finished.
type WaitGroup struct {
	m       Mutex
	counter int32
	waiters int32
	sema    *uint32
}
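
// A minimal usage sketch (hypothetical caller code, not part of this
// file): the caller Adds before starting each goroutine, each goroutine
// calls Done when finished, and Wait blocks until the counter is zero.
//
//	var wg WaitGroup
//	for i := 0; i < 3; i++ {
//		wg.Add(1) // register before the goroutine starts
//		go func() {
//			defer wg.Done() // signal completion even on panic
//			// ... do work ...
//		}()
//	}
//	wg.Wait() // blocks until all three goroutines call Done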

// WaitGroup creates a new semaphore each time the old semaphore
// is released. This is to avoid the following race:
//
// G1: Add(1)
// G1: go G2()
// G1: Wait() // Context switch after Unlock() and before Semacquire().
// G2: Done() // Release semaphore: sema == 1, waiters == 0. G1 doesn't run yet.
// G3: Wait() // Finds counter == 0, waiters == 0, doesn't block.
// G3: Add(1) // Makes counter == 1, waiters == 0.
// G3: go G4()
// G3: Wait() // G1 still hasn't run, G3 finds sema == 1, unblocked! Bug.
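//
// To avoid this, Add retires the semaphore (wg.sema = nil) after waking
// a generation of waiters, and Wait allocates a fresh one for the next
// generation, so a stale release can never satisfy a later Wait.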

// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
// If the counter goes negative, Add panics.
//
// Note that calls with positive delta must happen before the call to Wait,
// or else Wait may wait for too small a group. Typically this means the calls
// to Add should execute before the statement creating the goroutine or
// other event to be waited for. See the WaitGroup example.
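//
// A sketch of the required ordering (hypothetical caller code):
//
//	wg.Add(1) // before the go statement, so Wait cannot miss it
//	go func() {
//		defer wg.Done()
//		// ... work ...
//	}()
//	wg.Wait()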
func (wg *WaitGroup) Add(delta int) {
	if raceenabled {
		_ = wg.m.state
		raceReleaseMerge(unsafe.Pointer(wg))
		raceDisable()
		defer raceEnable()
	}
	v := atomic.AddInt32(&wg.counter, int32(delta))
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
	if v > 0 || atomic.LoadInt32(&wg.waiters) == 0 {
		return
	}
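	// The counter hit zero and there are waiters: wake every registered
	// waiter and retire the current semaphore (see the race note above).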
	wg.m.Lock()
	for i := int32(0); i < wg.waiters; i++ {
		runtime_Semrelease(wg.sema)
	}
	wg.waiters = 0
	wg.sema = nil
	wg.m.Unlock()
}

// Done decrements the WaitGroup counter.
func (wg *WaitGroup) Done() {
	wg.Add(-1)
}

// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
	if raceenabled {
		_ = wg.m.state
		raceDisable()
	}
	if atomic.LoadInt32(&wg.counter) == 0 {
		if raceenabled {
			raceEnable()
			raceAcquire(unsafe.Pointer(wg))
		}
		return
	}
	wg.m.Lock()
	atomic.AddInt32(&wg.waiters, 1)
	// This code is racing with the unlocked path in Add above.
	// The code above modifies counter and then reads waiters.
	// We must modify waiters and then read counter (the opposite order)
	// to avoid missing an Add.
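	// (With the opposite store/load order on each side, at least one of
	// the two goroutines is guaranteed to observe the other's write, so
	// a waiter cannot be stranded.)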
	if atomic.LoadInt32(&wg.counter) == 0 {
		atomic.AddInt32(&wg.waiters, -1)
		if raceenabled {
			raceEnable()
			raceAcquire(unsafe.Pointer(wg))
			raceDisable()
		}
		wg.m.Unlock()
		if raceenabled {
			raceEnable()
		}
		return
	}
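	// Lazily allocate the semaphore: Add resets wg.sema to nil after
	// releasing a generation of waiters, so each generation blocks on a
	// fresh semaphore (see the race note above the Add documentation).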
	if wg.sema == nil {
		wg.sema = new(uint32)
	}
	s := wg.sema
	wg.m.Unlock()
	runtime_Semacquire(s)
	if raceenabled {
		raceEnable()
		raceAcquire(unsafe.Pointer(wg))
	}
}