diff options
Diffstat (limited to 'src/pkg/rpc/client.go')
-rw-r--r-- | src/pkg/rpc/client.go | 164 |
1 files changed, 82 insertions, 82 deletions
diff --git a/src/pkg/rpc/client.go b/src/pkg/rpc/client.go index 827dbe1b4..673283be3 100644 --- a/src/pkg/rpc/client.go +++ b/src/pkg/rpc/client.go @@ -5,84 +5,84 @@ package rpc import ( - "bufio"; - "gob"; - "http"; - "io"; - "log"; - "net"; - "os"; - "sync"; + "bufio" + "gob" + "http" + "io" + "log" + "net" + "os" + "sync" ) // Call represents an active RPC. type Call struct { - ServiceMethod string; // The name of the service and method to call. - Args interface{}; // The argument to the function (*struct). - Reply interface{}; // The reply from the function (*struct). - Error os.Error; // After completion, the error status. - Done chan *Call; // Strobes when call is complete; value is the error status. - seq uint64; + ServiceMethod string // The name of the service and method to call. + Args interface{} // The argument to the function (*struct). + Reply interface{} // The reply from the function (*struct). + Error os.Error // After completion, the error status. + Done chan *Call // Strobes when call is complete; value is the error status. + seq uint64 } // Client represents an RPC Client. // There may be multiple outstanding Calls associated // with a single Client. type Client struct { - mutex sync.Mutex; // protects pending, seq - shutdown os.Error; // non-nil if the client is shut down - sending sync.Mutex; - seq uint64; - conn io.ReadWriteCloser; - enc *gob.Encoder; - dec *gob.Decoder; - pending map[uint64]*Call; + mutex sync.Mutex // protects pending, seq + shutdown os.Error // non-nil if the client is shut down + sending sync.Mutex + seq uint64 + conn io.ReadWriteCloser + enc *gob.Encoder + dec *gob.Decoder + pending map[uint64]*Call } func (client *Client) send(c *Call) { // Register this call. - client.mutex.Lock(); + client.mutex.Lock() if client.shutdown != nil { - c.Error = client.shutdown; - client.mutex.Unlock(); - _ = c.Done <- c; // do not block - return; + c.Error = client.shutdown + client.mutex.Unlock() + _ = c.Done <- c // do not block + return } - c.seq = client.seq; - client.seq++; - client.pending[c.seq] = c; - client.mutex.Unlock(); + c.seq = client.seq + client.seq++ + client.pending[c.seq] = c + client.mutex.Unlock() // Encode and send the request. - request := new(Request); - client.sending.Lock(); - request.Seq = c.seq; - request.ServiceMethod = c.ServiceMethod; - client.enc.Encode(request); - err := client.enc.Encode(c.Args); + request := new(Request) + client.sending.Lock() + request.Seq = c.seq + request.ServiceMethod = c.ServiceMethod + client.enc.Encode(request) + err := client.enc.Encode(c.Args) if err != nil { panicln("rpc: client encode error:", err.String()) } - client.sending.Unlock(); + client.sending.Unlock() } func (client *Client) input() { - var err os.Error; + var err os.Error for err == nil { - response := new(Response); - err = client.dec.Decode(response); + response := new(Response) + err = client.dec.Decode(response) if err != nil { if err == os.EOF { err = io.ErrUnexpectedEOF } - break; + break } - seq := response.Seq; - client.mutex.Lock(); - c := client.pending[seq]; - client.pending[seq] = c, false; - client.mutex.Unlock(); - err = client.dec.Decode(c.Reply); + seq := response.Seq + client.mutex.Lock() + c := client.pending[seq] + client.pending[seq] = c, false + client.mutex.Unlock() + err = client.dec.Decode(c.Reply) // Empty strings should turn into nil os.Errors if response.Error != "" { c.Error = os.ErrorString(response.Error) @@ -91,59 +91,59 @@ func (client *Client) input() { } // We don't want to block here. It is the caller's responsibility to make // sure the channel has enough buffer space. See comment in Go(). - _ = c.Done <- c; // do not block + _ = c.Done <- c // do not block } // Terminate pending calls. - client.mutex.Lock(); - client.shutdown = err; + client.mutex.Lock() + client.shutdown = err for _, call := range client.pending { - call.Error = err; - _ = call.Done <- call; // do not block + call.Error = err + _ = call.Done <- call // do not block } - client.mutex.Unlock(); - log.Stderr("rpc: client protocol error:", err); + client.mutex.Unlock() + log.Stderr("rpc: client protocol error:", err) } // NewClient returns a new Client to handle requests to the // set of services at the other end of the connection. func NewClient(conn io.ReadWriteCloser) *Client { - client := new(Client); - client.conn = conn; - client.enc = gob.NewEncoder(conn); - client.dec = gob.NewDecoder(conn); - client.pending = make(map[uint64]*Call); - go client.input(); - return client; + client := new(Client) + client.conn = conn + client.enc = gob.NewEncoder(conn) + client.dec = gob.NewDecoder(conn) + client.pending = make(map[uint64]*Call) + go client.input() + return client } // DialHTTP connects to an HTTP RPC server at the specified network address. func DialHTTP(network, address string) (*Client, os.Error) { - conn, err := net.Dial(network, "", address); + conn, err := net.Dial(network, "", address) if err != nil { return nil, err } - io.WriteString(conn, "CONNECT "+rpcPath+" HTTP/1.0\n\n"); + io.WriteString(conn, "CONNECT "+rpcPath+" HTTP/1.0\n\n") // Require successful HTTP response // before switching to RPC protocol. - resp, err := http.ReadResponse(bufio.NewReader(conn)); + resp, err := http.ReadResponse(bufio.NewReader(conn)) if err == nil && resp.Status == connected { return NewClient(conn), nil } if err == nil { err = os.ErrorString("unexpected HTTP response: " + resp.Status) } - conn.Close(); - return nil, &net.OpError{"dial-http", network + " " + address, nil, err}; + conn.Close() + return nil, &net.OpError{"dial-http", network + " " + address, nil, err} } // Dial connects to an RPC server at the specified network address. func Dial(network, address string) (*Client, os.Error) { - conn, err := net.Dial(network, "", address); + conn, err := net.Dial(network, "", address) if err != nil { return nil, err } - return NewClient(conn), nil; + return NewClient(conn), nil } // Go invokes the function asynchronously. It returns the Call structure representing @@ -151,12 +151,12 @@ func Dial(network, address string) (*Client, os.Error) { // the same Call object. If done is nil, Go will allocate a new channel. // If non-nil, done must be buffered or Go will deliberately crash. func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call { - c := new(Call); - c.ServiceMethod = serviceMethod; - c.Args = args; - c.Reply = reply; + c := new(Call) + c.ServiceMethod = serviceMethod + c.Args = args + c.Reply = reply if done == nil { - done = make(chan *Call, 10) // buffered. + done = make(chan *Call, 10) // buffered. } else { // If caller passes done != nil, it must arrange that // done has enough buffer for the number of simultaneous @@ -166,14 +166,14 @@ func (client *Client) Go(serviceMethod string, args interface{}, reply interface log.Crash("rpc: done channel is unbuffered") } } - c.Done = done; + c.Done = done if client.shutdown != nil { - c.Error = client.shutdown; - _ = c.Done <- c; // do not block - return c; + c.Error = client.shutdown + _ = c.Done <- c // do not block + return c } - client.send(c); - return c; + client.send(c) + return c } // Call invokes the named function, waits for it to complete, and returns its error status. @@ -181,6 +181,6 @@ func (client *Client) Call(serviceMethod string, args interface{}, reply interfa if client.shutdown != nil { return client.shutdown } - call := <-client.Go(serviceMethod, args, reply, nil).Done; - return call.Error; + call := <-client.Go(serviceMethod, args, reply, nil).Done + return call.Error } |