diff options
author | Ondřej Surý <ondrej@sury.org> | 2011-08-03 16:54:30 +0200 |
---|---|---|
committer | Ondřej Surý <ondrej@sury.org> | 2011-08-03 16:54:30 +0200 |
commit | 28592ee1ea1f5cdffcf85472f9de0285d928cf12 (patch) | |
tree | 32944e18b23f7fe4a0818a694aa2a6dfb1835463 /src/pkg/sync | |
parent | e836bee4716dc0d4d913537ad3ad1925a7ac32d0 (diff) | |
download | golang-upstream/59.tar.gz |
Imported Upstream version 59upstream/59
Diffstat (limited to 'src/pkg/sync')
-rw-r--r-- | src/pkg/sync/mutex.go | 63 | ||||
-rw-r--r-- | src/pkg/sync/mutex_test.go | 101 | ||||
-rw-r--r-- | src/pkg/sync/once.go | 14 | ||||
-rw-r--r-- | src/pkg/sync/once_test.go | 25 |
4 files changed, 168 insertions, 35 deletions
diff --git a/src/pkg/sync/mutex.go b/src/pkg/sync/mutex.go index 13f03cad3..2d46c8994 100644 --- a/src/pkg/sync/mutex.go +++ b/src/pkg/sync/mutex.go @@ -17,8 +17,8 @@ import ( // Mutexes can be created as part of other structures; // the zero value for a Mutex is an unlocked mutex. type Mutex struct { - key int32 - sema uint32 + state int32 + sema uint32 } // A Locker represents an object that can be locked and unlocked. @@ -27,15 +27,41 @@ type Locker interface { Unlock() } +const ( + mutexLocked = 1 << iota // mutex is locked + mutexWoken + mutexWaiterShift = iota +) + // Lock locks m. // If the lock is already in use, the calling goroutine // blocks until the mutex is available. func (m *Mutex) Lock() { - if atomic.AddInt32(&m.key, 1) == 1 { - // changed from 0 to 1; we hold lock + // Fast path: grab unlocked mutex. + if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { return } - runtime.Semacquire(&m.sema) + + awoke := false + for { + old := m.state + new := old | mutexLocked + if old&mutexLocked != 0 { + new = old + 1<<mutexWaiterShift + } + if awoke { + // The goroutine has been woken from sleep, + // so we need to reset the flag in either case. + new &^= mutexWoken + } + if atomic.CompareAndSwapInt32(&m.state, old, new) { + if old&mutexLocked == 0 { + break + } + runtime.Semacquire(&m.sema) + awoke = true + } + } } // Unlock unlocks m. @@ -45,14 +71,25 @@ func (m *Mutex) Lock() { // It is allowed for one goroutine to lock a Mutex and then // arrange for another goroutine to unlock it. func (m *Mutex) Unlock() { - switch v := atomic.AddInt32(&m.key, -1); { - case v == 0: - // changed from 1 to 0; no contention - return - case v == -1: - // changed from 0 to -1: wasn't locked - // (or there are 4 billion goroutines waiting) + // Fast path: drop lock bit. + new := atomic.AddInt32(&m.state, -mutexLocked) + if (new+mutexLocked)&mutexLocked == 0 { panic("sync: unlock of unlocked mutex") } - runtime.Semrelease(&m.sema) + + old := new + for { + // If there are no waiters or a goroutine has already + // been woken or grabbed the lock, no need to wake anyone. + if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 { + return + } + // Grab the right to wake someone. + new = (old - 1<<mutexWaiterShift) | mutexWoken + if atomic.CompareAndSwapInt32(&m.state, old, new) { + runtime.Semrelease(&m.sema) + return + } + old = m.state + } } diff --git a/src/pkg/sync/mutex_test.go b/src/pkg/sync/mutex_test.go index f5c20ca49..d5ada8567 100644 --- a/src/pkg/sync/mutex_test.go +++ b/src/pkg/sync/mutex_test.go @@ -9,6 +9,7 @@ package sync_test import ( "runtime" . "sync" + "sync/atomic" "testing" ) @@ -43,7 +44,7 @@ func BenchmarkContendedSemaphore(b *testing.B) { s := new(uint32) *s = 1 c := make(chan bool) - runtime.GOMAXPROCS(2) + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(2)) b.StartTimer() go HammerSemaphore(s, b.N/2, c) @@ -72,24 +73,6 @@ func TestMutex(t *testing.T) { } } -func BenchmarkUncontendedMutex(b *testing.B) { - m := new(Mutex) - HammerMutex(m, b.N, make(chan bool, 2)) -} - -func BenchmarkContendedMutex(b *testing.B) { - b.StopTimer() - m := new(Mutex) - c := make(chan bool) - runtime.GOMAXPROCS(2) - b.StartTimer() - - go HammerMutex(m, b.N/2, c) - go HammerMutex(m, b.N/2, c) - <-c - <-c -} - func TestMutexPanic(t *testing.T) { defer func() { if recover() == nil { @@ -102,3 +85,83 @@ func TestMutexPanic(t *testing.T) { mu.Unlock() mu.Unlock() } + +func BenchmarkMutexUncontended(b *testing.B) { + type PaddedMutex struct { + Mutex + pad [128]uint8 + } + const CallsPerSched = 1000 + procs := runtime.GOMAXPROCS(-1) + N := int32(b.N / CallsPerSched) + c := make(chan bool, procs) + for p := 0; p < procs; p++ { + go func() { + var mu PaddedMutex + for atomic.AddInt32(&N, -1) >= 0 { + runtime.Gosched() + for g := 0; g < CallsPerSched; g++ { + mu.Lock() + mu.Unlock() + } + } + c <- true + }() + } + for p := 0; p < procs; p++ { + <-c + } +} + +func benchmarkMutex(b *testing.B, slack, work bool) { + const ( + CallsPerSched = 1000 + LocalWork = 100 + GoroutineSlack = 10 + ) + procs := runtime.GOMAXPROCS(-1) + if slack { + procs *= GoroutineSlack + } + N := int32(b.N / CallsPerSched) + c := make(chan bool, procs) + var mu Mutex + for p := 0; p < procs; p++ { + go func() { + foo := 0 + for atomic.AddInt32(&N, -1) >= 0 { + runtime.Gosched() + for g := 0; g < CallsPerSched; g++ { + mu.Lock() + mu.Unlock() + if work { + for i := 0; i < LocalWork; i++ { + foo *= 2 + foo /= 2 + } + } + } + } + c <- foo == 42 + }() + } + for p := 0; p < procs; p++ { + <-c + } +} + +func BenchmarkMutex(b *testing.B) { + benchmarkMutex(b, false, false) +} + +func BenchmarkMutexSlack(b *testing.B) { + benchmarkMutex(b, true, false) +} + +func BenchmarkMutexWork(b *testing.B) { + benchmarkMutex(b, false, true) +} + +func BenchmarkMutexWorkSlack(b *testing.B) { + benchmarkMutex(b, true, true) +} diff --git a/src/pkg/sync/once.go b/src/pkg/sync/once.go index b6f5f5a87..447b71dcb 100644 --- a/src/pkg/sync/once.go +++ b/src/pkg/sync/once.go @@ -4,10 +4,14 @@ package sync +import ( + "sync/atomic" +) + // Once is an object that will perform exactly one action. type Once struct { m Mutex - done bool + done int32 } // Do calls the function f if and only if the method is being called for the @@ -26,10 +30,14 @@ type Once struct { // Do to be called, it will deadlock. // func (o *Once) Do(f func()) { + if atomic.AddInt32(&o.done, 0) == 1 { + return + } + // Slow-path. o.m.Lock() defer o.m.Unlock() - if !o.done { - o.done = true + if o.done == 0 { f() + atomic.CompareAndSwapInt32(&o.done, 0, 1) } } diff --git a/src/pkg/sync/once_test.go b/src/pkg/sync/once_test.go index 155954a49..157a3667a 100644 --- a/src/pkg/sync/once_test.go +++ b/src/pkg/sync/once_test.go @@ -6,6 +6,8 @@ package sync_test import ( . "sync" + "sync/atomic" + "runtime" "testing" ) @@ -35,3 +37,26 @@ func TestOnce(t *testing.T) { t.Errorf("once failed: %d is not 1", *o) } } + +func BenchmarkOnce(b *testing.B) { + const CallsPerSched = 1000 + procs := runtime.GOMAXPROCS(-1) + N := int32(b.N / CallsPerSched) + var once Once + f := func() {} + c := make(chan bool, procs) + for p := 0; p < procs; p++ { + go func() { + for atomic.AddInt32(&N, -1) >= 0 { + runtime.Gosched() + for g := 0; g < CallsPerSched; g++ { + once.Do(f) + } + } + c <- true + }() + } + for p := 0; p < procs; p++ { + <-c + } +} |