1package mgo
2
3import (
4	"bytes"
5	"sort"
6
7	"gopkg.in/mgo.v2/bson"
8)
9
// Bulk represents an operation that can be prepared with several
// orthogonal changes before being delivered to the server.
//
// A Bulk is obtained from Collection.Bulk, filled in through Insert, Remove,
// RemoveAll, Update, UpdateAll and Upsert, and delivered with Run. It starts
// out in ordered mode; see Unordered.
//
// MongoDB servers older than version 2.6 do not have proper support for bulk
// operations, so the driver attempts to map its API as much as possible into
// the functionality that works. In particular, in those releases updates and
// removals are sent individually, and inserts are sent in bulk but have
// suboptimal error reporting compared to more recent versions of the server.
// See the documentation of BulkErrorCase for details on that.
//
// Relevant documentation:
//
//   http://blog.mongodb.org/post/84922794768/mongodbs-new-bulk-api
//
type Bulk struct {
	c       *Collection  // collection the queued changes are applied to
	opcount int          // number of changes queued so far; the next visible index
	actions []bulkAction // queued changes, grouped into runs of the same kind
	ordered bool         // true until Unordered is called
}
30
// bulkOp identifies the kind of change held by a bulkAction.
type bulkOp int

// Kinds of bulk changes. Values start at 1, so the zero bulkOp is not a valid
// kind. Update, UpdateAll and Upsert all queue under bulkUpdate, and Remove
// and RemoveAll both queue under bulkRemove. None of the queueing methods
// shown here use bulkUpdateAll, and Run panics on any kind other than
// bulkInsert, bulkUpdate or bulkRemove.
const (
	bulkInsert bulkOp = iota + 1
	bulkUpdate
	bulkUpdateAll
	bulkRemove
)
39
// bulkAction is a group of queued changes of the same kind that Run hands to
// the server in a single write operation.
type bulkAction struct {
	op   bulkOp        // kind of change held by this action
	docs []interface{} // documents to insert, or *updateOp / *deleteOp values
	idxs []int         // idxs[i] is the position of docs[i] within the whole bulk
}
45
// bulkUpdateOp and bulkDeleteOp wrap the queued *updateOp and *deleteOp
// values of an action in a distinct type before they are handed to writeOp;
// see runUpdate and runRemove.
type bulkUpdateOp []interface{}
type bulkDeleteOp []interface{}
48
// BulkResult holds the results for a bulk operation.
//
// Run adds up the counters reported for the update and remove actions it
// executes; insert actions do not contribute to them. Run returns a nil
// result when any action fails.
type BulkResult struct {
	Matched  int
	Modified int // Available only for MongoDB 2.6+

	// Be conservative while we understand exactly how to report these
	// results in a useful and convenient way, and also how to emulate
	// them with prior servers.
	// NOTE(review): presumably this unexported field exists so the struct
	// cannot be built or compared positionally outside the package - confirm.
	private bool
}
59
// BulkError holds an error returned from running a Bulk operation.
// Individual errors may be obtained and inspected via the Cases method.
type BulkError struct {
	// ecases lists the failures collected while running the bulk operation.
	// Run sorts it by Index before returning the error.
	ecases []BulkErrorCase
}
65
66func (e *BulkError) Error() string {
67	if len(e.ecases) == 0 {
68		return "invalid BulkError instance: no errors"
69	}
70	if len(e.ecases) == 1 {
71		return e.ecases[0].Err.Error()
72	}
73	msgs := make([]string, 0, len(e.ecases))
74	seen := make(map[string]bool)
75	for _, ecase := range e.ecases {
76		msg := ecase.Err.Error()
77		if !seen[msg] {
78			seen[msg] = true
79			msgs = append(msgs, msg)
80		}
81	}
82	if len(msgs) == 1 {
83		return msgs[0]
84	}
85	var buf bytes.Buffer
86	buf.WriteString("multiple errors in bulk operation:\n")
87	for _, msg := range msgs {
88		buf.WriteString("  - ")
89		buf.WriteString(msg)
90		buf.WriteByte('\n')
91	}
92	return buf.String()
93}
94
95type bulkErrorCases []BulkErrorCase
96
97func (slice bulkErrorCases) Len() int           { return len(slice) }
98func (slice bulkErrorCases) Less(i, j int) bool { return slice[i].Index < slice[j].Index }
99func (slice bulkErrorCases) Swap(i, j int)      { slice[i], slice[j] = slice[j], slice[i] }
100
// BulkErrorCase holds an individual error found while attempting a single change
// within a bulk operation, and the position in which it was enqueued.
//
// Positions count every change queued on the Bulk, across all kinds of
// operations, starting at zero.
//
// MongoDB servers older than version 2.6 do not have proper support for bulk
// operations, so the driver attempts to map its API as much as possible into
// the functionality that works. In particular, only the last error is reported
// for bulk inserts and without any positional information, so the Index
// field is set to -1 in these cases.
type BulkErrorCase struct {
	Index int   // Position of operation that failed, or -1 if unknown.
	Err   error // Error reported for that operation.
}
113
// Cases returns all individual errors found while attempting the requested changes.
//
// The returned slice is the one held by the BulkError itself rather than a
// copy, so callers should treat it as read-only. For errors returned by
// Bulk.Run the cases are sorted by Index.
//
// See the documentation of BulkErrorCase for limitations in older MongoDB releases.
func (e *BulkError) Cases() []BulkErrorCase {
	return e.ecases
}
120
121// Bulk returns a value to prepare the execution of a bulk operation.
122func (c *Collection) Bulk() *Bulk {
123	return &Bulk{c: c, ordered: true}
124}
125
// Unordered puts the bulk operation in unordered mode.
//
// In unordered mode the individual operations may be sent
// out of order, which means later operations may proceed
// even if prior ones have failed.
//
// Changes queued after this call are grouped with any previously queued
// group of the same kind, regardless of what was queued in between.
func (b *Bulk) Unordered() {
	b.ordered = false
}
134
135func (b *Bulk) action(op bulkOp, opcount int) *bulkAction {
136	var action *bulkAction
137	if len(b.actions) > 0 && b.actions[len(b.actions)-1].op == op {
138		action = &b.actions[len(b.actions)-1]
139	} else if !b.ordered {
140		for i := range b.actions {
141			if b.actions[i].op == op {
142				action = &b.actions[i]
143				break
144			}
145		}
146	}
147	if action == nil {
148		b.actions = append(b.actions, bulkAction{op: op})
149		action = &b.actions[len(b.actions)-1]
150	}
151	for i := 0; i < opcount; i++ {
152		action.idxs = append(action.idxs, b.opcount)
153		b.opcount++
154	}
155	return action
156}
157
158// Insert queues up the provided documents for insertion.
159func (b *Bulk) Insert(docs ...interface{}) {
160	action := b.action(bulkInsert, len(docs))
161	action.docs = append(action.docs, docs...)
162}
163
164// Remove queues up the provided selectors for removing matching documents.
165// Each selector will remove only a single matching document.
166func (b *Bulk) Remove(selectors ...interface{}) {
167	action := b.action(bulkRemove, len(selectors))
168	for _, selector := range selectors {
169		if selector == nil {
170			selector = bson.D{}
171		}
172		action.docs = append(action.docs, &deleteOp{
173			Collection: b.c.FullName,
174			Selector:   selector,
175			Flags:      1,
176			Limit:      1,
177		})
178	}
179}
180
181// RemoveAll queues up the provided selectors for removing all matching documents.
182// Each selector will remove all matching documents.
183func (b *Bulk) RemoveAll(selectors ...interface{}) {
184	action := b.action(bulkRemove, len(selectors))
185	for _, selector := range selectors {
186		if selector == nil {
187			selector = bson.D{}
188		}
189		action.docs = append(action.docs, &deleteOp{
190			Collection: b.c.FullName,
191			Selector:   selector,
192			Flags:      0,
193			Limit:      0,
194		})
195	}
196}
197
198// Update queues up the provided pairs of updating instructions.
199// The first element of each pair selects which documents must be
200// updated, and the second element defines how to update it.
201// Each pair matches exactly one document for updating at most.
202func (b *Bulk) Update(pairs ...interface{}) {
203	if len(pairs)%2 != 0 {
204		panic("Bulk.Update requires an even number of parameters")
205	}
206	action := b.action(bulkUpdate, len(pairs)/2)
207	for i := 0; i < len(pairs); i += 2 {
208		selector := pairs[i]
209		if selector == nil {
210			selector = bson.D{}
211		}
212		action.docs = append(action.docs, &updateOp{
213			Collection: b.c.FullName,
214			Selector:   selector,
215			Update:     pairs[i+1],
216		})
217	}
218}
219
220// UpdateAll queues up the provided pairs of updating instructions.
221// The first element of each pair selects which documents must be
222// updated, and the second element defines how to update it.
223// Each pair updates all documents matching the selector.
224func (b *Bulk) UpdateAll(pairs ...interface{}) {
225	if len(pairs)%2 != 0 {
226		panic("Bulk.UpdateAll requires an even number of parameters")
227	}
228	action := b.action(bulkUpdate, len(pairs)/2)
229	for i := 0; i < len(pairs); i += 2 {
230		selector := pairs[i]
231		if selector == nil {
232			selector = bson.D{}
233		}
234		action.docs = append(action.docs, &updateOp{
235			Collection: b.c.FullName,
236			Selector:   selector,
237			Update:     pairs[i+1],
238			Flags:      2,
239			Multi:      true,
240		})
241	}
242}
243
244// Upsert queues up the provided pairs of upserting instructions.
245// The first element of each pair selects which documents must be
246// updated, and the second element defines how to update it.
247// Each pair matches exactly one document for updating at most.
248func (b *Bulk) Upsert(pairs ...interface{}) {
249	if len(pairs)%2 != 0 {
250		panic("Bulk.Update requires an even number of parameters")
251	}
252	action := b.action(bulkUpdate, len(pairs)/2)
253	for i := 0; i < len(pairs); i += 2 {
254		selector := pairs[i]
255		if selector == nil {
256			selector = bson.D{}
257		}
258		action.docs = append(action.docs, &updateOp{
259			Collection: b.c.FullName,
260			Selector:   selector,
261			Update:     pairs[i+1],
262			Flags:      1,
263			Upsert:     true,
264		})
265	}
266}
267
268// Run runs all the operations queued up.
269//
270// If an error is reported on an unordered bulk operation, the error value may
271// be an aggregation of all issues observed. As an exception to that, Insert
272// operations running on MongoDB versions prior to 2.6 will report the last
273// error only due to a limitation in the wire protocol.
274func (b *Bulk) Run() (*BulkResult, error) {
275	var result BulkResult
276	var berr BulkError
277	var failed bool
278	for i := range b.actions {
279		action := &b.actions[i]
280		var ok bool
281		switch action.op {
282		case bulkInsert:
283			ok = b.runInsert(action, &result, &berr)
284		case bulkUpdate:
285			ok = b.runUpdate(action, &result, &berr)
286		case bulkRemove:
287			ok = b.runRemove(action, &result, &berr)
288		default:
289			panic("unknown bulk operation")
290		}
291		if !ok {
292			failed = true
293			if b.ordered {
294				break
295			}
296		}
297	}
298	if failed {
299		sort.Sort(bulkErrorCases(berr.ecases))
300		return nil, &berr
301	}
302	return &result, nil
303}
304
305func (b *Bulk) runInsert(action *bulkAction, result *BulkResult, berr *BulkError) bool {
306	op := &insertOp{b.c.FullName, action.docs, 0}
307	if !b.ordered {
308		op.flags = 1 // ContinueOnError
309	}
310	lerr, err := b.c.writeOp(op, b.ordered)
311	return b.checkSuccess(action, berr, lerr, err)
312}
313
314func (b *Bulk) runUpdate(action *bulkAction, result *BulkResult, berr *BulkError) bool {
315	lerr, err := b.c.writeOp(bulkUpdateOp(action.docs), b.ordered)
316	if lerr != nil {
317		result.Matched += lerr.N
318		result.Modified += lerr.modified
319	}
320	return b.checkSuccess(action, berr, lerr, err)
321}
322
323func (b *Bulk) runRemove(action *bulkAction, result *BulkResult, berr *BulkError) bool {
324	lerr, err := b.c.writeOp(bulkDeleteOp(action.docs), b.ordered)
325	if lerr != nil {
326		result.Matched += lerr.N
327		result.Modified += lerr.modified
328	}
329	return b.checkSuccess(action, berr, lerr, err)
330}
331
332func (b *Bulk) checkSuccess(action *bulkAction, berr *BulkError, lerr *LastError, err error) bool {
333	if lerr != nil && len(lerr.ecases) > 0 {
334		for i := 0; i < len(lerr.ecases); i++ {
335			// Map back from the local error index into the visible one.
336			ecase := lerr.ecases[i]
337			idx := ecase.Index
338			if idx >= 0 {
339				idx = action.idxs[idx]
340			}
341			berr.ecases = append(berr.ecases, BulkErrorCase{idx, ecase.Err})
342		}
343		return false
344	} else if err != nil {
345		for i := 0; i < len(action.idxs); i++ {
346			berr.ecases = append(berr.ecases, BulkErrorCase{action.idxs[i], err})
347		}
348		return false
349	}
350	return true
351}
352