diff options
Diffstat (limited to 'src/pkg/rpc/client.go')
-rw-r--r-- | src/pkg/rpc/client.go | 287 |
1 files changed, 0 insertions, 287 deletions
diff --git a/src/pkg/rpc/client.go b/src/pkg/rpc/client.go deleted file mode 100644 index b1828614f..000000000 --- a/src/pkg/rpc/client.go +++ /dev/null @@ -1,287 +0,0 @@ -// Copyright 2009 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package rpc - -import ( - "bufio" - "gob" - "http" - "io" - "log" - "net" - "os" - "sync" -) - -// ServerError represents an error that has been returned from -// the remote side of the RPC connection. -type ServerError string - -func (e ServerError) String() string { - return string(e) -} - -var ErrShutdown = os.NewError("connection is shut down") - -// Call represents an active RPC. -type Call struct { - ServiceMethod string // The name of the service and method to call. - Args interface{} // The argument to the function (*struct). - Reply interface{} // The reply from the function (*struct). - Error os.Error // After completion, the error status. - Done chan *Call // Strobes when call is complete; value is the error status. - seq uint64 -} - -// Client represents an RPC Client. -// There may be multiple outstanding Calls associated -// with a single Client. -type Client struct { - mutex sync.Mutex // protects pending, seq, request - sending sync.Mutex - request Request - seq uint64 - codec ClientCodec - pending map[uint64]*Call - closing bool - shutdown bool -} - -// A ClientCodec implements writing of RPC requests and -// reading of RPC responses for the client side of an RPC session. -// The client calls WriteRequest to write a request to the connection -// and calls ReadResponseHeader and ReadResponseBody in pairs -// to read responses. The client calls Close when finished with the -// connection. ReadResponseBody may be called with a nil -// argument to force the body of the response to be read and then -// discarded. -type ClientCodec interface { - WriteRequest(*Request, interface{}) os.Error - ReadResponseHeader(*Response) os.Error - ReadResponseBody(interface{}) os.Error - - Close() os.Error -} - -func (client *Client) send(c *Call) { - // Register this call. - client.mutex.Lock() - if client.shutdown { - c.Error = ErrShutdown - client.mutex.Unlock() - c.done() - return - } - c.seq = client.seq - client.seq++ - client.pending[c.seq] = c - client.mutex.Unlock() - - // Encode and send the request. - client.sending.Lock() - defer client.sending.Unlock() - client.request.Seq = c.seq - client.request.ServiceMethod = c.ServiceMethod - if err := client.codec.WriteRequest(&client.request, c.Args); err != nil { - panic("rpc: client encode error: " + err.String()) - } -} - -func (client *Client) input() { - var err os.Error - var response Response - for err == nil { - response = Response{} - err = client.codec.ReadResponseHeader(&response) - if err != nil { - if err == os.EOF && !client.closing { - err = io.ErrUnexpectedEOF - } - break - } - seq := response.Seq - client.mutex.Lock() - c := client.pending[seq] - client.pending[seq] = c, false - client.mutex.Unlock() - - if response.Error == "" { - err = client.codec.ReadResponseBody(c.Reply) - if err != nil { - c.Error = os.NewError("reading body " + err.String()) - } - } else { - // We've got an error response. Give this to the request; - // any subsequent requests will get the ReadResponseBody - // error if there is one. - c.Error = ServerError(response.Error) - err = client.codec.ReadResponseBody(nil) - if err != nil { - err = os.NewError("reading error body: " + err.String()) - } - } - c.done() - } - // Terminate pending calls. - client.mutex.Lock() - client.shutdown = true - for _, call := range client.pending { - call.Error = err - call.done() - } - client.mutex.Unlock() - if err != os.EOF || !client.closing { - log.Println("rpc: client protocol error:", err) - } -} - -func (call *Call) done() { - select { - case call.Done <- call: - // ok - default: - // We don't want to block here. It is the caller's responsibility to make - // sure the channel has enough buffer space. See comment in Go(). - } -} - -// NewClient returns a new Client to handle requests to the -// set of services at the other end of the connection. -// It adds a buffer to the write side of the connection so -// the header and payload are sent as a unit. -func NewClient(conn io.ReadWriteCloser) *Client { - encBuf := bufio.NewWriter(conn) - client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf} - return NewClientWithCodec(client) -} - -// NewClientWithCodec is like NewClient but uses the specified -// codec to encode requests and decode responses. -func NewClientWithCodec(codec ClientCodec) *Client { - client := &Client{ - codec: codec, - pending: make(map[uint64]*Call), - } - go client.input() - return client -} - -type gobClientCodec struct { - rwc io.ReadWriteCloser - dec *gob.Decoder - enc *gob.Encoder - encBuf *bufio.Writer -} - -func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err os.Error) { - if err = c.enc.Encode(r); err != nil { - return - } - if err = c.enc.Encode(body); err != nil { - return - } - return c.encBuf.Flush() -} - -func (c *gobClientCodec) ReadResponseHeader(r *Response) os.Error { - return c.dec.Decode(r) -} - -func (c *gobClientCodec) ReadResponseBody(body interface{}) os.Error { - return c.dec.Decode(body) -} - -func (c *gobClientCodec) Close() os.Error { - return c.rwc.Close() -} - - -// DialHTTP connects to an HTTP RPC server at the specified network address -// listening on the default HTTP RPC path. -func DialHTTP(network, address string) (*Client, os.Error) { - return DialHTTPPath(network, address, DefaultRPCPath) -} - -// DialHTTPPath connects to an HTTP RPC server -// at the specified network address and path. -func DialHTTPPath(network, address, path string) (*Client, os.Error) { - var err os.Error - conn, err := net.Dial(network, address) - if err != nil { - return nil, err - } - io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n") - - // Require successful HTTP response - // before switching to RPC protocol. - resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"}) - if err == nil && resp.Status == connected { - return NewClient(conn), nil - } - if err == nil { - err = os.NewError("unexpected HTTP response: " + resp.Status) - } - conn.Close() - return nil, &net.OpError{"dial-http", network + " " + address, nil, err} -} - -// Dial connects to an RPC server at the specified network address. -func Dial(network, address string) (*Client, os.Error) { - conn, err := net.Dial(network, address) - if err != nil { - return nil, err - } - return NewClient(conn), nil -} - -func (client *Client) Close() os.Error { - client.mutex.Lock() - if client.shutdown || client.closing { - client.mutex.Unlock() - return ErrShutdown - } - client.closing = true - client.mutex.Unlock() - return client.codec.Close() -} - -// Go invokes the function asynchronously. It returns the Call structure representing -// the invocation. The done channel will signal when the call is complete by returning -// the same Call object. If done is nil, Go will allocate a new channel. -// If non-nil, done must be buffered or Go will deliberately crash. -func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call { - c := new(Call) - c.ServiceMethod = serviceMethod - c.Args = args - c.Reply = reply - if done == nil { - done = make(chan *Call, 10) // buffered. - } else { - // If caller passes done != nil, it must arrange that - // done has enough buffer for the number of simultaneous - // RPCs that will be using that channel. If the channel - // is totally unbuffered, it's best not to run at all. - if cap(done) == 0 { - log.Panic("rpc: done channel is unbuffered") - } - } - c.Done = done - if client.shutdown { - c.Error = ErrShutdown - c.done() - return c - } - client.send(c) - return c -} - -// Call invokes the named function, waits for it to complete, and returns its error status. -func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) os.Error { - if client.shutdown { - return ErrShutdown - } - call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done - return call.Error -} |