diff options
Diffstat (limited to 'src/pkg/netchan/import.go')
-rw-r--r-- | src/pkg/netchan/import.go | 83 |
1 files changed, 53 insertions, 30 deletions
diff --git a/src/pkg/netchan/import.go b/src/pkg/netchan/import.go index 263ee4404..bde36f615 100644 --- a/src/pkg/netchan/import.go +++ b/src/pkg/netchan/import.go @@ -1,4 +1,4 @@ -// Copyright 2009 The Go Authors. All rights reserved. +// Copyright 2010 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. @@ -14,12 +14,12 @@ import ( // Import -// A channel and its associated information: a template value, direction and a count +// A channel and its associated information: a template value and direction, +// plus a handy marshaling place for its data. type importChan struct { - ch *reflect.ChanValue - dir Dir - ptr *reflect.PtrValue // a pointer value we can point at each new item - count int + ch *reflect.ChanValue + dir Dir + ptr *reflect.PtrValue // a pointer value we can point at each new received item } // An Importer allows a set of channels to be imported from a single @@ -32,8 +32,6 @@ type Importer struct { chans map[string]*importChan } -// TODO: ASSUMES IMPORT MEANS RECEIVE - // NewImporter creates a new Importer object to import channels // from an Exporter at the network and remote address as defined in net.Dial. // The Exporter must be available and serving when the Importer is @@ -54,37 +52,49 @@ func NewImporter(network, remoteaddr string) (*Importer, os.Error) { // Handle the data from a single imported data stream, which will // have the form // (response, data)* -// The response identifies by name which channel is receiving data. -// TODO: allow an importer to send. +// The response identifies by name which channel is transmitting data. func (imp *Importer) run() { // Loop on responses; requests are sent by ImportNValues() - resp := new(response) + hdr := new(header) + err := new(error) for { - if err := imp.decode(resp); err != nil { - log.Stderr("importer response decode:", err) - break + if e := imp.decode(hdr); e != nil { + log.Stderr("importer header:", e) + return } - if resp.error != "" { - log.Stderr("importer response error:", resp.error) - // TODO: tear down connection - break + switch hdr.payloadType { + case payData: + // done lower in loop + case payError: + if e := imp.decode(err); e != nil { + log.Stderr("importer error:", e) + return + } + if err.error != "" { + log.Stderr("importer response error:", err.error) + // TODO: tear down connection + return + } + default: + log.Stderr("unexpected payload type:", hdr.payloadType) + return } imp.chanLock.Lock() - ich, ok := imp.chans[resp.name] + ich, ok := imp.chans[hdr.name] imp.chanLock.Unlock() if !ok { - log.Stderr("unknown name in request:", resp.name) - break + log.Stderr("unknown name in request:", hdr.name) + return } if ich.dir != Recv { - log.Stderr("TODO: import send unimplemented") - break + log.Stderr("cannot happen: receive from non-Recv channel") + return } // Create a new value for each received item. val := reflect.MakeZero(ich.ptr.Type().(*reflect.PtrType).Elem()) ich.ptr.PointTo(val) - if err := imp.decode(ich.ptr.Interface()); err != nil { - log.Stderr("importer value decode:", err) + if e := imp.decode(ich.ptr.Interface()); e != nil { + log.Stderr("importer value decode:", e) return } ich.ch.Send(val) @@ -103,7 +113,7 @@ func (imp *Importer) Import(name string, chT interface{}, dir Dir, pT interface{ // the remote site's channel is provided in the call and may be of arbitrary // channel type. // Despite the literal signature, the effective signature is -// ImportNValues(name string, chT chan T, dir Dir, pT T) +// ImportNValues(name string, chT chan T, dir Dir, pT T, n int) os.Error // where T must be a struct, pointer to struct, etc. pT may be more indirect // than the value type of the channel (e.g. chan T, pT *T) but it must be a // pointer. @@ -114,7 +124,7 @@ func (imp *Importer) Import(name string, chT interface{}, dir Dir, pT interface{ // err := imp.ImportNValues("name", ch, Recv, new(myType), 1) // if err != nil { log.Exit(err) } // fmt.Printf("%+v\n", <-ch) -// (TODO: Can we eliminate the need for pT?) +// TODO: fix gob interface so we can eliminate the need for pT, and for structs. func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, pT interface{}, n int) os.Error { ch, err := checkChan(chT, dir) if err != nil { @@ -135,15 +145,28 @@ func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, pT int return os.ErrorString("channel name already being imported:" + name) } ptr := reflect.MakeZero(reflect.Typeof(pT)).(*reflect.PtrValue) - imp.chans[name] = &importChan{ch, dir, ptr, n} + imp.chans[name] = &importChan{ch, dir, ptr} // Tell the other side about this channel. + hdr := new(header) + hdr.name = name + hdr.payloadType = payRequest req := new(request) - req.name = name req.dir = dir req.count = n - if err := imp.encode(req, nil); err != nil { + if err := imp.encode(hdr, payRequest, req); err != nil { log.Stderr("importer request encode:", err) return err } + if dir == Send { + go func() { + for i := 0; n == 0 || i < n; i++ { + val := ch.Recv() + if err := imp.encode(hdr, payData, val.Interface()); err != nil { + log.Stderr("error encoding client response:", err) + return + } + } + }() + } return nil } |