diff options
author | Michael Stapelberg <stapelberg@debian.org> | 2013-03-04 21:27:36 +0100 |
---|---|---|
committer | Michael Stapelberg <michael@stapelberg.de> | 2013-03-04 21:27:36 +0100 |
commit | 04b08da9af0c450d645ab7389d1467308cfc2db8 (patch) | |
tree | db247935fa4f2f94408edc3acd5d0d4f997aa0d8 /src/pkg/database/sql/sql.go | |
parent | 917c5fb8ec48e22459d77e3849e6d388f93d3260 (diff) | |
download | golang-04b08da9af0c450d645ab7389d1467308cfc2db8.tar.gz |
Imported Upstream version 1.1~hg20130304upstream/1.1_hg20130304
Diffstat (limited to 'src/pkg/database/sql/sql.go')
-rw-r--r-- | src/pkg/database/sql/sql.go | 408 |
1 files changed, 289 insertions, 119 deletions
diff --git a/src/pkg/database/sql/sql.go b/src/pkg/database/sql/sql.go index 89136ef6e..4faaa11b1 100644 --- a/src/pkg/database/sql/sql.go +++ b/src/pkg/database/sql/sql.go @@ -11,6 +11,7 @@ import ( "errors" "fmt" "io" + "runtime" "sync" ) @@ -189,9 +190,66 @@ type DB struct { driver driver.Driver dsn string - mu sync.Mutex // protects freeConn and closed - freeConn []driver.Conn - closed bool + mu sync.Mutex // protects following fields + outConn map[driver.Conn]bool // whether the conn is in use + freeConn []driver.Conn + closed bool + dep map[finalCloser]depSet + onConnPut map[driver.Conn][]func() // code (with mu held) run when conn is next returned + lastPut map[driver.Conn]string // stacktrace of last conn's put; debug only +} + +// depSet is a finalCloser's outstanding dependencies +type depSet map[interface{}]bool // set of true bools + +// The finalCloser interface is used by (*DB).addDep and (*DB).get +type finalCloser interface { + // finalClose is called when the reference count of an object + // goes to zero. (*DB).mu is not held while calling it. + finalClose() error +} + +// addDep notes that x now depends on dep, and x's finalClose won't be +// called until all of x's dependencies are removed with removeDep. +func (db *DB) addDep(x finalCloser, dep interface{}) { + //println(fmt.Sprintf("addDep(%T %p, %T %p)", x, x, dep, dep)) + db.mu.Lock() + defer db.mu.Unlock() + if db.dep == nil { + db.dep = make(map[finalCloser]depSet) + } + xdep := db.dep[x] + if xdep == nil { + xdep = make(depSet) + db.dep[x] = xdep + } + xdep[dep] = true +} + +// removeDep notes that x no longer depends on dep. +// If x still has dependencies, nil is returned. +// If x no longer has any dependencies, its finalClose method will be +// called and its error value will be returned. +func (db *DB) removeDep(x finalCloser, dep interface{}) error { + //println(fmt.Sprintf("removeDep(%T %p, %T %p)", x, x, dep, dep)) + done := false + + db.mu.Lock() + xdep := db.dep[x] + if xdep != nil { + delete(xdep, dep) + if len(xdep) == 0 { + delete(db.dep, x) + done = true + } + } + db.mu.Unlock() + + if !done { + return nil + } + //println(fmt.Sprintf("calling final close on %T %v (%#v)", x, x, x)) + return x.finalClose() } // Open opens a database specified by its database driver name and a @@ -201,11 +259,20 @@ type DB struct { // Most users will open a database via a driver-specific connection // helper function that returns a *DB. func Open(driverName, dataSourceName string) (*DB, error) { - driver, ok := drivers[driverName] + driveri, ok := drivers[driverName] if !ok { return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName) } - return &DB{driver: driver, dsn: dataSourceName}, nil + // TODO: optionally proactively connect to a Conn to check + // the dataSourceName: golang.org/issue/4804 + db := &DB{ + driver: driveri, + dsn: dataSourceName, + outConn: make(map[driver.Conn]bool), + lastPut: make(map[driver.Conn]string), + onConnPut: make(map[driver.Conn][]func()), + } + return db, nil } // Close closes the database, releasing any open resources. @@ -241,22 +308,38 @@ func (db *DB) conn() (driver.Conn, error) { if n := len(db.freeConn); n > 0 { conn := db.freeConn[n-1] db.freeConn = db.freeConn[:n-1] + db.outConn[conn] = true db.mu.Unlock() return conn, nil } db.mu.Unlock() - return db.driver.Open(db.dsn) + conn, err := db.driver.Open(db.dsn) + if err == nil { + db.mu.Lock() + db.outConn[conn] = true + db.mu.Unlock() + } + return conn, err } +// connIfFree returns (wanted, true) if wanted is still a valid conn and +// isn't in use. +// +// If wanted is valid but in use, connIfFree returns (wanted, false). +// If wanted is invalid, connIfFre returns (nil, false). func (db *DB) connIfFree(wanted driver.Conn) (conn driver.Conn, ok bool) { db.mu.Lock() defer db.mu.Unlock() + if db.outConn[wanted] { + return conn, false + } for i, conn := range db.freeConn { if conn != wanted { continue } db.freeConn[i] = db.freeConn[len(db.freeConn)-1] db.freeConn = db.freeConn[:len(db.freeConn)-1] + db.outConn[wanted] = true return wanted, true } return nil, false @@ -265,14 +348,52 @@ func (db *DB) connIfFree(wanted driver.Conn) (conn driver.Conn, ok bool) { // putConnHook is a hook for testing. var putConnHook func(*DB, driver.Conn) +// noteUnusedDriverStatement notes that si is no longer used and should +// be closed whenever possible (when c is next not in use), unless c is +// already closed. +func (db *DB) noteUnusedDriverStatement(c driver.Conn, si driver.Stmt) { + db.mu.Lock() + defer db.mu.Unlock() + if db.outConn[c] { + db.onConnPut[c] = append(db.onConnPut[c], func() { + si.Close() + }) + } else { + si.Close() + } +} + +// debugGetPut determines whether getConn & putConn calls' stack traces +// are returned for more verbose crashes. +const debugGetPut = false + // putConn adds a connection to the db's free pool. -// err is optionally the last error that occured on this connection. +// err is optionally the last error that occurred on this connection. func (db *DB) putConn(c driver.Conn, err error) { + db.mu.Lock() + if !db.outConn[c] { + if debugGetPut { + fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", c, stack(), db.lastPut[c]) + } + panic("sql: connection returned that was never out") + } + if debugGetPut { + db.lastPut[c] = stack() + } + delete(db.outConn, c) + + if fns, ok := db.onConnPut[c]; ok { + for _, fn := range fns { + fn() + } + delete(db.onConnPut, c) + } + if err == driver.ErrBadConn { // Don't reuse bad connections. + db.mu.Unlock() return } - db.mu.Lock() if putConnHook != nil { putConnHook(db, c) } @@ -287,7 +408,9 @@ func (db *DB) putConn(c driver.Conn, err error) { c.Close() } -// Prepare creates a prepared statement for later execution. +// Prepare creates a prepared statement for later queries or executions. +// Multiple queries or executions may be run concurrently from the +// returned statement. func (db *DB) Prepare(query string) (*Stmt, error) { var stmt *Stmt var err error @@ -300,7 +423,7 @@ func (db *DB) Prepare(query string) (*Stmt, error) { return stmt, err } -func (db *DB) prepare(query string) (stmt *Stmt, err error) { +func (db *DB) prepare(query string) (*Stmt, error) { // TODO: check if db.driver supports an optional // driver.Preparer interface and call that instead, if so, // otherwise we make a prepared statement that's bound @@ -311,28 +434,27 @@ func (db *DB) prepare(query string) (stmt *Stmt, err error) { if err != nil { return nil, err } - defer db.putConn(ci, err) si, err := ci.Prepare(query) if err != nil { + db.putConn(ci, err) return nil, err } - stmt = &Stmt{ + stmt := &Stmt{ db: db, query: query, css: []connStmt{{ci, si}}, } + db.addDep(stmt, stmt) return stmt, nil } // Exec executes a query without returning any rows. +// The args are for any placeholder parameters in the query. func (db *DB) Exec(query string, args ...interface{}) (Result, error) { - sargs, err := subsetTypeArgs(args) - if err != nil { - return nil, err - } var res Result + var err error for i := 0; i < 10; i++ { - res, err = db.exec(query, sargs) + res, err = db.exec(query, args) if err != driver.ErrBadConn { break } @@ -340,15 +462,21 @@ func (db *DB) Exec(query string, args ...interface{}) (Result, error) { return res, err } -func (db *DB) exec(query string, sargs []driver.Value) (res Result, err error) { +func (db *DB) exec(query string, args []interface{}) (res Result, err error) { ci, err := db.conn() if err != nil { return nil, err } - defer db.putConn(ci, err) + defer func() { + db.putConn(ci, err) + }() if execer, ok := ci.(driver.Execer); ok { - resi, err := execer.Exec(query, sargs) + dargs, err := driverArgs(nil, args) + if err != nil { + return nil, err + } + resi, err := execer.Exec(query, dargs) if err != driver.ErrSkip { if err != nil { return nil, err @@ -363,25 +491,83 @@ func (db *DB) exec(query string, sargs []driver.Value) (res Result, err error) { } defer sti.Close() - resi, err := sti.Exec(sargs) + return resultFromStatement(sti, args...) +} + +// Query executes a query that returns rows, typically a SELECT. +// The args are for any placeholder parameters in the query. +func (db *DB) Query(query string, args ...interface{}) (*Rows, error) { + var rows *Rows + var err error + for i := 0; i < 10; i++ { + rows, err = db.query(query, args) + if err != driver.ErrBadConn { + break + } + } + return rows, err +} + +func (db *DB) query(query string, args []interface{}) (*Rows, error) { + ci, err := db.conn() if err != nil { return nil, err } - return result{resi}, nil + + releaseConn := func(err error) { db.putConn(ci, err) } + + return db.queryConn(ci, releaseConn, query, args) } -// Query executes a query that returns rows, typically a SELECT. -func (db *DB) Query(query string, args ...interface{}) (*Rows, error) { - stmt, err := db.Prepare(query) +// queryConn executes a query on the given connection. +// The connection gets released by the releaseConn function. +func (db *DB) queryConn(ci driver.Conn, releaseConn func(error), query string, args []interface{}) (*Rows, error) { + if queryer, ok := ci.(driver.Queryer); ok { + dargs, err := driverArgs(nil, args) + if err != nil { + releaseConn(err) + return nil, err + } + rowsi, err := queryer.Query(query, dargs) + if err != driver.ErrSkip { + if err != nil { + releaseConn(err) + return nil, err + } + // Note: ownership of ci passes to the *Rows, to be freed + // with releaseConn. + rows := &Rows{ + db: db, + ci: ci, + releaseConn: releaseConn, + rowsi: rowsi, + } + return rows, nil + } + } + + sti, err := ci.Prepare(query) if err != nil { + releaseConn(err) return nil, err } - rows, err := stmt.Query(args...) + + rowsi, err := rowsiFromStatement(sti, args...) if err != nil { - stmt.Close() + releaseConn(err) + sti.Close() return nil, err } - rows.closeStmt = stmt + + // Note: ownership of ci passes to the *Rows, to be freed + // with releaseConn. + rows := &Rows{ + db: db, + ci: ci, + releaseConn: releaseConn, + rowsi: rowsi, + closeStmt: sti, + } return rows, nil } @@ -415,7 +601,7 @@ func (db *DB) begin() (tx *Tx, err error) { txi, err := ci.Begin() if err != nil { db.putConn(ci, err) - return nil, fmt.Errorf("sql: failed to Begin transaction: %v", err) + return nil, err } return &Tx{ db: db, @@ -577,13 +763,12 @@ func (tx *Tx) Exec(query string, args ...interface{}) (Result, error) { } defer tx.releaseConn() - sargs, err := subsetTypeArgs(args) - if err != nil { - return nil, err - } - if execer, ok := ci.(driver.Execer); ok { - resi, err := execer.Exec(query, sargs) + dargs, err := driverArgs(nil, args) + if err != nil { + return nil, err + } + resi, err := execer.Exec(query, dargs) if err == nil { return result{resi}, nil } @@ -598,29 +783,19 @@ func (tx *Tx) Exec(query string, args ...interface{}) (Result, error) { } defer sti.Close() - resi, err := sti.Exec(sargs) - if err != nil { - return nil, err - } - return result{resi}, nil + return resultFromStatement(sti, args...) } // Query executes a query that returns rows, typically a SELECT. func (tx *Tx) Query(query string, args ...interface{}) (*Rows, error) { - if tx.done { - return nil, ErrTxDone - } - stmt, err := tx.Prepare(query) - if err != nil { - return nil, err - } - rows, err := stmt.Query(args...) + ci, err := tx.grabConn() if err != nil { - stmt.Close() return nil, err } - rows.closeStmt = stmt - return rows, err + + releaseConn := func(err error) { tx.releaseConn() } + + return tx.db.queryConn(ci, releaseConn, query, args) } // QueryRow executes a query that is expected to return at most one row. @@ -644,6 +819,8 @@ type Stmt struct { query string // that created the Stmt stickyErr error // if non-nil, this error is returned for all operations + closemu sync.RWMutex // held exclusively during close, for read otherwise. + // If in a transaction, else both nil: tx *Tx txsi driver.Stmt @@ -661,12 +838,18 @@ type Stmt struct { // Exec executes a prepared statement with the given arguments and // returns a Result summarizing the effect of the statement. func (s *Stmt) Exec(args ...interface{}) (Result, error) { + s.closemu.RLock() + defer s.closemu.RUnlock() _, releaseConn, si, err := s.connStmt() if err != nil { return nil, err } defer releaseConn(nil) + return resultFromStatement(si, args...) +} + +func resultFromStatement(si driver.Stmt, args ...interface{}) (Result, error) { // -1 means the driver doesn't know how to count the number of // placeholders, so we won't sanity check input here and instead let the // driver deal with errors. @@ -674,51 +857,12 @@ func (s *Stmt) Exec(args ...interface{}) (Result, error) { return nil, fmt.Errorf("sql: expected %d arguments, got %d", want, len(args)) } - sargs := make([]driver.Value, len(args)) - - // Convert args to subset types. - if cc, ok := si.(driver.ColumnConverter); ok { - for n, arg := range args { - // First, see if the value itself knows how to convert - // itself to a driver type. For example, a NullString - // struct changing into a string or nil. - if svi, ok := arg.(driver.Valuer); ok { - sv, err := svi.Value() - if err != nil { - return nil, fmt.Errorf("sql: argument index %d from Value: %v", n, err) - } - if !driver.IsValue(sv) { - return nil, fmt.Errorf("sql: argument index %d: non-subset type %T returned from Value", n, sv) - } - arg = sv - } - - // Second, ask the column to sanity check itself. For - // example, drivers might use this to make sure that - // an int64 values being inserted into a 16-bit - // integer field is in range (before getting - // truncated), or that a nil can't go into a NOT NULL - // column before going across the network to get the - // same error. - sargs[n], err = cc.ColumnConverter(n).ConvertValue(arg) - if err != nil { - return nil, fmt.Errorf("sql: converting Exec argument #%d's type: %v", n, err) - } - if !driver.IsValue(sargs[n]) { - return nil, fmt.Errorf("sql: driver ColumnConverter error converted %T to unsupported type %T", - arg, sargs[n]) - } - } - } else { - for n, arg := range args { - sargs[n], err = driver.DefaultParameterConverter.ConvertValue(arg) - if err != nil { - return nil, fmt.Errorf("sql: converting Exec argument #%d's type: %v", n, err) - } - } + dargs, err := driverArgs(si, args) + if err != nil { + return nil, err } - resi, err := si.Exec(sargs) + resi, err := si.Exec(dargs) if err != nil { return nil, err } @@ -794,35 +938,54 @@ func (s *Stmt) connStmt() (ci driver.Conn, releaseConn func(error), si driver.St // Query executes a prepared query statement with the given arguments // and returns the query results as a *Rows. func (s *Stmt) Query(args ...interface{}) (*Rows, error) { + s.closemu.RLock() + defer s.closemu.RUnlock() + ci, releaseConn, si, err := s.connStmt() if err != nil { return nil, err } + rowsi, err := rowsiFromStatement(si, args...) + if err != nil { + releaseConn(err) + return nil, err + } + + // Note: ownership of ci passes to the *Rows, to be freed + // with releaseConn. + rows := &Rows{ + db: s.db, + ci: ci, + rowsi: rowsi, + // releaseConn set below + } + s.db.addDep(s, rows) + rows.releaseConn = func(err error) { + releaseConn(err) + s.db.removeDep(s, rows) + } + return rows, nil +} + +func rowsiFromStatement(si driver.Stmt, args ...interface{}) (driver.Rows, error) { // -1 means the driver doesn't know how to count the number of // placeholders, so we won't sanity check input here and instead let the // driver deal with errors. if want := si.NumInput(); want != -1 && len(args) != want { return nil, fmt.Errorf("sql: statement expects %d inputs; got %d", si.NumInput(), len(args)) } - sargs, err := subsetTypeArgs(args) + + dargs, err := driverArgs(si, args) if err != nil { return nil, err } - rowsi, err := si.Query(sargs) + + rowsi, err := si.Query(dargs) if err != nil { - releaseConn(err) return nil, err } - // Note: ownership of ci passes to the *Rows, to be freed - // with releaseConn. - rows := &Rows{ - db: s.db, - ci: ci, - releaseConn: releaseConn, - rowsi: rowsi, - } - return rows, nil + return rowsi, nil } // QueryRow executes a prepared query statement with the given arguments. @@ -846,6 +1009,9 @@ func (s *Stmt) QueryRow(args ...interface{}) *Row { // Close closes the statement. func (s *Stmt) Close() error { + s.closemu.Lock() + defer s.closemu.Unlock() + if s.stickyErr != nil { return s.stickyErr } @@ -858,18 +1024,17 @@ func (s *Stmt) Close() error { if s.tx != nil { s.txsi.Close() - } else { - for _, v := range s.css { - if ci, match := s.db.connIfFree(v.ci); match { - v.si.Close() - s.db.putConn(ci, nil) - } else { - // TODO(bradfitz): care that we can't close - // this statement because the statement's - // connection is in use? - } - } + return nil + } + + return s.db.removeDep(s, s) +} + +func (s *Stmt) finalClose() error { + for _, v := range s.css { + s.db.noteUnusedDriverStatement(v.ci, v.si) } + s.css = nil return nil } @@ -888,14 +1053,14 @@ func (s *Stmt) Close() error { // ... type Rows struct { db *DB - ci driver.Conn // owned; must call putconn when closed to release + ci driver.Conn // owned; must call releaseConn when closed to release releaseConn func(error) rowsi driver.Rows closed bool lastcols []driver.Value lasterr error - closeStmt *Stmt // if non-nil, statement to Close on close + closeStmt driver.Stmt // if non-nil, statement to Close on close } // Next prepares the next result row for reading with the Scan method. @@ -1064,3 +1229,8 @@ type Result interface { type result struct { driver.Result } + +func stack() string { + var buf [1024]byte + return string(buf[:runtime.Stack(buf[:], false)]) +} |