1package pgx 2 3import ( 4 "context" 5 "errors" 6 7 "github.com/jackc/pgconn" 8) 9 10type batchItem struct { 11 query string 12 arguments []interface{} 13} 14 15// Batch queries are a way of bundling multiple queries together to avoid 16// unnecessary network round trips. 17type Batch struct { 18 items []*batchItem 19} 20 21// Queue queues a query to batch b. query can be an SQL query or the name of a prepared statement. 22func (b *Batch) Queue(query string, arguments ...interface{}) { 23 b.items = append(b.items, &batchItem{ 24 query: query, 25 arguments: arguments, 26 }) 27} 28 29// Len returns number of queries that have been queued so far. 30func (b *Batch) Len() int { 31 return len(b.items) 32} 33 34type BatchResults interface { 35 // Exec reads the results from the next query in the batch as if the query has been sent with Conn.Exec. 36 Exec() (pgconn.CommandTag, error) 37 38 // Query reads the results from the next query in the batch as if the query has been sent with Conn.Query. 39 Query() (Rows, error) 40 41 // QueryRow reads the results from the next query in the batch as if the query has been sent with Conn.QueryRow. 42 QueryRow() Row 43 44 // Close closes the batch operation. This must be called before the underlying connection can be used again. Any error 45 // that occurred during a batch operation may have made it impossible to resyncronize the connection with the server. 46 // In this case the underlying connection will have been closed. 47 Close() error 48} 49 50type batchResults struct { 51 ctx context.Context 52 conn *Conn 53 mrr *pgconn.MultiResultReader 54 err error 55 b *Batch 56 ix int 57} 58 59// Exec reads the results from the next query in the batch as if the query has been sent with Exec. 60func (br *batchResults) Exec() (pgconn.CommandTag, error) { 61 if br.err != nil { 62 return nil, br.err 63 } 64 65 query, arguments, _ := br.nextQueryAndArgs() 66 67 if !br.mrr.NextResult() { 68 err := br.mrr.Close() 69 if err == nil { 70 err = errors.New("no result") 71 } 72 if br.conn.shouldLog(LogLevelError) { 73 br.conn.log(br.ctx, LogLevelError, "BatchResult.Exec", map[string]interface{}{ 74 "sql": query, 75 "args": logQueryArgs(arguments), 76 "err": err, 77 }) 78 } 79 return nil, err 80 } 81 82 commandTag, err := br.mrr.ResultReader().Close() 83 84 if err != nil { 85 if br.conn.shouldLog(LogLevelError) { 86 br.conn.log(br.ctx, LogLevelError, "BatchResult.Exec", map[string]interface{}{ 87 "sql": query, 88 "args": logQueryArgs(arguments), 89 "err": err, 90 }) 91 } 92 } else if br.conn.shouldLog(LogLevelInfo) { 93 br.conn.log(br.ctx, LogLevelInfo, "BatchResult.Exec", map[string]interface{}{ 94 "sql": query, 95 "args": logQueryArgs(arguments), 96 "commandTag": commandTag, 97 }) 98 } 99 100 return commandTag, err 101} 102 103// Query reads the results from the next query in the batch as if the query has been sent with Query. 104func (br *batchResults) Query() (Rows, error) { 105 query, arguments, ok := br.nextQueryAndArgs() 106 if !ok { 107 query = "batch query" 108 } 109 110 if br.err != nil { 111 return &connRows{err: br.err, closed: true}, br.err 112 } 113 114 rows := br.conn.getRows(br.ctx, query, arguments) 115 116 if !br.mrr.NextResult() { 117 rows.err = br.mrr.Close() 118 if rows.err == nil { 119 rows.err = errors.New("no result") 120 } 121 rows.closed = true 122 123 if br.conn.shouldLog(LogLevelError) { 124 br.conn.log(br.ctx, LogLevelError, "BatchResult.Query", map[string]interface{}{ 125 "sql": query, 126 "args": logQueryArgs(arguments), 127 "err": rows.err, 128 }) 129 } 130 131 return rows, rows.err 132 } 133 134 rows.resultReader = br.mrr.ResultReader() 135 return rows, nil 136} 137 138// QueryRow reads the results from the next query in the batch as if the query has been sent with QueryRow. 139func (br *batchResults) QueryRow() Row { 140 rows, _ := br.Query() 141 return (*connRow)(rows.(*connRows)) 142 143} 144 145// Close closes the batch operation. Any error that occurred during a batch operation may have made it impossible to 146// resyncronize the connection with the server. In this case the underlying connection will have been closed. 147func (br *batchResults) Close() error { 148 if br.err != nil { 149 return br.err 150 } 151 152 // log any queries that haven't yet been logged by Exec or Query 153 for { 154 query, args, ok := br.nextQueryAndArgs() 155 if !ok { 156 break 157 } 158 159 if br.conn.shouldLog(LogLevelInfo) { 160 br.conn.log(br.ctx, LogLevelInfo, "BatchResult.Close", map[string]interface{}{ 161 "sql": query, 162 "args": logQueryArgs(args), 163 }) 164 } 165 } 166 167 return br.mrr.Close() 168} 169 170func (br *batchResults) nextQueryAndArgs() (query string, args []interface{}, ok bool) { 171 if br.b != nil && br.ix < len(br.b.items) { 172 bi := br.b.items[br.ix] 173 query = bi.query 174 args = bi.arguments 175 ok = true 176 br.ix++ 177 } 178 return 179} 180