summaryrefslogtreecommitdiff
path: root/src/pkg/http/transport.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/pkg/http/transport.go')
-rw-r--r--src/pkg/http/transport.go601
1 files changed, 518 insertions, 83 deletions
diff --git a/src/pkg/http/transport.go b/src/pkg/http/transport.go
index 78d316a55..7fa37af3b 100644
--- a/src/pkg/http/transport.go
+++ b/src/pkg/http/transport.go
@@ -6,9 +6,12 @@ package http
import (
"bufio"
+ "compress/gzip"
"crypto/tls"
"encoding/base64"
"fmt"
+ "io"
+ "log"
"net"
"os"
"strings"
@@ -20,46 +23,109 @@ import (
// each call to Do and uses HTTP proxies as directed by the
// $HTTP_PROXY and $NO_PROXY (or $http_proxy and $no_proxy)
// environment variables.
-var DefaultTransport Transport = &transport{}
-
-// transport implements Tranport for the default case, using TCP
-// connections to either the host or a proxy, serving http or https
-// schemes. In the future this may become public and support options
-// on keep-alive connection duration, pipelining controls, etc. For
-// now this is simply a port of the old Go code client code to the
-// Transport interface.
-type transport struct {
- // TODO: keep-alives, pipelining, etc using a map from
- // scheme/host to a connection. Something like:
- l sync.Mutex
- hostConn map[string]*ClientConn
-}
-
-func (ct *transport) Do(req *Request) (resp *Response, err os.Error) {
+var DefaultTransport RoundTripper = &Transport{}
+
+// DefaultMaxIdleConnsPerHost is the default value of Transport's
+// MaxIdleConnsPerHost.
+const DefaultMaxIdleConnsPerHost = 2
+
+// Transport is an implementation of RoundTripper that supports http,
+// https, and http proxies (for either http or https with CONNECT).
+// Transport can also cache connections for future re-use.
+type Transport struct {
+ lk sync.Mutex
+ idleConn map[string][]*persistConn
+
+ // TODO: tunable on global max cached connections
+ // TODO: tunable on timeout on cached connections
+ // TODO: optional pipelining
+
+ IgnoreEnvironment bool // don't look at environment variables for proxy configuration
+ DisableKeepAlives bool
+ DisableCompression bool
+
+ // MaxIdleConnsPerHost, if non-zero, controls the maximum idle
+ // (keep-alive) to keep to keep per-host. If zero,
+ // DefaultMaxIdleConnsPerHost is used.
+ MaxIdleConnsPerHost int
+}
+
+// RoundTrip implements the RoundTripper interface.
+func (t *Transport) RoundTrip(req *Request) (resp *Response, err os.Error) {
+ if req.URL == nil {
+ if req.URL, err = ParseURL(req.RawURL); err != nil {
+ return
+ }
+ }
if req.URL.Scheme != "http" && req.URL.Scheme != "https" {
return nil, &badStringError{"unsupported protocol scheme", req.URL.Scheme}
}
- addr := req.URL.Host
- if !hasPort(addr) {
- addr += ":" + req.URL.Scheme
+ cm, err := t.connectMethodForRequest(req)
+ if err != nil {
+ return nil, err
+ }
+
+ // Get the cached or newly-created connection to either the
+ // host (for http or https), the http proxy, or the http proxy
+ // pre-CONNECTed to https server. In any case, we'll be ready
+ // to send it requests.
+ pconn, err := t.getConn(cm)
+ if err != nil {
+ return nil, err
}
- var proxyURL *URL
- proxyAuth := ""
- proxy := ""
- if !matchNoProxy(addr) {
- proxy = os.Getenv("HTTP_PROXY")
- if proxy == "" {
- proxy = os.Getenv("http_proxy")
+ return pconn.roundTrip(req)
+}
+
+// CloseIdleConnections closes any connections which were previously
+// connected from previous requests but are now sitting idle in
+// a "keep-alive" state. It does not interrupt any connections currently
+// in use.
+func (t *Transport) CloseIdleConnections() {
+ t.lk.Lock()
+ defer t.lk.Unlock()
+ if t.idleConn == nil {
+ return
+ }
+ for _, conns := range t.idleConn {
+ for _, pconn := range conns {
+ pconn.close()
}
}
+ t.idleConn = nil
+}
- var write = (*Request).Write
+//
+// Private implementation past this point.
+//
- if proxy != "" {
- write = (*Request).WriteProxy
- proxyURL, err = ParseRequestURL(proxy)
+func (t *Transport) getenvEitherCase(k string) string {
+ if t.IgnoreEnvironment {
+ return ""
+ }
+ if v := t.getenv(strings.ToUpper(k)); v != "" {
+ return v
+ }
+ return t.getenv(strings.ToLower(k))
+}
+
+func (t *Transport) getenv(k string) string {
+ if t.IgnoreEnvironment {
+ return ""
+ }
+ return os.Getenv(k)
+}
+
+func (t *Transport) connectMethodForRequest(req *Request) (*connectMethod, os.Error) {
+ cm := &connectMethod{
+ targetScheme: req.URL.Scheme,
+ targetAddr: canonicalAddr(req.URL),
+ }
+
+ proxy := t.getenvEitherCase("HTTP_PROXY")
+ if proxy != "" && t.useProxy(cm.targetAddr) {
+ proxyURL, err := ParseRequestURL(proxy)
if err != nil {
return nil, os.ErrorString("invalid proxy address")
}
@@ -69,83 +135,452 @@ func (ct *transport) Do(req *Request) (resp *Response, err os.Error) {
return nil, os.ErrorString("invalid proxy address")
}
}
- addr = proxyURL.Host
- proxyInfo := proxyURL.RawUserinfo
- if proxyInfo != "" {
- enc := base64.URLEncoding
- encoded := make([]byte, enc.EncodedLen(len(proxyInfo)))
- enc.Encode(encoded, []byte(proxyInfo))
- proxyAuth = "Basic " + string(encoded)
+ cm.proxyURL = proxyURL
+ }
+ return cm, nil
+}
+
+// proxyAuth returns the Proxy-Authorization header to set
+// on requests, if applicable.
+func (cm *connectMethod) proxyAuth() string {
+ if cm.proxyURL == nil {
+ return ""
+ }
+ proxyInfo := cm.proxyURL.RawUserinfo
+ if proxyInfo != "" {
+ enc := base64.URLEncoding
+ encoded := make([]byte, enc.EncodedLen(len(proxyInfo)))
+ enc.Encode(encoded, []byte(proxyInfo))
+ return "Basic " + string(encoded)
+ }
+ return ""
+}
+
+func (t *Transport) putIdleConn(pconn *persistConn) {
+ t.lk.Lock()
+ defer t.lk.Unlock()
+ if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
+ pconn.close()
+ return
+ }
+ if pconn.isBroken() {
+ return
+ }
+ key := pconn.cacheKey
+ max := t.MaxIdleConnsPerHost
+ if max == 0 {
+ max = DefaultMaxIdleConnsPerHost
+ }
+ if len(t.idleConn[key]) >= max {
+ pconn.close()
+ return
+ }
+ t.idleConn[key] = append(t.idleConn[key], pconn)
+}
+
+func (t *Transport) getIdleConn(cm *connectMethod) (pconn *persistConn) {
+ t.lk.Lock()
+ defer t.lk.Unlock()
+ if t.idleConn == nil {
+ t.idleConn = make(map[string][]*persistConn)
+ }
+ key := cm.String()
+ for {
+ pconns, ok := t.idleConn[key]
+ if !ok {
+ return nil
+ }
+ if len(pconns) == 1 {
+ pconn = pconns[0]
+ t.idleConn[key] = nil, false
+ } else {
+ // 2 or more cached connections; pop last
+ // TODO: queue?
+ pconn = pconns[len(pconns)-1]
+ t.idleConn[key] = pconns[0 : len(pconns)-1]
}
+ if !pconn.isBroken() {
+ return
+ }
+ }
+ return
+}
+
+// getConn dials and creates a new persistConn to the target as
+// specified in the connectMethod. This includes doing a proxy CONNECT
+// and/or setting up TLS. If this doesn't return an error, the persistConn
+// is ready to write requests to.
+func (t *Transport) getConn(cm *connectMethod) (*persistConn, os.Error) {
+ if pc := t.getIdleConn(cm); pc != nil {
+ return pc, nil
}
- // Connect to server or proxy
- conn, err := net.Dial("tcp", "", addr)
+ conn, err := net.Dial("tcp", cm.addr())
if err != nil {
return nil, err
}
- if req.URL.Scheme == "http" {
- // Include proxy http header if needed.
- if proxyAuth != "" {
- req.Header.Set("Proxy-Authorization", proxyAuth)
- }
- } else { // https
- if proxyURL != nil {
- // Ask proxy for direct connection to server.
- // addr defaults above to ":https" but we need to use numbers
- addr = req.URL.Host
- if !hasPort(addr) {
- addr += ":443"
- }
- fmt.Fprintf(conn, "CONNECT %s HTTP/1.1\r\n", addr)
- fmt.Fprintf(conn, "Host: %s\r\n", addr)
- if proxyAuth != "" {
- fmt.Fprintf(conn, "Proxy-Authorization: %s\r\n", proxyAuth)
- }
- fmt.Fprintf(conn, "\r\n")
+ pa := cm.proxyAuth()
- // Read response.
- // Okay to use and discard buffered reader here, because
- // TLS server will not speak until spoken to.
- br := bufio.NewReader(conn)
- resp, err := ReadResponse(br, "CONNECT")
- if err != nil {
- return nil, err
- }
- if resp.StatusCode != 200 {
- f := strings.Split(resp.Status, " ", 2)
- return nil, os.ErrorString(f[1])
+ pconn := &persistConn{
+ t: t,
+ cacheKey: cm.String(),
+ conn: conn,
+ reqch: make(chan requestAndChan, 50),
+ }
+ newClientConnFunc := NewClientConn
+
+ switch {
+ case cm.proxyURL == nil:
+ // Do nothing.
+ case cm.targetScheme == "http":
+ newClientConnFunc = NewProxyClientConn
+ if pa != "" {
+ pconn.mutateRequestFunc = func(req *Request) {
+ if req.Header == nil {
+ req.Header = make(Header)
+ }
+ req.Header.Set("Proxy-Authorization", pa)
}
}
+ case cm.targetScheme == "https":
+ fmt.Fprintf(conn, "CONNECT %s HTTP/1.1\r\n", cm.targetAddr)
+ fmt.Fprintf(conn, "Host: %s\r\n", cm.targetAddr)
+ if pa != "" {
+ fmt.Fprintf(conn, "Proxy-Authorization: %s\r\n", pa)
+ }
+ fmt.Fprintf(conn, "\r\n")
+ // Read response.
+ // Okay to use and discard buffered reader here, because
+ // TLS server will not speak until spoken to.
+ br := bufio.NewReader(conn)
+ resp, err := ReadResponse(br, "CONNECT")
+ if err != nil {
+ conn.Close()
+ return nil, err
+ }
+ if resp.StatusCode != 200 {
+ f := strings.Split(resp.Status, " ", 2)
+ conn.Close()
+ return nil, os.ErrorString(f[1])
+ }
+ }
+
+ if cm.targetScheme == "https" {
// Initiate TLS and check remote host name against certificate.
conn = tls.Client(conn, nil)
if err = conn.(*tls.Conn).Handshake(); err != nil {
return nil, err
}
- h := req.URL.Host
- if hasPort(h) {
- h = h[:strings.LastIndex(h, ":")]
- }
- if err = conn.(*tls.Conn).VerifyHostname(h); err != nil {
+ if err = conn.(*tls.Conn).VerifyHostname(cm.tlsHost()); err != nil {
return nil, err
}
+ pconn.conn = conn
}
- err = write(req, conn)
- if err != nil {
- conn.Close()
- return nil, err
+ pconn.br = bufio.NewReader(pconn.conn)
+ pconn.cc = newClientConnFunc(conn, pconn.br)
+ pconn.cc.readRes = readResponseWithEOFSignal
+ go pconn.readLoop()
+ return pconn, nil
+}
+
+// useProxy returns true if requests to addr should use a proxy,
+// according to the NO_PROXY or no_proxy environment variable.
+func (t *Transport) useProxy(addr string) bool {
+ if len(addr) == 0 {
+ return true
+ }
+ no_proxy := t.getenvEitherCase("NO_PROXY")
+ if no_proxy == "*" {
+ return false
}
- reader := bufio.NewReader(conn)
- resp, err = ReadResponse(reader, req.Method)
+ addr = strings.ToLower(strings.TrimSpace(addr))
+ if hasPort(addr) {
+ addr = addr[:strings.LastIndex(addr, ":")]
+ }
+
+ for _, p := range strings.Split(no_proxy, ",", -1) {
+ p = strings.ToLower(strings.TrimSpace(p))
+ if len(p) == 0 {
+ continue
+ }
+ if hasPort(p) {
+ p = p[:strings.LastIndex(p, ":")]
+ }
+ if addr == p || (p[0] == '.' && (strings.HasSuffix(addr, p) || addr == p[1:])) {
+ return false
+ }
+ }
+ return true
+}
+
+// connectMethod is the map key (in its String form) for keeping persistent
+// TCP connections alive for subsequent HTTP requests.
+//
+// A connect method may be of the following types:
+//
+// Cache key form Description
+// ----------------- -------------------------
+// ||http|foo.com http directly to server, no proxy
+// ||https|foo.com https directly to server, no proxy
+// http://proxy.com|https|foo.com http to proxy, then CONNECT to foo.com
+// http://proxy.com|http http to proxy, http to anywhere after that
+//
+// Note: no support to https to the proxy yet.
+//
+type connectMethod struct {
+ proxyURL *URL // "" for no proxy, else full proxy URL
+ targetScheme string // "http" or "https"
+ targetAddr string // Not used if proxy + http targetScheme (4th example in table)
+}
+
+func (ck *connectMethod) String() string {
+ proxyStr := ""
+ if ck.proxyURL != nil {
+ proxyStr = ck.proxyURL.String()
+ }
+ return strings.Join([]string{proxyStr, ck.targetScheme, ck.targetAddr}, "|")
+}
+
+// addr returns the first hop "host:port" to which we need to TCP connect.
+func (cm *connectMethod) addr() string {
+ if cm.proxyURL != nil {
+ return canonicalAddr(cm.proxyURL)
+ }
+ return cm.targetAddr
+}
+
+// tlsHost returns the host name to match against the peer's
+// TLS certificate.
+func (cm *connectMethod) tlsHost() string {
+ h := cm.targetAddr
+ if hasPort(h) {
+ h = h[:strings.LastIndex(h, ":")]
+ }
+ return h
+}
+
+type readResult struct {
+ res *Response // either res or err will be set
+ err os.Error
+}
+
+type writeRequest struct {
+ // Set by client (in pc.roundTrip)
+ req *Request
+ resch chan *readResult
+
+ // Set by writeLoop if an error writing headers.
+ writeErr os.Error
+}
+
+// persistConn wraps a connection, usually a persistent one
+// (but may be used for non-keep-alive requests as well)
+type persistConn struct {
+ t *Transport
+ cacheKey string // its connectMethod.String()
+ conn net.Conn
+ cc *ClientConn
+ br *bufio.Reader
+ reqch chan requestAndChan // written by roundTrip(); read by readLoop()
+ mutateRequestFunc func(*Request) // nil or func to modify each outbound request
+
+ lk sync.Mutex // guards numExpectedResponses and broken
+ numExpectedResponses int
+ broken bool // an error has happened on this connection; marked broken so it's not reused.
+}
+
+func (pc *persistConn) isBroken() bool {
+ pc.lk.Lock()
+ defer pc.lk.Unlock()
+ return pc.broken
+}
+
+func (pc *persistConn) expectingResponse() bool {
+ pc.lk.Lock()
+ defer pc.lk.Unlock()
+ return pc.numExpectedResponses > 0
+}
+
+func (pc *persistConn) readLoop() {
+ alive := true
+ for alive {
+ pb, err := pc.br.Peek(1)
+ if err != nil {
+ if (err == os.EOF || err == os.EINVAL) && !pc.expectingResponse() {
+ // Remote side closed on us. (We probably hit their
+ // max idle timeout)
+ pc.close()
+ return
+ }
+ }
+ if !pc.expectingResponse() {
+ log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v",
+ string(pb), err)
+ pc.close()
+ return
+ }
+
+ rc := <-pc.reqch
+ resp, err := pc.cc.Read(rc.req)
+
+ if err == ErrPersistEOF {
+ // Succeeded, but we can't send any more
+ // persistent connections on this again. We
+ // hide this error to upstream callers.
+ alive = false
+ err = nil
+ } else if err != nil || rc.req.Close {
+ alive = false
+ }
+
+ hasBody := resp != nil && resp.ContentLength != 0
+ var waitForBodyRead chan bool
+ if alive {
+ if hasBody {
+ waitForBodyRead = make(chan bool)
+ resp.Body.(*bodyEOFSignal).fn = func() {
+ pc.t.putIdleConn(pc)
+ waitForBodyRead <- true
+ }
+ } else {
+ pc.t.putIdleConn(pc)
+ }
+ }
+
+ rc.ch <- responseAndError{resp, err}
+
+ // Wait for the just-returned response body to be fully consumed
+ // before we race and peek on the underlying bufio reader.
+ if waitForBodyRead != nil {
+ <-waitForBodyRead
+ }
+ }
+}
+
+type responseAndError struct {
+ res *Response
+ err os.Error
+}
+
+type requestAndChan struct {
+ req *Request
+ ch chan responseAndError
+}
+
+func (pc *persistConn) roundTrip(req *Request) (resp *Response, err os.Error) {
+ if pc.mutateRequestFunc != nil {
+ pc.mutateRequestFunc(req)
+ }
+
+ // Ask for a compressed version if the caller didn't set their
+ // own value for Accept-Encoding. We only attempted to
+ // uncompress the gzip stream if we were the layer that
+ // requested it.
+ requestedGzip := false
+ if !pc.t.DisableCompression && req.Header.Get("Accept-Encoding") == "" {
+ // Request gzip only, not deflate. Deflate is ambiguous and
+ // as universally supported anyway.
+ // See: http://www.gzip.org/zlib/zlib_faq.html#faq38
+ requestedGzip = true
+ req.Header.Set("Accept-Encoding", "gzip")
+ }
+
+ pc.lk.Lock()
+ pc.numExpectedResponses++
+ pc.lk.Unlock()
+
+ err = pc.cc.Write(req)
if err != nil {
- conn.Close()
- return nil, err
+ pc.close()
+ return
+ }
+
+ ch := make(chan responseAndError, 1)
+ pc.reqch <- requestAndChan{req, ch}
+ re := <-ch
+ pc.lk.Lock()
+ pc.numExpectedResponses--
+ pc.lk.Unlock()
+
+ if re.err == nil && requestedGzip && re.res.Header.Get("Content-Encoding") == "gzip" {
+ re.res.Header.Del("Content-Encoding")
+ re.res.Header.Del("Content-Length")
+ re.res.ContentLength = -1
+ var err os.Error
+ re.res.Body, err = gzip.NewReader(re.res.Body)
+ if err != nil {
+ pc.close()
+ return nil, err
+ }
+ }
+
+ return re.res, re.err
+}
+
+func (pc *persistConn) close() {
+ pc.lk.Lock()
+ defer pc.lk.Unlock()
+ pc.broken = true
+ pc.cc.Close()
+ pc.conn.Close()
+ pc.mutateRequestFunc = nil
+}
+
+var portMap = map[string]string{
+ "http": "80",
+ "https": "443",
+}
+
+// canonicalAddr returns url.Host but always with a ":port" suffix
+func canonicalAddr(url *URL) string {
+ addr := url.Host
+ if !hasPort(addr) {
+ return addr + ":" + portMap[url.Scheme]
}
+ return addr
+}
+
+func responseIsKeepAlive(res *Response) bool {
+ // TODO: implement. for now just always shutting down the connection.
+ return false
+}
- resp.Body = readClose{resp.Body, conn}
+// readResponseWithEOFSignal is a wrapper around ReadResponse that replaces
+// the response body with a bodyEOFSignal-wrapped version.
+func readResponseWithEOFSignal(r *bufio.Reader, requestMethod string) (resp *Response, err os.Error) {
+ resp, err = ReadResponse(r, requestMethod)
+ if err == nil && resp.ContentLength != 0 {
+ resp.Body = &bodyEOFSignal{resp.Body, nil}
+ }
+ return
+}
+
+// bodyEOFSignal wraps a ReadCloser but runs fn (if non-nil) at most
+// once, right before the final Read() or Close() call returns, but after
+// EOF has been seen.
+type bodyEOFSignal struct {
+ body io.ReadCloser
+ fn func()
+}
+
+func (es *bodyEOFSignal) Read(p []byte) (n int, err os.Error) {
+ n, err = es.body.Read(p)
+ if err == os.EOF && es.fn != nil {
+ es.fn()
+ es.fn = nil
+ }
+ return
+}
+
+func (es *bodyEOFSignal) Close() (err os.Error) {
+ err = es.body.Close()
+ if err == nil && es.fn != nil {
+ es.fn()
+ es.fn = nil
+ }
return
}