diff options
author | Ondřej Surý <ondrej@sury.org> | 2011-02-14 13:23:51 +0100 |
---|---|---|
committer | Ondřej Surý <ondrej@sury.org> | 2011-02-14 13:23:51 +0100 |
commit | 758ff64c69e34965f8af5b2d6ffd65e8d7ab2150 (patch) | |
tree | 6d6b34f8c678862fe9b56c945a7b63f68502c245 /src/pkg/netchan/export.go | |
parent | 3e45412327a2654a77944249962b3652e6142299 (diff) | |
download | golang-upstream/2011-02-01.1.tar.gz |
Imported Upstream version 2011-02-01.1upstream/2011-02-01.1
Diffstat (limited to 'src/pkg/netchan/export.go')
-rw-r--r-- | src/pkg/netchan/export.go | 94 |
1 files changed, 58 insertions, 36 deletions
diff --git a/src/pkg/netchan/export.go b/src/pkg/netchan/export.go index 9ad388c18..0b28536ed 100644 --- a/src/pkg/netchan/export.go +++ b/src/pkg/netchan/export.go @@ -26,6 +26,7 @@ import ( "net" "os" "reflect" + "strconv" "sync" ) @@ -48,11 +49,12 @@ type Exporter struct { type expClient struct { *encDec exp *Exporter - mu sync.Mutex // protects remaining fields - errored bool // client has been sent an error - seqNum int64 // sequences messages sent to client; has value of highest sent - ackNum int64 // highest sequence number acknowledged - seqLock sync.Mutex // guarantees messages are in sequence, only locked under mu + chans map[int]*netChan // channels in use by client + mu sync.Mutex // protects remaining fields + errored bool // client has been sent an error + seqNum int64 // sequences messages sent to client; has value of highest sent + ackNum int64 // highest sequence number acknowledged + seqLock sync.Mutex // guarantees messages are in sequence, only locked under mu } func newClient(exp *Exporter, conn net.Conn) *expClient { @@ -61,8 +63,8 @@ func newClient(exp *Exporter, conn net.Conn) *expClient { client.encDec = newEncDec(conn) client.seqNum = 0 client.ackNum = 0 + client.chans = make(map[int]*netChan) return client - } func (client *expClient) sendError(hdr *header, err string) { @@ -74,20 +76,33 @@ func (client *expClient) sendError(hdr *header, err string) { client.mu.Unlock() } -func (client *expClient) getChan(hdr *header, dir Dir) *chanDir { +func (client *expClient) newChan(hdr *header, dir Dir, name string, size int, count int64) *netChan { exp := client.exp exp.mu.Lock() - ech, ok := exp.chans[hdr.Name] + ech, ok := exp.names[name] exp.mu.Unlock() if !ok { - client.sendError(hdr, "no such channel: "+hdr.Name) + client.sendError(hdr, "no such channel: "+name) return nil } if ech.dir != dir { - client.sendError(hdr, "wrong direction for channel: "+hdr.Name) + client.sendError(hdr, "wrong direction for channel: "+name) + return nil + } + nch := newNetChan(name, hdr.Id, ech, client.encDec, size, count) + client.chans[hdr.Id] = nch + return nch +} + +func (client *expClient) getChan(hdr *header, dir Dir) *netChan { + nch := client.chans[hdr.Id] + if nch == nil { return nil } - return ech + if nch.dir != dir { + client.sendError(hdr, "wrong direction for channel: "+nch.name) + } + return nch } // The function run manages sends and receives for a single client. For each @@ -113,12 +128,18 @@ func (client *expClient) run() { expLog("error decoding client request:", err) break } + if req.Size < 1 { + panic("netchan: remote requested " + strconv.Itoa(req.Size) + " values") + } switch req.Dir { case Recv: - go client.serveRecv(*hdr, req.Count) + // look up channel before calling serveRecv to + // avoid a lock around client.chans. + if nch := client.newChan(hdr, Send, req.Name, req.Size, req.Count); nch != nil { + go client.serveRecv(nch, *hdr, req.Count) + } case Send: - // Request to send is clear as a matter of protocol - // but not actually used by the implementation. + client.newChan(hdr, Recv, req.Name, req.Size, req.Count) // The actual sends will have payload type payData. // TODO: manage the count? default: @@ -143,8 +164,12 @@ func (client *expClient) run() { client.ackNum = hdr.SeqNum } client.mu.Unlock() + case payAckSend: + if nch := client.getChan(hdr, Send); nch != nil { + nch.acked() + } default: - log.Exit("netchan export: unknown payload type", hdr.PayloadType) + log.Fatal("netchan export: unknown payload type", hdr.PayloadType) } } client.exp.delClient(client) @@ -152,14 +177,10 @@ func (client *expClient) run() { // Send all the data on a single channel to a client asking for a Recv. // The header is passed by value to avoid issues of overwriting. -func (client *expClient) serveRecv(hdr header, count int64) { - ech := client.getChan(&hdr, Send) - if ech == nil { - return - } +func (client *expClient) serveRecv(nch *netChan, hdr header, count int64) { for { - val := ech.ch.Recv() - if ech.ch.Closed() { + val, closed := nch.recv() + if closed { if err := client.encode(&hdr, payClosed, nil); err != nil { expLog("error encoding server closed message:", err) } @@ -167,7 +188,7 @@ func (client *expClient) serveRecv(hdr header, count int64) { } // We hold the lock during transmission to guarantee messages are // sent in sequence number order. Also, we increment first so the - // value of client.seqNum is the value of the highest used sequence + // value of client.SeqNum is the value of the highest used sequence // number, not one beyond. client.mu.Lock() client.seqNum++ @@ -193,27 +214,27 @@ func (client *expClient) serveRecv(hdr header, count int64) { // Receive and deliver locally one item from a client asking for a Send // The header is passed by value to avoid issues of overwriting. func (client *expClient) serveSend(hdr header) { - ech := client.getChan(&hdr, Recv) - if ech == nil { + nch := client.getChan(&hdr, Recv) + if nch == nil { return } // Create a new value for each received item. - val := reflect.MakeZero(ech.ch.Type().(*reflect.ChanType).Elem()) + val := reflect.MakeZero(nch.ch.Type().(*reflect.ChanType).Elem()) if err := client.decode(val); err != nil { - expLog("value decode:", err) + expLog("value decode:", err, "; type ", nch.ch.Type()) return } - ech.ch.Send(val) + nch.send(val) } // Report that client has closed the channel that is sending to us. // The header is passed by value to avoid issues of overwriting. func (client *expClient) serveClosed(hdr header) { - ech := client.getChan(&hdr, Recv) - if ech == nil { + nch := client.getChan(&hdr, Recv) + if nch == nil { return } - ech.ch.Close() + nch.close() } func (client *expClient) unackedCount() int64 { @@ -260,7 +281,7 @@ func NewExporter(network, localaddr string) (*Exporter, os.Error) { e := &Exporter{ listener: listener, clientSet: &clientSet{ - chans: make(map[string]*chanDir), + names: make(map[string]*chanDir), clients: make(map[unackedCounter]bool), }, } @@ -343,11 +364,11 @@ func (exp *Exporter) Export(name string, chT interface{}, dir Dir) os.Error { } exp.mu.Lock() defer exp.mu.Unlock() - _, present := exp.chans[name] + _, present := exp.names[name] if present { return os.ErrorString("channel name already being exported:" + name) } - exp.chans[name] = &chanDir{ch, dir} + exp.names[name] = &chanDir{ch, dir} return nil } @@ -355,10 +376,11 @@ func (exp *Exporter) Export(name string, chT interface{}, dir Dir) os.Error { // the channel. Messages in flight for the channel may be dropped. func (exp *Exporter) Hangup(name string) os.Error { exp.mu.Lock() - chDir, ok := exp.chans[name] + chDir, ok := exp.names[name] if ok { - exp.chans[name] = nil, false + exp.names[name] = nil, false } + // TODO drop all instances of channel from client sets exp.mu.Unlock() if !ok { return os.ErrorString("netchan export: hangup: no such channel: " + name) |