diff options
author | Ondřej Surý <ondrej@sury.org> | 2011-01-17 12:40:45 +0100 |
---|---|---|
committer | Ondřej Surý <ondrej@sury.org> | 2011-01-17 12:40:45 +0100 |
commit | 3e45412327a2654a77944249962b3652e6142299 (patch) | |
tree | bc3bf69452afa055423cbe0c5cfa8ca357df6ccf /src/pkg/netchan/export.go | |
parent | c533680039762cacbc37db8dc7eed074c3e497be (diff) | |
download | golang-upstream/2011.01.12.tar.gz |
Imported Upstream version 2011.01.12upstream/2011.01.12
Diffstat (limited to 'src/pkg/netchan/export.go')
-rw-r--r-- | src/pkg/netchan/export.go | 238 |
1 files changed, 183 insertions, 55 deletions
diff --git a/src/pkg/netchan/export.go b/src/pkg/netchan/export.go index a16714ba2..9ad388c18 100644 --- a/src/pkg/netchan/export.go +++ b/src/pkg/netchan/export.go @@ -19,7 +19,7 @@ */ package netchan -// BUG: can't use range clause to receive when using ImportNValues with N non-zero. +// BUG: can't use range clause to receive when using ImportNValues to limit the count. import ( "log" @@ -31,73 +31,69 @@ import ( // Export -// A channel and its associated information: a direction plus -// a handy marshaling place for its data. -type exportChan struct { - ch *reflect.ChanValue - dir Dir +// expLog is a logging convenience function. The first argument must be a string. +func expLog(args ...interface{}) { + args[0] = "netchan export: " + args[0].(string) + log.Print(args...) } // An Exporter allows a set of channels to be published on a single // network port. A single machine may have multiple Exporters // but they must use different ports. type Exporter struct { + *clientSet listener net.Listener - chanLock sync.Mutex // protects access to channel map - chans map[string]*exportChan } type expClient struct { *encDec - exp *Exporter + exp *Exporter + mu sync.Mutex // protects remaining fields + errored bool // client has been sent an error + seqNum int64 // sequences messages sent to client; has value of highest sent + ackNum int64 // highest sequence number acknowledged + seqLock sync.Mutex // guarantees messages are in sequence, only locked under mu } func newClient(exp *Exporter, conn net.Conn) *expClient { client := new(expClient) client.exp = exp client.encDec = newEncDec(conn) + client.seqNum = 0 + client.ackNum = 0 return client } -// Wait for incoming connections, start a new runner for each -func (exp *Exporter) listen() { - for { - conn, err := exp.listener.Accept() - if err != nil { - log.Stderr("exporter.listen:", err) - break - } - client := newClient(exp, conn) - go client.run() - } -} - func (client *expClient) sendError(hdr *header, err string) { error := &error{err} - log.Stderr("export:", error.error) + expLog("sending error to client:", error.Error) client.encode(hdr, payError, error) // ignore any encode error, hope client gets it + client.mu.Lock() + client.errored = true + client.mu.Unlock() } -func (client *expClient) getChan(hdr *header, dir Dir) *exportChan { +func (client *expClient) getChan(hdr *header, dir Dir) *chanDir { exp := client.exp - exp.chanLock.Lock() - ech, ok := exp.chans[hdr.name] - exp.chanLock.Unlock() + exp.mu.Lock() + ech, ok := exp.chans[hdr.Name] + exp.mu.Unlock() if !ok { - client.sendError(hdr, "no such channel: "+hdr.name) + client.sendError(hdr, "no such channel: "+hdr.Name) return nil } if ech.dir != dir { - client.sendError(hdr, "wrong direction for channel: "+hdr.name) + client.sendError(hdr, "wrong direction for channel: "+hdr.Name) return nil } return ech } -// Manage sends and receives for a single client. For each (client Recv) request, -// this will launch a serveRecv goroutine to deliver the data for that channel, -// while (client Send) requests are handled as data arrives from the client. +// The function run manages sends and receives for a single client. For each +// (client Recv) request, this will launch a serveRecv goroutine to deliver +// the data for that channel, while (client Send) requests are handled as +// data arrives from the client. func (client *expClient) run() { hdr := new(header) hdrValue := reflect.NewValue(hdr) @@ -105,40 +101,58 @@ func (client *expClient) run() { reqValue := reflect.NewValue(req) error := new(error) for { + *hdr = header{} if err := client.decode(hdrValue); err != nil { - log.Stderr("error decoding client header:", err) - // TODO: tear down connection - return + expLog("error decoding client header:", err) + break } - switch hdr.payloadType { + switch hdr.PayloadType { case payRequest: + *req = request{} if err := client.decode(reqValue); err != nil { - log.Stderr("error decoding client request:", err) - // TODO: tear down connection - return + expLog("error decoding client request:", err) + break } - switch req.dir { + switch req.Dir { case Recv: - go client.serveRecv(*hdr, req.count) + go client.serveRecv(*hdr, req.Count) case Send: // Request to send is clear as a matter of protocol // but not actually used by the implementation. // The actual sends will have payload type payData. // TODO: manage the count? default: - error.error = "export request: can't handle channel direction" - log.Stderr(error.error, req.dir) + error.Error = "request: can't handle channel direction" + expLog(error.Error, req.Dir) client.encode(hdr, payError, error) } case payData: client.serveSend(*hdr) + case payClosed: + client.serveClosed(*hdr) + case payAck: + client.mu.Lock() + if client.ackNum != hdr.SeqNum-1 { + // Since the sequence number is incremented and the message is sent + // in a single instance of locking client.mu, the messages are guaranteed + // to be sent in order. Therefore receipt of acknowledgement N means + // all messages <=N have been seen by the recipient. We check anyway. + expLog("sequence out of order:", client.ackNum, hdr.SeqNum) + } + if client.ackNum < hdr.SeqNum { // If there has been an error, don't back up the count. + client.ackNum = hdr.SeqNum + } + client.mu.Unlock() + default: + log.Exit("netchan export: unknown payload type", hdr.PayloadType) } } + client.exp.delClient(client) } // Send all the data on a single channel to a client asking for a Recv. // The header is passed by value to avoid issues of overwriting. -func (client *expClient) serveRecv(hdr header, count int) { +func (client *expClient) serveRecv(hdr header, count int64) { ech := client.getChan(&hdr, Send) if ech == nil { return @@ -146,16 +160,30 @@ func (client *expClient) serveRecv(hdr header, count int) { for { val := ech.ch.Recv() if ech.ch.Closed() { - client.sendError(&hdr, os.EOF.String()) + if err := client.encode(&hdr, payClosed, nil); err != nil { + expLog("error encoding server closed message:", err) + } break } - if err := client.encode(&hdr, payData, val.Interface()); err != nil { - log.Stderr("error encoding client response:", err) + // We hold the lock during transmission to guarantee messages are + // sent in sequence number order. Also, we increment first so the + // value of client.seqNum is the value of the highest used sequence + // number, not one beyond. + client.mu.Lock() + client.seqNum++ + hdr.SeqNum = client.seqNum + client.seqLock.Lock() // guarantee ordering of messages + client.mu.Unlock() + err := client.encode(&hdr, payData, val.Interface()) + client.seqLock.Unlock() + if err != nil { + expLog("error encoding client response:", err) client.sendError(&hdr, err.String()) break } - if count > 0 { - if count--; count == 0 { + // Negative count means run forever. + if count >= 0 { + if count--; count <= 0 { break } } @@ -172,11 +200,54 @@ func (client *expClient) serveSend(hdr header) { // Create a new value for each received item. val := reflect.MakeZero(ech.ch.Type().(*reflect.ChanType).Elem()) if err := client.decode(val); err != nil { - log.Stderr("exporter value decode:", err) + expLog("value decode:", err) return } ech.ch.Send(val) - // TODO count +} + +// Report that client has closed the channel that is sending to us. +// The header is passed by value to avoid issues of overwriting. +func (client *expClient) serveClosed(hdr header) { + ech := client.getChan(&hdr, Recv) + if ech == nil { + return + } + ech.ch.Close() +} + +func (client *expClient) unackedCount() int64 { + client.mu.Lock() + n := client.seqNum - client.ackNum + client.mu.Unlock() + return n +} + +func (client *expClient) seq() int64 { + client.mu.Lock() + n := client.seqNum + client.mu.Unlock() + return n +} + +func (client *expClient) ack() int64 { + client.mu.Lock() + n := client.seqNum + client.mu.Unlock() + return n +} + +// Wait for incoming connections, start a new runner for each +func (exp *Exporter) listen() { + for { + conn, err := exp.listener.Accept() + if err != nil { + expLog("listen:", err) + break + } + client := exp.addClient(conn) + go client.run() + } } // NewExporter creates a new Exporter to export channels @@ -188,12 +259,53 @@ func NewExporter(network, localaddr string) (*Exporter, os.Error) { } e := &Exporter{ listener: listener, - chans: make(map[string]*exportChan), + clientSet: &clientSet{ + chans: make(map[string]*chanDir), + clients: make(map[unackedCounter]bool), + }, } go e.listen() return e, nil } +// addClient creates a new expClient and records its existence +func (exp *Exporter) addClient(conn net.Conn) *expClient { + client := newClient(exp, conn) + exp.mu.Lock() + exp.clients[client] = true + exp.mu.Unlock() + return client +} + +// delClient forgets the client existed +func (exp *Exporter) delClient(client *expClient) { + exp.mu.Lock() + exp.clients[client] = false, false + exp.mu.Unlock() +} + +// Drain waits until all messages sent from this exporter/importer, including +// those not yet sent to any client and possibly including those sent while +// Drain was executing, have been received by the importer. In short, it +// waits until all the exporter's messages have been received by a client. +// If the timeout (measured in nanoseconds) is positive and Drain takes +// longer than that to complete, an error is returned. +func (exp *Exporter) Drain(timeout int64) os.Error { + // This wrapper function is here so the method's comment will appear in godoc. + return exp.clientSet.drain(timeout) +} + +// Sync waits until all clients of the exporter have received the messages +// that were sent at the time Sync was invoked. Unlike Drain, it does not +// wait for messages sent while it is running or messages that have not been +// dispatched to any client. If the timeout (measured in nanoseconds) is +// positive and Sync takes longer than that to complete, an error is +// returned. +func (exp *Exporter) Sync(timeout int64) os.Error { + // This wrapper function is here so the method's comment will appear in godoc. + return exp.clientSet.sync(timeout) +} + // Addr returns the Exporter's local network address. func (exp *Exporter) Addr() net.Addr { return exp.listener.Addr() } @@ -229,12 +341,28 @@ func (exp *Exporter) Export(name string, chT interface{}, dir Dir) os.Error { if err != nil { return err } - exp.chanLock.Lock() - defer exp.chanLock.Unlock() + exp.mu.Lock() + defer exp.mu.Unlock() _, present := exp.chans[name] if present { return os.ErrorString("channel name already being exported:" + name) } - exp.chans[name] = &exportChan{ch, dir} + exp.chans[name] = &chanDir{ch, dir} + return nil +} + +// Hangup disassociates the named channel from the Exporter and closes +// the channel. Messages in flight for the channel may be dropped. +func (exp *Exporter) Hangup(name string) os.Error { + exp.mu.Lock() + chDir, ok := exp.chans[name] + if ok { + exp.chans[name] = nil, false + } + exp.mu.Unlock() + if !ok { + return os.ErrorString("netchan export: hangup: no such channel: " + name) + } + chDir.ch.Close() return nil } |