diff options
Diffstat (limited to 'src/pkg/http/persist.go')
| -rw-r--r-- | src/pkg/http/persist.go | 136 | 
1 files changed, 98 insertions, 38 deletions
| diff --git a/src/pkg/http/persist.go b/src/pkg/http/persist.go index 8bfc09755..000a4200e 100644 --- a/src/pkg/http/persist.go +++ b/src/pkg/http/persist.go @@ -6,14 +6,17 @@ package http  import (  	"bufio" -	"container/list"  	"io"  	"net" +	"net/textproto"  	"os"  	"sync"  ) -var ErrPersistEOF = &ProtocolError{"persistent connection closed"} +var ( +	ErrPersistEOF = &ProtocolError{"persistent connection closed"} +	ErrPipeline   = &ProtocolError{"pipeline error"} +)  // A ServerConn reads requests and sends responses over an underlying  // connection, until the HTTP keepalive logic commands an end. ServerConn @@ -26,8 +29,10 @@ type ServerConn struct {  	r               *bufio.Reader  	clsd            bool     // indicates a graceful close  	re, we          os.Error // read/write errors -	lastBody        io.ReadCloser +	lastbody        io.ReadCloser  	nread, nwritten int +	pipe            textproto.Pipeline +	pipereq         map[*Request]uint  	lk              sync.Mutex // protected read/write to re,we  } @@ -37,7 +42,7 @@ func NewServerConn(c net.Conn, r *bufio.Reader) *ServerConn {  	if r == nil {  		r = bufio.NewReader(c)  	} -	return &ServerConn{c: c, r: r} +	return &ServerConn{c: c, r: r, pipereq: make(map[*Request]uint)}  }  // Close detaches the ServerConn and returns the underlying connection as well @@ -57,10 +62,25 @@ func (sc *ServerConn) Close() (c net.Conn, r *bufio.Reader) {  // Read returns the next request on the wire. An ErrPersistEOF is returned if  // it is gracefully determined that there are no more requests (e.g. after the  // first request on an HTTP/1.0 connection, or after a Connection:close on a -// HTTP/1.1 connection). Read can be called concurrently with Write, but not -// with another Read. +// HTTP/1.1 connection).  func (sc *ServerConn) Read() (req *Request, err os.Error) { +	// Ensure ordered execution of Reads and Writes +	id := sc.pipe.Next() +	sc.pipe.StartRequest(id) +	defer func() { +		sc.pipe.EndRequest(id) +		if req == nil { +			sc.pipe.StartResponse(id) +			sc.pipe.EndResponse(id) +		} else { +			// Remember the pipeline id of this request +			sc.lk.Lock() +			sc.pipereq[req] = id +			sc.lk.Unlock() +		} +	}() +  	sc.lk.Lock()  	if sc.we != nil { // no point receiving if write-side broken or closed  		defer sc.lk.Unlock() @@ -73,12 +93,12 @@ func (sc *ServerConn) Read() (req *Request, err os.Error) {  	sc.lk.Unlock()  	// Make sure body is fully consumed, even if user does not call body.Close -	if sc.lastBody != nil { +	if sc.lastbody != nil {  		// body.Close is assumed to be idempotent and multiple calls to  		// it should return the error that its first invokation  		// returned. -		err = sc.lastBody.Close() -		sc.lastBody = nil +		err = sc.lastbody.Close() +		sc.lastbody = nil  		if err != nil {  			sc.lk.Lock()  			defer sc.lk.Unlock() @@ -102,7 +122,7 @@ func (sc *ServerConn) Read() (req *Request, err os.Error) {  			return  		}  	} -	sc.lastBody = req.Body +	sc.lastbody = req.Body  	sc.nread++  	if req.Close {  		sc.lk.Lock() @@ -121,11 +141,24 @@ func (sc *ServerConn) Pending() int {  	return sc.nread - sc.nwritten  } -// Write writes a repsonse. To close the connection gracefully, set the +// Write writes resp in response to req. To close the connection gracefully, set the  // Response.Close field to true. Write should be considered operational until  // it returns an error, regardless of any errors returned on the Read side. -// Write can be called concurrently with Read, but not with another Write. -func (sc *ServerConn) Write(resp *Response) os.Error { +func (sc *ServerConn) Write(req *Request, resp *Response) os.Error { + +	// Retrieve the pipeline ID of this request/response pair +	sc.lk.Lock() +	id, ok := sc.pipereq[req] +	sc.pipereq[req] = 0, false +	if !ok { +		sc.lk.Unlock() +		return ErrPipeline +	} +	sc.lk.Unlock() + +	// Ensure pipeline order +	sc.pipe.StartResponse(id) +	defer sc.pipe.EndResponse(id)  	sc.lk.Lock()  	if sc.we != nil { @@ -166,10 +199,11 @@ type ClientConn struct {  	c               net.Conn  	r               *bufio.Reader  	re, we          os.Error // read/write errors -	lastBody        io.ReadCloser +	lastbody        io.ReadCloser  	nread, nwritten int -	reqm            list.List  // request methods in order of execution -	lk              sync.Mutex // protects read/write to reqm,re,we +	pipe            textproto.Pipeline +	pipereq         map[*Request]uint +	lk              sync.Mutex // protects read/write to re,we,pipereq,etc.  }  // NewClientConn returns a new ClientConn reading and writing c.  If r is not @@ -178,7 +212,7 @@ func NewClientConn(c net.Conn, r *bufio.Reader) *ClientConn {  	if r == nil {  		r = bufio.NewReader(c)  	} -	return &ClientConn{c: c, r: r} +	return &ClientConn{c: c, r: r, pipereq: make(map[*Request]uint)}  }  // Close detaches the ClientConn and returns the underlying connection as well @@ -191,7 +225,6 @@ func (cc *ClientConn) Close() (c net.Conn, r *bufio.Reader) {  	r = cc.r  	cc.c = nil  	cc.r = nil -	cc.reqm.Init()  	cc.lk.Unlock()  	return  } @@ -201,8 +234,23 @@ func (cc *ClientConn) Close() (c net.Conn, r *bufio.Reader) {  // keepalive connection is logically closed after this request and the opposing  // server is informed. An ErrUnexpectedEOF indicates the remote closed the  // underlying TCP connection, which is usually considered as graceful close. -// Write can be called concurrently with Read, but not with another Write. -func (cc *ClientConn) Write(req *Request) os.Error { +func (cc *ClientConn) Write(req *Request) (err os.Error) { + +	// Ensure ordered execution of Writes +	id := cc.pipe.Next() +	cc.pipe.StartRequest(id) +	defer func() { +		cc.pipe.EndRequest(id) +		if err != nil { +			cc.pipe.StartResponse(id) +			cc.pipe.EndResponse(id) +		} else { +			// Remember the pipeline id of this request +			cc.lk.Lock() +			cc.pipereq[req] = id +			cc.lk.Unlock() +		} +	}()  	cc.lk.Lock()  	if cc.re != nil { // no point sending if read-side closed or broken @@ -223,7 +271,7 @@ func (cc *ClientConn) Write(req *Request) os.Error {  		cc.lk.Unlock()  	} -	err := req.Write(cc.c) +	err = req.Write(cc.c)  	if err != nil {  		cc.lk.Lock()  		defer cc.lk.Unlock() @@ -231,9 +279,6 @@ func (cc *ClientConn) Write(req *Request) os.Error {  		return err  	}  	cc.nwritten++ -	cc.lk.Lock() -	cc.reqm.PushBack(req.Method) -	cc.lk.Unlock()  	return nil  } @@ -250,7 +295,21 @@ func (cc *ClientConn) Pending() int {  // returned together with an ErrPersistEOF, which means that the remote  // requested that this be the last request serviced. Read can be called  // concurrently with Write, but not with another Read. -func (cc *ClientConn) Read() (resp *Response, err os.Error) { +func (cc *ClientConn) Read(req *Request) (resp *Response, err os.Error) { + +	// Retrieve the pipeline ID of this request/response pair +	cc.lk.Lock() +	id, ok := cc.pipereq[req] +	cc.pipereq[req] = 0, false +	if !ok { +		cc.lk.Unlock() +		return nil, ErrPipeline +	} +	cc.lk.Unlock() + +	// Ensure pipeline order +	cc.pipe.StartResponse(id) +	defer cc.pipe.EndResponse(id)  	cc.lk.Lock()  	if cc.re != nil { @@ -259,17 +318,13 @@ func (cc *ClientConn) Read() (resp *Response, err os.Error) {  	}  	cc.lk.Unlock() -	if cc.nread >= cc.nwritten { -		return nil, os.NewError("persist client pipe count") -	} -  	// Make sure body is fully consumed, even if user does not call body.Close -	if cc.lastBody != nil { +	if cc.lastbody != nil {  		// body.Close is assumed to be idempotent and multiple calls to  		// it should return the error that its first invokation  		// returned. -		err = cc.lastBody.Close() -		cc.lastBody = nil +		err = cc.lastbody.Close() +		cc.lastbody = nil  		if err != nil {  			cc.lk.Lock()  			defer cc.lk.Unlock() @@ -278,18 +333,14 @@ func (cc *ClientConn) Read() (resp *Response, err os.Error) {  		}  	} -	cc.lk.Lock() -	m := cc.reqm.Front() -	cc.reqm.Remove(m) -	cc.lk.Unlock() -	resp, err = ReadResponse(cc.r, m.Value.(string)) +	resp, err = ReadResponse(cc.r, req.Method)  	if err != nil {  		cc.lk.Lock()  		defer cc.lk.Unlock()  		cc.re = err  		return  	} -	cc.lastBody = resp.Body +	cc.lastbody = resp.Body  	cc.nread++ @@ -301,3 +352,12 @@ func (cc *ClientConn) Read() (resp *Response, err os.Error) {  	}  	return  } + +// Do is convenience method that writes a request and reads a response. +func (cc *ClientConn) Do(req *Request) (resp *Response, err os.Error) { +	err = cc.Write(req) +	if err != nil { +		return +	} +	return cc.Read(req) +} | 
