1// mgo - MongoDB driver for Go
2//
3// Copyright (c) 2010-2015 - Gustavo Niemeyer <gustavo@niemeyer.net>
4//
5// All rights reserved.
6//
7// Redistribution and use in source and binary forms, with or without
8// modification, are permitted provided that the following conditions are met:
9//
10// 1. Redistributions of source code must retain the above copyright notice, this
11//    list of conditions and the following disclaimer.
12// 2. Redistributions in binary form must reproduce the above copyright notice,
13//    this list of conditions and the following disclaimer in the documentation
14//    and/or other materials provided with the distribution.
15//
16// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
17// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
20// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27package mgo_test
28
29import (
30	. "gopkg.in/check.v1"
31	"gopkg.in/mgo.v2"
32)
33
34func (s *S) TestBulkInsert(c *C) {
35	session, err := mgo.Dial("localhost:40001")
36	c.Assert(err, IsNil)
37	defer session.Close()
38
39	coll := session.DB("mydb").C("mycoll")
40	bulk := coll.Bulk()
41	bulk.Insert(M{"n": 1})
42	bulk.Insert(M{"n": 2}, M{"n": 3})
43	r, err := bulk.Run()
44	c.Assert(err, IsNil)
45	c.Assert(r, FitsTypeOf, &mgo.BulkResult{})
46
47	type doc struct{ N int }
48	var res []doc
49	err = coll.Find(nil).Sort("n").All(&res)
50	c.Assert(err, IsNil)
51	c.Assert(res, DeepEquals, []doc{{1}, {2}, {3}})
52}
53
54func (s *S) TestBulkInsertError(c *C) {
55	session, err := mgo.Dial("localhost:40001")
56	c.Assert(err, IsNil)
57	defer session.Close()
58
59	coll := session.DB("mydb").C("mycoll")
60	bulk := coll.Bulk()
61	bulk.Insert(M{"_id": 1}, M{"_id": 2}, M{"_id": 2}, M{"_id": 3})
62	_, err = bulk.Run()
63	c.Assert(err, ErrorMatches, ".*duplicate key.*")
64	c.Assert(mgo.IsDup(err), Equals, true)
65
66	type doc struct {
67		N int `_id`
68	}
69	var res []doc
70	err = coll.Find(nil).Sort("_id").All(&res)
71	c.Assert(err, IsNil)
72	c.Assert(res, DeepEquals, []doc{{1}, {2}})
73}
74
75func (s *S) TestBulkInsertErrorUnordered(c *C) {
76	session, err := mgo.Dial("localhost:40001")
77	c.Assert(err, IsNil)
78	defer session.Close()
79
80	coll := session.DB("mydb").C("mycoll")
81	bulk := coll.Bulk()
82	bulk.Unordered()
83	bulk.Insert(M{"_id": 1}, M{"_id": 2}, M{"_id": 2}, M{"_id": 3})
84	_, err = bulk.Run()
85	c.Assert(err, ErrorMatches, ".*duplicate key.*")
86
87	type doc struct {
88		N int `_id`
89	}
90	var res []doc
91	err = coll.Find(nil).Sort("_id").All(&res)
92	c.Assert(err, IsNil)
93	c.Assert(res, DeepEquals, []doc{{1}, {2}, {3}})
94}
95
96func (s *S) TestBulkInsertErrorUnorderedSplitBatch(c *C) {
97	// The server has a batch limit of 1000 documents when using write commands.
98	// This artificial limit did not exist with the old wire protocol, so to
99	// avoid compatibility issues the implementation internally split batches
100	// into the proper size and delivers them one by one. This test ensures that
101	// the behavior of unordered (that is, continue on error) remains correct
102	// when errors happen and there are batches left.
103	session, err := mgo.Dial("localhost:40001")
104	c.Assert(err, IsNil)
105	defer session.Close()
106
107	coll := session.DB("mydb").C("mycoll")
108	bulk := coll.Bulk()
109	bulk.Unordered()
110
111	const total = 4096
112	type doc struct {
113		Id int `_id`
114	}
115	docs := make([]interface{}, total)
116	for i := 0; i < total; i++ {
117		docs[i] = doc{i}
118	}
119	docs[1] = doc{0}
120	bulk.Insert(docs...)
121	_, err = bulk.Run()
122	c.Assert(err, ErrorMatches, ".*duplicate key.*")
123
124	n, err := coll.Count()
125	c.Assert(err, IsNil)
126	c.Assert(n, Equals, total-1)
127
128	var res doc
129	err = coll.FindId(1500).One(&res)
130	c.Assert(err, IsNil)
131	c.Assert(res.Id, Equals, 1500)
132}
133
134func (s *S) TestBulkErrorString(c *C) {
135	session, err := mgo.Dial("localhost:40001")
136	c.Assert(err, IsNil)
137	defer session.Close()
138
139	coll := session.DB("mydb").C("mycoll")
140
141	// If it's just the same string multiple times, join it into a single message.
142	bulk := coll.Bulk()
143	bulk.Unordered()
144	bulk.Insert(M{"_id": 1}, M{"_id": 2}, M{"_id": 2})
145	_, err = bulk.Run()
146	c.Assert(err, ErrorMatches, ".*duplicate key.*")
147	c.Assert(err, Not(ErrorMatches), ".*duplicate key.*duplicate key")
148	c.Assert(mgo.IsDup(err), Equals, true)
149
150	// With matching errors but different messages, present them all.
151	bulk = coll.Bulk()
152	bulk.Unordered()
153	bulk.Insert(M{"_id": "dupone"}, M{"_id": "dupone"}, M{"_id": "duptwo"}, M{"_id": "duptwo"})
154	_, err = bulk.Run()
155	if s.versionAtLeast(2, 6) {
156		c.Assert(err, ErrorMatches, "multiple errors in bulk operation:\n(  - .*duplicate.*\n){2}$")
157		c.Assert(err, ErrorMatches, "(?s).*dupone.*")
158		c.Assert(err, ErrorMatches, "(?s).*duptwo.*")
159	} else {
160		// Wire protocol query doesn't return all errors.
161		c.Assert(err, ErrorMatches, ".*duplicate.*")
162	}
163	c.Assert(mgo.IsDup(err), Equals, true)
164
165	// With mixed errors, present them all.
166	bulk = coll.Bulk()
167	bulk.Unordered()
168	bulk.Insert(M{"_id": 1}, M{"_id": []int{2}})
169	_, err = bulk.Run()
170	if s.versionAtLeast(2, 6) {
171		c.Assert(err, ErrorMatches, "multiple errors in bulk operation:\n  - .*duplicate.*\n  - .*array.*\n$")
172	} else {
173		// Wire protocol query doesn't return all errors.
174		c.Assert(err, ErrorMatches, ".*array.*")
175	}
176	c.Assert(mgo.IsDup(err), Equals, false)
177}
178
179func (s *S) TestBulkErrorCases_2_6(c *C) {
180	if !s.versionAtLeast(2, 6) {
181		c.Skip("2.4- has poor bulk reporting")
182	}
183	session, err := mgo.Dial("localhost:40001")
184	c.Assert(err, IsNil)
185	defer session.Close()
186
187	coll := session.DB("mydb").C("mycoll")
188
189	bulk := coll.Bulk()
190	bulk.Unordered()
191
192	// There's a limit of 1000 operations per command, so
193	// this forces the more complex indexing logic to act.
194	for i := 0; i < 1010; i++ {
195		switch i {
196		case 3, 14:
197			bulk.Insert(M{"_id": "dupone"})
198		case 5, 106:
199			bulk.Update(M{"_id": i - 1}, M{"$set": M{"_id": 4}})
200		case 7, 1008:
201			bulk.Insert(M{"_id": "duptwo"})
202		default:
203			bulk.Insert(M{"_id": i})
204		}
205	}
206
207	_, err = bulk.Run()
208	ecases := err.(*mgo.BulkError).Cases()
209
210	c.Check(ecases[0].Err, ErrorMatches, ".*duplicate.*dupone.*")
211	c.Check(ecases[0].Index, Equals, 14)
212	c.Check(ecases[1].Err, ErrorMatches, ".*update.*_id.*")
213	c.Check(ecases[1].Index, Equals, 106)
214	c.Check(ecases[2].Err, ErrorMatches, ".*duplicate.*duptwo.*")
215	c.Check(ecases[2].Index, Equals, 1008)
216}
217
218func (s *S) TestBulkErrorCases_2_4(c *C) {
219	if s.versionAtLeast(2, 6) {
220		c.Skip("2.6+ has better reporting")
221	}
222	session, err := mgo.Dial("localhost:40001")
223	c.Assert(err, IsNil)
224	defer session.Close()
225
226	coll := session.DB("mydb").C("mycoll")
227
228	bulk := coll.Bulk()
229	bulk.Unordered()
230
231	// There's a limit of 1000 operations per command, so
232	// this forces the more complex indexing logic to act.
233	for i := 0; i < 1010; i++ {
234		switch i {
235		case 3, 14:
236			bulk.Insert(M{"_id": "dupone"})
237		case 5:
238			bulk.Update(M{"_id": i - 1}, M{"$set": M{"n": 4}})
239		case 106:
240			bulk.Update(M{"_id": i - 1}, M{"$bogus": M{"n": 4}})
241		case 7, 1008:
242			bulk.Insert(M{"_id": "duptwo"})
243		default:
244			bulk.Insert(M{"_id": i})
245		}
246	}
247
248	_, err = bulk.Run()
249	ecases := err.(*mgo.BulkError).Cases()
250
251	c.Check(ecases[0].Err, ErrorMatches, ".*duplicate.*duptwo.*")
252	c.Check(ecases[0].Index, Equals, -1)
253	c.Check(ecases[1].Err, ErrorMatches, `.*\$bogus.*`)
254	c.Check(ecases[1].Index, Equals, 106)
255}
256
257func (s *S) TestBulkErrorCasesOrdered(c *C) {
258	session, err := mgo.Dial("localhost:40001")
259	c.Assert(err, IsNil)
260	defer session.Close()
261
262	coll := session.DB("mydb").C("mycoll")
263
264	bulk := coll.Bulk()
265
266	// There's a limit of 1000 operations per command, so
267	// this forces the more complex indexing logic to act.
268	for i := 0; i < 20; i++ {
269		switch i {
270		case 3, 14:
271			bulk.Insert(M{"_id": "dupone"})
272		case 7, 17:
273			bulk.Insert(M{"_id": "duptwo"})
274		default:
275			bulk.Insert(M{"_id": i})
276		}
277	}
278
279	_, err = bulk.Run()
280	ecases := err.(*mgo.BulkError).Cases()
281
282	c.Check(ecases[0].Err, ErrorMatches, ".*duplicate.*dupone.*")
283	if s.versionAtLeast(2, 6) {
284		c.Check(ecases[0].Index, Equals, 14)
285	} else {
286		c.Check(ecases[0].Index, Equals, -1)
287	}
288	c.Check(ecases, HasLen, 1)
289}
290
291func (s *S) TestBulkUpdate(c *C) {
292	session, err := mgo.Dial("localhost:40001")
293	c.Assert(err, IsNil)
294	defer session.Close()
295
296	coll := session.DB("mydb").C("mycoll")
297
298	err = coll.Insert(M{"n": 1}, M{"n": 2}, M{"n": 3})
299	c.Assert(err, IsNil)
300
301	bulk := coll.Bulk()
302	bulk.Update(M{"n": 1}, M{"$set": M{"n": 1}})
303	bulk.Update(M{"n": 2}, M{"$set": M{"n": 20}})
304	bulk.Update(M{"n": 5}, M{"$set": M{"n": 50}}) // Won't match.
305	bulk.Update(M{"n": 1}, M{"$set": M{"n": 10}}, M{"n": 3}, M{"$set": M{"n": 30}})
306	r, err := bulk.Run()
307	c.Assert(err, IsNil)
308	c.Assert(r.Matched, Equals, 4)
309	if s.versionAtLeast(2, 6) {
310		c.Assert(r.Modified, Equals, 3)
311	}
312
313	type doc struct{ N int }
314	var res []doc
315	err = coll.Find(nil).Sort("n").All(&res)
316	c.Assert(err, IsNil)
317	c.Assert(res, DeepEquals, []doc{{10}, {20}, {30}})
318}
319
320func (s *S) TestBulkUpdateError(c *C) {
321	session, err := mgo.Dial("localhost:40001")
322	c.Assert(err, IsNil)
323	defer session.Close()
324
325	coll := session.DB("mydb").C("mycoll")
326
327	err = coll.Insert(M{"n": 1}, M{"n": 2}, M{"n": 3})
328	c.Assert(err, IsNil)
329
330	bulk := coll.Bulk()
331	bulk.Update(
332		M{"n": 1}, M{"$set": M{"n": 10}},
333		M{"n": 2}, M{"$set": M{"n": 20, "_id": 20}},
334		M{"n": 3}, M{"$set": M{"n": 30}},
335	)
336	r, err := bulk.Run()
337	c.Assert(err, ErrorMatches, ".*_id.*")
338	c.Assert(r, FitsTypeOf, &mgo.BulkResult{})
339
340	type doc struct{ N int }
341	var res []doc
342	err = coll.Find(nil).Sort("n").All(&res)
343	c.Assert(err, IsNil)
344	c.Assert(res, DeepEquals, []doc{{2}, {3}, {10}})
345}
346
347func (s *S) TestBulkUpdateErrorUnordered(c *C) {
348	session, err := mgo.Dial("localhost:40001")
349	c.Assert(err, IsNil)
350	defer session.Close()
351
352	coll := session.DB("mydb").C("mycoll")
353
354	err = coll.Insert(M{"n": 1}, M{"n": 2}, M{"n": 3})
355	c.Assert(err, IsNil)
356
357	bulk := coll.Bulk()
358	bulk.Unordered()
359	bulk.Update(
360		M{"n": 1}, M{"$set": M{"n": 10}},
361		M{"n": 2}, M{"$set": M{"n": 20, "_id": 20}},
362		M{"n": 3}, M{"$set": M{"n": 30}},
363	)
364	r, err := bulk.Run()
365	c.Assert(err, ErrorMatches, ".*_id.*")
366	c.Assert(r, FitsTypeOf, &mgo.BulkResult{})
367
368	type doc struct{ N int }
369	var res []doc
370	err = coll.Find(nil).Sort("n").All(&res)
371	c.Assert(err, IsNil)
372	c.Assert(res, DeepEquals, []doc{{2}, {10}, {30}})
373}
374
375func (s *S) TestBulkUpdateAll(c *C) {
376	session, err := mgo.Dial("localhost:40001")
377	c.Assert(err, IsNil)
378	defer session.Close()
379
380	coll := session.DB("mydb").C("mycoll")
381
382	err = coll.Insert(M{"n": 1}, M{"n": 2}, M{"n": 3})
383	c.Assert(err, IsNil)
384
385	bulk := coll.Bulk()
386	bulk.UpdateAll(M{"n": 1}, M{"$set": M{"n": 10}})
387	bulk.UpdateAll(M{"n": 2}, M{"$set": M{"n": 2}})  // Won't change.
388	bulk.UpdateAll(M{"n": 5}, M{"$set": M{"n": 50}}) // Won't match.
389	bulk.UpdateAll(M{}, M{"$inc": M{"n": 1}}, M{"n": 11}, M{"$set": M{"n": 5}})
390	r, err := bulk.Run()
391	c.Assert(err, IsNil)
392	c.Assert(r.Matched, Equals, 6)
393	if s.versionAtLeast(2, 6) {
394		c.Assert(r.Modified, Equals, 5)
395	}
396
397	type doc struct{ N int }
398	var res []doc
399	err = coll.Find(nil).Sort("n").All(&res)
400	c.Assert(err, IsNil)
401	c.Assert(res, DeepEquals, []doc{{3}, {4}, {5}})
402}
403
404func (s *S) TestBulkMixedUnordered(c *C) {
405	session, err := mgo.Dial("localhost:40001")
406	c.Assert(err, IsNil)
407	defer session.Close()
408
409	coll := session.DB("mydb").C("mycoll")
410
411	// Abuse undefined behavior to ensure the desired implementation is in place.
412	bulk := coll.Bulk()
413	bulk.Unordered()
414	bulk.Insert(M{"n": 1})
415	bulk.Update(M{"n": 2}, M{"$inc": M{"n": 1}})
416	bulk.Insert(M{"n": 2})
417	bulk.Update(M{"n": 3}, M{"$inc": M{"n": 1}})
418	bulk.Update(M{"n": 1}, M{"$inc": M{"n": 1}})
419	bulk.Insert(M{"n": 3})
420	r, err := bulk.Run()
421	c.Assert(err, IsNil)
422	c.Assert(r.Matched, Equals, 3)
423	if s.versionAtLeast(2, 6) {
424		c.Assert(r.Modified, Equals, 3)
425	}
426
427	type doc struct{ N int }
428	var res []doc
429	err = coll.Find(nil).Sort("n").All(&res)
430	c.Assert(err, IsNil)
431	c.Assert(res, DeepEquals, []doc{{2}, {3}, {4}})
432}
433
434func (s *S) TestBulkUpsert(c *C) {
435	session, err := mgo.Dial("localhost:40001")
436	c.Assert(err, IsNil)
437	defer session.Close()
438
439	coll := session.DB("mydb").C("mycoll")
440
441	err = coll.Insert(M{"n": 1}, M{"n": 2}, M{"n": 3})
442	c.Assert(err, IsNil)
443
444	bulk := coll.Bulk()
445	bulk.Upsert(M{"n": 2}, M{"$set": M{"n": 20}})
446	bulk.Upsert(M{"n": 4}, M{"$set": M{"n": 40}}, M{"n": 3}, M{"$set": M{"n": 30}})
447	r, err := bulk.Run()
448	c.Assert(err, IsNil)
449	c.Assert(r, FitsTypeOf, &mgo.BulkResult{})
450
451	type doc struct{ N int }
452	var res []doc
453	err = coll.Find(nil).Sort("n").All(&res)
454	c.Assert(err, IsNil)
455	c.Assert(res, DeepEquals, []doc{{1}, {20}, {30}, {40}})
456}
457
458func (s *S) TestBulkRemove(c *C) {
459	session, err := mgo.Dial("localhost:40001")
460	c.Assert(err, IsNil)
461	defer session.Close()
462
463	coll := session.DB("mydb").C("mycoll")
464
465	err = coll.Insert(M{"n": 1}, M{"n": 2}, M{"n": 3}, M{"n": 4}, M{"n": 4})
466	c.Assert(err, IsNil)
467
468	bulk := coll.Bulk()
469	bulk.Remove(M{"n": 1})
470	bulk.Remove(M{"n": 2}, M{"n": 4})
471	r, err := bulk.Run()
472	c.Assert(err, IsNil)
473	c.Assert(r.Matched, Equals, 3)
474
475	type doc struct{ N int }
476	var res []doc
477	err = coll.Find(nil).Sort("n").All(&res)
478	c.Assert(err, IsNil)
479	c.Assert(res, DeepEquals, []doc{{3}, {4}})
480}
481
482func (s *S) TestBulkRemoveAll(c *C) {
483	session, err := mgo.Dial("localhost:40001")
484	c.Assert(err, IsNil)
485	defer session.Close()
486
487	coll := session.DB("mydb").C("mycoll")
488
489	err = coll.Insert(M{"n": 1}, M{"n": 2}, M{"n": 3}, M{"n": 4}, M{"n": 4})
490	c.Assert(err, IsNil)
491
492	bulk := coll.Bulk()
493	bulk.RemoveAll(M{"n": 1})
494	bulk.RemoveAll(M{"n": 2}, M{"n": 4})
495	r, err := bulk.Run()
496	c.Assert(err, IsNil)
497	c.Assert(r.Matched, Equals, 4)
498
499	type doc struct{ N int }
500	var res []doc
501	err = coll.Find(nil).Sort("n").All(&res)
502	c.Assert(err, IsNil)
503	c.Assert(res, DeepEquals, []doc{{3}})
504}
505