summaryrefslogtreecommitdiff
path: root/src/pkg/rpc/client.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/pkg/rpc/client.go')
-rw-r--r--src/pkg/rpc/client.go45
1 files changed, 27 insertions, 18 deletions
diff --git a/src/pkg/rpc/client.go b/src/pkg/rpc/client.go
index 6de6d1325..8af4afcf6 100644
--- a/src/pkg/rpc/client.go
+++ b/src/pkg/rpc/client.go
@@ -39,8 +39,9 @@ type Call struct {
// There may be multiple outstanding Calls associated
// with a single Client.
type Client struct {
- mutex sync.Mutex // protects pending, seq
+ mutex sync.Mutex // protects pending, seq, request
sending sync.Mutex
+ request Request
seq uint64
codec ClientCodec
pending map[uint64]*Call
@@ -79,21 +80,21 @@ func (client *Client) send(c *Call) {
client.mutex.Unlock()
// Encode and send the request.
- request := new(Request)
client.sending.Lock()
defer client.sending.Unlock()
- request.Seq = c.seq
- request.ServiceMethod = c.ServiceMethod
- if err := client.codec.WriteRequest(request, c.Args); err != nil {
+ client.request.Seq = c.seq
+ client.request.ServiceMethod = c.ServiceMethod
+ if err := client.codec.WriteRequest(&client.request, c.Args); err != nil {
panic("rpc: client encode error: " + err.String())
}
}
func (client *Client) input() {
var err os.Error
+ var response Response
for err == nil {
- response := new(Response)
- err = client.codec.ReadResponseHeader(response)
+ response = Response{}
+ err = client.codec.ReadResponseHeader(&response)
if err != nil {
if err == os.EOF && !client.closing {
err = io.ErrUnexpectedEOF
@@ -148,8 +149,12 @@ func (call *Call) done() {
// NewClient returns a new Client to handle requests to the
// set of services at the other end of the connection.
+// It adds a buffer to the write side of the connection so
+// the header and payload are sent as a unit.
func NewClient(conn io.ReadWriteCloser) *Client {
- return NewClientWithCodec(&gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(conn)})
+ encBuf := bufio.NewWriter(conn)
+ client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
+ return NewClientWithCodec(client)
}
// NewClientWithCodec is like NewClient but uses the specified
@@ -164,16 +169,20 @@ func NewClientWithCodec(codec ClientCodec) *Client {
}
type gobClientCodec struct {
- rwc io.ReadWriteCloser
- dec *gob.Decoder
- enc *gob.Encoder
+ rwc io.ReadWriteCloser
+ dec *gob.Decoder
+ enc *gob.Encoder
+ encBuf *bufio.Writer
}
-func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) os.Error {
- if err := c.enc.Encode(r); err != nil {
- return err
+func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err os.Error) {
+ if err = c.enc.Encode(r); err != nil {
+ return
+ }
+ if err = c.enc.Encode(body); err != nil {
+ return
}
- return c.enc.Encode(body)
+ return c.encBuf.Flush()
}
func (c *gobClientCodec) ReadResponseHeader(r *Response) os.Error {
@@ -199,7 +208,7 @@ func DialHTTP(network, address string) (*Client, os.Error) {
// at the specified network address and path.
func DialHTTPPath(network, address, path string) (*Client, os.Error) {
var err os.Error
- conn, err := net.Dial(network, "", address)
+ conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
@@ -220,7 +229,7 @@ func DialHTTPPath(network, address, path string) (*Client, os.Error) {
// Dial connects to an RPC server at the specified network address.
func Dial(network, address string) (*Client, os.Error) {
- conn, err := net.Dial(network, "", address)
+ conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
@@ -273,6 +282,6 @@ func (client *Client) Call(serviceMethod string, args interface{}, reply interfa
if client.shutdown {
return ErrShutdown
}
- call := <-client.Go(serviceMethod, args, reply, nil).Done
+ call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}