1// Copyright (C) MongoDB, Inc. 2017-present. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may 4// not use this file except in compliance with the License. You may obtain 5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 6 7package mongo 8 9import ( 10 "context" 11 "errors" 12 "fmt" 13 "strings" 14 "time" 15 16 "go.mongodb.org/mongo-driver/bson" 17 "go.mongodb.org/mongo-driver/bson/bsoncodec" 18 "go.mongodb.org/mongo-driver/bson/bsontype" 19 "go.mongodb.org/mongo-driver/mongo/options" 20 "go.mongodb.org/mongo-driver/mongo/readconcern" 21 "go.mongodb.org/mongo-driver/mongo/readpref" 22 "go.mongodb.org/mongo-driver/mongo/writeconcern" 23 "go.mongodb.org/mongo-driver/x/bsonx/bsoncore" 24 "go.mongodb.org/mongo-driver/x/mongo/driver" 25 "go.mongodb.org/mongo-driver/x/mongo/driver/description" 26 "go.mongodb.org/mongo-driver/x/mongo/driver/operation" 27 "go.mongodb.org/mongo-driver/x/mongo/driver/session" 28) 29 30// Collection is a handle to a MongoDB collection. It is safe for concurrent use by multiple goroutines. 31type Collection struct { 32 client *Client 33 db *Database 34 name string 35 readConcern *readconcern.ReadConcern 36 writeConcern *writeconcern.WriteConcern 37 readPreference *readpref.ReadPref 38 readSelector description.ServerSelector 39 writeSelector description.ServerSelector 40 registry *bsoncodec.Registry 41} 42 43// aggregateParams is used to store information to configure an Aggregate operation. 44type aggregateParams struct { 45 ctx context.Context 46 pipeline interface{} 47 client *Client 48 registry *bsoncodec.Registry 49 readConcern *readconcern.ReadConcern 50 writeConcern *writeconcern.WriteConcern 51 retryRead bool 52 db string 53 col string 54 readSelector description.ServerSelector 55 writeSelector description.ServerSelector 56 readPreference *readpref.ReadPref 57 opts []*options.AggregateOptions 58} 59 60func closeImplicitSession(sess *session.Client) { 61 if sess != nil && sess.SessionType == session.Implicit { 62 sess.EndSession() 63 } 64} 65 66func newCollection(db *Database, name string, opts ...*options.CollectionOptions) *Collection { 67 collOpt := options.MergeCollectionOptions(opts...) 68 69 rc := db.readConcern 70 if collOpt.ReadConcern != nil { 71 rc = collOpt.ReadConcern 72 } 73 74 wc := db.writeConcern 75 if collOpt.WriteConcern != nil { 76 wc = collOpt.WriteConcern 77 } 78 79 rp := db.readPreference 80 if collOpt.ReadPreference != nil { 81 rp = collOpt.ReadPreference 82 } 83 84 reg := db.registry 85 if collOpt.Registry != nil { 86 reg = collOpt.Registry 87 } 88 89 readSelector := description.CompositeSelector([]description.ServerSelector{ 90 description.ReadPrefSelector(rp), 91 description.LatencySelector(db.client.localThreshold), 92 }) 93 94 writeSelector := description.CompositeSelector([]description.ServerSelector{ 95 description.WriteSelector(), 96 description.LatencySelector(db.client.localThreshold), 97 }) 98 99 coll := &Collection{ 100 client: db.client, 101 db: db, 102 name: name, 103 readPreference: rp, 104 readConcern: rc, 105 writeConcern: wc, 106 readSelector: readSelector, 107 writeSelector: writeSelector, 108 registry: reg, 109 } 110 111 return coll 112} 113 114func (coll *Collection) copy() *Collection { 115 return &Collection{ 116 client: coll.client, 117 db: coll.db, 118 name: coll.name, 119 readConcern: coll.readConcern, 120 writeConcern: coll.writeConcern, 121 readPreference: coll.readPreference, 122 readSelector: coll.readSelector, 123 writeSelector: coll.writeSelector, 124 registry: coll.registry, 125 } 126} 127 128// Clone creates a copy of the Collection configured with the given CollectionOptions. 129// The specified options are merged with the existing options on the collection, with the specified options taking 130// precedence. 131func (coll *Collection) Clone(opts ...*options.CollectionOptions) (*Collection, error) { 132 copyColl := coll.copy() 133 optsColl := options.MergeCollectionOptions(opts...) 134 135 if optsColl.ReadConcern != nil { 136 copyColl.readConcern = optsColl.ReadConcern 137 } 138 139 if optsColl.WriteConcern != nil { 140 copyColl.writeConcern = optsColl.WriteConcern 141 } 142 143 if optsColl.ReadPreference != nil { 144 copyColl.readPreference = optsColl.ReadPreference 145 } 146 147 if optsColl.Registry != nil { 148 copyColl.registry = optsColl.Registry 149 } 150 151 copyColl.readSelector = description.CompositeSelector([]description.ServerSelector{ 152 description.ReadPrefSelector(copyColl.readPreference), 153 description.LatencySelector(copyColl.client.localThreshold), 154 }) 155 156 return copyColl, nil 157} 158 159// Name returns the name of the collection. 160func (coll *Collection) Name() string { 161 return coll.name 162} 163 164// Database returns the Database that was used to create the Collection. 165func (coll *Collection) Database() *Database { 166 return coll.db 167} 168 169// BulkWrite performs a bulk write operation (https://docs.mongodb.com/manual/core/bulk-write-operations/). 170// 171// The models parameter must be a slice of operations to be executed in this bulk write. It cannot be nil or empty. 172// All of the models must be non-nil. See the mongo.WriteModel documentation for a list of valid model types and 173// examples of how they should be used. 174// 175// The opts parameter can be used to specify options for the operation (see the options.BulkWriteOptions documentation.) 176func (coll *Collection) BulkWrite(ctx context.Context, models []WriteModel, 177 opts ...*options.BulkWriteOptions) (*BulkWriteResult, error) { 178 179 if len(models) == 0 { 180 return nil, ErrEmptySlice 181 } 182 183 if ctx == nil { 184 ctx = context.Background() 185 } 186 187 sess := sessionFromContext(ctx) 188 if sess == nil && coll.client.sessionPool != nil { 189 var err error 190 sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit) 191 if err != nil { 192 return nil, err 193 } 194 defer sess.EndSession() 195 } 196 197 err := coll.client.validSession(sess) 198 if err != nil { 199 return nil, err 200 } 201 202 wc := coll.writeConcern 203 if sess.TransactionRunning() { 204 wc = nil 205 } 206 if !writeconcern.AckWrite(wc) { 207 sess = nil 208 } 209 210 selector := makePinnedSelector(sess, coll.writeSelector) 211 212 for _, model := range models { 213 if model == nil { 214 return nil, ErrNilDocument 215 } 216 } 217 218 bwo := options.MergeBulkWriteOptions(opts...) 219 220 op := bulkWrite{ 221 ordered: bwo.Ordered, 222 bypassDocumentValidation: bwo.BypassDocumentValidation, 223 models: models, 224 session: sess, 225 collection: coll, 226 selector: selector, 227 writeConcern: wc, 228 } 229 230 err = op.execute(ctx) 231 232 return &op.result, replaceErrors(err) 233} 234 235func (coll *Collection) insert(ctx context.Context, documents []interface{}, 236 opts ...*options.InsertManyOptions) ([]interface{}, error) { 237 238 if ctx == nil { 239 ctx = context.Background() 240 } 241 242 result := make([]interface{}, len(documents)) 243 docs := make([]bsoncore.Document, len(documents)) 244 245 for i, doc := range documents { 246 var err error 247 docs[i], result[i], err = transformAndEnsureIDv2(coll.registry, doc) 248 if err != nil { 249 return nil, err 250 } 251 } 252 253 sess := sessionFromContext(ctx) 254 if sess == nil && coll.client.sessionPool != nil { 255 var err error 256 sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit) 257 if err != nil { 258 return nil, err 259 } 260 defer sess.EndSession() 261 } 262 263 err := coll.client.validSession(sess) 264 if err != nil { 265 return nil, err 266 } 267 268 wc := coll.writeConcern 269 if sess.TransactionRunning() { 270 wc = nil 271 } 272 if !writeconcern.AckWrite(wc) { 273 sess = nil 274 } 275 276 selector := makePinnedSelector(sess, coll.writeSelector) 277 278 op := operation.NewInsert(docs...). 279 Session(sess).WriteConcern(wc).CommandMonitor(coll.client.monitor). 280 ServerSelector(selector).ClusterClock(coll.client.clock). 281 Database(coll.db.name).Collection(coll.name). 282 Deployment(coll.client.deployment).Crypt(coll.client.crypt) 283 imo := options.MergeInsertManyOptions(opts...) 284 if imo.BypassDocumentValidation != nil && *imo.BypassDocumentValidation { 285 op = op.BypassDocumentValidation(*imo.BypassDocumentValidation) 286 } 287 if imo.Ordered != nil { 288 op = op.Ordered(*imo.Ordered) 289 } 290 retry := driver.RetryNone 291 if coll.client.retryWrites { 292 retry = driver.RetryOncePerCommand 293 } 294 op = op.Retry(retry) 295 296 return result, op.Execute(ctx) 297} 298 299// InsertOne executes an insert command to insert a single document into the collection. 300// 301// The document parameter must be the document to be inserted. It cannot be nil. If the document does not have an _id 302// field when transformed into BSON, one will be added automatically to the marshalled document. The original document 303// will not be modified. The _id can be retrieved from the InsertedID field of the returned InsertOneResult. 304// 305// The opts parameter can be used to specify options for the operation (see the options.InsertOneOptions documentation.) 306// 307// For more information about the command, see https://docs.mongodb.com/manual/reference/command/insert/. 308func (coll *Collection) InsertOne(ctx context.Context, document interface{}, 309 opts ...*options.InsertOneOptions) (*InsertOneResult, error) { 310 311 imOpts := make([]*options.InsertManyOptions, len(opts)) 312 for i, opt := range opts { 313 imo := options.InsertMany() 314 if opt.BypassDocumentValidation != nil && *opt.BypassDocumentValidation { 315 imo = imo.SetBypassDocumentValidation(*opt.BypassDocumentValidation) 316 } 317 imOpts[i] = imo 318 } 319 res, err := coll.insert(ctx, []interface{}{document}, imOpts...) 320 321 rr, err := processWriteError(err) 322 if rr&rrOne == 0 { 323 return nil, err 324 } 325 return &InsertOneResult{InsertedID: res[0]}, err 326} 327 328// InsertMany executes an insert command to insert multiple documents into the collection. If write errors occur 329// during the operation (e.g. duplicate key error), this method returns a BulkWriteException error. 330// 331// The documents parameter must be a slice of documents to insert. The slice cannot be nil or empty. The elements must 332// all be non-nil. For any document that does not have an _id field when transformed into BSON, one will be added 333// automatically to the marshalled document. The original document will not be modified. The _id values for the inserted 334// documents can be retrieved from the InsertedIDs field of the returnd InsertManyResult. 335// 336// The opts parameter can be used to specify options for the operation (see the options.InsertManyOptions documentation.) 337// 338// For more information about the command, see https://docs.mongodb.com/manual/reference/command/insert/. 339func (coll *Collection) InsertMany(ctx context.Context, documents []interface{}, 340 opts ...*options.InsertManyOptions) (*InsertManyResult, error) { 341 342 if len(documents) == 0 { 343 return nil, ErrEmptySlice 344 } 345 346 result, err := coll.insert(ctx, documents, opts...) 347 rr, err := processWriteError(err) 348 if rr&rrMany == 0 { 349 return nil, err 350 } 351 352 imResult := &InsertManyResult{InsertedIDs: result} 353 writeException, ok := err.(WriteException) 354 if !ok { 355 return imResult, err 356 } 357 358 // create and return a BulkWriteException 359 bwErrors := make([]BulkWriteError, 0, len(writeException.WriteErrors)) 360 for _, we := range writeException.WriteErrors { 361 bwErrors = append(bwErrors, BulkWriteError{ 362 WriteError{ 363 Index: we.Index, 364 Code: we.Code, 365 Message: we.Message, 366 }, 367 nil, 368 }) 369 } 370 return imResult, BulkWriteException{ 371 WriteErrors: bwErrors, 372 WriteConcernError: writeException.WriteConcernError, 373 } 374} 375 376func (coll *Collection) delete(ctx context.Context, filter interface{}, deleteOne bool, expectedRr returnResult, 377 opts ...*options.DeleteOptions) (*DeleteResult, error) { 378 379 if ctx == nil { 380 ctx = context.Background() 381 } 382 383 f, err := transformBsoncoreDocument(coll.registry, filter) 384 if err != nil { 385 return nil, err 386 } 387 388 sess := sessionFromContext(ctx) 389 if sess == nil && coll.client.sessionPool != nil { 390 sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit) 391 if err != nil { 392 return nil, err 393 } 394 defer sess.EndSession() 395 } 396 397 err = coll.client.validSession(sess) 398 if err != nil { 399 return nil, err 400 } 401 402 wc := coll.writeConcern 403 if sess.TransactionRunning() { 404 wc = nil 405 } 406 if !writeconcern.AckWrite(wc) { 407 sess = nil 408 } 409 410 selector := makePinnedSelector(sess, coll.writeSelector) 411 412 var limit int32 413 if deleteOne { 414 limit = 1 415 } 416 do := options.MergeDeleteOptions(opts...) 417 didx, doc := bsoncore.AppendDocumentStart(nil) 418 doc = bsoncore.AppendDocumentElement(doc, "q", f) 419 doc = bsoncore.AppendInt32Element(doc, "limit", limit) 420 if do.Collation != nil { 421 doc = bsoncore.AppendDocumentElement(doc, "collation", do.Collation.ToDocument()) 422 } 423 doc, _ = bsoncore.AppendDocumentEnd(doc, didx) 424 425 op := operation.NewDelete(doc). 426 Session(sess).WriteConcern(wc).CommandMonitor(coll.client.monitor). 427 ServerSelector(selector).ClusterClock(coll.client.clock). 428 Database(coll.db.name).Collection(coll.name). 429 Deployment(coll.client.deployment).Crypt(coll.client.crypt) 430 431 // deleteMany cannot be retried 432 retryMode := driver.RetryNone 433 if deleteOne && coll.client.retryWrites { 434 retryMode = driver.RetryOncePerCommand 435 } 436 op = op.Retry(retryMode) 437 rr, err := processWriteError(op.Execute(ctx)) 438 if rr&expectedRr == 0 { 439 return nil, err 440 } 441 return &DeleteResult{DeletedCount: int64(op.Result().N)}, err 442} 443 444// DeleteOne executes a delete command to delete at most one document from the collection. 445// 446// The filter parameter must be a document containing query operators and can be used to select the document to be 447// deleted. It cannot be nil. If the filter does not match any documents, the operation will succeed and a DeleteResult 448// with a DeletedCount of 0 will be returned. If the filter matches multiple documents, one will be selected from the 449// matched set. 450// 451// The opts parameter can be used to specify options for the operation (see the options.DeleteOptions documentation). 452// 453// For more information about the command, see https://docs.mongodb.com/manual/reference/command/delete/. 454func (coll *Collection) DeleteOne(ctx context.Context, filter interface{}, 455 opts ...*options.DeleteOptions) (*DeleteResult, error) { 456 457 return coll.delete(ctx, filter, true, rrOne, opts...) 458} 459 460// DeleteMany executes a delete command to delete documents from the collection. 461// 462// The filter parameter must be a document containing query operators and can be used to select the documents to 463// be deleted. It cannot be nil. An empty document (e.g. bson.D{}) should be used to delete all documents in the 464// collection. If the filter does not match any documents, the operation will succeed and a DeleteResult with a 465// DeletedCount of 0 will be returned. 466// 467// The opts parameter can be used to specify options for the operation (see the options.DeleteOptions documentation). 468// 469// For more information about the command, see https://docs.mongodb.com/manual/reference/command/delete/. 470func (coll *Collection) DeleteMany(ctx context.Context, filter interface{}, 471 opts ...*options.DeleteOptions) (*DeleteResult, error) { 472 473 return coll.delete(ctx, filter, false, rrMany, opts...) 474} 475 476func (coll *Collection) updateOrReplace(ctx context.Context, filter bsoncore.Document, update interface{}, multi bool, 477 expectedRr returnResult, checkDollarKey bool, opts ...*options.UpdateOptions) (*UpdateResult, error) { 478 479 if ctx == nil { 480 ctx = context.Background() 481 } 482 483 uo := options.MergeUpdateOptions(opts...) 484 uidx, updateDoc := bsoncore.AppendDocumentStart(nil) 485 updateDoc = bsoncore.AppendDocumentElement(updateDoc, "q", filter) 486 487 u, err := transformUpdateValue(coll.registry, update, checkDollarKey) 488 if err != nil { 489 return nil, err 490 } 491 updateDoc = bsoncore.AppendValueElement(updateDoc, "u", u) 492 if multi { 493 updateDoc = bsoncore.AppendBooleanElement(updateDoc, "multi", multi) 494 } 495 496 // collation, arrayFilters, and upsert are included on the individual update documents rather than as part of the 497 // command 498 if uo.Collation != nil { 499 updateDoc = bsoncore.AppendDocumentElement(updateDoc, "collation", bsoncore.Document(uo.Collation.ToDocument())) 500 } 501 if uo.ArrayFilters != nil { 502 arr, err := uo.ArrayFilters.ToArrayDocument() 503 if err != nil { 504 return nil, err 505 } 506 updateDoc = bsoncore.AppendArrayElement(updateDoc, "arrayFilters", arr) 507 } 508 if uo.Upsert != nil { 509 updateDoc = bsoncore.AppendBooleanElement(updateDoc, "upsert", *uo.Upsert) 510 } 511 updateDoc, _ = bsoncore.AppendDocumentEnd(updateDoc, uidx) 512 513 sess := sessionFromContext(ctx) 514 if sess == nil && coll.client.sessionPool != nil { 515 var err error 516 sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit) 517 if err != nil { 518 return nil, err 519 } 520 defer sess.EndSession() 521 } 522 523 err = coll.client.validSession(sess) 524 if err != nil { 525 return nil, err 526 } 527 528 wc := coll.writeConcern 529 if sess.TransactionRunning() { 530 wc = nil 531 } 532 if !writeconcern.AckWrite(wc) { 533 sess = nil 534 } 535 536 selector := makePinnedSelector(sess, coll.writeSelector) 537 538 op := operation.NewUpdate(updateDoc). 539 Session(sess).WriteConcern(wc).CommandMonitor(coll.client.monitor). 540 ServerSelector(selector).ClusterClock(coll.client.clock). 541 Database(coll.db.name).Collection(coll.name). 542 Deployment(coll.client.deployment).Crypt(coll.client.crypt) 543 544 if uo.BypassDocumentValidation != nil && *uo.BypassDocumentValidation { 545 op = op.BypassDocumentValidation(*uo.BypassDocumentValidation) 546 } 547 retry := driver.RetryNone 548 // retryable writes are only enabled updateOne/replaceOne operations 549 if !multi && coll.client.retryWrites { 550 retry = driver.RetryOncePerCommand 551 } 552 op = op.Retry(retry) 553 err = op.Execute(ctx) 554 555 rr, err := processWriteError(err) 556 if rr&expectedRr == 0 { 557 return nil, err 558 } 559 560 opRes := op.Result() 561 res := &UpdateResult{ 562 MatchedCount: int64(opRes.N), 563 ModifiedCount: int64(opRes.NModified), 564 UpsertedCount: int64(len(opRes.Upserted)), 565 } 566 if len(opRes.Upserted) > 0 { 567 res.UpsertedID = opRes.Upserted[0].ID 568 res.MatchedCount-- 569 } 570 571 return res, err 572} 573 574// UpdateOne executes an update command to update at most one document in the collection. 575// 576// The filter parameter must be a document containing query operators and can be used to select the document to be 577// updated. It cannot be nil. If the filter does not match any documents, the operation will succeed and an UpdateResult 578// with a MatchedCount of 0 will be returned. If the filter matches multiple documents, one will be selected from the 579// matched set and MatchedCount will equal 1. 580// 581// The update parameter must be a document containing update operators 582// (https://docs.mongodb.com/manual/reference/operator/update/) and can be used to specify the modifications to be 583// made to the selected document. It cannot be nil or empty. 584// 585// The opts parameter can be used to specify options for the operation (see the options.UpdateOptions documentation). 586// 587// For more information about the command, see https://docs.mongodb.com/manual/reference/command/update/. 588func (coll *Collection) UpdateOne(ctx context.Context, filter interface{}, update interface{}, 589 opts ...*options.UpdateOptions) (*UpdateResult, error) { 590 591 if ctx == nil { 592 ctx = context.Background() 593 } 594 595 f, err := transformBsoncoreDocument(coll.registry, filter) 596 if err != nil { 597 return nil, err 598 } 599 600 return coll.updateOrReplace(ctx, f, update, false, rrOne, true, opts...) 601} 602 603// UpdateMany executes an update command to update documents in the collection. 604// 605// The filter parameter must be a document containing query operators and can be used to select the documents to be 606// updated. It cannot be nil. If the filter does not match any documents, the operation will succeed and an UpdateResult 607// with a MatchedCount of 0 will be returned. 608// 609// The update parameter must be a document containing update operators 610// (https://docs.mongodb.com/manual/reference/operator/update/) and can be used to specify the modifications to be made 611// to the selected documents. It cannot be nil or empty. 612// 613// The opts parameter can be used to specify options for the operation (see the options.UpdateOptions documentation). 614// 615// For more information about the command, see https://docs.mongodb.com/manual/reference/command/update/. 616func (coll *Collection) UpdateMany(ctx context.Context, filter interface{}, update interface{}, 617 opts ...*options.UpdateOptions) (*UpdateResult, error) { 618 619 if ctx == nil { 620 ctx = context.Background() 621 } 622 623 f, err := transformBsoncoreDocument(coll.registry, filter) 624 if err != nil { 625 return nil, err 626 } 627 628 return coll.updateOrReplace(ctx, f, update, true, rrMany, true, opts...) 629} 630 631// ReplaceOne executes an update command to replace at most one document in the collection. 632// 633// The filter parameter must be a document containing query operators and can be used to select the document to be 634// replaced. It cannot be nil. If the filter does not match any documents, the operation will succeed and an 635// UpdateResult with a MatchedCount of 0 will be returned. If the filter matches multiple documents, one will be 636// selected from the matched set and MatchedCount will equal 1. 637// 638// The replacement parameter must be a document that will be used to replace the selected document. It cannot be nil 639// and cannot contain any update operators (https://docs.mongodb.com/manual/reference/operator/update/). 640// 641// The opts parameter can be used to specify options for the operation (see the options.ReplaceOptions documentation). 642// 643// For more information about the command, see https://docs.mongodb.com/manual/reference/command/update/. 644func (coll *Collection) ReplaceOne(ctx context.Context, filter interface{}, 645 replacement interface{}, opts ...*options.ReplaceOptions) (*UpdateResult, error) { 646 647 if ctx == nil { 648 ctx = context.Background() 649 } 650 651 f, err := transformBsoncoreDocument(coll.registry, filter) 652 if err != nil { 653 return nil, err 654 } 655 656 r, err := transformBsoncoreDocument(coll.registry, replacement) 657 if err != nil { 658 return nil, err 659 } 660 661 if elem, err := r.IndexErr(0); err == nil && strings.HasPrefix(elem.Key(), "$") { 662 return nil, errors.New("replacement document cannot contains keys beginning with '$") 663 } 664 665 updateOptions := make([]*options.UpdateOptions, 0, len(opts)) 666 for _, opt := range opts { 667 uOpts := options.Update() 668 uOpts.BypassDocumentValidation = opt.BypassDocumentValidation 669 uOpts.Collation = opt.Collation 670 uOpts.Upsert = opt.Upsert 671 updateOptions = append(updateOptions, uOpts) 672 } 673 674 return coll.updateOrReplace(ctx, f, r, false, rrOne, false, updateOptions...) 675} 676 677// Aggregate executes an aggregate command against the collection and returns a cursor over the resulting documents. 678// 679// The pipeline parameter must be an array of documents, each representing an aggregation stage. The pipeline cannot 680// be nil but can be empty. The stage documents must all be non-nil. For a pipeline of bson.D documents, the 681// mongo.Pipeline type can be used. See 682// https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/#db-collection-aggregate-stages for a list of 683// valid stages in aggregations. 684// 685// The opts parameter can be used to specify options for the operation (see the options.AggregateOptions documentation.) 686// 687// For more information about the command, see https://docs.mongodb.com/manual/reference/command/aggregate/. 688func (coll *Collection) Aggregate(ctx context.Context, pipeline interface{}, 689 opts ...*options.AggregateOptions) (*Cursor, error) { 690 a := aggregateParams{ 691 ctx: ctx, 692 pipeline: pipeline, 693 client: coll.client, 694 registry: coll.registry, 695 readConcern: coll.readConcern, 696 writeConcern: coll.writeConcern, 697 retryRead: coll.client.retryReads, 698 db: coll.db.name, 699 col: coll.name, 700 readSelector: coll.readSelector, 701 writeSelector: coll.writeSelector, 702 readPreference: coll.readPreference, 703 opts: opts, 704 } 705 return aggregate(a) 706} 707 708// aggreate is the helper method for Aggregate 709func aggregate(a aggregateParams) (*Cursor, error) { 710 711 if a.ctx == nil { 712 a.ctx = context.Background() 713 } 714 715 pipelineArr, hasOutputStage, err := transformAggregatePipelinev2(a.registry, a.pipeline) 716 if err != nil { 717 return nil, err 718 } 719 720 sess := sessionFromContext(a.ctx) 721 if sess == nil && a.client.sessionPool != nil { 722 sess, err = session.NewClientSession(a.client.sessionPool, a.client.id, session.Implicit) 723 if err != nil { 724 return nil, err 725 } 726 } 727 if err = a.client.validSession(sess); err != nil { 728 return nil, err 729 } 730 731 var wc *writeconcern.WriteConcern 732 if hasOutputStage { 733 wc = a.writeConcern 734 } 735 rc := a.readConcern 736 if sess.TransactionRunning() { 737 wc = nil 738 rc = nil 739 } 740 if !writeconcern.AckWrite(wc) { 741 closeImplicitSession(sess) 742 sess = nil 743 } 744 745 selector := makePinnedSelector(sess, a.writeSelector) 746 if !hasOutputStage { 747 selector = makeReadPrefSelector(sess, a.readSelector, a.client.localThreshold) 748 } 749 750 ao := options.MergeAggregateOptions(a.opts...) 751 cursorOpts := driver.CursorOptions{ 752 CommandMonitor: a.client.monitor, 753 Crypt: a.client.crypt, 754 } 755 756 op := operation.NewAggregate(pipelineArr).Session(sess).WriteConcern(wc).ReadConcern(rc).ReadPreference(a.readPreference).CommandMonitor(a.client.monitor). 757 ServerSelector(selector).ClusterClock(a.client.clock).Database(a.db).Collection(a.col).Deployment(a.client.deployment).Crypt(a.client.crypt) 758 if ao.AllowDiskUse != nil { 759 op.AllowDiskUse(*ao.AllowDiskUse) 760 } 761 // ignore batchSize of 0 with $out 762 if ao.BatchSize != nil && !(*ao.BatchSize == 0 && hasOutputStage) { 763 op.BatchSize(*ao.BatchSize) 764 cursorOpts.BatchSize = *ao.BatchSize 765 } 766 if ao.BypassDocumentValidation != nil && *ao.BypassDocumentValidation { 767 op.BypassDocumentValidation(*ao.BypassDocumentValidation) 768 } 769 if ao.Collation != nil { 770 op.Collation(bsoncore.Document(ao.Collation.ToDocument())) 771 } 772 if ao.MaxTime != nil { 773 op.MaxTimeMS(int64(*ao.MaxTime / time.Millisecond)) 774 } 775 if ao.MaxAwaitTime != nil { 776 cursorOpts.MaxTimeMS = int64(*ao.MaxAwaitTime / time.Millisecond) 777 } 778 if ao.Comment != nil { 779 op.Comment(*ao.Comment) 780 } 781 if ao.Hint != nil { 782 hintVal, err := transformValue(a.registry, ao.Hint) 783 if err != nil { 784 closeImplicitSession(sess) 785 return nil, err 786 } 787 op.Hint(hintVal) 788 } 789 790 retry := driver.RetryNone 791 if a.retryRead && !hasOutputStage { 792 retry = driver.RetryOncePerCommand 793 } 794 op = op.Retry(retry) 795 796 err = op.Execute(a.ctx) 797 if err != nil { 798 closeImplicitSession(sess) 799 if wce, ok := err.(driver.WriteCommandError); ok && wce.WriteConcernError != nil { 800 return nil, *convertDriverWriteConcernError(wce.WriteConcernError) 801 } 802 return nil, replaceErrors(err) 803 } 804 805 bc, err := op.Result(cursorOpts) 806 if err != nil { 807 closeImplicitSession(sess) 808 return nil, replaceErrors(err) 809 } 810 cursor, err := newCursorWithSession(bc, a.registry, sess) 811 return cursor, replaceErrors(err) 812} 813 814// CountDocuments returns the number of documents in the collection. For a fast count of the documents in the 815// collection, see the EstimatedDocumentCount method. 816// 817// The filter parameter must be a document and can be used to select which documents contribute to the count. It 818// cannot be nil. An empty document (e.g. bson.D{}) should be used to count all documents in the collection. This will 819// result in a full collection scan. 820// 821// The opts parameter can be used to specify options for the operation (see the options.CountOptions documentation). 822func (coll *Collection) CountDocuments(ctx context.Context, filter interface{}, 823 opts ...*options.CountOptions) (int64, error) { 824 825 if ctx == nil { 826 ctx = context.Background() 827 } 828 829 countOpts := options.MergeCountOptions(opts...) 830 831 pipelineArr, err := countDocumentsAggregatePipeline(coll.registry, filter, countOpts) 832 if err != nil { 833 return 0, err 834 } 835 836 sess := sessionFromContext(ctx) 837 if sess == nil && coll.client.sessionPool != nil { 838 sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit) 839 if err != nil { 840 return 0, err 841 } 842 defer sess.EndSession() 843 } 844 if err = coll.client.validSession(sess); err != nil { 845 return 0, err 846 } 847 848 rc := coll.readConcern 849 if sess.TransactionRunning() { 850 rc = nil 851 } 852 853 selector := makeReadPrefSelector(sess, coll.readSelector, coll.client.localThreshold) 854 op := operation.NewAggregate(pipelineArr).Session(sess).ReadConcern(rc).ReadPreference(coll.readPreference). 855 CommandMonitor(coll.client.monitor).ServerSelector(selector).ClusterClock(coll.client.clock).Database(coll.db.name). 856 Collection(coll.name).Deployment(coll.client.deployment).Crypt(coll.client.crypt) 857 if countOpts.Collation != nil { 858 op.Collation(bsoncore.Document(countOpts.Collation.ToDocument())) 859 } 860 if countOpts.MaxTime != nil { 861 op.MaxTimeMS(int64(*countOpts.MaxTime / time.Millisecond)) 862 } 863 if countOpts.Hint != nil { 864 hintVal, err := transformValue(coll.registry, countOpts.Hint) 865 if err != nil { 866 return 0, err 867 } 868 op.Hint(hintVal) 869 } 870 retry := driver.RetryNone 871 if coll.client.retryReads { 872 retry = driver.RetryOncePerCommand 873 } 874 op = op.Retry(retry) 875 876 err = op.Execute(ctx) 877 if err != nil { 878 return 0, replaceErrors(err) 879 } 880 881 batch := op.ResultCursorResponse().FirstBatch 882 if batch == nil { 883 return 0, errors.New("invalid response from server, no 'firstBatch' field") 884 } 885 886 docs, err := batch.Documents() 887 if err != nil || len(docs) == 0 { 888 return 0, nil 889 } 890 891 val, ok := docs[0].Lookup("n").AsInt64OK() 892 if !ok { 893 return 0, errors.New("invalid response from server, no 'n' field") 894 } 895 896 return val, nil 897} 898 899// EstimatedDocumentCount executes a count command and returns an estimate of the number of documents in the collection 900// using collection metadata. 901// 902// The opts parameter can be used to specify options for the operation (see the options.EstimatedDocumentCountOptions 903// documentation). 904// 905// For more information about the command, see https://docs.mongodb.com/manual/reference/command/count/. 906func (coll *Collection) EstimatedDocumentCount(ctx context.Context, 907 opts ...*options.EstimatedDocumentCountOptions) (int64, error) { 908 909 if ctx == nil { 910 ctx = context.Background() 911 } 912 913 sess := sessionFromContext(ctx) 914 915 var err error 916 if sess == nil && coll.client.sessionPool != nil { 917 sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit) 918 if err != nil { 919 return 0, err 920 } 921 defer sess.EndSession() 922 } 923 924 err = coll.client.validSession(sess) 925 if err != nil { 926 return 0, err 927 } 928 929 rc := coll.readConcern 930 if sess.TransactionRunning() { 931 rc = nil 932 } 933 934 selector := makeReadPrefSelector(sess, coll.readSelector, coll.client.localThreshold) 935 op := operation.NewCount().Session(sess).ClusterClock(coll.client.clock). 936 Database(coll.db.name).Collection(coll.name).CommandMonitor(coll.client.monitor). 937 Deployment(coll.client.deployment).ReadConcern(rc).ReadPreference(coll.readPreference). 938 ServerSelector(selector).Crypt(coll.client.crypt) 939 940 co := options.MergeEstimatedDocumentCountOptions(opts...) 941 if co.MaxTime != nil { 942 op = op.MaxTimeMS(int64(*co.MaxTime / time.Millisecond)) 943 } 944 retry := driver.RetryNone 945 if coll.client.retryReads { 946 retry = driver.RetryOncePerCommand 947 } 948 op.Retry(retry) 949 950 err = op.Execute(ctx) 951 952 return op.Result().N, replaceErrors(err) 953} 954 955// Distinct executes a distinct command to find the unique values for a specified field in the collection. 956// 957// The fieldName parameter specifies the field name for which distinct values should be returned. 958// 959// The filter parameter must be a document containing query operators and can be used to select which documents are 960// considered. It cannot be nil. An empty document (e.g. bson.D{}) should be used to select all documents. 961// 962// The opts parameter can be used to specify options for the operation (see the options.DistinctOptions documentation). 963// 964// For more information about the command, see https://docs.mongodb.com/manual/reference/command/distinct/. 965func (coll *Collection) Distinct(ctx context.Context, fieldName string, filter interface{}, 966 opts ...*options.DistinctOptions) ([]interface{}, error) { 967 968 if ctx == nil { 969 ctx = context.Background() 970 } 971 972 f, err := transformBsoncoreDocument(coll.registry, filter) 973 if err != nil { 974 return nil, err 975 } 976 977 sess := sessionFromContext(ctx) 978 979 if sess == nil && coll.client.sessionPool != nil { 980 sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit) 981 if err != nil { 982 return nil, err 983 } 984 defer sess.EndSession() 985 } 986 987 err = coll.client.validSession(sess) 988 if err != nil { 989 return nil, err 990 } 991 992 rc := coll.readConcern 993 if sess.TransactionRunning() { 994 rc = nil 995 } 996 997 selector := makeReadPrefSelector(sess, coll.readSelector, coll.client.localThreshold) 998 option := options.MergeDistinctOptions(opts...) 999 1000 op := operation.NewDistinct(fieldName, bsoncore.Document(f)). 1001 Session(sess).ClusterClock(coll.client.clock). 1002 Database(coll.db.name).Collection(coll.name).CommandMonitor(coll.client.monitor). 1003 Deployment(coll.client.deployment).ReadConcern(rc).ReadPreference(coll.readPreference). 1004 ServerSelector(selector).Crypt(coll.client.crypt) 1005 1006 if option.Collation != nil { 1007 op.Collation(bsoncore.Document(option.Collation.ToDocument())) 1008 } 1009 if option.MaxTime != nil { 1010 op.MaxTimeMS(int64(*option.MaxTime / time.Millisecond)) 1011 } 1012 retry := driver.RetryNone 1013 if coll.client.retryReads { 1014 retry = driver.RetryOncePerCommand 1015 } 1016 op = op.Retry(retry) 1017 1018 err = op.Execute(ctx) 1019 if err != nil { 1020 return nil, replaceErrors(err) 1021 } 1022 1023 arr, ok := op.Result().Values.ArrayOK() 1024 if !ok { 1025 return nil, fmt.Errorf("response field 'values' is type array, but received BSON type %s", op.Result().Values.Type) 1026 } 1027 1028 values, err := arr.Values() 1029 if err != nil { 1030 return nil, err 1031 } 1032 1033 retArray := make([]interface{}, len(values)) 1034 1035 for i, val := range values { 1036 raw := bson.RawValue{Type: val.Type, Value: val.Data} 1037 err = raw.Unmarshal(&retArray[i]) 1038 if err != nil { 1039 return nil, err 1040 } 1041 } 1042 1043 return retArray, replaceErrors(err) 1044} 1045 1046// Find executes a find command and returns a Cursor over the matching documents in the collection. 1047// 1048// The filter parameter must be a document containing query operators and can be used to select which documents are 1049// included in the result. It cannot be nil. An empty document (e.g. bson.D{}) should be used to include all documents. 1050// 1051// The opts parameter can be used to specify options for the operation (see the options.FindOptions documentation). 1052// 1053// For more information about the command, see https://docs.mongodb.com/manual/reference/command/find/. 1054func (coll *Collection) Find(ctx context.Context, filter interface{}, 1055 opts ...*options.FindOptions) (*Cursor, error) { 1056 1057 if ctx == nil { 1058 ctx = context.Background() 1059 } 1060 1061 f, err := transformBsoncoreDocument(coll.registry, filter) 1062 if err != nil { 1063 return nil, err 1064 } 1065 1066 sess := sessionFromContext(ctx) 1067 if sess == nil && coll.client.sessionPool != nil { 1068 var err error 1069 sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit) 1070 if err != nil { 1071 return nil, err 1072 } 1073 } 1074 1075 err = coll.client.validSession(sess) 1076 if err != nil { 1077 closeImplicitSession(sess) 1078 return nil, err 1079 } 1080 1081 rc := coll.readConcern 1082 if sess.TransactionRunning() { 1083 rc = nil 1084 } 1085 1086 selector := makeReadPrefSelector(sess, coll.readSelector, coll.client.localThreshold) 1087 op := operation.NewFind(f). 1088 Session(sess).ReadConcern(rc).ReadPreference(coll.readPreference). 1089 CommandMonitor(coll.client.monitor).ServerSelector(selector). 1090 ClusterClock(coll.client.clock).Database(coll.db.name).Collection(coll.name). 1091 Deployment(coll.client.deployment).Crypt(coll.client.crypt) 1092 1093 fo := options.MergeFindOptions(opts...) 1094 cursorOpts := driver.CursorOptions{ 1095 CommandMonitor: coll.client.monitor, 1096 Crypt: coll.client.crypt, 1097 } 1098 1099 if fo.AllowPartialResults != nil { 1100 op.AllowPartialResults(*fo.AllowPartialResults) 1101 } 1102 if fo.BatchSize != nil { 1103 cursorOpts.BatchSize = *fo.BatchSize 1104 op.BatchSize(*fo.BatchSize) 1105 } 1106 if fo.Collation != nil { 1107 op.Collation(bsoncore.Document(fo.Collation.ToDocument())) 1108 } 1109 if fo.Comment != nil { 1110 op.Comment(*fo.Comment) 1111 } 1112 if fo.CursorType != nil { 1113 switch *fo.CursorType { 1114 case options.Tailable: 1115 op.Tailable(true) 1116 case options.TailableAwait: 1117 op.Tailable(true) 1118 op.AwaitData(true) 1119 } 1120 } 1121 if fo.Hint != nil { 1122 hint, err := transformValue(coll.registry, fo.Hint) 1123 if err != nil { 1124 closeImplicitSession(sess) 1125 return nil, err 1126 } 1127 op.Hint(hint) 1128 } 1129 if fo.Limit != nil { 1130 limit := *fo.Limit 1131 if limit < 0 { 1132 limit = -1 * limit 1133 op.SingleBatch(true) 1134 } 1135 cursorOpts.Limit = int32(limit) 1136 op.Limit(limit) 1137 } 1138 if fo.Max != nil { 1139 max, err := transformBsoncoreDocument(coll.registry, fo.Max) 1140 if err != nil { 1141 closeImplicitSession(sess) 1142 return nil, err 1143 } 1144 op.Max(max) 1145 } 1146 if fo.MaxAwaitTime != nil { 1147 cursorOpts.MaxTimeMS = int64(*fo.MaxAwaitTime / time.Millisecond) 1148 } 1149 if fo.MaxTime != nil { 1150 op.MaxTimeMS(int64(*fo.MaxTime / time.Millisecond)) 1151 } 1152 if fo.Min != nil { 1153 min, err := transformBsoncoreDocument(coll.registry, fo.Min) 1154 if err != nil { 1155 closeImplicitSession(sess) 1156 return nil, err 1157 } 1158 op.Min(min) 1159 } 1160 if fo.NoCursorTimeout != nil { 1161 op.NoCursorTimeout(*fo.NoCursorTimeout) 1162 } 1163 if fo.OplogReplay != nil { 1164 op.OplogReplay(*fo.OplogReplay) 1165 } 1166 if fo.Projection != nil { 1167 proj, err := transformBsoncoreDocument(coll.registry, fo.Projection) 1168 if err != nil { 1169 closeImplicitSession(sess) 1170 return nil, err 1171 } 1172 op.Projection(proj) 1173 } 1174 if fo.ReturnKey != nil { 1175 op.ReturnKey(*fo.ReturnKey) 1176 } 1177 if fo.ShowRecordID != nil { 1178 op.ShowRecordID(*fo.ShowRecordID) 1179 } 1180 if fo.Skip != nil { 1181 op.Skip(*fo.Skip) 1182 } 1183 if fo.Snapshot != nil { 1184 op.Snapshot(*fo.Snapshot) 1185 } 1186 if fo.Sort != nil { 1187 sort, err := transformBsoncoreDocument(coll.registry, fo.Sort) 1188 if err != nil { 1189 closeImplicitSession(sess) 1190 return nil, err 1191 } 1192 op.Sort(sort) 1193 } 1194 retry := driver.RetryNone 1195 if coll.client.retryReads { 1196 retry = driver.RetryOncePerCommand 1197 } 1198 op = op.Retry(retry) 1199 1200 if err = op.Execute(ctx); err != nil { 1201 closeImplicitSession(sess) 1202 return nil, replaceErrors(err) 1203 } 1204 1205 bc, err := op.Result(cursorOpts) 1206 if err != nil { 1207 closeImplicitSession(sess) 1208 return nil, replaceErrors(err) 1209 } 1210 return newCursorWithSession(bc, coll.registry, sess) 1211} 1212 1213// FindOne executes a find command and returns a SingleResult for one document in the collection. 1214// 1215// The filter parameter must be a document containing query operators and can be used to select the document to be 1216// returned. It cannot be nil. If the filter does not match any documents, a SingleResult with an error set to 1217// ErrNoDocuments will be returned. If the filter matches multiple documents, one will be selected from the matched set. 1218// 1219// The opts parameter can be used to specify options for this operation (see the options.FindOneOptions documentation). 1220// 1221// For more information about the command, see https://docs.mongodb.com/manual/reference/command/find/. 1222func (coll *Collection) FindOne(ctx context.Context, filter interface{}, 1223 opts ...*options.FindOneOptions) *SingleResult { 1224 1225 if ctx == nil { 1226 ctx = context.Background() 1227 } 1228 1229 findOpts := make([]*options.FindOptions, len(opts)) 1230 for i, opt := range opts { 1231 findOpts[i] = &options.FindOptions{ 1232 AllowPartialResults: opt.AllowPartialResults, 1233 BatchSize: opt.BatchSize, 1234 Collation: opt.Collation, 1235 Comment: opt.Comment, 1236 CursorType: opt.CursorType, 1237 Hint: opt.Hint, 1238 Max: opt.Max, 1239 MaxAwaitTime: opt.MaxAwaitTime, 1240 Min: opt.Min, 1241 NoCursorTimeout: opt.NoCursorTimeout, 1242 OplogReplay: opt.OplogReplay, 1243 Projection: opt.Projection, 1244 ReturnKey: opt.ReturnKey, 1245 ShowRecordID: opt.ShowRecordID, 1246 Skip: opt.Skip, 1247 Snapshot: opt.Snapshot, 1248 Sort: opt.Sort, 1249 } 1250 } 1251 // Unconditionally send a limit to make sure only one document is returned and the cursor is not kept open 1252 // by the server. 1253 findOpts = append(findOpts, options.Find().SetLimit(-1)) 1254 1255 cursor, err := coll.Find(ctx, filter, findOpts...) 1256 return &SingleResult{cur: cursor, reg: coll.registry, err: replaceErrors(err)} 1257} 1258 1259func (coll *Collection) findAndModify(ctx context.Context, op *operation.FindAndModify) *SingleResult { 1260 if ctx == nil { 1261 ctx = context.Background() 1262 } 1263 1264 sess := sessionFromContext(ctx) 1265 var err error 1266 if sess == nil && coll.client.sessionPool != nil { 1267 sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit) 1268 if err != nil { 1269 return &SingleResult{err: err} 1270 } 1271 defer sess.EndSession() 1272 } 1273 1274 err = coll.client.validSession(sess) 1275 if err != nil { 1276 return &SingleResult{err: err} 1277 } 1278 1279 wc := coll.writeConcern 1280 if sess.TransactionRunning() { 1281 wc = nil 1282 } 1283 if !writeconcern.AckWrite(wc) { 1284 sess = nil 1285 } 1286 1287 selector := makePinnedSelector(sess, coll.writeSelector) 1288 1289 retry := driver.RetryNone 1290 if coll.client.retryWrites { 1291 retry = driver.RetryOnce 1292 } 1293 1294 op = op.Session(sess). 1295 WriteConcern(wc). 1296 CommandMonitor(coll.client.monitor). 1297 ServerSelector(selector). 1298 ClusterClock(coll.client.clock). 1299 Database(coll.db.name). 1300 Collection(coll.name). 1301 Deployment(coll.client.deployment). 1302 Retry(retry). 1303 Crypt(coll.client.crypt) 1304 1305 _, err = processWriteError(op.Execute(ctx)) 1306 if err != nil { 1307 return &SingleResult{err: err} 1308 } 1309 1310 return &SingleResult{rdr: bson.Raw(op.Result().Value), reg: coll.registry} 1311} 1312 1313// FindOneAndDelete executes a findAndModify command to delete at most one document in the collection. and returns the 1314// document as it appeared before deletion. 1315// 1316// The filter parameter must be a document containing query operators and can be used to select the document to be 1317// deleted. It cannot be nil. If the filter does not match any documents, a SingleResult with an error set to 1318// ErrNoDocuments wil be returned. If the filter matches multiple documents, one will be selected from the matched set. 1319// 1320// The opts parameter can be used to specify options for the operation (see the options.FindOneAndDeleteOptions 1321// documentation). 1322// 1323// For more information about the command, see https://docs.mongodb.com/manual/reference/command/findAndModify/. 1324func (coll *Collection) FindOneAndDelete(ctx context.Context, filter interface{}, 1325 opts ...*options.FindOneAndDeleteOptions) *SingleResult { 1326 1327 f, err := transformBsoncoreDocument(coll.registry, filter) 1328 if err != nil { 1329 return &SingleResult{err: err} 1330 } 1331 fod := options.MergeFindOneAndDeleteOptions(opts...) 1332 op := operation.NewFindAndModify(f).Remove(true) 1333 if fod.Collation != nil { 1334 op = op.Collation(bsoncore.Document(fod.Collation.ToDocument())) 1335 } 1336 if fod.MaxTime != nil { 1337 op = op.MaxTimeMS(int64(*fod.MaxTime / time.Millisecond)) 1338 } 1339 if fod.Projection != nil { 1340 proj, err := transformBsoncoreDocument(coll.registry, fod.Projection) 1341 if err != nil { 1342 return &SingleResult{err: err} 1343 } 1344 op = op.Fields(proj) 1345 } 1346 if fod.Sort != nil { 1347 sort, err := transformBsoncoreDocument(coll.registry, fod.Sort) 1348 if err != nil { 1349 return &SingleResult{err: err} 1350 } 1351 op = op.Sort(sort) 1352 } 1353 1354 return coll.findAndModify(ctx, op) 1355} 1356 1357// FindOneAndReplace executes a findAndModify command to replace at most one document in the collection 1358// and returns the document as it appeared before replacement. 1359// 1360// The filter parameter must be a document containing query operators and can be used to select the document to be 1361// replaced. It cannot be nil. If the filter does not match any documents, a SingleResult with an error set to 1362// ErrNoDocuments wil be returned. If the filter matches multiple documents, one will be selected from the matched set. 1363// 1364// The replacement parameter must be a document that will be used to replace the selected document. It cannot be nil 1365// and cannot contain any update operators (https://docs.mongodb.com/manual/reference/operator/update/). 1366// 1367// The opts parameter can be used to specify options for the operation (see the options.FindOneAndReplaceOptions 1368// documentation). 1369// 1370// For more information about the command, see https://docs.mongodb.com/manual/reference/command/findAndModify/. 1371func (coll *Collection) FindOneAndReplace(ctx context.Context, filter interface{}, 1372 replacement interface{}, opts ...*options.FindOneAndReplaceOptions) *SingleResult { 1373 1374 f, err := transformBsoncoreDocument(coll.registry, filter) 1375 if err != nil { 1376 return &SingleResult{err: err} 1377 } 1378 r, err := transformBsoncoreDocument(coll.registry, replacement) 1379 if err != nil { 1380 return &SingleResult{err: err} 1381 } 1382 if firstElem, err := r.IndexErr(0); err == nil && strings.HasPrefix(firstElem.Key(), "$") { 1383 return &SingleResult{err: errors.New("replacement document cannot contain keys beginning with '$'")} 1384 } 1385 1386 fo := options.MergeFindOneAndReplaceOptions(opts...) 1387 op := operation.NewFindAndModify(f).Update(bsoncore.Value{Type: bsontype.EmbeddedDocument, Data: r}) 1388 if fo.BypassDocumentValidation != nil && *fo.BypassDocumentValidation { 1389 op = op.BypassDocumentValidation(*fo.BypassDocumentValidation) 1390 } 1391 if fo.Collation != nil { 1392 op = op.Collation(bsoncore.Document(fo.Collation.ToDocument())) 1393 } 1394 if fo.MaxTime != nil { 1395 op = op.MaxTimeMS(int64(*fo.MaxTime / time.Millisecond)) 1396 } 1397 if fo.Projection != nil { 1398 proj, err := transformBsoncoreDocument(coll.registry, fo.Projection) 1399 if err != nil { 1400 return &SingleResult{err: err} 1401 } 1402 op = op.Fields(proj) 1403 } 1404 if fo.ReturnDocument != nil { 1405 op = op.NewDocument(*fo.ReturnDocument == options.After) 1406 } 1407 if fo.Sort != nil { 1408 sort, err := transformBsoncoreDocument(coll.registry, fo.Sort) 1409 if err != nil { 1410 return &SingleResult{err: err} 1411 } 1412 op = op.Sort(sort) 1413 } 1414 if fo.Upsert != nil { 1415 op = op.Upsert(*fo.Upsert) 1416 } 1417 1418 return coll.findAndModify(ctx, op) 1419} 1420 1421// FindOneAndUpdate executes a findAndModify command to update at most one document in the collection and returns the 1422// document as it appeared before updating. 1423// 1424// The filter parameter must be a document containing query operators and can be used to select the document to be 1425// updated. It cannot be nil. If the filter does not match any documents, a SingleResult with an error set to 1426// ErrNoDocuments wil be returned. If the filter matches multiple documents, one will be selected from the matched set. 1427// 1428// The update parameter must be a document containing update operators 1429// (https://docs.mongodb.com/manual/reference/operator/update/) and can be used to specify the modifications to be made 1430// to the selected document. It cannot be nil or empty. 1431// 1432// The opts parameter can be used to specify options for the operation (see the options.FindOneAndUpdateOptions 1433// documentation). 1434// 1435// For more information about the command, see https://docs.mongodb.com/manual/reference/command/findAndModify/. 1436func (coll *Collection) FindOneAndUpdate(ctx context.Context, filter interface{}, 1437 update interface{}, opts ...*options.FindOneAndUpdateOptions) *SingleResult { 1438 1439 if ctx == nil { 1440 ctx = context.Background() 1441 } 1442 1443 f, err := transformBsoncoreDocument(coll.registry, filter) 1444 if err != nil { 1445 return &SingleResult{err: err} 1446 } 1447 1448 fo := options.MergeFindOneAndUpdateOptions(opts...) 1449 op := operation.NewFindAndModify(f) 1450 1451 u, err := transformUpdateValue(coll.registry, update, true) 1452 if err != nil { 1453 return &SingleResult{err: err} 1454 } 1455 op = op.Update(u) 1456 1457 if fo.ArrayFilters != nil { 1458 filtersDoc, err := fo.ArrayFilters.ToArrayDocument() 1459 if err != nil { 1460 return &SingleResult{err: err} 1461 } 1462 op = op.ArrayFilters(bsoncore.Document(filtersDoc)) 1463 } 1464 if fo.BypassDocumentValidation != nil && *fo.BypassDocumentValidation { 1465 op = op.BypassDocumentValidation(*fo.BypassDocumentValidation) 1466 } 1467 if fo.Collation != nil { 1468 op = op.Collation(bsoncore.Document(fo.Collation.ToDocument())) 1469 } 1470 if fo.MaxTime != nil { 1471 op = op.MaxTimeMS(int64(*fo.MaxTime / time.Millisecond)) 1472 } 1473 if fo.Projection != nil { 1474 proj, err := transformBsoncoreDocument(coll.registry, fo.Projection) 1475 if err != nil { 1476 return &SingleResult{err: err} 1477 } 1478 op = op.Fields(proj) 1479 } 1480 if fo.ReturnDocument != nil { 1481 op = op.NewDocument(*fo.ReturnDocument == options.After) 1482 } 1483 if fo.Sort != nil { 1484 sort, err := transformBsoncoreDocument(coll.registry, fo.Sort) 1485 if err != nil { 1486 return &SingleResult{err: err} 1487 } 1488 op = op.Sort(sort) 1489 } 1490 if fo.Upsert != nil { 1491 op = op.Upsert(*fo.Upsert) 1492 } 1493 1494 return coll.findAndModify(ctx, op) 1495} 1496 1497// Watch returns a change stream for all changes on the corresponding collection. See 1498// https://docs.mongodb.com/manual/changeStreams/ for more information about change streams. 1499// 1500// The Collection must be configured with read concern majority or no read concern for a change stream to be created 1501// successfully. 1502// 1503// The pipeline parameter must be an array of documents, each representing a pipeline stage. The pipeline cannot be 1504// nil but can be empty. The stage documents must all be non-nil. See https://docs.mongodb.com/manual/changeStreams/ for 1505// a list of pipeline stages that can be used with change streams. For a pipeline of bson.D documents, the 1506// mongo.Pipeline{} type can be used. 1507// 1508// The opts parameter can be used to specify options for change stream creation (see the options.ChangeStreamOptions 1509// documentation). 1510func (coll *Collection) Watch(ctx context.Context, pipeline interface{}, 1511 opts ...*options.ChangeStreamOptions) (*ChangeStream, error) { 1512 1513 csConfig := changeStreamConfig{ 1514 readConcern: coll.readConcern, 1515 readPreference: coll.readPreference, 1516 client: coll.client, 1517 registry: coll.registry, 1518 streamType: CollectionStream, 1519 collectionName: coll.Name(), 1520 databaseName: coll.db.Name(), 1521 } 1522 return newChangeStream(ctx, csConfig, pipeline, opts...) 1523} 1524 1525// Indexes returns an IndexView instance that can be used to perform operations on the indexes for the collection. 1526func (coll *Collection) Indexes() IndexView { 1527 return IndexView{coll: coll} 1528} 1529 1530// Drop drops the collection on the server. This method ignores "namespace not found" errors so it is safe to drop 1531// a collection that does not exist on the server. 1532func (coll *Collection) Drop(ctx context.Context) error { 1533 if ctx == nil { 1534 ctx = context.Background() 1535 } 1536 1537 sess := sessionFromContext(ctx) 1538 if sess == nil && coll.client.sessionPool != nil { 1539 var err error 1540 sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit) 1541 if err != nil { 1542 return err 1543 } 1544 defer sess.EndSession() 1545 } 1546 1547 err := coll.client.validSession(sess) 1548 if err != nil { 1549 return err 1550 } 1551 1552 wc := coll.writeConcern 1553 if sess.TransactionRunning() { 1554 wc = nil 1555 } 1556 if !writeconcern.AckWrite(wc) { 1557 sess = nil 1558 } 1559 1560 selector := makePinnedSelector(sess, coll.writeSelector) 1561 1562 op := operation.NewDropCollection(). 1563 Session(sess).WriteConcern(wc).CommandMonitor(coll.client.monitor). 1564 ServerSelector(selector).ClusterClock(coll.client.clock). 1565 Database(coll.db.name).Collection(coll.name). 1566 Deployment(coll.client.deployment).Crypt(coll.client.crypt) 1567 err = op.Execute(ctx) 1568 1569 // ignore namespace not found erorrs 1570 driverErr, ok := err.(driver.Error) 1571 if !ok || (ok && !driverErr.NamespaceNotFound()) { 1572 return replaceErrors(err) 1573 } 1574 return nil 1575} 1576 1577// makePinnedSelector makes a selector for a pinned session with a pinned server. Will attempt to do server selection on 1578// the pinned server but if that fails it will go through a list of default selectors 1579func makePinnedSelector(sess *session.Client, defaultSelector description.ServerSelector) description.ServerSelectorFunc { 1580 return func(t description.Topology, svrs []description.Server) ([]description.Server, error) { 1581 if sess != nil && sess.PinnedServer != nil { 1582 return sess.PinnedServer.SelectServer(t, svrs) 1583 } 1584 1585 return defaultSelector.SelectServer(t, svrs) 1586 } 1587} 1588 1589func makeReadPrefSelector(sess *session.Client, selector description.ServerSelector, localThreshold time.Duration) description.ServerSelectorFunc { 1590 if sess != nil && sess.TransactionRunning() { 1591 selector = description.CompositeSelector([]description.ServerSelector{ 1592 description.ReadPrefSelector(sess.CurrentRp), 1593 description.LatencySelector(localThreshold), 1594 }) 1595 } 1596 1597 return makePinnedSelector(sess, selector) 1598} 1599