summaryrefslogtreecommitdiff
path: root/src/pkg/rpc/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/pkg/rpc/server.go')
-rw-r--r--src/pkg/rpc/server.go139
1 files changed, 91 insertions, 48 deletions
diff --git a/src/pkg/rpc/server.go b/src/pkg/rpc/server.go
index f185cd16e..af31a65cc 100644
--- a/src/pkg/rpc/server.go
+++ b/src/pkg/rpc/server.go
@@ -110,6 +110,7 @@
package rpc
import (
+ "bufio"
"gob"
"http"
"log"
@@ -132,13 +133,13 @@ const (
// Precompute the reflect type for os.Error. Can't use os.Error directly
// because Typeof takes an empty interface value. This is annoying.
var unusedError *os.Error
-var typeOfOsError = reflect.Typeof(unusedError).(*reflect.PtrType).Elem()
+var typeOfOsError = reflect.Typeof(unusedError).Elem()
type methodType struct {
sync.Mutex // protects counters
method reflect.Method
- ArgType *reflect.PtrType
- ReplyType *reflect.PtrType
+ ArgType reflect.Type
+ ReplyType reflect.Type
numCalls uint
}
@@ -153,29 +154,29 @@ type service struct {
// but documented here as an aid to debugging, such as when analyzing
// network traffic.
type Request struct {
- ServiceMethod string // format: "Service.Method"
- Seq uint64 // sequence number chosen by client
+ ServiceMethod string // format: "Service.Method"
+ Seq uint64 // sequence number chosen by client
+ next *Request // for free list in Server
}
// Response is a header written before every RPC return. It is used internally
// but documented here as an aid to debugging, such as when analyzing
// network traffic.
type Response struct {
- ServiceMethod string // echoes that of the Request
- Seq uint64 // echoes that of the request
- Error string // error, if any.
-}
-
-// ClientInfo records information about an RPC client connection.
-type ClientInfo struct {
- LocalAddr string
- RemoteAddr string
+ ServiceMethod string // echoes that of the Request
+ Seq uint64 // echoes that of the request
+ Error string // error, if any.
+ next *Response // for free list in Server
}
// Server represents an RPC Server.
type Server struct {
sync.Mutex // protects the serviceMap
serviceMap map[string]*service
+ reqLock sync.Mutex // protects freeReq
+ freeReq *Request
+ respLock sync.Mutex // protects freeResp
+ freeResp *Response
}
// NewServer returns a new Server.
@@ -251,13 +252,14 @@ func (server *Server) register(rcvr interface{}, name string, useName bool) os.E
log.Println("method", mname, "has wrong number of ins:", mtype.NumIn())
continue
}
- argType, ok := mtype.In(1).(*reflect.PtrType)
+ argType := mtype.In(1)
+ ok := argType.Kind() == reflect.Ptr
if !ok {
log.Println(mname, "arg type not a pointer:", mtype.In(1))
continue
}
- replyType, ok := mtype.In(2).(*reflect.PtrType)
- if !ok {
+ replyType := mtype.In(2)
+ if replyType.Kind() != reflect.Ptr {
log.Println(mname, "reply type not a pointer:", mtype.In(2))
continue
}
@@ -269,13 +271,6 @@ func (server *Server) register(rcvr interface{}, name string, useName bool) os.E
log.Println(mname, "reply type not exported:", replyType)
continue
}
- if mtype.NumIn() == 4 {
- t := mtype.In(3)
- if t != reflect.Typeof((*ClientInfo)(nil)) {
- log.Println(mname, "last argument not *ClientInfo")
- continue
- }
- }
// Method needs one out: os.Error.
if mtype.NumOut() != 1 {
log.Println("method", mname, "has wrong number of outs:", mtype.NumOut())
@@ -298,20 +293,18 @@ func (server *Server) register(rcvr interface{}, name string, useName bool) os.E
}
// A value sent as a placeholder for the response when the server receives an invalid request.
-type InvalidRequest struct {
- Marker int
-}
+type InvalidRequest struct{}
var invalidRequest = InvalidRequest{}
-func _new(t *reflect.PtrType) *reflect.PtrValue {
- v := reflect.MakeZero(t).(*reflect.PtrValue)
- v.PointTo(reflect.MakeZero(t.Elem()))
+func _new(t reflect.Type) reflect.Value {
+ v := reflect.Zero(t)
+ v.Set(reflect.Zero(t.Elem()).Addr())
return v
}
-func sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) {
- resp := new(Response)
+func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) {
+ resp := server.getResponse()
// Encode the response header
resp.ServiceMethod = req.ServiceMethod
if errmsg != "" {
@@ -325,6 +318,7 @@ func sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec Se
log.Println("rpc: writing response:", err)
}
sending.Unlock()
+ server.freeResponse(resp)
}
func (m *methodType) NumCalls() (n uint) {
@@ -334,7 +328,7 @@ func (m *methodType) NumCalls() (n uint) {
return n
}
-func (s *service) call(sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
+func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
mtype.Lock()
mtype.numCalls++
mtype.Unlock()
@@ -347,13 +341,15 @@ func (s *service) call(sending *sync.Mutex, mtype *methodType, req *Request, arg
if errInter != nil {
errmsg = errInter.(os.Error).String()
}
- sendResponse(sending, req, replyv.Interface(), codec, errmsg)
+ server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
+ server.freeRequest(req)
}
type gobServerCodec struct {
- rwc io.ReadWriteCloser
- dec *gob.Decoder
- enc *gob.Encoder
+ rwc io.ReadWriteCloser
+ dec *gob.Decoder
+ enc *gob.Encoder
+ encBuf *bufio.Writer
}
func (c *gobServerCodec) ReadRequestHeader(r *Request) os.Error {
@@ -364,11 +360,14 @@ func (c *gobServerCodec) ReadRequestBody(body interface{}) os.Error {
return c.dec.Decode(body)
}
-func (c *gobServerCodec) WriteResponse(r *Response, body interface{}) os.Error {
- if err := c.enc.Encode(r); err != nil {
- return err
+func (c *gobServerCodec) WriteResponse(r *Response, body interface{}) (err os.Error) {
+ if err = c.enc.Encode(r); err != nil {
+ return
}
- return c.enc.Encode(body)
+ if err = c.enc.Encode(body); err != nil {
+ return
+ }
+ return c.encBuf.Flush()
}
func (c *gobServerCodec) Close() os.Error {
@@ -382,7 +381,9 @@ func (c *gobServerCodec) Close() os.Error {
// ServeConn uses the gob wire format (see package gob) on the
// connection. To use an alternate codec, use ServeCodec.
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
- server.ServeCodec(&gobServerCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(conn)})
+ buf := bufio.NewWriter(conn)
+ srv := &gobServerCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(buf), buf}
+ server.ServeCodec(srv)
}
// ServeCodec is like ServeConn but uses the specified codec to
@@ -403,7 +404,8 @@ func (server *Server) ServeCodec(codec ServerCodec) {
// send a response if we actually managed to read a header.
if req != nil {
- sendResponse(sending, req, invalidRequest, codec, err.String())
+ server.sendResponse(sending, req, invalidRequest, codec, err.String())
+ server.freeRequest(req)
}
continue
}
@@ -419,16 +421,57 @@ func (server *Server) ServeCodec(codec ServerCodec) {
}
break
}
- sendResponse(sending, req, replyv.Interface(), codec, err.String())
+ server.sendResponse(sending, req, replyv.Interface(), codec, err.String())
continue
}
- go service.call(sending, mtype, req, argv, replyv, codec)
+ go service.call(server, sending, mtype, req, argv, replyv, codec)
}
codec.Close()
}
+
+func (server *Server) getRequest() *Request {
+ server.reqLock.Lock()
+ req := server.freeReq
+ if req == nil {
+ req = new(Request)
+ } else {
+ server.freeReq = req.next
+ *req = Request{}
+ }
+ server.reqLock.Unlock()
+ return req
+}
+
+func (server *Server) freeRequest(req *Request) {
+ server.reqLock.Lock()
+ req.next = server.freeReq
+ server.freeReq = req
+ server.reqLock.Unlock()
+}
+
+func (server *Server) getResponse() *Response {
+ server.respLock.Lock()
+ resp := server.freeResp
+ if resp == nil {
+ resp = new(Response)
+ } else {
+ server.freeResp = resp.next
+ *resp = Response{}
+ }
+ server.respLock.Unlock()
+ return resp
+}
+
+func (server *Server) freeResponse(resp *Response) {
+ server.respLock.Lock()
+ resp.next = server.freeResp
+ server.freeResp = resp
+ server.respLock.Unlock()
+}
+
func (server *Server) readRequest(codec ServerCodec) (req *Request, service *service, mtype *methodType, err os.Error) {
// Grab the request header.
- req = new(Request)
+ req = server.getRequest()
err = codec.ReadRequestHeader(req)
if err != nil {
req = nil
@@ -522,14 +565,14 @@ var connected = "200 Connected to Go RPC"
// ServeHTTP implements an http.Handler that answers RPC requests.
func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != "CONNECT" {
- w.SetHeader("Content-Type", "text/plain; charset=utf-8")
+ w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusMethodNotAllowed)
io.WriteString(w, "405 must CONNECT\n")
return
}
conn, _, err := w.(http.Hijacker).Hijack()
if err != nil {
- log.Print("rpc hijacking ", w.RemoteAddr(), ": ", err.String())
+ log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.String())
return
}
io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")