summaryrefslogtreecommitdiff
path: root/src/pkg/netchan
diff options
context:
space:
mode:
Diffstat (limited to 'src/pkg/netchan')
-rw-r--r--src/pkg/netchan/common.go6
-rw-r--r--src/pkg/netchan/export.go24
-rw-r--r--src/pkg/netchan/import.go8
-rw-r--r--src/pkg/netchan/netchan_test.go18
4 files changed, 28 insertions, 28 deletions
diff --git a/src/pkg/netchan/common.go b/src/pkg/netchan/common.go
index dd06050ee..a319391bf 100644
--- a/src/pkg/netchan/common.go
+++ b/src/pkg/netchan/common.go
@@ -73,7 +73,7 @@ type unackedCounter interface {
// A channel and its direction.
type chanDir struct {
- ch *reflect.ChanValue
+ ch reflect.Value
dir Dir
}
@@ -306,7 +306,7 @@ func (nch *netChan) sender() {
}
// Receive value from local side for sending to remote side.
-func (nch *netChan) recv() (val reflect.Value, closed bool) {
+func (nch *netChan) recv() (val reflect.Value, ok bool) {
if nch.dir != Send {
panic("recv on wrong direction of channel")
}
@@ -317,7 +317,7 @@ func (nch *netChan) recv() (val reflect.Value, closed bool) {
nch.space++
}
nch.space--
- return nch.ch.Recv(), nch.ch.Closed()
+ return nch.ch.Recv()
}
// acked is called when the remote side indicates that
diff --git a/src/pkg/netchan/export.go b/src/pkg/netchan/export.go
index 55eba0e2e..2209f04e8 100644
--- a/src/pkg/netchan/export.go
+++ b/src/pkg/netchan/export.go
@@ -181,8 +181,8 @@ func (client *expClient) run() {
// The header is passed by value to avoid issues of overwriting.
func (client *expClient) serveRecv(nch *netChan, hdr header, count int64) {
for {
- val, closed := nch.recv()
- if closed {
+ val, ok := nch.recv()
+ if !ok {
if err := client.encode(&hdr, payClosed, nil); err != nil {
expLog("error encoding server closed message:", err)
}
@@ -221,7 +221,7 @@ func (client *expClient) serveSend(hdr header) {
return
}
// Create a new value for each received item.
- val := reflect.MakeZero(nch.ch.Type().(*reflect.ChanType).Elem())
+ val := reflect.Zero(nch.ch.Type().Elem())
if err := client.decode(val); err != nil {
expLog("value decode:", err, "; type ", nch.ch.Type())
return
@@ -340,26 +340,26 @@ func (exp *Exporter) Sync(timeout int64) os.Error {
return exp.clientSet.sync(timeout)
}
-func checkChan(chT interface{}, dir Dir) (*reflect.ChanValue, os.Error) {
- chanType, ok := reflect.Typeof(chT).(*reflect.ChanType)
- if !ok {
- return nil, os.ErrorString("not a channel")
+func checkChan(chT interface{}, dir Dir) (reflect.Value, os.Error) {
+ chanType := reflect.Typeof(chT)
+ if chanType.Kind() != reflect.Chan {
+ return reflect.Value{}, os.ErrorString("not a channel")
}
if dir != Send && dir != Recv {
- return nil, os.ErrorString("unknown channel direction")
+ return reflect.Value{}, os.ErrorString("unknown channel direction")
}
- switch chanType.Dir() {
+ switch chanType.ChanDir() {
case reflect.BothDir:
case reflect.SendDir:
if dir != Recv {
- return nil, os.ErrorString("to import/export with Send, must provide <-chan")
+ return reflect.Value{}, os.ErrorString("to import/export with Send, must provide <-chan")
}
case reflect.RecvDir:
if dir != Send {
- return nil, os.ErrorString("to import/export with Recv, must provide chan<-")
+ return reflect.Value{}, os.ErrorString("to import/export with Recv, must provide chan<-")
}
}
- return reflect.NewValue(chT).(*reflect.ChanValue), nil
+ return reflect.NewValue(chT), nil
}
// Export exports a channel of a given type and specified direction. The
diff --git a/src/pkg/netchan/import.go b/src/pkg/netchan/import.go
index 30edcd812..9921486bd 100644
--- a/src/pkg/netchan/import.go
+++ b/src/pkg/netchan/import.go
@@ -48,7 +48,7 @@ func NewImporter(conn io.ReadWriter) *Importer {
// Import imports a set of channels from the given network and address.
func Import(network, remoteaddr string) (*Importer, os.Error) {
- conn, err := net.Dial(network, "", remoteaddr)
+ conn, err := net.Dial(network, remoteaddr)
if err != nil {
return nil, err
}
@@ -133,7 +133,7 @@ func (imp *Importer) run() {
ackHdr.SeqNum = hdr.SeqNum
imp.encode(ackHdr, payAck, nil)
// Create a new value for each received item.
- value := reflect.MakeZero(nch.ch.Type().(*reflect.ChanType).Elem())
+ value := reflect.Zero(nch.ch.Type().Elem())
if e := imp.decode(value); e != nil {
impLog("importer value decode:", e)
return
@@ -213,8 +213,8 @@ func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, size,
if dir == Send {
go func() {
for i := 0; n == -1 || i < n; i++ {
- val, closed := nch.recv()
- if closed {
+ val, ok := nch.recv()
+ if !ok {
if err = imp.encode(hdr, payClosed, nil); err != nil {
impLog("error encoding client closed message:", err)
}
diff --git a/src/pkg/netchan/netchan_test.go b/src/pkg/netchan/netchan_test.go
index 1c84a9d14..fd4d8f780 100644
--- a/src/pkg/netchan/netchan_test.go
+++ b/src/pkg/netchan/netchan_test.go
@@ -41,8 +41,8 @@ func exportReceive(exp *Exporter, t *testing.T, expDone chan bool) {
t.Fatal("exportReceive:", err)
}
for i := 0; i < count; i++ {
- v := <-ch
- if closed(ch) {
+ v, ok := <-ch
+ if !ok {
if i != closeCount {
t.Errorf("exportReceive expected close at %d; got one at %d", closeCount, i)
}
@@ -78,8 +78,8 @@ func importReceive(imp *Importer, t *testing.T, done chan bool) {
t.Fatal("importReceive:", err)
}
for i := 0; i < count; i++ {
- v := <-ch
- if closed(ch) {
+ v, ok := <-ch
+ if !ok {
if i != closeCount {
t.Errorf("importReceive expected close at %d; got one at %d", closeCount, i)
}
@@ -212,8 +212,8 @@ func TestExportHangup(t *testing.T) {
}
// Now hang up the channel. Importer should see it close.
exp.Hangup("exportedSend")
- v = <-ich
- if !closed(ich) {
+ v, ok := <-ich
+ if ok {
t.Fatal("expected channel to be closed; got value", v)
}
}
@@ -242,8 +242,8 @@ func TestImportHangup(t *testing.T) {
}
// Now hang up the channel. Exporter should see it close.
imp.Hangup("exportedRecv")
- v = <-ech
- if !closed(ech) {
+ v, ok := <-ech
+ if ok {
t.Fatal("expected channel to be closed; got value", v)
}
}
@@ -399,7 +399,7 @@ func TestImportFlowControl(t *testing.T) {
func testFlow(sendDone chan bool, ch <-chan int, N int, t *testing.T) {
go func() {
- time.Sleep(1e9)
+ time.Sleep(0.5e9)
sendDone <- false
}()