1// Copyright (C) MongoDB, Inc. 2017-present.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may
4// not use this file except in compliance with the License. You may obtain
5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package mongo
8
9import (
10	"context"
11	"errors"
12	"fmt"
13	"strings"
14	"time"
15
16	"go.mongodb.org/mongo-driver/bson"
17	"go.mongodb.org/mongo-driver/bson/bsoncodec"
18	"go.mongodb.org/mongo-driver/bson/bsontype"
19	"go.mongodb.org/mongo-driver/mongo/options"
20	"go.mongodb.org/mongo-driver/mongo/readconcern"
21	"go.mongodb.org/mongo-driver/mongo/readpref"
22	"go.mongodb.org/mongo-driver/mongo/writeconcern"
23	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
24	"go.mongodb.org/mongo-driver/x/mongo/driver"
25	"go.mongodb.org/mongo-driver/x/mongo/driver/description"
26	"go.mongodb.org/mongo-driver/x/mongo/driver/operation"
27	"go.mongodb.org/mongo-driver/x/mongo/driver/session"
28)
29
30// Collection is a handle to a MongoDB collection. It is safe for concurrent use by multiple goroutines.
31type Collection struct {
32	client         *Client
33	db             *Database
34	name           string
35	readConcern    *readconcern.ReadConcern
36	writeConcern   *writeconcern.WriteConcern
37	readPreference *readpref.ReadPref
38	readSelector   description.ServerSelector
39	writeSelector  description.ServerSelector
40	registry       *bsoncodec.Registry
41}
42
43// aggregateParams is used to store information to configure an Aggregate operation.
44type aggregateParams struct {
45	ctx            context.Context
46	pipeline       interface{}
47	client         *Client
48	registry       *bsoncodec.Registry
49	readConcern    *readconcern.ReadConcern
50	writeConcern   *writeconcern.WriteConcern
51	retryRead      bool
52	db             string
53	col            string
54	readSelector   description.ServerSelector
55	writeSelector  description.ServerSelector
56	readPreference *readpref.ReadPref
57	opts           []*options.AggregateOptions
58}
59
60func closeImplicitSession(sess *session.Client) {
61	if sess != nil && sess.SessionType == session.Implicit {
62		sess.EndSession()
63	}
64}
65
66func newCollection(db *Database, name string, opts ...*options.CollectionOptions) *Collection {
67	collOpt := options.MergeCollectionOptions(opts...)
68
69	rc := db.readConcern
70	if collOpt.ReadConcern != nil {
71		rc = collOpt.ReadConcern
72	}
73
74	wc := db.writeConcern
75	if collOpt.WriteConcern != nil {
76		wc = collOpt.WriteConcern
77	}
78
79	rp := db.readPreference
80	if collOpt.ReadPreference != nil {
81		rp = collOpt.ReadPreference
82	}
83
84	reg := db.registry
85	if collOpt.Registry != nil {
86		reg = collOpt.Registry
87	}
88
89	readSelector := description.CompositeSelector([]description.ServerSelector{
90		description.ReadPrefSelector(rp),
91		description.LatencySelector(db.client.localThreshold),
92	})
93
94	writeSelector := description.CompositeSelector([]description.ServerSelector{
95		description.WriteSelector(),
96		description.LatencySelector(db.client.localThreshold),
97	})
98
99	coll := &Collection{
100		client:         db.client,
101		db:             db,
102		name:           name,
103		readPreference: rp,
104		readConcern:    rc,
105		writeConcern:   wc,
106		readSelector:   readSelector,
107		writeSelector:  writeSelector,
108		registry:       reg,
109	}
110
111	return coll
112}
113
114func (coll *Collection) copy() *Collection {
115	return &Collection{
116		client:         coll.client,
117		db:             coll.db,
118		name:           coll.name,
119		readConcern:    coll.readConcern,
120		writeConcern:   coll.writeConcern,
121		readPreference: coll.readPreference,
122		readSelector:   coll.readSelector,
123		writeSelector:  coll.writeSelector,
124		registry:       coll.registry,
125	}
126}
127
128// Clone creates a copy of the Collection configured with the given CollectionOptions.
129// The specified options are merged with the existing options on the collection, with the specified options taking
130// precedence.
131func (coll *Collection) Clone(opts ...*options.CollectionOptions) (*Collection, error) {
132	copyColl := coll.copy()
133	optsColl := options.MergeCollectionOptions(opts...)
134
135	if optsColl.ReadConcern != nil {
136		copyColl.readConcern = optsColl.ReadConcern
137	}
138
139	if optsColl.WriteConcern != nil {
140		copyColl.writeConcern = optsColl.WriteConcern
141	}
142
143	if optsColl.ReadPreference != nil {
144		copyColl.readPreference = optsColl.ReadPreference
145	}
146
147	if optsColl.Registry != nil {
148		copyColl.registry = optsColl.Registry
149	}
150
151	copyColl.readSelector = description.CompositeSelector([]description.ServerSelector{
152		description.ReadPrefSelector(copyColl.readPreference),
153		description.LatencySelector(copyColl.client.localThreshold),
154	})
155
156	return copyColl, nil
157}
158
159// Name returns the name of the collection.
160func (coll *Collection) Name() string {
161	return coll.name
162}
163
164// Database returns the Database that was used to create the Collection.
165func (coll *Collection) Database() *Database {
166	return coll.db
167}
168
169// BulkWrite performs a bulk write operation (https://docs.mongodb.com/manual/core/bulk-write-operations/).
170//
171// The models parameter must be a slice of operations to be executed in this bulk write. It cannot be nil or empty.
172// All of the models must be non-nil. See the mongo.WriteModel documentation for a list of valid model types and
173// examples of how they should be used.
174//
175// The opts parameter can be used to specify options for the operation (see the options.BulkWriteOptions documentation.)
176func (coll *Collection) BulkWrite(ctx context.Context, models []WriteModel,
177	opts ...*options.BulkWriteOptions) (*BulkWriteResult, error) {
178
179	if len(models) == 0 {
180		return nil, ErrEmptySlice
181	}
182
183	if ctx == nil {
184		ctx = context.Background()
185	}
186
187	sess := sessionFromContext(ctx)
188	if sess == nil && coll.client.sessionPool != nil {
189		var err error
190		sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit)
191		if err != nil {
192			return nil, err
193		}
194		defer sess.EndSession()
195	}
196
197	err := coll.client.validSession(sess)
198	if err != nil {
199		return nil, err
200	}
201
202	wc := coll.writeConcern
203	if sess.TransactionRunning() {
204		wc = nil
205	}
206	if !writeconcern.AckWrite(wc) {
207		sess = nil
208	}
209
210	selector := makePinnedSelector(sess, coll.writeSelector)
211
212	for _, model := range models {
213		if model == nil {
214			return nil, ErrNilDocument
215		}
216	}
217
218	bwo := options.MergeBulkWriteOptions(opts...)
219
220	op := bulkWrite{
221		ordered:                  bwo.Ordered,
222		bypassDocumentValidation: bwo.BypassDocumentValidation,
223		models:                   models,
224		session:                  sess,
225		collection:               coll,
226		selector:                 selector,
227		writeConcern:             wc,
228	}
229
230	err = op.execute(ctx)
231
232	return &op.result, replaceErrors(err)
233}
234
235func (coll *Collection) insert(ctx context.Context, documents []interface{},
236	opts ...*options.InsertManyOptions) ([]interface{}, error) {
237
238	if ctx == nil {
239		ctx = context.Background()
240	}
241
242	result := make([]interface{}, len(documents))
243	docs := make([]bsoncore.Document, len(documents))
244
245	for i, doc := range documents {
246		var err error
247		docs[i], result[i], err = transformAndEnsureIDv2(coll.registry, doc)
248		if err != nil {
249			return nil, err
250		}
251	}
252
253	sess := sessionFromContext(ctx)
254	if sess == nil && coll.client.sessionPool != nil {
255		var err error
256		sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit)
257		if err != nil {
258			return nil, err
259		}
260		defer sess.EndSession()
261	}
262
263	err := coll.client.validSession(sess)
264	if err != nil {
265		return nil, err
266	}
267
268	wc := coll.writeConcern
269	if sess.TransactionRunning() {
270		wc = nil
271	}
272	if !writeconcern.AckWrite(wc) {
273		sess = nil
274	}
275
276	selector := makePinnedSelector(sess, coll.writeSelector)
277
278	op := operation.NewInsert(docs...).
279		Session(sess).WriteConcern(wc).CommandMonitor(coll.client.monitor).
280		ServerSelector(selector).ClusterClock(coll.client.clock).
281		Database(coll.db.name).Collection(coll.name).
282		Deployment(coll.client.deployment).Crypt(coll.client.crypt)
283	imo := options.MergeInsertManyOptions(opts...)
284	if imo.BypassDocumentValidation != nil && *imo.BypassDocumentValidation {
285		op = op.BypassDocumentValidation(*imo.BypassDocumentValidation)
286	}
287	if imo.Ordered != nil {
288		op = op.Ordered(*imo.Ordered)
289	}
290	retry := driver.RetryNone
291	if coll.client.retryWrites {
292		retry = driver.RetryOncePerCommand
293	}
294	op = op.Retry(retry)
295
296	return result, op.Execute(ctx)
297}
298
299// InsertOne executes an insert command to insert a single document into the collection.
300//
301// The document parameter must be the document to be inserted. It cannot be nil. If the document does not have an _id
302// field when transformed into BSON, one will be added automatically to the marshalled document. The original document
303// will not be modified. The _id can be retrieved from the InsertedID field of the returned InsertOneResult.
304//
305// The opts parameter can be used to specify options for the operation (see the options.InsertOneOptions documentation.)
306//
307// For more information about the command, see https://docs.mongodb.com/manual/reference/command/insert/.
308func (coll *Collection) InsertOne(ctx context.Context, document interface{},
309	opts ...*options.InsertOneOptions) (*InsertOneResult, error) {
310
311	imOpts := make([]*options.InsertManyOptions, len(opts))
312	for i, opt := range opts {
313		imo := options.InsertMany()
314		if opt.BypassDocumentValidation != nil && *opt.BypassDocumentValidation {
315			imo = imo.SetBypassDocumentValidation(*opt.BypassDocumentValidation)
316		}
317		imOpts[i] = imo
318	}
319	res, err := coll.insert(ctx, []interface{}{document}, imOpts...)
320
321	rr, err := processWriteError(err)
322	if rr&rrOne == 0 {
323		return nil, err
324	}
325	return &InsertOneResult{InsertedID: res[0]}, err
326}
327
328// InsertMany executes an insert command to insert multiple documents into the collection. If write errors occur
329// during the operation (e.g. duplicate key error), this method returns a BulkWriteException error.
330//
331// The documents parameter must be a slice of documents to insert. The slice cannot be nil or empty. The elements must
332// all be non-nil. For any document that does not have an _id field when transformed into BSON, one will be added
333// automatically to the marshalled document. The original document will not be modified. The _id values for the inserted
334// documents can be retrieved from the InsertedIDs field of the returnd InsertManyResult.
335//
336// The opts parameter can be used to specify options for the operation (see the options.InsertManyOptions documentation.)
337//
338// For more information about the command, see https://docs.mongodb.com/manual/reference/command/insert/.
339func (coll *Collection) InsertMany(ctx context.Context, documents []interface{},
340	opts ...*options.InsertManyOptions) (*InsertManyResult, error) {
341
342	if len(documents) == 0 {
343		return nil, ErrEmptySlice
344	}
345
346	result, err := coll.insert(ctx, documents, opts...)
347	rr, err := processWriteError(err)
348	if rr&rrMany == 0 {
349		return nil, err
350	}
351
352	imResult := &InsertManyResult{InsertedIDs: result}
353	writeException, ok := err.(WriteException)
354	if !ok {
355		return imResult, err
356	}
357
358	// create and return a BulkWriteException
359	bwErrors := make([]BulkWriteError, 0, len(writeException.WriteErrors))
360	for _, we := range writeException.WriteErrors {
361		bwErrors = append(bwErrors, BulkWriteError{
362			WriteError{
363				Index:   we.Index,
364				Code:    we.Code,
365				Message: we.Message,
366			},
367			nil,
368		})
369	}
370	return imResult, BulkWriteException{
371		WriteErrors:       bwErrors,
372		WriteConcernError: writeException.WriteConcernError,
373	}
374}
375
376func (coll *Collection) delete(ctx context.Context, filter interface{}, deleteOne bool, expectedRr returnResult,
377	opts ...*options.DeleteOptions) (*DeleteResult, error) {
378
379	if ctx == nil {
380		ctx = context.Background()
381	}
382
383	f, err := transformBsoncoreDocument(coll.registry, filter)
384	if err != nil {
385		return nil, err
386	}
387
388	sess := sessionFromContext(ctx)
389	if sess == nil && coll.client.sessionPool != nil {
390		sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit)
391		if err != nil {
392			return nil, err
393		}
394		defer sess.EndSession()
395	}
396
397	err = coll.client.validSession(sess)
398	if err != nil {
399		return nil, err
400	}
401
402	wc := coll.writeConcern
403	if sess.TransactionRunning() {
404		wc = nil
405	}
406	if !writeconcern.AckWrite(wc) {
407		sess = nil
408	}
409
410	selector := makePinnedSelector(sess, coll.writeSelector)
411
412	var limit int32
413	if deleteOne {
414		limit = 1
415	}
416	do := options.MergeDeleteOptions(opts...)
417	didx, doc := bsoncore.AppendDocumentStart(nil)
418	doc = bsoncore.AppendDocumentElement(doc, "q", f)
419	doc = bsoncore.AppendInt32Element(doc, "limit", limit)
420	if do.Collation != nil {
421		doc = bsoncore.AppendDocumentElement(doc, "collation", do.Collation.ToDocument())
422	}
423	doc, _ = bsoncore.AppendDocumentEnd(doc, didx)
424
425	op := operation.NewDelete(doc).
426		Session(sess).WriteConcern(wc).CommandMonitor(coll.client.monitor).
427		ServerSelector(selector).ClusterClock(coll.client.clock).
428		Database(coll.db.name).Collection(coll.name).
429		Deployment(coll.client.deployment).Crypt(coll.client.crypt)
430
431	// deleteMany cannot be retried
432	retryMode := driver.RetryNone
433	if deleteOne && coll.client.retryWrites {
434		retryMode = driver.RetryOncePerCommand
435	}
436	op = op.Retry(retryMode)
437	rr, err := processWriteError(op.Execute(ctx))
438	if rr&expectedRr == 0 {
439		return nil, err
440	}
441	return &DeleteResult{DeletedCount: int64(op.Result().N)}, err
442}
443
444// DeleteOne executes a delete command to delete at most one document from the collection.
445//
446// The filter parameter must be a document containing query operators and can be used to select the document to be
447// deleted. It cannot be nil. If the filter does not match any documents, the operation will succeed and a DeleteResult
448// with a DeletedCount of 0 will be returned. If the filter matches multiple documents, one will be selected from the
449// matched set.
450//
451// The opts parameter can be used to specify options for the operation (see the options.DeleteOptions documentation).
452//
453// For more information about the command, see https://docs.mongodb.com/manual/reference/command/delete/.
454func (coll *Collection) DeleteOne(ctx context.Context, filter interface{},
455	opts ...*options.DeleteOptions) (*DeleteResult, error) {
456
457	return coll.delete(ctx, filter, true, rrOne, opts...)
458}
459
460// DeleteMany executes a delete command to delete documents from the collection.
461//
462// The filter parameter must be a document containing query operators and can be used to select the documents to
463// be deleted. It cannot be nil. An empty document (e.g. bson.D{}) should be used to delete all documents in the
464// collection. If the filter does not match any documents, the operation will succeed and a DeleteResult with a
465// DeletedCount of 0 will be returned.
466//
467// The opts parameter can be used to specify options for the operation (see the options.DeleteOptions documentation).
468//
469// For more information about the command, see https://docs.mongodb.com/manual/reference/command/delete/.
470func (coll *Collection) DeleteMany(ctx context.Context, filter interface{},
471	opts ...*options.DeleteOptions) (*DeleteResult, error) {
472
473	return coll.delete(ctx, filter, false, rrMany, opts...)
474}
475
476func (coll *Collection) updateOrReplace(ctx context.Context, filter bsoncore.Document, update interface{}, multi bool,
477	expectedRr returnResult, checkDollarKey bool, opts ...*options.UpdateOptions) (*UpdateResult, error) {
478
479	if ctx == nil {
480		ctx = context.Background()
481	}
482
483	uo := options.MergeUpdateOptions(opts...)
484	uidx, updateDoc := bsoncore.AppendDocumentStart(nil)
485	updateDoc = bsoncore.AppendDocumentElement(updateDoc, "q", filter)
486
487	u, err := transformUpdateValue(coll.registry, update, checkDollarKey)
488	if err != nil {
489		return nil, err
490	}
491	updateDoc = bsoncore.AppendValueElement(updateDoc, "u", u)
492	if multi {
493		updateDoc = bsoncore.AppendBooleanElement(updateDoc, "multi", multi)
494	}
495
496	// collation, arrayFilters, and upsert are included on the individual update documents rather than as part of the
497	// command
498	if uo.Collation != nil {
499		updateDoc = bsoncore.AppendDocumentElement(updateDoc, "collation", bsoncore.Document(uo.Collation.ToDocument()))
500	}
501	if uo.ArrayFilters != nil {
502		arr, err := uo.ArrayFilters.ToArrayDocument()
503		if err != nil {
504			return nil, err
505		}
506		updateDoc = bsoncore.AppendArrayElement(updateDoc, "arrayFilters", arr)
507	}
508	if uo.Upsert != nil {
509		updateDoc = bsoncore.AppendBooleanElement(updateDoc, "upsert", *uo.Upsert)
510	}
511	updateDoc, _ = bsoncore.AppendDocumentEnd(updateDoc, uidx)
512
513	sess := sessionFromContext(ctx)
514	if sess == nil && coll.client.sessionPool != nil {
515		var err error
516		sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit)
517		if err != nil {
518			return nil, err
519		}
520		defer sess.EndSession()
521	}
522
523	err = coll.client.validSession(sess)
524	if err != nil {
525		return nil, err
526	}
527
528	wc := coll.writeConcern
529	if sess.TransactionRunning() {
530		wc = nil
531	}
532	if !writeconcern.AckWrite(wc) {
533		sess = nil
534	}
535
536	selector := makePinnedSelector(sess, coll.writeSelector)
537
538	op := operation.NewUpdate(updateDoc).
539		Session(sess).WriteConcern(wc).CommandMonitor(coll.client.monitor).
540		ServerSelector(selector).ClusterClock(coll.client.clock).
541		Database(coll.db.name).Collection(coll.name).
542		Deployment(coll.client.deployment).Crypt(coll.client.crypt)
543
544	if uo.BypassDocumentValidation != nil && *uo.BypassDocumentValidation {
545		op = op.BypassDocumentValidation(*uo.BypassDocumentValidation)
546	}
547	retry := driver.RetryNone
548	// retryable writes are only enabled updateOne/replaceOne operations
549	if !multi && coll.client.retryWrites {
550		retry = driver.RetryOncePerCommand
551	}
552	op = op.Retry(retry)
553	err = op.Execute(ctx)
554
555	rr, err := processWriteError(err)
556	if rr&expectedRr == 0 {
557		return nil, err
558	}
559
560	opRes := op.Result()
561	res := &UpdateResult{
562		MatchedCount:  int64(opRes.N),
563		ModifiedCount: int64(opRes.NModified),
564		UpsertedCount: int64(len(opRes.Upserted)),
565	}
566	if len(opRes.Upserted) > 0 {
567		res.UpsertedID = opRes.Upserted[0].ID
568		res.MatchedCount--
569	}
570
571	return res, err
572}
573
574// UpdateOne executes an update command to update at most one document in the collection.
575//
576// The filter parameter must be a document containing query operators and can be used to select the document to be
577// updated. It cannot be nil. If the filter does not match any documents, the operation will succeed and an UpdateResult
578// with a MatchedCount of 0 will be returned. If the filter matches multiple documents, one will be selected from the
579// matched set and MatchedCount will equal 1.
580//
581// The update parameter must be a document containing update operators
582// (https://docs.mongodb.com/manual/reference/operator/update/) and can be used to specify the modifications to be
583// made to the selected document. It cannot be nil or empty.
584//
585// The opts parameter can be used to specify options for the operation (see the options.UpdateOptions documentation).
586//
587// For more information about the command, see https://docs.mongodb.com/manual/reference/command/update/.
588func (coll *Collection) UpdateOne(ctx context.Context, filter interface{}, update interface{},
589	opts ...*options.UpdateOptions) (*UpdateResult, error) {
590
591	if ctx == nil {
592		ctx = context.Background()
593	}
594
595	f, err := transformBsoncoreDocument(coll.registry, filter)
596	if err != nil {
597		return nil, err
598	}
599
600	return coll.updateOrReplace(ctx, f, update, false, rrOne, true, opts...)
601}
602
603// UpdateMany executes an update command to update documents in the collection.
604//
605// The filter parameter must be a document containing query operators and can be used to select the documents to be
606// updated. It cannot be nil. If the filter does not match any documents, the operation will succeed and an UpdateResult
607// with a MatchedCount of 0 will be returned.
608//
609// The update parameter must be a document containing update operators
610// (https://docs.mongodb.com/manual/reference/operator/update/) and can be used to specify the modifications to be made
611// to the selected documents. It cannot be nil or empty.
612//
613// The opts parameter can be used to specify options for the operation (see the options.UpdateOptions documentation).
614//
615// For more information about the command, see https://docs.mongodb.com/manual/reference/command/update/.
616func (coll *Collection) UpdateMany(ctx context.Context, filter interface{}, update interface{},
617	opts ...*options.UpdateOptions) (*UpdateResult, error) {
618
619	if ctx == nil {
620		ctx = context.Background()
621	}
622
623	f, err := transformBsoncoreDocument(coll.registry, filter)
624	if err != nil {
625		return nil, err
626	}
627
628	return coll.updateOrReplace(ctx, f, update, true, rrMany, true, opts...)
629}
630
631// ReplaceOne executes an update command to replace at most one document in the collection.
632//
633// The filter parameter must be a document containing query operators and can be used to select the document to be
634// replaced. It cannot be nil. If the filter does not match any documents, the operation will succeed and an
635// UpdateResult with a MatchedCount of 0 will be returned. If the filter matches multiple documents, one will be
636// selected from the matched set and MatchedCount will equal 1.
637//
638// The replacement parameter must be a document that will be used to replace the selected document. It cannot be nil
639// and cannot contain any update operators (https://docs.mongodb.com/manual/reference/operator/update/).
640//
641// The opts parameter can be used to specify options for the operation (see the options.ReplaceOptions documentation).
642//
643// For more information about the command, see https://docs.mongodb.com/manual/reference/command/update/.
644func (coll *Collection) ReplaceOne(ctx context.Context, filter interface{},
645	replacement interface{}, opts ...*options.ReplaceOptions) (*UpdateResult, error) {
646
647	if ctx == nil {
648		ctx = context.Background()
649	}
650
651	f, err := transformBsoncoreDocument(coll.registry, filter)
652	if err != nil {
653		return nil, err
654	}
655
656	r, err := transformBsoncoreDocument(coll.registry, replacement)
657	if err != nil {
658		return nil, err
659	}
660
661	if elem, err := r.IndexErr(0); err == nil && strings.HasPrefix(elem.Key(), "$") {
662		return nil, errors.New("replacement document cannot contains keys beginning with '$")
663	}
664
665	updateOptions := make([]*options.UpdateOptions, 0, len(opts))
666	for _, opt := range opts {
667		uOpts := options.Update()
668		uOpts.BypassDocumentValidation = opt.BypassDocumentValidation
669		uOpts.Collation = opt.Collation
670		uOpts.Upsert = opt.Upsert
671		updateOptions = append(updateOptions, uOpts)
672	}
673
674	return coll.updateOrReplace(ctx, f, r, false, rrOne, false, updateOptions...)
675}
676
677// Aggregate executes an aggregate command against the collection and returns a cursor over the resulting documents.
678//
679// The pipeline parameter must be an array of documents, each representing an aggregation stage. The pipeline cannot
680// be nil but can be empty. The stage documents must all be non-nil. For a pipeline of bson.D documents, the
681// mongo.Pipeline type can be used. See
682// https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/#db-collection-aggregate-stages for a list of
683// valid stages in aggregations.
684//
685// The opts parameter can be used to specify options for the operation (see the options.AggregateOptions documentation.)
686//
687// For more information about the command, see https://docs.mongodb.com/manual/reference/command/aggregate/.
688func (coll *Collection) Aggregate(ctx context.Context, pipeline interface{},
689	opts ...*options.AggregateOptions) (*Cursor, error) {
690	a := aggregateParams{
691		ctx:            ctx,
692		pipeline:       pipeline,
693		client:         coll.client,
694		registry:       coll.registry,
695		readConcern:    coll.readConcern,
696		writeConcern:   coll.writeConcern,
697		retryRead:      coll.client.retryReads,
698		db:             coll.db.name,
699		col:            coll.name,
700		readSelector:   coll.readSelector,
701		writeSelector:  coll.writeSelector,
702		readPreference: coll.readPreference,
703		opts:           opts,
704	}
705	return aggregate(a)
706}
707
708// aggreate is the helper method for Aggregate
709func aggregate(a aggregateParams) (*Cursor, error) {
710
711	if a.ctx == nil {
712		a.ctx = context.Background()
713	}
714
715	pipelineArr, hasOutputStage, err := transformAggregatePipelinev2(a.registry, a.pipeline)
716	if err != nil {
717		return nil, err
718	}
719
720	sess := sessionFromContext(a.ctx)
721	if sess == nil && a.client.sessionPool != nil {
722		sess, err = session.NewClientSession(a.client.sessionPool, a.client.id, session.Implicit)
723		if err != nil {
724			return nil, err
725		}
726	}
727	if err = a.client.validSession(sess); err != nil {
728		return nil, err
729	}
730
731	var wc *writeconcern.WriteConcern
732	if hasOutputStage {
733		wc = a.writeConcern
734	}
735	rc := a.readConcern
736	if sess.TransactionRunning() {
737		wc = nil
738		rc = nil
739	}
740	if !writeconcern.AckWrite(wc) {
741		closeImplicitSession(sess)
742		sess = nil
743	}
744
745	selector := makePinnedSelector(sess, a.writeSelector)
746	if !hasOutputStage {
747		selector = makeReadPrefSelector(sess, a.readSelector, a.client.localThreshold)
748	}
749
750	ao := options.MergeAggregateOptions(a.opts...)
751	cursorOpts := driver.CursorOptions{
752		CommandMonitor: a.client.monitor,
753		Crypt:          a.client.crypt,
754	}
755
756	op := operation.NewAggregate(pipelineArr).Session(sess).WriteConcern(wc).ReadConcern(rc).ReadPreference(a.readPreference).CommandMonitor(a.client.monitor).
757		ServerSelector(selector).ClusterClock(a.client.clock).Database(a.db).Collection(a.col).Deployment(a.client.deployment).Crypt(a.client.crypt)
758	if ao.AllowDiskUse != nil {
759		op.AllowDiskUse(*ao.AllowDiskUse)
760	}
761	// ignore batchSize of 0 with $out
762	if ao.BatchSize != nil && !(*ao.BatchSize == 0 && hasOutputStage) {
763		op.BatchSize(*ao.BatchSize)
764		cursorOpts.BatchSize = *ao.BatchSize
765	}
766	if ao.BypassDocumentValidation != nil && *ao.BypassDocumentValidation {
767		op.BypassDocumentValidation(*ao.BypassDocumentValidation)
768	}
769	if ao.Collation != nil {
770		op.Collation(bsoncore.Document(ao.Collation.ToDocument()))
771	}
772	if ao.MaxTime != nil {
773		op.MaxTimeMS(int64(*ao.MaxTime / time.Millisecond))
774	}
775	if ao.MaxAwaitTime != nil {
776		cursorOpts.MaxTimeMS = int64(*ao.MaxAwaitTime / time.Millisecond)
777	}
778	if ao.Comment != nil {
779		op.Comment(*ao.Comment)
780	}
781	if ao.Hint != nil {
782		hintVal, err := transformValue(a.registry, ao.Hint)
783		if err != nil {
784			closeImplicitSession(sess)
785			return nil, err
786		}
787		op.Hint(hintVal)
788	}
789
790	retry := driver.RetryNone
791	if a.retryRead && !hasOutputStage {
792		retry = driver.RetryOncePerCommand
793	}
794	op = op.Retry(retry)
795
796	err = op.Execute(a.ctx)
797	if err != nil {
798		closeImplicitSession(sess)
799		if wce, ok := err.(driver.WriteCommandError); ok && wce.WriteConcernError != nil {
800			return nil, *convertDriverWriteConcernError(wce.WriteConcernError)
801		}
802		return nil, replaceErrors(err)
803	}
804
805	bc, err := op.Result(cursorOpts)
806	if err != nil {
807		closeImplicitSession(sess)
808		return nil, replaceErrors(err)
809	}
810	cursor, err := newCursorWithSession(bc, a.registry, sess)
811	return cursor, replaceErrors(err)
812}
813
814// CountDocuments returns the number of documents in the collection. For a fast count of the documents in the
815// collection, see the EstimatedDocumentCount method.
816//
817// The filter parameter must be a document and can be used to select which documents contribute to the count. It
818// cannot be nil. An empty document (e.g. bson.D{}) should be used to count all documents in the collection. This will
819// result in a full collection scan.
820//
821// The opts parameter can be used to specify options for the operation (see the options.CountOptions documentation).
822func (coll *Collection) CountDocuments(ctx context.Context, filter interface{},
823	opts ...*options.CountOptions) (int64, error) {
824
825	if ctx == nil {
826		ctx = context.Background()
827	}
828
829	countOpts := options.MergeCountOptions(opts...)
830
831	pipelineArr, err := countDocumentsAggregatePipeline(coll.registry, filter, countOpts)
832	if err != nil {
833		return 0, err
834	}
835
836	sess := sessionFromContext(ctx)
837	if sess == nil && coll.client.sessionPool != nil {
838		sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit)
839		if err != nil {
840			return 0, err
841		}
842		defer sess.EndSession()
843	}
844	if err = coll.client.validSession(sess); err != nil {
845		return 0, err
846	}
847
848	rc := coll.readConcern
849	if sess.TransactionRunning() {
850		rc = nil
851	}
852
853	selector := makeReadPrefSelector(sess, coll.readSelector, coll.client.localThreshold)
854	op := operation.NewAggregate(pipelineArr).Session(sess).ReadConcern(rc).ReadPreference(coll.readPreference).
855		CommandMonitor(coll.client.monitor).ServerSelector(selector).ClusterClock(coll.client.clock).Database(coll.db.name).
856		Collection(coll.name).Deployment(coll.client.deployment).Crypt(coll.client.crypt)
857	if countOpts.Collation != nil {
858		op.Collation(bsoncore.Document(countOpts.Collation.ToDocument()))
859	}
860	if countOpts.MaxTime != nil {
861		op.MaxTimeMS(int64(*countOpts.MaxTime / time.Millisecond))
862	}
863	if countOpts.Hint != nil {
864		hintVal, err := transformValue(coll.registry, countOpts.Hint)
865		if err != nil {
866			return 0, err
867		}
868		op.Hint(hintVal)
869	}
870	retry := driver.RetryNone
871	if coll.client.retryReads {
872		retry = driver.RetryOncePerCommand
873	}
874	op = op.Retry(retry)
875
876	err = op.Execute(ctx)
877	if err != nil {
878		return 0, replaceErrors(err)
879	}
880
881	batch := op.ResultCursorResponse().FirstBatch
882	if batch == nil {
883		return 0, errors.New("invalid response from server, no 'firstBatch' field")
884	}
885
886	docs, err := batch.Documents()
887	if err != nil || len(docs) == 0 {
888		return 0, nil
889	}
890
891	val, ok := docs[0].Lookup("n").AsInt64OK()
892	if !ok {
893		return 0, errors.New("invalid response from server, no 'n' field")
894	}
895
896	return val, nil
897}
898
899// EstimatedDocumentCount executes a count command and returns an estimate of the number of documents in the collection
900// using collection metadata.
901//
902// The opts parameter can be used to specify options for the operation (see the options.EstimatedDocumentCountOptions
903// documentation).
904//
905// For more information about the command, see https://docs.mongodb.com/manual/reference/command/count/.
906func (coll *Collection) EstimatedDocumentCount(ctx context.Context,
907	opts ...*options.EstimatedDocumentCountOptions) (int64, error) {
908
909	if ctx == nil {
910		ctx = context.Background()
911	}
912
913	sess := sessionFromContext(ctx)
914
915	var err error
916	if sess == nil && coll.client.sessionPool != nil {
917		sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit)
918		if err != nil {
919			return 0, err
920		}
921		defer sess.EndSession()
922	}
923
924	err = coll.client.validSession(sess)
925	if err != nil {
926		return 0, err
927	}
928
929	rc := coll.readConcern
930	if sess.TransactionRunning() {
931		rc = nil
932	}
933
934	selector := makeReadPrefSelector(sess, coll.readSelector, coll.client.localThreshold)
935	op := operation.NewCount().Session(sess).ClusterClock(coll.client.clock).
936		Database(coll.db.name).Collection(coll.name).CommandMonitor(coll.client.monitor).
937		Deployment(coll.client.deployment).ReadConcern(rc).ReadPreference(coll.readPreference).
938		ServerSelector(selector).Crypt(coll.client.crypt)
939
940	co := options.MergeEstimatedDocumentCountOptions(opts...)
941	if co.MaxTime != nil {
942		op = op.MaxTimeMS(int64(*co.MaxTime / time.Millisecond))
943	}
944	retry := driver.RetryNone
945	if coll.client.retryReads {
946		retry = driver.RetryOncePerCommand
947	}
948	op.Retry(retry)
949
950	err = op.Execute(ctx)
951
952	return op.Result().N, replaceErrors(err)
953}
954
955// Distinct executes a distinct command to find the unique values for a specified field in the collection.
956//
957// The fieldName parameter specifies the field name for which distinct values should be returned.
958//
959// The filter parameter must be a document containing query operators and can be used to select which documents are
960// considered. It cannot be nil. An empty document (e.g. bson.D{}) should be used to select all documents.
961//
962// The opts parameter can be used to specify options for the operation (see the options.DistinctOptions documentation).
963//
964// For more information about the command, see https://docs.mongodb.com/manual/reference/command/distinct/.
965func (coll *Collection) Distinct(ctx context.Context, fieldName string, filter interface{},
966	opts ...*options.DistinctOptions) ([]interface{}, error) {
967
968	if ctx == nil {
969		ctx = context.Background()
970	}
971
972	f, err := transformBsoncoreDocument(coll.registry, filter)
973	if err != nil {
974		return nil, err
975	}
976
977	sess := sessionFromContext(ctx)
978
979	if sess == nil && coll.client.sessionPool != nil {
980		sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit)
981		if err != nil {
982			return nil, err
983		}
984		defer sess.EndSession()
985	}
986
987	err = coll.client.validSession(sess)
988	if err != nil {
989		return nil, err
990	}
991
992	rc := coll.readConcern
993	if sess.TransactionRunning() {
994		rc = nil
995	}
996
997	selector := makeReadPrefSelector(sess, coll.readSelector, coll.client.localThreshold)
998	option := options.MergeDistinctOptions(opts...)
999
1000	op := operation.NewDistinct(fieldName, bsoncore.Document(f)).
1001		Session(sess).ClusterClock(coll.client.clock).
1002		Database(coll.db.name).Collection(coll.name).CommandMonitor(coll.client.monitor).
1003		Deployment(coll.client.deployment).ReadConcern(rc).ReadPreference(coll.readPreference).
1004		ServerSelector(selector).Crypt(coll.client.crypt)
1005
1006	if option.Collation != nil {
1007		op.Collation(bsoncore.Document(option.Collation.ToDocument()))
1008	}
1009	if option.MaxTime != nil {
1010		op.MaxTimeMS(int64(*option.MaxTime / time.Millisecond))
1011	}
1012	retry := driver.RetryNone
1013	if coll.client.retryReads {
1014		retry = driver.RetryOncePerCommand
1015	}
1016	op = op.Retry(retry)
1017
1018	err = op.Execute(ctx)
1019	if err != nil {
1020		return nil, replaceErrors(err)
1021	}
1022
1023	arr, ok := op.Result().Values.ArrayOK()
1024	if !ok {
1025		return nil, fmt.Errorf("response field 'values' is type array, but received BSON type %s", op.Result().Values.Type)
1026	}
1027
1028	values, err := arr.Values()
1029	if err != nil {
1030		return nil, err
1031	}
1032
1033	retArray := make([]interface{}, len(values))
1034
1035	for i, val := range values {
1036		raw := bson.RawValue{Type: val.Type, Value: val.Data}
1037		err = raw.Unmarshal(&retArray[i])
1038		if err != nil {
1039			return nil, err
1040		}
1041	}
1042
1043	return retArray, replaceErrors(err)
1044}
1045
1046// Find executes a find command and returns a Cursor over the matching documents in the collection.
1047//
1048// The filter parameter must be a document containing query operators and can be used to select which documents are
1049// included in the result. It cannot be nil. An empty document (e.g. bson.D{}) should be used to include all documents.
1050//
1051// The opts parameter can be used to specify options for the operation (see the options.FindOptions documentation).
1052//
1053// For more information about the command, see https://docs.mongodb.com/manual/reference/command/find/.
1054func (coll *Collection) Find(ctx context.Context, filter interface{},
1055	opts ...*options.FindOptions) (*Cursor, error) {
1056
1057	if ctx == nil {
1058		ctx = context.Background()
1059	}
1060
1061	f, err := transformBsoncoreDocument(coll.registry, filter)
1062	if err != nil {
1063		return nil, err
1064	}
1065
1066	sess := sessionFromContext(ctx)
1067	if sess == nil && coll.client.sessionPool != nil {
1068		var err error
1069		sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit)
1070		if err != nil {
1071			return nil, err
1072		}
1073	}
1074
1075	err = coll.client.validSession(sess)
1076	if err != nil {
1077		closeImplicitSession(sess)
1078		return nil, err
1079	}
1080
1081	rc := coll.readConcern
1082	if sess.TransactionRunning() {
1083		rc = nil
1084	}
1085
1086	selector := makeReadPrefSelector(sess, coll.readSelector, coll.client.localThreshold)
1087	op := operation.NewFind(f).
1088		Session(sess).ReadConcern(rc).ReadPreference(coll.readPreference).
1089		CommandMonitor(coll.client.monitor).ServerSelector(selector).
1090		ClusterClock(coll.client.clock).Database(coll.db.name).Collection(coll.name).
1091		Deployment(coll.client.deployment).Crypt(coll.client.crypt)
1092
1093	fo := options.MergeFindOptions(opts...)
1094	cursorOpts := driver.CursorOptions{
1095		CommandMonitor: coll.client.monitor,
1096		Crypt:          coll.client.crypt,
1097	}
1098
1099	if fo.AllowPartialResults != nil {
1100		op.AllowPartialResults(*fo.AllowPartialResults)
1101	}
1102	if fo.BatchSize != nil {
1103		cursorOpts.BatchSize = *fo.BatchSize
1104		op.BatchSize(*fo.BatchSize)
1105	}
1106	if fo.Collation != nil {
1107		op.Collation(bsoncore.Document(fo.Collation.ToDocument()))
1108	}
1109	if fo.Comment != nil {
1110		op.Comment(*fo.Comment)
1111	}
1112	if fo.CursorType != nil {
1113		switch *fo.CursorType {
1114		case options.Tailable:
1115			op.Tailable(true)
1116		case options.TailableAwait:
1117			op.Tailable(true)
1118			op.AwaitData(true)
1119		}
1120	}
1121	if fo.Hint != nil {
1122		hint, err := transformValue(coll.registry, fo.Hint)
1123		if err != nil {
1124			closeImplicitSession(sess)
1125			return nil, err
1126		}
1127		op.Hint(hint)
1128	}
1129	if fo.Limit != nil {
1130		limit := *fo.Limit
1131		if limit < 0 {
1132			limit = -1 * limit
1133			op.SingleBatch(true)
1134		}
1135		cursorOpts.Limit = int32(limit)
1136		op.Limit(limit)
1137	}
1138	if fo.Max != nil {
1139		max, err := transformBsoncoreDocument(coll.registry, fo.Max)
1140		if err != nil {
1141			closeImplicitSession(sess)
1142			return nil, err
1143		}
1144		op.Max(max)
1145	}
1146	if fo.MaxAwaitTime != nil {
1147		cursorOpts.MaxTimeMS = int64(*fo.MaxAwaitTime / time.Millisecond)
1148	}
1149	if fo.MaxTime != nil {
1150		op.MaxTimeMS(int64(*fo.MaxTime / time.Millisecond))
1151	}
1152	if fo.Min != nil {
1153		min, err := transformBsoncoreDocument(coll.registry, fo.Min)
1154		if err != nil {
1155			closeImplicitSession(sess)
1156			return nil, err
1157		}
1158		op.Min(min)
1159	}
1160	if fo.NoCursorTimeout != nil {
1161		op.NoCursorTimeout(*fo.NoCursorTimeout)
1162	}
1163	if fo.OplogReplay != nil {
1164		op.OplogReplay(*fo.OplogReplay)
1165	}
1166	if fo.Projection != nil {
1167		proj, err := transformBsoncoreDocument(coll.registry, fo.Projection)
1168		if err != nil {
1169			closeImplicitSession(sess)
1170			return nil, err
1171		}
1172		op.Projection(proj)
1173	}
1174	if fo.ReturnKey != nil {
1175		op.ReturnKey(*fo.ReturnKey)
1176	}
1177	if fo.ShowRecordID != nil {
1178		op.ShowRecordID(*fo.ShowRecordID)
1179	}
1180	if fo.Skip != nil {
1181		op.Skip(*fo.Skip)
1182	}
1183	if fo.Snapshot != nil {
1184		op.Snapshot(*fo.Snapshot)
1185	}
1186	if fo.Sort != nil {
1187		sort, err := transformBsoncoreDocument(coll.registry, fo.Sort)
1188		if err != nil {
1189			closeImplicitSession(sess)
1190			return nil, err
1191		}
1192		op.Sort(sort)
1193	}
1194	retry := driver.RetryNone
1195	if coll.client.retryReads {
1196		retry = driver.RetryOncePerCommand
1197	}
1198	op = op.Retry(retry)
1199
1200	if err = op.Execute(ctx); err != nil {
1201		closeImplicitSession(sess)
1202		return nil, replaceErrors(err)
1203	}
1204
1205	bc, err := op.Result(cursorOpts)
1206	if err != nil {
1207		closeImplicitSession(sess)
1208		return nil, replaceErrors(err)
1209	}
1210	return newCursorWithSession(bc, coll.registry, sess)
1211}
1212
1213// FindOne executes a find command and returns a SingleResult for one document in the collection.
1214//
1215// The filter parameter must be a document containing query operators and can be used to select the document to be
1216// returned. It cannot be nil. If the filter does not match any documents, a SingleResult with an error set to
1217// ErrNoDocuments will be returned. If the filter matches multiple documents, one will be selected from the matched set.
1218//
1219// The opts parameter can be used to specify options for this operation (see the options.FindOneOptions documentation).
1220//
1221// For more information about the command, see https://docs.mongodb.com/manual/reference/command/find/.
1222func (coll *Collection) FindOne(ctx context.Context, filter interface{},
1223	opts ...*options.FindOneOptions) *SingleResult {
1224
1225	if ctx == nil {
1226		ctx = context.Background()
1227	}
1228
1229	findOpts := make([]*options.FindOptions, len(opts))
1230	for i, opt := range opts {
1231		findOpts[i] = &options.FindOptions{
1232			AllowPartialResults: opt.AllowPartialResults,
1233			BatchSize:           opt.BatchSize,
1234			Collation:           opt.Collation,
1235			Comment:             opt.Comment,
1236			CursorType:          opt.CursorType,
1237			Hint:                opt.Hint,
1238			Max:                 opt.Max,
1239			MaxAwaitTime:        opt.MaxAwaitTime,
1240			Min:                 opt.Min,
1241			NoCursorTimeout:     opt.NoCursorTimeout,
1242			OplogReplay:         opt.OplogReplay,
1243			Projection:          opt.Projection,
1244			ReturnKey:           opt.ReturnKey,
1245			ShowRecordID:        opt.ShowRecordID,
1246			Skip:                opt.Skip,
1247			Snapshot:            opt.Snapshot,
1248			Sort:                opt.Sort,
1249		}
1250	}
1251	// Unconditionally send a limit to make sure only one document is returned and the cursor is not kept open
1252	// by the server.
1253	findOpts = append(findOpts, options.Find().SetLimit(-1))
1254
1255	cursor, err := coll.Find(ctx, filter, findOpts...)
1256	return &SingleResult{cur: cursor, reg: coll.registry, err: replaceErrors(err)}
1257}
1258
1259func (coll *Collection) findAndModify(ctx context.Context, op *operation.FindAndModify) *SingleResult {
1260	if ctx == nil {
1261		ctx = context.Background()
1262	}
1263
1264	sess := sessionFromContext(ctx)
1265	var err error
1266	if sess == nil && coll.client.sessionPool != nil {
1267		sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit)
1268		if err != nil {
1269			return &SingleResult{err: err}
1270		}
1271		defer sess.EndSession()
1272	}
1273
1274	err = coll.client.validSession(sess)
1275	if err != nil {
1276		return &SingleResult{err: err}
1277	}
1278
1279	wc := coll.writeConcern
1280	if sess.TransactionRunning() {
1281		wc = nil
1282	}
1283	if !writeconcern.AckWrite(wc) {
1284		sess = nil
1285	}
1286
1287	selector := makePinnedSelector(sess, coll.writeSelector)
1288
1289	retry := driver.RetryNone
1290	if coll.client.retryWrites {
1291		retry = driver.RetryOnce
1292	}
1293
1294	op = op.Session(sess).
1295		WriteConcern(wc).
1296		CommandMonitor(coll.client.monitor).
1297		ServerSelector(selector).
1298		ClusterClock(coll.client.clock).
1299		Database(coll.db.name).
1300		Collection(coll.name).
1301		Deployment(coll.client.deployment).
1302		Retry(retry).
1303		Crypt(coll.client.crypt)
1304
1305	_, err = processWriteError(op.Execute(ctx))
1306	if err != nil {
1307		return &SingleResult{err: err}
1308	}
1309
1310	return &SingleResult{rdr: bson.Raw(op.Result().Value), reg: coll.registry}
1311}
1312
1313// FindOneAndDelete executes a findAndModify command to delete at most one document in the collection. and returns the
1314// document as it appeared before deletion.
1315//
1316// The filter parameter must be a document containing query operators and can be used to select the document to be
1317// deleted. It cannot be nil. If the filter does not match any documents, a SingleResult with an error set to
1318// ErrNoDocuments wil be returned. If the filter matches multiple documents, one will be selected from the matched set.
1319//
1320// The opts parameter can be used to specify options for the operation (see the options.FindOneAndDeleteOptions
1321// documentation).
1322//
1323// For more information about the command, see https://docs.mongodb.com/manual/reference/command/findAndModify/.
1324func (coll *Collection) FindOneAndDelete(ctx context.Context, filter interface{},
1325	opts ...*options.FindOneAndDeleteOptions) *SingleResult {
1326
1327	f, err := transformBsoncoreDocument(coll.registry, filter)
1328	if err != nil {
1329		return &SingleResult{err: err}
1330	}
1331	fod := options.MergeFindOneAndDeleteOptions(opts...)
1332	op := operation.NewFindAndModify(f).Remove(true)
1333	if fod.Collation != nil {
1334		op = op.Collation(bsoncore.Document(fod.Collation.ToDocument()))
1335	}
1336	if fod.MaxTime != nil {
1337		op = op.MaxTimeMS(int64(*fod.MaxTime / time.Millisecond))
1338	}
1339	if fod.Projection != nil {
1340		proj, err := transformBsoncoreDocument(coll.registry, fod.Projection)
1341		if err != nil {
1342			return &SingleResult{err: err}
1343		}
1344		op = op.Fields(proj)
1345	}
1346	if fod.Sort != nil {
1347		sort, err := transformBsoncoreDocument(coll.registry, fod.Sort)
1348		if err != nil {
1349			return &SingleResult{err: err}
1350		}
1351		op = op.Sort(sort)
1352	}
1353
1354	return coll.findAndModify(ctx, op)
1355}
1356
1357// FindOneAndReplace executes a findAndModify command to replace at most one document in the collection
1358// and returns the document as it appeared before replacement.
1359//
1360// The filter parameter must be a document containing query operators and can be used to select the document to be
1361// replaced. It cannot be nil. If the filter does not match any documents, a SingleResult with an error set to
1362// ErrNoDocuments wil be returned. If the filter matches multiple documents, one will be selected from the matched set.
1363//
1364// The replacement parameter must be a document that will be used to replace the selected document. It cannot be nil
1365// and cannot contain any update operators (https://docs.mongodb.com/manual/reference/operator/update/).
1366//
1367// The opts parameter can be used to specify options for the operation (see the options.FindOneAndReplaceOptions
1368// documentation).
1369//
1370// For more information about the command, see https://docs.mongodb.com/manual/reference/command/findAndModify/.
1371func (coll *Collection) FindOneAndReplace(ctx context.Context, filter interface{},
1372	replacement interface{}, opts ...*options.FindOneAndReplaceOptions) *SingleResult {
1373
1374	f, err := transformBsoncoreDocument(coll.registry, filter)
1375	if err != nil {
1376		return &SingleResult{err: err}
1377	}
1378	r, err := transformBsoncoreDocument(coll.registry, replacement)
1379	if err != nil {
1380		return &SingleResult{err: err}
1381	}
1382	if firstElem, err := r.IndexErr(0); err == nil && strings.HasPrefix(firstElem.Key(), "$") {
1383		return &SingleResult{err: errors.New("replacement document cannot contain keys beginning with '$'")}
1384	}
1385
1386	fo := options.MergeFindOneAndReplaceOptions(opts...)
1387	op := operation.NewFindAndModify(f).Update(bsoncore.Value{Type: bsontype.EmbeddedDocument, Data: r})
1388	if fo.BypassDocumentValidation != nil && *fo.BypassDocumentValidation {
1389		op = op.BypassDocumentValidation(*fo.BypassDocumentValidation)
1390	}
1391	if fo.Collation != nil {
1392		op = op.Collation(bsoncore.Document(fo.Collation.ToDocument()))
1393	}
1394	if fo.MaxTime != nil {
1395		op = op.MaxTimeMS(int64(*fo.MaxTime / time.Millisecond))
1396	}
1397	if fo.Projection != nil {
1398		proj, err := transformBsoncoreDocument(coll.registry, fo.Projection)
1399		if err != nil {
1400			return &SingleResult{err: err}
1401		}
1402		op = op.Fields(proj)
1403	}
1404	if fo.ReturnDocument != nil {
1405		op = op.NewDocument(*fo.ReturnDocument == options.After)
1406	}
1407	if fo.Sort != nil {
1408		sort, err := transformBsoncoreDocument(coll.registry, fo.Sort)
1409		if err != nil {
1410			return &SingleResult{err: err}
1411		}
1412		op = op.Sort(sort)
1413	}
1414	if fo.Upsert != nil {
1415		op = op.Upsert(*fo.Upsert)
1416	}
1417
1418	return coll.findAndModify(ctx, op)
1419}
1420
1421// FindOneAndUpdate executes a findAndModify command to update at most one document in the collection and returns the
1422// document as it appeared before updating.
1423//
1424// The filter parameter must be a document containing query operators and can be used to select the document to be
1425// updated. It cannot be nil. If the filter does not match any documents, a SingleResult with an error set to
1426// ErrNoDocuments wil be returned. If the filter matches multiple documents, one will be selected from the matched set.
1427//
1428// The update parameter must be a document containing update operators
1429// (https://docs.mongodb.com/manual/reference/operator/update/) and can be used to specify the modifications to be made
1430// to the selected document. It cannot be nil or empty.
1431//
1432// The opts parameter can be used to specify options for the operation (see the options.FindOneAndUpdateOptions
1433// documentation).
1434//
1435// For more information about the command, see https://docs.mongodb.com/manual/reference/command/findAndModify/.
1436func (coll *Collection) FindOneAndUpdate(ctx context.Context, filter interface{},
1437	update interface{}, opts ...*options.FindOneAndUpdateOptions) *SingleResult {
1438
1439	if ctx == nil {
1440		ctx = context.Background()
1441	}
1442
1443	f, err := transformBsoncoreDocument(coll.registry, filter)
1444	if err != nil {
1445		return &SingleResult{err: err}
1446	}
1447
1448	fo := options.MergeFindOneAndUpdateOptions(opts...)
1449	op := operation.NewFindAndModify(f)
1450
1451	u, err := transformUpdateValue(coll.registry, update, true)
1452	if err != nil {
1453		return &SingleResult{err: err}
1454	}
1455	op = op.Update(u)
1456
1457	if fo.ArrayFilters != nil {
1458		filtersDoc, err := fo.ArrayFilters.ToArrayDocument()
1459		if err != nil {
1460			return &SingleResult{err: err}
1461		}
1462		op = op.ArrayFilters(bsoncore.Document(filtersDoc))
1463	}
1464	if fo.BypassDocumentValidation != nil && *fo.BypassDocumentValidation {
1465		op = op.BypassDocumentValidation(*fo.BypassDocumentValidation)
1466	}
1467	if fo.Collation != nil {
1468		op = op.Collation(bsoncore.Document(fo.Collation.ToDocument()))
1469	}
1470	if fo.MaxTime != nil {
1471		op = op.MaxTimeMS(int64(*fo.MaxTime / time.Millisecond))
1472	}
1473	if fo.Projection != nil {
1474		proj, err := transformBsoncoreDocument(coll.registry, fo.Projection)
1475		if err != nil {
1476			return &SingleResult{err: err}
1477		}
1478		op = op.Fields(proj)
1479	}
1480	if fo.ReturnDocument != nil {
1481		op = op.NewDocument(*fo.ReturnDocument == options.After)
1482	}
1483	if fo.Sort != nil {
1484		sort, err := transformBsoncoreDocument(coll.registry, fo.Sort)
1485		if err != nil {
1486			return &SingleResult{err: err}
1487		}
1488		op = op.Sort(sort)
1489	}
1490	if fo.Upsert != nil {
1491		op = op.Upsert(*fo.Upsert)
1492	}
1493
1494	return coll.findAndModify(ctx, op)
1495}
1496
1497// Watch returns a change stream for all changes on the corresponding collection. See
1498// https://docs.mongodb.com/manual/changeStreams/ for more information about change streams.
1499//
1500// The Collection must be configured with read concern majority or no read concern for a change stream to be created
1501// successfully.
1502//
1503// The pipeline parameter must be an array of documents, each representing a pipeline stage. The pipeline cannot be
1504// nil but can be empty. The stage documents must all be non-nil. See https://docs.mongodb.com/manual/changeStreams/ for
1505// a list of pipeline stages that can be used with change streams. For a pipeline of bson.D documents, the
1506// mongo.Pipeline{} type can be used.
1507//
1508// The opts parameter can be used to specify options for change stream creation (see the options.ChangeStreamOptions
1509// documentation).
1510func (coll *Collection) Watch(ctx context.Context, pipeline interface{},
1511	opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {
1512
1513	csConfig := changeStreamConfig{
1514		readConcern:    coll.readConcern,
1515		readPreference: coll.readPreference,
1516		client:         coll.client,
1517		registry:       coll.registry,
1518		streamType:     CollectionStream,
1519		collectionName: coll.Name(),
1520		databaseName:   coll.db.Name(),
1521	}
1522	return newChangeStream(ctx, csConfig, pipeline, opts...)
1523}
1524
1525// Indexes returns an IndexView instance that can be used to perform operations on the indexes for the collection.
1526func (coll *Collection) Indexes() IndexView {
1527	return IndexView{coll: coll}
1528}
1529
1530// Drop drops the collection on the server. This method ignores "namespace not found" errors so it is safe to drop
1531// a collection that does not exist on the server.
1532func (coll *Collection) Drop(ctx context.Context) error {
1533	if ctx == nil {
1534		ctx = context.Background()
1535	}
1536
1537	sess := sessionFromContext(ctx)
1538	if sess == nil && coll.client.sessionPool != nil {
1539		var err error
1540		sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit)
1541		if err != nil {
1542			return err
1543		}
1544		defer sess.EndSession()
1545	}
1546
1547	err := coll.client.validSession(sess)
1548	if err != nil {
1549		return err
1550	}
1551
1552	wc := coll.writeConcern
1553	if sess.TransactionRunning() {
1554		wc = nil
1555	}
1556	if !writeconcern.AckWrite(wc) {
1557		sess = nil
1558	}
1559
1560	selector := makePinnedSelector(sess, coll.writeSelector)
1561
1562	op := operation.NewDropCollection().
1563		Session(sess).WriteConcern(wc).CommandMonitor(coll.client.monitor).
1564		ServerSelector(selector).ClusterClock(coll.client.clock).
1565		Database(coll.db.name).Collection(coll.name).
1566		Deployment(coll.client.deployment).Crypt(coll.client.crypt)
1567	err = op.Execute(ctx)
1568
1569	// ignore namespace not found erorrs
1570	driverErr, ok := err.(driver.Error)
1571	if !ok || (ok && !driverErr.NamespaceNotFound()) {
1572		return replaceErrors(err)
1573	}
1574	return nil
1575}
1576
1577// makePinnedSelector makes a selector for a pinned session with a pinned server. Will attempt to do server selection on
1578// the pinned server but if that fails it will go through a list of default selectors
1579func makePinnedSelector(sess *session.Client, defaultSelector description.ServerSelector) description.ServerSelectorFunc {
1580	return func(t description.Topology, svrs []description.Server) ([]description.Server, error) {
1581		if sess != nil && sess.PinnedServer != nil {
1582			return sess.PinnedServer.SelectServer(t, svrs)
1583		}
1584
1585		return defaultSelector.SelectServer(t, svrs)
1586	}
1587}
1588
1589func makeReadPrefSelector(sess *session.Client, selector description.ServerSelector, localThreshold time.Duration) description.ServerSelectorFunc {
1590	if sess != nil && sess.TransactionRunning() {
1591		selector = description.CompositeSelector([]description.ServerSelector{
1592			description.ReadPrefSelector(sess.CurrentRp),
1593			description.LatencySelector(localThreshold),
1594		})
1595	}
1596
1597	return makePinnedSelector(sess, selector)
1598}
1599