diff options
Diffstat (limited to 'src/pkg/netchan/export.go')
-rw-r--r-- | src/pkg/netchan/export.go | 147 |
1 files changed, 87 insertions, 60 deletions
diff --git a/src/pkg/netchan/export.go b/src/pkg/netchan/export.go index 626630b4a..89deb20ae 100644 --- a/src/pkg/netchan/export.go +++ b/src/pkg/netchan/export.go @@ -1,4 +1,4 @@ -// Copyright 2009 The Go Authors. All rights reserved. +// Copyright 2010 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. @@ -17,10 +17,6 @@ Networked channels are not synchronized; they always behave as if there is a buffer of at least one element between the two machines. - - TODO: at the moment, the exporting machine must send and - the importing machine must receive. This restriction will - be lifted soon. */ package netchan @@ -34,10 +30,12 @@ import ( // Export -// A channel and its associated information: a direction +// A channel and its associated information: a direction plus +// a handy marshaling place for its data. type exportChan struct { ch *reflect.ChanValue dir Dir + ptr *reflect.PtrValue // a pointer value we can point at each new received item } // An Exporter allows a set of channels to be published on a single @@ -62,21 +60,6 @@ func newClient(exp *Exporter, conn net.Conn) *expClient { } -// TODO: ASSUMES EXPORT MEANS SEND - -// Sent once per channel from importer to exporter to report that it's listening to a channel -type request struct { - name string - dir Dir - count int -} - -// Reply to request, sent from exporter to importer on each send. -type response struct { - name string - error string -} - // Wait for incoming connections, start a new runner for each func (exp *Exporter) listen() { for { @@ -85,70 +68,112 @@ func (exp *Exporter) listen() { log.Stderr("exporter.listen:", err) break } - log.Stderr("accepted call from", conn.RemoteAddr()) client := newClient(exp, conn) go client.run() } } -// Send a single client all its data. For each request, this will launch -// a serveRecv goroutine to deliver the data for that channel. +func (client *expClient) sendError(hdr *header, err string) { + error := &error{err} + log.Stderr("export:", error.error) + client.encode(hdr, payError, error) // ignore any encode error, hope client gets it +} + +func (client *expClient) getChan(hdr *header, dir Dir) *exportChan { + exp := client.exp + exp.chanLock.Lock() + ech, ok := exp.chans[hdr.name] + exp.chanLock.Unlock() + if !ok { + client.sendError(hdr, "no such channel: "+hdr.name) + return nil + } + if ech.dir != dir { + client.sendError(hdr, "wrong direction for channel: "+hdr.name) + return nil + } + return ech +} + +// Manage sends and receives for a single client. For each (client Recv) request, +// this will launch a serveRecv goroutine to deliver the data for that channel, +// while (client Send) requests are handled as data arrives from the client. func (client *expClient) run() { + hdr := new(header) req := new(request) + error := new(error) for { - if err := client.decode(req); err != nil { - log.Stderr("error decoding client request:", err) + if err := client.decode(hdr); err != nil { + log.Stderr("error decoding client header:", err) // TODO: tear down connection - break + return } - log.Stderrf("export request: %+v", req) - if req.dir == Recv { - go client.serveRecv(req) - } else { - log.Stderr("export request: can't handle channel direction", req.dir) - resp := new(response) - resp.name = req.name - resp.error = "export request: can't handle channel direction" - client.encode(resp, nil) - break + switch hdr.payloadType { + case payRequest: + if err := client.decode(req); err != nil { + log.Stderr("error decoding client request:", err) + // TODO: tear down connection + return + } + switch req.dir { + case Recv: + go client.serveRecv(*hdr, req.count) + case Send: + // Request to send is clear as a matter of protocol + // but not actually used by the implementation. + // The actual sends will have payload type payData. + // TODO: manage the count? + default: + error.error = "export request: can't handle channel direction" + log.Stderr(error.error, req.dir) + client.encode(hdr, payError, error) + } + case payData: + client.serveSend(*hdr) } } } -// Send all the data on a single channel to a client asking for a Recv -func (client *expClient) serveRecv(req *request) { - exp := client.exp - resp := new(response) - resp.name = req.name - var ok bool - exp.chanLock.Lock() - ech, ok := exp.chans[req.name] - exp.chanLock.Unlock() - if !ok { - resp.error = "no such channel: " + req.name - log.Stderr("export:", resp.error) - client.encode(resp, nil) // ignore any encode error, hope client gets it +// Send all the data on a single channel to a client asking for a Recv. +// The header is passed by value to avoid issues of overwriting. +func (client *expClient) serveRecv(hdr header, count int) { + ech := client.getChan(&hdr, Send) + if ech == nil { return } for { - if ech.dir != Send { - log.Stderr("TODO: recv export unimplemented") - break - } val := ech.ch.Recv() - if err := client.encode(resp, val.Interface()); err != nil { + if err := client.encode(&hdr, payData, val.Interface()); err != nil { log.Stderr("error encoding client response:", err) + client.sendError(&hdr, err.String()) break } - if req.count > 0 { - req.count-- - if req.count == 0 { + if count > 0 { + if count--; count == 0 { break } } } } +// Receive and deliver locally one item from a client asking for a Send +// The header is passed by value to avoid issues of overwriting. +func (client *expClient) serveSend(hdr header) { + ech := client.getChan(&hdr, Recv) + if ech == nil { + return + } + // Create a new value for each received item. + val := reflect.MakeZero(ech.ptr.Type().(*reflect.PtrType).Elem()) + ech.ptr.PointTo(val) + if err := client.decode(ech.ptr.Interface()); err != nil { + log.Stderr("exporter value decode:", err) + return + } + ech.ch.Send(val) + // TODO count +} + // NewExporter creates a new Exporter to export channels // on the network and local address defined as in net.Listen. func NewExporter(network, localaddr string) (*Exporter, os.Error) { @@ -195,7 +220,8 @@ func checkChan(chT interface{}, dir Dir) (*reflect.ChanValue, os.Error) { // Despite the literal signature, the effective signature is // Export(name string, chT chan T, dir Dir) // where T must be a struct, pointer to struct, etc. -func (exp *Exporter) Export(name string, chT interface{}, dir Dir) os.Error { +// TODO: fix gob interface so we can eliminate the need for pT, and for structs. +func (exp *Exporter) Export(name string, chT interface{}, dir Dir, pT interface{}) os.Error { ch, err := checkChan(chT, dir) if err != nil { return err @@ -206,6 +232,7 @@ func (exp *Exporter) Export(name string, chT interface{}, dir Dir) os.Error { if present { return os.ErrorString("channel name already being exported:" + name) } - exp.chans[name] = &exportChan{ch, dir} + ptr := reflect.MakeZero(reflect.Typeof(pT)).(*reflect.PtrValue) + exp.chans[name] = &exportChan{ch, dir, ptr} return nil } |