summaryrefslogtreecommitdiff
path: root/src/pkg/database/sql/sql.go
diff options
context:
space:
mode:
authorMichael Stapelberg <stapelberg@debian.org>2013-03-04 21:27:36 +0100
committerMichael Stapelberg <michael@stapelberg.de>2013-03-04 21:27:36 +0100
commit04b08da9af0c450d645ab7389d1467308cfc2db8 (patch)
treedb247935fa4f2f94408edc3acd5d0d4f997aa0d8 /src/pkg/database/sql/sql.go
parent917c5fb8ec48e22459d77e3849e6d388f93d3260 (diff)
downloadgolang-04b08da9af0c450d645ab7389d1467308cfc2db8.tar.gz
Imported Upstream version 1.1~hg20130304upstream/1.1_hg20130304
Diffstat (limited to 'src/pkg/database/sql/sql.go')
-rw-r--r--src/pkg/database/sql/sql.go408
1 files changed, 289 insertions, 119 deletions
diff --git a/src/pkg/database/sql/sql.go b/src/pkg/database/sql/sql.go
index 89136ef6e..4faaa11b1 100644
--- a/src/pkg/database/sql/sql.go
+++ b/src/pkg/database/sql/sql.go
@@ -11,6 +11,7 @@ import (
"errors"
"fmt"
"io"
+ "runtime"
"sync"
)
@@ -189,9 +190,66 @@ type DB struct {
driver driver.Driver
dsn string
- mu sync.Mutex // protects freeConn and closed
- freeConn []driver.Conn
- closed bool
+ mu sync.Mutex // protects following fields
+ outConn map[driver.Conn]bool // whether the conn is in use
+ freeConn []driver.Conn
+ closed bool
+ dep map[finalCloser]depSet
+ onConnPut map[driver.Conn][]func() // code (with mu held) run when conn is next returned
+ lastPut map[driver.Conn]string // stacktrace of last conn's put; debug only
+}
+
+// depSet is a finalCloser's outstanding dependencies
+type depSet map[interface{}]bool // set of true bools
+
+// The finalCloser interface is used by (*DB).addDep and (*DB).get
+type finalCloser interface {
+ // finalClose is called when the reference count of an object
+ // goes to zero. (*DB).mu is not held while calling it.
+ finalClose() error
+}
+
+// addDep notes that x now depends on dep, and x's finalClose won't be
+// called until all of x's dependencies are removed with removeDep.
+func (db *DB) addDep(x finalCloser, dep interface{}) {
+ //println(fmt.Sprintf("addDep(%T %p, %T %p)", x, x, dep, dep))
+ db.mu.Lock()
+ defer db.mu.Unlock()
+ if db.dep == nil {
+ db.dep = make(map[finalCloser]depSet)
+ }
+ xdep := db.dep[x]
+ if xdep == nil {
+ xdep = make(depSet)
+ db.dep[x] = xdep
+ }
+ xdep[dep] = true
+}
+
+// removeDep notes that x no longer depends on dep.
+// If x still has dependencies, nil is returned.
+// If x no longer has any dependencies, its finalClose method will be
+// called and its error value will be returned.
+func (db *DB) removeDep(x finalCloser, dep interface{}) error {
+ //println(fmt.Sprintf("removeDep(%T %p, %T %p)", x, x, dep, dep))
+ done := false
+
+ db.mu.Lock()
+ xdep := db.dep[x]
+ if xdep != nil {
+ delete(xdep, dep)
+ if len(xdep) == 0 {
+ delete(db.dep, x)
+ done = true
+ }
+ }
+ db.mu.Unlock()
+
+ if !done {
+ return nil
+ }
+ //println(fmt.Sprintf("calling final close on %T %v (%#v)", x, x, x))
+ return x.finalClose()
}
// Open opens a database specified by its database driver name and a
@@ -201,11 +259,20 @@ type DB struct {
// Most users will open a database via a driver-specific connection
// helper function that returns a *DB.
func Open(driverName, dataSourceName string) (*DB, error) {
- driver, ok := drivers[driverName]
+ driveri, ok := drivers[driverName]
if !ok {
return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
}
- return &DB{driver: driver, dsn: dataSourceName}, nil
+ // TODO: optionally proactively connect to a Conn to check
+ // the dataSourceName: golang.org/issue/4804
+ db := &DB{
+ driver: driveri,
+ dsn: dataSourceName,
+ outConn: make(map[driver.Conn]bool),
+ lastPut: make(map[driver.Conn]string),
+ onConnPut: make(map[driver.Conn][]func()),
+ }
+ return db, nil
}
// Close closes the database, releasing any open resources.
@@ -241,22 +308,38 @@ func (db *DB) conn() (driver.Conn, error) {
if n := len(db.freeConn); n > 0 {
conn := db.freeConn[n-1]
db.freeConn = db.freeConn[:n-1]
+ db.outConn[conn] = true
db.mu.Unlock()
return conn, nil
}
db.mu.Unlock()
- return db.driver.Open(db.dsn)
+ conn, err := db.driver.Open(db.dsn)
+ if err == nil {
+ db.mu.Lock()
+ db.outConn[conn] = true
+ db.mu.Unlock()
+ }
+ return conn, err
}
+// connIfFree returns (wanted, true) if wanted is still a valid conn and
+// isn't in use.
+//
+// If wanted is valid but in use, connIfFree returns (wanted, false).
+// If wanted is invalid, connIfFre returns (nil, false).
func (db *DB) connIfFree(wanted driver.Conn) (conn driver.Conn, ok bool) {
db.mu.Lock()
defer db.mu.Unlock()
+ if db.outConn[wanted] {
+ return conn, false
+ }
for i, conn := range db.freeConn {
if conn != wanted {
continue
}
db.freeConn[i] = db.freeConn[len(db.freeConn)-1]
db.freeConn = db.freeConn[:len(db.freeConn)-1]
+ db.outConn[wanted] = true
return wanted, true
}
return nil, false
@@ -265,14 +348,52 @@ func (db *DB) connIfFree(wanted driver.Conn) (conn driver.Conn, ok bool) {
// putConnHook is a hook for testing.
var putConnHook func(*DB, driver.Conn)
+// noteUnusedDriverStatement notes that si is no longer used and should
+// be closed whenever possible (when c is next not in use), unless c is
+// already closed.
+func (db *DB) noteUnusedDriverStatement(c driver.Conn, si driver.Stmt) {
+ db.mu.Lock()
+ defer db.mu.Unlock()
+ if db.outConn[c] {
+ db.onConnPut[c] = append(db.onConnPut[c], func() {
+ si.Close()
+ })
+ } else {
+ si.Close()
+ }
+}
+
+// debugGetPut determines whether getConn & putConn calls' stack traces
+// are returned for more verbose crashes.
+const debugGetPut = false
+
// putConn adds a connection to the db's free pool.
-// err is optionally the last error that occured on this connection.
+// err is optionally the last error that occurred on this connection.
func (db *DB) putConn(c driver.Conn, err error) {
+ db.mu.Lock()
+ if !db.outConn[c] {
+ if debugGetPut {
+ fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", c, stack(), db.lastPut[c])
+ }
+ panic("sql: connection returned that was never out")
+ }
+ if debugGetPut {
+ db.lastPut[c] = stack()
+ }
+ delete(db.outConn, c)
+
+ if fns, ok := db.onConnPut[c]; ok {
+ for _, fn := range fns {
+ fn()
+ }
+ delete(db.onConnPut, c)
+ }
+
if err == driver.ErrBadConn {
// Don't reuse bad connections.
+ db.mu.Unlock()
return
}
- db.mu.Lock()
if putConnHook != nil {
putConnHook(db, c)
}
@@ -287,7 +408,9 @@ func (db *DB) putConn(c driver.Conn, err error) {
c.Close()
}
-// Prepare creates a prepared statement for later execution.
+// Prepare creates a prepared statement for later queries or executions.
+// Multiple queries or executions may be run concurrently from the
+// returned statement.
func (db *DB) Prepare(query string) (*Stmt, error) {
var stmt *Stmt
var err error
@@ -300,7 +423,7 @@ func (db *DB) Prepare(query string) (*Stmt, error) {
return stmt, err
}
-func (db *DB) prepare(query string) (stmt *Stmt, err error) {
+func (db *DB) prepare(query string) (*Stmt, error) {
// TODO: check if db.driver supports an optional
// driver.Preparer interface and call that instead, if so,
// otherwise we make a prepared statement that's bound
@@ -311,28 +434,27 @@ func (db *DB) prepare(query string) (stmt *Stmt, err error) {
if err != nil {
return nil, err
}
- defer db.putConn(ci, err)
si, err := ci.Prepare(query)
if err != nil {
+ db.putConn(ci, err)
return nil, err
}
- stmt = &Stmt{
+ stmt := &Stmt{
db: db,
query: query,
css: []connStmt{{ci, si}},
}
+ db.addDep(stmt, stmt)
return stmt, nil
}
// Exec executes a query without returning any rows.
+// The args are for any placeholder parameters in the query.
func (db *DB) Exec(query string, args ...interface{}) (Result, error) {
- sargs, err := subsetTypeArgs(args)
- if err != nil {
- return nil, err
- }
var res Result
+ var err error
for i := 0; i < 10; i++ {
- res, err = db.exec(query, sargs)
+ res, err = db.exec(query, args)
if err != driver.ErrBadConn {
break
}
@@ -340,15 +462,21 @@ func (db *DB) Exec(query string, args ...interface{}) (Result, error) {
return res, err
}
-func (db *DB) exec(query string, sargs []driver.Value) (res Result, err error) {
+func (db *DB) exec(query string, args []interface{}) (res Result, err error) {
ci, err := db.conn()
if err != nil {
return nil, err
}
- defer db.putConn(ci, err)
+ defer func() {
+ db.putConn(ci, err)
+ }()
if execer, ok := ci.(driver.Execer); ok {
- resi, err := execer.Exec(query, sargs)
+ dargs, err := driverArgs(nil, args)
+ if err != nil {
+ return nil, err
+ }
+ resi, err := execer.Exec(query, dargs)
if err != driver.ErrSkip {
if err != nil {
return nil, err
@@ -363,25 +491,83 @@ func (db *DB) exec(query string, sargs []driver.Value) (res Result, err error) {
}
defer sti.Close()
- resi, err := sti.Exec(sargs)
+ return resultFromStatement(sti, args...)
+}
+
+// Query executes a query that returns rows, typically a SELECT.
+// The args are for any placeholder parameters in the query.
+func (db *DB) Query(query string, args ...interface{}) (*Rows, error) {
+ var rows *Rows
+ var err error
+ for i := 0; i < 10; i++ {
+ rows, err = db.query(query, args)
+ if err != driver.ErrBadConn {
+ break
+ }
+ }
+ return rows, err
+}
+
+func (db *DB) query(query string, args []interface{}) (*Rows, error) {
+ ci, err := db.conn()
if err != nil {
return nil, err
}
- return result{resi}, nil
+
+ releaseConn := func(err error) { db.putConn(ci, err) }
+
+ return db.queryConn(ci, releaseConn, query, args)
}
-// Query executes a query that returns rows, typically a SELECT.
-func (db *DB) Query(query string, args ...interface{}) (*Rows, error) {
- stmt, err := db.Prepare(query)
+// queryConn executes a query on the given connection.
+// The connection gets released by the releaseConn function.
+func (db *DB) queryConn(ci driver.Conn, releaseConn func(error), query string, args []interface{}) (*Rows, error) {
+ if queryer, ok := ci.(driver.Queryer); ok {
+ dargs, err := driverArgs(nil, args)
+ if err != nil {
+ releaseConn(err)
+ return nil, err
+ }
+ rowsi, err := queryer.Query(query, dargs)
+ if err != driver.ErrSkip {
+ if err != nil {
+ releaseConn(err)
+ return nil, err
+ }
+ // Note: ownership of ci passes to the *Rows, to be freed
+ // with releaseConn.
+ rows := &Rows{
+ db: db,
+ ci: ci,
+ releaseConn: releaseConn,
+ rowsi: rowsi,
+ }
+ return rows, nil
+ }
+ }
+
+ sti, err := ci.Prepare(query)
if err != nil {
+ releaseConn(err)
return nil, err
}
- rows, err := stmt.Query(args...)
+
+ rowsi, err := rowsiFromStatement(sti, args...)
if err != nil {
- stmt.Close()
+ releaseConn(err)
+ sti.Close()
return nil, err
}
- rows.closeStmt = stmt
+
+ // Note: ownership of ci passes to the *Rows, to be freed
+ // with releaseConn.
+ rows := &Rows{
+ db: db,
+ ci: ci,
+ releaseConn: releaseConn,
+ rowsi: rowsi,
+ closeStmt: sti,
+ }
return rows, nil
}
@@ -415,7 +601,7 @@ func (db *DB) begin() (tx *Tx, err error) {
txi, err := ci.Begin()
if err != nil {
db.putConn(ci, err)
- return nil, fmt.Errorf("sql: failed to Begin transaction: %v", err)
+ return nil, err
}
return &Tx{
db: db,
@@ -577,13 +763,12 @@ func (tx *Tx) Exec(query string, args ...interface{}) (Result, error) {
}
defer tx.releaseConn()
- sargs, err := subsetTypeArgs(args)
- if err != nil {
- return nil, err
- }
-
if execer, ok := ci.(driver.Execer); ok {
- resi, err := execer.Exec(query, sargs)
+ dargs, err := driverArgs(nil, args)
+ if err != nil {
+ return nil, err
+ }
+ resi, err := execer.Exec(query, dargs)
if err == nil {
return result{resi}, nil
}
@@ -598,29 +783,19 @@ func (tx *Tx) Exec(query string, args ...interface{}) (Result, error) {
}
defer sti.Close()
- resi, err := sti.Exec(sargs)
- if err != nil {
- return nil, err
- }
- return result{resi}, nil
+ return resultFromStatement(sti, args...)
}
// Query executes a query that returns rows, typically a SELECT.
func (tx *Tx) Query(query string, args ...interface{}) (*Rows, error) {
- if tx.done {
- return nil, ErrTxDone
- }
- stmt, err := tx.Prepare(query)
- if err != nil {
- return nil, err
- }
- rows, err := stmt.Query(args...)
+ ci, err := tx.grabConn()
if err != nil {
- stmt.Close()
return nil, err
}
- rows.closeStmt = stmt
- return rows, err
+
+ releaseConn := func(err error) { tx.releaseConn() }
+
+ return tx.db.queryConn(ci, releaseConn, query, args)
}
// QueryRow executes a query that is expected to return at most one row.
@@ -644,6 +819,8 @@ type Stmt struct {
query string // that created the Stmt
stickyErr error // if non-nil, this error is returned for all operations
+ closemu sync.RWMutex // held exclusively during close, for read otherwise.
+
// If in a transaction, else both nil:
tx *Tx
txsi driver.Stmt
@@ -661,12 +838,18 @@ type Stmt struct {
// Exec executes a prepared statement with the given arguments and
// returns a Result summarizing the effect of the statement.
func (s *Stmt) Exec(args ...interface{}) (Result, error) {
+ s.closemu.RLock()
+ defer s.closemu.RUnlock()
_, releaseConn, si, err := s.connStmt()
if err != nil {
return nil, err
}
defer releaseConn(nil)
+ return resultFromStatement(si, args...)
+}
+
+func resultFromStatement(si driver.Stmt, args ...interface{}) (Result, error) {
// -1 means the driver doesn't know how to count the number of
// placeholders, so we won't sanity check input here and instead let the
// driver deal with errors.
@@ -674,51 +857,12 @@ func (s *Stmt) Exec(args ...interface{}) (Result, error) {
return nil, fmt.Errorf("sql: expected %d arguments, got %d", want, len(args))
}
- sargs := make([]driver.Value, len(args))
-
- // Convert args to subset types.
- if cc, ok := si.(driver.ColumnConverter); ok {
- for n, arg := range args {
- // First, see if the value itself knows how to convert
- // itself to a driver type. For example, a NullString
- // struct changing into a string or nil.
- if svi, ok := arg.(driver.Valuer); ok {
- sv, err := svi.Value()
- if err != nil {
- return nil, fmt.Errorf("sql: argument index %d from Value: %v", n, err)
- }
- if !driver.IsValue(sv) {
- return nil, fmt.Errorf("sql: argument index %d: non-subset type %T returned from Value", n, sv)
- }
- arg = sv
- }
-
- // Second, ask the column to sanity check itself. For
- // example, drivers might use this to make sure that
- // an int64 values being inserted into a 16-bit
- // integer field is in range (before getting
- // truncated), or that a nil can't go into a NOT NULL
- // column before going across the network to get the
- // same error.
- sargs[n], err = cc.ColumnConverter(n).ConvertValue(arg)
- if err != nil {
- return nil, fmt.Errorf("sql: converting Exec argument #%d's type: %v", n, err)
- }
- if !driver.IsValue(sargs[n]) {
- return nil, fmt.Errorf("sql: driver ColumnConverter error converted %T to unsupported type %T",
- arg, sargs[n])
- }
- }
- } else {
- for n, arg := range args {
- sargs[n], err = driver.DefaultParameterConverter.ConvertValue(arg)
- if err != nil {
- return nil, fmt.Errorf("sql: converting Exec argument #%d's type: %v", n, err)
- }
- }
+ dargs, err := driverArgs(si, args)
+ if err != nil {
+ return nil, err
}
- resi, err := si.Exec(sargs)
+ resi, err := si.Exec(dargs)
if err != nil {
return nil, err
}
@@ -794,35 +938,54 @@ func (s *Stmt) connStmt() (ci driver.Conn, releaseConn func(error), si driver.St
// Query executes a prepared query statement with the given arguments
// and returns the query results as a *Rows.
func (s *Stmt) Query(args ...interface{}) (*Rows, error) {
+ s.closemu.RLock()
+ defer s.closemu.RUnlock()
+
ci, releaseConn, si, err := s.connStmt()
if err != nil {
return nil, err
}
+ rowsi, err := rowsiFromStatement(si, args...)
+ if err != nil {
+ releaseConn(err)
+ return nil, err
+ }
+
+ // Note: ownership of ci passes to the *Rows, to be freed
+ // with releaseConn.
+ rows := &Rows{
+ db: s.db,
+ ci: ci,
+ rowsi: rowsi,
+ // releaseConn set below
+ }
+ s.db.addDep(s, rows)
+ rows.releaseConn = func(err error) {
+ releaseConn(err)
+ s.db.removeDep(s, rows)
+ }
+ return rows, nil
+}
+
+func rowsiFromStatement(si driver.Stmt, args ...interface{}) (driver.Rows, error) {
// -1 means the driver doesn't know how to count the number of
// placeholders, so we won't sanity check input here and instead let the
// driver deal with errors.
if want := si.NumInput(); want != -1 && len(args) != want {
return nil, fmt.Errorf("sql: statement expects %d inputs; got %d", si.NumInput(), len(args))
}
- sargs, err := subsetTypeArgs(args)
+
+ dargs, err := driverArgs(si, args)
if err != nil {
return nil, err
}
- rowsi, err := si.Query(sargs)
+
+ rowsi, err := si.Query(dargs)
if err != nil {
- releaseConn(err)
return nil, err
}
- // Note: ownership of ci passes to the *Rows, to be freed
- // with releaseConn.
- rows := &Rows{
- db: s.db,
- ci: ci,
- releaseConn: releaseConn,
- rowsi: rowsi,
- }
- return rows, nil
+ return rowsi, nil
}
// QueryRow executes a prepared query statement with the given arguments.
@@ -846,6 +1009,9 @@ func (s *Stmt) QueryRow(args ...interface{}) *Row {
// Close closes the statement.
func (s *Stmt) Close() error {
+ s.closemu.Lock()
+ defer s.closemu.Unlock()
+
if s.stickyErr != nil {
return s.stickyErr
}
@@ -858,18 +1024,17 @@ func (s *Stmt) Close() error {
if s.tx != nil {
s.txsi.Close()
- } else {
- for _, v := range s.css {
- if ci, match := s.db.connIfFree(v.ci); match {
- v.si.Close()
- s.db.putConn(ci, nil)
- } else {
- // TODO(bradfitz): care that we can't close
- // this statement because the statement's
- // connection is in use?
- }
- }
+ return nil
+ }
+
+ return s.db.removeDep(s, s)
+}
+
+func (s *Stmt) finalClose() error {
+ for _, v := range s.css {
+ s.db.noteUnusedDriverStatement(v.ci, v.si)
}
+ s.css = nil
return nil
}
@@ -888,14 +1053,14 @@ func (s *Stmt) Close() error {
// ...
type Rows struct {
db *DB
- ci driver.Conn // owned; must call putconn when closed to release
+ ci driver.Conn // owned; must call releaseConn when closed to release
releaseConn func(error)
rowsi driver.Rows
closed bool
lastcols []driver.Value
lasterr error
- closeStmt *Stmt // if non-nil, statement to Close on close
+ closeStmt driver.Stmt // if non-nil, statement to Close on close
}
// Next prepares the next result row for reading with the Scan method.
@@ -1064,3 +1229,8 @@ type Result interface {
type result struct {
driver.Result
}
+
+func stack() string {
+ var buf [1024]byte
+ return string(buf[:runtime.Stack(buf[:], false)])
+}