summaryrefslogtreecommitdiff
path: root/src/pkg/rpc/client.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/pkg/rpc/client.go')
-rw-r--r--src/pkg/rpc/client.go58
1 files changed, 39 insertions, 19 deletions
diff --git a/src/pkg/rpc/client.go b/src/pkg/rpc/client.go
index 6f028c10d..6de6d1325 100644
--- a/src/pkg/rpc/client.go
+++ b/src/pkg/rpc/client.go
@@ -15,6 +15,16 @@ import (
"sync"
)
+// ServerError represents an error that has been returned from
+// the remote side of the RPC connection.
+type ServerError string
+
+func (e ServerError) String() string {
+ return string(e)
+}
+
+const ErrShutdown = os.ErrorString("connection is shut down")
+
// Call represents an active RPC.
type Call struct {
ServiceMethod string // The name of the service and method to call.
@@ -30,12 +40,12 @@ type Call struct {
// with a single Client.
type Client struct {
mutex sync.Mutex // protects pending, seq
- shutdown os.Error // non-nil if the client is shut down
sending sync.Mutex
seq uint64
codec ClientCodec
pending map[uint64]*Call
closing bool
+ shutdown bool
}
// A ClientCodec implements writing of RPC requests and
@@ -43,7 +53,9 @@ type Client struct {
// The client calls WriteRequest to write a request to the connection
// and calls ReadResponseHeader and ReadResponseBody in pairs
// to read responses. The client calls Close when finished with the
-// connection.
+// connection. ReadResponseBody may be called with a nil
+// argument to force the body of the response to be read and then
+// discarded.
type ClientCodec interface {
WriteRequest(*Request, interface{}) os.Error
ReadResponseHeader(*Response) os.Error
@@ -55,8 +67,8 @@ type ClientCodec interface {
func (client *Client) send(c *Call) {
// Register this call.
client.mutex.Lock()
- if client.shutdown != nil {
- c.Error = client.shutdown
+ if client.shutdown {
+ c.Error = ErrShutdown
client.mutex.Unlock()
c.done()
return
@@ -93,20 +105,27 @@ func (client *Client) input() {
c := client.pending[seq]
client.pending[seq] = c, false
client.mutex.Unlock()
- err = client.codec.ReadResponseBody(c.Reply)
- if response.Error != "" {
- c.Error = os.ErrorString(response.Error)
- } else if err != nil {
- c.Error = err
+
+ if response.Error == "" {
+ err = client.codec.ReadResponseBody(c.Reply)
+ if err != nil {
+ c.Error = os.ErrorString("reading body " + err.String())
+ }
} else {
- // Empty strings should turn into nil os.Errors
- c.Error = nil
+ // We've got an error response. Give this to the request;
+ // any subsequent requests will get the ReadResponseBody
+ // error if there is one.
+ c.Error = ServerError(response.Error)
+ err = client.codec.ReadResponseBody(nil)
+ if err != nil {
+ err = os.ErrorString("reading error body: " + err.String())
+ }
}
c.done()
}
// Terminate pending calls.
client.mutex.Lock()
- client.shutdown = err
+ client.shutdown = true
for _, call := range client.pending {
call.Error = err
call.done()
@@ -209,10 +228,11 @@ func Dial(network, address string) (*Client, os.Error) {
}
func (client *Client) Close() os.Error {
- if client.shutdown != nil || client.closing {
- return os.ErrorString("rpc: already closed")
- }
client.mutex.Lock()
+ if client.shutdown || client.closing {
+ client.mutex.Unlock()
+ return ErrShutdown
+ }
client.closing = true
client.mutex.Unlock()
return client.codec.Close()
@@ -239,8 +259,8 @@ func (client *Client) Go(serviceMethod string, args interface{}, reply interface
}
}
c.Done = done
- if client.shutdown != nil {
- c.Error = client.shutdown
+ if client.shutdown {
+ c.Error = ErrShutdown
c.done()
return c
}
@@ -250,8 +270,8 @@ func (client *Client) Go(serviceMethod string, args interface{}, reply interface
// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) os.Error {
- if client.shutdown != nil {
- return client.shutdown
+ if client.shutdown {
+ return ErrShutdown
}
call := <-client.Go(serviceMethod, args, reply, nil).Done
return call.Error