Newer
Older
return nil, err
}
Daniel Theophanes
committed
return c.db.execDC(ctx, dc, release, query, args)
}
// QueryContext executes a query that returns rows, typically a SELECT.
// The args are for any placeholder parameters in the query.
func (c *Conn) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {
Daniel Theophanes
committed
dc, release, err := c.grabConn(ctx)
if err != nil {
return nil, err
}
Daniel Theophanes
committed
return c.db.queryDC(ctx, nil, dc, release, query, args)
}
// QueryRowContext runs a query that is expected to yield at most one row.
// The result is never nil: any error from the query is stored in the
// returned *Row and surfaces only when Row's Scan method is called.
// If the query selects no rows, the *Row's Scan will return ErrNoRows.
// Otherwise, the *Row's Scan scans the first selected row and discards
// the rest.
func (c *Conn) QueryRowContext(ctx context.Context, query string, args ...interface{}) *Row {
	row := new(Row)
	row.rows, row.err = c.QueryContext(ctx, query, args...)
	return row
}
// PrepareContext creates a prepared statement for later queries or executions.
// Multiple queries or executions may be run concurrently from the
// returned statement.
// The caller must call the statement's Close method
// when the statement is no longer needed.
//
// The provided context is used for the preparation of the statement, not for the
// execution of the statement.
func (c *Conn) PrepareContext(ctx context.Context, query string) (*Stmt, error) {
Daniel Theophanes
committed
dc, release, err := c.grabConn(ctx)
if err != nil {
return nil, err
}
Daniel Theophanes
committed
return c.db.prepareDC(ctx, dc, release, c, query)
}
// Raw executes f exposing the underlying driver connection for the
// duration of f. The driverConn must not be used outside of f.
//
// Once f returns and err is not driver.ErrBadConn, the Conn will continue to be usable
// until Conn.Close is called.
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
func (c *Conn) Raw(f func(driverConn interface{}) error) (err error) {
	// grabConn takes a context to implement stmtConnGrabber, but the context is not used.
	dc, release, err := c.grabConn(nil)
	if err != nil {
		return err
	}

	// completed flips to true only when f returns normally; it stays
	// false if f panics.
	completed := false
	dc.Mutex.Lock()
	defer func() {
		dc.Mutex.Unlock()

		// A panic in f leaves the connection in an unknown state, so report
		// it as bad and let release discard it.
		if !completed {
			err = driver.ErrBadConn
		}
		release(err)
	}()

	err = f(dc.ci)
	completed = true
	return err
}
// BeginTx starts a transaction.
//
// The provided context is used until the transaction is committed or rolled back.
// If the context is canceled, the sql package will roll back
// the transaction. Tx.Commit will return an error if the context provided to
// BeginTx is canceled.
//
// The provided TxOptions is optional and may be nil if defaults should be used.
// If a non-default isolation level is used that the driver doesn't support,
// an error will be returned.
func (c *Conn) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error) {
Daniel Theophanes
committed
dc, release, err := c.grabConn(ctx)
if err != nil {
return nil, err
}
Daniel Theophanes
committed
return c.db.beginDC(ctx, dc, release, opts)
}
// closemuRUnlockCondReleaseConn drops the read lock on closemu once a sql
// operation has finished with the dc. If the operation reported
// driver.ErrBadConn, the Conn is additionally closed with that error.
func (c *Conn) closemuRUnlockCondReleaseConn(err error) {
	c.closemu.RUnlock()
	if !errors.Is(err, driver.ErrBadConn) {
		return
	}
	c.close(err)
}
Daniel Theophanes
committed
// txCtx implements stmtConnGrabber. A Conn is not a transaction, so it has
// no transaction context and always returns nil.
func (c *Conn) txCtx() context.Context {
return nil
}
2107
2108
2109
2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
func (c *Conn) close(err error) error {
	// Only the first caller gets past this point; done moves 0 -> 1 once.
	if swapped := atomic.CompareAndSwapInt32(&c.done, 0, 1); !swapped {
		return ErrConnDone
	}

	// Take closemu exclusively before releasing the driver connection so
	// that every in-flight query has stopped first.
	c.closemu.Lock()
	defer c.closemu.Unlock()

	c.dc.releaseConn(err)
	c.dc, c.db = nil, nil
	return err
}
// Close returns the connection to the connection pool.
// All operations after a Close will return with ErrConnDone.
// Close is safe to call concurrently with other operations and will
// block until all other operations finish. It may be useful to first
// cancel any used context and then call Close directly after.
func (c *Conn) Close() error {
	// A user-initiated close carries no error for the pool.
	var noErr error
	return c.close(noErr)
}
// Tx is an in-progress database transaction.
//
// A transaction must end with a call to Commit or Rollback.
//
// After a call to Commit or Rollback, all operations on the
// transaction fail with ErrTxDone.
//
// The statements prepared for a transaction by calling
// the transaction's Prepare or Stmt methods are closed
// by the call to Commit or Rollback.
// closemu prevents the transaction from closing while there
// is an active query. It is held for read during queries
// and exclusively during close.
closemu sync.RWMutex
// dc is owned exclusively until Commit or Rollback, at which point
// it's returned with putConn.
dc *driverConn
txi driver.Tx
// releaseConn is called once the Tx is closed to release
// any held driverConn back to the pool.
releaseConn func(error)
// done transitions from 0 to 1 exactly once, on Commit
// or Rollback. once done, all operations fail with
// ErrTxDone.
// Use atomic operations on value when checking value.
done int32
Daniel Theophanes
committed
// keepConnOnRollback is true if the driver knows
// how to reset the connection's session and if need be discard
// the connection.
keepConnOnRollback bool
// All Stmts prepared for this transaction. These will be closed after the
// transaction has been committed or rolled back.
stmts struct {
sync.Mutex
v []*Stmt
}
// cancel is called after done transitions from 0 to 1.
// ctx lives for the life of the transaction.
ctx context.Context
}
// awaitDone blocks until the context in Tx is canceled and rolls back
// the transaction if it's not already done.
func (tx *Tx) awaitDone() {
// Wait for either the transaction to be committed or rolled
// back, or for the associated context to be closed.
<-tx.ctx.Done()
// Discard and close the connection used to ensure the
// transaction is closed and the resources are released. This
// rollback does nothing if the transaction has already been
// committed or rolled back.
Daniel Theophanes
committed
// Do not discard the connection if the connection knows
// how to reset the session.
discardConnection := !tx.keepConnOnRollback
tx.rollback(discardConnection)
func (tx *Tx) isDone() bool {
return atomic.LoadInt32(&tx.done) != 0
// ErrTxDone is returned by any operation that is performed on a transaction
// that has already been committed or rolled back.
var ErrTxDone = errors.New("sql: transaction has already been committed or rolled back")
// close returns the connection to the pool and
// must only be called by Tx.rollback or Tx.Commit while
// tx is already canceled and won't be executed concurrently.
//
// err is forwarded to tx.releaseConn, the callback documented on Tx as
// releasing any held driverConn back to the pool once the Tx is closed.
func (tx *Tx) close(err error) {
	// Release before clearing the fields; without this call the
	// connection would never go back to the pool.
	tx.releaseConn(err)
	tx.dc = nil
	tx.txi = nil
}
// hookTxGrabConn specifies an optional hook to be called on
// a successful call to (*Tx).grabConn. For tests.
var hookTxGrabConn func()
Daniel Theophanes
committed
func (tx *Tx) grabConn(ctx context.Context) (*driverConn, releaseConn, error) {
select {
default:
case <-ctx.Done():
Daniel Theophanes
committed
return nil, nil, ctx.Err()
Daniel Theophanes
committed
// closemu.RLock must come before the check for isDone to prevent the Tx from
Daniel Theophanes
committed
// closing while a query is executing.
tx.closemu.RLock()
if tx.isDone() {
Daniel Theophanes
committed
tx.closemu.RUnlock()
return nil, nil, ErrTxDone
if hookTxGrabConn != nil { // test hook
hookTxGrabConn()
}
Daniel Theophanes
committed
return tx.dc, tx.closemuRUnlockRelease, nil
}
func (tx *Tx) txCtx() context.Context {
return tx.ctx
// closemuRUnlockRelease is used as a func(error) method value in
// ExecContext and QueryContext. Unlocking in the releaseConn keeps
// the driver conn from being returned to the connection pool until
// the Rows has been closed.
//
// Tx.grabConn returns it as the release function; it drops the read lock
// on tx.closemu that grabConn acquired. The error argument is ignored.
func (tx *Tx) closemuRUnlockRelease(error) {
tx.closemu.RUnlock()
}
// closePrepared closes every Stmt that was prepared for this transaction.
// The list is walked while holding tx.stmts' mutex; errors returned by the
// individual Close calls are not inspected.
func (tx *Tx) closePrepared() {
	tx.stmts.Lock()
	defer tx.stmts.Unlock()

	prepared := tx.stmts.v
	for i := range prepared {
		prepared[i].Close()
	}
}
// Commit commits the transaction.
// Check context first to avoid transaction leak.
// If put it behind tx.done CompareAndSwap statement, we can't ensure
// the consistency between tx.done and the real COMMIT operation.
select {
default:
case <-tx.ctx.Done():
if atomic.LoadInt32(&tx.done) == 1 {
return ErrTxDone
}
return tx.ctx.Err()
}
if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) {
return ErrTxDone
}
// Cancel the Tx to release any active R-closemu locks.
// This is safe to do because tx.done has already transitioned
// from 0 to 1. Hold the W-closemu lock prior to rollback
// to ensure no other connection has an active query.
tx.cancel()
tx.closemu.Lock()
var err error
withLock(tx.dc, func() {
err = tx.txi.Commit()
})
if !errors.Is(err, driver.ErrBadConn) {
tx.closePrepared()
}
return err
var rollbackHook func()
// rollback aborts the transaction and optionally forces the pool to discard
// the connection.
func (tx *Tx) rollback(discardConn bool) error {
if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) {
return ErrTxDone
if rollbackHook != nil {
rollbackHook()
}
// Cancel the Tx to release any active R-closemu locks.
// This is safe to do because tx.done has already transitioned
// from 0 to 1. Hold the W-closemu lock prior to rollback
// to ensure no other connection has an active query.
tx.cancel()
tx.closemu.Lock()
var err error
withLock(tx.dc, func() {
err = tx.txi.Rollback()
})
if !errors.Is(err, driver.ErrBadConn) {
tx.closePrepared()
}
if discardConn {
err = driver.ErrBadConn
}
return err
// Rollback aborts the transaction. The connection is not forcibly
// discarded.
func (tx *Tx) Rollback() error {
	const discardConn = false
	return tx.rollback(discardConn)
}
// PrepareContext creates a prepared statement for use within a transaction.
// The returned statement operates within the transaction and will be closed
// when the transaction has been committed or rolled back.
//
// To use an existing prepared statement on this transaction, see Tx.Stmt.
//
// The provided context will be used for the preparation of the context, not
// for the execution of the returned statement. The returned statement
// will run in the transaction context.
func (tx *Tx) PrepareContext(ctx context.Context, query string) (*Stmt, error) {
Daniel Theophanes
committed
dc, release, err := tx.grabConn(ctx)
if err != nil {
return nil, err
}
Daniel Theophanes
committed
stmt, err := tx.db.prepareDC(ctx, dc, release, tx, query)
if err != nil {
return nil, err
}
tx.stmts.Lock()
tx.stmts.v = append(tx.stmts.v, stmt)
tx.stmts.Unlock()
return stmt, nil
// Prepare creates a prepared statement for use within a transaction.
//
// The statement it returns belongs to the transaction and is closed once
// the transaction has been committed or rolled back.
//
// To use an existing prepared statement on this transaction, see Tx.Stmt.
//
// Prepare uses context.Background internally; to specify the context, use
// PrepareContext.
func (tx *Tx) Prepare(query string) (*Stmt, error) {
	ctx := context.Background()
	return tx.PrepareContext(ctx, query)
}
// StmtContext returns a transaction-specific prepared statement from
// an existing statement.
//
// Example:
// updateMoney, err := db.Prepare("UPDATE balance SET money=money+? WHERE id=?")
// ...
// tx, err := db.Begin()
// ...
// res, err := tx.StmtContext(ctx, updateMoney).Exec(123.45, 98293203)
// The provided context is used for the preparation of the statement, not for the
// execution of the statement.
//
// The returned statement operates within the transaction and will be closed
// when the transaction has been committed or rolled back.
func (tx *Tx) StmtContext(ctx context.Context, stmt *Stmt) *Stmt {
Daniel Theophanes
committed
dc, release, err := tx.grabConn(ctx)
if err != nil {
return &Stmt{stickyErr: err}
}
defer release(nil)
if tx.db != stmt.db {
return &Stmt{stickyErr: errors.New("sql: Tx.Stmt: statement from different database used")}
}
var si driver.Stmt
var parentStmt *Stmt
stmt.mu.Lock()
Daniel Theophanes
committed
if stmt.closed || stmt.cg != nil {
2407
2408
2409
2410
2411
2412
2413
2414
2415
2416
2417
2418
2419
2420
2421
2422
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
// If the statement has been closed or already belongs to a
// transaction, we can't reuse it in this connection.
// Since tx.StmtContext should never need to be called with a
// Stmt already belonging to tx, we ignore this edge case and
// re-prepare the statement in this case. No need to add
// code-complexity for this.
stmt.mu.Unlock()
withLock(dc, func() {
si, err = ctxDriverPrepare(ctx, dc.ci, stmt.query)
})
if err != nil {
return &Stmt{stickyErr: err}
}
} else {
stmt.removeClosedStmtLocked()
// See if the statement has already been prepared on this connection,
// and reuse it if possible.
for _, v := range stmt.css {
if v.dc == dc {
si = v.ds.si
break
}
}
stmt.mu.Unlock()
if si == nil {
var ds *driverStmt
withLock(dc, func() {
ds, err = stmt.prepareOnConnLocked(ctx, dc)
})
if err != nil {
return &Stmt{stickyErr: err}
}
}
parentStmt = stmt
}
txs := &Stmt{
db: tx.db,
Daniel Theophanes
committed
cg: tx,
cgds: &driverStmt{
Locker: dc,
si: si,
},
parentStmt: parentStmt,
query: stmt.query,
}
if parentStmt != nil {
tx.db.addDep(parentStmt, txs)
}
tx.stmts.Lock()
tx.stmts.v = append(tx.stmts.v, txs)
tx.stmts.Unlock()
return txs
}
// Stmt returns a transaction-specific prepared statement from
// an existing statement.
//
// Example:
//
//	updateMoney, err := db.Prepare("UPDATE balance SET money=money+? WHERE id=?")
//	...
//	tx, err := db.Begin()
//	...
//	res, err := tx.Stmt(updateMoney).Exec(123.45, 98293203)
//
// The statement it returns belongs to the transaction and is closed once
// the transaction has been committed or rolled back.
//
// Stmt uses context.Background internally; to specify the context, use
// StmtContext.
func (tx *Tx) Stmt(stmt *Stmt) *Stmt {
	ctx := context.Background()
	return tx.StmtContext(ctx, stmt)
}
// ExecContext executes a query that doesn't return rows.
// For example: an INSERT and UPDATE.
func (tx *Tx) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) {
Daniel Theophanes
committed
dc, release, err := tx.grabConn(ctx)
if err != nil {
return nil, err
}
Daniel Theophanes
committed
return tx.db.execDC(ctx, dc, release, query, args)
// Exec runs a query that produces no rows, such as an INSERT or UPDATE.
//
// Exec uses context.Background internally; to specify the context, use
// ExecContext.
func (tx *Tx) Exec(query string, args ...interface{}) (Result, error) {
	ctx := context.Background()
	return tx.ExecContext(ctx, query, args...)
}
// QueryContext executes a query that returns rows, typically a SELECT.
func (tx *Tx) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {
Daniel Theophanes
committed
dc, release, err := tx.grabConn(ctx)
if err != nil {
return nil, err
Daniel Theophanes
committed
return tx.db.queryDC(ctx, tx.ctx, dc, release, query, args)
}
// Query runs a query that returns rows, typically a SELECT.
//
// Query uses context.Background internally; to specify the context, use
// QueryContext.
func (tx *Tx) Query(query string, args ...interface{}) (*Rows, error) {
	ctx := context.Background()
	return tx.QueryContext(ctx, query, args...)
}
// QueryRowContext runs a query that is expected to yield at most one row.
// The result is never nil: any error from the query is stored in the
// returned *Row and surfaces only when Row's Scan method is called.
// If the query selects no rows, the *Row's Scan will return ErrNoRows.
// Otherwise, the *Row's Scan scans the first selected row and discards
// the rest.
func (tx *Tx) QueryRowContext(ctx context.Context, query string, args ...interface{}) *Row {
	var row Row
	row.rows, row.err = tx.QueryContext(ctx, query, args...)
	return &row
}
// QueryRow runs a query that is expected to yield at most one row.
// The result is never nil: errors are deferred until Row's Scan method
// is called.
// If the query selects no rows, the *Row's Scan will return ErrNoRows.
// Otherwise, the *Row's Scan scans the first selected row and discards
// the rest.
//
// QueryRow uses context.Background internally; to specify the context, use
// QueryRowContext.
func (tx *Tx) QueryRow(query string, args ...interface{}) *Row {
	ctx := context.Background()
	return tx.QueryRowContext(ctx, query, args...)
}
// connStmt is a prepared statement on a particular connection.
type connStmt struct {
dc *driverConn
ds *driverStmt
Daniel Theophanes
committed
// stmtConnGrabber represents a Tx or Conn that will return the underlying
// driverConn and release function.
//
// A Stmt prepared on a Tx or Conn keeps one of these in its cg field and
// only ever grabs its connection through it.
type stmtConnGrabber interface {
// grabConn returns the driverConn and the associated release function
// that must be called when the operation completes.
grabConn(context.Context) (*driverConn, releaseConn, error)
// txCtx returns the transaction context if available.
// The returned context should be selected on along with
// any query context when awaiting a cancel.
//
// The result may be nil when there is no transaction (Conn.txCtx
// returns nil).
txCtx() context.Context
}
// Compile-time checks that *Tx and *Conn satisfy stmtConnGrabber.
var (
	_ stmtConnGrabber = (*Tx)(nil)
	_ stmtConnGrabber = (*Conn)(nil)
)
// Stmt is a prepared statement.
// A Stmt is safe for concurrent use by multiple goroutines.
//
// If a Stmt is prepared on a Tx or Conn, it will be bound to a single
// underlying connection forever. If the Tx or Conn closes, the Stmt will
// become unusable and all operations will return an error.
// If a Stmt is prepared on a DB, it will remain usable for the lifetime of the
// DB. When the Stmt needs to execute on a new underlying connection, it will
// prepare itself on the new connection automatically.
type Stmt struct {
// Immutable:
db *DB // where we came from
query string // that created the Stmt
stickyErr error // if non-nil, this error is returned for all operations
closemu sync.RWMutex // held exclusively during close, for read otherwise.
Daniel Theophanes
committed
// If Stmt is prepared on a Tx or Conn then cg is present and will
// only ever grab a connection from cg.
// If cg is nil then the Stmt must grab an arbitrary connection
// from db and determine if it must prepare the stmt again by
// inspecting css.
cg stmtConnGrabber
cgds *driverStmt
// parentStmt is set when a transaction-specific statement
// is requested from an identical statement prepared on the same
// conn. parentStmt is used to track the dependency of this statement
// on its originating ("parent") statement so that parentStmt may
// be closed by the user without them having to know whether or not
// any transactions are still using it.
parentStmt *Stmt
mu sync.Mutex // protects the rest of the fields
// css is a list of underlying driver statement interfaces
// that are valid on particular connections. This is only
Daniel Theophanes
committed
// used if cg == nil and one is found that has idle
// connections. If cg != nil, cgds is always used.
// lastNumClosed is copied from db.numClosed when Stmt is created
// without tx and closed connections in css are removed.
lastNumClosed uint64
// ExecContext executes a prepared statement with the given arguments and
// returns a Result summarizing the effect of the statement.
func (s *Stmt) ExecContext(ctx context.Context, args ...interface{}) (Result, error) {
s.closemu.RLock()
defer s.closemu.RUnlock()
var res Result
strategy := cachedOrNewConn
for i := 0; i < maxBadConnRetries+1; i++ {
if i == maxBadConnRetries {
strategy = alwaysNewConn
}
dc, releaseConn, ds, err := s.connStmt(ctx, strategy)
if err != nil {
if errors.Is(err, driver.ErrBadConn) {
continue
}
return nil, err
}
res, err = resultFromStatement(ctx, dc.ci, ds, args...)
releaseConn(err)
if !errors.Is(err, driver.ErrBadConn) {
return res, err
}
}
return nil, driver.ErrBadConn
// Exec runs the prepared statement with the supplied arguments and
// returns a Result summarizing the effect of the statement.
//
// Exec uses context.Background internally; to specify the context, use
// ExecContext.
func (s *Stmt) Exec(args ...interface{}) (Result, error) {
	ctx := context.Background()
	return s.ExecContext(ctx, args...)
}
func resultFromStatement(ctx context.Context, ci driver.Conn, ds *driverStmt, args ...interface{}) (Result, error) {
ds.Lock()
defer ds.Unlock()
dargs, err := driverArgsConnLocked(ci, ds, args)
if err != nil {
return nil, err
resi, err := ctxDriverStmtExec(ctx, ds.si, dargs)
if err != nil {
return nil, err
}
return driverResult{ds.Locker, resi}, nil
2670
2671
2672
2673
2674
2675
2676
2677
2678
2679
2680
2681
2682
2683
2684
2685
2686
2687
2688
2689
2690
2691
2692
2693
2694
2695
// removeClosedStmtLocked drops entries from s.css whose connection has
// been closed.
//
// To keep contention on DB.mu low, the sweep only runs once
// s.db.numClosed - s.lastNumClosed reaches a threshold of len(s.css)/2+1,
// capped at 10. The order of the surviving entries is not preserved.
func (s *Stmt) removeClosedStmtLocked() {
	threshold := len(s.css)/2 + 1
	if threshold > 10 {
		threshold = 10
	}

	closedNow := atomic.LoadUint64(&s.db.numClosed)
	if closedNow-s.lastNumClosed < uint64(threshold) {
		return
	}

	s.db.mu.Lock()
	for i := 0; i < len(s.css); {
		if !s.css[i].dc.dbmuClosed {
			i++
			continue
		}
		// Overwrite the closed entry with the last one and shrink; the
		// moved entry lands at i and is examined on the next pass.
		last := len(s.css) - 1
		s.css[i] = s.css[last]
		s.css = s.css[:last]
	}
	s.db.mu.Unlock()

	s.lastNumClosed = closedNow
}
// connStmt returns a free driver connection on which to execute the
// statement, a function to call to release the connection, and a
// statement bound to that connection.
Daniel Theophanes
committed
func (s *Stmt) connStmt(ctx context.Context, strategy connReuseStrategy) (dc *driverConn, releaseConn func(error), ds *driverStmt, err error) {
if err = s.stickyErr; err != nil {
return
}
s.mu.Lock()
if s.closed {
Daniel Theophanes
committed
// In a transaction or connection, we always use the connection that the
Daniel Theophanes
committed
if s.cg != nil {
Daniel Theophanes
committed
dc, releaseConn, err = s.cg.grabConn(ctx) // blocks, waiting for the connection.
if err != nil {
return
}
Daniel Theophanes
committed
return dc, releaseConn, s.cgds, nil
s.removeClosedStmtLocked()
Daniel Theophanes
committed
dc, err = s.db.conn(ctx, strategy)
if err != nil {
return nil, nil, nil, err
}
s.mu.Lock()
for _, v := range s.css {
if v.dc == dc {
s.mu.Unlock()
return dc, dc.releaseConn, v.ds, nil
s.mu.Unlock()
// No luck; we need to prepare the statement on this connection
withLock(dc, func() {
ds, err = s.prepareOnConnLocked(ctx, dc)
if err != nil {
return nil, nil, nil, err
}
return dc, dc.releaseConn, ds, nil
}
// prepareOnConnLocked prepares the query in Stmt s on dc and adds it to the list of
// open connStmt on the statement. It assumes the caller is holding the lock on dc.
func (s *Stmt) prepareOnConnLocked(ctx context.Context, dc *driverConn) (*driverStmt, error) {
Daniel Theophanes
committed
si, err := dc.prepareLocked(ctx, s.cg, s.query)
if err != nil {
return nil, err
}
cs := connStmt{dc, si}
s.mu.Lock()
s.css = append(s.css, cs)
s.mu.Unlock()
return cs.ds, nil
// QueryContext executes a prepared query statement with the given arguments
// and returns the query results as a *Rows.
func (s *Stmt) QueryContext(ctx context.Context, args ...interface{}) (*Rows, error) {
s.closemu.RLock()
defer s.closemu.RUnlock()
var rowsi driver.Rows
strategy := cachedOrNewConn
for i := 0; i < maxBadConnRetries+1; i++ {
if i == maxBadConnRetries {
strategy = alwaysNewConn
}
dc, releaseConn, ds, err := s.connStmt(ctx, strategy)
if err != nil {
if errors.Is(err, driver.ErrBadConn) {
continue
}
return nil, err
}
rowsi, err = rowsiFromStatement(ctx, dc.ci, ds, args...)
if err == nil {
// Note: ownership of ci passes to the *Rows, to be freed
// with releaseConn.
rows := &Rows{
dc: dc,
rowsi: rowsi,
// releaseConn set below
}
// addDep must be added before initContextClose or it could attempt
// to removeDep before it has been added.
s.db.addDep(s, rows)
// releaseConn must be set before initContextClose or it could
// release the connection before it is set.
rows.releaseConn = func(err error) {
releaseConn(err)
s.db.removeDep(s, rows)
}
var txctx context.Context
Daniel Theophanes
committed
if s.cg != nil {
txctx = s.cg.txCtx()
}
rows.initContextClose(ctx, txctx)
return rows, nil
}
if !errors.Is(err, driver.ErrBadConn) {
return nil, err
}
return nil, driver.ErrBadConn
// Query runs the prepared query statement with the supplied arguments
// and returns the query results as a *Rows.
//
// Query uses context.Background internally; to specify the context, use
// QueryContext.
func (s *Stmt) Query(args ...interface{}) (*Rows, error) {
	ctx := context.Background()
	return s.QueryContext(ctx, args...)
}
func rowsiFromStatement(ctx context.Context, ci driver.Conn, ds *driverStmt, args ...interface{}) (driver.Rows, error) {
ds.Lock()
defer ds.Unlock()
dargs, err := driverArgsConnLocked(ci, ds, args)
if err != nil {
return nil, err
}
return ctxDriverStmtQuery(ctx, ds.si, dargs)
// QueryRowContext runs the prepared query statement with the supplied
// arguments. The returned *Row is never nil; if executing the statement
// fails, that error is carried by the *Row and reported by its Scan method.
// If the query selects no rows, the *Row's Scan will return ErrNoRows.
// Otherwise, the *Row's Scan scans the first selected row and discards
// the rest.
func (s *Stmt) QueryRowContext(ctx context.Context, args ...interface{}) *Row {
	rows, err := s.QueryContext(ctx, args...)
	if err == nil {
		return &Row{rows: rows}
	}
	return &Row{err: err}
}
// QueryRow runs the prepared query statement with the supplied arguments.
// The returned *Row is never nil; if executing the statement fails, that
// error is carried by the *Row and reported by its Scan method.
// If the query selects no rows, the *Row's Scan will return ErrNoRows.
// Otherwise, the *Row's Scan scans the first selected row and discards
// the rest.
//
// Example usage:
//
//	var name string
//	err := nameByUseridStmt.QueryRow(id).Scan(&name)
//
// QueryRow uses context.Background internally; to specify the context, use
// QueryRowContext.
func (s *Stmt) QueryRow(args ...interface{}) *Row {
	ctx := context.Background()
	return s.QueryRowContext(ctx, args...)
}
// Close closes the statement.
s.closemu.Lock()
defer s.closemu.Unlock()
if s.stickyErr != nil {
return s.stickyErr
}
s.mu.Lock()
if s.closed {
return nil
}
s.closed = true
Daniel Theophanes
committed
txds := s.cgds
s.cgds = nil
Daniel Theophanes
committed
if s.cg == nil {
return s.db.removeDep(s, s)
if s.parentStmt != nil {
// If parentStmt is set, we must not close s.txds since it's stored
// in the css array of the parentStmt.
return s.db.removeDep(s.parentStmt, s)
}
Daniel Theophanes
committed
return txds.Close()
}
func (s *Stmt) finalClose() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.css != nil {
for _, v := range s.css {
s.db.noteUnusedDriverStatement(v.dc, v.ds)
v.dc.removeOpenStmt(v.ds)
}
return nil
}
// Rows is the result of a query. Its cursor starts before the first row
// of the result set. Use Next to advance from row to row.
dc *driverConn // owned; must call releaseConn when closed to release
releaseConn func(error)
rowsi driver.Rows
cancel func() // called when Rows is closed, may be nil.
closeStmt *driverStmt // if non-nil, statement to Close on close
// closemu prevents Rows from closing while there
// is an active streaming result. It is held for read during non-close operations
// and exclusively during close.
//
// closemu guards lasterr and closed.
closemu sync.RWMutex
closed bool
lasterr error // non-nil only if closed is true
// lastcols is only used in Scan, Next, and NextResultSet which are expected
lastcols []driver.Value
// lasterrOrErrLocked picks between the recorded rs.lasterr and the supplied
// err: lasterr wins unless it is nil or io.EOF, in which case err is
// returned.
// rs.closemu must be read-locked.
func (rs *Rows) lasterrOrErrLocked(err error) error {
	switch rs.lasterr {
	case nil, io.EOF:
		return err
	default:
		return rs.lasterr
	}
}
// bypassRowsAwaitDone is only used for testing.
// If true, it will not close the Rows automatically from the context.
var bypassRowsAwaitDone = false
func (rs *Rows) initContextClose(ctx, txctx context.Context) {
if ctx.Done() == nil && (txctx == nil || txctx.Done() == nil) {
return
}
if bypassRowsAwaitDone {
return
}
ctx, rs.cancel = context.WithCancel(ctx)
go rs.awaitDone(ctx, txctx)
// awaitDone blocks until either ctx or txctx is canceled. The ctx is provided
// from the query context and is canceled when the query Rows is closed.
// If the query was issued in a transaction, the transaction's context
// is also provided in txctx to ensure Rows is closed if the Tx is closed.
func (rs *Rows) awaitDone(ctx, txctx context.Context) {
var txctxDone <-chan struct{}
if txctx != nil {
txctxDone = txctx.Done()
}
select {
case <-ctx.Done():
case <-txctxDone:
}
rs.close(ctx.Err())
// Next advances to the next result row so it can be read with the Scan
// method. It reports true when a row is ready, and false when there are no
// more rows or an error occurred while preparing one. Err should be
// consulted to tell those two cases apart.
//
// Every call to Scan, even the first one, must be preceded by a call to Next.
func (rs *Rows) Next() bool {
	var shouldClose, hasRow bool
	withLock(rs.closemu.RLocker(), func() {
		shouldClose, hasRow = rs.nextLocked()
	})
	// Close runs after the read lock has been released by withLock.
	if shouldClose {
		rs.Close()
	}
	return hasRow
}
func (rs *Rows) nextLocked() (doClose, ok bool) {
if rs.closed {
return false, false
// Lock the driver connection before calling the driver interface
// rowsi to prevent a Tx from rolling back the connection at the same time.