1// mgo - MongoDB driver for Go 2// 3// Copyright (c) 2010-2015 - Gustavo Niemeyer <gustavo@niemeyer.net> 4// 5// All rights reserved. 6// 7// Redistribution and use in source and binary forms, with or without 8// modification, are permitted provided that the following conditions are met: 9// 10// 1. Redistributions of source code must retain the above copyright notice, this 11// list of conditions and the following disclaimer. 12// 2. Redistributions in binary form must reproduce the above copyright notice, 13// this list of conditions and the following disclaimer in the documentation 14// and/or other materials provided with the distribution. 15// 16// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 17// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 18// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 19// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR 20// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 21// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 22// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND 23// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 24// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 27package mgo_test 28 29import ( 30 . "gopkg.in/check.v1" 31 "gopkg.in/mgo.v2" 32) 33 34func (s *S) TestBulkInsert(c *C) { 35 session, err := mgo.Dial("localhost:40001") 36 c.Assert(err, IsNil) 37 defer session.Close() 38 39 coll := session.DB("mydb").C("mycoll") 40 bulk := coll.Bulk() 41 bulk.Insert(M{"n": 1}) 42 bulk.Insert(M{"n": 2}, M{"n": 3}) 43 r, err := bulk.Run() 44 c.Assert(err, IsNil) 45 c.Assert(r, FitsTypeOf, &mgo.BulkResult{}) 46 47 type doc struct{ N int } 48 var res []doc 49 err = coll.Find(nil).Sort("n").All(&res) 50 c.Assert(err, IsNil) 51 c.Assert(res, DeepEquals, []doc{{1}, {2}, {3}}) 52} 53 54func (s *S) TestBulkInsertError(c *C) { 55 session, err := mgo.Dial("localhost:40001") 56 c.Assert(err, IsNil) 57 defer session.Close() 58 59 coll := session.DB("mydb").C("mycoll") 60 bulk := coll.Bulk() 61 bulk.Insert(M{"_id": 1}, M{"_id": 2}, M{"_id": 2}, M{"_id": 3}) 62 _, err = bulk.Run() 63 c.Assert(err, ErrorMatches, ".*duplicate key.*") 64 c.Assert(mgo.IsDup(err), Equals, true) 65 66 type doc struct { 67 N int `_id` 68 } 69 var res []doc 70 err = coll.Find(nil).Sort("_id").All(&res) 71 c.Assert(err, IsNil) 72 c.Assert(res, DeepEquals, []doc{{1}, {2}}) 73} 74 75func (s *S) TestBulkInsertErrorUnordered(c *C) { 76 session, err := mgo.Dial("localhost:40001") 77 c.Assert(err, IsNil) 78 defer session.Close() 79 80 coll := session.DB("mydb").C("mycoll") 81 bulk := coll.Bulk() 82 bulk.Unordered() 83 bulk.Insert(M{"_id": 1}, M{"_id": 2}, M{"_id": 2}, M{"_id": 3}) 84 _, err = bulk.Run() 85 c.Assert(err, ErrorMatches, ".*duplicate key.*") 86 87 type doc struct { 88 N int `_id` 89 } 90 var res []doc 91 err = coll.Find(nil).Sort("_id").All(&res) 92 c.Assert(err, IsNil) 93 c.Assert(res, DeepEquals, []doc{{1}, {2}, {3}}) 94} 95 96func (s *S) TestBulkInsertErrorUnorderedSplitBatch(c *C) { 97 // The server has a batch limit of 1000 documents when using write commands. 98 // This artificial limit did not exist with the old wire protocol, so to 99 // avoid compatibility issues the implementation internally split batches 100 // into the proper size and delivers them one by one. This test ensures that 101 // the behavior of unordered (that is, continue on error) remains correct 102 // when errors happen and there are batches left. 103 session, err := mgo.Dial("localhost:40001") 104 c.Assert(err, IsNil) 105 defer session.Close() 106 107 coll := session.DB("mydb").C("mycoll") 108 bulk := coll.Bulk() 109 bulk.Unordered() 110 111 const total = 4096 112 type doc struct { 113 Id int `_id` 114 } 115 docs := make([]interface{}, total) 116 for i := 0; i < total; i++ { 117 docs[i] = doc{i} 118 } 119 docs[1] = doc{0} 120 bulk.Insert(docs...) 121 _, err = bulk.Run() 122 c.Assert(err, ErrorMatches, ".*duplicate key.*") 123 124 n, err := coll.Count() 125 c.Assert(err, IsNil) 126 c.Assert(n, Equals, total-1) 127 128 var res doc 129 err = coll.FindId(1500).One(&res) 130 c.Assert(err, IsNil) 131 c.Assert(res.Id, Equals, 1500) 132} 133 134func (s *S) TestBulkErrorString(c *C) { 135 session, err := mgo.Dial("localhost:40001") 136 c.Assert(err, IsNil) 137 defer session.Close() 138 139 coll := session.DB("mydb").C("mycoll") 140 141 // If it's just the same string multiple times, join it into a single message. 142 bulk := coll.Bulk() 143 bulk.Unordered() 144 bulk.Insert(M{"_id": 1}, M{"_id": 2}, M{"_id": 2}) 145 _, err = bulk.Run() 146 c.Assert(err, ErrorMatches, ".*duplicate key.*") 147 c.Assert(err, Not(ErrorMatches), ".*duplicate key.*duplicate key") 148 c.Assert(mgo.IsDup(err), Equals, true) 149 150 // With matching errors but different messages, present them all. 151 bulk = coll.Bulk() 152 bulk.Unordered() 153 bulk.Insert(M{"_id": "dupone"}, M{"_id": "dupone"}, M{"_id": "duptwo"}, M{"_id": "duptwo"}) 154 _, err = bulk.Run() 155 if s.versionAtLeast(2, 6) { 156 c.Assert(err, ErrorMatches, "multiple errors in bulk operation:\n( - .*duplicate.*\n){2}$") 157 c.Assert(err, ErrorMatches, "(?s).*dupone.*") 158 c.Assert(err, ErrorMatches, "(?s).*duptwo.*") 159 } else { 160 // Wire protocol query doesn't return all errors. 161 c.Assert(err, ErrorMatches, ".*duplicate.*") 162 } 163 c.Assert(mgo.IsDup(err), Equals, true) 164 165 // With mixed errors, present them all. 166 bulk = coll.Bulk() 167 bulk.Unordered() 168 bulk.Insert(M{"_id": 1}, M{"_id": []int{2}}) 169 _, err = bulk.Run() 170 if s.versionAtLeast(2, 6) { 171 c.Assert(err, ErrorMatches, "multiple errors in bulk operation:\n - .*duplicate.*\n - .*array.*\n$") 172 } else { 173 // Wire protocol query doesn't return all errors. 174 c.Assert(err, ErrorMatches, ".*array.*") 175 } 176 c.Assert(mgo.IsDup(err), Equals, false) 177} 178 179func (s *S) TestBulkErrorCases_2_6(c *C) { 180 if !s.versionAtLeast(2, 6) { 181 c.Skip("2.4- has poor bulk reporting") 182 } 183 session, err := mgo.Dial("localhost:40001") 184 c.Assert(err, IsNil) 185 defer session.Close() 186 187 coll := session.DB("mydb").C("mycoll") 188 189 bulk := coll.Bulk() 190 bulk.Unordered() 191 192 // There's a limit of 1000 operations per command, so 193 // this forces the more complex indexing logic to act. 194 for i := 0; i < 1010; i++ { 195 switch i { 196 case 3, 14: 197 bulk.Insert(M{"_id": "dupone"}) 198 case 5, 106: 199 bulk.Update(M{"_id": i - 1}, M{"$set": M{"_id": 4}}) 200 case 7, 1008: 201 bulk.Insert(M{"_id": "duptwo"}) 202 default: 203 bulk.Insert(M{"_id": i}) 204 } 205 } 206 207 _, err = bulk.Run() 208 ecases := err.(*mgo.BulkError).Cases() 209 210 c.Check(ecases[0].Err, ErrorMatches, ".*duplicate.*dupone.*") 211 c.Check(ecases[0].Index, Equals, 14) 212 c.Check(ecases[1].Err, ErrorMatches, ".*update.*_id.*") 213 c.Check(ecases[1].Index, Equals, 106) 214 c.Check(ecases[2].Err, ErrorMatches, ".*duplicate.*duptwo.*") 215 c.Check(ecases[2].Index, Equals, 1008) 216} 217 218func (s *S) TestBulkErrorCases_2_4(c *C) { 219 if s.versionAtLeast(2, 6) { 220 c.Skip("2.6+ has better reporting") 221 } 222 session, err := mgo.Dial("localhost:40001") 223 c.Assert(err, IsNil) 224 defer session.Close() 225 226 coll := session.DB("mydb").C("mycoll") 227 228 bulk := coll.Bulk() 229 bulk.Unordered() 230 231 // There's a limit of 1000 operations per command, so 232 // this forces the more complex indexing logic to act. 233 for i := 0; i < 1010; i++ { 234 switch i { 235 case 3, 14: 236 bulk.Insert(M{"_id": "dupone"}) 237 case 5: 238 bulk.Update(M{"_id": i - 1}, M{"$set": M{"n": 4}}) 239 case 106: 240 bulk.Update(M{"_id": i - 1}, M{"$bogus": M{"n": 4}}) 241 case 7, 1008: 242 bulk.Insert(M{"_id": "duptwo"}) 243 default: 244 bulk.Insert(M{"_id": i}) 245 } 246 } 247 248 _, err = bulk.Run() 249 ecases := err.(*mgo.BulkError).Cases() 250 251 c.Check(ecases[0].Err, ErrorMatches, ".*duplicate.*duptwo.*") 252 c.Check(ecases[0].Index, Equals, -1) 253 c.Check(ecases[1].Err, ErrorMatches, `.*\$bogus.*`) 254 c.Check(ecases[1].Index, Equals, 106) 255} 256 257func (s *S) TestBulkErrorCasesOrdered(c *C) { 258 session, err := mgo.Dial("localhost:40001") 259 c.Assert(err, IsNil) 260 defer session.Close() 261 262 coll := session.DB("mydb").C("mycoll") 263 264 bulk := coll.Bulk() 265 266 // There's a limit of 1000 operations per command, so 267 // this forces the more complex indexing logic to act. 268 for i := 0; i < 20; i++ { 269 switch i { 270 case 3, 14: 271 bulk.Insert(M{"_id": "dupone"}) 272 case 7, 17: 273 bulk.Insert(M{"_id": "duptwo"}) 274 default: 275 bulk.Insert(M{"_id": i}) 276 } 277 } 278 279 _, err = bulk.Run() 280 ecases := err.(*mgo.BulkError).Cases() 281 282 c.Check(ecases[0].Err, ErrorMatches, ".*duplicate.*dupone.*") 283 if s.versionAtLeast(2, 6) { 284 c.Check(ecases[0].Index, Equals, 14) 285 } else { 286 c.Check(ecases[0].Index, Equals, -1) 287 } 288 c.Check(ecases, HasLen, 1) 289} 290 291func (s *S) TestBulkUpdate(c *C) { 292 session, err := mgo.Dial("localhost:40001") 293 c.Assert(err, IsNil) 294 defer session.Close() 295 296 coll := session.DB("mydb").C("mycoll") 297 298 err = coll.Insert(M{"n": 1}, M{"n": 2}, M{"n": 3}) 299 c.Assert(err, IsNil) 300 301 bulk := coll.Bulk() 302 bulk.Update(M{"n": 1}, M{"$set": M{"n": 1}}) 303 bulk.Update(M{"n": 2}, M{"$set": M{"n": 20}}) 304 bulk.Update(M{"n": 5}, M{"$set": M{"n": 50}}) // Won't match. 305 bulk.Update(M{"n": 1}, M{"$set": M{"n": 10}}, M{"n": 3}, M{"$set": M{"n": 30}}) 306 r, err := bulk.Run() 307 c.Assert(err, IsNil) 308 c.Assert(r.Matched, Equals, 4) 309 if s.versionAtLeast(2, 6) { 310 c.Assert(r.Modified, Equals, 3) 311 } 312 313 type doc struct{ N int } 314 var res []doc 315 err = coll.Find(nil).Sort("n").All(&res) 316 c.Assert(err, IsNil) 317 c.Assert(res, DeepEquals, []doc{{10}, {20}, {30}}) 318} 319 320func (s *S) TestBulkUpdateError(c *C) { 321 session, err := mgo.Dial("localhost:40001") 322 c.Assert(err, IsNil) 323 defer session.Close() 324 325 coll := session.DB("mydb").C("mycoll") 326 327 err = coll.Insert(M{"n": 1}, M{"n": 2}, M{"n": 3}) 328 c.Assert(err, IsNil) 329 330 bulk := coll.Bulk() 331 bulk.Update( 332 M{"n": 1}, M{"$set": M{"n": 10}}, 333 M{"n": 2}, M{"$set": M{"n": 20, "_id": 20}}, 334 M{"n": 3}, M{"$set": M{"n": 30}}, 335 ) 336 r, err := bulk.Run() 337 c.Assert(err, ErrorMatches, ".*_id.*") 338 c.Assert(r, FitsTypeOf, &mgo.BulkResult{}) 339 340 type doc struct{ N int } 341 var res []doc 342 err = coll.Find(nil).Sort("n").All(&res) 343 c.Assert(err, IsNil) 344 c.Assert(res, DeepEquals, []doc{{2}, {3}, {10}}) 345} 346 347func (s *S) TestBulkUpdateErrorUnordered(c *C) { 348 session, err := mgo.Dial("localhost:40001") 349 c.Assert(err, IsNil) 350 defer session.Close() 351 352 coll := session.DB("mydb").C("mycoll") 353 354 err = coll.Insert(M{"n": 1}, M{"n": 2}, M{"n": 3}) 355 c.Assert(err, IsNil) 356 357 bulk := coll.Bulk() 358 bulk.Unordered() 359 bulk.Update( 360 M{"n": 1}, M{"$set": M{"n": 10}}, 361 M{"n": 2}, M{"$set": M{"n": 20, "_id": 20}}, 362 M{"n": 3}, M{"$set": M{"n": 30}}, 363 ) 364 r, err := bulk.Run() 365 c.Assert(err, ErrorMatches, ".*_id.*") 366 c.Assert(r, FitsTypeOf, &mgo.BulkResult{}) 367 368 type doc struct{ N int } 369 var res []doc 370 err = coll.Find(nil).Sort("n").All(&res) 371 c.Assert(err, IsNil) 372 c.Assert(res, DeepEquals, []doc{{2}, {10}, {30}}) 373} 374 375func (s *S) TestBulkUpdateAll(c *C) { 376 session, err := mgo.Dial("localhost:40001") 377 c.Assert(err, IsNil) 378 defer session.Close() 379 380 coll := session.DB("mydb").C("mycoll") 381 382 err = coll.Insert(M{"n": 1}, M{"n": 2}, M{"n": 3}) 383 c.Assert(err, IsNil) 384 385 bulk := coll.Bulk() 386 bulk.UpdateAll(M{"n": 1}, M{"$set": M{"n": 10}}) 387 bulk.UpdateAll(M{"n": 2}, M{"$set": M{"n": 2}}) // Won't change. 388 bulk.UpdateAll(M{"n": 5}, M{"$set": M{"n": 50}}) // Won't match. 389 bulk.UpdateAll(M{}, M{"$inc": M{"n": 1}}, M{"n": 11}, M{"$set": M{"n": 5}}) 390 r, err := bulk.Run() 391 c.Assert(err, IsNil) 392 c.Assert(r.Matched, Equals, 6) 393 if s.versionAtLeast(2, 6) { 394 c.Assert(r.Modified, Equals, 5) 395 } 396 397 type doc struct{ N int } 398 var res []doc 399 err = coll.Find(nil).Sort("n").All(&res) 400 c.Assert(err, IsNil) 401 c.Assert(res, DeepEquals, []doc{{3}, {4}, {5}}) 402} 403 404func (s *S) TestBulkMixedUnordered(c *C) { 405 session, err := mgo.Dial("localhost:40001") 406 c.Assert(err, IsNil) 407 defer session.Close() 408 409 coll := session.DB("mydb").C("mycoll") 410 411 // Abuse undefined behavior to ensure the desired implementation is in place. 412 bulk := coll.Bulk() 413 bulk.Unordered() 414 bulk.Insert(M{"n": 1}) 415 bulk.Update(M{"n": 2}, M{"$inc": M{"n": 1}}) 416 bulk.Insert(M{"n": 2}) 417 bulk.Update(M{"n": 3}, M{"$inc": M{"n": 1}}) 418 bulk.Update(M{"n": 1}, M{"$inc": M{"n": 1}}) 419 bulk.Insert(M{"n": 3}) 420 r, err := bulk.Run() 421 c.Assert(err, IsNil) 422 c.Assert(r.Matched, Equals, 3) 423 if s.versionAtLeast(2, 6) { 424 c.Assert(r.Modified, Equals, 3) 425 } 426 427 type doc struct{ N int } 428 var res []doc 429 err = coll.Find(nil).Sort("n").All(&res) 430 c.Assert(err, IsNil) 431 c.Assert(res, DeepEquals, []doc{{2}, {3}, {4}}) 432} 433 434func (s *S) TestBulkUpsert(c *C) { 435 session, err := mgo.Dial("localhost:40001") 436 c.Assert(err, IsNil) 437 defer session.Close() 438 439 coll := session.DB("mydb").C("mycoll") 440 441 err = coll.Insert(M{"n": 1}, M{"n": 2}, M{"n": 3}) 442 c.Assert(err, IsNil) 443 444 bulk := coll.Bulk() 445 bulk.Upsert(M{"n": 2}, M{"$set": M{"n": 20}}) 446 bulk.Upsert(M{"n": 4}, M{"$set": M{"n": 40}}, M{"n": 3}, M{"$set": M{"n": 30}}) 447 r, err := bulk.Run() 448 c.Assert(err, IsNil) 449 c.Assert(r, FitsTypeOf, &mgo.BulkResult{}) 450 451 type doc struct{ N int } 452 var res []doc 453 err = coll.Find(nil).Sort("n").All(&res) 454 c.Assert(err, IsNil) 455 c.Assert(res, DeepEquals, []doc{{1}, {20}, {30}, {40}}) 456} 457 458func (s *S) TestBulkRemove(c *C) { 459 session, err := mgo.Dial("localhost:40001") 460 c.Assert(err, IsNil) 461 defer session.Close() 462 463 coll := session.DB("mydb").C("mycoll") 464 465 err = coll.Insert(M{"n": 1}, M{"n": 2}, M{"n": 3}, M{"n": 4}, M{"n": 4}) 466 c.Assert(err, IsNil) 467 468 bulk := coll.Bulk() 469 bulk.Remove(M{"n": 1}) 470 bulk.Remove(M{"n": 2}, M{"n": 4}) 471 r, err := bulk.Run() 472 c.Assert(err, IsNil) 473 c.Assert(r.Matched, Equals, 3) 474 475 type doc struct{ N int } 476 var res []doc 477 err = coll.Find(nil).Sort("n").All(&res) 478 c.Assert(err, IsNil) 479 c.Assert(res, DeepEquals, []doc{{3}, {4}}) 480} 481 482func (s *S) TestBulkRemoveAll(c *C) { 483 session, err := mgo.Dial("localhost:40001") 484 c.Assert(err, IsNil) 485 defer session.Close() 486 487 coll := session.DB("mydb").C("mycoll") 488 489 err = coll.Insert(M{"n": 1}, M{"n": 2}, M{"n": 3}, M{"n": 4}, M{"n": 4}) 490 c.Assert(err, IsNil) 491 492 bulk := coll.Bulk() 493 bulk.RemoveAll(M{"n": 1}) 494 bulk.RemoveAll(M{"n": 2}, M{"n": 4}) 495 r, err := bulk.Run() 496 c.Assert(err, IsNil) 497 c.Assert(r.Matched, Equals, 4) 498 499 type doc struct{ N int } 500 var res []doc 501 err = coll.Find(nil).Sort("n").All(&res) 502 c.Assert(err, IsNil) 503 c.Assert(res, DeepEquals, []doc{{3}}) 504} 505