diff options
Diffstat (limited to 'src/pkg/http')
-rw-r--r-- | src/pkg/http/persist.go | 136 | ||||
-rw-r--r-- | src/pkg/http/response_test.go | 41 | ||||
-rw-r--r-- | src/pkg/http/serve_test.go | 68 | ||||
-rw-r--r-- | src/pkg/http/server.go | 48 | ||||
-rw-r--r-- | src/pkg/http/transfer.go | 27 | ||||
-rw-r--r-- | src/pkg/http/triv.go | 5 |
6 files changed, 277 insertions, 48 deletions
diff --git a/src/pkg/http/persist.go b/src/pkg/http/persist.go index 8bfc09755..000a4200e 100644 --- a/src/pkg/http/persist.go +++ b/src/pkg/http/persist.go @@ -6,14 +6,17 @@ package http import ( "bufio" - "container/list" "io" "net" + "net/textproto" "os" "sync" ) -var ErrPersistEOF = &ProtocolError{"persistent connection closed"} +var ( + ErrPersistEOF = &ProtocolError{"persistent connection closed"} + ErrPipeline = &ProtocolError{"pipeline error"} +) // A ServerConn reads requests and sends responses over an underlying // connection, until the HTTP keepalive logic commands an end. ServerConn @@ -26,8 +29,10 @@ type ServerConn struct { r *bufio.Reader clsd bool // indicates a graceful close re, we os.Error // read/write errors - lastBody io.ReadCloser + lastbody io.ReadCloser nread, nwritten int + pipe textproto.Pipeline + pipereq map[*Request]uint lk sync.Mutex // protected read/write to re,we } @@ -37,7 +42,7 @@ func NewServerConn(c net.Conn, r *bufio.Reader) *ServerConn { if r == nil { r = bufio.NewReader(c) } - return &ServerConn{c: c, r: r} + return &ServerConn{c: c, r: r, pipereq: make(map[*Request]uint)} } // Close detaches the ServerConn and returns the underlying connection as well @@ -57,10 +62,25 @@ func (sc *ServerConn) Close() (c net.Conn, r *bufio.Reader) { // Read returns the next request on the wire. An ErrPersistEOF is returned if // it is gracefully determined that there are no more requests (e.g. after the // first request on an HTTP/1.0 connection, or after a Connection:close on a -// HTTP/1.1 connection). Read can be called concurrently with Write, but not -// with another Read. +// HTTP/1.1 connection). func (sc *ServerConn) Read() (req *Request, err os.Error) { + // Ensure ordered execution of Reads and Writes + id := sc.pipe.Next() + sc.pipe.StartRequest(id) + defer func() { + sc.pipe.EndRequest(id) + if req == nil { + sc.pipe.StartResponse(id) + sc.pipe.EndResponse(id) + } else { + // Remember the pipeline id of this request + sc.lk.Lock() + sc.pipereq[req] = id + sc.lk.Unlock() + } + }() + sc.lk.Lock() if sc.we != nil { // no point receiving if write-side broken or closed defer sc.lk.Unlock() @@ -73,12 +93,12 @@ func (sc *ServerConn) Read() (req *Request, err os.Error) { sc.lk.Unlock() // Make sure body is fully consumed, even if user does not call body.Close - if sc.lastBody != nil { + if sc.lastbody != nil { // body.Close is assumed to be idempotent and multiple calls to // it should return the error that its first invokation // returned. - err = sc.lastBody.Close() - sc.lastBody = nil + err = sc.lastbody.Close() + sc.lastbody = nil if err != nil { sc.lk.Lock() defer sc.lk.Unlock() @@ -102,7 +122,7 @@ func (sc *ServerConn) Read() (req *Request, err os.Error) { return } } - sc.lastBody = req.Body + sc.lastbody = req.Body sc.nread++ if req.Close { sc.lk.Lock() @@ -121,11 +141,24 @@ func (sc *ServerConn) Pending() int { return sc.nread - sc.nwritten } -// Write writes a repsonse. To close the connection gracefully, set the +// Write writes resp in response to req. To close the connection gracefully, set the // Response.Close field to true. Write should be considered operational until // it returns an error, regardless of any errors returned on the Read side. -// Write can be called concurrently with Read, but not with another Write. -func (sc *ServerConn) Write(resp *Response) os.Error { +func (sc *ServerConn) Write(req *Request, resp *Response) os.Error { + + // Retrieve the pipeline ID of this request/response pair + sc.lk.Lock() + id, ok := sc.pipereq[req] + sc.pipereq[req] = 0, false + if !ok { + sc.lk.Unlock() + return ErrPipeline + } + sc.lk.Unlock() + + // Ensure pipeline order + sc.pipe.StartResponse(id) + defer sc.pipe.EndResponse(id) sc.lk.Lock() if sc.we != nil { @@ -166,10 +199,11 @@ type ClientConn struct { c net.Conn r *bufio.Reader re, we os.Error // read/write errors - lastBody io.ReadCloser + lastbody io.ReadCloser nread, nwritten int - reqm list.List // request methods in order of execution - lk sync.Mutex // protects read/write to reqm,re,we + pipe textproto.Pipeline + pipereq map[*Request]uint + lk sync.Mutex // protects read/write to re,we,pipereq,etc. } // NewClientConn returns a new ClientConn reading and writing c. If r is not @@ -178,7 +212,7 @@ func NewClientConn(c net.Conn, r *bufio.Reader) *ClientConn { if r == nil { r = bufio.NewReader(c) } - return &ClientConn{c: c, r: r} + return &ClientConn{c: c, r: r, pipereq: make(map[*Request]uint)} } // Close detaches the ClientConn and returns the underlying connection as well @@ -191,7 +225,6 @@ func (cc *ClientConn) Close() (c net.Conn, r *bufio.Reader) { r = cc.r cc.c = nil cc.r = nil - cc.reqm.Init() cc.lk.Unlock() return } @@ -201,8 +234,23 @@ func (cc *ClientConn) Close() (c net.Conn, r *bufio.Reader) { // keepalive connection is logically closed after this request and the opposing // server is informed. An ErrUnexpectedEOF indicates the remote closed the // underlying TCP connection, which is usually considered as graceful close. -// Write can be called concurrently with Read, but not with another Write. -func (cc *ClientConn) Write(req *Request) os.Error { +func (cc *ClientConn) Write(req *Request) (err os.Error) { + + // Ensure ordered execution of Writes + id := cc.pipe.Next() + cc.pipe.StartRequest(id) + defer func() { + cc.pipe.EndRequest(id) + if err != nil { + cc.pipe.StartResponse(id) + cc.pipe.EndResponse(id) + } else { + // Remember the pipeline id of this request + cc.lk.Lock() + cc.pipereq[req] = id + cc.lk.Unlock() + } + }() cc.lk.Lock() if cc.re != nil { // no point sending if read-side closed or broken @@ -223,7 +271,7 @@ func (cc *ClientConn) Write(req *Request) os.Error { cc.lk.Unlock() } - err := req.Write(cc.c) + err = req.Write(cc.c) if err != nil { cc.lk.Lock() defer cc.lk.Unlock() @@ -231,9 +279,6 @@ func (cc *ClientConn) Write(req *Request) os.Error { return err } cc.nwritten++ - cc.lk.Lock() - cc.reqm.PushBack(req.Method) - cc.lk.Unlock() return nil } @@ -250,7 +295,21 @@ func (cc *ClientConn) Pending() int { // returned together with an ErrPersistEOF, which means that the remote // requested that this be the last request serviced. Read can be called // concurrently with Write, but not with another Read. -func (cc *ClientConn) Read() (resp *Response, err os.Error) { +func (cc *ClientConn) Read(req *Request) (resp *Response, err os.Error) { + + // Retrieve the pipeline ID of this request/response pair + cc.lk.Lock() + id, ok := cc.pipereq[req] + cc.pipereq[req] = 0, false + if !ok { + cc.lk.Unlock() + return nil, ErrPipeline + } + cc.lk.Unlock() + + // Ensure pipeline order + cc.pipe.StartResponse(id) + defer cc.pipe.EndResponse(id) cc.lk.Lock() if cc.re != nil { @@ -259,17 +318,13 @@ func (cc *ClientConn) Read() (resp *Response, err os.Error) { } cc.lk.Unlock() - if cc.nread >= cc.nwritten { - return nil, os.NewError("persist client pipe count") - } - // Make sure body is fully consumed, even if user does not call body.Close - if cc.lastBody != nil { + if cc.lastbody != nil { // body.Close is assumed to be idempotent and multiple calls to // it should return the error that its first invokation // returned. - err = cc.lastBody.Close() - cc.lastBody = nil + err = cc.lastbody.Close() + cc.lastbody = nil if err != nil { cc.lk.Lock() defer cc.lk.Unlock() @@ -278,18 +333,14 @@ func (cc *ClientConn) Read() (resp *Response, err os.Error) { } } - cc.lk.Lock() - m := cc.reqm.Front() - cc.reqm.Remove(m) - cc.lk.Unlock() - resp, err = ReadResponse(cc.r, m.Value.(string)) + resp, err = ReadResponse(cc.r, req.Method) if err != nil { cc.lk.Lock() defer cc.lk.Unlock() cc.re = err return } - cc.lastBody = resp.Body + cc.lastbody = resp.Body cc.nread++ @@ -301,3 +352,12 @@ func (cc *ClientConn) Read() (resp *Response, err os.Error) { } return } + +// Do is convenience method that writes a request and reads a response. +func (cc *ClientConn) Do(req *Request) (resp *Response, err os.Error) { + err = cc.Write(req) + if err != nil { + return + } + return cc.Read(req) +} diff --git a/src/pkg/http/response_test.go b/src/pkg/http/response_test.go index 89a8c3b44..11bfdd08c 100644 --- a/src/pkg/http/response_test.go +++ b/src/pkg/http/response_test.go @@ -44,6 +44,47 @@ var respTests = []respTest{ "Body here\n", }, + // Unchunked HTTP/1.1 response without Content-Length or + // Connection headers. + { + "HTTP/1.1 200 OK\r\n" + + "\r\n" + + "Body here\n", + + Response{ + Status: "200 OK", + StatusCode: 200, + Proto: "HTTP/1.1", + ProtoMajor: 1, + ProtoMinor: 1, + RequestMethod: "GET", + Close: true, + ContentLength: -1, + }, + + "Body here\n", + }, + + // Unchunked HTTP/1.1 204 response without Content-Length. + { + "HTTP/1.1 204 No Content\r\n" + + "\r\n" + + "Body should not be read!\n", + + Response{ + Status: "204 No Content", + StatusCode: 204, + Proto: "HTTP/1.1", + ProtoMajor: 1, + ProtoMinor: 1, + RequestMethod: "GET", + Close: false, + ContentLength: 0, + }, + + "", + }, + // Unchunked response with Content-Length. { "HTTP/1.0 200 OK\r\n" + diff --git a/src/pkg/http/serve_test.go b/src/pkg/http/serve_test.go index 7da3fc6f3..5594d512a 100644 --- a/src/pkg/http/serve_test.go +++ b/src/pkg/http/serve_test.go @@ -9,10 +9,13 @@ package http import ( "bufio" "bytes" + "fmt" "io" + "io/ioutil" "os" "net" "testing" + "time" ) type dummyAddr string @@ -189,7 +192,7 @@ func TestHostHandlers(t *testing.T) { t.Errorf("writing request: %v", err) continue } - r, err := cc.Read() + r, err := cc.Read(&req) if err != nil { t.Errorf("reading response: %v", err) continue @@ -283,3 +286,66 @@ func TestMuxRedirectLeadingSlashes(t *testing.T) { } } } + +func TestServerTimeouts(t *testing.T) { + l, err := net.ListenTCP("tcp", &net.TCPAddr{Port: 0}) + if err != nil { + t.Fatalf("listen error: %v", err) + } + addr, _ := l.Addr().(*net.TCPAddr) + + reqNum := 0 + handler := HandlerFunc(func(res ResponseWriter, req *Request) { + reqNum++ + fmt.Fprintf(res, "req=%d", reqNum) + }) + + const second = 1000000000 /* nanos */ + server := &Server{Handler: handler, ReadTimeout: 0.25 * second, WriteTimeout: 0.25 * second} + go server.Serve(l) + + url := fmt.Sprintf("http://localhost:%d/", addr.Port) + + // Hit the HTTP server successfully. + r, _, err := Get(url) + if err != nil { + t.Fatalf("http Get #1: %v", err) + } + got, _ := ioutil.ReadAll(r.Body) + expected := "req=1" + if string(got) != expected { + t.Errorf("Unexpected response for request #1; got %q; expected %q", + string(got), expected) + } + + // Slow client that should timeout. + t1 := time.Nanoseconds() + conn, err := net.Dial("tcp", "", fmt.Sprintf("localhost:%d", addr.Port)) + if err != nil { + t.Fatalf("Dial: %v", err) + } + buf := make([]byte, 1) + n, err := conn.Read(buf) + latency := time.Nanoseconds() - t1 + if n != 0 || err != os.EOF { + t.Errorf("Read = %v, %v, wanted %v, %v", n, err, 0, os.EOF) + } + if latency < second*0.20 /* fudge from 0.25 above */ { + t.Errorf("got EOF after %d ns, want >= %d", latency, second*0.20) + } + + // Hit the HTTP server successfully again, verifying that the + // previous slow connection didn't run our handler. (that we + // get "req=2", not "req=3") + r, _, err = Get(url) + if err != nil { + t.Fatalf("http Get #2: %v", err) + } + got, _ = ioutil.ReadAll(r.Body) + expected = "req=2" + if string(got) != expected { + t.Errorf("Get #2 got %q, want %q", string(got), expected) + } + + l.Close() +} diff --git a/src/pkg/http/server.go b/src/pkg/http/server.go index 6672c494b..0be270ad3 100644 --- a/src/pkg/http/server.go +++ b/src/pkg/http/server.go @@ -670,6 +670,39 @@ func HandleFunc(pattern string, handler func(ResponseWriter, *Request)) { // read requests and then call handler to reply to them. // Handler is typically nil, in which case the DefaultServeMux is used. func Serve(l net.Listener, handler Handler) os.Error { + srv := &Server{Handler: handler} + return srv.Serve(l) +} + +// A Server defines parameters for running an HTTP server. +type Server struct { + Addr string // TCP address to listen on, ":http" if empty + Handler Handler // handler to invoke, http.DefaultServeMux if nil + ReadTimeout int64 // the net.Conn.SetReadTimeout value for new connections + WriteTimeout int64 // the net.Conn.SetWriteTimeout value for new connections +} + +// ListenAndServe listens on the TCP network address srv.Addr and then +// calls Serve to handle requests on incoming connections. If +// srv.Addr is blank, ":http" is used. +func (srv *Server) ListenAndServe() os.Error { + addr := srv.Addr + if addr == "" { + addr = ":http" + } + l, e := net.Listen("tcp", addr) + if e != nil { + return e + } + return srv.Serve(l) +} + +// Serve accepts incoming connections on the Listener l, creating a +// new service thread for each. The service threads read requests and +// then call srv.Handler to reply to them. +func (srv *Server) Serve(l net.Listener) os.Error { + defer l.Close() + handler := srv.Handler if handler == nil { handler = DefaultServeMux } @@ -678,6 +711,12 @@ func Serve(l net.Listener, handler Handler) os.Error { if e != nil { return e } + if srv.ReadTimeout != 0 { + rw.SetReadTimeout(srv.ReadTimeout) + } + if srv.WriteTimeout != 0 { + rw.SetWriteTimeout(srv.WriteTimeout) + } c, err := newConn(rw, handler) if err != nil { continue @@ -715,13 +754,8 @@ func Serve(l net.Listener, handler Handler) os.Error { // } // } func ListenAndServe(addr string, handler Handler) os.Error { - l, e := net.Listen("tcp", addr) - if e != nil { - return e - } - e = Serve(l, handler) - l.Close() - return e + server := &Server{Addr: addr, Handler: handler} + return server.ListenAndServe() } // ListenAndServeTLS acts identically to ListenAndServe, except that it diff --git a/src/pkg/http/transfer.go b/src/pkg/http/transfer.go index e62885d62..f80f0ac63 100644 --- a/src/pkg/http/transfer.go +++ b/src/pkg/http/transfer.go @@ -172,6 +172,20 @@ type transferReader struct { Trailer map[string]string } +// bodyAllowedForStatus returns whether a given response status code +// permits a body. See RFC2616, section 4.4. +func bodyAllowedForStatus(status int) bool { + switch { + case status >= 100 && status <= 199: + return false + case status == 204: + return false + case status == 304: + return false + } + return true +} + // msg is *Request or *Response. func readTransfer(msg interface{}, r *bufio.Reader) (err os.Error) { t := &transferReader{} @@ -217,6 +231,19 @@ func readTransfer(msg interface{}, r *bufio.Reader) (err os.Error) { return err } + // If there is no Content-Length or chunked Transfer-Encoding on a *Response + // and the status is not 1xx, 204 or 304, then the body is unbounded. + // See RFC2616, section 4.4. + switch msg.(type) { + case *Response: + if t.ContentLength == -1 && + !chunked(t.TransferEncoding) && + bodyAllowedForStatus(t.StatusCode) { + // Unbounded body. + t.Close = true + } + } + // Prepare body reader. ContentLength < 0 means chunked encoding // or close connection when finished, since multipart is not supported yet switch { diff --git a/src/pkg/http/triv.go b/src/pkg/http/triv.go index 03cfafa7b..52d521d3d 100644 --- a/src/pkg/http/triv.go +++ b/src/pkg/http/triv.go @@ -99,15 +99,16 @@ func DateServer(rw http.ResponseWriter, req *http.Request) { fmt.Fprintf(rw, "pipe: %s\n", err) return } - pid, err := os.ForkExec("/bin/date", []string{"date"}, os.Environ(), "", []*os.File{nil, w, w}) + p, err := os.StartProcess("/bin/date", []string{"date"}, os.Environ(), "", []*os.File{nil, w, w}) defer r.Close() w.Close() if err != nil { fmt.Fprintf(rw, "fork/exec: %s\n", err) return } + defer p.Release() io.Copy(rw, r) - wait, err := os.Wait(pid, 0) + wait, err := p.Wait(0) if err != nil { fmt.Fprintf(rw, "wait: %s\n", err) return |