summaryrefslogtreecommitdiff
path: root/src/pkg/netchan/import.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/pkg/netchan/import.go')
-rw-r--r--src/pkg/netchan/import.go83
1 files changed, 53 insertions, 30 deletions
diff --git a/src/pkg/netchan/import.go b/src/pkg/netchan/import.go
index 263ee4404..bde36f615 100644
--- a/src/pkg/netchan/import.go
+++ b/src/pkg/netchan/import.go
@@ -1,4 +1,4 @@
-// Copyright 2009 The Go Authors. All rights reserved.
+// Copyright 2010 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
@@ -14,12 +14,12 @@ import (
// Import
-// A channel and its associated information: a template value, direction and a count
+// A channel and its associated information: a template value and direction,
+// plus a handy marshaling place for its data.
type importChan struct {
- ch *reflect.ChanValue
- dir Dir
- ptr *reflect.PtrValue // a pointer value we can point at each new item
- count int
+ ch *reflect.ChanValue
+ dir Dir
+ ptr *reflect.PtrValue // a pointer value we can point at each new received item
}
// An Importer allows a set of channels to be imported from a single
@@ -32,8 +32,6 @@ type Importer struct {
chans map[string]*importChan
}
-// TODO: ASSUMES IMPORT MEANS RECEIVE
-
// NewImporter creates a new Importer object to import channels
// from an Exporter at the network and remote address as defined in net.Dial.
// The Exporter must be available and serving when the Importer is
@@ -54,37 +52,49 @@ func NewImporter(network, remoteaddr string) (*Importer, os.Error) {
// Handle the data from a single imported data stream, which will
// have the form
// (response, data)*
-// The response identifies by name which channel is receiving data.
-// TODO: allow an importer to send.
+// The response identifies by name which channel is transmitting data.
func (imp *Importer) run() {
// Loop on responses; requests are sent by ImportNValues()
- resp := new(response)
+ hdr := new(header)
+ err := new(error)
for {
- if err := imp.decode(resp); err != nil {
- log.Stderr("importer response decode:", err)
- break
+ if e := imp.decode(hdr); e != nil {
+ log.Stderr("importer header:", e)
+ return
}
- if resp.error != "" {
- log.Stderr("importer response error:", resp.error)
- // TODO: tear down connection
- break
+ switch hdr.payloadType {
+ case payData:
+ // done lower in loop
+ case payError:
+ if e := imp.decode(err); e != nil {
+ log.Stderr("importer error:", e)
+ return
+ }
+ if err.error != "" {
+ log.Stderr("importer response error:", err.error)
+ // TODO: tear down connection
+ return
+ }
+ default:
+ log.Stderr("unexpected payload type:", hdr.payloadType)
+ return
}
imp.chanLock.Lock()
- ich, ok := imp.chans[resp.name]
+ ich, ok := imp.chans[hdr.name]
imp.chanLock.Unlock()
if !ok {
- log.Stderr("unknown name in request:", resp.name)
- break
+ log.Stderr("unknown name in request:", hdr.name)
+ return
}
if ich.dir != Recv {
- log.Stderr("TODO: import send unimplemented")
- break
+ log.Stderr("cannot happen: receive from non-Recv channel")
+ return
}
// Create a new value for each received item.
val := reflect.MakeZero(ich.ptr.Type().(*reflect.PtrType).Elem())
ich.ptr.PointTo(val)
- if err := imp.decode(ich.ptr.Interface()); err != nil {
- log.Stderr("importer value decode:", err)
+ if e := imp.decode(ich.ptr.Interface()); e != nil {
+ log.Stderr("importer value decode:", e)
return
}
ich.ch.Send(val)
@@ -103,7 +113,7 @@ func (imp *Importer) Import(name string, chT interface{}, dir Dir, pT interface{
// the remote site's channel is provided in the call and may be of arbitrary
// channel type.
// Despite the literal signature, the effective signature is
-// ImportNValues(name string, chT chan T, dir Dir, pT T)
+// ImportNValues(name string, chT chan T, dir Dir, pT T, n int) os.Error
// where T must be a struct, pointer to struct, etc. pT may be more indirect
// than the value type of the channel (e.g. chan T, pT *T) but it must be a
// pointer.
@@ -114,7 +124,7 @@ func (imp *Importer) Import(name string, chT interface{}, dir Dir, pT interface{
// err := imp.ImportNValues("name", ch, Recv, new(myType), 1)
// if err != nil { log.Exit(err) }
// fmt.Printf("%+v\n", <-ch)
-// (TODO: Can we eliminate the need for pT?)
+// TODO: fix gob interface so we can eliminate the need for pT, and for structs.
func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, pT interface{}, n int) os.Error {
ch, err := checkChan(chT, dir)
if err != nil {
@@ -135,15 +145,28 @@ func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, pT int
return os.ErrorString("channel name already being imported:" + name)
}
ptr := reflect.MakeZero(reflect.Typeof(pT)).(*reflect.PtrValue)
- imp.chans[name] = &importChan{ch, dir, ptr, n}
+ imp.chans[name] = &importChan{ch, dir, ptr}
// Tell the other side about this channel.
+ hdr := new(header)
+ hdr.name = name
+ hdr.payloadType = payRequest
req := new(request)
- req.name = name
req.dir = dir
req.count = n
- if err := imp.encode(req, nil); err != nil {
+ if err := imp.encode(hdr, payRequest, req); err != nil {
log.Stderr("importer request encode:", err)
return err
}
+ if dir == Send {
+ go func() {
+ for i := 0; n == 0 || i < n; i++ {
+ val := ch.Recv()
+ if err := imp.encode(hdr, payData, val.Interface()); err != nil {
+ log.Stderr("error encoding client response:", err)
+ return
+ }
+ }
+ }()
+ }
return nil
}