diff options
Diffstat (limited to 'src/pkg/rpc/server.go')
-rw-r--r-- | src/pkg/rpc/server.go | 139 |
1 files changed, 91 insertions, 48 deletions
diff --git a/src/pkg/rpc/server.go b/src/pkg/rpc/server.go index f185cd16e..af31a65cc 100644 --- a/src/pkg/rpc/server.go +++ b/src/pkg/rpc/server.go @@ -110,6 +110,7 @@ package rpc import ( + "bufio" "gob" "http" "log" @@ -132,13 +133,13 @@ const ( // Precompute the reflect type for os.Error. Can't use os.Error directly // because Typeof takes an empty interface value. This is annoying. var unusedError *os.Error -var typeOfOsError = reflect.Typeof(unusedError).(*reflect.PtrType).Elem() +var typeOfOsError = reflect.Typeof(unusedError).Elem() type methodType struct { sync.Mutex // protects counters method reflect.Method - ArgType *reflect.PtrType - ReplyType *reflect.PtrType + ArgType reflect.Type + ReplyType reflect.Type numCalls uint } @@ -153,29 +154,29 @@ type service struct { // but documented here as an aid to debugging, such as when analyzing // network traffic. type Request struct { - ServiceMethod string // format: "Service.Method" - Seq uint64 // sequence number chosen by client + ServiceMethod string // format: "Service.Method" + Seq uint64 // sequence number chosen by client + next *Request // for free list in Server } // Response is a header written before every RPC return. It is used internally // but documented here as an aid to debugging, such as when analyzing // network traffic. type Response struct { - ServiceMethod string // echoes that of the Request - Seq uint64 // echoes that of the request - Error string // error, if any. -} - -// ClientInfo records information about an RPC client connection. -type ClientInfo struct { - LocalAddr string - RemoteAddr string + ServiceMethod string // echoes that of the Request + Seq uint64 // echoes that of the request + Error string // error, if any. + next *Response // for free list in Server } // Server represents an RPC Server. type Server struct { sync.Mutex // protects the serviceMap serviceMap map[string]*service + reqLock sync.Mutex // protects freeReq + freeReq *Request + respLock sync.Mutex // protects freeResp + freeResp *Response } // NewServer returns a new Server. @@ -251,13 +252,14 @@ func (server *Server) register(rcvr interface{}, name string, useName bool) os.E log.Println("method", mname, "has wrong number of ins:", mtype.NumIn()) continue } - argType, ok := mtype.In(1).(*reflect.PtrType) + argType := mtype.In(1) + ok := argType.Kind() == reflect.Ptr if !ok { log.Println(mname, "arg type not a pointer:", mtype.In(1)) continue } - replyType, ok := mtype.In(2).(*reflect.PtrType) - if !ok { + replyType := mtype.In(2) + if replyType.Kind() != reflect.Ptr { log.Println(mname, "reply type not a pointer:", mtype.In(2)) continue } @@ -269,13 +271,6 @@ func (server *Server) register(rcvr interface{}, name string, useName bool) os.E log.Println(mname, "reply type not exported:", replyType) continue } - if mtype.NumIn() == 4 { - t := mtype.In(3) - if t != reflect.Typeof((*ClientInfo)(nil)) { - log.Println(mname, "last argument not *ClientInfo") - continue - } - } // Method needs one out: os.Error. if mtype.NumOut() != 1 { log.Println("method", mname, "has wrong number of outs:", mtype.NumOut()) @@ -298,20 +293,18 @@ func (server *Server) register(rcvr interface{}, name string, useName bool) os.E } // A value sent as a placeholder for the response when the server receives an invalid request. -type InvalidRequest struct { - Marker int -} +type InvalidRequest struct{} var invalidRequest = InvalidRequest{} -func _new(t *reflect.PtrType) *reflect.PtrValue { - v := reflect.MakeZero(t).(*reflect.PtrValue) - v.PointTo(reflect.MakeZero(t.Elem())) +func _new(t reflect.Type) reflect.Value { + v := reflect.Zero(t) + v.Set(reflect.Zero(t.Elem()).Addr()) return v } -func sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) { - resp := new(Response) +func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) { + resp := server.getResponse() // Encode the response header resp.ServiceMethod = req.ServiceMethod if errmsg != "" { @@ -325,6 +318,7 @@ func sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec Se log.Println("rpc: writing response:", err) } sending.Unlock() + server.freeResponse(resp) } func (m *methodType) NumCalls() (n uint) { @@ -334,7 +328,7 @@ func (m *methodType) NumCalls() (n uint) { return n } -func (s *service) call(sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) { +func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) { mtype.Lock() mtype.numCalls++ mtype.Unlock() @@ -347,13 +341,15 @@ func (s *service) call(sending *sync.Mutex, mtype *methodType, req *Request, arg if errInter != nil { errmsg = errInter.(os.Error).String() } - sendResponse(sending, req, replyv.Interface(), codec, errmsg) + server.sendResponse(sending, req, replyv.Interface(), codec, errmsg) + server.freeRequest(req) } type gobServerCodec struct { - rwc io.ReadWriteCloser - dec *gob.Decoder - enc *gob.Encoder + rwc io.ReadWriteCloser + dec *gob.Decoder + enc *gob.Encoder + encBuf *bufio.Writer } func (c *gobServerCodec) ReadRequestHeader(r *Request) os.Error { @@ -364,11 +360,14 @@ func (c *gobServerCodec) ReadRequestBody(body interface{}) os.Error { return c.dec.Decode(body) } -func (c *gobServerCodec) WriteResponse(r *Response, body interface{}) os.Error { - if err := c.enc.Encode(r); err != nil { - return err +func (c *gobServerCodec) WriteResponse(r *Response, body interface{}) (err os.Error) { + if err = c.enc.Encode(r); err != nil { + return } - return c.enc.Encode(body) + if err = c.enc.Encode(body); err != nil { + return + } + return c.encBuf.Flush() } func (c *gobServerCodec) Close() os.Error { @@ -382,7 +381,9 @@ func (c *gobServerCodec) Close() os.Error { // ServeConn uses the gob wire format (see package gob) on the // connection. To use an alternate codec, use ServeCodec. func (server *Server) ServeConn(conn io.ReadWriteCloser) { - server.ServeCodec(&gobServerCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(conn)}) + buf := bufio.NewWriter(conn) + srv := &gobServerCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(buf), buf} + server.ServeCodec(srv) } // ServeCodec is like ServeConn but uses the specified codec to @@ -403,7 +404,8 @@ func (server *Server) ServeCodec(codec ServerCodec) { // send a response if we actually managed to read a header. if req != nil { - sendResponse(sending, req, invalidRequest, codec, err.String()) + server.sendResponse(sending, req, invalidRequest, codec, err.String()) + server.freeRequest(req) } continue } @@ -419,16 +421,57 @@ func (server *Server) ServeCodec(codec ServerCodec) { } break } - sendResponse(sending, req, replyv.Interface(), codec, err.String()) + server.sendResponse(sending, req, replyv.Interface(), codec, err.String()) continue } - go service.call(sending, mtype, req, argv, replyv, codec) + go service.call(server, sending, mtype, req, argv, replyv, codec) } codec.Close() } + +func (server *Server) getRequest() *Request { + server.reqLock.Lock() + req := server.freeReq + if req == nil { + req = new(Request) + } else { + server.freeReq = req.next + *req = Request{} + } + server.reqLock.Unlock() + return req +} + +func (server *Server) freeRequest(req *Request) { + server.reqLock.Lock() + req.next = server.freeReq + server.freeReq = req + server.reqLock.Unlock() +} + +func (server *Server) getResponse() *Response { + server.respLock.Lock() + resp := server.freeResp + if resp == nil { + resp = new(Response) + } else { + server.freeResp = resp.next + *resp = Response{} + } + server.respLock.Unlock() + return resp +} + +func (server *Server) freeResponse(resp *Response) { + server.respLock.Lock() + resp.next = server.freeResp + server.freeResp = resp + server.respLock.Unlock() +} + func (server *Server) readRequest(codec ServerCodec) (req *Request, service *service, mtype *methodType, err os.Error) { // Grab the request header. - req = new(Request) + req = server.getRequest() err = codec.ReadRequestHeader(req) if err != nil { req = nil @@ -522,14 +565,14 @@ var connected = "200 Connected to Go RPC" // ServeHTTP implements an http.Handler that answers RPC requests. func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) { if req.Method != "CONNECT" { - w.SetHeader("Content-Type", "text/plain; charset=utf-8") + w.Header().Set("Content-Type", "text/plain; charset=utf-8") w.WriteHeader(http.StatusMethodNotAllowed) io.WriteString(w, "405 must CONNECT\n") return } conn, _, err := w.(http.Hijacker).Hijack() if err != nil { - log.Print("rpc hijacking ", w.RemoteAddr(), ": ", err.String()) + log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.String()) return } io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n") |