diff options
author | Ondřej Surý <ondrej@sury.org> | 2011-02-14 13:23:51 +0100 |
---|---|---|
committer | Ondřej Surý <ondrej@sury.org> | 2011-02-14 13:23:51 +0100 |
commit | 758ff64c69e34965f8af5b2d6ffd65e8d7ab2150 (patch) | |
tree | 6d6b34f8c678862fe9b56c945a7b63f68502c245 /src/pkg/netchan/import.go | |
parent | 3e45412327a2654a77944249962b3652e6142299 (diff) | |
download | golang-upstream/2011-02-01.1.tar.gz |
Imported Upstream version 2011-02-01.1upstream/2011-02-01.1
Diffstat (limited to 'src/pkg/netchan/import.go')
-rw-r--r-- | src/pkg/netchan/import.go | 102 |
1 files changed, 63 insertions, 39 deletions
diff --git a/src/pkg/netchan/import.go b/src/pkg/netchan/import.go index baae367a0..d220d9a66 100644 --- a/src/pkg/netchan/import.go +++ b/src/pkg/netchan/import.go @@ -27,8 +27,10 @@ type Importer struct { *encDec conn net.Conn chanLock sync.Mutex // protects access to channel map - chans map[string]*chanDir + names map[string]*netChan + chans map[int]*netChan errors chan os.Error + maxId int } // NewImporter creates a new Importer object to import channels @@ -43,7 +45,8 @@ func NewImporter(network, remoteaddr string) (*Importer, os.Error) { imp := new(Importer) imp.encDec = newEncDec(conn) imp.conn = conn - imp.chans = make(map[string]*chanDir) + imp.chans = make(map[int]*netChan) + imp.names = make(map[string]*netChan) imp.errors = make(chan os.Error, 10) go imp.run() return imp, nil @@ -54,7 +57,7 @@ func (imp *Importer) shutdown() { imp.chanLock.Lock() for _, ich := range imp.chans { if ich.dir == Recv { - ich.ch.Close() + ich.close() } } imp.chanLock.Unlock() @@ -88,50 +91,62 @@ func (imp *Importer) run() { } if err.Error != "" { impLog("response error:", err.Error) - if sent := imp.errors <- os.ErrorString(err.Error); !sent { + select { + case imp.errors <- os.ErrorString(err.Error): + continue // errors are not acknowledged + default: imp.shutdown() return } - continue // errors are not acknowledged. } case payClosed: - ich := imp.getChan(hdr.Name) - if ich != nil { - ich.ch.Close() + nch := imp.getChan(hdr.Id, false) + if nch != nil { + nch.close() } continue // closes are not acknowledged. + case payAckSend: + // we can receive spurious acks if the channel is + // hung up, so we ask getChan to ignore any errors. + nch := imp.getChan(hdr.Id, true) + if nch != nil { + nch.acked() + } + continue default: impLog("unexpected payload type:", hdr.PayloadType) return } - ich := imp.getChan(hdr.Name) - if ich == nil { + nch := imp.getChan(hdr.Id, false) + if nch == nil { continue } - if ich.dir != Recv { + if nch.dir != Recv { impLog("cannot happen: receive from non-Recv channel") return } // Acknowledge receipt - ackHdr.Name = hdr.Name + ackHdr.Id = hdr.Id ackHdr.SeqNum = hdr.SeqNum imp.encode(ackHdr, payAck, nil) // Create a new value for each received item. - value := reflect.MakeZero(ich.ch.Type().(*reflect.ChanType).Elem()) + value := reflect.MakeZero(nch.ch.Type().(*reflect.ChanType).Elem()) if e := imp.decode(value); e != nil { impLog("importer value decode:", e) return } - ich.ch.Send(value) + nch.send(value) } } -func (imp *Importer) getChan(name string) *chanDir { +func (imp *Importer) getChan(id int, errOk bool) *netChan { imp.chanLock.Lock() - ich := imp.chans[name] + ich := imp.chans[id] imp.chanLock.Unlock() if ich == nil { - impLog("unknown name in netchan request:", name) + if !errOk { + impLog("unknown id in netchan request: ", id) + } return nil } return ich @@ -145,41 +160,49 @@ func (imp *Importer) Errors() chan os.Error { return imp.errors } -// Import imports a channel of the given type and specified direction. +// Import imports a channel of the given type, size and specified direction. // It is equivalent to ImportNValues with a count of -1, meaning unbounded. -func (imp *Importer) Import(name string, chT interface{}, dir Dir) os.Error { - return imp.ImportNValues(name, chT, dir, -1) +func (imp *Importer) Import(name string, chT interface{}, dir Dir, size int) os.Error { + return imp.ImportNValues(name, chT, dir, size, -1) } -// ImportNValues imports a channel of the given type and specified direction -// and then receives or transmits up to n values on that channel. A value of -// n==-1 implies an unbounded number of values. The channel to be bound to -// the remote site's channel is provided in the call and may be of arbitrary -// channel type. +// ImportNValues imports a channel of the given type and specified +// direction and then receives or transmits up to n values on that +// channel. A value of n==-1 implies an unbounded number of values. The +// channel will have buffer space for size values, or 1 value if size < 1. +// The channel to be bound to the remote site's channel is provided +// in the call and may be of arbitrary channel type. // Despite the literal signature, the effective signature is -// ImportNValues(name string, chT chan T, dir Dir, n int) os.Error +// ImportNValues(name string, chT chan T, dir Dir, size, n int) os.Error // Example usage: // imp, err := NewImporter("tcp", "netchanserver.mydomain.com:1234") -// if err != nil { log.Exit(err) } +// if err != nil { log.Fatal(err) } // ch := make(chan myType) -// err = imp.ImportNValues("name", ch, Recv, 1) -// if err != nil { log.Exit(err) } +// err = imp.ImportNValues("name", ch, Recv, 1, 1) +// if err != nil { log.Fatal(err) } // fmt.Printf("%+v\n", <-ch) -func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, n int) os.Error { +func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, size, n int) os.Error { ch, err := checkChan(chT, dir) if err != nil { return err } imp.chanLock.Lock() defer imp.chanLock.Unlock() - _, present := imp.chans[name] + _, present := imp.names[name] if present { return os.ErrorString("channel name already being imported:" + name) } - imp.chans[name] = &chanDir{ch, dir} + if size < 1 { + size = 1 + } + id := imp.maxId + imp.maxId++ + nch := newNetChan(name, id, &chanDir{ch, dir}, imp.encDec, size, int64(n)) + imp.names[name] = nch + imp.chans[id] = nch // Tell the other side about this channel. - hdr := &header{Name: name} - req := &request{Count: int64(n), Dir: dir} + hdr := &header{Id: id} + req := &request{Name: name, Count: int64(n), Dir: dir, Size: size} if err = imp.encode(hdr, payRequest, req); err != nil { impLog("request encode:", err) return err @@ -187,8 +210,8 @@ func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, n int) if dir == Send { go func() { for i := 0; n == -1 || i < n; i++ { - val := ch.Recv() - if ch.Closed() { + val, closed := nch.recv() + if closed { if err = imp.encode(hdr, payClosed, nil); err != nil { impLog("error encoding client closed message:", err) } @@ -208,14 +231,15 @@ func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, n int) // the channel. Messages in flight for the channel may be dropped. func (imp *Importer) Hangup(name string) os.Error { imp.chanLock.Lock() - chDir, ok := imp.chans[name] + nc, ok := imp.names[name] if ok { - imp.chans[name] = nil, false + imp.names[name] = nil, false + imp.chans[nc.id] = nil, false } imp.chanLock.Unlock() if !ok { return os.ErrorString("netchan import: hangup: no such channel: " + name) } - chDir.ch.Close() + nc.close() return nil } |