summaryrefslogtreecommitdiff
path: root/src/pkg/netchan/netchan_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/pkg/netchan/netchan_test.go')
-rw-r--r--src/pkg/netchan/netchan_test.go178
1 files changed, 155 insertions, 23 deletions
diff --git a/src/pkg/netchan/netchan_test.go b/src/pkg/netchan/netchan_test.go
index 766c4c474..4076aefeb 100644
--- a/src/pkg/netchan/netchan_test.go
+++ b/src/pkg/netchan/netchan_test.go
@@ -15,7 +15,7 @@ const closeCount = 5 // number of items when sender closes early
const base = 23
-func exportSend(exp *Exporter, n int, t *testing.T) {
+func exportSend(exp *Exporter, n int, t *testing.T, done chan bool) {
ch := make(chan int)
err := exp.Export("exportedSend", ch, Send)
if err != nil {
@@ -23,9 +23,12 @@ func exportSend(exp *Exporter, n int, t *testing.T) {
}
go func() {
for i := 0; i < n; i++ {
- ch <- base+i
+ ch <- base + i
}
close(ch)
+ if done != nil {
+ done <- true
+ }
}()
}
@@ -50,23 +53,26 @@ func exportReceive(exp *Exporter, t *testing.T, expDone chan bool) {
}
}
-func importSend(imp *Importer, n int, t *testing.T) {
+func importSend(imp *Importer, n int, t *testing.T, done chan bool) {
ch := make(chan int)
- err := imp.ImportNValues("exportedRecv", ch, Send, count)
+ err := imp.ImportNValues("exportedRecv", ch, Send, 3, -1)
if err != nil {
t.Fatal("importSend:", err)
}
go func() {
for i := 0; i < n; i++ {
- ch <- base+i
+ ch <- base + i
}
close(ch)
+ if done != nil {
+ done <- true
+ }
}()
}
func importReceive(imp *Importer, t *testing.T, done chan bool) {
ch := make(chan int)
- err := imp.ImportNValues("exportedSend", ch, Recv, count)
+ err := imp.ImportNValues("exportedSend", ch, Recv, 3, count)
if err != nil {
t.Fatal("importReceive:", err)
}
@@ -78,7 +84,7 @@ func importReceive(imp *Importer, t *testing.T, done chan bool) {
}
break
}
- if v != 23+i {
+ if v != base+i {
t.Errorf("importReceive: bad value: expected %d+%d=%d; got %+d", base, i, base+i, v)
}
}
@@ -96,7 +102,7 @@ func TestExportSendImportReceive(t *testing.T) {
if err != nil {
t.Fatal("new importer:", err)
}
- exportSend(exp, count, t)
+ exportSend(exp, count, t, nil)
importReceive(imp, t, nil)
}
@@ -116,7 +122,7 @@ func TestExportReceiveImportSend(t *testing.T) {
done <- true
}()
<-expDone
- importSend(imp, count, t)
+ importSend(imp, count, t, nil)
<-done
}
@@ -129,7 +135,7 @@ func TestClosingExportSendImportReceive(t *testing.T) {
if err != nil {
t.Fatal("new importer:", err)
}
- exportSend(exp, closeCount, t)
+ exportSend(exp, closeCount, t, nil)
importReceive(imp, t, nil)
}
@@ -149,7 +155,7 @@ func TestClosingImportSendExportReceive(t *testing.T) {
done <- true
}()
<-expDone
- importSend(imp, closeCount, t)
+ importSend(imp, closeCount, t, nil)
<-done
}
@@ -172,7 +178,7 @@ func TestErrorForIllegalChannel(t *testing.T) {
close(ch)
// Now try to import a different channel.
ch = make(chan int)
- err = imp.Import("notAChannel", ch, Recv)
+ err = imp.Import("notAChannel", ch, Recv, 1)
if err != nil {
t.Fatal("import:", err)
}
@@ -204,7 +210,7 @@ func TestExportDrain(t *testing.T) {
}
done := make(chan bool)
go func() {
- exportSend(exp, closeCount, t)
+ exportSend(exp, closeCount, t, nil)
done <- true
}()
<-done
@@ -224,7 +230,7 @@ func TestExportSync(t *testing.T) {
t.Fatal("new importer:", err)
}
done := make(chan bool)
- exportSend(exp, closeCount, t)
+ exportSend(exp, closeCount, t, nil)
go importReceive(imp, t, done)
exp.Sync(0)
<-done
@@ -248,7 +254,7 @@ func TestExportHangup(t *testing.T) {
}
// Prepare to receive two values. We'll actually deliver only one.
ich := make(chan int)
- err = imp.ImportNValues("exportedSend", ich, Recv, 2)
+ err = imp.ImportNValues("exportedSend", ich, Recv, 1, 2)
if err != nil {
t.Fatal("import exportedSend:", err)
}
@@ -285,7 +291,7 @@ func TestImportHangup(t *testing.T) {
}
// Prepare to Send two values. We'll actually deliver only one.
ich := make(chan int)
- err = imp.ImportNValues("exportedRecv", ich, Send, 2)
+ err = imp.ImportNValues("exportedRecv", ich, Send, 1, 2)
if err != nil {
t.Fatal("import exportedRecv:", err)
}
@@ -304,10 +310,70 @@ func TestImportHangup(t *testing.T) {
}
}
+// loop back exportedRecv to exportedSend,
+// but receive a value from ctlch before starting the loop.
+func exportLoopback(exp *Exporter, t *testing.T) {
+ inch := make(chan int)
+ if err := exp.Export("exportedRecv", inch, Recv); err != nil {
+ t.Fatal("exportRecv")
+ }
+
+ outch := make(chan int)
+ if err := exp.Export("exportedSend", outch, Send); err != nil {
+ t.Fatal("exportSend")
+ }
+
+ ctlch := make(chan int)
+ if err := exp.Export("exportedCtl", ctlch, Recv); err != nil {
+ t.Fatal("exportRecv")
+ }
+
+ go func() {
+ <-ctlch
+ for i := 0; i < count; i++ {
+ x := <-inch
+ if x != base+i {
+ t.Errorf("exportLoopback expected %d; got %d", i, x)
+ }
+ outch <- x
+ }
+ }()
+}
+
+// This test checks that channel operations can proceed
+// even when other concurrent operations are blocked.
+func TestIndependentSends(t *testing.T) {
+ exp, err := NewExporter("tcp", "127.0.0.1:0")
+ if err != nil {
+ t.Fatal("new exporter:", err)
+ }
+ imp, err := NewImporter("tcp", exp.Addr().String())
+ if err != nil {
+ t.Fatal("new importer:", err)
+ }
+
+ exportLoopback(exp, t)
+
+ importSend(imp, count, t, nil)
+ done := make(chan bool)
+ go importReceive(imp, t, done)
+
+ // wait for export side to try to deliver some values.
+ time.Sleep(0.25e9)
+
+ ctlch := make(chan int)
+ if err := imp.ImportNValues("exportedCtl", ctlch, Send, 1, 1); err != nil {
+ t.Fatal("importSend:", err)
+ }
+ ctlch <- 0
+
+ <-done
+}
+
// This test cross-connects a pair of exporter/importer pairs.
type value struct {
- i int
- source string
+ I int
+ Source string
}
func TestCrossConnect(t *testing.T) {
@@ -329,7 +395,7 @@ func TestCrossConnect(t *testing.T) {
t.Fatal("new importer:", err)
}
- go crossExport(e1, e2, t)
+ crossExport(e1, e2, t)
crossImport(i1, i2, t)
}
@@ -347,19 +413,19 @@ func crossExport(e1, e2 *Exporter, t *testing.T) {
t.Fatal("exportReceive:", err)
}
- crossLoop("export", s, r, t)
+ go crossLoop("export", s, r, t)
}
// Import side of cross-traffic.
func crossImport(i1, i2 *Importer, t *testing.T) {
s := make(chan value)
- err := i2.Import("exportedReceive", s, Send)
+ err := i2.Import("exportedReceive", s, Send, 2)
if err != nil {
t.Fatal("import of exportedReceive:", err)
}
r := make(chan value)
- err = i1.Import("exportedSend", r, Recv)
+ err = i1.Import("exportedSend", r, Recv, 2)
if err != nil {
t.Fatal("import of exported Send:", err)
}
@@ -374,10 +440,76 @@ func crossLoop(name string, s, r chan value, t *testing.T) {
case s <- value{si, name}:
si++
case v := <-r:
- if v.i != ri {
+ if v.I != ri {
t.Errorf("loop: bad value: expected %d, hello; got %+v", ri, v)
}
ri++
}
}
}
+
+const flowCount = 100
+
+// test flow control from exporter to importer.
+func TestExportFlowControl(t *testing.T) {
+ exp, err := NewExporter("tcp", "127.0.0.1:0")
+ if err != nil {
+ t.Fatal("new exporter:", err)
+ }
+ imp, err := NewImporter("tcp", exp.Addr().String())
+ if err != nil {
+ t.Fatal("new importer:", err)
+ }
+
+ sendDone := make(chan bool, 1)
+ exportSend(exp, flowCount, t, sendDone)
+
+ ch := make(chan int)
+ err = imp.ImportNValues("exportedSend", ch, Recv, 20, -1)
+ if err != nil {
+ t.Fatal("importReceive:", err)
+ }
+
+ testFlow(sendDone, ch, flowCount, t)
+}
+
+// test flow control from importer to exporter.
+func TestImportFlowControl(t *testing.T) {
+ exp, err := NewExporter("tcp", "127.0.0.1:0")
+ if err != nil {
+ t.Fatal("new exporter:", err)
+ }
+ imp, err := NewImporter("tcp", exp.Addr().String())
+ if err != nil {
+ t.Fatal("new importer:", err)
+ }
+
+ ch := make(chan int)
+ err = exp.Export("exportedRecv", ch, Recv)
+ if err != nil {
+ t.Fatal("importReceive:", err)
+ }
+
+ sendDone := make(chan bool, 1)
+ importSend(imp, flowCount, t, sendDone)
+ testFlow(sendDone, ch, flowCount, t)
+}
+
+func testFlow(sendDone chan bool, ch <-chan int, N int, t *testing.T) {
+ go func() {
+ time.Sleep(1e9)
+ sendDone <- false
+ }()
+
+ if <-sendDone {
+ t.Fatal("send did not block")
+ }
+ n := 0
+ for i := range ch {
+ t.Log("after blocking, got value ", i)
+ n++
+ }
+ if n != N {
+ t.Fatalf("expected %d values; got %d", N, n)
+ }
+}