diff options
Diffstat (limited to 'src/pkg/net/rpc')
-rw-r--r-- | src/pkg/net/rpc/client.go | 14 | ||||
-rw-r--r-- | src/pkg/net/rpc/client_test.go | 36 | ||||
-rw-r--r-- | src/pkg/net/rpc/jsonrpc/all_test.go | 35 | ||||
-rw-r--r-- | src/pkg/net/rpc/jsonrpc/server.go | 6 | ||||
-rw-r--r-- | src/pkg/net/rpc/server.go | 5 | ||||
-rw-r--r-- | src/pkg/net/rpc/server_test.go | 38 |
6 files changed, 98 insertions, 36 deletions
diff --git a/src/pkg/net/rpc/client.go b/src/pkg/net/rpc/client.go index c524d0a0a..21f79b068 100644 --- a/src/pkg/net/rpc/client.go +++ b/src/pkg/net/rpc/client.go @@ -39,14 +39,16 @@ type Call struct { // with a single Client, and a Client may be used by // multiple goroutines simultaneously. type Client struct { - mutex sync.Mutex // protects pending, seq, request - sending sync.Mutex + codec ClientCodec + + sending sync.Mutex + + mutex sync.Mutex // protects following request Request seq uint64 - codec ClientCodec pending map[uint64]*Call - closing bool - shutdown bool + closing bool // user has called Close + shutdown bool // server has told us to stop } // A ClientCodec implements writing of RPC requests and @@ -274,7 +276,7 @@ func Dial(network, address string) (*Client, error) { func (client *Client) Close() error { client.mutex.Lock() - if client.shutdown || client.closing { + if client.closing { client.mutex.Unlock() return ErrShutdown } diff --git a/src/pkg/net/rpc/client_test.go b/src/pkg/net/rpc/client_test.go new file mode 100644 index 000000000..bbfc1ec3a --- /dev/null +++ b/src/pkg/net/rpc/client_test.go @@ -0,0 +1,36 @@ +// Copyright 2014 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package rpc + +import ( + "errors" + "testing" +) + +type shutdownCodec struct { + responded chan int + closed bool +} + +func (c *shutdownCodec) WriteRequest(*Request, interface{}) error { return nil } +func (c *shutdownCodec) ReadResponseBody(interface{}) error { return nil } +func (c *shutdownCodec) ReadResponseHeader(*Response) error { + c.responded <- 1 + return errors.New("shutdownCodec ReadResponseHeader") +} +func (c *shutdownCodec) Close() error { + c.closed = true + return nil +} + +func TestCloseCodec(t *testing.T) { + codec := &shutdownCodec{responded: make(chan int)} + client := NewClientWithCodec(codec) + <-codec.responded + client.Close() + if !codec.closed { + t.Error("client.Close did not close codec") + } +} diff --git a/src/pkg/net/rpc/jsonrpc/all_test.go b/src/pkg/net/rpc/jsonrpc/all_test.go index 40d4b82d7..a433a365e 100644 --- a/src/pkg/net/rpc/jsonrpc/all_test.go +++ b/src/pkg/net/rpc/jsonrpc/all_test.go @@ -5,6 +5,7 @@ package jsonrpc import ( + "bytes" "encoding/json" "errors" "fmt" @@ -12,6 +13,7 @@ import ( "io/ioutil" "net" "net/rpc" + "strings" "testing" ) @@ -202,6 +204,39 @@ func TestMalformedOutput(t *testing.T) { } } +func TestServerErrorHasNullResult(t *testing.T) { + var out bytes.Buffer + sc := NewServerCodec(struct { + io.Reader + io.Writer + io.Closer + }{ + Reader: strings.NewReader(`{"method": "Arith.Add", "id": "123", "params": []}`), + Writer: &out, + Closer: ioutil.NopCloser(nil), + }) + r := new(rpc.Request) + if err := sc.ReadRequestHeader(r); err != nil { + t.Fatal(err) + } + const valueText = "the value we don't want to see" + const errorText = "some error" + err := sc.WriteResponse(&rpc.Response{ + ServiceMethod: "Method", + Seq: 1, + Error: errorText, + }, valueText) + if err != nil { + t.Fatal(err) + } + if !strings.Contains(out.String(), errorText) { + t.Fatalf("Response didn't contain expected error %q: %s", errorText, &out) + } + if strings.Contains(out.String(), valueText) { + t.Errorf("Response contains both an error and value: %s", &out) + } +} + func TestUnexpectedError(t *testing.T) { cli, srv := myPipe() go cli.PipeWriter.CloseWithError(errors.New("unexpected error!")) // reader will get this error diff --git a/src/pkg/net/rpc/jsonrpc/server.go b/src/pkg/net/rpc/jsonrpc/server.go index 16ec0fe9a..e6d37cfa6 100644 --- a/src/pkg/net/rpc/jsonrpc/server.go +++ b/src/pkg/net/rpc/jsonrpc/server.go @@ -100,7 +100,6 @@ func (c *serverCodec) ReadRequestBody(x interface{}) error { var null = json.RawMessage([]byte("null")) func (c *serverCodec) WriteResponse(r *rpc.Response, x interface{}) error { - var resp serverResponse c.mutex.Lock() b, ok := c.pending[r.Seq] if !ok { @@ -114,10 +113,9 @@ func (c *serverCodec) WriteResponse(r *rpc.Response, x interface{}) error { // Invalid request so no id. Use JSON null. b = &null } - resp.Id = b - resp.Result = x + resp := serverResponse{Id: b} if r.Error == "" { - resp.Error = nil + resp.Result = x } else { resp.Error = r.Error } diff --git a/src/pkg/net/rpc/server.go b/src/pkg/net/rpc/server.go index 7eb2dcf5a..6b264b46b 100644 --- a/src/pkg/net/rpc/server.go +++ b/src/pkg/net/rpc/server.go @@ -217,10 +217,11 @@ func isExportedOrBuiltinType(t reflect.Type) bool { // Register publishes in the server the set of methods of the // receiver value that satisfy the following conditions: // - exported method -// - two arguments, both pointers to exported structs +// - two arguments, both of exported type +// - the second argument is a pointer // - one return value, of type error // It returns an error if the receiver is not an exported type or has -// no methods or unsuitable methods. It also logs the error using package log. +// no suitable methods. It also logs the error using package log. // The client accesses each method using a string of the form "Type.Method", // where Type is the receiver's concrete type. func (server *Server) Register(rcvr interface{}) error { diff --git a/src/pkg/net/rpc/server_test.go b/src/pkg/net/rpc/server_test.go index 3b9a88380..0dc4ddc2d 100644 --- a/src/pkg/net/rpc/server_test.go +++ b/src/pkg/net/rpc/server_test.go @@ -594,7 +594,6 @@ func TestErrorAfterClientClose(t *testing.T) { } func benchmarkEndToEnd(dial func() (*Client, error), b *testing.B) { - b.StopTimer() once.Do(startServer) client, err := dial() if err != nil { @@ -604,33 +603,24 @@ func benchmarkEndToEnd(dial func() (*Client, error), b *testing.B) { // Synchronous calls args := &Args{7, 8} - procs := runtime.GOMAXPROCS(-1) - N := int32(b.N) - var wg sync.WaitGroup - wg.Add(procs) - b.StartTimer() - - for p := 0; p < procs; p++ { - go func() { - reply := new(Reply) - for atomic.AddInt32(&N, -1) >= 0 { - err := client.Call("Arith.Add", args, reply) - if err != nil { - b.Fatalf("rpc error: Add: expected no error but got string %q", err.Error()) - } - if reply.C != args.A+args.B { - b.Fatalf("rpc error: Add: expected %d got %d", reply.C, args.A+args.B) - } + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + reply := new(Reply) + for pb.Next() { + err := client.Call("Arith.Add", args, reply) + if err != nil { + b.Fatalf("rpc error: Add: expected no error but got string %q", err.Error()) } - wg.Done() - }() - } - wg.Wait() + if reply.C != args.A+args.B { + b.Fatalf("rpc error: Add: expected %d got %d", reply.C, args.A+args.B) + } + } + }) } func benchmarkEndToEndAsync(dial func() (*Client, error), b *testing.B) { const MaxConcurrentCalls = 100 - b.StopTimer() once.Do(startServer) client, err := dial() if err != nil { @@ -647,7 +637,7 @@ func benchmarkEndToEndAsync(dial func() (*Client, error), b *testing.B) { wg.Add(procs) gate := make(chan bool, MaxConcurrentCalls) res := make(chan *Call, MaxConcurrentCalls) - b.StartTimer() + b.ResetTimer() for p := 0; p < procs; p++ { go func() { |