summaryrefslogtreecommitdiff
path: root/src/pkg/net/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'src/pkg/net/rpc')
-rw-r--r--src/pkg/net/rpc/client.go14
-rw-r--r--src/pkg/net/rpc/client_test.go36
-rw-r--r--src/pkg/net/rpc/jsonrpc/all_test.go35
-rw-r--r--src/pkg/net/rpc/jsonrpc/server.go6
-rw-r--r--src/pkg/net/rpc/server.go5
-rw-r--r--src/pkg/net/rpc/server_test.go38
6 files changed, 98 insertions, 36 deletions
diff --git a/src/pkg/net/rpc/client.go b/src/pkg/net/rpc/client.go
index c524d0a0a..21f79b068 100644
--- a/src/pkg/net/rpc/client.go
+++ b/src/pkg/net/rpc/client.go
@@ -39,14 +39,16 @@ type Call struct {
// with a single Client, and a Client may be used by
// multiple goroutines simultaneously.
type Client struct {
- mutex sync.Mutex // protects pending, seq, request
- sending sync.Mutex
+ codec ClientCodec
+
+ sending sync.Mutex
+
+ mutex sync.Mutex // protects following
request Request
seq uint64
- codec ClientCodec
pending map[uint64]*Call
- closing bool
- shutdown bool
+ closing bool // user has called Close
+ shutdown bool // server has told us to stop
}
// A ClientCodec implements writing of RPC requests and
@@ -274,7 +276,7 @@ func Dial(network, address string) (*Client, error) {
func (client *Client) Close() error {
client.mutex.Lock()
- if client.shutdown || client.closing {
+ if client.closing {
client.mutex.Unlock()
return ErrShutdown
}
diff --git a/src/pkg/net/rpc/client_test.go b/src/pkg/net/rpc/client_test.go
new file mode 100644
index 000000000..bbfc1ec3a
--- /dev/null
+++ b/src/pkg/net/rpc/client_test.go
@@ -0,0 +1,36 @@
+// Copyright 2014 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package rpc
+
+import (
+ "errors"
+ "testing"
+)
+
+type shutdownCodec struct {
+ responded chan int
+ closed bool
+}
+
+func (c *shutdownCodec) WriteRequest(*Request, interface{}) error { return nil }
+func (c *shutdownCodec) ReadResponseBody(interface{}) error { return nil }
+func (c *shutdownCodec) ReadResponseHeader(*Response) error {
+ c.responded <- 1
+ return errors.New("shutdownCodec ReadResponseHeader")
+}
+func (c *shutdownCodec) Close() error {
+ c.closed = true
+ return nil
+}
+
+func TestCloseCodec(t *testing.T) {
+ codec := &shutdownCodec{responded: make(chan int)}
+ client := NewClientWithCodec(codec)
+ <-codec.responded
+ client.Close()
+ if !codec.closed {
+ t.Error("client.Close did not close codec")
+ }
+}
diff --git a/src/pkg/net/rpc/jsonrpc/all_test.go b/src/pkg/net/rpc/jsonrpc/all_test.go
index 40d4b82d7..a433a365e 100644
--- a/src/pkg/net/rpc/jsonrpc/all_test.go
+++ b/src/pkg/net/rpc/jsonrpc/all_test.go
@@ -5,6 +5,7 @@
package jsonrpc
import (
+ "bytes"
"encoding/json"
"errors"
"fmt"
@@ -12,6 +13,7 @@ import (
"io/ioutil"
"net"
"net/rpc"
+ "strings"
"testing"
)
@@ -202,6 +204,39 @@ func TestMalformedOutput(t *testing.T) {
}
}
+func TestServerErrorHasNullResult(t *testing.T) {
+ var out bytes.Buffer
+ sc := NewServerCodec(struct {
+ io.Reader
+ io.Writer
+ io.Closer
+ }{
+ Reader: strings.NewReader(`{"method": "Arith.Add", "id": "123", "params": []}`),
+ Writer: &out,
+ Closer: ioutil.NopCloser(nil),
+ })
+ r := new(rpc.Request)
+ if err := sc.ReadRequestHeader(r); err != nil {
+ t.Fatal(err)
+ }
+ const valueText = "the value we don't want to see"
+ const errorText = "some error"
+ err := sc.WriteResponse(&rpc.Response{
+ ServiceMethod: "Method",
+ Seq: 1,
+ Error: errorText,
+ }, valueText)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !strings.Contains(out.String(), errorText) {
+ t.Fatalf("Response didn't contain expected error %q: %s", errorText, &out)
+ }
+ if strings.Contains(out.String(), valueText) {
+ t.Errorf("Response contains both an error and value: %s", &out)
+ }
+}
+
func TestUnexpectedError(t *testing.T) {
cli, srv := myPipe()
go cli.PipeWriter.CloseWithError(errors.New("unexpected error!")) // reader will get this error
diff --git a/src/pkg/net/rpc/jsonrpc/server.go b/src/pkg/net/rpc/jsonrpc/server.go
index 16ec0fe9a..e6d37cfa6 100644
--- a/src/pkg/net/rpc/jsonrpc/server.go
+++ b/src/pkg/net/rpc/jsonrpc/server.go
@@ -100,7 +100,6 @@ func (c *serverCodec) ReadRequestBody(x interface{}) error {
var null = json.RawMessage([]byte("null"))
func (c *serverCodec) WriteResponse(r *rpc.Response, x interface{}) error {
- var resp serverResponse
c.mutex.Lock()
b, ok := c.pending[r.Seq]
if !ok {
@@ -114,10 +113,9 @@ func (c *serverCodec) WriteResponse(r *rpc.Response, x interface{}) error {
// Invalid request so no id. Use JSON null.
b = &null
}
- resp.Id = b
- resp.Result = x
+ resp := serverResponse{Id: b}
if r.Error == "" {
- resp.Error = nil
+ resp.Result = x
} else {
resp.Error = r.Error
}
diff --git a/src/pkg/net/rpc/server.go b/src/pkg/net/rpc/server.go
index 7eb2dcf5a..6b264b46b 100644
--- a/src/pkg/net/rpc/server.go
+++ b/src/pkg/net/rpc/server.go
@@ -217,10 +217,11 @@ func isExportedOrBuiltinType(t reflect.Type) bool {
// Register publishes in the server the set of methods of the
// receiver value that satisfy the following conditions:
// - exported method
-// - two arguments, both pointers to exported structs
+// - two arguments, both of exported type
+// - the second argument is a pointer
// - one return value, of type error
// It returns an error if the receiver is not an exported type or has
-// no methods or unsuitable methods. It also logs the error using package log.
+// no suitable methods. It also logs the error using package log.
// The client accesses each method using a string of the form "Type.Method",
// where Type is the receiver's concrete type.
func (server *Server) Register(rcvr interface{}) error {
diff --git a/src/pkg/net/rpc/server_test.go b/src/pkg/net/rpc/server_test.go
index 3b9a88380..0dc4ddc2d 100644
--- a/src/pkg/net/rpc/server_test.go
+++ b/src/pkg/net/rpc/server_test.go
@@ -594,7 +594,6 @@ func TestErrorAfterClientClose(t *testing.T) {
}
func benchmarkEndToEnd(dial func() (*Client, error), b *testing.B) {
- b.StopTimer()
once.Do(startServer)
client, err := dial()
if err != nil {
@@ -604,33 +603,24 @@ func benchmarkEndToEnd(dial func() (*Client, error), b *testing.B) {
// Synchronous calls
args := &Args{7, 8}
- procs := runtime.GOMAXPROCS(-1)
- N := int32(b.N)
- var wg sync.WaitGroup
- wg.Add(procs)
- b.StartTimer()
-
- for p := 0; p < procs; p++ {
- go func() {
- reply := new(Reply)
- for atomic.AddInt32(&N, -1) >= 0 {
- err := client.Call("Arith.Add", args, reply)
- if err != nil {
- b.Fatalf("rpc error: Add: expected no error but got string %q", err.Error())
- }
- if reply.C != args.A+args.B {
- b.Fatalf("rpc error: Add: expected %d got %d", reply.C, args.A+args.B)
- }
+ b.ResetTimer()
+
+ b.RunParallel(func(pb *testing.PB) {
+ reply := new(Reply)
+ for pb.Next() {
+ err := client.Call("Arith.Add", args, reply)
+ if err != nil {
+ b.Fatalf("rpc error: Add: expected no error but got string %q", err.Error())
}
- wg.Done()
- }()
- }
- wg.Wait()
+ if reply.C != args.A+args.B {
+ b.Fatalf("rpc error: Add: expected %d got %d", reply.C, args.A+args.B)
+ }
+ }
+ })
}
func benchmarkEndToEndAsync(dial func() (*Client, error), b *testing.B) {
const MaxConcurrentCalls = 100
- b.StopTimer()
once.Do(startServer)
client, err := dial()
if err != nil {
@@ -647,7 +637,7 @@ func benchmarkEndToEndAsync(dial func() (*Client, error), b *testing.B) {
wg.Add(procs)
gate := make(chan bool, MaxConcurrentCalls)
res := make(chan *Call, MaxConcurrentCalls)
- b.StartTimer()
+ b.ResetTimer()
for p := 0; p < procs; p++ {
go func() {