1// Copyright (C) MongoDB, Inc. 2017-present. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may 4// not use this file except in compliance with the License. You may obtain 5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 6 7package mongo 8 9import ( 10 "context" 11 "errors" 12 "strings" 13 14 "go.mongodb.org/mongo-driver/bson/bsoncodec" 15 "go.mongodb.org/mongo-driver/mongo/options" 16 "go.mongodb.org/mongo-driver/mongo/readconcern" 17 "go.mongodb.org/mongo-driver/mongo/readpref" 18 "go.mongodb.org/mongo-driver/mongo/writeconcern" 19 "go.mongodb.org/mongo-driver/x/bsonx" 20 "go.mongodb.org/mongo-driver/x/mongo/driver" 21 "go.mongodb.org/mongo-driver/x/mongo/driver/session" 22 "go.mongodb.org/mongo-driver/x/network/command" 23 "go.mongodb.org/mongo-driver/x/network/description" 24 "go.mongodb.org/mongo-driver/x/network/result" 25) 26 27// Collection performs operations on a given collection. 28type Collection struct { 29 client *Client 30 db *Database 31 name string 32 readConcern *readconcern.ReadConcern 33 writeConcern *writeconcern.WriteConcern 34 readPreference *readpref.ReadPref 35 readSelector description.ServerSelector 36 writeSelector description.ServerSelector 37 registry *bsoncodec.Registry 38} 39 40func newCollection(db *Database, name string, opts ...*options.CollectionOptions) *Collection { 41 collOpt := options.MergeCollectionOptions(opts...) 42 43 rc := db.readConcern 44 if collOpt.ReadConcern != nil { 45 rc = collOpt.ReadConcern 46 } 47 48 wc := db.writeConcern 49 if collOpt.WriteConcern != nil { 50 wc = collOpt.WriteConcern 51 } 52 53 rp := db.readPreference 54 if collOpt.ReadPreference != nil { 55 rp = collOpt.ReadPreference 56 } 57 58 reg := db.registry 59 if collOpt.Registry != nil { 60 reg = collOpt.Registry 61 } 62 63 readSelector := description.CompositeSelector([]description.ServerSelector{ 64 description.ReadPrefSelector(rp), 65 description.LatencySelector(db.client.localThreshold), 66 }) 67 68 writeSelector := description.CompositeSelector([]description.ServerSelector{ 69 description.WriteSelector(), 70 description.LatencySelector(db.client.localThreshold), 71 }) 72 73 coll := &Collection{ 74 client: db.client, 75 db: db, 76 name: name, 77 readPreference: rp, 78 readConcern: rc, 79 writeConcern: wc, 80 readSelector: readSelector, 81 writeSelector: writeSelector, 82 registry: reg, 83 } 84 85 return coll 86} 87 88func (coll *Collection) copy() *Collection { 89 return &Collection{ 90 client: coll.client, 91 db: coll.db, 92 name: coll.name, 93 readConcern: coll.readConcern, 94 writeConcern: coll.writeConcern, 95 readPreference: coll.readPreference, 96 readSelector: coll.readSelector, 97 writeSelector: coll.writeSelector, 98 registry: coll.registry, 99 } 100} 101 102// Clone creates a copy of this collection with updated options, if any are given. 103func (coll *Collection) Clone(opts ...*options.CollectionOptions) (*Collection, error) { 104 copyColl := coll.copy() 105 optsColl := options.MergeCollectionOptions(opts...) 106 107 if optsColl.ReadConcern != nil { 108 copyColl.readConcern = optsColl.ReadConcern 109 } 110 111 if optsColl.WriteConcern != nil { 112 copyColl.writeConcern = optsColl.WriteConcern 113 } 114 115 if optsColl.ReadPreference != nil { 116 copyColl.readPreference = optsColl.ReadPreference 117 } 118 119 if optsColl.Registry != nil { 120 copyColl.registry = optsColl.Registry 121 } 122 123 copyColl.readSelector = description.CompositeSelector([]description.ServerSelector{ 124 description.ReadPrefSelector(copyColl.readPreference), 125 description.LatencySelector(copyColl.client.localThreshold), 126 }) 127 128 return copyColl, nil 129} 130 131// Name provides access to the name of the collection. 132func (coll *Collection) Name() string { 133 return coll.name 134} 135 136// namespace returns the namespace of the collection. 137func (coll *Collection) namespace() command.Namespace { 138 return command.NewNamespace(coll.db.name, coll.name) 139} 140 141// Database provides access to the database that contains the collection. 142func (coll *Collection) Database() *Database { 143 return coll.db 144} 145 146// BulkWrite performs a bulk write operation. 147// 148// See https://docs.mongodb.com/manual/core/bulk-write-operations/. 149func (coll *Collection) BulkWrite(ctx context.Context, models []WriteModel, 150 opts ...*options.BulkWriteOptions) (*BulkWriteResult, error) { 151 152 if len(models) == 0 { 153 return nil, ErrEmptySlice 154 } 155 156 if ctx == nil { 157 ctx = context.Background() 158 } 159 160 sess := sessionFromContext(ctx) 161 162 err := coll.client.validSession(sess) 163 if err != nil { 164 return nil, err 165 } 166 167 dispatchModels := make([]driver.WriteModel, len(models)) 168 for i, model := range models { 169 if model == nil { 170 return nil, ErrNilDocument 171 } 172 dispatchModels[i] = model.convertModel() 173 } 174 175 res, err := driver.BulkWrite( 176 ctx, 177 coll.namespace(), 178 dispatchModels, 179 coll.client.topology, 180 coll.writeSelector, 181 coll.client.id, 182 coll.client.topology.SessionPool, 183 coll.client.retryWrites, 184 sess, 185 coll.writeConcern, 186 coll.client.clock, 187 coll.registry, 188 opts..., 189 ) 190 result := BulkWriteResult{ 191 InsertedCount: res.InsertedCount, 192 MatchedCount: res.MatchedCount, 193 ModifiedCount: res.ModifiedCount, 194 DeletedCount: res.DeletedCount, 195 UpsertedCount: res.UpsertedCount, 196 UpsertedIDs: res.UpsertedIDs, 197 } 198 199 return &result, replaceErrors(err) 200} 201 202// InsertOne inserts a single document into the collection. 203func (coll *Collection) InsertOne(ctx context.Context, document interface{}, 204 opts ...*options.InsertOneOptions) (*InsertOneResult, error) { 205 206 if ctx == nil { 207 ctx = context.Background() 208 } 209 210 doc, insertedID, err := transformAndEnsureID(coll.registry, document) 211 if err != nil { 212 return nil, err 213 } 214 215 sess := sessionFromContext(ctx) 216 217 err = coll.client.validSession(sess) 218 if err != nil { 219 return nil, err 220 } 221 222 wc := coll.writeConcern 223 if sess.TransactionRunning() { 224 wc = nil 225 } 226 oldns := coll.namespace() 227 cmd := command.Insert{ 228 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection}, 229 Docs: []bsonx.Doc{doc}, 230 WriteConcern: wc, 231 Session: sess, 232 Clock: coll.client.clock, 233 } 234 235 // convert to InsertManyOptions so these can be argued to dispatch.Insert 236 insertOpts := make([]*options.InsertManyOptions, len(opts)) 237 for i, opt := range opts { 238 insertOpts[i] = options.InsertMany() 239 insertOpts[i].BypassDocumentValidation = opt.BypassDocumentValidation 240 } 241 242 res, err := driver.Insert( 243 ctx, cmd, 244 coll.client.topology, 245 coll.writeSelector, 246 coll.client.id, 247 coll.client.topology.SessionPool, 248 coll.client.retryWrites, 249 insertOpts..., 250 ) 251 252 rr, err := processWriteError(res.WriteConcernError, res.WriteErrors, err) 253 if rr&rrOne == 0 { 254 return nil, err 255 } 256 257 return &InsertOneResult{InsertedID: insertedID}, err 258} 259 260// InsertMany inserts the provided documents. 261func (coll *Collection) InsertMany(ctx context.Context, documents []interface{}, 262 opts ...*options.InsertManyOptions) (*InsertManyResult, error) { 263 264 if ctx == nil { 265 ctx = context.Background() 266 } 267 268 if len(documents) == 0 { 269 return nil, ErrEmptySlice 270 } 271 272 result := make([]interface{}, len(documents)) 273 docs := make([]bsonx.Doc, len(documents)) 274 275 for i, doc := range documents { 276 if doc == nil { 277 return nil, ErrNilDocument 278 } 279 bdoc, insertedID, err := transformAndEnsureID(coll.registry, doc) 280 if err != nil { 281 return nil, err 282 } 283 284 docs[i] = bdoc 285 result[i] = insertedID 286 } 287 288 sess := sessionFromContext(ctx) 289 290 err := coll.client.validSession(sess) 291 if err != nil { 292 return nil, err 293 } 294 295 wc := coll.writeConcern 296 if sess.TransactionRunning() { 297 wc = nil 298 } 299 300 oldns := coll.namespace() 301 cmd := command.Insert{ 302 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection}, 303 Docs: docs, 304 WriteConcern: wc, 305 Session: sess, 306 Clock: coll.client.clock, 307 } 308 309 res, err := driver.Insert( 310 ctx, cmd, 311 coll.client.topology, 312 coll.writeSelector, 313 coll.client.id, 314 coll.client.topology.SessionPool, 315 coll.client.retryWrites, 316 opts..., 317 ) 318 319 switch err { 320 case nil: 321 case command.ErrUnacknowledgedWrite: 322 return &InsertManyResult{InsertedIDs: result}, ErrUnacknowledgedWrite 323 default: 324 return nil, replaceErrors(err) 325 } 326 if len(res.WriteErrors) > 0 || res.WriteConcernError != nil { 327 bwErrors := make([]BulkWriteError, 0, len(res.WriteErrors)) 328 for _, we := range res.WriteErrors { 329 bwErrors = append(bwErrors, BulkWriteError{ 330 WriteError{ 331 Index: we.Index, 332 Code: we.Code, 333 Message: we.ErrMsg, 334 }, 335 nil, 336 }) 337 } 338 339 err = BulkWriteException{ 340 WriteErrors: bwErrors, 341 WriteConcernError: convertWriteConcernError(res.WriteConcernError), 342 } 343 } 344 345 return &InsertManyResult{InsertedIDs: result}, err 346} 347 348// DeleteOne deletes a single document from the collection. 349func (coll *Collection) DeleteOne(ctx context.Context, filter interface{}, 350 opts ...*options.DeleteOptions) (*DeleteResult, error) { 351 352 if ctx == nil { 353 ctx = context.Background() 354 } 355 356 f, err := transformDocument(coll.registry, filter) 357 if err != nil { 358 return nil, err 359 } 360 deleteDocs := []bsonx.Doc{ 361 { 362 {"q", bsonx.Document(f)}, 363 {"limit", bsonx.Int32(1)}, 364 }, 365 } 366 367 sess := sessionFromContext(ctx) 368 369 err = coll.client.validSession(sess) 370 if err != nil { 371 return nil, err 372 } 373 374 wc := coll.writeConcern 375 if sess.TransactionRunning() { 376 wc = nil 377 } 378 379 oldns := coll.namespace() 380 cmd := command.Delete{ 381 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection}, 382 Deletes: deleteDocs, 383 WriteConcern: wc, 384 Session: sess, 385 Clock: coll.client.clock, 386 } 387 388 res, err := driver.Delete( 389 ctx, cmd, 390 coll.client.topology, 391 coll.writeSelector, 392 coll.client.id, 393 coll.client.topology.SessionPool, 394 coll.client.retryWrites, 395 opts..., 396 ) 397 398 rr, err := processWriteError(res.WriteConcernError, res.WriteErrors, err) 399 if rr&rrOne == 0 { 400 return nil, err 401 } 402 return &DeleteResult{DeletedCount: int64(res.N)}, err 403} 404 405// DeleteMany deletes multiple documents from the collection. 406func (coll *Collection) DeleteMany(ctx context.Context, filter interface{}, 407 opts ...*options.DeleteOptions) (*DeleteResult, error) { 408 409 if ctx == nil { 410 ctx = context.Background() 411 } 412 413 f, err := transformDocument(coll.registry, filter) 414 if err != nil { 415 return nil, err 416 } 417 deleteDocs := []bsonx.Doc{{{"q", bsonx.Document(f)}, {"limit", bsonx.Int32(0)}}} 418 419 sess := sessionFromContext(ctx) 420 421 err = coll.client.validSession(sess) 422 if err != nil { 423 return nil, err 424 } 425 426 wc := coll.writeConcern 427 if sess.TransactionRunning() { 428 wc = nil 429 } 430 431 oldns := coll.namespace() 432 cmd := command.Delete{ 433 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection}, 434 Deletes: deleteDocs, 435 WriteConcern: wc, 436 Session: sess, 437 Clock: coll.client.clock, 438 } 439 440 res, err := driver.Delete( 441 ctx, cmd, 442 coll.client.topology, 443 coll.writeSelector, 444 coll.client.id, 445 coll.client.topology.SessionPool, 446 false, 447 opts..., 448 ) 449 450 rr, err := processWriteError(res.WriteConcernError, res.WriteErrors, err) 451 if rr&rrMany == 0 { 452 return nil, err 453 } 454 return &DeleteResult{DeletedCount: int64(res.N)}, err 455} 456 457func (coll *Collection) updateOrReplaceOne(ctx context.Context, filter, 458 update bsonx.Doc, sess *session.Client, opts ...*options.UpdateOptions) (*UpdateResult, error) { 459 460 // TODO: should session be taken from ctx or left as argument? 461 if ctx == nil { 462 ctx = context.Background() 463 } 464 465 updateDocs := []bsonx.Doc{ 466 { 467 {"q", bsonx.Document(filter)}, 468 {"u", bsonx.Document(update)}, 469 {"multi", bsonx.Boolean(false)}, 470 }, 471 } 472 473 wc := coll.writeConcern 474 if sess.TransactionRunning() { 475 wc = nil 476 } 477 478 oldns := coll.namespace() 479 cmd := command.Update{ 480 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection}, 481 Docs: updateDocs, 482 WriteConcern: wc, 483 Session: sess, 484 Clock: coll.client.clock, 485 } 486 487 r, err := driver.Update( 488 ctx, cmd, 489 coll.client.topology, 490 coll.writeSelector, 491 coll.client.id, 492 coll.client.topology.SessionPool, 493 coll.client.retryWrites, 494 opts..., 495 ) 496 if err != nil && err != command.ErrUnacknowledgedWrite { 497 return nil, replaceErrors(err) 498 } 499 500 res := &UpdateResult{ 501 MatchedCount: r.MatchedCount, 502 ModifiedCount: r.ModifiedCount, 503 UpsertedCount: int64(len(r.Upserted)), 504 } 505 if len(r.Upserted) > 0 { 506 res.UpsertedID = r.Upserted[0].ID 507 res.MatchedCount-- 508 } 509 510 rr, err := processWriteError(r.WriteConcernError, r.WriteErrors, err) 511 if rr&rrOne == 0 { 512 return nil, err 513 } 514 return res, err 515} 516 517// UpdateOne updates a single document in the collection. 518func (coll *Collection) UpdateOne(ctx context.Context, filter interface{}, update interface{}, 519 opts ...*options.UpdateOptions) (*UpdateResult, error) { 520 521 if ctx == nil { 522 ctx = context.Background() 523 } 524 525 f, err := transformDocument(coll.registry, filter) 526 if err != nil { 527 return nil, err 528 } 529 530 u, err := transformDocument(coll.registry, update) 531 if err != nil { 532 return nil, err 533 } 534 535 if err := ensureDollarKey(u); err != nil { 536 return nil, err 537 } 538 539 sess := sessionFromContext(ctx) 540 541 err = coll.client.validSession(sess) 542 if err != nil { 543 return nil, err 544 } 545 546 return coll.updateOrReplaceOne(ctx, f, u, sess, opts...) 547} 548 549// UpdateMany updates multiple documents in the collection. 550func (coll *Collection) UpdateMany(ctx context.Context, filter interface{}, update interface{}, 551 opts ...*options.UpdateOptions) (*UpdateResult, error) { 552 553 if ctx == nil { 554 ctx = context.Background() 555 } 556 557 f, err := transformDocument(coll.registry, filter) 558 if err != nil { 559 return nil, err 560 } 561 562 u, err := transformDocument(coll.registry, update) 563 if err != nil { 564 return nil, err 565 } 566 567 if err = ensureDollarKey(u); err != nil { 568 return nil, err 569 } 570 571 updateDocs := []bsonx.Doc{ 572 { 573 {"q", bsonx.Document(f)}, 574 {"u", bsonx.Document(u)}, 575 {"multi", bsonx.Boolean(true)}, 576 }, 577 } 578 579 sess := sessionFromContext(ctx) 580 581 err = coll.client.validSession(sess) 582 if err != nil { 583 return nil, err 584 } 585 586 wc := coll.writeConcern 587 if sess.TransactionRunning() { 588 wc = nil 589 } 590 591 oldns := coll.namespace() 592 cmd := command.Update{ 593 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection}, 594 Docs: updateDocs, 595 WriteConcern: wc, 596 Session: sess, 597 Clock: coll.client.clock, 598 } 599 600 r, err := driver.Update( 601 ctx, cmd, 602 coll.client.topology, 603 coll.writeSelector, 604 coll.client.id, 605 coll.client.topology.SessionPool, 606 false, 607 opts..., 608 ) 609 if err != nil && err != command.ErrUnacknowledgedWrite { 610 return nil, replaceErrors(err) 611 } 612 res := &UpdateResult{ 613 MatchedCount: r.MatchedCount, 614 ModifiedCount: r.ModifiedCount, 615 UpsertedCount: int64(len(r.Upserted)), 616 } 617 // TODO(skriptble): Is this correct? Do we only return the first upserted ID for an UpdateMany? 618 if len(r.Upserted) > 0 { 619 res.UpsertedID = r.Upserted[0].ID 620 res.MatchedCount-- 621 } 622 623 rr, err := processWriteError(r.WriteConcernError, r.WriteErrors, err) 624 if rr&rrMany == 0 { 625 return nil, err 626 } 627 return res, err 628} 629 630// ReplaceOne replaces a single document in the collection. 631func (coll *Collection) ReplaceOne(ctx context.Context, filter interface{}, 632 replacement interface{}, opts ...*options.ReplaceOptions) (*UpdateResult, error) { 633 634 if ctx == nil { 635 ctx = context.Background() 636 } 637 638 f, err := transformDocument(coll.registry, filter) 639 if err != nil { 640 return nil, err 641 } 642 643 r, err := transformDocument(coll.registry, replacement) 644 if err != nil { 645 return nil, err 646 } 647 648 if len(r) > 0 && strings.HasPrefix(r[0].Key, "$") { 649 return nil, errors.New("replacement document cannot contains keys beginning with '$") 650 } 651 652 sess := sessionFromContext(ctx) 653 654 err = coll.client.validSession(sess) 655 if err != nil { 656 return nil, err 657 } 658 659 updateOptions := make([]*options.UpdateOptions, 0, len(opts)) 660 for _, opt := range opts { 661 uOpts := options.Update() 662 uOpts.BypassDocumentValidation = opt.BypassDocumentValidation 663 uOpts.Collation = opt.Collation 664 uOpts.Upsert = opt.Upsert 665 updateOptions = append(updateOptions, uOpts) 666 } 667 668 return coll.updateOrReplaceOne(ctx, f, r, sess, updateOptions...) 669} 670 671// Aggregate runs an aggregation framework pipeline. 672// 673// See https://docs.mongodb.com/manual/aggregation/. 674func (coll *Collection) Aggregate(ctx context.Context, pipeline interface{}, 675 opts ...*options.AggregateOptions) (*Cursor, error) { 676 677 if ctx == nil { 678 ctx = context.Background() 679 } 680 681 pipelineArr, err := transformAggregatePipeline(coll.registry, pipeline) 682 if err != nil { 683 return nil, err 684 } 685 686 aggOpts := options.MergeAggregateOptions(opts...) 687 688 sess := sessionFromContext(ctx) 689 690 err = coll.client.validSession(sess) 691 if err != nil { 692 return nil, err 693 } 694 695 wc := coll.writeConcern 696 rc := coll.readConcern 697 698 if sess.TransactionRunning() { 699 wc = nil 700 rc = nil 701 } 702 703 oldns := coll.namespace() 704 cmd := command.Aggregate{ 705 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection}, 706 Pipeline: pipelineArr, 707 ReadPref: coll.readPreference, 708 WriteConcern: wc, 709 ReadConcern: rc, 710 Session: sess, 711 Clock: coll.client.clock, 712 } 713 714 batchCursor, err := driver.Aggregate( 715 ctx, cmd, 716 coll.client.topology, 717 coll.readSelector, 718 coll.writeSelector, 719 coll.client.id, 720 coll.client.topology.SessionPool, 721 coll.registry, 722 aggOpts, 723 ) 724 if err != nil { 725 if wce, ok := err.(result.WriteConcernError); ok { 726 return nil, *convertWriteConcernError(&wce) 727 } 728 return nil, replaceErrors(err) 729 } 730 731 cursor, err := newCursor(batchCursor, coll.registry) 732 return cursor, replaceErrors(err) 733} 734 735// CountDocuments gets the number of documents matching the filter. 736func (coll *Collection) CountDocuments(ctx context.Context, filter interface{}, 737 opts ...*options.CountOptions) (int64, error) { 738 739 if ctx == nil { 740 ctx = context.Background() 741 } 742 743 countOpts := options.MergeCountOptions(opts...) 744 745 pipelineArr, err := countDocumentsAggregatePipeline(coll.registry, filter, countOpts) 746 if err != nil { 747 return 0, err 748 } 749 750 sess := sessionFromContext(ctx) 751 752 err = coll.client.validSession(sess) 753 if err != nil { 754 return 0, err 755 } 756 757 rc := coll.readConcern 758 if sess.TransactionRunning() { 759 rc = nil 760 } 761 762 oldns := coll.namespace() 763 cmd := command.CountDocuments{ 764 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection}, 765 Pipeline: pipelineArr, 766 ReadPref: coll.readPreference, 767 ReadConcern: rc, 768 Session: sess, 769 Clock: coll.client.clock, 770 } 771 772 count, err := driver.CountDocuments( 773 ctx, cmd, 774 coll.client.topology, 775 coll.readSelector, 776 coll.client.id, 777 coll.client.topology.SessionPool, 778 coll.registry, 779 countOpts, 780 ) 781 782 return count, replaceErrors(err) 783} 784 785// EstimatedDocumentCount gets an estimate of the count of documents in a collection using collection metadata. 786func (coll *Collection) EstimatedDocumentCount(ctx context.Context, 787 opts ...*options.EstimatedDocumentCountOptions) (int64, error) { 788 789 if ctx == nil { 790 ctx = context.Background() 791 } 792 793 sess := sessionFromContext(ctx) 794 795 err := coll.client.validSession(sess) 796 if err != nil { 797 return 0, err 798 } 799 800 rc := coll.readConcern 801 if sess != nil && (sess.TransactionInProgress()) { 802 rc = nil 803 } 804 805 oldns := coll.namespace() 806 cmd := command.Count{ 807 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection}, 808 Query: bsonx.Doc{}, 809 ReadPref: coll.readPreference, 810 ReadConcern: rc, 811 Session: sess, 812 Clock: coll.client.clock, 813 } 814 815 countOpts := options.Count() 816 if len(opts) >= 1 { 817 countOpts = countOpts.SetMaxTime(*opts[len(opts)-1].MaxTime) 818 } 819 820 count, err := driver.Count( 821 ctx, cmd, 822 coll.client.topology, 823 coll.readSelector, 824 coll.client.id, 825 coll.client.topology.SessionPool, 826 coll.registry, 827 countOpts, 828 ) 829 830 return count, replaceErrors(err) 831} 832 833// Distinct finds the distinct values for a specified field across a single 834// collection. 835func (coll *Collection) Distinct(ctx context.Context, fieldName string, filter interface{}, 836 opts ...*options.DistinctOptions) ([]interface{}, error) { 837 838 if ctx == nil { 839 ctx = context.Background() 840 } 841 842 f, err := transformDocument(coll.registry, filter) 843 if err != nil { 844 return nil, err 845 } 846 847 sess := sessionFromContext(ctx) 848 849 err = coll.client.validSession(sess) 850 if err != nil { 851 return nil, err 852 } 853 854 rc := coll.readConcern 855 if sess.TransactionRunning() { 856 rc = nil 857 } 858 859 oldns := coll.namespace() 860 cmd := command.Distinct{ 861 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection}, 862 Field: fieldName, 863 Query: f, 864 ReadPref: coll.readPreference, 865 ReadConcern: rc, 866 Session: sess, 867 Clock: coll.client.clock, 868 } 869 870 res, err := driver.Distinct( 871 ctx, cmd, 872 coll.client.topology, 873 coll.readSelector, 874 coll.client.id, 875 coll.client.topology.SessionPool, 876 opts..., 877 ) 878 if err != nil { 879 return nil, replaceErrors(err) 880 } 881 882 return res.Values, nil 883} 884 885// Find finds the documents matching a model. 886func (coll *Collection) Find(ctx context.Context, filter interface{}, 887 opts ...*options.FindOptions) (*Cursor, error) { 888 889 if ctx == nil { 890 ctx = context.Background() 891 } 892 893 f, err := transformDocument(coll.registry, filter) 894 if err != nil { 895 return nil, err 896 } 897 898 sess := sessionFromContext(ctx) 899 900 err = coll.client.validSession(sess) 901 if err != nil { 902 return nil, err 903 } 904 905 rc := coll.readConcern 906 if sess.TransactionRunning() { 907 rc = nil 908 } 909 910 oldns := coll.namespace() 911 cmd := command.Find{ 912 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection}, 913 Filter: f, 914 ReadPref: coll.readPreference, 915 ReadConcern: rc, 916 Session: sess, 917 Clock: coll.client.clock, 918 } 919 920 batchCursor, err := driver.Find( 921 ctx, cmd, 922 coll.client.topology, 923 coll.readSelector, 924 coll.client.id, 925 coll.client.topology.SessionPool, 926 coll.registry, 927 opts..., 928 ) 929 if err != nil { 930 return nil, replaceErrors(err) 931 } 932 933 cursor, err := newCursor(batchCursor, coll.registry) 934 return cursor, replaceErrors(err) 935} 936 937// FindOne returns up to one document that matches the model. 938func (coll *Collection) FindOne(ctx context.Context, filter interface{}, 939 opts ...*options.FindOneOptions) *SingleResult { 940 941 if ctx == nil { 942 ctx = context.Background() 943 } 944 945 f, err := transformDocument(coll.registry, filter) 946 if err != nil { 947 return &SingleResult{err: err} 948 } 949 950 sess := sessionFromContext(ctx) 951 952 err = coll.client.validSession(sess) 953 if err != nil { 954 return &SingleResult{err: err} 955 } 956 957 rc := coll.readConcern 958 if sess.TransactionRunning() { 959 rc = nil 960 } 961 962 oldns := coll.namespace() 963 cmd := command.Find{ 964 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection}, 965 Filter: f, 966 ReadPref: coll.readPreference, 967 ReadConcern: rc, 968 Session: sess, 969 Clock: coll.client.clock, 970 } 971 972 findOpts := make([]*options.FindOptions, len(opts)) 973 for i, opt := range opts { 974 findOpts[i] = &options.FindOptions{ 975 AllowPartialResults: opt.AllowPartialResults, 976 BatchSize: opt.BatchSize, 977 Collation: opt.Collation, 978 Comment: opt.Comment, 979 CursorType: opt.CursorType, 980 Hint: opt.Hint, 981 Max: opt.Max, 982 MaxAwaitTime: opt.MaxAwaitTime, 983 Min: opt.Min, 984 NoCursorTimeout: opt.NoCursorTimeout, 985 OplogReplay: opt.OplogReplay, 986 Projection: opt.Projection, 987 ReturnKey: opt.ReturnKey, 988 ShowRecordID: opt.ShowRecordID, 989 Skip: opt.Skip, 990 Snapshot: opt.Snapshot, 991 Sort: opt.Sort, 992 } 993 } 994 // Unconditionally send a limit to make sure only one document is returned and the cursor is not kept open 995 // by the server. 996 findOpts = append(findOpts, options.Find().SetLimit(-1)) 997 998 batchCursor, err := driver.Find( 999 ctx, cmd, 1000 coll.client.topology, 1001 coll.readSelector, 1002 coll.client.id, 1003 coll.client.topology.SessionPool, 1004 coll.registry, 1005 findOpts..., 1006 ) 1007 if err != nil { 1008 return &SingleResult{err: replaceErrors(err)} 1009 } 1010 1011 cursor, err := newCursor(batchCursor, coll.registry) 1012 return &SingleResult{cur: cursor, reg: coll.registry, err: replaceErrors(err)} 1013} 1014 1015// FindOneAndDelete find a single document and deletes it, returning the 1016// original in result. 1017func (coll *Collection) FindOneAndDelete(ctx context.Context, filter interface{}, 1018 opts ...*options.FindOneAndDeleteOptions) *SingleResult { 1019 1020 if ctx == nil { 1021 ctx = context.Background() 1022 } 1023 1024 f, err := transformDocument(coll.registry, filter) 1025 if err != nil { 1026 return &SingleResult{err: err} 1027 } 1028 1029 sess := sessionFromContext(ctx) 1030 1031 err = coll.client.validSession(sess) 1032 if err != nil { 1033 return &SingleResult{err: err} 1034 } 1035 1036 oldns := coll.namespace() 1037 wc := coll.writeConcern 1038 if sess.TransactionRunning() { 1039 wc = nil 1040 } 1041 1042 cmd := command.FindOneAndDelete{ 1043 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection}, 1044 Query: f, 1045 WriteConcern: wc, 1046 Session: sess, 1047 Clock: coll.client.clock, 1048 } 1049 1050 res, err := driver.FindOneAndDelete( 1051 ctx, cmd, 1052 coll.client.topology, 1053 coll.writeSelector, 1054 coll.client.id, 1055 coll.client.topology.SessionPool, 1056 coll.client.retryWrites, 1057 coll.registry, 1058 opts..., 1059 ) 1060 1061 if err != nil { 1062 return &SingleResult{err: replaceErrors(err)} 1063 } 1064 1065 if res.WriteConcernError != nil { 1066 return &SingleResult{err: *convertWriteConcernError(res.WriteConcernError)} 1067 } 1068 1069 return &SingleResult{rdr: res.Value, reg: coll.registry} 1070} 1071 1072// FindOneAndReplace finds a single document and replaces it, returning either 1073// the original or the replaced document. 1074func (coll *Collection) FindOneAndReplace(ctx context.Context, filter interface{}, 1075 replacement interface{}, opts ...*options.FindOneAndReplaceOptions) *SingleResult { 1076 1077 if ctx == nil { 1078 ctx = context.Background() 1079 } 1080 1081 f, err := transformDocument(coll.registry, filter) 1082 if err != nil { 1083 return &SingleResult{err: err} 1084 } 1085 1086 r, err := transformDocument(coll.registry, replacement) 1087 if err != nil { 1088 return &SingleResult{err: err} 1089 } 1090 1091 if len(r) > 0 && strings.HasPrefix(r[0].Key, "$") { 1092 return &SingleResult{err: errors.New("replacement document cannot contains keys beginning with '$")} 1093 } 1094 1095 sess := sessionFromContext(ctx) 1096 1097 err = coll.client.validSession(sess) 1098 if err != nil { 1099 return &SingleResult{err: err} 1100 } 1101 1102 wc := coll.writeConcern 1103 if sess.TransactionRunning() { 1104 wc = nil 1105 } 1106 1107 oldns := coll.namespace() 1108 cmd := command.FindOneAndReplace{ 1109 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection}, 1110 Query: f, 1111 Replacement: r, 1112 WriteConcern: wc, 1113 Session: sess, 1114 Clock: coll.client.clock, 1115 } 1116 1117 res, err := driver.FindOneAndReplace( 1118 ctx, cmd, 1119 coll.client.topology, 1120 coll.writeSelector, 1121 coll.client.id, 1122 coll.client.topology.SessionPool, 1123 coll.client.retryWrites, 1124 coll.registry, 1125 opts..., 1126 ) 1127 if err != nil { 1128 return &SingleResult{err: replaceErrors(err)} 1129 } 1130 1131 if res.WriteConcernError != nil { 1132 return &SingleResult{err: *convertWriteConcernError(res.WriteConcernError)} 1133 } 1134 1135 return &SingleResult{rdr: res.Value, reg: coll.registry} 1136} 1137 1138// FindOneAndUpdate finds a single document and updates it, returning either 1139// the original or the updated. 1140func (coll *Collection) FindOneAndUpdate(ctx context.Context, filter interface{}, 1141 update interface{}, opts ...*options.FindOneAndUpdateOptions) *SingleResult { 1142 1143 if ctx == nil { 1144 ctx = context.Background() 1145 } 1146 1147 f, err := transformDocument(coll.registry, filter) 1148 if err != nil { 1149 return &SingleResult{err: err} 1150 } 1151 1152 u, err := transformDocument(coll.registry, update) 1153 if err != nil { 1154 return &SingleResult{err: err} 1155 } 1156 1157 err = ensureDollarKey(u) 1158 if err != nil { 1159 return &SingleResult{ 1160 err: err, 1161 } 1162 } 1163 1164 sess := sessionFromContext(ctx) 1165 1166 err = coll.client.validSession(sess) 1167 if err != nil { 1168 return &SingleResult{err: err} 1169 } 1170 1171 wc := coll.writeConcern 1172 if sess.TransactionRunning() { 1173 wc = nil 1174 } 1175 1176 oldns := coll.namespace() 1177 cmd := command.FindOneAndUpdate{ 1178 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection}, 1179 Query: f, 1180 Update: u, 1181 WriteConcern: wc, 1182 Session: sess, 1183 Clock: coll.client.clock, 1184 } 1185 1186 res, err := driver.FindOneAndUpdate( 1187 ctx, cmd, 1188 coll.client.topology, 1189 coll.writeSelector, 1190 coll.client.id, 1191 coll.client.topology.SessionPool, 1192 coll.client.retryWrites, 1193 coll.registry, 1194 opts..., 1195 ) 1196 if err != nil { 1197 return &SingleResult{err: replaceErrors(err)} 1198 } 1199 1200 if res.WriteConcernError != nil { 1201 return &SingleResult{err: *convertWriteConcernError(res.WriteConcernError)} 1202 } 1203 1204 return &SingleResult{rdr: res.Value, reg: coll.registry} 1205} 1206 1207// Watch returns a change stream cursor used to receive notifications of changes to the collection. 1208// 1209// This method is preferred to running a raw aggregation with a $changeStream stage because it 1210// supports resumability in the case of some errors. The collection must have read concern majority or no read concern 1211// for a change stream to be created successfully. 1212func (coll *Collection) Watch(ctx context.Context, pipeline interface{}, 1213 opts ...*options.ChangeStreamOptions) (*ChangeStream, error) { 1214 return newChangeStream(ctx, coll, pipeline, opts...) 1215} 1216 1217// Indexes returns the index view for this collection. 1218func (coll *Collection) Indexes() IndexView { 1219 return IndexView{coll: coll} 1220} 1221 1222// Drop drops this collection from database. 1223func (coll *Collection) Drop(ctx context.Context) error { 1224 if ctx == nil { 1225 ctx = context.Background() 1226 } 1227 1228 sess := sessionFromContext(ctx) 1229 1230 err := coll.client.validSession(sess) 1231 if err != nil { 1232 return err 1233 } 1234 1235 wc := coll.writeConcern 1236 if sess.TransactionRunning() { 1237 wc = nil 1238 } 1239 1240 cmd := command.DropCollection{ 1241 DB: coll.db.name, 1242 Collection: coll.name, 1243 WriteConcern: wc, 1244 Session: sess, 1245 Clock: coll.client.clock, 1246 } 1247 _, err = driver.DropCollection( 1248 ctx, cmd, 1249 coll.client.topology, 1250 coll.writeSelector, 1251 coll.client.id, 1252 coll.client.topology.SessionPool, 1253 ) 1254 if err != nil && !command.IsNotFound(err) { 1255 return replaceErrors(err) 1256 } 1257 return nil 1258} 1259