diff options
author | Michael Stapelberg <stapelberg@debian.org> | 2013-03-04 21:27:36 +0100 |
---|---|---|
committer | Michael Stapelberg <michael@stapelberg.de> | 2013-03-04 21:27:36 +0100 |
commit | 04b08da9af0c450d645ab7389d1467308cfc2db8 (patch) | |
tree | db247935fa4f2f94408edc3acd5d0d4f997aa0d8 /src/pkg/net/http/transport.go | |
parent | 917c5fb8ec48e22459d77e3849e6d388f93d3260 (diff) | |
download | golang-upstream/1.1_hg20130304.tar.gz |
Imported Upstream version 1.1~hg20130304upstream/1.1_hg20130304
Diffstat (limited to 'src/pkg/net/http/transport.go')
-rw-r--r-- | src/pkg/net/http/transport.go | 392 |
1 files changed, 295 insertions, 97 deletions
diff --git a/src/pkg/net/http/transport.go b/src/pkg/net/http/transport.go index 6efe191eb..685d7d56c 100644 --- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -3,7 +3,7 @@ // license that can be found in the LICENSE file. // HTTP client implementation. See RFC 2616. -// +// // This is the low-level Transport implementation of RoundTripper. // The high-level interface is in client.go. @@ -24,13 +24,14 @@ import ( "os" "strings" "sync" + "time" ) // DefaultTransport is the default implementation of Transport and is -// used by DefaultClient. It establishes a new network connection for -// each call to Do and uses HTTP proxies as directed by the -// $HTTP_PROXY and $NO_PROXY (or $http_proxy and $no_proxy) -// environment variables. +// used by DefaultClient. It establishes network connections as needed +// and caches them for reuse by subsequent calls. It uses HTTP proxies +// as directed by the $HTTP_PROXY and $NO_PROXY (or $http_proxy and +// $no_proxy) environment variables. var DefaultTransport RoundTripper = &Transport{Proxy: ProxyFromEnvironment} // DefaultMaxIdleConnsPerHost is the default value of Transport's @@ -41,8 +42,11 @@ const DefaultMaxIdleConnsPerHost = 2 // https, and http proxies (for either http or https with CONNECT). // Transport can also cache connections for future re-use. type Transport struct { - lk sync.Mutex + idleMu sync.Mutex idleConn map[string][]*persistConn + reqMu sync.Mutex + reqConn map[*Request]*persistConn + altMu sync.RWMutex altProto map[string]RoundTripper // nil or map of URI scheme => RoundTripper // TODO: tunable on global max cached connections @@ -68,9 +72,15 @@ type Transport struct { DisableCompression bool // MaxIdleConnsPerHost, if non-zero, controls the maximum idle - // (keep-alive) to keep to keep per-host. If zero, + // (keep-alive) to keep per-host. If zero, // DefaultMaxIdleConnsPerHost is used. MaxIdleConnsPerHost int + + // ResponseHeaderTimeout, if non-zero, specifies the amount of + // time to wait for a server's response headers after fully + // writing the request (including its body, if any). This + // time does not include the time to read the response body. + ResponseHeaderTimeout time.Duration } // ProxyFromEnvironment returns the URL of the proxy to use for a @@ -88,7 +98,7 @@ func ProxyFromEnvironment(req *Request) (*url.URL, error) { return nil, nil } proxyURL, err := url.Parse(proxy) - if err != nil || proxyURL.Scheme == "" { + if err != nil || !strings.HasPrefix(proxyURL.Scheme, "http") { if u, err := url.Parse("http://" + proxy); err == nil { proxyURL = u err = nil @@ -131,17 +141,20 @@ func (t *Transport) RoundTrip(req *Request) (resp *Response, err error) { return nil, errors.New("http: nil Request.Header") } if req.URL.Scheme != "http" && req.URL.Scheme != "https" { - t.lk.Lock() + t.altMu.RLock() var rt RoundTripper if t.altProto != nil { rt = t.altProto[req.URL.Scheme] } - t.lk.Unlock() + t.altMu.RUnlock() if rt == nil { return nil, &badStringError{"unsupported protocol scheme", req.URL.Scheme} } return rt.RoundTrip(req) } + if req.URL.Host == "" { + return nil, errors.New("http: no Host in request URL") + } treq := &transportRequest{Request: req} cm, err := t.connectMethodForRequest(treq) if err != nil { @@ -170,8 +183,8 @@ func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) { if scheme == "http" || scheme == "https" { panic("protocol " + scheme + " already registered") } - t.lk.Lock() - defer t.lk.Unlock() + t.altMu.Lock() + defer t.altMu.Unlock() if t.altProto == nil { t.altProto = make(map[string]RoundTripper) } @@ -186,17 +199,29 @@ func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) { // a "keep-alive" state. It does not interrupt any connections currently // in use. func (t *Transport) CloseIdleConnections() { - t.lk.Lock() - defer t.lk.Unlock() - if t.idleConn == nil { + t.idleMu.Lock() + m := t.idleConn + t.idleConn = nil + t.idleMu.Unlock() + if m == nil { return } - for _, conns := range t.idleConn { + for _, conns := range m { for _, pconn := range conns { pconn.close() } } - t.idleConn = make(map[string][]*persistConn) +} + +// CancelRequest cancels an in-flight request by closing its +// connection. +func (t *Transport) CancelRequest(req *Request) { + t.reqMu.Lock() + pc := t.reqConn[req] + t.reqMu.Unlock() + if pc != nil { + pc.conn.Close() + } } // @@ -242,8 +267,6 @@ func (cm *connectMethod) proxyAuth() string { // If pconn is no longer needed or not in a good state, putIdleConn // returns false. func (t *Transport) putIdleConn(pconn *persistConn) bool { - t.lk.Lock() - defer t.lk.Unlock() if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 { pconn.close() return false @@ -256,21 +279,32 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool { if max == 0 { max = DefaultMaxIdleConnsPerHost } + t.idleMu.Lock() + if t.idleConn == nil { + t.idleConn = make(map[string][]*persistConn) + } if len(t.idleConn[key]) >= max { + t.idleMu.Unlock() pconn.close() return false } + for _, exist := range t.idleConn[key] { + if exist == pconn { + log.Fatalf("dup idle pconn %p in freelist", pconn) + } + } t.idleConn[key] = append(t.idleConn[key], pconn) + t.idleMu.Unlock() return true } func (t *Transport) getIdleConn(cm *connectMethod) (pconn *persistConn) { - t.lk.Lock() - defer t.lk.Unlock() + key := cm.String() + t.idleMu.Lock() + defer t.idleMu.Unlock() if t.idleConn == nil { - t.idleConn = make(map[string][]*persistConn) + return nil } - key := cm.String() for { pconns, ok := t.idleConn[key] if !ok { @@ -289,7 +323,20 @@ func (t *Transport) getIdleConn(cm *connectMethod) (pconn *persistConn) { return } } - return + panic("unreachable") +} + +func (t *Transport) setReqConn(r *Request, pc *persistConn) { + t.reqMu.Lock() + defer t.reqMu.Unlock() + if t.reqConn == nil { + t.reqConn = make(map[*Request]*persistConn) + } + if pc != nil { + t.reqConn[r] = pc + } else { + delete(t.reqConn, r) + } } func (t *Transport) dial(network, addr string) (c net.Conn, err error) { @@ -323,6 +370,8 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) { cacheKey: cm.String(), conn: conn, reqch: make(chan requestAndChan, 50), + writech: make(chan writeRequest, 50), + closech: make(chan struct{}), } switch { @@ -365,7 +414,18 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) { if cm.targetScheme == "https" { // Initiate TLS and check remote host name against certificate. - conn = tls.Client(conn, t.TLSClientConfig) + cfg := t.TLSClientConfig + if cfg == nil || cfg.ServerName == "" { + host := cm.tlsHost() + if cfg == nil { + cfg = &tls.Config{ServerName: host} + } else { + clone := *cfg // shallow clone + clone.ServerName = host + cfg = &clone + } + } + conn = tls.Client(conn, cfg) if err = conn.(*tls.Conn).Handshake(); err != nil { return nil, err } @@ -380,6 +440,7 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) { pconn.br = bufio.NewReader(pconn.conn) pconn.bw = bufio.NewWriter(pconn.conn) go pconn.readLoop() + go pconn.writeLoop() return pconn, nil } @@ -421,7 +482,15 @@ func useProxy(addr string) bool { if hasPort(p) { p = p[:strings.LastIndex(p, ":")] } - if addr == p || (p[0] == '.' && (strings.HasSuffix(addr, p) || addr == p[1:])) { + if addr == p { + return false + } + if p[0] == '.' && (strings.HasSuffix(addr, p) || addr == p[1:]) { + // no_proxy ".foo.com" matches "bar.foo.com" or "foo.com" + return false + } + if p[0] != '.' && strings.HasSuffix(addr, p) && addr[len(addr)-len(p)-1] == '.' { + // no_proxy "foo.com" matches "bar.foo.com" return false } } @@ -484,25 +553,28 @@ type persistConn struct { t *Transport cacheKey string // its connectMethod.String() conn net.Conn + closed bool // whether conn has been closed br *bufio.Reader // from conn bw *bufio.Writer // to conn - reqch chan requestAndChan // written by roundTrip(); read by readLoop() + reqch chan requestAndChan // written by roundTrip; read by readLoop + writech chan writeRequest // written by roundTrip; read by writeLoop + closech chan struct{} // broadcast close when readLoop (TCP connection) closes isProxy bool + lk sync.Mutex // guards following 3 fields + numExpectedResponses int + broken bool // an error has happened on this connection; marked broken so it's not reused. // mutateHeaderFunc is an optional func to modify extra // headers on each outbound request before it's written. (the // original Request given to RoundTrip is not modified) mutateHeaderFunc func(Header) - - lk sync.Mutex // guards numExpectedResponses and broken - numExpectedResponses int - broken bool // an error has happened on this connection; marked broken so it's not reused. } func (pc *persistConn) isBroken() bool { pc.lk.Lock() - defer pc.lk.Unlock() - return pc.broken + b := pc.broken + pc.lk.Unlock() + return b } var remoteSideClosedFunc func(error) bool // or nil to use default @@ -518,6 +590,7 @@ func remoteSideClosed(err error) bool { } func (pc *persistConn) readLoop() { + defer close(pc.closech) alive := true var lastbody io.ReadCloser // last response body, if any, read on this connection @@ -544,12 +617,16 @@ func (pc *persistConn) readLoop() { lastbody.Close() // assumed idempotent lastbody = nil } - resp, err := ReadResponse(pc.br, rc.req) + + var resp *Response + if err == nil { + resp, err = ReadResponse(pc.br, rc.req) + } + hasBody := resp != nil && rc.req.Method != "HEAD" && resp.ContentLength != 0 if err != nil { pc.close() } else { - hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0 if rc.addedGzip && hasBody && resp.Header.Get("Content-Encoding") == "gzip" { resp.Header.Del("Content-Encoding") resp.Header.Del("Content-Length") @@ -569,31 +646,37 @@ func (pc *persistConn) readLoop() { alive = false } - hasBody := resp != nil && resp.ContentLength != 0 var waitForBodyRead chan bool - if alive { - if hasBody { - lastbody = resp.Body - waitForBodyRead = make(chan bool) - resp.Body.(*bodyEOFSignal).fn = func() { - if !pc.t.putIdleConn(pc) { - alive = false - } - waitForBodyRead <- true + if hasBody { + lastbody = resp.Body + waitForBodyRead = make(chan bool, 1) + resp.Body.(*bodyEOFSignal).fn = func(err error) { + alive1 := alive + if err != nil { + alive1 = false } - } else { - // When there's no response body, we immediately - // reuse the TCP connection (putIdleConn), but - // we need to prevent ClientConn.Read from - // closing the Response.Body on the next - // loop, otherwise it might close the body - // before the client code has had a chance to - // read it (even though it'll just be 0, EOF). - lastbody = nil - - if !pc.t.putIdleConn(pc) { - alive = false + if alive1 && !pc.t.putIdleConn(pc) { + alive1 = false + } + if !alive1 || pc.isBroken() { + pc.close() } + waitForBodyRead <- alive1 + } + } + + if alive && !hasBody { + // When there's no response body, we immediately + // reuse the TCP connection (putIdleConn), but + // we need to prevent ClientConn.Read from + // closing the Response.Body on the next + // loop, otherwise it might close the body + // before the client code has had a chance to + // read it (even though it'll just be 0, EOF). + lastbody = nil + + if !pc.t.putIdleConn(pc) { + alive = false } } @@ -602,7 +685,35 @@ func (pc *persistConn) readLoop() { // Wait for the just-returned response body to be fully consumed // before we race and peek on the underlying bufio reader. if waitForBodyRead != nil { - <-waitForBodyRead + alive = <-waitForBodyRead + } + + pc.t.setReqConn(rc.req, nil) + + if !alive { + pc.close() + } + } +} + +func (pc *persistConn) writeLoop() { + for { + select { + case wr := <-pc.writech: + if pc.isBroken() { + wr.ch <- errors.New("http: can't write HTTP request on broken connection") + continue + } + err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra) + if err == nil { + err = pc.bw.Flush() + } + if err != nil { + pc.markBroken() + } + wr.ch <- err + case <-pc.closech: + return } } } @@ -622,9 +733,24 @@ type requestAndChan struct { addedGzip bool } +// A writeRequest is sent by the readLoop's goroutine to the +// writeLoop's goroutine to write a request while the read loop +// concurrently waits on both the write response and the server's +// reply. +type writeRequest struct { + req *transportRequest + ch chan<- error +} + func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) { - if pc.mutateHeaderFunc != nil { - pc.mutateHeaderFunc(req.extraHeaders()) + pc.t.setReqConn(req.Request, pc) + pc.lk.Lock() + pc.numExpectedResponses++ + headerFn := pc.mutateHeaderFunc + pc.lk.Unlock() + + if headerFn != nil { + headerFn(req.extraHeaders()) } // Ask for a compressed version if the caller didn't set their @@ -633,34 +759,84 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err // requested it. requestedGzip := false if !pc.t.DisableCompression && req.Header.Get("Accept-Encoding") == "" { - // Request gzip only, not deflate. Deflate is ambiguous and + // Request gzip only, not deflate. Deflate is ambiguous and // not as universally supported anyway. // See: http://www.gzip.org/zlib/zlib_faq.html#faq38 requestedGzip = true req.extraHeaders().Set("Accept-Encoding", "gzip") } - pc.lk.Lock() - pc.numExpectedResponses++ - pc.lk.Unlock() + // Write the request concurrently with waiting for a response, + // in case the server decides to reply before reading our full + // request body. + writeErrCh := make(chan error, 1) + pc.writech <- writeRequest{req, writeErrCh} - err = req.Request.write(pc.bw, pc.isProxy, req.extra) - if err != nil { - pc.close() - return + resc := make(chan responseAndError, 1) + pc.reqch <- requestAndChan{req.Request, resc, requestedGzip} + + var re responseAndError + var pconnDeadCh = pc.closech + var failTicker <-chan time.Time + var respHeaderTimer <-chan time.Time +WaitResponse: + for { + select { + case err := <-writeErrCh: + if err != nil { + re = responseAndError{nil, err} + pc.close() + break WaitResponse + } + if d := pc.t.ResponseHeaderTimeout; d > 0 { + respHeaderTimer = time.After(d) + } + case <-pconnDeadCh: + // The persist connection is dead. This shouldn't + // usually happen (only with Connection: close responses + // with no response bodies), but if it does happen it + // means either a) the remote server hung up on us + // prematurely, or b) the readLoop sent us a response & + // closed its closech at roughly the same time, and we + // selected this case first, in which case a response + // might still be coming soon. + // + // We can't avoid the select race in b) by using a unbuffered + // resc channel instead, because then goroutines can + // leak if we exit due to other errors. + pconnDeadCh = nil // avoid spinning + failTicker = time.After(100 * time.Millisecond) // arbitrary time to wait for resc + case <-failTicker: + re = responseAndError{err: errors.New("net/http: transport closed before response was received")} + break WaitResponse + case <-respHeaderTimer: + pc.close() + re = responseAndError{err: errors.New("net/http: timeout awaiting response headers")} + break WaitResponse + case re = <-resc: + break WaitResponse + } } - pc.bw.Flush() - ch := make(chan responseAndError, 1) - pc.reqch <- requestAndChan{req.Request, ch, requestedGzip} - re := <-ch pc.lk.Lock() pc.numExpectedResponses-- pc.lk.Unlock() + if re.err != nil { + pc.t.setReqConn(req.Request, nil) + } return re.res, re.err } +// markBroken marks a connection as broken (so it's not reused). +// It differs from close in that it doesn't close the underlying +// connection for use when it's still being read. +func (pc *persistConn) markBroken() { + pc.lk.Lock() + defer pc.lk.Unlock() + pc.broken = true +} + func (pc *persistConn) close() { pc.lk.Lock() defer pc.lk.Unlock() @@ -669,7 +845,10 @@ func (pc *persistConn) close() { func (pc *persistConn) closeLocked() { pc.broken = true - pc.conn.Close() + if !pc.closed { + pc.conn.Close() + pc.closed = true + } pc.mutateHeaderFunc = nil } @@ -687,43 +866,62 @@ func canonicalAddr(url *url.URL) string { return addr } -func responseIsKeepAlive(res *Response) bool { - // TODO: implement. for now just always shutting down the connection. - return false -} - // bodyEOFSignal wraps a ReadCloser but runs fn (if non-nil) at most -// once, right before the final Read() or Close() call returns, but after -// EOF has been seen. +// once, right before its final (error-producing) Read or Close call +// returns. type bodyEOFSignal struct { - body io.ReadCloser - fn func() - isClosed bool + body io.ReadCloser + mu sync.Mutex // guards closed, rerr and fn + closed bool // whether Close has been called + rerr error // sticky Read error + fn func(error) // error will be nil on Read io.EOF } func (es *bodyEOFSignal) Read(p []byte) (n int, err error) { - n, err = es.body.Read(p) - if es.isClosed && n > 0 { - panic("http: unexpected bodyEOFSignal Read after Close; see issue 1725") + es.mu.Lock() + closed, rerr := es.closed, es.rerr + es.mu.Unlock() + if closed { + return 0, errors.New("http: read on closed response body") + } + if rerr != nil { + return 0, rerr } - if err == io.EOF && es.fn != nil { - es.fn() - es.fn = nil + + n, err = es.body.Read(p) + if err != nil { + es.mu.Lock() + defer es.mu.Unlock() + if es.rerr == nil { + es.rerr = err + } + es.condfn(err) } return } -func (es *bodyEOFSignal) Close() (err error) { - if es.isClosed { +func (es *bodyEOFSignal) Close() error { + es.mu.Lock() + defer es.mu.Unlock() + if es.closed { return nil } - es.isClosed = true - err = es.body.Close() - if err == nil && es.fn != nil { - es.fn() - es.fn = nil + es.closed = true + err := es.body.Close() + es.condfn(err) + return err +} + +// caller must hold es.mu. +func (es *bodyEOFSignal) condfn(err error) { + if es.fn == nil { + return } - return + if err == io.EOF { + err = nil + } + es.fn(err) + es.fn = nil } type readFirstCloseBoth struct { |