diff options
Diffstat (limited to 'src/pkg/rpc/client.go')
-rw-r--r-- | src/pkg/rpc/client.go | 58 |
1 files changed, 39 insertions, 19 deletions
diff --git a/src/pkg/rpc/client.go b/src/pkg/rpc/client.go index 6f028c10d..6de6d1325 100644 --- a/src/pkg/rpc/client.go +++ b/src/pkg/rpc/client.go @@ -15,6 +15,16 @@ import ( "sync" ) +// ServerError represents an error that has been returned from +// the remote side of the RPC connection. +type ServerError string + +func (e ServerError) String() string { + return string(e) +} + +const ErrShutdown = os.ErrorString("connection is shut down") + // Call represents an active RPC. type Call struct { ServiceMethod string // The name of the service and method to call. @@ -30,12 +40,12 @@ type Call struct { // with a single Client. type Client struct { mutex sync.Mutex // protects pending, seq - shutdown os.Error // non-nil if the client is shut down sending sync.Mutex seq uint64 codec ClientCodec pending map[uint64]*Call closing bool + shutdown bool } // A ClientCodec implements writing of RPC requests and @@ -43,7 +53,9 @@ type Client struct { // The client calls WriteRequest to write a request to the connection // and calls ReadResponseHeader and ReadResponseBody in pairs // to read responses. The client calls Close when finished with the -// connection. +// connection. ReadResponseBody may be called with a nil +// argument to force the body of the response to be read and then +// discarded. type ClientCodec interface { WriteRequest(*Request, interface{}) os.Error ReadResponseHeader(*Response) os.Error @@ -55,8 +67,8 @@ type ClientCodec interface { func (client *Client) send(c *Call) { // Register this call. client.mutex.Lock() - if client.shutdown != nil { - c.Error = client.shutdown + if client.shutdown { + c.Error = ErrShutdown client.mutex.Unlock() c.done() return @@ -93,20 +105,27 @@ func (client *Client) input() { c := client.pending[seq] client.pending[seq] = c, false client.mutex.Unlock() - err = client.codec.ReadResponseBody(c.Reply) - if response.Error != "" { - c.Error = os.ErrorString(response.Error) - } else if err != nil { - c.Error = err + + if response.Error == "" { + err = client.codec.ReadResponseBody(c.Reply) + if err != nil { + c.Error = os.ErrorString("reading body " + err.String()) + } } else { - // Empty strings should turn into nil os.Errors - c.Error = nil + // We've got an error response. Give this to the request; + // any subsequent requests will get the ReadResponseBody + // error if there is one. + c.Error = ServerError(response.Error) + err = client.codec.ReadResponseBody(nil) + if err != nil { + err = os.ErrorString("reading error body: " + err.String()) + } } c.done() } // Terminate pending calls. client.mutex.Lock() - client.shutdown = err + client.shutdown = true for _, call := range client.pending { call.Error = err call.done() @@ -209,10 +228,11 @@ func Dial(network, address string) (*Client, os.Error) { } func (client *Client) Close() os.Error { - if client.shutdown != nil || client.closing { - return os.ErrorString("rpc: already closed") - } client.mutex.Lock() + if client.shutdown || client.closing { + client.mutex.Unlock() + return ErrShutdown + } client.closing = true client.mutex.Unlock() return client.codec.Close() @@ -239,8 +259,8 @@ func (client *Client) Go(serviceMethod string, args interface{}, reply interface } } c.Done = done - if client.shutdown != nil { - c.Error = client.shutdown + if client.shutdown { + c.Error = ErrShutdown c.done() return c } @@ -250,8 +270,8 @@ func (client *Client) Go(serviceMethod string, args interface{}, reply interface // Call invokes the named function, waits for it to complete, and returns its error status. func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) os.Error { - if client.shutdown != nil { - return client.shutdown + if client.shutdown { + return ErrShutdown } call := <-client.Go(serviceMethod, args, reply, nil).Done return call.Error |