summaryrefslogtreecommitdiff
path: root/src/pkg/netchan/export.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/pkg/netchan/export.go')
-rw-r--r--src/pkg/netchan/export.go94
1 files changed, 58 insertions, 36 deletions
diff --git a/src/pkg/netchan/export.go b/src/pkg/netchan/export.go
index 9ad388c18..0b28536ed 100644
--- a/src/pkg/netchan/export.go
+++ b/src/pkg/netchan/export.go
@@ -26,6 +26,7 @@ import (
"net"
"os"
"reflect"
+ "strconv"
"sync"
)
@@ -48,11 +49,12 @@ type Exporter struct {
type expClient struct {
*encDec
exp *Exporter
- mu sync.Mutex // protects remaining fields
- errored bool // client has been sent an error
- seqNum int64 // sequences messages sent to client; has value of highest sent
- ackNum int64 // highest sequence number acknowledged
- seqLock sync.Mutex // guarantees messages are in sequence, only locked under mu
+ chans map[int]*netChan // channels in use by client
+ mu sync.Mutex // protects remaining fields
+ errored bool // client has been sent an error
+ seqNum int64 // sequences messages sent to client; has value of highest sent
+ ackNum int64 // highest sequence number acknowledged
+ seqLock sync.Mutex // guarantees messages are in sequence, only locked under mu
}
func newClient(exp *Exporter, conn net.Conn) *expClient {
@@ -61,8 +63,8 @@ func newClient(exp *Exporter, conn net.Conn) *expClient {
client.encDec = newEncDec(conn)
client.seqNum = 0
client.ackNum = 0
+ client.chans = make(map[int]*netChan)
return client
-
}
func (client *expClient) sendError(hdr *header, err string) {
@@ -74,20 +76,33 @@ func (client *expClient) sendError(hdr *header, err string) {
client.mu.Unlock()
}
-func (client *expClient) getChan(hdr *header, dir Dir) *chanDir {
+func (client *expClient) newChan(hdr *header, dir Dir, name string, size int, count int64) *netChan {
exp := client.exp
exp.mu.Lock()
- ech, ok := exp.chans[hdr.Name]
+ ech, ok := exp.names[name]
exp.mu.Unlock()
if !ok {
- client.sendError(hdr, "no such channel: "+hdr.Name)
+ client.sendError(hdr, "no such channel: "+name)
return nil
}
if ech.dir != dir {
- client.sendError(hdr, "wrong direction for channel: "+hdr.Name)
+ client.sendError(hdr, "wrong direction for channel: "+name)
+ return nil
+ }
+ nch := newNetChan(name, hdr.Id, ech, client.encDec, size, count)
+ client.chans[hdr.Id] = nch
+ return nch
+}
+
+func (client *expClient) getChan(hdr *header, dir Dir) *netChan {
+ nch := client.chans[hdr.Id]
+ if nch == nil {
return nil
}
- return ech
+ if nch.dir != dir {
+ client.sendError(hdr, "wrong direction for channel: "+nch.name)
+ }
+ return nch
}
// The function run manages sends and receives for a single client. For each
@@ -113,12 +128,18 @@ func (client *expClient) run() {
expLog("error decoding client request:", err)
break
}
+ if req.Size < 1 {
+ panic("netchan: remote requested " + strconv.Itoa(req.Size) + " values")
+ }
switch req.Dir {
case Recv:
- go client.serveRecv(*hdr, req.Count)
+ // look up channel before calling serveRecv to
+ // avoid a lock around client.chans.
+ if nch := client.newChan(hdr, Send, req.Name, req.Size, req.Count); nch != nil {
+ go client.serveRecv(nch, *hdr, req.Count)
+ }
case Send:
- // Request to send is clear as a matter of protocol
- // but not actually used by the implementation.
+ client.newChan(hdr, Recv, req.Name, req.Size, req.Count)
// The actual sends will have payload type payData.
// TODO: manage the count?
default:
@@ -143,8 +164,12 @@ func (client *expClient) run() {
client.ackNum = hdr.SeqNum
}
client.mu.Unlock()
+ case payAckSend:
+ if nch := client.getChan(hdr, Send); nch != nil {
+ nch.acked()
+ }
default:
- log.Exit("netchan export: unknown payload type", hdr.PayloadType)
+ log.Fatal("netchan export: unknown payload type", hdr.PayloadType)
}
}
client.exp.delClient(client)
@@ -152,14 +177,10 @@ func (client *expClient) run() {
// Send all the data on a single channel to a client asking for a Recv.
// The header is passed by value to avoid issues of overwriting.
-func (client *expClient) serveRecv(hdr header, count int64) {
- ech := client.getChan(&hdr, Send)
- if ech == nil {
- return
- }
+func (client *expClient) serveRecv(nch *netChan, hdr header, count int64) {
for {
- val := ech.ch.Recv()
- if ech.ch.Closed() {
+ val, closed := nch.recv()
+ if closed {
if err := client.encode(&hdr, payClosed, nil); err != nil {
expLog("error encoding server closed message:", err)
}
@@ -167,7 +188,7 @@ func (client *expClient) serveRecv(hdr header, count int64) {
}
// We hold the lock during transmission to guarantee messages are
// sent in sequence number order. Also, we increment first so the
- // value of client.seqNum is the value of the highest used sequence
+ // value of client.SeqNum is the value of the highest used sequence
// number, not one beyond.
client.mu.Lock()
client.seqNum++
@@ -193,27 +214,27 @@ func (client *expClient) serveRecv(hdr header, count int64) {
// Receive and deliver locally one item from a client asking for a Send
// The header is passed by value to avoid issues of overwriting.
func (client *expClient) serveSend(hdr header) {
- ech := client.getChan(&hdr, Recv)
- if ech == nil {
+ nch := client.getChan(&hdr, Recv)
+ if nch == nil {
return
}
// Create a new value for each received item.
- val := reflect.MakeZero(ech.ch.Type().(*reflect.ChanType).Elem())
+ val := reflect.MakeZero(nch.ch.Type().(*reflect.ChanType).Elem())
if err := client.decode(val); err != nil {
- expLog("value decode:", err)
+ expLog("value decode:", err, "; type ", nch.ch.Type())
return
}
- ech.ch.Send(val)
+ nch.send(val)
}
// Report that client has closed the channel that is sending to us.
// The header is passed by value to avoid issues of overwriting.
func (client *expClient) serveClosed(hdr header) {
- ech := client.getChan(&hdr, Recv)
- if ech == nil {
+ nch := client.getChan(&hdr, Recv)
+ if nch == nil {
return
}
- ech.ch.Close()
+ nch.close()
}
func (client *expClient) unackedCount() int64 {
@@ -260,7 +281,7 @@ func NewExporter(network, localaddr string) (*Exporter, os.Error) {
e := &Exporter{
listener: listener,
clientSet: &clientSet{
- chans: make(map[string]*chanDir),
+ names: make(map[string]*chanDir),
clients: make(map[unackedCounter]bool),
},
}
@@ -343,11 +364,11 @@ func (exp *Exporter) Export(name string, chT interface{}, dir Dir) os.Error {
}
exp.mu.Lock()
defer exp.mu.Unlock()
- _, present := exp.chans[name]
+ _, present := exp.names[name]
if present {
return os.ErrorString("channel name already being exported:" + name)
}
- exp.chans[name] = &chanDir{ch, dir}
+ exp.names[name] = &chanDir{ch, dir}
return nil
}
@@ -355,10 +376,11 @@ func (exp *Exporter) Export(name string, chT interface{}, dir Dir) os.Error {
// the channel. Messages in flight for the channel may be dropped.
func (exp *Exporter) Hangup(name string) os.Error {
exp.mu.Lock()
- chDir, ok := exp.chans[name]
+ chDir, ok := exp.names[name]
if ok {
- exp.chans[name] = nil, false
+ exp.names[name] = nil, false
}
+ // TODO drop all instances of channel from client sets
exp.mu.Unlock()
if !ok {
return os.ErrorString("netchan export: hangup: no such channel: " + name)