1package mgo 2 3import ( 4 "bytes" 5 "sort" 6 7 "gopkg.in/mgo.v2/bson" 8) 9 10// Bulk represents an operation that can be prepared with several 11// orthogonal changes before being delivered to the server. 12// 13// MongoDB servers older than version 2.6 do not have proper support for bulk 14// operations, so the driver attempts to map its API as much as possible into 15// the functionality that works. In particular, in those releases updates and 16// removals are sent individually, and inserts are sent in bulk but have 17// suboptimal error reporting compared to more recent versions of the server. 18// See the documentation of BulkErrorCase for details on that. 19// 20// Relevant documentation: 21// 22// http://blog.mongodb.org/post/84922794768/mongodbs-new-bulk-api 23// 24type Bulk struct { 25 c *Collection 26 opcount int 27 actions []bulkAction 28 ordered bool 29} 30 31type bulkOp int 32 33const ( 34 bulkInsert bulkOp = iota + 1 35 bulkUpdate 36 bulkUpdateAll 37 bulkRemove 38) 39 40type bulkAction struct { 41 op bulkOp 42 docs []interface{} 43 idxs []int 44} 45 46type bulkUpdateOp []interface{} 47type bulkDeleteOp []interface{} 48 49// BulkResult holds the results for a bulk operation. 50type BulkResult struct { 51 Matched int 52 Modified int // Available only for MongoDB 2.6+ 53 54 // Be conservative while we understand exactly how to report these 55 // results in a useful and convenient way, and also how to emulate 56 // them with prior servers. 57 private bool 58} 59 60// BulkError holds an error returned from running a Bulk operation. 61// Individual errors may be obtained and inspected via the Cases method. 62type BulkError struct { 63 ecases []BulkErrorCase 64} 65 66func (e *BulkError) Error() string { 67 if len(e.ecases) == 0 { 68 return "invalid BulkError instance: no errors" 69 } 70 if len(e.ecases) == 1 { 71 return e.ecases[0].Err.Error() 72 } 73 msgs := make([]string, 0, len(e.ecases)) 74 seen := make(map[string]bool) 75 for _, ecase := range e.ecases { 76 msg := ecase.Err.Error() 77 if !seen[msg] { 78 seen[msg] = true 79 msgs = append(msgs, msg) 80 } 81 } 82 if len(msgs) == 1 { 83 return msgs[0] 84 } 85 var buf bytes.Buffer 86 buf.WriteString("multiple errors in bulk operation:\n") 87 for _, msg := range msgs { 88 buf.WriteString(" - ") 89 buf.WriteString(msg) 90 buf.WriteByte('\n') 91 } 92 return buf.String() 93} 94 95type bulkErrorCases []BulkErrorCase 96 97func (slice bulkErrorCases) Len() int { return len(slice) } 98func (slice bulkErrorCases) Less(i, j int) bool { return slice[i].Index < slice[j].Index } 99func (slice bulkErrorCases) Swap(i, j int) { slice[i], slice[j] = slice[j], slice[i] } 100 101// BulkErrorCase holds an individual error found while attempting a single change 102// within a bulk operation, and the position in which it was enqueued. 103// 104// MongoDB servers older than version 2.6 do not have proper support for bulk 105// operations, so the driver attempts to map its API as much as possible into 106// the functionality that works. In particular, only the last error is reported 107// for bulk inserts and without any positional information, so the Index 108// field is set to -1 in these cases. 109type BulkErrorCase struct { 110 Index int // Position of operation that failed, or -1 if unknown. 111 Err error 112} 113 114// Cases returns all individual errors found while attempting the requested changes. 115// 116// See the documentation of BulkErrorCase for limitations in older MongoDB releases. 117func (e *BulkError) Cases() []BulkErrorCase { 118 return e.ecases 119} 120 121// Bulk returns a value to prepare the execution of a bulk operation. 122func (c *Collection) Bulk() *Bulk { 123 return &Bulk{c: c, ordered: true} 124} 125 126// Unordered puts the bulk operation in unordered mode. 127// 128// In unordered mode the indvidual operations may be sent 129// out of order, which means latter operations may proceed 130// even if prior ones have failed. 131func (b *Bulk) Unordered() { 132 b.ordered = false 133} 134 135func (b *Bulk) action(op bulkOp, opcount int) *bulkAction { 136 var action *bulkAction 137 if len(b.actions) > 0 && b.actions[len(b.actions)-1].op == op { 138 action = &b.actions[len(b.actions)-1] 139 } else if !b.ordered { 140 for i := range b.actions { 141 if b.actions[i].op == op { 142 action = &b.actions[i] 143 break 144 } 145 } 146 } 147 if action == nil { 148 b.actions = append(b.actions, bulkAction{op: op}) 149 action = &b.actions[len(b.actions)-1] 150 } 151 for i := 0; i < opcount; i++ { 152 action.idxs = append(action.idxs, b.opcount) 153 b.opcount++ 154 } 155 return action 156} 157 158// Insert queues up the provided documents for insertion. 159func (b *Bulk) Insert(docs ...interface{}) { 160 action := b.action(bulkInsert, len(docs)) 161 action.docs = append(action.docs, docs...) 162} 163 164// Remove queues up the provided selectors for removing matching documents. 165// Each selector will remove only a single matching document. 166func (b *Bulk) Remove(selectors ...interface{}) { 167 action := b.action(bulkRemove, len(selectors)) 168 for _, selector := range selectors { 169 if selector == nil { 170 selector = bson.D{} 171 } 172 action.docs = append(action.docs, &deleteOp{ 173 Collection: b.c.FullName, 174 Selector: selector, 175 Flags: 1, 176 Limit: 1, 177 }) 178 } 179} 180 181// RemoveAll queues up the provided selectors for removing all matching documents. 182// Each selector will remove all matching documents. 183func (b *Bulk) RemoveAll(selectors ...interface{}) { 184 action := b.action(bulkRemove, len(selectors)) 185 for _, selector := range selectors { 186 if selector == nil { 187 selector = bson.D{} 188 } 189 action.docs = append(action.docs, &deleteOp{ 190 Collection: b.c.FullName, 191 Selector: selector, 192 Flags: 0, 193 Limit: 0, 194 }) 195 } 196} 197 198// Update queues up the provided pairs of updating instructions. 199// The first element of each pair selects which documents must be 200// updated, and the second element defines how to update it. 201// Each pair matches exactly one document for updating at most. 202func (b *Bulk) Update(pairs ...interface{}) { 203 if len(pairs)%2 != 0 { 204 panic("Bulk.Update requires an even number of parameters") 205 } 206 action := b.action(bulkUpdate, len(pairs)/2) 207 for i := 0; i < len(pairs); i += 2 { 208 selector := pairs[i] 209 if selector == nil { 210 selector = bson.D{} 211 } 212 action.docs = append(action.docs, &updateOp{ 213 Collection: b.c.FullName, 214 Selector: selector, 215 Update: pairs[i+1], 216 }) 217 } 218} 219 220// UpdateAll queues up the provided pairs of updating instructions. 221// The first element of each pair selects which documents must be 222// updated, and the second element defines how to update it. 223// Each pair updates all documents matching the selector. 224func (b *Bulk) UpdateAll(pairs ...interface{}) { 225 if len(pairs)%2 != 0 { 226 panic("Bulk.UpdateAll requires an even number of parameters") 227 } 228 action := b.action(bulkUpdate, len(pairs)/2) 229 for i := 0; i < len(pairs); i += 2 { 230 selector := pairs[i] 231 if selector == nil { 232 selector = bson.D{} 233 } 234 action.docs = append(action.docs, &updateOp{ 235 Collection: b.c.FullName, 236 Selector: selector, 237 Update: pairs[i+1], 238 Flags: 2, 239 Multi: true, 240 }) 241 } 242} 243 244// Upsert queues up the provided pairs of upserting instructions. 245// The first element of each pair selects which documents must be 246// updated, and the second element defines how to update it. 247// Each pair matches exactly one document for updating at most. 248func (b *Bulk) Upsert(pairs ...interface{}) { 249 if len(pairs)%2 != 0 { 250 panic("Bulk.Update requires an even number of parameters") 251 } 252 action := b.action(bulkUpdate, len(pairs)/2) 253 for i := 0; i < len(pairs); i += 2 { 254 selector := pairs[i] 255 if selector == nil { 256 selector = bson.D{} 257 } 258 action.docs = append(action.docs, &updateOp{ 259 Collection: b.c.FullName, 260 Selector: selector, 261 Update: pairs[i+1], 262 Flags: 1, 263 Upsert: true, 264 }) 265 } 266} 267 268// Run runs all the operations queued up. 269// 270// If an error is reported on an unordered bulk operation, the error value may 271// be an aggregation of all issues observed. As an exception to that, Insert 272// operations running on MongoDB versions prior to 2.6 will report the last 273// error only due to a limitation in the wire protocol. 274func (b *Bulk) Run() (*BulkResult, error) { 275 var result BulkResult 276 var berr BulkError 277 var failed bool 278 for i := range b.actions { 279 action := &b.actions[i] 280 var ok bool 281 switch action.op { 282 case bulkInsert: 283 ok = b.runInsert(action, &result, &berr) 284 case bulkUpdate: 285 ok = b.runUpdate(action, &result, &berr) 286 case bulkRemove: 287 ok = b.runRemove(action, &result, &berr) 288 default: 289 panic("unknown bulk operation") 290 } 291 if !ok { 292 failed = true 293 if b.ordered { 294 break 295 } 296 } 297 } 298 if failed { 299 sort.Sort(bulkErrorCases(berr.ecases)) 300 return nil, &berr 301 } 302 return &result, nil 303} 304 305func (b *Bulk) runInsert(action *bulkAction, result *BulkResult, berr *BulkError) bool { 306 op := &insertOp{b.c.FullName, action.docs, 0} 307 if !b.ordered { 308 op.flags = 1 // ContinueOnError 309 } 310 lerr, err := b.c.writeOp(op, b.ordered) 311 return b.checkSuccess(action, berr, lerr, err) 312} 313 314func (b *Bulk) runUpdate(action *bulkAction, result *BulkResult, berr *BulkError) bool { 315 lerr, err := b.c.writeOp(bulkUpdateOp(action.docs), b.ordered) 316 if lerr != nil { 317 result.Matched += lerr.N 318 result.Modified += lerr.modified 319 } 320 return b.checkSuccess(action, berr, lerr, err) 321} 322 323func (b *Bulk) runRemove(action *bulkAction, result *BulkResult, berr *BulkError) bool { 324 lerr, err := b.c.writeOp(bulkDeleteOp(action.docs), b.ordered) 325 if lerr != nil { 326 result.Matched += lerr.N 327 result.Modified += lerr.modified 328 } 329 return b.checkSuccess(action, berr, lerr, err) 330} 331 332func (b *Bulk) checkSuccess(action *bulkAction, berr *BulkError, lerr *LastError, err error) bool { 333 if lerr != nil && len(lerr.ecases) > 0 { 334 for i := 0; i < len(lerr.ecases); i++ { 335 // Map back from the local error index into the visible one. 336 ecase := lerr.ecases[i] 337 idx := ecase.Index 338 if idx >= 0 { 339 idx = action.idxs[idx] 340 } 341 berr.ecases = append(berr.ecases, BulkErrorCase{idx, ecase.Err}) 342 } 343 return false 344 } else if err != nil { 345 for i := 0; i < len(action.idxs); i++ { 346 berr.ecases = append(berr.ecases, BulkErrorCase{action.idxs[i], err}) 347 } 348 return false 349 } 350 return true 351} 352