diff options
Diffstat (limited to 'src/pkg/netchan')
| -rw-r--r-- | src/pkg/netchan/common.go | 6 | ||||
| -rw-r--r-- | src/pkg/netchan/export.go | 24 | ||||
| -rw-r--r-- | src/pkg/netchan/import.go | 8 | ||||
| -rw-r--r-- | src/pkg/netchan/netchan_test.go | 18 |
4 files changed, 28 insertions, 28 deletions
diff --git a/src/pkg/netchan/common.go b/src/pkg/netchan/common.go index dd06050ee..a319391bf 100644 --- a/src/pkg/netchan/common.go +++ b/src/pkg/netchan/common.go @@ -73,7 +73,7 @@ type unackedCounter interface { // A channel and its direction. type chanDir struct { - ch *reflect.ChanValue + ch reflect.Value dir Dir } @@ -306,7 +306,7 @@ func (nch *netChan) sender() { } // Receive value from local side for sending to remote side. -func (nch *netChan) recv() (val reflect.Value, closed bool) { +func (nch *netChan) recv() (val reflect.Value, ok bool) { if nch.dir != Send { panic("recv on wrong direction of channel") } @@ -317,7 +317,7 @@ func (nch *netChan) recv() (val reflect.Value, closed bool) { nch.space++ } nch.space-- - return nch.ch.Recv(), nch.ch.Closed() + return nch.ch.Recv() } // acked is called when the remote side indicates that diff --git a/src/pkg/netchan/export.go b/src/pkg/netchan/export.go index 55eba0e2e..2209f04e8 100644 --- a/src/pkg/netchan/export.go +++ b/src/pkg/netchan/export.go @@ -181,8 +181,8 @@ func (client *expClient) run() { // The header is passed by value to avoid issues of overwriting. func (client *expClient) serveRecv(nch *netChan, hdr header, count int64) { for { - val, closed := nch.recv() - if closed { + val, ok := nch.recv() + if !ok { if err := client.encode(&hdr, payClosed, nil); err != nil { expLog("error encoding server closed message:", err) } @@ -221,7 +221,7 @@ func (client *expClient) serveSend(hdr header) { return } // Create a new value for each received item. - val := reflect.MakeZero(nch.ch.Type().(*reflect.ChanType).Elem()) + val := reflect.Zero(nch.ch.Type().Elem()) if err := client.decode(val); err != nil { expLog("value decode:", err, "; type ", nch.ch.Type()) return @@ -340,26 +340,26 @@ func (exp *Exporter) Sync(timeout int64) os.Error { return exp.clientSet.sync(timeout) } -func checkChan(chT interface{}, dir Dir) (*reflect.ChanValue, os.Error) { - chanType, ok := reflect.Typeof(chT).(*reflect.ChanType) - if !ok { - return nil, os.ErrorString("not a channel") +func checkChan(chT interface{}, dir Dir) (reflect.Value, os.Error) { + chanType := reflect.Typeof(chT) + if chanType.Kind() != reflect.Chan { + return reflect.Value{}, os.ErrorString("not a channel") } if dir != Send && dir != Recv { - return nil, os.ErrorString("unknown channel direction") + return reflect.Value{}, os.ErrorString("unknown channel direction") } - switch chanType.Dir() { + switch chanType.ChanDir() { case reflect.BothDir: case reflect.SendDir: if dir != Recv { - return nil, os.ErrorString("to import/export with Send, must provide <-chan") + return reflect.Value{}, os.ErrorString("to import/export with Send, must provide <-chan") } case reflect.RecvDir: if dir != Send { - return nil, os.ErrorString("to import/export with Recv, must provide chan<-") + return reflect.Value{}, os.ErrorString("to import/export with Recv, must provide chan<-") } } - return reflect.NewValue(chT).(*reflect.ChanValue), nil + return reflect.NewValue(chT), nil } // Export exports a channel of a given type and specified direction. The diff --git a/src/pkg/netchan/import.go b/src/pkg/netchan/import.go index 30edcd812..9921486bd 100644 --- a/src/pkg/netchan/import.go +++ b/src/pkg/netchan/import.go @@ -48,7 +48,7 @@ func NewImporter(conn io.ReadWriter) *Importer { // Import imports a set of channels from the given network and address. func Import(network, remoteaddr string) (*Importer, os.Error) { - conn, err := net.Dial(network, "", remoteaddr) + conn, err := net.Dial(network, remoteaddr) if err != nil { return nil, err } @@ -133,7 +133,7 @@ func (imp *Importer) run() { ackHdr.SeqNum = hdr.SeqNum imp.encode(ackHdr, payAck, nil) // Create a new value for each received item. - value := reflect.MakeZero(nch.ch.Type().(*reflect.ChanType).Elem()) + value := reflect.Zero(nch.ch.Type().Elem()) if e := imp.decode(value); e != nil { impLog("importer value decode:", e) return @@ -213,8 +213,8 @@ func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, size, if dir == Send { go func() { for i := 0; n == -1 || i < n; i++ { - val, closed := nch.recv() - if closed { + val, ok := nch.recv() + if !ok { if err = imp.encode(hdr, payClosed, nil); err != nil { impLog("error encoding client closed message:", err) } diff --git a/src/pkg/netchan/netchan_test.go b/src/pkg/netchan/netchan_test.go index 1c84a9d14..fd4d8f780 100644 --- a/src/pkg/netchan/netchan_test.go +++ b/src/pkg/netchan/netchan_test.go @@ -41,8 +41,8 @@ func exportReceive(exp *Exporter, t *testing.T, expDone chan bool) { t.Fatal("exportReceive:", err) } for i := 0; i < count; i++ { - v := <-ch - if closed(ch) { + v, ok := <-ch + if !ok { if i != closeCount { t.Errorf("exportReceive expected close at %d; got one at %d", closeCount, i) } @@ -78,8 +78,8 @@ func importReceive(imp *Importer, t *testing.T, done chan bool) { t.Fatal("importReceive:", err) } for i := 0; i < count; i++ { - v := <-ch - if closed(ch) { + v, ok := <-ch + if !ok { if i != closeCount { t.Errorf("importReceive expected close at %d; got one at %d", closeCount, i) } @@ -212,8 +212,8 @@ func TestExportHangup(t *testing.T) { } // Now hang up the channel. Importer should see it close. exp.Hangup("exportedSend") - v = <-ich - if !closed(ich) { + v, ok := <-ich + if ok { t.Fatal("expected channel to be closed; got value", v) } } @@ -242,8 +242,8 @@ func TestImportHangup(t *testing.T) { } // Now hang up the channel. Exporter should see it close. imp.Hangup("exportedRecv") - v = <-ech - if !closed(ech) { + v, ok := <-ech + if ok { t.Fatal("expected channel to be closed; got value", v) } } @@ -399,7 +399,7 @@ func TestImportFlowControl(t *testing.T) { func testFlow(sendDone chan bool, ch <-chan int, N int, t *testing.T) { go func() { - time.Sleep(1e9) + time.Sleep(0.5e9) sendDone <- false }() |
