1package pgx
2
3import (
4	"context"
5	"errors"
6
7	"github.com/jackc/pgconn"
8)
9
// batchItem is one queued entry of a Batch: the query string handed to
// Batch.Queue together with the arguments supplied alongside it.
type batchItem struct {
	// query is the SQL text or prepared statement name given to Queue.
	query string
	// arguments are the variadic values given to Queue, stored as received.
	arguments []interface{}
}
14
15// Batch queries are a way of bundling multiple queries together to avoid
16// unnecessary network round trips.
17type Batch struct {
18	items []*batchItem
19}
20
21// Queue queues a query to batch b. query can be an SQL query or the name of a prepared statement.
22func (b *Batch) Queue(query string, arguments ...interface{}) {
23	b.items = append(b.items, &batchItem{
24		query:     query,
25		arguments: arguments,
26	})
27}
28
29// Len returns number of queries that have been queued so far.
30func (b *Batch) Len() int {
31	return len(b.items)
32}
33
34type BatchResults interface {
35	// Exec reads the results from the next query in the batch as if the query has been sent with Conn.Exec.
36	Exec() (pgconn.CommandTag, error)
37
38	// Query reads the results from the next query in the batch as if the query has been sent with Conn.Query.
39	Query() (Rows, error)
40
41	// QueryRow reads the results from the next query in the batch as if the query has been sent with Conn.QueryRow.
42	QueryRow() Row
43
44	// Close closes the batch operation. This must be called before the underlying connection can be used again. Any error
45	// that occurred during a batch operation may have made it impossible to resyncronize the connection with the server.
46	// In this case the underlying connection will have been closed.
47	Close() error
48}
49
50type batchResults struct {
51	ctx  context.Context
52	conn *Conn
53	mrr  *pgconn.MultiResultReader
54	err  error
55	b    *Batch
56	ix   int
57}
58
59// Exec reads the results from the next query in the batch as if the query has been sent with Exec.
60func (br *batchResults) Exec() (pgconn.CommandTag, error) {
61	if br.err != nil {
62		return nil, br.err
63	}
64
65	query, arguments, _ := br.nextQueryAndArgs()
66
67	if !br.mrr.NextResult() {
68		err := br.mrr.Close()
69		if err == nil {
70			err = errors.New("no result")
71		}
72		if br.conn.shouldLog(LogLevelError) {
73			br.conn.log(br.ctx, LogLevelError, "BatchResult.Exec", map[string]interface{}{
74				"sql":  query,
75				"args": logQueryArgs(arguments),
76				"err":  err,
77			})
78		}
79		return nil, err
80	}
81
82	commandTag, err := br.mrr.ResultReader().Close()
83
84	if err != nil {
85		if br.conn.shouldLog(LogLevelError) {
86			br.conn.log(br.ctx, LogLevelError, "BatchResult.Exec", map[string]interface{}{
87				"sql":  query,
88				"args": logQueryArgs(arguments),
89				"err":  err,
90			})
91		}
92	} else if br.conn.shouldLog(LogLevelInfo) {
93		br.conn.log(br.ctx, LogLevelInfo, "BatchResult.Exec", map[string]interface{}{
94			"sql":        query,
95			"args":       logQueryArgs(arguments),
96			"commandTag": commandTag,
97		})
98	}
99
100	return commandTag, err
101}
102
103// Query reads the results from the next query in the batch as if the query has been sent with Query.
104func (br *batchResults) Query() (Rows, error) {
105	query, arguments, ok := br.nextQueryAndArgs()
106	if !ok {
107		query = "batch query"
108	}
109
110	if br.err != nil {
111		return &connRows{err: br.err, closed: true}, br.err
112	}
113
114	rows := br.conn.getRows(br.ctx, query, arguments)
115
116	if !br.mrr.NextResult() {
117		rows.err = br.mrr.Close()
118		if rows.err == nil {
119			rows.err = errors.New("no result")
120		}
121		rows.closed = true
122
123		if br.conn.shouldLog(LogLevelError) {
124			br.conn.log(br.ctx, LogLevelError, "BatchResult.Query", map[string]interface{}{
125				"sql":  query,
126				"args": logQueryArgs(arguments),
127				"err":  rows.err,
128			})
129		}
130
131		return rows, rows.err
132	}
133
134	rows.resultReader = br.mrr.ResultReader()
135	return rows, nil
136}
137
138// QueryRow reads the results from the next query in the batch as if the query has been sent with QueryRow.
139func (br *batchResults) QueryRow() Row {
140	rows, _ := br.Query()
141	return (*connRow)(rows.(*connRows))
142
143}
144
145// Close closes the batch operation. Any error that occurred during a batch operation may have made it impossible to
146// resyncronize the connection with the server. In this case the underlying connection will have been closed.
147func (br *batchResults) Close() error {
148	if br.err != nil {
149		return br.err
150	}
151
152	// log any queries that haven't yet been logged by Exec or Query
153	for {
154		query, args, ok := br.nextQueryAndArgs()
155		if !ok {
156			break
157		}
158
159		if br.conn.shouldLog(LogLevelInfo) {
160			br.conn.log(br.ctx, LogLevelInfo, "BatchResult.Close", map[string]interface{}{
161				"sql":  query,
162				"args": logQueryArgs(args),
163			})
164		}
165	}
166
167	return br.mrr.Close()
168}
169
170func (br *batchResults) nextQueryAndArgs() (query string, args []interface{}, ok bool) {
171	if br.b != nil && br.ix < len(br.b.items) {
172		bi := br.b.items[br.ix]
173		query = bi.query
174		args = bi.arguments
175		ok = true
176		br.ix++
177	}
178	return
179}
180