1// Copyright (C) MongoDB, Inc. 2017-present.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may
4// not use this file except in compliance with the License. You may obtain
5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package mongo
8
9import (
10	"context"
11	"errors"
12	"strings"
13
14	"go.mongodb.org/mongo-driver/bson/bsoncodec"
15	"go.mongodb.org/mongo-driver/mongo/options"
16	"go.mongodb.org/mongo-driver/mongo/readconcern"
17	"go.mongodb.org/mongo-driver/mongo/readpref"
18	"go.mongodb.org/mongo-driver/mongo/writeconcern"
19	"go.mongodb.org/mongo-driver/x/bsonx"
20	"go.mongodb.org/mongo-driver/x/mongo/driver"
21	"go.mongodb.org/mongo-driver/x/mongo/driver/session"
22	"go.mongodb.org/mongo-driver/x/network/command"
23	"go.mongodb.org/mongo-driver/x/network/description"
24	"go.mongodb.org/mongo-driver/x/network/result"
25)
26
27// Collection performs operations on a given collection.
28type Collection struct {
29	client         *Client
30	db             *Database
31	name           string
32	readConcern    *readconcern.ReadConcern
33	writeConcern   *writeconcern.WriteConcern
34	readPreference *readpref.ReadPref
35	readSelector   description.ServerSelector
36	writeSelector  description.ServerSelector
37	registry       *bsoncodec.Registry
38}
39
40func newCollection(db *Database, name string, opts ...*options.CollectionOptions) *Collection {
41	collOpt := options.MergeCollectionOptions(opts...)
42
43	rc := db.readConcern
44	if collOpt.ReadConcern != nil {
45		rc = collOpt.ReadConcern
46	}
47
48	wc := db.writeConcern
49	if collOpt.WriteConcern != nil {
50		wc = collOpt.WriteConcern
51	}
52
53	rp := db.readPreference
54	if collOpt.ReadPreference != nil {
55		rp = collOpt.ReadPreference
56	}
57
58	reg := db.registry
59	if collOpt.Registry != nil {
60		reg = collOpt.Registry
61	}
62
63	readSelector := description.CompositeSelector([]description.ServerSelector{
64		description.ReadPrefSelector(rp),
65		description.LatencySelector(db.client.localThreshold),
66	})
67
68	writeSelector := description.CompositeSelector([]description.ServerSelector{
69		description.WriteSelector(),
70		description.LatencySelector(db.client.localThreshold),
71	})
72
73	coll := &Collection{
74		client:         db.client,
75		db:             db,
76		name:           name,
77		readPreference: rp,
78		readConcern:    rc,
79		writeConcern:   wc,
80		readSelector:   readSelector,
81		writeSelector:  writeSelector,
82		registry:       reg,
83	}
84
85	return coll
86}
87
88func (coll *Collection) copy() *Collection {
89	return &Collection{
90		client:         coll.client,
91		db:             coll.db,
92		name:           coll.name,
93		readConcern:    coll.readConcern,
94		writeConcern:   coll.writeConcern,
95		readPreference: coll.readPreference,
96		readSelector:   coll.readSelector,
97		writeSelector:  coll.writeSelector,
98		registry:       coll.registry,
99	}
100}
101
102// Clone creates a copy of this collection with updated options, if any are given.
103func (coll *Collection) Clone(opts ...*options.CollectionOptions) (*Collection, error) {
104	copyColl := coll.copy()
105	optsColl := options.MergeCollectionOptions(opts...)
106
107	if optsColl.ReadConcern != nil {
108		copyColl.readConcern = optsColl.ReadConcern
109	}
110
111	if optsColl.WriteConcern != nil {
112		copyColl.writeConcern = optsColl.WriteConcern
113	}
114
115	if optsColl.ReadPreference != nil {
116		copyColl.readPreference = optsColl.ReadPreference
117	}
118
119	if optsColl.Registry != nil {
120		copyColl.registry = optsColl.Registry
121	}
122
123	copyColl.readSelector = description.CompositeSelector([]description.ServerSelector{
124		description.ReadPrefSelector(copyColl.readPreference),
125		description.LatencySelector(copyColl.client.localThreshold),
126	})
127
128	return copyColl, nil
129}
130
131// Name provides access to the name of the collection.
132func (coll *Collection) Name() string {
133	return coll.name
134}
135
136// namespace returns the namespace of the collection.
137func (coll *Collection) namespace() command.Namespace {
138	return command.NewNamespace(coll.db.name, coll.name)
139}
140
141// Database provides access to the database that contains the collection.
142func (coll *Collection) Database() *Database {
143	return coll.db
144}
145
146// BulkWrite performs a bulk write operation.
147//
148// See https://docs.mongodb.com/manual/core/bulk-write-operations/.
149func (coll *Collection) BulkWrite(ctx context.Context, models []WriteModel,
150	opts ...*options.BulkWriteOptions) (*BulkWriteResult, error) {
151
152	if len(models) == 0 {
153		return nil, ErrEmptySlice
154	}
155
156	if ctx == nil {
157		ctx = context.Background()
158	}
159
160	sess := sessionFromContext(ctx)
161
162	err := coll.client.validSession(sess)
163	if err != nil {
164		return nil, err
165	}
166
167	dispatchModels := make([]driver.WriteModel, len(models))
168	for i, model := range models {
169		if model == nil {
170			return nil, ErrNilDocument
171		}
172		dispatchModels[i] = model.convertModel()
173	}
174
175	res, err := driver.BulkWrite(
176		ctx,
177		coll.namespace(),
178		dispatchModels,
179		coll.client.topology,
180		coll.writeSelector,
181		coll.client.id,
182		coll.client.topology.SessionPool,
183		coll.client.retryWrites,
184		sess,
185		coll.writeConcern,
186		coll.client.clock,
187		coll.registry,
188		opts...,
189	)
190	result := BulkWriteResult{
191		InsertedCount: res.InsertedCount,
192		MatchedCount:  res.MatchedCount,
193		ModifiedCount: res.ModifiedCount,
194		DeletedCount:  res.DeletedCount,
195		UpsertedCount: res.UpsertedCount,
196		UpsertedIDs:   res.UpsertedIDs,
197	}
198
199	return &result, replaceErrors(err)
200}
201
202// InsertOne inserts a single document into the collection.
203func (coll *Collection) InsertOne(ctx context.Context, document interface{},
204	opts ...*options.InsertOneOptions) (*InsertOneResult, error) {
205
206	if ctx == nil {
207		ctx = context.Background()
208	}
209
210	doc, insertedID, err := transformAndEnsureID(coll.registry, document)
211	if err != nil {
212		return nil, err
213	}
214
215	sess := sessionFromContext(ctx)
216
217	err = coll.client.validSession(sess)
218	if err != nil {
219		return nil, err
220	}
221
222	wc := coll.writeConcern
223	if sess.TransactionRunning() {
224		wc = nil
225	}
226	oldns := coll.namespace()
227	cmd := command.Insert{
228		NS:           command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
229		Docs:         []bsonx.Doc{doc},
230		WriteConcern: wc,
231		Session:      sess,
232		Clock:        coll.client.clock,
233	}
234
235	// convert to InsertManyOptions so these can be argued to dispatch.Insert
236	insertOpts := make([]*options.InsertManyOptions, len(opts))
237	for i, opt := range opts {
238		insertOpts[i] = options.InsertMany()
239		insertOpts[i].BypassDocumentValidation = opt.BypassDocumentValidation
240	}
241
242	res, err := driver.Insert(
243		ctx, cmd,
244		coll.client.topology,
245		coll.writeSelector,
246		coll.client.id,
247		coll.client.topology.SessionPool,
248		coll.client.retryWrites,
249		insertOpts...,
250	)
251
252	rr, err := processWriteError(res.WriteConcernError, res.WriteErrors, err)
253	if rr&rrOne == 0 {
254		return nil, err
255	}
256
257	return &InsertOneResult{InsertedID: insertedID}, err
258}
259
260// InsertMany inserts the provided documents.
261func (coll *Collection) InsertMany(ctx context.Context, documents []interface{},
262	opts ...*options.InsertManyOptions) (*InsertManyResult, error) {
263
264	if ctx == nil {
265		ctx = context.Background()
266	}
267
268	if len(documents) == 0 {
269		return nil, ErrEmptySlice
270	}
271
272	result := make([]interface{}, len(documents))
273	docs := make([]bsonx.Doc, len(documents))
274
275	for i, doc := range documents {
276		if doc == nil {
277			return nil, ErrNilDocument
278		}
279		bdoc, insertedID, err := transformAndEnsureID(coll.registry, doc)
280		if err != nil {
281			return nil, err
282		}
283
284		docs[i] = bdoc
285		result[i] = insertedID
286	}
287
288	sess := sessionFromContext(ctx)
289
290	err := coll.client.validSession(sess)
291	if err != nil {
292		return nil, err
293	}
294
295	wc := coll.writeConcern
296	if sess.TransactionRunning() {
297		wc = nil
298	}
299
300	oldns := coll.namespace()
301	cmd := command.Insert{
302		NS:           command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
303		Docs:         docs,
304		WriteConcern: wc,
305		Session:      sess,
306		Clock:        coll.client.clock,
307	}
308
309	res, err := driver.Insert(
310		ctx, cmd,
311		coll.client.topology,
312		coll.writeSelector,
313		coll.client.id,
314		coll.client.topology.SessionPool,
315		coll.client.retryWrites,
316		opts...,
317	)
318
319	switch err {
320	case nil:
321	case command.ErrUnacknowledgedWrite:
322		return &InsertManyResult{InsertedIDs: result}, ErrUnacknowledgedWrite
323	default:
324		return nil, replaceErrors(err)
325	}
326	if len(res.WriteErrors) > 0 || res.WriteConcernError != nil {
327		bwErrors := make([]BulkWriteError, 0, len(res.WriteErrors))
328		for _, we := range res.WriteErrors {
329			bwErrors = append(bwErrors, BulkWriteError{
330				WriteError{
331					Index:   we.Index,
332					Code:    we.Code,
333					Message: we.ErrMsg,
334				},
335				nil,
336			})
337		}
338
339		err = BulkWriteException{
340			WriteErrors:       bwErrors,
341			WriteConcernError: convertWriteConcernError(res.WriteConcernError),
342		}
343	}
344
345	return &InsertManyResult{InsertedIDs: result}, err
346}
347
348// DeleteOne deletes a single document from the collection.
349func (coll *Collection) DeleteOne(ctx context.Context, filter interface{},
350	opts ...*options.DeleteOptions) (*DeleteResult, error) {
351
352	if ctx == nil {
353		ctx = context.Background()
354	}
355
356	f, err := transformDocument(coll.registry, filter)
357	if err != nil {
358		return nil, err
359	}
360	deleteDocs := []bsonx.Doc{
361		{
362			{"q", bsonx.Document(f)},
363			{"limit", bsonx.Int32(1)},
364		},
365	}
366
367	sess := sessionFromContext(ctx)
368
369	err = coll.client.validSession(sess)
370	if err != nil {
371		return nil, err
372	}
373
374	wc := coll.writeConcern
375	if sess.TransactionRunning() {
376		wc = nil
377	}
378
379	oldns := coll.namespace()
380	cmd := command.Delete{
381		NS:           command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
382		Deletes:      deleteDocs,
383		WriteConcern: wc,
384		Session:      sess,
385		Clock:        coll.client.clock,
386	}
387
388	res, err := driver.Delete(
389		ctx, cmd,
390		coll.client.topology,
391		coll.writeSelector,
392		coll.client.id,
393		coll.client.topology.SessionPool,
394		coll.client.retryWrites,
395		opts...,
396	)
397
398	rr, err := processWriteError(res.WriteConcernError, res.WriteErrors, err)
399	if rr&rrOne == 0 {
400		return nil, err
401	}
402	return &DeleteResult{DeletedCount: int64(res.N)}, err
403}
404
405// DeleteMany deletes multiple documents from the collection.
406func (coll *Collection) DeleteMany(ctx context.Context, filter interface{},
407	opts ...*options.DeleteOptions) (*DeleteResult, error) {
408
409	if ctx == nil {
410		ctx = context.Background()
411	}
412
413	f, err := transformDocument(coll.registry, filter)
414	if err != nil {
415		return nil, err
416	}
417	deleteDocs := []bsonx.Doc{{{"q", bsonx.Document(f)}, {"limit", bsonx.Int32(0)}}}
418
419	sess := sessionFromContext(ctx)
420
421	err = coll.client.validSession(sess)
422	if err != nil {
423		return nil, err
424	}
425
426	wc := coll.writeConcern
427	if sess.TransactionRunning() {
428		wc = nil
429	}
430
431	oldns := coll.namespace()
432	cmd := command.Delete{
433		NS:           command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
434		Deletes:      deleteDocs,
435		WriteConcern: wc,
436		Session:      sess,
437		Clock:        coll.client.clock,
438	}
439
440	res, err := driver.Delete(
441		ctx, cmd,
442		coll.client.topology,
443		coll.writeSelector,
444		coll.client.id,
445		coll.client.topology.SessionPool,
446		false,
447		opts...,
448	)
449
450	rr, err := processWriteError(res.WriteConcernError, res.WriteErrors, err)
451	if rr&rrMany == 0 {
452		return nil, err
453	}
454	return &DeleteResult{DeletedCount: int64(res.N)}, err
455}
456
457func (coll *Collection) updateOrReplaceOne(ctx context.Context, filter,
458	update bsonx.Doc, sess *session.Client, opts ...*options.UpdateOptions) (*UpdateResult, error) {
459
460	// TODO: should session be taken from ctx or left as argument?
461	if ctx == nil {
462		ctx = context.Background()
463	}
464
465	updateDocs := []bsonx.Doc{
466		{
467			{"q", bsonx.Document(filter)},
468			{"u", bsonx.Document(update)},
469			{"multi", bsonx.Boolean(false)},
470		},
471	}
472
473	wc := coll.writeConcern
474	if sess.TransactionRunning() {
475		wc = nil
476	}
477
478	oldns := coll.namespace()
479	cmd := command.Update{
480		NS:           command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
481		Docs:         updateDocs,
482		WriteConcern: wc,
483		Session:      sess,
484		Clock:        coll.client.clock,
485	}
486
487	r, err := driver.Update(
488		ctx, cmd,
489		coll.client.topology,
490		coll.writeSelector,
491		coll.client.id,
492		coll.client.topology.SessionPool,
493		coll.client.retryWrites,
494		opts...,
495	)
496	if err != nil && err != command.ErrUnacknowledgedWrite {
497		return nil, replaceErrors(err)
498	}
499
500	res := &UpdateResult{
501		MatchedCount:  r.MatchedCount,
502		ModifiedCount: r.ModifiedCount,
503		UpsertedCount: int64(len(r.Upserted)),
504	}
505	if len(r.Upserted) > 0 {
506		res.UpsertedID = r.Upserted[0].ID
507		res.MatchedCount--
508	}
509
510	rr, err := processWriteError(r.WriteConcernError, r.WriteErrors, err)
511	if rr&rrOne == 0 {
512		return nil, err
513	}
514	return res, err
515}
516
517// UpdateOne updates a single document in the collection.
518func (coll *Collection) UpdateOne(ctx context.Context, filter interface{}, update interface{},
519	opts ...*options.UpdateOptions) (*UpdateResult, error) {
520
521	if ctx == nil {
522		ctx = context.Background()
523	}
524
525	f, err := transformDocument(coll.registry, filter)
526	if err != nil {
527		return nil, err
528	}
529
530	u, err := transformDocument(coll.registry, update)
531	if err != nil {
532		return nil, err
533	}
534
535	if err := ensureDollarKey(u); err != nil {
536		return nil, err
537	}
538
539	sess := sessionFromContext(ctx)
540
541	err = coll.client.validSession(sess)
542	if err != nil {
543		return nil, err
544	}
545
546	return coll.updateOrReplaceOne(ctx, f, u, sess, opts...)
547}
548
549// UpdateMany updates multiple documents in the collection.
550func (coll *Collection) UpdateMany(ctx context.Context, filter interface{}, update interface{},
551	opts ...*options.UpdateOptions) (*UpdateResult, error) {
552
553	if ctx == nil {
554		ctx = context.Background()
555	}
556
557	f, err := transformDocument(coll.registry, filter)
558	if err != nil {
559		return nil, err
560	}
561
562	u, err := transformDocument(coll.registry, update)
563	if err != nil {
564		return nil, err
565	}
566
567	if err = ensureDollarKey(u); err != nil {
568		return nil, err
569	}
570
571	updateDocs := []bsonx.Doc{
572		{
573			{"q", bsonx.Document(f)},
574			{"u", bsonx.Document(u)},
575			{"multi", bsonx.Boolean(true)},
576		},
577	}
578
579	sess := sessionFromContext(ctx)
580
581	err = coll.client.validSession(sess)
582	if err != nil {
583		return nil, err
584	}
585
586	wc := coll.writeConcern
587	if sess.TransactionRunning() {
588		wc = nil
589	}
590
591	oldns := coll.namespace()
592	cmd := command.Update{
593		NS:           command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
594		Docs:         updateDocs,
595		WriteConcern: wc,
596		Session:      sess,
597		Clock:        coll.client.clock,
598	}
599
600	r, err := driver.Update(
601		ctx, cmd,
602		coll.client.topology,
603		coll.writeSelector,
604		coll.client.id,
605		coll.client.topology.SessionPool,
606		false,
607		opts...,
608	)
609	if err != nil && err != command.ErrUnacknowledgedWrite {
610		return nil, replaceErrors(err)
611	}
612	res := &UpdateResult{
613		MatchedCount:  r.MatchedCount,
614		ModifiedCount: r.ModifiedCount,
615		UpsertedCount: int64(len(r.Upserted)),
616	}
617	// TODO(skriptble): Is this correct? Do we only return the first upserted ID for an UpdateMany?
618	if len(r.Upserted) > 0 {
619		res.UpsertedID = r.Upserted[0].ID
620		res.MatchedCount--
621	}
622
623	rr, err := processWriteError(r.WriteConcernError, r.WriteErrors, err)
624	if rr&rrMany == 0 {
625		return nil, err
626	}
627	return res, err
628}
629
630// ReplaceOne replaces a single document in the collection.
631func (coll *Collection) ReplaceOne(ctx context.Context, filter interface{},
632	replacement interface{}, opts ...*options.ReplaceOptions) (*UpdateResult, error) {
633
634	if ctx == nil {
635		ctx = context.Background()
636	}
637
638	f, err := transformDocument(coll.registry, filter)
639	if err != nil {
640		return nil, err
641	}
642
643	r, err := transformDocument(coll.registry, replacement)
644	if err != nil {
645		return nil, err
646	}
647
648	if len(r) > 0 && strings.HasPrefix(r[0].Key, "$") {
649		return nil, errors.New("replacement document cannot contains keys beginning with '$")
650	}
651
652	sess := sessionFromContext(ctx)
653
654	err = coll.client.validSession(sess)
655	if err != nil {
656		return nil, err
657	}
658
659	updateOptions := make([]*options.UpdateOptions, 0, len(opts))
660	for _, opt := range opts {
661		uOpts := options.Update()
662		uOpts.BypassDocumentValidation = opt.BypassDocumentValidation
663		uOpts.Collation = opt.Collation
664		uOpts.Upsert = opt.Upsert
665		updateOptions = append(updateOptions, uOpts)
666	}
667
668	return coll.updateOrReplaceOne(ctx, f, r, sess, updateOptions...)
669}
670
671// Aggregate runs an aggregation framework pipeline.
672//
673// See https://docs.mongodb.com/manual/aggregation/.
674func (coll *Collection) Aggregate(ctx context.Context, pipeline interface{},
675	opts ...*options.AggregateOptions) (*Cursor, error) {
676
677	if ctx == nil {
678		ctx = context.Background()
679	}
680
681	pipelineArr, err := transformAggregatePipeline(coll.registry, pipeline)
682	if err != nil {
683		return nil, err
684	}
685
686	aggOpts := options.MergeAggregateOptions(opts...)
687
688	sess := sessionFromContext(ctx)
689
690	err = coll.client.validSession(sess)
691	if err != nil {
692		return nil, err
693	}
694
695	wc := coll.writeConcern
696	rc := coll.readConcern
697
698	if sess.TransactionRunning() {
699		wc = nil
700		rc = nil
701	}
702
703	oldns := coll.namespace()
704	cmd := command.Aggregate{
705		NS:           command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
706		Pipeline:     pipelineArr,
707		ReadPref:     coll.readPreference,
708		WriteConcern: wc,
709		ReadConcern:  rc,
710		Session:      sess,
711		Clock:        coll.client.clock,
712	}
713
714	batchCursor, err := driver.Aggregate(
715		ctx, cmd,
716		coll.client.topology,
717		coll.readSelector,
718		coll.writeSelector,
719		coll.client.id,
720		coll.client.topology.SessionPool,
721		coll.registry,
722		aggOpts,
723	)
724	if err != nil {
725		if wce, ok := err.(result.WriteConcernError); ok {
726			return nil, *convertWriteConcernError(&wce)
727		}
728		return nil, replaceErrors(err)
729	}
730
731	cursor, err := newCursor(batchCursor, coll.registry)
732	return cursor, replaceErrors(err)
733}
734
735// CountDocuments gets the number of documents matching the filter.
736func (coll *Collection) CountDocuments(ctx context.Context, filter interface{},
737	opts ...*options.CountOptions) (int64, error) {
738
739	if ctx == nil {
740		ctx = context.Background()
741	}
742
743	countOpts := options.MergeCountOptions(opts...)
744
745	pipelineArr, err := countDocumentsAggregatePipeline(coll.registry, filter, countOpts)
746	if err != nil {
747		return 0, err
748	}
749
750	sess := sessionFromContext(ctx)
751
752	err = coll.client.validSession(sess)
753	if err != nil {
754		return 0, err
755	}
756
757	rc := coll.readConcern
758	if sess.TransactionRunning() {
759		rc = nil
760	}
761
762	oldns := coll.namespace()
763	cmd := command.CountDocuments{
764		NS:          command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
765		Pipeline:    pipelineArr,
766		ReadPref:    coll.readPreference,
767		ReadConcern: rc,
768		Session:     sess,
769		Clock:       coll.client.clock,
770	}
771
772	count, err := driver.CountDocuments(
773		ctx, cmd,
774		coll.client.topology,
775		coll.readSelector,
776		coll.client.id,
777		coll.client.topology.SessionPool,
778		coll.registry,
779		countOpts,
780	)
781
782	return count, replaceErrors(err)
783}
784
785// EstimatedDocumentCount gets an estimate of the count of documents in a collection using collection metadata.
786func (coll *Collection) EstimatedDocumentCount(ctx context.Context,
787	opts ...*options.EstimatedDocumentCountOptions) (int64, error) {
788
789	if ctx == nil {
790		ctx = context.Background()
791	}
792
793	sess := sessionFromContext(ctx)
794
795	err := coll.client.validSession(sess)
796	if err != nil {
797		return 0, err
798	}
799
800	rc := coll.readConcern
801	if sess != nil && (sess.TransactionInProgress()) {
802		rc = nil
803	}
804
805	oldns := coll.namespace()
806	cmd := command.Count{
807		NS:          command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
808		Query:       bsonx.Doc{},
809		ReadPref:    coll.readPreference,
810		ReadConcern: rc,
811		Session:     sess,
812		Clock:       coll.client.clock,
813	}
814
815	countOpts := options.Count()
816	if len(opts) >= 1 {
817		countOpts = countOpts.SetMaxTime(*opts[len(opts)-1].MaxTime)
818	}
819
820	count, err := driver.Count(
821		ctx, cmd,
822		coll.client.topology,
823		coll.readSelector,
824		coll.client.id,
825		coll.client.topology.SessionPool,
826		coll.registry,
827		countOpts,
828	)
829
830	return count, replaceErrors(err)
831}
832
833// Distinct finds the distinct values for a specified field across a single
834// collection.
835func (coll *Collection) Distinct(ctx context.Context, fieldName string, filter interface{},
836	opts ...*options.DistinctOptions) ([]interface{}, error) {
837
838	if ctx == nil {
839		ctx = context.Background()
840	}
841
842	f, err := transformDocument(coll.registry, filter)
843	if err != nil {
844		return nil, err
845	}
846
847	sess := sessionFromContext(ctx)
848
849	err = coll.client.validSession(sess)
850	if err != nil {
851		return nil, err
852	}
853
854	rc := coll.readConcern
855	if sess.TransactionRunning() {
856		rc = nil
857	}
858
859	oldns := coll.namespace()
860	cmd := command.Distinct{
861		NS:          command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
862		Field:       fieldName,
863		Query:       f,
864		ReadPref:    coll.readPreference,
865		ReadConcern: rc,
866		Session:     sess,
867		Clock:       coll.client.clock,
868	}
869
870	res, err := driver.Distinct(
871		ctx, cmd,
872		coll.client.topology,
873		coll.readSelector,
874		coll.client.id,
875		coll.client.topology.SessionPool,
876		opts...,
877	)
878	if err != nil {
879		return nil, replaceErrors(err)
880	}
881
882	return res.Values, nil
883}
884
885// Find finds the documents matching a model.
886func (coll *Collection) Find(ctx context.Context, filter interface{},
887	opts ...*options.FindOptions) (*Cursor, error) {
888
889	if ctx == nil {
890		ctx = context.Background()
891	}
892
893	f, err := transformDocument(coll.registry, filter)
894	if err != nil {
895		return nil, err
896	}
897
898	sess := sessionFromContext(ctx)
899
900	err = coll.client.validSession(sess)
901	if err != nil {
902		return nil, err
903	}
904
905	rc := coll.readConcern
906	if sess.TransactionRunning() {
907		rc = nil
908	}
909
910	oldns := coll.namespace()
911	cmd := command.Find{
912		NS:          command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
913		Filter:      f,
914		ReadPref:    coll.readPreference,
915		ReadConcern: rc,
916		Session:     sess,
917		Clock:       coll.client.clock,
918	}
919
920	batchCursor, err := driver.Find(
921		ctx, cmd,
922		coll.client.topology,
923		coll.readSelector,
924		coll.client.id,
925		coll.client.topology.SessionPool,
926		coll.registry,
927		opts...,
928	)
929	if err != nil {
930		return nil, replaceErrors(err)
931	}
932
933	cursor, err := newCursor(batchCursor, coll.registry)
934	return cursor, replaceErrors(err)
935}
936
937// FindOne returns up to one document that matches the model.
938func (coll *Collection) FindOne(ctx context.Context, filter interface{},
939	opts ...*options.FindOneOptions) *SingleResult {
940
941	if ctx == nil {
942		ctx = context.Background()
943	}
944
945	f, err := transformDocument(coll.registry, filter)
946	if err != nil {
947		return &SingleResult{err: err}
948	}
949
950	sess := sessionFromContext(ctx)
951
952	err = coll.client.validSession(sess)
953	if err != nil {
954		return &SingleResult{err: err}
955	}
956
957	rc := coll.readConcern
958	if sess.TransactionRunning() {
959		rc = nil
960	}
961
962	oldns := coll.namespace()
963	cmd := command.Find{
964		NS:          command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
965		Filter:      f,
966		ReadPref:    coll.readPreference,
967		ReadConcern: rc,
968		Session:     sess,
969		Clock:       coll.client.clock,
970	}
971
972	findOpts := make([]*options.FindOptions, len(opts))
973	for i, opt := range opts {
974		findOpts[i] = &options.FindOptions{
975			AllowPartialResults: opt.AllowPartialResults,
976			BatchSize:           opt.BatchSize,
977			Collation:           opt.Collation,
978			Comment:             opt.Comment,
979			CursorType:          opt.CursorType,
980			Hint:                opt.Hint,
981			Max:                 opt.Max,
982			MaxAwaitTime:        opt.MaxAwaitTime,
983			Min:                 opt.Min,
984			NoCursorTimeout:     opt.NoCursorTimeout,
985			OplogReplay:         opt.OplogReplay,
986			Projection:          opt.Projection,
987			ReturnKey:           opt.ReturnKey,
988			ShowRecordID:        opt.ShowRecordID,
989			Skip:                opt.Skip,
990			Snapshot:            opt.Snapshot,
991			Sort:                opt.Sort,
992		}
993	}
994	// Unconditionally send a limit to make sure only one document is returned and the cursor is not kept open
995	// by the server.
996	findOpts = append(findOpts, options.Find().SetLimit(-1))
997
998	batchCursor, err := driver.Find(
999		ctx, cmd,
1000		coll.client.topology,
1001		coll.readSelector,
1002		coll.client.id,
1003		coll.client.topology.SessionPool,
1004		coll.registry,
1005		findOpts...,
1006	)
1007	if err != nil {
1008		return &SingleResult{err: replaceErrors(err)}
1009	}
1010
1011	cursor, err := newCursor(batchCursor, coll.registry)
1012	return &SingleResult{cur: cursor, reg: coll.registry, err: replaceErrors(err)}
1013}
1014
1015// FindOneAndDelete find a single document and deletes it, returning the
1016// original in result.
1017func (coll *Collection) FindOneAndDelete(ctx context.Context, filter interface{},
1018	opts ...*options.FindOneAndDeleteOptions) *SingleResult {
1019
1020	if ctx == nil {
1021		ctx = context.Background()
1022	}
1023
1024	f, err := transformDocument(coll.registry, filter)
1025	if err != nil {
1026		return &SingleResult{err: err}
1027	}
1028
1029	sess := sessionFromContext(ctx)
1030
1031	err = coll.client.validSession(sess)
1032	if err != nil {
1033		return &SingleResult{err: err}
1034	}
1035
1036	oldns := coll.namespace()
1037	wc := coll.writeConcern
1038	if sess.TransactionRunning() {
1039		wc = nil
1040	}
1041
1042	cmd := command.FindOneAndDelete{
1043		NS:           command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
1044		Query:        f,
1045		WriteConcern: wc,
1046		Session:      sess,
1047		Clock:        coll.client.clock,
1048	}
1049
1050	res, err := driver.FindOneAndDelete(
1051		ctx, cmd,
1052		coll.client.topology,
1053		coll.writeSelector,
1054		coll.client.id,
1055		coll.client.topology.SessionPool,
1056		coll.client.retryWrites,
1057		coll.registry,
1058		opts...,
1059	)
1060
1061	if err != nil {
1062		return &SingleResult{err: replaceErrors(err)}
1063	}
1064
1065	if res.WriteConcernError != nil {
1066		return &SingleResult{err: *convertWriteConcernError(res.WriteConcernError)}
1067	}
1068
1069	return &SingleResult{rdr: res.Value, reg: coll.registry}
1070}
1071
1072// FindOneAndReplace finds a single document and replaces it, returning either
1073// the original or the replaced document.
1074func (coll *Collection) FindOneAndReplace(ctx context.Context, filter interface{},
1075	replacement interface{}, opts ...*options.FindOneAndReplaceOptions) *SingleResult {
1076
1077	if ctx == nil {
1078		ctx = context.Background()
1079	}
1080
1081	f, err := transformDocument(coll.registry, filter)
1082	if err != nil {
1083		return &SingleResult{err: err}
1084	}
1085
1086	r, err := transformDocument(coll.registry, replacement)
1087	if err != nil {
1088		return &SingleResult{err: err}
1089	}
1090
1091	if len(r) > 0 && strings.HasPrefix(r[0].Key, "$") {
1092		return &SingleResult{err: errors.New("replacement document cannot contains keys beginning with '$")}
1093	}
1094
1095	sess := sessionFromContext(ctx)
1096
1097	err = coll.client.validSession(sess)
1098	if err != nil {
1099		return &SingleResult{err: err}
1100	}
1101
1102	wc := coll.writeConcern
1103	if sess.TransactionRunning() {
1104		wc = nil
1105	}
1106
1107	oldns := coll.namespace()
1108	cmd := command.FindOneAndReplace{
1109		NS:           command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
1110		Query:        f,
1111		Replacement:  r,
1112		WriteConcern: wc,
1113		Session:      sess,
1114		Clock:        coll.client.clock,
1115	}
1116
1117	res, err := driver.FindOneAndReplace(
1118		ctx, cmd,
1119		coll.client.topology,
1120		coll.writeSelector,
1121		coll.client.id,
1122		coll.client.topology.SessionPool,
1123		coll.client.retryWrites,
1124		coll.registry,
1125		opts...,
1126	)
1127	if err != nil {
1128		return &SingleResult{err: replaceErrors(err)}
1129	}
1130
1131	if res.WriteConcernError != nil {
1132		return &SingleResult{err: *convertWriteConcernError(res.WriteConcernError)}
1133	}
1134
1135	return &SingleResult{rdr: res.Value, reg: coll.registry}
1136}
1137
1138// FindOneAndUpdate finds a single document and updates it, returning either
1139// the original or the updated.
1140func (coll *Collection) FindOneAndUpdate(ctx context.Context, filter interface{},
1141	update interface{}, opts ...*options.FindOneAndUpdateOptions) *SingleResult {
1142
1143	if ctx == nil {
1144		ctx = context.Background()
1145	}
1146
1147	f, err := transformDocument(coll.registry, filter)
1148	if err != nil {
1149		return &SingleResult{err: err}
1150	}
1151
1152	u, err := transformDocument(coll.registry, update)
1153	if err != nil {
1154		return &SingleResult{err: err}
1155	}
1156
1157	err = ensureDollarKey(u)
1158	if err != nil {
1159		return &SingleResult{
1160			err: err,
1161		}
1162	}
1163
1164	sess := sessionFromContext(ctx)
1165
1166	err = coll.client.validSession(sess)
1167	if err != nil {
1168		return &SingleResult{err: err}
1169	}
1170
1171	wc := coll.writeConcern
1172	if sess.TransactionRunning() {
1173		wc = nil
1174	}
1175
1176	oldns := coll.namespace()
1177	cmd := command.FindOneAndUpdate{
1178		NS:           command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
1179		Query:        f,
1180		Update:       u,
1181		WriteConcern: wc,
1182		Session:      sess,
1183		Clock:        coll.client.clock,
1184	}
1185
1186	res, err := driver.FindOneAndUpdate(
1187		ctx, cmd,
1188		coll.client.topology,
1189		coll.writeSelector,
1190		coll.client.id,
1191		coll.client.topology.SessionPool,
1192		coll.client.retryWrites,
1193		coll.registry,
1194		opts...,
1195	)
1196	if err != nil {
1197		return &SingleResult{err: replaceErrors(err)}
1198	}
1199
1200	if res.WriteConcernError != nil {
1201		return &SingleResult{err: *convertWriteConcernError(res.WriteConcernError)}
1202	}
1203
1204	return &SingleResult{rdr: res.Value, reg: coll.registry}
1205}
1206
1207// Watch returns a change stream cursor used to receive notifications of changes to the collection.
1208//
1209// This method is preferred to running a raw aggregation with a $changeStream stage because it
1210// supports resumability in the case of some errors. The collection must have read concern majority or no read concern
1211// for a change stream to be created successfully.
1212func (coll *Collection) Watch(ctx context.Context, pipeline interface{},
1213	opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {
1214	return newChangeStream(ctx, coll, pipeline, opts...)
1215}
1216
1217// Indexes returns the index view for this collection.
1218func (coll *Collection) Indexes() IndexView {
1219	return IndexView{coll: coll}
1220}
1221
1222// Drop drops this collection from database.
1223func (coll *Collection) Drop(ctx context.Context) error {
1224	if ctx == nil {
1225		ctx = context.Background()
1226	}
1227
1228	sess := sessionFromContext(ctx)
1229
1230	err := coll.client.validSession(sess)
1231	if err != nil {
1232		return err
1233	}
1234
1235	wc := coll.writeConcern
1236	if sess.TransactionRunning() {
1237		wc = nil
1238	}
1239
1240	cmd := command.DropCollection{
1241		DB:           coll.db.name,
1242		Collection:   coll.name,
1243		WriteConcern: wc,
1244		Session:      sess,
1245		Clock:        coll.client.clock,
1246	}
1247	_, err = driver.DropCollection(
1248		ctx, cmd,
1249		coll.client.topology,
1250		coll.writeSelector,
1251		coll.client.id,
1252		coll.client.topology.SessionPool,
1253	)
1254	if err != nil && !command.IsNotFound(err) {
1255		return replaceErrors(err)
1256	}
1257	return nil
1258}
1259