diff options
Diffstat (limited to 'src/pkg/runtime/chan_test.go')
-rw-r--r-- | src/pkg/runtime/chan_test.go | 637 |
1 files changed, 483 insertions, 154 deletions
diff --git a/src/pkg/runtime/chan_test.go b/src/pkg/runtime/chan_test.go index eb2c7c60d..ce4b39627 100644 --- a/src/pkg/runtime/chan_test.go +++ b/src/pkg/runtime/chan_test.go @@ -9,8 +9,327 @@ import ( "sync" "sync/atomic" "testing" + "time" ) +func TestChan(t *testing.T) { + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(4)) + N := 200 + if testing.Short() { + N = 20 + } + for chanCap := 0; chanCap < N; chanCap++ { + { + // Ensure that receive from empty chan blocks. + c := make(chan int, chanCap) + recv1 := false + go func() { + _ = <-c + recv1 = true + }() + recv2 := false + go func() { + _, _ = <-c + recv2 = true + }() + time.Sleep(time.Millisecond) + if recv1 || recv2 { + t.Fatalf("chan[%d]: receive from empty chan", chanCap) + } + // Ensure that non-blocking receive does not block. + select { + case _ = <-c: + t.Fatalf("chan[%d]: receive from empty chan", chanCap) + default: + } + select { + case _, _ = <-c: + t.Fatalf("chan[%d]: receive from empty chan", chanCap) + default: + } + c <- 0 + c <- 0 + } + + { + // Ensure that send to full chan blocks. + c := make(chan int, chanCap) + for i := 0; i < chanCap; i++ { + c <- i + } + sent := uint32(0) + go func() { + c <- 0 + atomic.StoreUint32(&sent, 1) + }() + time.Sleep(time.Millisecond) + if atomic.LoadUint32(&sent) != 0 { + t.Fatalf("chan[%d]: send to full chan", chanCap) + } + // Ensure that non-blocking send does not block. + select { + case c <- 0: + t.Fatalf("chan[%d]: send to full chan", chanCap) + default: + } + <-c + } + + { + // Ensure that we receive 0 from closed chan. + c := make(chan int, chanCap) + for i := 0; i < chanCap; i++ { + c <- i + } + close(c) + for i := 0; i < chanCap; i++ { + v := <-c + if v != i { + t.Fatalf("chan[%d]: received %v, expected %v", chanCap, v, i) + } + } + if v := <-c; v != 0 { + t.Fatalf("chan[%d]: received %v, expected %v", chanCap, v, 0) + } + if v, ok := <-c; v != 0 || ok { + t.Fatalf("chan[%d]: received %v/%v, expected %v/%v", chanCap, v, ok, 0, false) + } + } + + { + // Ensure that close unblocks receive. + c := make(chan int, chanCap) + done := make(chan bool) + go func() { + v, ok := <-c + done <- v == 0 && ok == false + }() + time.Sleep(time.Millisecond) + close(c) + if !<-done { + t.Fatalf("chan[%d]: received non zero from closed chan", chanCap) + } + } + + { + // Send 100 integers, + // ensure that we receive them non-corrupted in FIFO order. + c := make(chan int, chanCap) + go func() { + for i := 0; i < 100; i++ { + c <- i + } + }() + for i := 0; i < 100; i++ { + v := <-c + if v != i { + t.Fatalf("chan[%d]: received %v, expected %v", chanCap, v, i) + } + } + + // Same, but using recv2. + go func() { + for i := 0; i < 100; i++ { + c <- i + } + }() + for i := 0; i < 100; i++ { + v, ok := <-c + if !ok { + t.Fatalf("chan[%d]: receive failed, expected %v", chanCap, i) + } + if v != i { + t.Fatalf("chan[%d]: received %v, expected %v", chanCap, v, i) + } + } + + // Send 1000 integers in 4 goroutines, + // ensure that we receive what we send. + const P = 4 + const L = 1000 + for p := 0; p < P; p++ { + go func() { + for i := 0; i < L; i++ { + c <- i + } + }() + } + done := make(chan map[int]int) + for p := 0; p < P; p++ { + go func() { + recv := make(map[int]int) + for i := 0; i < L; i++ { + v := <-c + recv[v] = recv[v] + 1 + } + done <- recv + }() + } + recv := make(map[int]int) + for p := 0; p < P; p++ { + for k, v := range <-done { + recv[k] = recv[k] + v + } + } + if len(recv) != L { + t.Fatalf("chan[%d]: received %v values, expected %v", chanCap, len(recv), L) + } + for _, v := range recv { + if v != P { + t.Fatalf("chan[%d]: received %v values, expected %v", chanCap, v, P) + } + } + } + + { + // Test len/cap. + c := make(chan int, chanCap) + if len(c) != 0 || cap(c) != chanCap { + t.Fatalf("chan[%d]: bad len/cap, expect %v/%v, got %v/%v", chanCap, 0, chanCap, len(c), cap(c)) + } + for i := 0; i < chanCap; i++ { + c <- i + } + if len(c) != chanCap || cap(c) != chanCap { + t.Fatalf("chan[%d]: bad len/cap, expect %v/%v, got %v/%v", chanCap, chanCap, chanCap, len(c), cap(c)) + } + } + + } +} + +func TestSelfSelect(t *testing.T) { + // Ensure that send/recv on the same chan in select + // does not crash nor deadlock. + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(2)) + for _, chanCap := range []int{0, 10} { + var wg sync.WaitGroup + wg.Add(2) + c := make(chan int, chanCap) + for p := 0; p < 2; p++ { + p := p + go func() { + defer wg.Done() + for i := 0; i < 1000; i++ { + if p == 0 || i%2 == 0 { + select { + case c <- p: + case v := <-c: + if chanCap == 0 && v == p { + t.Fatalf("self receive") + } + } + } else { + select { + case v := <-c: + if chanCap == 0 && v == p { + t.Fatalf("self receive") + } + case c <- p: + } + } + } + }() + } + wg.Wait() + } +} + +func TestSelectStress(t *testing.T) { + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(10)) + var c [4]chan int + c[0] = make(chan int) + c[1] = make(chan int) + c[2] = make(chan int, 2) + c[3] = make(chan int, 3) + N := int(1e5) + if testing.Short() { + N /= 10 + } + // There are 4 goroutines that send N values on each of the chans, + // + 4 goroutines that receive N values on each of the chans, + // + 1 goroutine that sends N values on each of the chans in a single select, + // + 1 goroutine that receives N values on each of the chans in a single select. + // All these sends, receives and selects interact chaotically at runtime, + // but we are careful that this whole construct does not deadlock. + var wg sync.WaitGroup + wg.Add(10) + for k := 0; k < 4; k++ { + k := k + go func() { + for i := 0; i < N; i++ { + c[k] <- 0 + } + wg.Done() + }() + go func() { + for i := 0; i < N; i++ { + <-c[k] + } + wg.Done() + }() + } + go func() { + var n [4]int + c1 := c + for i := 0; i < 4*N; i++ { + select { + case c1[3] <- 0: + n[3]++ + if n[3] == N { + c1[3] = nil + } + case c1[2] <- 0: + n[2]++ + if n[2] == N { + c1[2] = nil + } + case c1[0] <- 0: + n[0]++ + if n[0] == N { + c1[0] = nil + } + case c1[1] <- 0: + n[1]++ + if n[1] == N { + c1[1] = nil + } + } + } + wg.Done() + }() + go func() { + var n [4]int + c1 := c + for i := 0; i < 4*N; i++ { + select { + case <-c1[0]: + n[0]++ + if n[0] == N { + c1[0] = nil + } + case <-c1[1]: + n[1]++ + if n[1] == N { + c1[1] = nil + } + case <-c1[2]: + n[2]++ + if n[2] == N { + c1[2] = nil + } + case <-c1[3]: + n[3]++ + if n[3] == N { + c1[3] = nil + } + } + } + wg.Done() + }() + wg.Wait() +} + func TestChanSendInterface(t *testing.T) { type mt struct{} m := &mt{} @@ -29,34 +348,35 @@ func TestChanSendInterface(t *testing.T) { func TestPseudoRandomSend(t *testing.T) { n := 100 - c := make(chan int) - l := make([]int, n) - var m sync.Mutex - m.Lock() - go func() { + for _, chanCap := range []int{0, n} { + c := make(chan int, chanCap) + l := make([]int, n) + var m sync.Mutex + m.Lock() + go func() { + for i := 0; i < n; i++ { + runtime.Gosched() + l[i] = <-c + } + m.Unlock() + }() for i := 0; i < n; i++ { - runtime.Gosched() - l[i] = <-c + select { + case c <- 1: + case c <- 0: + } } - m.Unlock() - }() - for i := 0; i < n; i++ { - select { - case c <- 0: - case c <- 1: + m.Lock() // wait + n0 := 0 + n1 := 0 + for _, i := range l { + n0 += (i + 1) % 2 + n1 += i } - } - m.Lock() // wait - n0 := 0 - n1 := 0 - for _, i := range l { - n0 += (i + 1) % 2 - n1 += i - if n0 > n/10 && n1 > n/10 { - return + if n0 <= n/10 || n1 <= n/10 { + t.Errorf("Want pseudorandom, got %d zeros and %d ones (chan cap %d)", n0, n1, chanCap) } } - t.Errorf("Want pseudo random, got %d zeros and %d ones", n0, n1) } func TestMultiConsumer(t *testing.T) { @@ -110,147 +430,106 @@ func TestMultiConsumer(t *testing.T) { } } +func BenchmarkChanNonblocking(b *testing.B) { + myc := make(chan int) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + select { + case <-myc: + default: + } + } + }) +} + func BenchmarkSelectUncontended(b *testing.B) { - const CallsPerSched = 1000 - procs := runtime.GOMAXPROCS(-1) - N := int32(b.N / CallsPerSched) - c := make(chan bool, procs) - for p := 0; p < procs; p++ { - go func() { - myc1 := make(chan int, 1) - myc2 := make(chan int, 1) - myc1 <- 0 - for atomic.AddInt32(&N, -1) >= 0 { - for g := 0; g < CallsPerSched; g++ { - select { - case <-myc1: - myc2 <- 0 - case <-myc2: - myc1 <- 0 - } - } + b.RunParallel(func(pb *testing.PB) { + myc1 := make(chan int, 1) + myc2 := make(chan int, 1) + myc1 <- 0 + for pb.Next() { + select { + case <-myc1: + myc2 <- 0 + case <-myc2: + myc1 <- 0 } - c <- true - }() - } - for p := 0; p < procs; p++ { - <-c - } + } + }) } func BenchmarkSelectContended(b *testing.B) { - const CallsPerSched = 1000 - procs := runtime.GOMAXPROCS(-1) - N := int32(b.N / CallsPerSched) - c := make(chan bool, procs) + procs := runtime.GOMAXPROCS(0) myc1 := make(chan int, procs) myc2 := make(chan int, procs) - for p := 0; p < procs; p++ { + b.RunParallel(func(pb *testing.PB) { myc1 <- 0 - go func() { - for atomic.AddInt32(&N, -1) >= 0 { - for g := 0; g < CallsPerSched; g++ { - select { - case <-myc1: - myc2 <- 0 - case <-myc2: - myc1 <- 0 - } - } + for pb.Next() { + select { + case <-myc1: + myc2 <- 0 + case <-myc2: + myc1 <- 0 } - c <- true - }() - } - for p := 0; p < procs; p++ { - <-c - } + } + }) } func BenchmarkSelectNonblock(b *testing.B) { - const CallsPerSched = 1000 - procs := runtime.GOMAXPROCS(-1) - N := int32(b.N / CallsPerSched) - c := make(chan bool, procs) - for p := 0; p < procs; p++ { - go func() { - myc1 := make(chan int) - myc2 := make(chan int) - myc3 := make(chan int, 1) - myc4 := make(chan int, 1) - for atomic.AddInt32(&N, -1) >= 0 { - for g := 0; g < CallsPerSched; g++ { - select { - case <-myc1: - default: - } - select { - case myc2 <- 0: - default: - } - select { - case <-myc3: - default: - } - select { - case myc4 <- 0: - default: - } - } + b.RunParallel(func(pb *testing.PB) { + myc1 := make(chan int) + myc2 := make(chan int) + myc3 := make(chan int, 1) + myc4 := make(chan int, 1) + for pb.Next() { + select { + case <-myc1: + default: } - c <- true - }() - } - for p := 0; p < procs; p++ { - <-c - } + select { + case myc2 <- 0: + default: + } + select { + case <-myc3: + default: + } + select { + case myc4 <- 0: + default: + } + } + }) } func BenchmarkChanUncontended(b *testing.B) { - const CallsPerSched = 1000 - procs := runtime.GOMAXPROCS(-1) - N := int32(b.N / CallsPerSched) - c := make(chan bool, procs) - for p := 0; p < procs; p++ { - go func() { - myc := make(chan int, CallsPerSched) - for atomic.AddInt32(&N, -1) >= 0 { - for g := 0; g < CallsPerSched; g++ { - myc <- 0 - } - for g := 0; g < CallsPerSched; g++ { - <-myc - } + const C = 100 + b.RunParallel(func(pb *testing.PB) { + myc := make(chan int, C) + for pb.Next() { + for i := 0; i < C; i++ { + myc <- 0 } - c <- true - }() - } - for p := 0; p < procs; p++ { - <-c - } + for i := 0; i < C; i++ { + <-myc + } + } + }) } func BenchmarkChanContended(b *testing.B) { - const CallsPerSched = 1000 - procs := runtime.GOMAXPROCS(-1) - N := int32(b.N / CallsPerSched) - c := make(chan bool, procs) - myc := make(chan int, procs*CallsPerSched) - for p := 0; p < procs; p++ { - go func() { - for atomic.AddInt32(&N, -1) >= 0 { - for g := 0; g < CallsPerSched; g++ { - myc <- 0 - } - for g := 0; g < CallsPerSched; g++ { - <-myc - } + const C = 100 + myc := make(chan int, C*runtime.GOMAXPROCS(0)) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + for i := 0; i < C; i++ { + myc <- 0 } - c <- true - }() - } - for p := 0; p < procs; p++ { - <-c - } + for i := 0; i < C; i++ { + <-myc + } + } + }) } func BenchmarkChanSync(b *testing.B) { @@ -350,33 +629,83 @@ func BenchmarkChanProdConsWork100(b *testing.B) { benchmarkChanProdCons(b, 100, 100) } -func BenchmarkChanCreation(b *testing.B) { +func BenchmarkSelectProdCons(b *testing.B) { const CallsPerSched = 1000 procs := runtime.GOMAXPROCS(-1) N := int32(b.N / CallsPerSched) - c := make(chan bool, procs) + c := make(chan bool, 2*procs) + myc := make(chan int, 128) + myclose := make(chan bool) for p := 0; p < procs; p++ { go func() { + // Producer: sends to myc. + foo := 0 + // Intended to not fire during benchmarking. + mytimer := time.After(time.Hour) for atomic.AddInt32(&N, -1) >= 0 { for g := 0; g < CallsPerSched; g++ { - myc := make(chan int, 1) - myc <- 0 - <-myc + // Model some local work. + for i := 0; i < 100; i++ { + foo *= 2 + foo /= 2 + } + select { + case myc <- 1: + case <-mytimer: + case <-myclose: + } } } - c <- true + myc <- 0 + c <- foo == 42 + }() + go func() { + // Consumer: receives from myc. + foo := 0 + // Intended to not fire during benchmarking. + mytimer := time.After(time.Hour) + loop: + for { + select { + case v := <-myc: + if v == 0 { + break loop + } + case <-mytimer: + case <-myclose: + } + // Model some local work. + for i := 0; i < 100; i++ { + foo *= 2 + foo /= 2 + } + } + c <- foo == 42 }() } for p := 0; p < procs; p++ { <-c + <-c } } +func BenchmarkChanCreation(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + myc := make(chan int, 1) + myc <- 0 + <-myc + } + }) +} + func BenchmarkChanSem(b *testing.B) { type Empty struct{} - c := make(chan Empty, 1) - for i := 0; i < b.N; i++ { - c <- Empty{} - <-c - } + myc := make(chan Empty, runtime.GOMAXPROCS(0)) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + myc <- Empty{} + <-myc + } + }) } |