1// Package stdlib is the compatibility layer from pgx to database/sql.
2//
3// A database/sql connection can be established through sql.Open.
4//
5//	db, err := sql.Open("pgx", "postgres://pgx_md5:secret@localhost:5432/pgx_test?sslmode=disable")
6//	if err != nil {
7//		return err
8//	}
9//
10// Or from a DSN string.
11//
12//	db, err := sql.Open("pgx", "user=postgres password=secret host=localhost port=5432 database=pgx_test sslmode=disable")
13//	if err != nil {
14//		return err
15//	}
16//
17// Or a pgx.ConnConfig can be used to set configuration not accessible via connection string. In this case the
18// pgx.ConnConfig must first be registered with the driver. This registration returns a connection string which is used
19// with sql.Open.
20//
21//	connConfig, _ := pgx.ParseConfig(os.Getenv("DATABASE_URL"))
22//	connConfig.Logger = myLogger
23//	connStr := stdlib.RegisterConnConfig(connConfig)
24//	db, _ := sql.Open("pgx", connStr)
25//
26// pgx uses standard PostgreSQL positional parameters in queries. e.g. $1, $2.
27// It does not support named parameters.
28//
29//	db.QueryRow("select * from users where id=$1", userID)
30//
31// In Go 1.13 and above (*sql.Conn) Raw() can be used to get a *pgx.Conn from the standard
32// database/sql.DB connection pool. This allows operations that use pgx specific functionality.
33//
34//	// Given db is a *sql.DB
35//	conn, err := db.Conn(context.Background())
36//	if err != nil {
37//		// handle error from acquiring connection from DB pool
38//	}
39//
40//	err = conn.Raw(func(driverConn interface{}) error {
41//		conn := driverConn.(*stdlib.Conn).Conn() // conn is a *pgx.Conn
42//		// Do pgx specific stuff with conn
43//		conn.CopyFrom(...)
44//		return nil
45//	})
46//	if err != nil {
47//		// handle error that occurred while using *pgx.Conn
48//	}
49package stdlib
50
51import (
52	"context"
53	"database/sql"
54	"database/sql/driver"
55	"errors"
56	"fmt"
57	"io"
58	"math"
59	"math/rand"
60	"reflect"
61	"strconv"
62	"strings"
63	"sync"
64	"time"
65
66	"github.com/jackc/pgconn"
67	"github.com/jackc/pgtype"
68	"github.com/jackc/pgx/v4"
69)
70
71// Only intrinsic types should be binary format with database/sql.
72var databaseSQLResultFormats pgx.QueryResultFormatsByOID
73
74var pgxDriver *Driver
75
76type ctxKey int
77
78var ctxKeyFakeTx ctxKey = 0
79
80var ErrNotPgx = errors.New("not pgx *sql.DB")
81
82func init() {
83	pgxDriver = &Driver{
84		configs: make(map[string]*pgx.ConnConfig),
85	}
86	fakeTxConns = make(map[*pgx.Conn]*sql.Tx)
87	sql.Register("pgx", pgxDriver)
88
89	databaseSQLResultFormats = pgx.QueryResultFormatsByOID{
90		pgtype.BoolOID:        1,
91		pgtype.ByteaOID:       1,
92		pgtype.CIDOID:         1,
93		pgtype.DateOID:        1,
94		pgtype.Float4OID:      1,
95		pgtype.Float8OID:      1,
96		pgtype.Int2OID:        1,
97		pgtype.Int4OID:        1,
98		pgtype.Int8OID:        1,
99		pgtype.OIDOID:         1,
100		pgtype.TimestampOID:   1,
101		pgtype.TimestamptzOID: 1,
102		pgtype.XIDOID:         1,
103	}
104}
105
106var (
107	fakeTxMutex sync.Mutex
108	fakeTxConns map[*pgx.Conn]*sql.Tx
109)
110
// OptionOpenDB is a functional option for OpenDB. Each option receives the connector that OpenDB is building and
// may adjust it before the db pool is opened.
type OptionOpenDB func(*connector)
113
114// OptionBeforeConnect provides a callback for before connect. It is passed a shallow copy of the ConnConfig that will
115// be used to connect, so only its immediate members should be modified.
116func OptionBeforeConnect(bc func(context.Context, *pgx.ConnConfig) error) OptionOpenDB {
117	return func(dc *connector) {
118		dc.BeforeConnect = bc
119	}
120}
121
122// OptionAfterConnect provides a callback for after connect.
123func OptionAfterConnect(ac func(context.Context, *pgx.Conn) error) OptionOpenDB {
124	return func(dc *connector) {
125		dc.AfterConnect = ac
126	}
127}
128
129// OptionResetSession provides a callback that can be used to add custom logic prior to executing a query on the
130// connection if the connection has been used before.
131// If ResetSessionFunc returns ErrBadConn error the connection will be discarded.
132func OptionResetSession(rs func(context.Context, *pgx.Conn) error) OptionOpenDB {
133	return func(dc *connector) {
134		dc.ResetSession = rs
135	}
136}
137
138// RandomizeHostOrderFunc is a BeforeConnect hook that randomizes the host order in the provided connConfig, so that a
139// new host becomes primary each time. This is useful to distribute connections for multi-master databases like
140// CockroachDB. If you use this you likely should set https://golang.org/pkg/database/sql/#DB.SetConnMaxLifetime as well
141// to ensure that connections are periodically rebalanced across your nodes.
142func RandomizeHostOrderFunc(ctx context.Context, connConfig *pgx.ConnConfig) error {
143	if len(connConfig.Fallbacks) == 0 {
144		return nil
145	}
146
147	newFallbacks := append([]*pgconn.FallbackConfig{&pgconn.FallbackConfig{
148		Host:      connConfig.Host,
149		Port:      connConfig.Port,
150		TLSConfig: connConfig.TLSConfig,
151	}}, connConfig.Fallbacks...)
152
153	rand.Shuffle(len(newFallbacks), func(i, j int) {
154		newFallbacks[i], newFallbacks[j] = newFallbacks[j], newFallbacks[i]
155	})
156
157	// Use the one that sorted last as the primary and keep the rest as the fallbacks
158	newPrimary := newFallbacks[len(newFallbacks)-1]
159	connConfig.Host = newPrimary.Host
160	connConfig.Port = newPrimary.Port
161	connConfig.TLSConfig = newPrimary.TLSConfig
162	connConfig.Fallbacks = newFallbacks[:len(newFallbacks)-1]
163	return nil
164}
165
166func OpenDB(config pgx.ConnConfig, opts ...OptionOpenDB) *sql.DB {
167	c := connector{
168		ConnConfig:    config,
169		BeforeConnect: func(context.Context, *pgx.ConnConfig) error { return nil }, // noop before connect by default
170		AfterConnect:  func(context.Context, *pgx.Conn) error { return nil },       // noop after connect by default
171		ResetSession:  func(context.Context, *pgx.Conn) error { return nil },       // noop reset session by default
172		driver:        pgxDriver,
173	}
174
175	for _, opt := range opts {
176		opt(&c)
177	}
178
179	return sql.OpenDB(c)
180}
181
// connector is the driver.Connector built by OpenDB. It embeds the pgx.ConnConfig used for every new connection,
// carries the three optional hooks, and remembers the Driver it reports from its Driver method.
type connector struct {
	pgx.ConnConfig
	BeforeConnect func(context.Context, *pgx.ConnConfig) error // function to call before creation of every new connection
	AfterConnect  func(context.Context, *pgx.Conn) error       // function to call after creation of every new connection
	ResetSession  func(context.Context, *pgx.Conn) error       // function is called before a connection is reused
	driver        *Driver
}
189
190// Connect implement driver.Connector interface
191func (c connector) Connect(ctx context.Context) (driver.Conn, error) {
192	var (
193		err  error
194		conn *pgx.Conn
195	)
196
197	// Create a shallow copy of the config, so that BeforeConnect can safely modify it
198	connConfig := c.ConnConfig
199	if err = c.BeforeConnect(ctx, &connConfig); err != nil {
200		return nil, err
201	}
202
203	if conn, err = pgx.ConnectConfig(ctx, &connConfig); err != nil {
204		return nil, err
205	}
206
207	if err = c.AfterConnect(ctx, conn); err != nil {
208		return nil, err
209	}
210
211	return &Conn{conn: conn, driver: c.driver, connConfig: connConfig, resetSessionFunc: c.ResetSession}, nil
212}
213
// Driver implements the driver.Connector interface. It returns the Driver stored in the connector.
func (c connector) Driver() driver.Driver {
	return c.driver
}
218
// GetDefaultDriver returns the driver that the init function creates and registers with database/sql under the
// name "pgx".
func GetDefaultDriver() driver.Driver {
	return pgxDriver
}
224
// Driver is the database/sql driver for pgx. Besides opening connections it keeps the registry of ConnConfigs
// added through RegisterConnConfig: configs maps each generated connection string to its config, sequence is the
// number used to build the next connection string, and configMutex guards both.
type Driver struct {
	configMutex sync.Mutex
	configs     map[string]*pgx.ConnConfig
	sequence    int
}
230
231func (d *Driver) Open(name string) (driver.Conn, error) {
232	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) // Ensure eventual timeout
233	defer cancel()
234
235	connector, err := d.OpenConnector(name)
236	if err != nil {
237		return nil, err
238	}
239	return connector.Connect(ctx)
240}
241
// OpenConnector returns a connector that remembers d and name. The name is not validated here; it is resolved
// (registered config or connection string) when the connector's Connect method runs. The returned error is
// always nil.
func (d *Driver) OpenConnector(name string) (driver.Connector, error) {
	return &driverConnector{driver: d, name: name}, nil
}
245
246func (d *Driver) registerConnConfig(c *pgx.ConnConfig) string {
247	d.configMutex.Lock()
248	connStr := fmt.Sprintf("registeredConnConfig%d", d.sequence)
249	d.sequence++
250	d.configs[connStr] = c
251	d.configMutex.Unlock()
252	return connStr
253}
254
255func (d *Driver) unregisterConnConfig(connStr string) {
256	d.configMutex.Lock()
257	delete(d.configs, connStr)
258	d.configMutex.Unlock()
259}
260
// driverConnector is the driver.Connector returned by Driver.OpenConnector. name is either a connection string
// returned by RegisterConnConfig or a string that pgx.ParseConfig can parse.
type driverConnector struct {
	driver *Driver
	name   string
}
265
266func (dc *driverConnector) Connect(ctx context.Context) (driver.Conn, error) {
267	var connConfig *pgx.ConnConfig
268
269	dc.driver.configMutex.Lock()
270	connConfig = dc.driver.configs[dc.name]
271	dc.driver.configMutex.Unlock()
272
273	if connConfig == nil {
274		var err error
275		connConfig, err = pgx.ParseConfig(dc.name)
276		if err != nil {
277			return nil, err
278		}
279	}
280
281	conn, err := pgx.ConnectConfig(ctx, connConfig)
282	if err != nil {
283		return nil, err
284	}
285
286	c := &Conn{
287		conn:             conn,
288		driver:           dc.driver,
289		connConfig:       *connConfig,
290		resetSessionFunc: func(context.Context, *pgx.Conn) error { return nil },
291	}
292
293	return c, nil
294}
295
// Driver implements driver.Connector. It returns the Driver that created this connector.
func (dc *driverConnector) Driver() driver.Driver {
	return dc.driver
}
299
// RegisterConnConfig registers c with the default driver and returns the connection string to pass to sql.Open.
// The driver keeps the pointer to c (not a copy) until UnregisterConnConfig is called with the returned string.
func RegisterConnConfig(c *pgx.ConnConfig) string {
	return pgxDriver.registerConnConfig(c)
}
304
// UnregisterConnConfig removes the ConnConfig registration for connStr from the default driver. connStr is the
// string that RegisterConnConfig returned.
func UnregisterConnConfig(connStr string) {
	pgxDriver.unregisterConnConfig(connStr)
}
309
// Conn is the driver.Conn implementation of this package. It wraps a *pgx.Conn together with the Driver that
// opened it and the ConnConfig it was opened with.
type Conn struct {
	conn             *pgx.Conn
	psCount          int64 // Counter used for creating unique prepared statement names
	driver           *Driver
	connConfig       pgx.ConnConfig
	resetSessionFunc func(context.Context, *pgx.Conn) error // Function is called before a connection is reused
}
317
// Conn returns the underlying *pgx.Conn, giving access to pgx specific functionality.
func (c *Conn) Conn() *pgx.Conn {
	return c.conn
}
322
// Prepare implements driver.Conn. It is PrepareContext with context.Background().
func (c *Conn) Prepare(query string) (driver.Stmt, error) {
	return c.PrepareContext(context.Background(), query)
}
326
327func (c *Conn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
328	if c.conn.IsClosed() {
329		return nil, driver.ErrBadConn
330	}
331
332	name := fmt.Sprintf("pgx_%d", c.psCount)
333	c.psCount++
334
335	sd, err := c.conn.Prepare(ctx, name, query)
336	if err != nil {
337		return nil, err
338	}
339
340	return &Stmt{sd: sd, conn: c}, nil
341}
342
343func (c *Conn) Close() error {
344	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
345	defer cancel()
346	return c.conn.Close(ctx)
347}
348
// Begin implements driver.Conn. It is BeginTx with context.Background() and default transaction options.
func (c *Conn) Begin() (driver.Tx, error) {
	return c.BeginTx(context.Background(), driver.TxOptions{})
}
352
353func (c *Conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
354	if c.conn.IsClosed() {
355		return nil, driver.ErrBadConn
356	}
357
358	if pconn, ok := ctx.Value(ctxKeyFakeTx).(**pgx.Conn); ok {
359		*pconn = c.conn
360		return fakeTx{}, nil
361	}
362
363	var pgxOpts pgx.TxOptions
364	switch sql.IsolationLevel(opts.Isolation) {
365	case sql.LevelDefault:
366	case sql.LevelReadUncommitted:
367		pgxOpts.IsoLevel = pgx.ReadUncommitted
368	case sql.LevelReadCommitted:
369		pgxOpts.IsoLevel = pgx.ReadCommitted
370	case sql.LevelRepeatableRead, sql.LevelSnapshot:
371		pgxOpts.IsoLevel = pgx.RepeatableRead
372	case sql.LevelSerializable:
373		pgxOpts.IsoLevel = pgx.Serializable
374	default:
375		return nil, fmt.Errorf("unsupported isolation: %v", opts.Isolation)
376	}
377
378	if opts.ReadOnly {
379		pgxOpts.AccessMode = pgx.ReadOnly
380	}
381
382	tx, err := c.conn.BeginTx(ctx, pgxOpts)
383	if err != nil {
384		return nil, err
385	}
386
387	return wrapTx{ctx: ctx, tx: tx}, nil
388}
389
390func (c *Conn) ExecContext(ctx context.Context, query string, argsV []driver.NamedValue) (driver.Result, error) {
391	if c.conn.IsClosed() {
392		return nil, driver.ErrBadConn
393	}
394
395	args := namedValueToInterface(argsV)
396
397	commandTag, err := c.conn.Exec(ctx, query, args...)
398	// if we got a network error before we had a chance to send the query, retry
399	if err != nil {
400		if pgconn.SafeToRetry(err) {
401			return nil, driver.ErrBadConn
402		}
403	}
404	return driver.RowsAffected(commandTag.RowsAffected()), err
405}
406
407func (c *Conn) QueryContext(ctx context.Context, query string, argsV []driver.NamedValue) (driver.Rows, error) {
408	if c.conn.IsClosed() {
409		return nil, driver.ErrBadConn
410	}
411
412	args := []interface{}{databaseSQLResultFormats}
413	args = append(args, namedValueToInterface(argsV)...)
414
415	rows, err := c.conn.Query(ctx, query, args...)
416	if err != nil {
417		if pgconn.SafeToRetry(err) {
418			return nil, driver.ErrBadConn
419		}
420		return nil, err
421	}
422
423	// Preload first row because otherwise we won't know what columns are available when database/sql asks.
424	more := rows.Next()
425	if err = rows.Err(); err != nil {
426		rows.Close()
427		return nil, err
428	}
429	return &Rows{conn: c, rows: rows, skipNext: true, skipNextMore: more}, nil
430}
431
432func (c *Conn) Ping(ctx context.Context) error {
433	if c.conn.IsClosed() {
434		return driver.ErrBadConn
435	}
436
437	err := c.conn.Ping(ctx)
438	if err != nil {
439		// A Ping failure implies some sort of fatal state. The connection is almost certainly already closed by the
440		// failure, but manually close it just to be sure.
441		c.Close()
442		return driver.ErrBadConn
443	}
444
445	return nil
446}
447
// CheckNamedValue accepts every query argument unchanged: it never inspects or converts the value and always
// returns nil.
func (c *Conn) CheckNamedValue(*driver.NamedValue) error {
	// Underlying pgx supports sql.Scanner and driver.Valuer interfaces natively. So everything can be passed through directly.
	return nil
}
452
453func (c *Conn) ResetSession(ctx context.Context) error {
454	if c.conn.IsClosed() {
455		return driver.ErrBadConn
456	}
457
458	return c.resetSessionFunc(ctx, c.conn)
459}
460
// Stmt is the driver.Stmt implementation of this package. It pairs the description of a statement prepared by
// Conn.PrepareContext with the Conn it was prepared on.
type Stmt struct {
	sd   *pgconn.StatementDescription
	conn *Conn
}
465
466func (s *Stmt) Close() error {
467	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
468	defer cancel()
469	return s.conn.conn.Deallocate(ctx, s.sd.Name)
470}
471
// NumInput returns the number of parameters of the prepared statement, taken from its statement description.
func (s *Stmt) NumInput() int {
	return len(s.sd.ParamOIDs)
}
475
// Exec is the deprecated, context-less variant of ExecContext. It is not implemented and always returns an
// error; use ExecContext instead.
func (s *Stmt) Exec(argsV []driver.Value) (driver.Result, error) {
	return nil, errors.New("Stmt.Exec deprecated and not implemented")
}
479
// ExecContext executes the prepared statement by passing its name as the query to Conn.ExecContext.
func (s *Stmt) ExecContext(ctx context.Context, argsV []driver.NamedValue) (driver.Result, error) {
	return s.conn.ExecContext(ctx, s.sd.Name, argsV)
}
483
// Query is the deprecated, context-less variant of QueryContext. It is not implemented and always returns an
// error; use QueryContext instead.
func (s *Stmt) Query(argsV []driver.Value) (driver.Rows, error) {
	return nil, errors.New("Stmt.Query deprecated and not implemented")
}
487
// QueryContext runs the prepared statement by passing its name as the query to Conn.QueryContext.
func (s *Stmt) QueryContext(ctx context.Context, argsV []driver.NamedValue) (driver.Rows, error) {
	return s.conn.QueryContext(ctx, s.sd.Name, argsV)
}
491
// rowValueFunc converts the raw bytes of one column value into a driver.Value. Rows.Next only calls it for
// non-nil raw values.
type rowValueFunc func(src []byte) (driver.Value, error)
493
// Rows is the driver.Rows implementation of this package. It wraps a pgx.Rows whose first row has already been
// fetched by Conn.QueryContext: skipNext records that this preloaded row is still pending and skipNextMore
// whether it exists. valueFuncs (one converter per column) and columnNames are built lazily on first use.
type Rows struct {
	conn         *Conn
	rows         pgx.Rows
	valueFuncs   []rowValueFunc
	skipNext     bool
	skipNextMore bool

	columnNames []string
}
503
504func (r *Rows) Columns() []string {
505	if r.columnNames == nil {
506		fields := r.rows.FieldDescriptions()
507		r.columnNames = make([]string, len(fields))
508		for i, fd := range fields {
509			r.columnNames[i] = string(fd.Name)
510		}
511	}
512
513	return r.columnNames
514}
515
516// ColumnTypeDatabaseTypeName returns the database system type name. If the name is unknown the OID is returned.
517func (r *Rows) ColumnTypeDatabaseTypeName(index int) string {
518	if dt, ok := r.conn.conn.ConnInfo().DataTypeForOID(r.rows.FieldDescriptions()[index].DataTypeOID); ok {
519		return strings.ToUpper(dt.Name)
520	}
521
522	return strconv.FormatInt(int64(r.rows.FieldDescriptions()[index].DataTypeOID), 10)
523}
524
// varHeaderSize is subtracted from a field's TypeModifier before the modifier is interpreted as a length or as
// precision and scale. NOTE(review): presumably PostgreSQL's 4-byte VARHDRSZ offset baked into typmod - confirm.
const varHeaderSize = 4
526
527// ColumnTypeLength returns the length of the column type if the column is a
528// variable length type. If the column is not a variable length type ok
529// should return false.
530func (r *Rows) ColumnTypeLength(index int) (int64, bool) {
531	fd := r.rows.FieldDescriptions()[index]
532
533	switch fd.DataTypeOID {
534	case pgtype.TextOID, pgtype.ByteaOID:
535		return math.MaxInt64, true
536	case pgtype.VarcharOID, pgtype.BPCharArrayOID:
537		return int64(fd.TypeModifier - varHeaderSize), true
538	default:
539		return 0, false
540	}
541}
542
543// ColumnTypePrecisionScale should return the precision and scale for decimal
544// types. If not applicable, ok should be false.
545func (r *Rows) ColumnTypePrecisionScale(index int) (precision, scale int64, ok bool) {
546	fd := r.rows.FieldDescriptions()[index]
547
548	switch fd.DataTypeOID {
549	case pgtype.NumericOID:
550		mod := fd.TypeModifier - varHeaderSize
551		precision = int64((mod >> 16) & 0xffff)
552		scale = int64(mod & 0xffff)
553		return precision, scale, true
554	default:
555		return 0, 0, false
556	}
557}
558
559// ColumnTypeScanType returns the value type that can be used to scan types into.
560func (r *Rows) ColumnTypeScanType(index int) reflect.Type {
561	fd := r.rows.FieldDescriptions()[index]
562
563	switch fd.DataTypeOID {
564	case pgtype.Float8OID:
565		return reflect.TypeOf(float64(0))
566	case pgtype.Float4OID:
567		return reflect.TypeOf(float32(0))
568	case pgtype.Int8OID:
569		return reflect.TypeOf(int64(0))
570	case pgtype.Int4OID:
571		return reflect.TypeOf(int32(0))
572	case pgtype.Int2OID:
573		return reflect.TypeOf(int16(0))
574	case pgtype.BoolOID:
575		return reflect.TypeOf(false)
576	case pgtype.NumericOID:
577		return reflect.TypeOf(float64(0))
578	case pgtype.DateOID, pgtype.TimestampOID, pgtype.TimestamptzOID:
579		return reflect.TypeOf(time.Time{})
580	case pgtype.ByteaOID:
581		return reflect.TypeOf([]byte(nil))
582	default:
583		return reflect.TypeOf("")
584	}
585}
586
// Close closes the underlying pgx.Rows and returns the error, if any, that the rows report afterwards.
func (r *Rows) Close() error {
	r.rows.Close()
	return r.rows.Err()
}
591
592func (r *Rows) Next(dest []driver.Value) error {
593	ci := r.conn.conn.ConnInfo()
594	fieldDescriptions := r.rows.FieldDescriptions()
595
596	if r.valueFuncs == nil {
597		r.valueFuncs = make([]rowValueFunc, len(fieldDescriptions))
598
599		for i, fd := range fieldDescriptions {
600			dataTypeOID := fd.DataTypeOID
601			format := fd.Format
602
603			switch fd.DataTypeOID {
604			case pgtype.BoolOID:
605				var d bool
606				scanPlan := ci.PlanScan(dataTypeOID, format, &d)
607				r.valueFuncs[i] = func(src []byte) (driver.Value, error) {
608					err := scanPlan.Scan(ci, dataTypeOID, format, src, &d)
609					return d, err
610				}
611			case pgtype.ByteaOID:
612				var d []byte
613				scanPlan := ci.PlanScan(dataTypeOID, format, &d)
614				r.valueFuncs[i] = func(src []byte) (driver.Value, error) {
615					err := scanPlan.Scan(ci, dataTypeOID, format, src, &d)
616					return d, err
617				}
618			case pgtype.CIDOID:
619				var d pgtype.CID
620				scanPlan := ci.PlanScan(dataTypeOID, format, &d)
621				r.valueFuncs[i] = func(src []byte) (driver.Value, error) {
622					err := scanPlan.Scan(ci, dataTypeOID, format, src, &d)
623					if err != nil {
624						return nil, err
625					}
626					return d.Value()
627				}
628			case pgtype.DateOID:
629				var d pgtype.Date
630				scanPlan := ci.PlanScan(dataTypeOID, format, &d)
631				r.valueFuncs[i] = func(src []byte) (driver.Value, error) {
632					err := scanPlan.Scan(ci, dataTypeOID, format, src, &d)
633					if err != nil {
634						return nil, err
635					}
636					return d.Value()
637				}
638			case pgtype.Float4OID:
639				var d float32
640				scanPlan := ci.PlanScan(dataTypeOID, format, &d)
641				r.valueFuncs[i] = func(src []byte) (driver.Value, error) {
642					err := scanPlan.Scan(ci, dataTypeOID, format, src, &d)
643					return float64(d), err
644				}
645			case pgtype.Float8OID:
646				var d float64
647				scanPlan := ci.PlanScan(dataTypeOID, format, &d)
648				r.valueFuncs[i] = func(src []byte) (driver.Value, error) {
649					err := scanPlan.Scan(ci, dataTypeOID, format, src, &d)
650					return d, err
651				}
652			case pgtype.Int2OID:
653				var d int16
654				scanPlan := ci.PlanScan(dataTypeOID, format, &d)
655				r.valueFuncs[i] = func(src []byte) (driver.Value, error) {
656					err := scanPlan.Scan(ci, dataTypeOID, format, src, &d)
657					return int64(d), err
658				}
659			case pgtype.Int4OID:
660				var d int32
661				scanPlan := ci.PlanScan(dataTypeOID, format, &d)
662				r.valueFuncs[i] = func(src []byte) (driver.Value, error) {
663					err := scanPlan.Scan(ci, dataTypeOID, format, src, &d)
664					return int64(d), err
665				}
666			case pgtype.Int8OID:
667				var d int64
668				scanPlan := ci.PlanScan(dataTypeOID, format, &d)
669				r.valueFuncs[i] = func(src []byte) (driver.Value, error) {
670					err := scanPlan.Scan(ci, dataTypeOID, format, src, &d)
671					return d, err
672				}
673			case pgtype.JSONOID:
674				var d pgtype.JSON
675				scanPlan := ci.PlanScan(dataTypeOID, format, &d)
676				r.valueFuncs[i] = func(src []byte) (driver.Value, error) {
677					err := scanPlan.Scan(ci, dataTypeOID, format, src, &d)
678					if err != nil {
679						return nil, err
680					}
681					return d.Value()
682				}
683			case pgtype.JSONBOID:
684				var d pgtype.JSONB
685				scanPlan := ci.PlanScan(dataTypeOID, format, &d)
686				r.valueFuncs[i] = func(src []byte) (driver.Value, error) {
687					err := scanPlan.Scan(ci, dataTypeOID, format, src, &d)
688					if err != nil {
689						return nil, err
690					}
691					return d.Value()
692				}
693			case pgtype.OIDOID:
694				var d pgtype.OIDValue
695				scanPlan := ci.PlanScan(dataTypeOID, format, &d)
696				r.valueFuncs[i] = func(src []byte) (driver.Value, error) {
697					err := scanPlan.Scan(ci, dataTypeOID, format, src, &d)
698					if err != nil {
699						return nil, err
700					}
701					return d.Value()
702				}
703			case pgtype.TimestampOID:
704				var d pgtype.Timestamp
705				scanPlan := ci.PlanScan(dataTypeOID, format, &d)
706				r.valueFuncs[i] = func(src []byte) (driver.Value, error) {
707					err := scanPlan.Scan(ci, dataTypeOID, format, src, &d)
708					if err != nil {
709						return nil, err
710					}
711					return d.Value()
712				}
713			case pgtype.TimestamptzOID:
714				var d pgtype.Timestamptz
715				scanPlan := ci.PlanScan(dataTypeOID, format, &d)
716				r.valueFuncs[i] = func(src []byte) (driver.Value, error) {
717					err := scanPlan.Scan(ci, dataTypeOID, format, src, &d)
718					if err != nil {
719						return nil, err
720					}
721					return d.Value()
722				}
723			case pgtype.XIDOID:
724				var d pgtype.XID
725				scanPlan := ci.PlanScan(dataTypeOID, format, &d)
726				r.valueFuncs[i] = func(src []byte) (driver.Value, error) {
727					err := scanPlan.Scan(ci, dataTypeOID, format, src, &d)
728					if err != nil {
729						return nil, err
730					}
731					return d.Value()
732				}
733			default:
734				var d string
735				scanPlan := ci.PlanScan(dataTypeOID, format, &d)
736				r.valueFuncs[i] = func(src []byte) (driver.Value, error) {
737					err := scanPlan.Scan(ci, dataTypeOID, format, src, &d)
738					return d, err
739				}
740			}
741		}
742	}
743
744	var more bool
745	if r.skipNext {
746		more = r.skipNextMore
747		r.skipNext = false
748	} else {
749		more = r.rows.Next()
750	}
751
752	if !more {
753		if r.rows.Err() == nil {
754			return io.EOF
755		} else {
756			return r.rows.Err()
757		}
758	}
759
760	for i, rv := range r.rows.RawValues() {
761		if rv != nil {
762			var err error
763			dest[i], err = r.valueFuncs[i](rv)
764			if err != nil {
765				return fmt.Errorf("convert field %d failed: %v", i, err)
766			}
767		} else {
768			dest[i] = nil
769		}
770	}
771
772	return nil
773}
774
// valueToInterface copies argsV into a new []interface{} of the same length, element by element. nil values stay
// nil. The result is never nil, even for a nil or empty input.
func valueToInterface(argsV []driver.Value) []interface{} {
	out := make([]interface{}, len(argsV))
	for i := range argsV {
		out[i] = argsV[i]
	}
	return out
}
786
// namedValueToInterface extracts the Value of every element of argsV, in order, into a new []interface{}. Names
// and ordinals are ignored and nil values stay nil. The result is never nil, even for a nil or empty input.
func namedValueToInterface(argsV []driver.NamedValue) []interface{} {
	out := make([]interface{}, len(argsV))
	for i := range argsV {
		out[i] = argsV[i].Value
	}
	return out
}
798
// wrapTx adapts a pgx.Tx to driver.Tx. It keeps the context that was passed to BeginTx because Commit and
// Rollback of driver.Tx take no context of their own.
type wrapTx struct {
	ctx context.Context
	tx  pgx.Tx
}
803
// Commit commits the wrapped pgx transaction using the stored context.
func (wtx wrapTx) Commit() error { return wtx.tx.Commit(wtx.ctx) }
805
// Rollback rolls back the wrapped pgx transaction using the stored context.
func (wtx wrapTx) Rollback() error { return wtx.tx.Rollback(wtx.ctx) }
807
// fakeTx is the transaction BeginTx returns when the context carries ctxKeyFakeTx (see AcquireConn). Its Commit
// and Rollback do nothing.
type fakeTx struct{}
809
// Commit does nothing and always returns nil.
func (fakeTx) Commit() error { return nil }
811
// Rollback does nothing and always returns nil.
func (fakeTx) Rollback() error { return nil }
813
814// AcquireConn acquires a *pgx.Conn from database/sql connection pool. It must be released with ReleaseConn.
815//
816// In Go 1.13 this functionality has been incorporated into the standard library in the db.Conn.Raw() method.
817func AcquireConn(db *sql.DB) (*pgx.Conn, error) {
818	var conn *pgx.Conn
819	ctx := context.WithValue(context.Background(), ctxKeyFakeTx, &conn)
820	tx, err := db.BeginTx(ctx, nil)
821	if err != nil {
822		return nil, err
823	}
824	if conn == nil {
825		tx.Rollback()
826		return nil, ErrNotPgx
827	}
828
829	fakeTxMutex.Lock()
830	fakeTxConns[conn] = tx
831	fakeTxMutex.Unlock()
832
833	return conn, nil
834}
835
836// ReleaseConn releases a *pgx.Conn acquired with AcquireConn.
837func ReleaseConn(db *sql.DB, conn *pgx.Conn) error {
838	var tx *sql.Tx
839	var ok bool
840
841	if conn.PgConn().IsBusy() || conn.PgConn().TxStatus() != 'I' {
842		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
843		defer cancel()
844		conn.Close(ctx)
845	}
846
847	fakeTxMutex.Lock()
848	tx, ok = fakeTxConns[conn]
849	if ok {
850		delete(fakeTxConns, conn)
851		fakeTxMutex.Unlock()
852	} else {
853		fakeTxMutex.Unlock()
854		return fmt.Errorf("can't release conn that is not acquired")
855	}
856
857	return tx.Rollback()
858}
859