Diffstat (limited to 'src/pkg/sync')
-rw-r--r--   src/pkg/sync/atomic/asm_386.s                   14
-rw-r--r--   src/pkg/sync/atomic/asm_amd64.s                  4
-rw-r--r--   src/pkg/sync/atomic/asm_amd64p32.s             159
-rw-r--r--   src/pkg/sync/atomic/asm_linux_arm.s             12
-rw-r--r--   src/pkg/sync/atomic/atomic_test.go               7
-rw-r--r--   src/pkg/sync/atomic/export_linux_arm_test.go     2
-rw-r--r--   src/pkg/sync/mutex_test.go                      72
-rw-r--r--   src/pkg/sync/once_test.go                       25
-rw-r--r--   src/pkg/sync/pool.go                           223
-rw-r--r--   src/pkg/sync/pool_test.go                      151
-rw-r--r--   src/pkg/sync/runtime_sema_test.go               85
-rw-r--r--   src/pkg/sync/rwmutex_test.go                    79
-rw-r--r--   src/pkg/sync/waitgroup.go                       10
-rw-r--r--   src/pkg/sync/waitgroup_test.go                 125
14 files changed, 695 insertions, 273 deletions
diff --git a/src/pkg/sync/atomic/asm_386.s b/src/pkg/sync/atomic/asm_386.s index eaa72eabb..807c2f873 100644 --- a/src/pkg/sync/atomic/asm_386.s +++ b/src/pkg/sync/atomic/asm_386.s @@ -13,7 +13,7 @@ TEXT ·SwapUint32(SB),NOSPLIT,$0-12 MOVL addr+0(FP), BP MOVL new+4(FP), AX XCHGL AX, 0(BP) - MOVL AX, new+8(FP) + MOVL AX, old+8(FP) RET TEXT ·SwapInt64(SB),NOSPLIT,$0-20 @@ -43,8 +43,8 @@ swaploop: // success // return DX:AX - MOVL AX, new_lo+12(FP) - MOVL DX, new_hi+16(FP) + MOVL AX, old_lo+12(FP) + MOVL DX, old_hi+16(FP) RET TEXT ·SwapUintptr(SB),NOSPLIT,$0-12 @@ -155,10 +155,10 @@ TEXT ·LoadUint32(SB),NOSPLIT,$0-8 MOVL AX, val+4(FP) RET -TEXT ·LoadInt64(SB),NOSPLIT,$0-16 +TEXT ·LoadInt64(SB),NOSPLIT,$0-12 JMP ·LoadUint64(SB) -TEXT ·LoadUint64(SB),NOSPLIT,$0-16 +TEXT ·LoadUint64(SB),NOSPLIT,$0-12 MOVL addr+0(FP), AX TESTL $7, AX JZ 2(PC) @@ -186,10 +186,10 @@ TEXT ·StoreUint32(SB),NOSPLIT,$0-8 XCHGL AX, 0(BP) RET -TEXT ·StoreInt64(SB),NOSPLIT,$0-16 +TEXT ·StoreInt64(SB),NOSPLIT,$0-12 JMP ·StoreUint64(SB) -TEXT ·StoreUint64(SB),NOSPLIT,$0-16 +TEXT ·StoreUint64(SB),NOSPLIT,$0-12 MOVL addr+0(FP), AX TESTL $7, AX JZ 2(PC) diff --git a/src/pkg/sync/atomic/asm_amd64.s b/src/pkg/sync/atomic/asm_amd64.s index 0900492dc..77afa129e 100644 --- a/src/pkg/sync/atomic/asm_amd64.s +++ b/src/pkg/sync/atomic/asm_amd64.s @@ -13,7 +13,7 @@ TEXT ·SwapUint32(SB),NOSPLIT,$0-20 MOVQ addr+0(FP), BP MOVL new+8(FP), AX XCHGL AX, 0(BP) - MOVL AX, new+16(FP) + MOVL AX, old+16(FP) RET TEXT ·SwapInt64(SB),NOSPLIT,$0-24 @@ -23,7 +23,7 @@ TEXT ·SwapUint64(SB),NOSPLIT,$0-24 MOVQ addr+0(FP), BP MOVQ new+8(FP), AX XCHGQ AX, 0(BP) - MOVQ AX, new+16(FP) + MOVQ AX, old+16(FP) RET TEXT ·SwapUintptr(SB),NOSPLIT,$0-24 diff --git a/src/pkg/sync/atomic/asm_amd64p32.s b/src/pkg/sync/atomic/asm_amd64p32.s new file mode 100644 index 000000000..b24ae7a59 --- /dev/null +++ b/src/pkg/sync/atomic/asm_amd64p32.s @@ -0,0 +1,159 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. 
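The renamed result slots above (new+N(FP) becoming old+N(FP) in asm_386.s and asm_amd64.s) match the Go-level contract of the Swap family: each function stores the new value and returns whatever was in the word before. A minimal caller-side sketch of that contract; the variable names are illustrative:

package main

import (
        "fmt"
        "sync/atomic"
)

func main() {
        var flag uint32 = 1
        // SwapUint32 writes the new value and returns the previous one,
        // which is exactly the "old" result the assembly now names.
        old := atomic.SwapUint32(&flag, 0)
        fmt.Println(old, flag) // prints: 1 0
}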
+ +#include "../../../cmd/ld/textflag.h" + +TEXT ·SwapInt32(SB),NOSPLIT,$0-12 + JMP ·SwapUint32(SB) + +TEXT ·SwapUint32(SB),NOSPLIT,$0-12 + MOVL addr+0(FP), BX + MOVL new+4(FP), AX + XCHGL AX, 0(BX) + MOVL AX, old+8(FP) + RET + +TEXT ·SwapInt64(SB),NOSPLIT,$0-24 + JMP ·SwapUint64(SB) + +TEXT ·SwapUint64(SB),NOSPLIT,$0-24 + MOVL addr+0(FP), BX + TESTL $7, BX + JZ 2(PC) + MOVL 0, BX // crash with nil ptr deref + MOVQ new+8(FP), AX + XCHGQ AX, 0(BX) + MOVQ AX, old+16(FP) + RET + +TEXT ·SwapUintptr(SB),NOSPLIT,$0-12 + JMP ·SwapUint32(SB) + +TEXT ·SwapPointer(SB),NOSPLIT,$0-12 + JMP ·SwapUint32(SB) + +TEXT ·CompareAndSwapInt32(SB),NOSPLIT,$0-17 + JMP ·CompareAndSwapUint32(SB) + +TEXT ·CompareAndSwapUint32(SB),NOSPLIT,$0-17 + MOVL addr+0(FP), BX + MOVL old+4(FP), AX + MOVL new+8(FP), CX + LOCK + CMPXCHGL CX, 0(BX) + SETEQ swapped+16(FP) + RET + +TEXT ·CompareAndSwapUintptr(SB),NOSPLIT,$0-17 + JMP ·CompareAndSwapUint32(SB) + +TEXT ·CompareAndSwapPointer(SB),NOSPLIT,$0-17 + JMP ·CompareAndSwapUint32(SB) + +TEXT ·CompareAndSwapInt64(SB),NOSPLIT,$0-25 + JMP ·CompareAndSwapUint64(SB) + +TEXT ·CompareAndSwapUint64(SB),NOSPLIT,$0-25 + MOVL addr+0(FP), BX + TESTL $7, BX + JZ 2(PC) + MOVL 0, BX // crash with nil ptr deref + MOVQ old+8(FP), AX + MOVQ new+16(FP), CX + LOCK + CMPXCHGQ CX, 0(BX) + SETEQ swapped+24(FP) + RET + +TEXT ·AddInt32(SB),NOSPLIT,$0-12 + JMP ·AddUint32(SB) + +TEXT ·AddUint32(SB),NOSPLIT,$0-12 + MOVL addr+0(FP), BX + MOVL delta+4(FP), AX + MOVL AX, CX + LOCK + XADDL AX, 0(BX) + ADDL AX, CX + MOVL CX, new+8(FP) + RET + +TEXT ·AddUintptr(SB),NOSPLIT,$0-12 + JMP ·AddUint32(SB) + +TEXT ·AddInt64(SB),NOSPLIT,$0-24 + JMP ·AddUint64(SB) + +TEXT ·AddUint64(SB),NOSPLIT,$0-24 + MOVL addr+0(FP), BX + TESTL $7, BX + JZ 2(PC) + MOVL 0, BX // crash with nil ptr deref + MOVQ delta+8(FP), AX + MOVQ AX, CX + LOCK + XADDQ AX, 0(BX) + ADDQ AX, CX + MOVQ CX, new+16(FP) + RET + +TEXT ·LoadInt32(SB),NOSPLIT,$0-12 + JMP ·LoadUint32(SB) + +TEXT ·LoadUint32(SB),NOSPLIT,$0-12 + MOVL addr+0(FP), AX + MOVL 0(AX), AX + MOVL AX, val+8(FP) + RET + +TEXT ·LoadInt64(SB),NOSPLIT,$0-16 + JMP ·LoadUint64(SB) + +TEXT ·LoadUint64(SB),NOSPLIT,$0-16 + MOVL addr+0(FP), AX + TESTL $7, AX + JZ 2(PC) + MOVL 0, AX // crash with nil ptr deref + MOVQ 0(AX), AX + MOVQ AX, val+8(FP) + RET + +TEXT ·LoadUintptr(SB),NOSPLIT,$0-12 + JMP ·LoadPointer(SB) + +TEXT ·LoadPointer(SB),NOSPLIT,$0-12 + MOVL addr+0(FP), AX + MOVL 0(AX), AX + MOVL AX, val+8(FP) + RET + +TEXT ·StoreInt32(SB),NOSPLIT,$0-8 + JMP ·StoreUint32(SB) + +TEXT ·StoreUint32(SB),NOSPLIT,$0-8 + MOVL addr+0(FP), BX + MOVL val+4(FP), AX + XCHGL AX, 0(BX) + RET + +TEXT ·StoreInt64(SB),NOSPLIT,$0-16 + JMP ·StoreUint64(SB) + +TEXT ·StoreUint64(SB),NOSPLIT,$0-16 + MOVL addr+0(FP), BX + TESTL $7, BX + JZ 2(PC) + MOVL 0, BX // crash with nil ptr deref + MOVQ val+8(FP), AX + XCHGQ AX, 0(BX) + RET + +TEXT ·StoreUintptr(SB),NOSPLIT,$0-8 + JMP ·StorePointer(SB) + +TEXT ·StorePointer(SB),NOSPLIT,$0-8 + MOVL addr+0(FP), BX + MOVL val+4(FP), AX + XCHGL AX, 0(BX) + RET diff --git a/src/pkg/sync/atomic/asm_linux_arm.s b/src/pkg/sync/atomic/asm_linux_arm.s index b85ca0a13..27be57aa1 100644 --- a/src/pkg/sync/atomic/asm_linux_arm.s +++ b/src/pkg/sync/atomic/asm_linux_arm.s @@ -42,7 +42,7 @@ casagain: BCC cascheck MOVW $1, R0 casret: - MOVW R0, ret+12(FP) + MOVB R0, swapped+12(FP) RET cascheck: // Kernel lies; double-check. 
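The TESTL $7 guards in the 64-bit routines of this new amd64p32 file (and their 386 counterparts) deliberately fault when the operand is not 8-byte aligned, so on 32-bit targets the caller must arrange that alignment. A hedged sketch of the usual arrangement, relying on the documented guarantee that the first word of an allocated struct is 64-bit aligned; the type and field names are illustrative:

package main

import (
        "fmt"
        "sync/atomic"
)

// counters keeps its uint64 field first so that, on 32-bit platforms
// (386, arm, amd64p32), the field is 8-byte aligned as the 64-bit
// atomic operations require.
type counters struct {
        hits  uint64 // must stay 64-bit aligned for AddUint64, LoadUint64, ...
        flags uint32
}

func main() {
        c := new(counters)
        atomic.AddUint64(&c.hits, 1)
        fmt.Println(atomic.LoadUint64(&c.hits)) // prints: 1
}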
@@ -73,7 +73,7 @@ addloop1: ADD R4, R1 BL cas<>(SB) BCC addloop1 - MOVW R1, ret+8(FP) + MOVW R1, new+8(FP) RET TEXT ·AddUintptr(SB),NOSPLIT,$0 @@ -132,13 +132,13 @@ TEXT ·generalCAS64(SB),NOSPLIT,$20-21 BEQ 2(PC) MOVW R1, (R1) MOVW R0, 4(R13) - MOVW oldlo+4(FP), R1 + MOVW old_lo+4(FP), R1 MOVW R1, 8(R13) - MOVW oldhi+8(FP), R1 + MOVW old_hi+8(FP), R1 MOVW R1, 12(R13) - MOVW newlo+12(FP), R2 + MOVW new_lo+12(FP), R2 MOVW R2, 16(R13) - MOVW newhi+16(FP), R3 + MOVW new_hi+16(FP), R3 MOVW R3, 20(R13) BL runtime·cas64(SB) MOVB R0, ret+20(FP) diff --git a/src/pkg/sync/atomic/atomic_test.go b/src/pkg/sync/atomic/atomic_test.go index e10effe7e..a5f44f70d 100644 --- a/src/pkg/sync/atomic/atomic_test.go +++ b/src/pkg/sync/atomic/atomic_test.go @@ -813,7 +813,7 @@ func hammerSwapUintptr32(uaddr *uint32, count int) { new := uintptr(seed+i)<<16 | uintptr(seed+i)<<16>>16 old := SwapUintptr(addr, new) if old>>16 != old<<16>>16 { - panic(fmt.Sprintf("SwapUintptr is not atomic: %v", old)) + panic(fmt.Sprintf("SwapUintptr is not atomic: %#08x", old)) } } } @@ -827,7 +827,7 @@ func hammerSwapPointer32(uaddr *uint32, count int) { new := uintptr(seed+i)<<16 | uintptr(seed+i)<<16>>16 old := uintptr(SwapPointer(addr, unsafe.Pointer(new))) if old>>16 != old<<16>>16 { - panic(fmt.Sprintf("SwapPointer is not atomic: %v", old)) + panic(fmt.Sprintf("SwapPointer is not atomic: %#08x", old)) } } } @@ -1463,6 +1463,9 @@ func TestUnaligned64(t *testing.T) { } func TestNilDeref(t *testing.T) { + if p := runtime.GOOS + "/" + runtime.GOARCH; p == "freebsd/arm" || p == "netbsd/arm" { + t.Skipf("issue 7338: skipping test on %q", p) + } funcs := [...]func(){ func() { CompareAndSwapInt32(nil, 0, 0) }, func() { CompareAndSwapInt64(nil, 0, 0) }, diff --git a/src/pkg/sync/atomic/export_linux_arm_test.go b/src/pkg/sync/atomic/export_linux_arm_test.go index 8c0b5a75c..5cd43353e 100644 --- a/src/pkg/sync/atomic/export_linux_arm_test.go +++ b/src/pkg/sync/atomic/export_linux_arm_test.go @@ -4,6 +4,6 @@ package atomic -func generalCAS64(*uint64, uint64, uint64) bool +func generalCAS64(addr *uint64, old uint64, new uint64) bool var GeneralCAS64 = generalCAS64 diff --git a/src/pkg/sync/mutex_test.go b/src/pkg/sync/mutex_test.go index bf78c6f60..151b25c10 100644 --- a/src/pkg/sync/mutex_test.go +++ b/src/pkg/sync/mutex_test.go @@ -9,7 +9,6 @@ package sync_test import ( "runtime" . 
"sync" - "sync/atomic" "testing" ) @@ -90,63 +89,34 @@ func BenchmarkMutexUncontended(b *testing.B) { Mutex pad [128]uint8 } - const CallsPerSched = 1000 - procs := runtime.GOMAXPROCS(-1) - N := int32(b.N / CallsPerSched) - c := make(chan bool, procs) - for p := 0; p < procs; p++ { - go func() { - var mu PaddedMutex - for atomic.AddInt32(&N, -1) >= 0 { - runtime.Gosched() - for g := 0; g < CallsPerSched; g++ { - mu.Lock() - mu.Unlock() - } - } - c <- true - }() - } - for p := 0; p < procs; p++ { - <-c - } + b.RunParallel(func(pb *testing.PB) { + var mu PaddedMutex + for pb.Next() { + mu.Lock() + mu.Unlock() + } + }) } func benchmarkMutex(b *testing.B, slack, work bool) { - const ( - CallsPerSched = 1000 - LocalWork = 100 - GoroutineSlack = 10 - ) - procs := runtime.GOMAXPROCS(-1) + var mu Mutex if slack { - procs *= GoroutineSlack + b.SetParallelism(10) } - N := int32(b.N / CallsPerSched) - c := make(chan bool, procs) - var mu Mutex - for p := 0; p < procs; p++ { - go func() { - foo := 0 - for atomic.AddInt32(&N, -1) >= 0 { - runtime.Gosched() - for g := 0; g < CallsPerSched; g++ { - mu.Lock() - mu.Unlock() - if work { - for i := 0; i < LocalWork; i++ { - foo *= 2 - foo /= 2 - } - } + b.RunParallel(func(pb *testing.PB) { + foo := 0 + for pb.Next() { + mu.Lock() + mu.Unlock() + if work { + for i := 0; i < 100; i++ { + foo *= 2 + foo /= 2 } } - c <- foo == 42 - }() - } - for p := 0; p < procs; p++ { - <-c - } + } + _ = foo + }) } func BenchmarkMutex(b *testing.B) { diff --git a/src/pkg/sync/once_test.go b/src/pkg/sync/once_test.go index 183069a1a..8afda82f3 100644 --- a/src/pkg/sync/once_test.go +++ b/src/pkg/sync/once_test.go @@ -5,9 +5,7 @@ package sync_test import ( - "runtime" . "sync" - "sync/atomic" "testing" ) @@ -62,24 +60,11 @@ func TestOncePanic(t *testing.T) { } func BenchmarkOnce(b *testing.B) { - const CallsPerSched = 1000 - procs := runtime.GOMAXPROCS(-1) - N := int32(b.N / CallsPerSched) var once Once f := func() {} - c := make(chan bool, procs) - for p := 0; p < procs; p++ { - go func() { - for atomic.AddInt32(&N, -1) >= 0 { - runtime.Gosched() - for g := 0; g < CallsPerSched; g++ { - once.Do(f) - } - } - c <- true - }() - } - for p := 0; p < procs; p++ { - <-c - } + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + once.Do(f) + } + }) } diff --git a/src/pkg/sync/pool.go b/src/pkg/sync/pool.go new file mode 100644 index 000000000..1f08707cd --- /dev/null +++ b/src/pkg/sync/pool.go @@ -0,0 +1,223 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync + +import ( + "runtime" + "sync/atomic" + "unsafe" +) + +// A Pool is a set of temporary objects that may be individually saved and +// retrieved. +// +// Any item stored in the Pool may be removed automatically at any time without +// notification. If the Pool holds the only reference when this happens, the +// item might be deallocated. +// +// A Pool is safe for use by multiple goroutines simultaneously. +// +// Pool's purpose is to cache allocated but unused items for later reuse, +// relieving pressure on the garbage collector. That is, it makes it easy to +// build efficient, thread-safe free lists. However, it is not suitable for all +// free lists. +// +// An appropriate use of a Pool is to manage a group of temporary items +// silently shared among and potentially reused by concurrent independent +// clients of a package. 
Pool provides a way to amortize allocation overhead +// across many clients. +// +// An example of good use of a Pool is in the fmt package, which maintains a +// dynamically-sized store of temporary output buffers. The store scales under +// load (when many goroutines are actively printing) and shrinks when +// quiescent. +// +// On the other hand, a free list maintained as part of a short-lived object is +// not a suitable use for a Pool, since the overhead does not amortize well in +// that scenario. It is more efficient to have such objects implement their own +// free list. +// +type Pool struct { + local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal + localSize uintptr // size of the local array + + // New optionally specifies a function to generate + // a value when Get would otherwise return nil. + // It may not be changed concurrently with calls to Get. + New func() interface{} +} + +// Local per-P Pool appendix. +type poolLocal struct { + private interface{} // Can be used only by the respective P. + shared []interface{} // Can be used by any P. + Mutex // Protects shared. + pad [128]byte // Prevents false sharing. +} + +// Put adds x to the pool. +func (p *Pool) Put(x interface{}) { + if raceenabled { + // Under race detector the Pool degenerates into no-op. + // It's conforming, simple and does not introduce excessive + // happens-before edges between unrelated goroutines. + return + } + if x == nil { + return + } + l := p.pin() + if l.private == nil { + l.private = x + x = nil + } + runtime_procUnpin() + if x == nil { + return + } + l.Lock() + l.shared = append(l.shared, x) + l.Unlock() +} + +// Get selects an arbitrary item from the Pool, removes it from the +// Pool, and returns it to the caller. +// Get may choose to ignore the pool and treat it as empty. +// Callers should not assume any relation between values passed to Put and +// the values returned by Get. +// +// If Get would otherwise return nil and p.New is non-nil, Get returns +// the result of calling p.New. +func (p *Pool) Get() interface{} { + if raceenabled { + if p.New != nil { + return p.New() + } + return nil + } + l := p.pin() + x := l.private + l.private = nil + runtime_procUnpin() + if x != nil { + return x + } + l.Lock() + last := len(l.shared) - 1 + if last >= 0 { + x = l.shared[last] + l.shared = l.shared[:last] + } + l.Unlock() + if x != nil { + return x + } + return p.getSlow() +} + +func (p *Pool) getSlow() (x interface{}) { + // See the comment in pin regarding ordering of the loads. + size := atomic.LoadUintptr(&p.localSize) // load-acquire + local := p.local // load-consume + // Try to steal one element from other procs. + pid := runtime_procPin() + runtime_procUnpin() + for i := 0; i < int(size); i++ { + l := indexLocal(local, (pid+i+1)%int(size)) + l.Lock() + last := len(l.shared) - 1 + if last >= 0 { + x = l.shared[last] + l.shared = l.shared[:last] + l.Unlock() + break + } + l.Unlock() + } + + if x == nil && p.New != nil { + x = p.New() + } + return x +} + +// pin pins the current goroutine to P, disables preemption and returns poolLocal pool for the P. +// Caller must call runtime_procUnpin() when done with the pool. +func (p *Pool) pin() *poolLocal { + pid := runtime_procPin() + // In pinSlow we store to localSize and then to local, here we load in opposite order. + // Since we've disabled preemption, GC can not happen in between. + // Thus here we must observe local at least as large localSize. 
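Put and Get above define Pool's caller-facing contract: Put donates an object, Get hands back some previously donated object or, if none is available, the result of New, and any pooled object may be dropped at a GC. A small usage sketch, with bytes.Buffer as an illustrative pooled type:

package main

import (
        "bytes"
        "fmt"
        "sync"
)

var bufPool = sync.Pool{
        // New is only consulted when the pool has nothing to return.
        New: func() interface{} { return new(bytes.Buffer) },
}

func main() {
        b := bufPool.Get().(*bytes.Buffer)
        b.Reset() // a reused buffer may still hold old contents
        fmt.Fprintf(b, "hello")
        fmt.Println(b.String())
        bufPool.Put(b) // donate it back; the GC may still discard it later
}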
+ // We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness). + s := atomic.LoadUintptr(&p.localSize) // load-acquire + l := p.local // load-consume + if uintptr(pid) < s { + return indexLocal(l, pid) + } + return p.pinSlow() +} + +func (p *Pool) pinSlow() *poolLocal { + // Retry under the mutex. + // Can not lock the mutex while pinned. + runtime_procUnpin() + allPoolsMu.Lock() + defer allPoolsMu.Unlock() + pid := runtime_procPin() + // poolCleanup won't be called while we are pinned. + s := p.localSize + l := p.local + if uintptr(pid) < s { + return indexLocal(l, pid) + } + if p.local == nil { + allPools = append(allPools, p) + } + // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one. + size := runtime.GOMAXPROCS(0) + local := make([]poolLocal, size) + atomic.StorePointer((*unsafe.Pointer)(&p.local), unsafe.Pointer(&local[0])) // store-release + atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release + return &local[pid] +} + +func poolCleanup() { + // This function is called with the world stopped, at the beginning of a garbage collection. + // It must not allocate and probably should not call any runtime functions. + // Defensively zero out everything, 2 reasons: + // 1. To prevent false retention of whole Pools. + // 2. If GC happens while a goroutine works with l.shared in Put/Get, + // it will retain whole Pool. So next cycle memory consumption would be doubled. + for i, p := range allPools { + allPools[i] = nil + for i := 0; i < int(p.localSize); i++ { + l := indexLocal(p.local, i) + l.private = nil + for j := range l.shared { + l.shared[j] = nil + } + l.shared = nil + } + } + allPools = []*Pool{} +} + +var ( + allPoolsMu Mutex + allPools []*Pool +) + +func init() { + runtime_registerPoolCleanup(poolCleanup) +} + +func indexLocal(l unsafe.Pointer, i int) *poolLocal { + return &(*[1000000]poolLocal)(l)[i] +} + +// Implemented in runtime. +func runtime_registerPoolCleanup(cleanup func()) +func runtime_procPin() int +func runtime_procUnpin() diff --git a/src/pkg/sync/pool_test.go b/src/pkg/sync/pool_test.go new file mode 100644 index 000000000..509448b62 --- /dev/null +++ b/src/pkg/sync/pool_test.go @@ -0,0 +1,151 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Pool is no-op under race detector, so all these tests do not work. +// +build !race + +package sync_test + +import ( + "runtime" + "runtime/debug" + . "sync" + "sync/atomic" + "testing" + "time" +) + +func TestPool(t *testing.T) { + // disable GC so we can control when it happens. + defer debug.SetGCPercent(debug.SetGCPercent(-1)) + var p Pool + if p.Get() != nil { + t.Fatal("expected empty") + } + p.Put("a") + p.Put("b") + if g := p.Get(); g != "a" { + t.Fatalf("got %#v; want a", g) + } + if g := p.Get(); g != "b" { + t.Fatalf("got %#v; want b", g) + } + if g := p.Get(); g != nil { + t.Fatalf("got %#v; want nil", g) + } + + p.Put("c") + debug.SetGCPercent(100) // to allow following GC to actually run + runtime.GC() + if g := p.Get(); g != nil { + t.Fatalf("got %#v; want nil after GC", g) + } +} + +func TestPoolNew(t *testing.T) { + // disable GC so we can control when it happens. 
+ defer debug.SetGCPercent(debug.SetGCPercent(-1)) + + i := 0 + p := Pool{ + New: func() interface{} { + i++ + return i + }, + } + if v := p.Get(); v != 1 { + t.Fatalf("got %v; want 1", v) + } + if v := p.Get(); v != 2 { + t.Fatalf("got %v; want 2", v) + } + p.Put(42) + if v := p.Get(); v != 42 { + t.Fatalf("got %v; want 42", v) + } + if v := p.Get(); v != 3 { + t.Fatalf("got %v; want 3", v) + } +} + +// Test that Pool does not hold pointers to previously cached +// resources +func TestPoolGC(t *testing.T) { + var p Pool + var fin uint32 + const N = 100 + for i := 0; i < N; i++ { + v := new(string) + runtime.SetFinalizer(v, func(vv *string) { + atomic.AddUint32(&fin, 1) + }) + p.Put(v) + } + for i := 0; i < N; i++ { + p.Get() + } + for i := 0; i < 5; i++ { + runtime.GC() + time.Sleep(time.Duration(i*100+10) * time.Millisecond) + // 1 pointer can remain on stack or elsewhere + if atomic.LoadUint32(&fin) >= N-1 { + return + } + } + t.Fatalf("only %v out of %v resources are finalized", + atomic.LoadUint32(&fin), N) +} + +func TestPoolStress(t *testing.T) { + const P = 10 + N := int(1e6) + if testing.Short() { + N /= 100 + } + var p Pool + done := make(chan bool) + for i := 0; i < P; i++ { + go func() { + var v interface{} = 0 + for j := 0; j < N; j++ { + if v == nil { + v = 0 + } + p.Put(v) + v = p.Get() + if v != nil && v.(int) != 0 { + t.Fatalf("expect 0, got %v", v) + } + } + done <- true + }() + } + for i := 0; i < P; i++ { + <-done + } +} + +func BenchmarkPool(b *testing.B) { + var p Pool + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + p.Put(1) + p.Get() + } + }) +} + +func BenchmarkPoolOverlflow(b *testing.B) { + var p Pool + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + for b := 0; b < 100; b++ { + p.Put(1) + } + for b := 0; b < 100; b++ { + p.Get() + } + } + }) +} diff --git a/src/pkg/sync/runtime_sema_test.go b/src/pkg/sync/runtime_sema_test.go index 57a8dbee7..5b7dd3df3 100644 --- a/src/pkg/sync/runtime_sema_test.go +++ b/src/pkg/sync/runtime_sema_test.go @@ -7,7 +7,6 @@ package sync_test import ( "runtime" . 
"sync" - "sync/atomic" "testing" ) @@ -16,72 +15,44 @@ func BenchmarkSemaUncontended(b *testing.B) { sem uint32 pad [32]uint32 } - const CallsPerSched = 1000 - procs := runtime.GOMAXPROCS(-1) - N := int32(b.N / CallsPerSched) - c := make(chan bool, procs) - for p := 0; p < procs; p++ { - go func() { - sem := new(PaddedSem) - for atomic.AddInt32(&N, -1) >= 0 { - runtime.Gosched() - for g := 0; g < CallsPerSched; g++ { - Runtime_Semrelease(&sem.sem) - Runtime_Semacquire(&sem.sem) - } - } - c <- true - }() - } - for p := 0; p < procs; p++ { - <-c - } + b.RunParallel(func(pb *testing.PB) { + sem := new(PaddedSem) + for pb.Next() { + Runtime_Semrelease(&sem.sem) + Runtime_Semacquire(&sem.sem) + } + }) } func benchmarkSema(b *testing.B, block, work bool) { - const CallsPerSched = 1000 - const LocalWork = 100 - procs := runtime.GOMAXPROCS(-1) - N := int32(b.N / CallsPerSched) - c := make(chan bool, procs) - c2 := make(chan bool, procs/2) sem := uint32(0) if block { - for p := 0; p < procs/2; p++ { - go func() { - Runtime_Semacquire(&sem) - c2 <- true - }() - } - } - for p := 0; p < procs; p++ { + done := make(chan bool) go func() { - foo := 0 - for atomic.AddInt32(&N, -1) >= 0 { - runtime.Gosched() - for g := 0; g < CallsPerSched; g++ { - Runtime_Semrelease(&sem) - if work { - for i := 0; i < LocalWork; i++ { - foo *= 2 - foo /= 2 - } - } - Runtime_Semacquire(&sem) - } + for p := 0; p < runtime.GOMAXPROCS(0)/2; p++ { + Runtime_Semacquire(&sem) } - c <- foo == 42 - Runtime_Semrelease(&sem) + done <- true + }() + defer func() { + <-done }() } - if block { - for p := 0; p < procs/2; p++ { - <-c2 + b.RunParallel(func(pb *testing.PB) { + foo := 0 + for pb.Next() { + Runtime_Semrelease(&sem) + if work { + for i := 0; i < 100; i++ { + foo *= 2 + foo /= 2 + } + } + Runtime_Semacquire(&sem) } - } - for p := 0; p < procs; p++ { - <-c - } + _ = foo + Runtime_Semrelease(&sem) + }) } func BenchmarkSemaSyntNonblock(b *testing.B) { diff --git a/src/pkg/sync/rwmutex_test.go b/src/pkg/sync/rwmutex_test.go index 39d5d6540..0436f9723 100644 --- a/src/pkg/sync/rwmutex_test.go +++ b/src/pkg/sync/rwmutex_test.go @@ -160,64 +160,39 @@ func BenchmarkRWMutexUncontended(b *testing.B) { RWMutex pad [32]uint32 } - const CallsPerSched = 1000 - procs := runtime.GOMAXPROCS(-1) - N := int32(b.N / CallsPerSched) - c := make(chan bool, procs) - for p := 0; p < procs; p++ { - go func() { - var rwm PaddedRWMutex - for atomic.AddInt32(&N, -1) >= 0 { - runtime.Gosched() - for g := 0; g < CallsPerSched; g++ { - rwm.RLock() - rwm.RLock() - rwm.RUnlock() - rwm.RUnlock() - rwm.Lock() - rwm.Unlock() - } - } - c <- true - }() - } - for p := 0; p < procs; p++ { - <-c - } + b.RunParallel(func(pb *testing.PB) { + var rwm PaddedRWMutex + for pb.Next() { + rwm.RLock() + rwm.RLock() + rwm.RUnlock() + rwm.RUnlock() + rwm.Lock() + rwm.Unlock() + } + }) } func benchmarkRWMutex(b *testing.B, localWork, writeRatio int) { - const CallsPerSched = 1000 - procs := runtime.GOMAXPROCS(-1) - N := int32(b.N / CallsPerSched) - c := make(chan bool, procs) var rwm RWMutex - for p := 0; p < procs; p++ { - go func() { - foo := 0 - for atomic.AddInt32(&N, -1) >= 0 { - runtime.Gosched() - for g := 0; g < CallsPerSched; g++ { - foo++ - if foo%writeRatio == 0 { - rwm.Lock() - rwm.Unlock() - } else { - rwm.RLock() - for i := 0; i != localWork; i += 1 { - foo *= 2 - foo /= 2 - } - rwm.RUnlock() - } + b.RunParallel(func(pb *testing.PB) { + foo := 0 + for pb.Next() { + foo++ + if foo%writeRatio == 0 { + rwm.Lock() + rwm.Unlock() + } else { + rwm.RLock() + for i := 0; i 
!= localWork; i += 1 { + foo *= 2 + foo /= 2 } + rwm.RUnlock() } - c <- foo == 42 - }() - } - for p := 0; p < procs; p++ { - <-c - } + } + _ = foo + }) } func BenchmarkRWMutexWrite100(b *testing.B) { diff --git a/src/pkg/sync/waitgroup.go b/src/pkg/sync/waitgroup.go index 22681115c..4c64dca39 100644 --- a/src/pkg/sync/waitgroup.go +++ b/src/pkg/sync/waitgroup.go @@ -67,11 +67,13 @@ func (wg *WaitGroup) Add(delta int) { return } wg.m.Lock() - for i := int32(0); i < wg.waiters; i++ { - runtime_Semrelease(wg.sema) + if atomic.LoadInt32(&wg.counter) == 0 { + for i := int32(0); i < wg.waiters; i++ { + runtime_Semrelease(wg.sema) + } + wg.waiters = 0 + wg.sema = nil } - wg.waiters = 0 - wg.sema = nil wg.m.Unlock() } diff --git a/src/pkg/sync/waitgroup_test.go b/src/pkg/sync/waitgroup_test.go index 84c4cfc37..4c0a043c0 100644 --- a/src/pkg/sync/waitgroup_test.go +++ b/src/pkg/sync/waitgroup_test.go @@ -5,7 +5,6 @@ package sync_test import ( - "runtime" . "sync" "sync/atomic" "testing" @@ -61,60 +60,60 @@ func TestWaitGroupMisuse(t *testing.T) { t.Fatal("Should panic") } +func TestWaitGroupRace(t *testing.T) { + // Run this test for about 1ms. + for i := 0; i < 1000; i++ { + wg := &WaitGroup{} + n := new(int32) + // spawn goroutine 1 + wg.Add(1) + go func() { + atomic.AddInt32(n, 1) + wg.Done() + }() + // spawn goroutine 2 + wg.Add(1) + go func() { + atomic.AddInt32(n, 1) + wg.Done() + }() + // Wait for goroutine 1 and 2 + wg.Wait() + if atomic.LoadInt32(n) != 2 { + t.Fatal("Spurious wakeup from Wait") + } + } +} + func BenchmarkWaitGroupUncontended(b *testing.B) { type PaddedWaitGroup struct { WaitGroup pad [128]uint8 } - const CallsPerSched = 1000 - procs := runtime.GOMAXPROCS(-1) - N := int32(b.N / CallsPerSched) - c := make(chan bool, procs) - for p := 0; p < procs; p++ { - go func() { - var wg PaddedWaitGroup - for atomic.AddInt32(&N, -1) >= 0 { - runtime.Gosched() - for g := 0; g < CallsPerSched; g++ { - wg.Add(1) - wg.Done() - wg.Wait() - } - } - c <- true - }() - } - for p := 0; p < procs; p++ { - <-c - } + b.RunParallel(func(pb *testing.PB) { + var wg PaddedWaitGroup + for pb.Next() { + wg.Add(1) + wg.Done() + wg.Wait() + } + }) } func benchmarkWaitGroupAddDone(b *testing.B, localWork int) { - const CallsPerSched = 1000 - procs := runtime.GOMAXPROCS(-1) - N := int32(b.N / CallsPerSched) - c := make(chan bool, procs) var wg WaitGroup - for p := 0; p < procs; p++ { - go func() { - foo := 0 - for atomic.AddInt32(&N, -1) >= 0 { - runtime.Gosched() - for g := 0; g < CallsPerSched; g++ { - wg.Add(1) - for i := 0; i < localWork; i++ { - foo *= 2 - foo /= 2 - } - wg.Done() - } + b.RunParallel(func(pb *testing.PB) { + foo := 0 + for pb.Next() { + wg.Add(1) + for i := 0; i < localWork; i++ { + foo *= 2 + foo /= 2 } - c <- foo == 42 - }() - } - for p := 0; p < procs; p++ { - <-c - } + wg.Done() + } + _ = foo + }) } func BenchmarkWaitGroupAddDone(b *testing.B) { @@ -126,34 +125,18 @@ func BenchmarkWaitGroupAddDoneWork(b *testing.B) { } func benchmarkWaitGroupWait(b *testing.B, localWork int) { - const CallsPerSched = 1000 - procs := runtime.GOMAXPROCS(-1) - N := int32(b.N / CallsPerSched) - c := make(chan bool, procs) var wg WaitGroup - wg.Add(procs) - for p := 0; p < procs; p++ { - go wg.Done() - } - for p := 0; p < procs; p++ { - go func() { - foo := 0 - for atomic.AddInt32(&N, -1) >= 0 { - runtime.Gosched() - for g := 0; g < CallsPerSched; g++ { - wg.Wait() - for i := 0; i < localWork; i++ { - foo *= 2 - foo /= 2 - } - } + b.RunParallel(func(pb *testing.PB) { + foo := 0 + for pb.Next() { + 
wg.Wait() + for i := 0; i < localWork; i++ { + foo *= 2 + foo /= 2 } - c <- foo == 42 - }() - } - for p := 0; p < procs; p++ { - <-c - } + } + _ = foo + }) } func BenchmarkWaitGroupWait(b *testing.B) { |
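The waitgroup.go change earlier in this diff releases waiters only once the counter has dropped back to zero, which is exactly what the new TestWaitGroupRace checks: by the time Wait returns, the work of every goroutine that called Done must be visible. A compressed sketch of that usage pattern:

package main

import (
        "fmt"
        "sync"
        "sync/atomic"
)

func main() {
        var wg sync.WaitGroup
        var n int32
        for i := 0; i < 2; i++ {
                wg.Add(1)
                go func() {
                        atomic.AddInt32(&n, 1)
                        wg.Done()
                }()
        }
        wg.Wait()
        // Wait does not return until both goroutines have called Done,
        // so n is guaranteed to be 2 here.
        fmt.Println(atomic.LoadInt32(&n)) // prints: 2
}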