1// Package stdlib is the compatibility layer from pgx to database/sql. 2// 3// A database/sql connection can be established through sql.Open. 4// 5// db, err := sql.Open("pgx", "postgres://pgx_md5:secret@localhost:5432/pgx_test?sslmode=disable") 6// if err != nil { 7// return err 8// } 9// 10// Or from a DSN string. 11// 12// db, err := sql.Open("pgx", "user=postgres password=secret host=localhost port=5432 database=pgx_test sslmode=disable") 13// if err != nil { 14// return err 15// } 16// 17// Or a pgx.ConnConfig can be used to set configuration not accessible via connection string. In this case the 18// pgx.ConnConfig must first be registered with the driver. This registration returns a connection string which is used 19// with sql.Open. 20// 21// connConfig, _ := pgx.ParseConfig(os.Getenv("DATABASE_URL")) 22// connConfig.Logger = myLogger 23// connStr := stdlib.RegisterConnConfig(connConfig) 24// db, _ := sql.Open("pgx", connStr) 25// 26// pgx uses standard PostgreSQL positional parameters in queries. e.g. $1, $2. 27// It does not support named parameters. 28// 29// db.QueryRow("select * from users where id=$1", userID) 30// 31// In Go 1.13 and above (*sql.Conn) Raw() can be used to get a *pgx.Conn from the standard 32// database/sql.DB connection pool. This allows operations that use pgx specific functionality. 33// 34// // Given db is a *sql.DB 35// conn, err := db.Conn(context.Background()) 36// if err != nil { 37// // handle error from acquiring connection from DB pool 38// } 39// 40// err = conn.Raw(func(driverConn interface{}) error { 41// conn := driverConn.(*stdlib.Conn).Conn() // conn is a *pgx.Conn 42// // Do pgx specific stuff with conn 43// conn.CopyFrom(...) 44// return nil 45// }) 46// if err != nil { 47// // handle error that occurred while using *pgx.Conn 48// } 49package stdlib 50 51import ( 52 "context" 53 "database/sql" 54 "database/sql/driver" 55 "errors" 56 "fmt" 57 "io" 58 "math" 59 "math/rand" 60 "reflect" 61 "strconv" 62 "strings" 63 "sync" 64 "time" 65 66 "github.com/jackc/pgconn" 67 "github.com/jackc/pgtype" 68 "github.com/jackc/pgx/v4" 69) 70 71// Only intrinsic types should be binary format with database/sql. 72var databaseSQLResultFormats pgx.QueryResultFormatsByOID 73 74var pgxDriver *Driver 75 76type ctxKey int 77 78var ctxKeyFakeTx ctxKey = 0 79 80var ErrNotPgx = errors.New("not pgx *sql.DB") 81 82func init() { 83 pgxDriver = &Driver{ 84 configs: make(map[string]*pgx.ConnConfig), 85 } 86 fakeTxConns = make(map[*pgx.Conn]*sql.Tx) 87 sql.Register("pgx", pgxDriver) 88 89 databaseSQLResultFormats = pgx.QueryResultFormatsByOID{ 90 pgtype.BoolOID: 1, 91 pgtype.ByteaOID: 1, 92 pgtype.CIDOID: 1, 93 pgtype.DateOID: 1, 94 pgtype.Float4OID: 1, 95 pgtype.Float8OID: 1, 96 pgtype.Int2OID: 1, 97 pgtype.Int4OID: 1, 98 pgtype.Int8OID: 1, 99 pgtype.OIDOID: 1, 100 pgtype.TimestampOID: 1, 101 pgtype.TimestamptzOID: 1, 102 pgtype.XIDOID: 1, 103 } 104} 105 106var ( 107 fakeTxMutex sync.Mutex 108 fakeTxConns map[*pgx.Conn]*sql.Tx 109) 110 111// OptionOpenDB options for configuring the driver when opening a new db pool. 112type OptionOpenDB func(*connector) 113 114// OptionBeforeConnect provides a callback for before connect. It is passed a shallow copy of the ConnConfig that will 115// be used to connect, so only its immediate members should be modified. 116func OptionBeforeConnect(bc func(context.Context, *pgx.ConnConfig) error) OptionOpenDB { 117 return func(dc *connector) { 118 dc.BeforeConnect = bc 119 } 120} 121 122// OptionAfterConnect provides a callback for after connect. 123func OptionAfterConnect(ac func(context.Context, *pgx.Conn) error) OptionOpenDB { 124 return func(dc *connector) { 125 dc.AfterConnect = ac 126 } 127} 128 129// OptionResetSession provides a callback that can be used to add custom logic prior to executing a query on the 130// connection if the connection has been used before. 131// If ResetSessionFunc returns ErrBadConn error the connection will be discarded. 132func OptionResetSession(rs func(context.Context, *pgx.Conn) error) OptionOpenDB { 133 return func(dc *connector) { 134 dc.ResetSession = rs 135 } 136} 137 138// RandomizeHostOrderFunc is a BeforeConnect hook that randomizes the host order in the provided connConfig, so that a 139// new host becomes primary each time. This is useful to distribute connections for multi-master databases like 140// CockroachDB. If you use this you likely should set https://golang.org/pkg/database/sql/#DB.SetConnMaxLifetime as well 141// to ensure that connections are periodically rebalanced across your nodes. 142func RandomizeHostOrderFunc(ctx context.Context, connConfig *pgx.ConnConfig) error { 143 if len(connConfig.Fallbacks) == 0 { 144 return nil 145 } 146 147 newFallbacks := append([]*pgconn.FallbackConfig{&pgconn.FallbackConfig{ 148 Host: connConfig.Host, 149 Port: connConfig.Port, 150 TLSConfig: connConfig.TLSConfig, 151 }}, connConfig.Fallbacks...) 152 153 rand.Shuffle(len(newFallbacks), func(i, j int) { 154 newFallbacks[i], newFallbacks[j] = newFallbacks[j], newFallbacks[i] 155 }) 156 157 // Use the one that sorted last as the primary and keep the rest as the fallbacks 158 newPrimary := newFallbacks[len(newFallbacks)-1] 159 connConfig.Host = newPrimary.Host 160 connConfig.Port = newPrimary.Port 161 connConfig.TLSConfig = newPrimary.TLSConfig 162 connConfig.Fallbacks = newFallbacks[:len(newFallbacks)-1] 163 return nil 164} 165 166func OpenDB(config pgx.ConnConfig, opts ...OptionOpenDB) *sql.DB { 167 c := connector{ 168 ConnConfig: config, 169 BeforeConnect: func(context.Context, *pgx.ConnConfig) error { return nil }, // noop before connect by default 170 AfterConnect: func(context.Context, *pgx.Conn) error { return nil }, // noop after connect by default 171 ResetSession: func(context.Context, *pgx.Conn) error { return nil }, // noop reset session by default 172 driver: pgxDriver, 173 } 174 175 for _, opt := range opts { 176 opt(&c) 177 } 178 179 return sql.OpenDB(c) 180} 181 182type connector struct { 183 pgx.ConnConfig 184 BeforeConnect func(context.Context, *pgx.ConnConfig) error // function to call before creation of every new connection 185 AfterConnect func(context.Context, *pgx.Conn) error // function to call after creation of every new connection 186 ResetSession func(context.Context, *pgx.Conn) error // function is called before a connection is reused 187 driver *Driver 188} 189 190// Connect implement driver.Connector interface 191func (c connector) Connect(ctx context.Context) (driver.Conn, error) { 192 var ( 193 err error 194 conn *pgx.Conn 195 ) 196 197 // Create a shallow copy of the config, so that BeforeConnect can safely modify it 198 connConfig := c.ConnConfig 199 if err = c.BeforeConnect(ctx, &connConfig); err != nil { 200 return nil, err 201 } 202 203 if conn, err = pgx.ConnectConfig(ctx, &connConfig); err != nil { 204 return nil, err 205 } 206 207 if err = c.AfterConnect(ctx, conn); err != nil { 208 return nil, err 209 } 210 211 return &Conn{conn: conn, driver: c.driver, connConfig: connConfig, resetSessionFunc: c.ResetSession}, nil 212} 213 214// Driver implement driver.Connector interface 215func (c connector) Driver() driver.Driver { 216 return c.driver 217} 218 219// GetDefaultDriver returns the driver initialized in the init function 220// and used when the pgx driver is registered. 221func GetDefaultDriver() driver.Driver { 222 return pgxDriver 223} 224 225type Driver struct { 226 configMutex sync.Mutex 227 configs map[string]*pgx.ConnConfig 228 sequence int 229} 230 231func (d *Driver) Open(name string) (driver.Conn, error) { 232 ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) // Ensure eventual timeout 233 defer cancel() 234 235 connector, err := d.OpenConnector(name) 236 if err != nil { 237 return nil, err 238 } 239 return connector.Connect(ctx) 240} 241 242func (d *Driver) OpenConnector(name string) (driver.Connector, error) { 243 return &driverConnector{driver: d, name: name}, nil 244} 245 246func (d *Driver) registerConnConfig(c *pgx.ConnConfig) string { 247 d.configMutex.Lock() 248 connStr := fmt.Sprintf("registeredConnConfig%d", d.sequence) 249 d.sequence++ 250 d.configs[connStr] = c 251 d.configMutex.Unlock() 252 return connStr 253} 254 255func (d *Driver) unregisterConnConfig(connStr string) { 256 d.configMutex.Lock() 257 delete(d.configs, connStr) 258 d.configMutex.Unlock() 259} 260 261type driverConnector struct { 262 driver *Driver 263 name string 264} 265 266func (dc *driverConnector) Connect(ctx context.Context) (driver.Conn, error) { 267 var connConfig *pgx.ConnConfig 268 269 dc.driver.configMutex.Lock() 270 connConfig = dc.driver.configs[dc.name] 271 dc.driver.configMutex.Unlock() 272 273 if connConfig == nil { 274 var err error 275 connConfig, err = pgx.ParseConfig(dc.name) 276 if err != nil { 277 return nil, err 278 } 279 } 280 281 conn, err := pgx.ConnectConfig(ctx, connConfig) 282 if err != nil { 283 return nil, err 284 } 285 286 c := &Conn{ 287 conn: conn, 288 driver: dc.driver, 289 connConfig: *connConfig, 290 resetSessionFunc: func(context.Context, *pgx.Conn) error { return nil }, 291 } 292 293 return c, nil 294} 295 296func (dc *driverConnector) Driver() driver.Driver { 297 return dc.driver 298} 299 300// RegisterConnConfig registers a ConnConfig and returns the connection string to use with Open. 301func RegisterConnConfig(c *pgx.ConnConfig) string { 302 return pgxDriver.registerConnConfig(c) 303} 304 305// UnregisterConnConfig removes the ConnConfig registration for connStr. 306func UnregisterConnConfig(connStr string) { 307 pgxDriver.unregisterConnConfig(connStr) 308} 309 310type Conn struct { 311 conn *pgx.Conn 312 psCount int64 // Counter used for creating unique prepared statement names 313 driver *Driver 314 connConfig pgx.ConnConfig 315 resetSessionFunc func(context.Context, *pgx.Conn) error // Function is called before a connection is reused 316} 317 318// Conn returns the underlying *pgx.Conn 319func (c *Conn) Conn() *pgx.Conn { 320 return c.conn 321} 322 323func (c *Conn) Prepare(query string) (driver.Stmt, error) { 324 return c.PrepareContext(context.Background(), query) 325} 326 327func (c *Conn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) { 328 if c.conn.IsClosed() { 329 return nil, driver.ErrBadConn 330 } 331 332 name := fmt.Sprintf("pgx_%d", c.psCount) 333 c.psCount++ 334 335 sd, err := c.conn.Prepare(ctx, name, query) 336 if err != nil { 337 return nil, err 338 } 339 340 return &Stmt{sd: sd, conn: c}, nil 341} 342 343func (c *Conn) Close() error { 344 ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) 345 defer cancel() 346 return c.conn.Close(ctx) 347} 348 349func (c *Conn) Begin() (driver.Tx, error) { 350 return c.BeginTx(context.Background(), driver.TxOptions{}) 351} 352 353func (c *Conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) { 354 if c.conn.IsClosed() { 355 return nil, driver.ErrBadConn 356 } 357 358 if pconn, ok := ctx.Value(ctxKeyFakeTx).(**pgx.Conn); ok { 359 *pconn = c.conn 360 return fakeTx{}, nil 361 } 362 363 var pgxOpts pgx.TxOptions 364 switch sql.IsolationLevel(opts.Isolation) { 365 case sql.LevelDefault: 366 case sql.LevelReadUncommitted: 367 pgxOpts.IsoLevel = pgx.ReadUncommitted 368 case sql.LevelReadCommitted: 369 pgxOpts.IsoLevel = pgx.ReadCommitted 370 case sql.LevelRepeatableRead, sql.LevelSnapshot: 371 pgxOpts.IsoLevel = pgx.RepeatableRead 372 case sql.LevelSerializable: 373 pgxOpts.IsoLevel = pgx.Serializable 374 default: 375 return nil, fmt.Errorf("unsupported isolation: %v", opts.Isolation) 376 } 377 378 if opts.ReadOnly { 379 pgxOpts.AccessMode = pgx.ReadOnly 380 } 381 382 tx, err := c.conn.BeginTx(ctx, pgxOpts) 383 if err != nil { 384 return nil, err 385 } 386 387 return wrapTx{ctx: ctx, tx: tx}, nil 388} 389 390func (c *Conn) ExecContext(ctx context.Context, query string, argsV []driver.NamedValue) (driver.Result, error) { 391 if c.conn.IsClosed() { 392 return nil, driver.ErrBadConn 393 } 394 395 args := namedValueToInterface(argsV) 396 397 commandTag, err := c.conn.Exec(ctx, query, args...) 398 // if we got a network error before we had a chance to send the query, retry 399 if err != nil { 400 if pgconn.SafeToRetry(err) { 401 return nil, driver.ErrBadConn 402 } 403 } 404 return driver.RowsAffected(commandTag.RowsAffected()), err 405} 406 407func (c *Conn) QueryContext(ctx context.Context, query string, argsV []driver.NamedValue) (driver.Rows, error) { 408 if c.conn.IsClosed() { 409 return nil, driver.ErrBadConn 410 } 411 412 args := []interface{}{databaseSQLResultFormats} 413 args = append(args, namedValueToInterface(argsV)...) 414 415 rows, err := c.conn.Query(ctx, query, args...) 416 if err != nil { 417 if pgconn.SafeToRetry(err) { 418 return nil, driver.ErrBadConn 419 } 420 return nil, err 421 } 422 423 // Preload first row because otherwise we won't know what columns are available when database/sql asks. 424 more := rows.Next() 425 if err = rows.Err(); err != nil { 426 rows.Close() 427 return nil, err 428 } 429 return &Rows{conn: c, rows: rows, skipNext: true, skipNextMore: more}, nil 430} 431 432func (c *Conn) Ping(ctx context.Context) error { 433 if c.conn.IsClosed() { 434 return driver.ErrBadConn 435 } 436 437 err := c.conn.Ping(ctx) 438 if err != nil { 439 // A Ping failure implies some sort of fatal state. The connection is almost certainly already closed by the 440 // failure, but manually close it just to be sure. 441 c.Close() 442 return driver.ErrBadConn 443 } 444 445 return nil 446} 447 448func (c *Conn) CheckNamedValue(*driver.NamedValue) error { 449 // Underlying pgx supports sql.Scanner and driver.Valuer interfaces natively. So everything can be passed through directly. 450 return nil 451} 452 453func (c *Conn) ResetSession(ctx context.Context) error { 454 if c.conn.IsClosed() { 455 return driver.ErrBadConn 456 } 457 458 return c.resetSessionFunc(ctx, c.conn) 459} 460 461type Stmt struct { 462 sd *pgconn.StatementDescription 463 conn *Conn 464} 465 466func (s *Stmt) Close() error { 467 ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) 468 defer cancel() 469 return s.conn.conn.Deallocate(ctx, s.sd.Name) 470} 471 472func (s *Stmt) NumInput() int { 473 return len(s.sd.ParamOIDs) 474} 475 476func (s *Stmt) Exec(argsV []driver.Value) (driver.Result, error) { 477 return nil, errors.New("Stmt.Exec deprecated and not implemented") 478} 479 480func (s *Stmt) ExecContext(ctx context.Context, argsV []driver.NamedValue) (driver.Result, error) { 481 return s.conn.ExecContext(ctx, s.sd.Name, argsV) 482} 483 484func (s *Stmt) Query(argsV []driver.Value) (driver.Rows, error) { 485 return nil, errors.New("Stmt.Query deprecated and not implemented") 486} 487 488func (s *Stmt) QueryContext(ctx context.Context, argsV []driver.NamedValue) (driver.Rows, error) { 489 return s.conn.QueryContext(ctx, s.sd.Name, argsV) 490} 491 492type rowValueFunc func(src []byte) (driver.Value, error) 493 494type Rows struct { 495 conn *Conn 496 rows pgx.Rows 497 valueFuncs []rowValueFunc 498 skipNext bool 499 skipNextMore bool 500 501 columnNames []string 502} 503 504func (r *Rows) Columns() []string { 505 if r.columnNames == nil { 506 fields := r.rows.FieldDescriptions() 507 r.columnNames = make([]string, len(fields)) 508 for i, fd := range fields { 509 r.columnNames[i] = string(fd.Name) 510 } 511 } 512 513 return r.columnNames 514} 515 516// ColumnTypeDatabaseTypeName returns the database system type name. If the name is unknown the OID is returned. 517func (r *Rows) ColumnTypeDatabaseTypeName(index int) string { 518 if dt, ok := r.conn.conn.ConnInfo().DataTypeForOID(r.rows.FieldDescriptions()[index].DataTypeOID); ok { 519 return strings.ToUpper(dt.Name) 520 } 521 522 return strconv.FormatInt(int64(r.rows.FieldDescriptions()[index].DataTypeOID), 10) 523} 524 525const varHeaderSize = 4 526 527// ColumnTypeLength returns the length of the column type if the column is a 528// variable length type. If the column is not a variable length type ok 529// should return false. 530func (r *Rows) ColumnTypeLength(index int) (int64, bool) { 531 fd := r.rows.FieldDescriptions()[index] 532 533 switch fd.DataTypeOID { 534 case pgtype.TextOID, pgtype.ByteaOID: 535 return math.MaxInt64, true 536 case pgtype.VarcharOID, pgtype.BPCharArrayOID: 537 return int64(fd.TypeModifier - varHeaderSize), true 538 default: 539 return 0, false 540 } 541} 542 543// ColumnTypePrecisionScale should return the precision and scale for decimal 544// types. If not applicable, ok should be false. 545func (r *Rows) ColumnTypePrecisionScale(index int) (precision, scale int64, ok bool) { 546 fd := r.rows.FieldDescriptions()[index] 547 548 switch fd.DataTypeOID { 549 case pgtype.NumericOID: 550 mod := fd.TypeModifier - varHeaderSize 551 precision = int64((mod >> 16) & 0xffff) 552 scale = int64(mod & 0xffff) 553 return precision, scale, true 554 default: 555 return 0, 0, false 556 } 557} 558 559// ColumnTypeScanType returns the value type that can be used to scan types into. 560func (r *Rows) ColumnTypeScanType(index int) reflect.Type { 561 fd := r.rows.FieldDescriptions()[index] 562 563 switch fd.DataTypeOID { 564 case pgtype.Float8OID: 565 return reflect.TypeOf(float64(0)) 566 case pgtype.Float4OID: 567 return reflect.TypeOf(float32(0)) 568 case pgtype.Int8OID: 569 return reflect.TypeOf(int64(0)) 570 case pgtype.Int4OID: 571 return reflect.TypeOf(int32(0)) 572 case pgtype.Int2OID: 573 return reflect.TypeOf(int16(0)) 574 case pgtype.BoolOID: 575 return reflect.TypeOf(false) 576 case pgtype.NumericOID: 577 return reflect.TypeOf(float64(0)) 578 case pgtype.DateOID, pgtype.TimestampOID, pgtype.TimestamptzOID: 579 return reflect.TypeOf(time.Time{}) 580 case pgtype.ByteaOID: 581 return reflect.TypeOf([]byte(nil)) 582 default: 583 return reflect.TypeOf("") 584 } 585} 586 587func (r *Rows) Close() error { 588 r.rows.Close() 589 return r.rows.Err() 590} 591 592func (r *Rows) Next(dest []driver.Value) error { 593 ci := r.conn.conn.ConnInfo() 594 fieldDescriptions := r.rows.FieldDescriptions() 595 596 if r.valueFuncs == nil { 597 r.valueFuncs = make([]rowValueFunc, len(fieldDescriptions)) 598 599 for i, fd := range fieldDescriptions { 600 dataTypeOID := fd.DataTypeOID 601 format := fd.Format 602 603 switch fd.DataTypeOID { 604 case pgtype.BoolOID: 605 var d bool 606 scanPlan := ci.PlanScan(dataTypeOID, format, &d) 607 r.valueFuncs[i] = func(src []byte) (driver.Value, error) { 608 err := scanPlan.Scan(ci, dataTypeOID, format, src, &d) 609 return d, err 610 } 611 case pgtype.ByteaOID: 612 var d []byte 613 scanPlan := ci.PlanScan(dataTypeOID, format, &d) 614 r.valueFuncs[i] = func(src []byte) (driver.Value, error) { 615 err := scanPlan.Scan(ci, dataTypeOID, format, src, &d) 616 return d, err 617 } 618 case pgtype.CIDOID: 619 var d pgtype.CID 620 scanPlan := ci.PlanScan(dataTypeOID, format, &d) 621 r.valueFuncs[i] = func(src []byte) (driver.Value, error) { 622 err := scanPlan.Scan(ci, dataTypeOID, format, src, &d) 623 if err != nil { 624 return nil, err 625 } 626 return d.Value() 627 } 628 case pgtype.DateOID: 629 var d pgtype.Date 630 scanPlan := ci.PlanScan(dataTypeOID, format, &d) 631 r.valueFuncs[i] = func(src []byte) (driver.Value, error) { 632 err := scanPlan.Scan(ci, dataTypeOID, format, src, &d) 633 if err != nil { 634 return nil, err 635 } 636 return d.Value() 637 } 638 case pgtype.Float4OID: 639 var d float32 640 scanPlan := ci.PlanScan(dataTypeOID, format, &d) 641 r.valueFuncs[i] = func(src []byte) (driver.Value, error) { 642 err := scanPlan.Scan(ci, dataTypeOID, format, src, &d) 643 return float64(d), err 644 } 645 case pgtype.Float8OID: 646 var d float64 647 scanPlan := ci.PlanScan(dataTypeOID, format, &d) 648 r.valueFuncs[i] = func(src []byte) (driver.Value, error) { 649 err := scanPlan.Scan(ci, dataTypeOID, format, src, &d) 650 return d, err 651 } 652 case pgtype.Int2OID: 653 var d int16 654 scanPlan := ci.PlanScan(dataTypeOID, format, &d) 655 r.valueFuncs[i] = func(src []byte) (driver.Value, error) { 656 err := scanPlan.Scan(ci, dataTypeOID, format, src, &d) 657 return int64(d), err 658 } 659 case pgtype.Int4OID: 660 var d int32 661 scanPlan := ci.PlanScan(dataTypeOID, format, &d) 662 r.valueFuncs[i] = func(src []byte) (driver.Value, error) { 663 err := scanPlan.Scan(ci, dataTypeOID, format, src, &d) 664 return int64(d), err 665 } 666 case pgtype.Int8OID: 667 var d int64 668 scanPlan := ci.PlanScan(dataTypeOID, format, &d) 669 r.valueFuncs[i] = func(src []byte) (driver.Value, error) { 670 err := scanPlan.Scan(ci, dataTypeOID, format, src, &d) 671 return d, err 672 } 673 case pgtype.JSONOID: 674 var d pgtype.JSON 675 scanPlan := ci.PlanScan(dataTypeOID, format, &d) 676 r.valueFuncs[i] = func(src []byte) (driver.Value, error) { 677 err := scanPlan.Scan(ci, dataTypeOID, format, src, &d) 678 if err != nil { 679 return nil, err 680 } 681 return d.Value() 682 } 683 case pgtype.JSONBOID: 684 var d pgtype.JSONB 685 scanPlan := ci.PlanScan(dataTypeOID, format, &d) 686 r.valueFuncs[i] = func(src []byte) (driver.Value, error) { 687 err := scanPlan.Scan(ci, dataTypeOID, format, src, &d) 688 if err != nil { 689 return nil, err 690 } 691 return d.Value() 692 } 693 case pgtype.OIDOID: 694 var d pgtype.OIDValue 695 scanPlan := ci.PlanScan(dataTypeOID, format, &d) 696 r.valueFuncs[i] = func(src []byte) (driver.Value, error) { 697 err := scanPlan.Scan(ci, dataTypeOID, format, src, &d) 698 if err != nil { 699 return nil, err 700 } 701 return d.Value() 702 } 703 case pgtype.TimestampOID: 704 var d pgtype.Timestamp 705 scanPlan := ci.PlanScan(dataTypeOID, format, &d) 706 r.valueFuncs[i] = func(src []byte) (driver.Value, error) { 707 err := scanPlan.Scan(ci, dataTypeOID, format, src, &d) 708 if err != nil { 709 return nil, err 710 } 711 return d.Value() 712 } 713 case pgtype.TimestamptzOID: 714 var d pgtype.Timestamptz 715 scanPlan := ci.PlanScan(dataTypeOID, format, &d) 716 r.valueFuncs[i] = func(src []byte) (driver.Value, error) { 717 err := scanPlan.Scan(ci, dataTypeOID, format, src, &d) 718 if err != nil { 719 return nil, err 720 } 721 return d.Value() 722 } 723 case pgtype.XIDOID: 724 var d pgtype.XID 725 scanPlan := ci.PlanScan(dataTypeOID, format, &d) 726 r.valueFuncs[i] = func(src []byte) (driver.Value, error) { 727 err := scanPlan.Scan(ci, dataTypeOID, format, src, &d) 728 if err != nil { 729 return nil, err 730 } 731 return d.Value() 732 } 733 default: 734 var d string 735 scanPlan := ci.PlanScan(dataTypeOID, format, &d) 736 r.valueFuncs[i] = func(src []byte) (driver.Value, error) { 737 err := scanPlan.Scan(ci, dataTypeOID, format, src, &d) 738 return d, err 739 } 740 } 741 } 742 } 743 744 var more bool 745 if r.skipNext { 746 more = r.skipNextMore 747 r.skipNext = false 748 } else { 749 more = r.rows.Next() 750 } 751 752 if !more { 753 if r.rows.Err() == nil { 754 return io.EOF 755 } else { 756 return r.rows.Err() 757 } 758 } 759 760 for i, rv := range r.rows.RawValues() { 761 if rv != nil { 762 var err error 763 dest[i], err = r.valueFuncs[i](rv) 764 if err != nil { 765 return fmt.Errorf("convert field %d failed: %v", i, err) 766 } 767 } else { 768 dest[i] = nil 769 } 770 } 771 772 return nil 773} 774 775func valueToInterface(argsV []driver.Value) []interface{} { 776 args := make([]interface{}, 0, len(argsV)) 777 for _, v := range argsV { 778 if v != nil { 779 args = append(args, v.(interface{})) 780 } else { 781 args = append(args, nil) 782 } 783 } 784 return args 785} 786 787func namedValueToInterface(argsV []driver.NamedValue) []interface{} { 788 args := make([]interface{}, 0, len(argsV)) 789 for _, v := range argsV { 790 if v.Value != nil { 791 args = append(args, v.Value.(interface{})) 792 } else { 793 args = append(args, nil) 794 } 795 } 796 return args 797} 798 799type wrapTx struct { 800 ctx context.Context 801 tx pgx.Tx 802} 803 804func (wtx wrapTx) Commit() error { return wtx.tx.Commit(wtx.ctx) } 805 806func (wtx wrapTx) Rollback() error { return wtx.tx.Rollback(wtx.ctx) } 807 808type fakeTx struct{} 809 810func (fakeTx) Commit() error { return nil } 811 812func (fakeTx) Rollback() error { return nil } 813 814// AcquireConn acquires a *pgx.Conn from database/sql connection pool. It must be released with ReleaseConn. 815// 816// In Go 1.13 this functionality has been incorporated into the standard library in the db.Conn.Raw() method. 817func AcquireConn(db *sql.DB) (*pgx.Conn, error) { 818 var conn *pgx.Conn 819 ctx := context.WithValue(context.Background(), ctxKeyFakeTx, &conn) 820 tx, err := db.BeginTx(ctx, nil) 821 if err != nil { 822 return nil, err 823 } 824 if conn == nil { 825 tx.Rollback() 826 return nil, ErrNotPgx 827 } 828 829 fakeTxMutex.Lock() 830 fakeTxConns[conn] = tx 831 fakeTxMutex.Unlock() 832 833 return conn, nil 834} 835 836// ReleaseConn releases a *pgx.Conn acquired with AcquireConn. 837func ReleaseConn(db *sql.DB, conn *pgx.Conn) error { 838 var tx *sql.Tx 839 var ok bool 840 841 if conn.PgConn().IsBusy() || conn.PgConn().TxStatus() != 'I' { 842 ctx, cancel := context.WithTimeout(context.Background(), time.Second) 843 defer cancel() 844 conn.Close(ctx) 845 } 846 847 fakeTxMutex.Lock() 848 tx, ok = fakeTxConns[conn] 849 if ok { 850 delete(fakeTxConns, conn) 851 fakeTxMutex.Unlock() 852 } else { 853 fakeTxMutex.Unlock() 854 return fmt.Errorf("can't release conn that is not acquired") 855 } 856 857 return tx.Rollback() 858} 859