diff options
Diffstat (limited to 'src/pkg/netchan/netchan_test.go')
-rw-r--r-- | src/pkg/netchan/netchan_test.go | 178 |
1 files changed, 155 insertions, 23 deletions
diff --git a/src/pkg/netchan/netchan_test.go b/src/pkg/netchan/netchan_test.go index 766c4c474..4076aefeb 100644 --- a/src/pkg/netchan/netchan_test.go +++ b/src/pkg/netchan/netchan_test.go @@ -15,7 +15,7 @@ const closeCount = 5 // number of items when sender closes early const base = 23 -func exportSend(exp *Exporter, n int, t *testing.T) { +func exportSend(exp *Exporter, n int, t *testing.T, done chan bool) { ch := make(chan int) err := exp.Export("exportedSend", ch, Send) if err != nil { @@ -23,9 +23,12 @@ func exportSend(exp *Exporter, n int, t *testing.T) { } go func() { for i := 0; i < n; i++ { - ch <- base+i + ch <- base + i } close(ch) + if done != nil { + done <- true + } }() } @@ -50,23 +53,26 @@ func exportReceive(exp *Exporter, t *testing.T, expDone chan bool) { } } -func importSend(imp *Importer, n int, t *testing.T) { +func importSend(imp *Importer, n int, t *testing.T, done chan bool) { ch := make(chan int) - err := imp.ImportNValues("exportedRecv", ch, Send, count) + err := imp.ImportNValues("exportedRecv", ch, Send, 3, -1) if err != nil { t.Fatal("importSend:", err) } go func() { for i := 0; i < n; i++ { - ch <- base+i + ch <- base + i } close(ch) + if done != nil { + done <- true + } }() } func importReceive(imp *Importer, t *testing.T, done chan bool) { ch := make(chan int) - err := imp.ImportNValues("exportedSend", ch, Recv, count) + err := imp.ImportNValues("exportedSend", ch, Recv, 3, count) if err != nil { t.Fatal("importReceive:", err) } @@ -78,7 +84,7 @@ func importReceive(imp *Importer, t *testing.T, done chan bool) { } break } - if v != 23+i { + if v != base+i { t.Errorf("importReceive: bad value: expected %d+%d=%d; got %+d", base, i, base+i, v) } } @@ -96,7 +102,7 @@ func TestExportSendImportReceive(t *testing.T) { if err != nil { t.Fatal("new importer:", err) } - exportSend(exp, count, t) + exportSend(exp, count, t, nil) importReceive(imp, t, nil) } @@ -116,7 +122,7 @@ func TestExportReceiveImportSend(t *testing.T) { done <- true }() <-expDone - importSend(imp, count, t) + importSend(imp, count, t, nil) <-done } @@ -129,7 +135,7 @@ func TestClosingExportSendImportReceive(t *testing.T) { if err != nil { t.Fatal("new importer:", err) } - exportSend(exp, closeCount, t) + exportSend(exp, closeCount, t, nil) importReceive(imp, t, nil) } @@ -149,7 +155,7 @@ func TestClosingImportSendExportReceive(t *testing.T) { done <- true }() <-expDone - importSend(imp, closeCount, t) + importSend(imp, closeCount, t, nil) <-done } @@ -172,7 +178,7 @@ func TestErrorForIllegalChannel(t *testing.T) { close(ch) // Now try to import a different channel. ch = make(chan int) - err = imp.Import("notAChannel", ch, Recv) + err = imp.Import("notAChannel", ch, Recv, 1) if err != nil { t.Fatal("import:", err) } @@ -204,7 +210,7 @@ func TestExportDrain(t *testing.T) { } done := make(chan bool) go func() { - exportSend(exp, closeCount, t) + exportSend(exp, closeCount, t, nil) done <- true }() <-done @@ -224,7 +230,7 @@ func TestExportSync(t *testing.T) { t.Fatal("new importer:", err) } done := make(chan bool) - exportSend(exp, closeCount, t) + exportSend(exp, closeCount, t, nil) go importReceive(imp, t, done) exp.Sync(0) <-done @@ -248,7 +254,7 @@ func TestExportHangup(t *testing.T) { } // Prepare to receive two values. We'll actually deliver only one. ich := make(chan int) - err = imp.ImportNValues("exportedSend", ich, Recv, 2) + err = imp.ImportNValues("exportedSend", ich, Recv, 1, 2) if err != nil { t.Fatal("import exportedSend:", err) } @@ -285,7 +291,7 @@ func TestImportHangup(t *testing.T) { } // Prepare to Send two values. We'll actually deliver only one. ich := make(chan int) - err = imp.ImportNValues("exportedRecv", ich, Send, 2) + err = imp.ImportNValues("exportedRecv", ich, Send, 1, 2) if err != nil { t.Fatal("import exportedRecv:", err) } @@ -304,10 +310,70 @@ func TestImportHangup(t *testing.T) { } } +// loop back exportedRecv to exportedSend, +// but receive a value from ctlch before starting the loop. +func exportLoopback(exp *Exporter, t *testing.T) { + inch := make(chan int) + if err := exp.Export("exportedRecv", inch, Recv); err != nil { + t.Fatal("exportRecv") + } + + outch := make(chan int) + if err := exp.Export("exportedSend", outch, Send); err != nil { + t.Fatal("exportSend") + } + + ctlch := make(chan int) + if err := exp.Export("exportedCtl", ctlch, Recv); err != nil { + t.Fatal("exportRecv") + } + + go func() { + <-ctlch + for i := 0; i < count; i++ { + x := <-inch + if x != base+i { + t.Errorf("exportLoopback expected %d; got %d", i, x) + } + outch <- x + } + }() +} + +// This test checks that channel operations can proceed +// even when other concurrent operations are blocked. +func TestIndependentSends(t *testing.T) { + exp, err := NewExporter("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal("new exporter:", err) + } + imp, err := NewImporter("tcp", exp.Addr().String()) + if err != nil { + t.Fatal("new importer:", err) + } + + exportLoopback(exp, t) + + importSend(imp, count, t, nil) + done := make(chan bool) + go importReceive(imp, t, done) + + // wait for export side to try to deliver some values. + time.Sleep(0.25e9) + + ctlch := make(chan int) + if err := imp.ImportNValues("exportedCtl", ctlch, Send, 1, 1); err != nil { + t.Fatal("importSend:", err) + } + ctlch <- 0 + + <-done +} + // This test cross-connects a pair of exporter/importer pairs. type value struct { - i int - source string + I int + Source string } func TestCrossConnect(t *testing.T) { @@ -329,7 +395,7 @@ func TestCrossConnect(t *testing.T) { t.Fatal("new importer:", err) } - go crossExport(e1, e2, t) + crossExport(e1, e2, t) crossImport(i1, i2, t) } @@ -347,19 +413,19 @@ func crossExport(e1, e2 *Exporter, t *testing.T) { t.Fatal("exportReceive:", err) } - crossLoop("export", s, r, t) + go crossLoop("export", s, r, t) } // Import side of cross-traffic. func crossImport(i1, i2 *Importer, t *testing.T) { s := make(chan value) - err := i2.Import("exportedReceive", s, Send) + err := i2.Import("exportedReceive", s, Send, 2) if err != nil { t.Fatal("import of exportedReceive:", err) } r := make(chan value) - err = i1.Import("exportedSend", r, Recv) + err = i1.Import("exportedSend", r, Recv, 2) if err != nil { t.Fatal("import of exported Send:", err) } @@ -374,10 +440,76 @@ func crossLoop(name string, s, r chan value, t *testing.T) { case s <- value{si, name}: si++ case v := <-r: - if v.i != ri { + if v.I != ri { t.Errorf("loop: bad value: expected %d, hello; got %+v", ri, v) } ri++ } } } + +const flowCount = 100 + +// test flow control from exporter to importer. +func TestExportFlowControl(t *testing.T) { + exp, err := NewExporter("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal("new exporter:", err) + } + imp, err := NewImporter("tcp", exp.Addr().String()) + if err != nil { + t.Fatal("new importer:", err) + } + + sendDone := make(chan bool, 1) + exportSend(exp, flowCount, t, sendDone) + + ch := make(chan int) + err = imp.ImportNValues("exportedSend", ch, Recv, 20, -1) + if err != nil { + t.Fatal("importReceive:", err) + } + + testFlow(sendDone, ch, flowCount, t) +} + +// test flow control from importer to exporter. +func TestImportFlowControl(t *testing.T) { + exp, err := NewExporter("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal("new exporter:", err) + } + imp, err := NewImporter("tcp", exp.Addr().String()) + if err != nil { + t.Fatal("new importer:", err) + } + + ch := make(chan int) + err = exp.Export("exportedRecv", ch, Recv) + if err != nil { + t.Fatal("importReceive:", err) + } + + sendDone := make(chan bool, 1) + importSend(imp, flowCount, t, sendDone) + testFlow(sendDone, ch, flowCount, t) +} + +func testFlow(sendDone chan bool, ch <-chan int, N int, t *testing.T) { + go func() { + time.Sleep(1e9) + sendDone <- false + }() + + if <-sendDone { + t.Fatal("send did not block") + } + n := 0 + for i := range ch { + t.Log("after blocking, got value ", i) + n++ + } + if n != N { + t.Fatalf("expected %d values; got %d", N, n) + } +} |