1package driver
2
3import (
4	"bytes"
5	"context"
6	"errors"
7	"fmt"
8	"strconv"
9	"time"
10
11	"go.mongodb.org/mongo-driver/bson"
12	"go.mongodb.org/mongo-driver/bson/bsontype"
13	"go.mongodb.org/mongo-driver/bson/primitive"
14	"go.mongodb.org/mongo-driver/event"
15	"go.mongodb.org/mongo-driver/mongo/readconcern"
16	"go.mongodb.org/mongo-driver/mongo/readpref"
17	"go.mongodb.org/mongo-driver/mongo/writeconcern"
18	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
19	"go.mongodb.org/mongo-driver/x/mongo/driver/description"
20	"go.mongodb.org/mongo-driver/x/mongo/driver/session"
21	"go.mongodb.org/mongo-driver/x/mongo/driver/wiremessage"
22)
23
// defaultLocalThreshold is the latency window handed to description.LatencySelector when an
// Operation does not provide its own Selector.
const defaultLocalThreshold = 15 * time.Millisecond

// dollarCmd is the ".$cmd" suffix that is appended to the database name to build the
// FullCollectionName of an OP_QUERY command.
var dollarCmd = [...]byte{'.', '$', 'c', 'm', 'd'}

var (
	// ErrNoDocCommandResponse occurs when the server indicated a response existed, but none was found.
	ErrNoDocCommandResponse = errors.New("command returned no documents")
	// ErrMultiDocCommandResponse occurs when the server sent multiple documents in response to a command.
	ErrMultiDocCommandResponse = errors.New("command returned multiple documents")
	// ErrReplyDocumentMismatch occurs when the number of documents returned in an OP_QUERY does not match the numberReturned field.
	ErrReplyDocumentMismatch = errors.New("number of documents returned does not match numberReturned field")
	// ErrNonPrimaryReadPref is returned when a read is attempted in a transaction with a non-primary read preference.
	ErrNonPrimaryReadPref = errors.New("read preference in a transaction must be primary")
)

const (
	// cryptMaxBsonObjectSize is the maximum BSON object size (2 MiB) when client side encryption
	// is enabled. Execute uses it as the target batch size for encrypted batches.
	cryptMaxBsonObjectSize uint32 = 2097152
	// cryptMinWireVersion is the minimum wire version necessary to use automatic encryption.
	cryptMinWireVersion int32 = 8
)
45
// InvalidOperationError reports that an Operation failed validation because one of its
// required fields was left unset. Validate returns it with MissingField naming that field.
type InvalidOperationError struct {
	// MissingField is the name of the required Operation field that was not set.
	MissingField string
}

// Error implements the error interface and names the missing field.
func (e InvalidOperationError) Error() string {
	const (
		prefix = "the "
		suffix = " field must be set on Operation"
	)
	return prefix + e.MissingField + suffix
}
53
// opReply stores information returned in an OP_REPLY response from the server.
// The err field stores any error that occurred when decoding or validating the OP_REPLY response.
//
// NOTE(review): the field meanings below follow the OP_REPLY field names; the decoding code is
// not visible in this part of the file, so confirm against it.
type opReply struct {
	responseFlags wiremessage.ReplyFlag // flag bits of the reply
	cursorID      int64                 // cursor identifier reported by the server
	startingFrom  int32                 // presumably the cursor position this reply starts from
	numReturned   int32                 // number of documents the server claims to have returned
	documents     []bsoncore.Document   // documents carried in the reply
	err           error                 // decoding or validation error, if any
}
64
// startedInformation keeps track of all of the information necessary for monitoring started events.
// The wire message builders fill in cmd, requestID and documentSequenceIncluded; Execute fills in
// connID and cmdName before publishing the started event.
type startedInformation struct {
	// cmd is the command document that was encoded into the wire message.
	cmd bsoncore.Document
	// requestID is the request ID placed in the wire message header.
	requestID int32
	// cmdName is the name of the command, derived from cmd.
	cmdName string
	// documentSequenceIncluded is true when the batch was sent as an OP_MSG document sequence
	// rather than inside cmd.
	documentSequenceIncluded bool
	// connID is the ID of the connection the command is sent on.
	connID string
}
73
// finishedInformation keeps track of all of the information necessary for monitoring success and failure events.
// Execute copies cmdName, requestID and connID from the matching startedInformation.
type finishedInformation struct {
	// cmdName is the name of the command that was run.
	cmdName string
	// requestID is the request ID of the wire message that was sent.
	requestID int32
	// response is the document returned by the round trip.
	response bsoncore.Document
	// cmdErr is the error returned by the round trip, if any.
	cmdErr error
	// connID is the ID of the connection the command was sent on.
	connID string
	// startTime is captured immediately before the round trip begins.
	startTime time.Time
}
83
// Operation is used to execute an operation. It contains all of the common code required to
// select a server, transform an operation into a command, write the command to a connection from
// the selected server, read a response from that connection, process the response, and potentially
// retry.
//
// The required fields are Database, CommandFn, and Deployment. All other fields are optional.
//
// While an Operation can be constructed manually, drivergen should be used to generate an
// implementation of an operation instead. This will ensure that there are helpers for constructing
// the operation and that this type isn't configured incorrectly.
type Operation struct {
	// CommandFn is used to create the command that will be wrapped in a wire message and sent to
	// the server. This function should only add the elements of the command and not start or end
	// the enclosing BSON document. Per the command API, the first element must be the name of the
	// command to run. This field is required.
	CommandFn func(dst []byte, desc description.SelectedServer) ([]byte, error)

	// Database is the database that the command will be run against. This field is required.
	Database string

	// Deployment is the MongoDB Deployment to use. While most of the time this will be multiple
	// servers, commands that need to run against a single, preselected server can use the
	// SingleServerDeployment type. Commands that need to run on a preselected connection can use
	// the SingleConnectionDeployment type. This field is required.
	Deployment Deployment

	// ProcessResponseFn is called after a response to the command is returned. The server is
	// provided for types like Cursor that are required to run subsequent commands using the same
	// server.
	ProcessResponseFn func(response bsoncore.Document, srvr Server, desc description.Server) error

	// Selector is the server selector that's used during both initial server selection and
	// subsequent selection for retries. Depending on the Deployment implementation, the
	// SelectServer method may not actually be called. If this field is nil, a selector composed
	// of the read preference and the default latency window is used.
	Selector description.ServerSelector

	// ReadPreference is the read preference that will be attached to the command. If this field is
	// not specified a default read preference of primary will be used.
	ReadPreference *readpref.ReadPref

	// ReadConcern is the read concern used when running read commands. This field should not be set
	// for write operations. If this field is set, it will be encoded onto the commands sent to the
	// server.
	ReadConcern *readconcern.ReadConcern

	// MinimumReadConcernWireVersion specifies the minimum wire version to add the read concern to
	// the command being executed.
	MinimumReadConcernWireVersion int32

	// WriteConcern is the write concern used when running write commands. This field should not be
	// set for read operations. If this field is set, it will be encoded onto the commands sent to
	// the server.
	WriteConcern *writeconcern.WriteConcern

	// MinimumWriteConcernWireVersion specifies the minimum wire version to add the write concern to
	// the command being executed.
	MinimumWriteConcernWireVersion int32

	// Client is the session used with this operation. This can be either an implicit or explicit
	// session. If the server selected does not support sessions and Client is specified the
	// behavior depends on the session type. If the session is implicit, the session fields will not
	// be encoded onto the command. If the session is explicit, an error will be returned. The
	// caller is responsible for ensuring that this field is nil if the Deployment does not support
	// sessions.
	Client *session.Client

	// Clock is a cluster clock, different from the one contained within a session.Client. This
	// allows updating cluster times for a global cluster clock while allowing individual session's
	// cluster clocks to be only updated as far as the last command that's been run.
	Clock *session.ClusterClock

	// RetryMode specifies how to retry. There are three modes that enable retry: RetryOnce,
	// RetryOncePerCommand, and RetryContext. For more information about what these modes do, please
	// refer to their definitions. Both RetryMode and Type must be set for retryability to be enabled.
	RetryMode *RetryMode

	// Type specifies the kind of operation this is. Execute sets up retries for both Write and
	// Read operations; for a Write, a session (Client) must also be set. For more information
	// about what these types do, please refer to their definitions. Both Type and RetryMode must
	// be set for retryability to be enabled.
	Type Type

	// Batches contains the documents that are split when executing a write command that potentially
	// has more documents than can fit in a single command. This should only be specified for
	// commands that are batch compatible. For more information, please refer to the definition of
	// Batches.
	Batches *Batches

	// Legacy sets the legacy type for this operation. Execute dispatches to legacy
	// implementations for find, getMore, and killCursors, as well as listCollections and
	// listIndexes, when the connected server's wire version is too old. For more information
	// about LegacyOperationKind, please refer to its definition.
	Legacy LegacyOperationKind

	// CommandMonitor specifies the monitor to use for APM events. If this field is not set,
	// no events will be reported.
	CommandMonitor *event.CommandMonitor

	// Crypt specifies a Crypt object to use for automatic client side encryption and decryption.
	Crypt *Crypt
}
183
184// shouldEncrypt returns true if this operation should automatically be encrypted.
185func (op Operation) shouldEncrypt() bool {
186	return op.Crypt != nil && !op.Crypt.BypassAutoEncryption
187}
188
189// selectServer handles performing server selection for an operation.
190func (op Operation) selectServer(ctx context.Context) (Server, error) {
191	if err := op.Validate(); err != nil {
192		return nil, err
193	}
194
195	select {
196	case <-ctx.Done():
197		return nil, ctx.Err()
198	default:
199	}
200
201	selector := op.Selector
202	if selector == nil {
203		rp := op.ReadPreference
204		if rp == nil {
205			rp = readpref.Primary()
206		}
207		selector = description.CompositeSelector([]description.ServerSelector{
208			description.ReadPrefSelector(rp),
209			description.LatencySelector(defaultLocalThreshold),
210		})
211	}
212
213	return op.Deployment.SelectServer(ctx, selector)
214}
215
216// Validate validates this operation, ensuring the fields are set properly.
217func (op Operation) Validate() error {
218	if op.CommandFn == nil {
219		return InvalidOperationError{MissingField: "CommandFn"}
220	}
221	if op.Deployment == nil {
222		return InvalidOperationError{MissingField: "Deployment"}
223	}
224	if op.Database == "" {
225		return InvalidOperationError{MissingField: "Database"}
226	}
227	if op.Client != nil && !writeconcern.AckWrite(op.WriteConcern) {
228		return errors.New("session provided for an unacknowledged write")
229	}
230	return nil
231}
232
// Execute runs this operation. The scratch parameter will be used and overwritten (potentially many
// times), this should mainly be used to enable pooling of byte slices.
//
// Execute validates the operation, selects a server, checks out a connection and then loops. Each
// iteration optionally advances to the next batch, builds the wire message, performs the round
// trip and processes the result. The loop repeats when a retryable error is retried or when more
// batched documents remain. Write errors and write concern errors accumulated across iterations
// are returned as a WriteCommandError once the loop finishes.
func (op Operation) Execute(ctx context.Context, scratch []byte) error {
	err := op.Validate()
	if err != nil {
		return err
	}

	srvr, err := op.selectServer(ctx)
	if err != nil {
		return err
	}

	conn, err := srvr.Connection(ctx)
	if err != nil {
		return err
	}
	defer conn.Close()

	desc := description.SelectedServer{Server: conn.Description(), Kind: op.Deployment.Kind()}
	scratch = scratch[:0]

	// Servers with no known wire version, or a max wire version below 4, run find, getMore and
	// killCursors through the legacy code paths.
	if desc.WireVersion == nil || desc.WireVersion.Max < 4 {
		switch op.Legacy {
		case LegacyFind:
			return op.legacyFind(ctx, scratch, srvr, conn, desc)
		case LegacyGetMore:
			return op.legacyGetMore(ctx, scratch, srvr, conn, desc)
		case LegacyKillCursors:
			return op.legacyKillCursors(ctx, scratch, srvr, conn, desc)
		}
	}
	// Servers with no known wire version, or a max wire version below 3, additionally run
	// listCollections and listIndexes through the legacy code paths.
	if desc.WireVersion == nil || desc.WireVersion.Max < 3 {
		switch op.Legacy {
		case LegacyListCollections:
			return op.legacyListCollections(ctx, scratch, srvr, conn, desc)
		case LegacyListIndexes:
			return op.legacyListIndexes(ctx, scratch, srvr, conn, desc)
		}
	}

	var res bsoncore.Document
	// operationErr accumulates write errors and the write concern error across loop iterations.
	var operationErr WriteCommandError
	// original holds the first error while a retry is being set up, so it can be returned if the
	// retry cannot be attempted.
	var original error
	// retries is the number of remaining retry attempts. It starts at -1 for RetryContext; since
	// it is only ever decremented, it does not reach 0 and retrying is effectively unbounded.
	var retries int
	retryable := op.retryable(desc.Server)
	if retryable && op.RetryMode != nil {
		switch op.Type {
		case Write:
			// A write without a session leaves retries at 0.
			if op.Client == nil {
				break
			}
			switch *op.RetryMode {
			case RetryOnce, RetryOncePerCommand:
				retries = 1
			case RetryContext:
				retries = -1
			}

			op.Client.RetryWrite = false
			if *op.RetryMode > RetryNone {
				op.Client.RetryWrite = true
				if !op.Client.Committing && !op.Client.Aborting {
					op.Client.IncrementTxnNumber()
				}
			}

		case Read:
			switch *op.RetryMode {
			case RetryOnce, RetryOncePerCommand:
				retries = 1
			case RetryContext:
				retries = -1
			}
		}
	}
	batching := op.Batches.Valid()
	for {
		if batching {
			targetBatchSize := desc.MaxDocumentSize
			maxDocSize := desc.MaxDocumentSize
			if op.shouldEncrypt() {
				// For client-side encryption, we want the batch to be split at 2 MiB instead of 16MiB.
				// If there's only one document in the batch, it can be up to 16MiB, so we set target batch size to
				// 2MiB but max document size to 16MiB. This will allow the AdvanceBatch call to create a batch
				// with a single large document.
				targetBatchSize = cryptMaxBsonObjectSize
			}

			err = op.Batches.AdvanceBatch(int(desc.MaxBatchCount), int(targetBatchSize), int(maxDocSize))
			if err != nil {
				// TODO(GODRIVER-982): Should we also be returning operationErr?
				return err
			}
		}

		// convert to wire message
		if len(scratch) > 0 {
			scratch = scratch[:0]
		}
		wm, startedInfo, err := op.createWireMessage(ctx, scratch, desc)
		if err != nil {
			return err
		}

		// set extra data and send event if possible
		startedInfo.connID = conn.ID()
		startedInfo.cmdName = op.getCommandName(startedInfo.cmd)
		op.publishStartedEvent(ctx, startedInfo)

		// get the moreToCome flag information before we compress
		moreToCome := wiremessage.IsMsgMoreToCome(wm)

		// compress wiremessage if allowed
		if compressor, ok := conn.(Compressor); ok && op.canCompress(startedInfo.cmdName) {
			wm, err = compressor.CompressWireMessage(wm, nil)
			if err != nil {
				return err
			}
		}

		finishedInfo := finishedInformation{
			cmdName:   startedInfo.cmdName,
			requestID: startedInfo.requestID,
			startTime: time.Now(),
			connID:    startedInfo.connID,
		}

		// roundtrip using either the full roundTripper or a special one for when the moreToCome
		// flag is set
		var roundTrip = op.roundTrip
		if moreToCome {
			roundTrip = op.moreToComeRoundTrip
		}
		res, err = roundTrip(ctx, conn, wm)
		// Let the server observe the outcome (including a nil error) if it processes errors.
		if ep, ok := srvr.(ErrorProcessor); ok {
			ep.ProcessError(err)
		}

		finishedInfo.response = res
		finishedInfo.cmdErr = err
		op.publishFinishedEvent(ctx, finishedInfo)

		// Pull out $clusterTime and operationTime and update session and clock. We handle this before
		// handling the error to ensure we are properly gossiping the cluster time.
		op.updateClusterTimes(res)
		op.updateOperationTime(res)
		// NOTE(review): op.Client may be nil here; this relies on UpdateRecoveryToken tolerating a
		// nil receiver — confirm against session.Client.
		op.Client.UpdateRecoveryToken(bson.Raw(res))

		// automatically attempt to decrypt all results if client side encryption enabled
		if op.Crypt != nil {
			// use decryptErr instead of err because err is used below for retrying
			var decryptErr error
			res, decryptErr = op.Crypt.Decrypt(ctx, res)
			if decryptErr != nil {
				return decryptErr
			}
		}
		// perr is only returned when the round trip itself succeeded (see the nil case below).
		var perr error
		if op.ProcessResponseFn != nil {
			perr = op.ProcessResponseFn(res, srvr, desc.Server)
		}
		switch tt := err.(type) {
		case WriteCommandError:
			if e := err.(WriteCommandError); retryable && op.Type == Write && e.UnsupportedStorageEngine() {
				return ErrUnsupportedStorageEngine
			}
			if retryable && tt.Retryable() && retries != 0 {
				retries--
				original, err = err, nil
				conn.Close() // Avoid leaking the connection.
				srvr, err = op.selectServer(ctx)
				if err != nil {
					return original
				}
				conn, err = srvr.Connection(ctx)
				if err != nil || conn == nil || !op.retryable(conn.Description()) {
					if conn != nil {
						conn.Close()
					}
					return original
				}
				// The deferred Close runs when Execute returns, not at the end of this iteration.
				defer conn.Close() // Avoid leaking the new connection.
				if op.Client != nil && op.Client.Committing {
					// Apply majority write concern for retries
					op.Client.UpdateCommitTransactionWriteConcern()
					op.WriteConcern = op.Client.CurrentWc
				}
				continue
			}
			// If batching is enabled and either ordered is the default (which is true) or
			// explicitly set to true and we have write errors, return the errors.
			if batching && (op.Batches.Ordered == nil || *op.Batches.Ordered == true) && len(tt.WriteErrors) > 0 {
				return tt
			}
			if op.Client != nil && op.Client.Committing && tt.WriteConcernError != nil {
				// When running commitTransaction we return WriteConcernErrors as an Error.
				err := Error{
					Name:    tt.WriteConcernError.Name,
					Code:    int32(tt.WriteConcernError.Code),
					Message: tt.WriteConcernError.Message,
				}
				if err.Code == 64 || err.Code == 50 || tt.WriteConcernError.Retryable() {
					err.Labels = []string{UnknownTransactionCommitResult}
				}
				return err
			}
			// Not retried and not fatal: remember the errors and fall through to the batching check.
			operationErr.WriteConcernError = tt.WriteConcernError
			operationErr.WriteErrors = append(operationErr.WriteErrors, tt.WriteErrors...)
		case Error:
			// NOTE(review): op.Client may be nil here; this relies on ClearPinnedServer tolerating
			// a nil receiver — confirm against session.Client.
			if tt.HasErrorLabel(TransientTransactionError) || tt.HasErrorLabel(UnknownTransactionCommitResult) {
				op.Client.ClearPinnedServer()
			}
			if e := err.(Error); retryable && op.Type == Write && e.UnsupportedStorageEngine() {
				return ErrUnsupportedStorageEngine
			}
			if retryable && tt.Retryable() && retries != 0 {
				retries--
				original, err = err, nil
				conn.Close() // Avoid leaking the connection.
				srvr, err = op.selectServer(ctx)
				if err != nil {
					return original
				}
				conn, err = srvr.Connection(ctx)
				if err != nil || conn == nil || !op.retryable(conn.Description()) {
					if conn != nil {
						conn.Close()
					}
					return original
				}
				// The deferred Close runs when Execute returns, not at the end of this iteration.
				defer conn.Close() // Avoid leaking the new connection.
				if op.Client != nil && op.Client.Committing {
					// Apply majority write concern for retries
					op.Client.UpdateCommitTransactionWriteConcern()
					op.WriteConcern = op.Client.CurrentWc
				}
				continue
			}
			if op.Client != nil && op.Client.Committing && (tt.Retryable() || tt.Code == 50) {
				// If we got a retryable error or MaxTimeMSExpired error, we add UnknownTransactionCommitResult.
				tt.Labels = append(tt.Labels, UnknownTransactionCommitResult)
			}
			return tt
		case nil:
			if moreToCome {
				return ErrUnacknowledgedWrite
			}
			if perr != nil {
				return perr
			}
		default:
			return err
		}

		// More documents remain to be sent: reset per-command retry state, clear the current
		// batch and run another iteration.
		if batching && len(op.Batches.Documents) > 0 {
			if retryable && op.Client != nil && op.RetryMode != nil {
				if *op.RetryMode > RetryNone {
					op.Client.IncrementTxnNumber()
				}
				if *op.RetryMode == RetryOncePerCommand {
					retries = 1
				}
			}
			op.Batches.ClearBatch()
			continue
		}
		break
	}
	if len(operationErr.WriteErrors) > 0 || operationErr.WriteConcernError != nil {
		return operationErr
	}
	return nil
}
507
508// Retryable writes are supported if the server supports sessions, the operation is not
509// within a transaction, and the write is acknowledged
510func (op Operation) retryable(desc description.Server) bool {
511	switch op.Type {
512	case Write:
513		if op.Client != nil && (op.Client.Committing || op.Client.Aborting) {
514			return true
515		}
516		if op.Deployment.SupportsRetryWrites() &&
517			desc.WireVersion != nil && desc.WireVersion.Max >= 6 &&
518			op.Client != nil && !(op.Client.TransactionInProgress() || op.Client.TransactionStarting()) &&
519			writeconcern.AckWrite(op.WriteConcern) {
520			return true
521		}
522	case Read:
523		if op.Client != nil && (op.Client.Committing || op.Client.Aborting) {
524			return true
525		}
526		if desc.WireVersion != nil && desc.WireVersion.Max >= 6 &&
527			(op.Client == nil || !(op.Client.TransactionInProgress() || op.Client.TransactionStarting())) {
528			return true
529		}
530	}
531	return false
532}
533
534// roundTrip writes a wiremessage to the connection and then reads a wiremessage. The wm parameter
535// is reused when reading the wiremessage.
536func (op Operation) roundTrip(ctx context.Context, conn Connection, wm []byte) ([]byte, error) {
537	err := conn.WriteWireMessage(ctx, wm)
538	if err != nil {
539		labels := []string{NetworkError}
540		if op.Client != nil {
541			op.Client.MarkDirty()
542		}
543		if op.Client != nil && op.Client.TransactionRunning() && !op.Client.Committing {
544			labels = append(labels, TransientTransactionError)
545		}
546		if op.Client != nil && op.Client.Committing {
547			labels = append(labels, UnknownTransactionCommitResult)
548		}
549		return nil, Error{Message: err.Error(), Labels: labels, Wrapped: err}
550	}
551
552	wm, err = conn.ReadWireMessage(ctx, wm[:0])
553	if err != nil {
554		labels := []string{NetworkError}
555		if op.Client != nil {
556			op.Client.MarkDirty()
557		}
558		if op.Client != nil && op.Client.TransactionRunning() && !op.Client.Committing {
559			labels = append(labels, TransientTransactionError)
560		}
561		if op.Client != nil && op.Client.Committing {
562			labels = append(labels, UnknownTransactionCommitResult)
563		}
564		return nil, Error{Message: err.Error(), Labels: labels, Wrapped: err}
565	}
566
567	// decompress wiremessage
568	wm, err = op.decompressWireMessage(wm)
569	if err != nil {
570		return nil, err
571	}
572
573	// decode
574	res, err := op.decodeResult(wm)
575	// Pull out $clusterTime and operationTime and update session and clock. We handle this before
576	// handling the error to ensure we are properly gossiping the cluster time.
577	op.updateClusterTimes(res)
578	op.updateOperationTime(res)
579
580	return res, err
581}
582
583// moreToComeRoundTrip writes a wiremessage to the provided connection. This is used when an OP_MSG is
584// being sent with  the moreToCome bit set.
585func (op *Operation) moreToComeRoundTrip(ctx context.Context, conn Connection, wm []byte) ([]byte, error) {
586	err := conn.WriteWireMessage(ctx, wm)
587	if err != nil {
588		if op.Client != nil {
589			op.Client.MarkDirty()
590		}
591		err = Error{Message: err.Error(), Labels: []string{TransientTransactionError, NetworkError}, Wrapped: err}
592	}
593	return bsoncore.BuildDocument(nil, bsoncore.AppendInt32Element(nil, "ok", 1)), err
594}
595
596// decompressWireMessage handles decompressing a wiremessage. If the wiremessage
597// is not compressed, this method will return the wiremessage.
598func (Operation) decompressWireMessage(wm []byte) ([]byte, error) {
599	// read the header and ensure this is a compressed wire message
600	length, reqid, respto, opcode, rem, ok := wiremessage.ReadHeader(wm)
601	if !ok || len(wm) < int(length) {
602		return nil, errors.New("malformed wire message: insufficient bytes")
603	}
604	if opcode != wiremessage.OpCompressed {
605		return wm, nil
606	}
607	// get the original opcode and uncompressed size
608	opcode, rem, ok = wiremessage.ReadCompressedOriginalOpCode(rem)
609	if !ok {
610		return nil, errors.New("malformed OP_COMPRESSED: missing original opcode")
611	}
612	uncompressedSize, rem, ok := wiremessage.ReadCompressedUncompressedSize(rem)
613	if !ok {
614		return nil, errors.New("malformed OP_COMPRESSED: missing uncompressed size")
615	}
616	// get the compressor ID and decompress the message
617	compressorID, rem, ok := wiremessage.ReadCompressedCompressorID(rem)
618	if !ok {
619		return nil, errors.New("malformed OP_COMPRESSED: missing compressor ID")
620	}
621	compressedSize := length - 25 // header (16) + original opcode (4) + uncompressed size (4) + compressor ID (1)
622	// return the original wiremessage
623	msg, rem, ok := wiremessage.ReadCompressedCompressedMessage(rem, compressedSize)
624	if !ok {
625		return nil, errors.New("malformed OP_COMPRESSED: insufficient bytes for compressed wiremessage")
626	}
627
628	header := make([]byte, 0, uncompressedSize+16)
629	header = wiremessage.AppendHeader(header, uncompressedSize+16, reqid, respto, opcode)
630	opts := CompressionOpts{
631		Compressor:       compressorID,
632		UncompressedSize: uncompressedSize,
633	}
634	uncompressed, err := DecompressPayload(msg, opts)
635	if err != nil {
636		return nil, err
637	}
638
639	return append(header, uncompressed...), nil
640}
641
642func (op Operation) createWireMessage(ctx context.Context, dst []byte,
643	desc description.SelectedServer) ([]byte, startedInformation, error) {
644
645	if desc.WireVersion == nil || desc.WireVersion.Max < wiremessage.OpmsgWireVersion {
646		return op.createQueryWireMessage(dst, desc)
647	}
648	return op.createMsgWireMessage(ctx, dst, desc)
649}
650
651func (op Operation) addBatchArray(dst []byte) []byte {
652	aidx, dst := bsoncore.AppendArrayElementStart(dst, op.Batches.Identifier)
653	for i, doc := range op.Batches.Current {
654		dst = bsoncore.AppendDocumentElement(dst, strconv.Itoa(i), doc)
655	}
656	dst, _ = bsoncore.AppendArrayEnd(dst, aidx)
657	return dst
658}
659
660func (op Operation) createQueryWireMessage(dst []byte, desc description.SelectedServer) ([]byte, startedInformation, error) {
661	var info startedInformation
662	flags := op.slaveOK(desc)
663	var wmindex int32
664	info.requestID = wiremessage.NextRequestID()
665	wmindex, dst = wiremessage.AppendHeaderStart(dst, info.requestID, 0, wiremessage.OpQuery)
666	dst = wiremessage.AppendQueryFlags(dst, flags)
667	// FullCollectionName
668	dst = append(dst, op.Database...)
669	dst = append(dst, dollarCmd[:]...)
670	dst = append(dst, 0x00)
671	dst = wiremessage.AppendQueryNumberToSkip(dst, 0)
672	dst = wiremessage.AppendQueryNumberToReturn(dst, -1)
673
674	wrapper := int32(-1)
675	rp, err := op.createReadPref(desc.Server.Kind, desc.Kind, true)
676	if err != nil {
677		return dst, info, err
678	}
679	if len(rp) > 0 {
680		wrapper, dst = bsoncore.AppendDocumentStart(dst)
681		dst = bsoncore.AppendHeader(dst, bsontype.EmbeddedDocument, "$query")
682	}
683	idx, dst := bsoncore.AppendDocumentStart(dst)
684	dst, err = op.CommandFn(dst, desc)
685	if err != nil {
686		return dst, info, err
687	}
688
689	if op.Batches != nil && len(op.Batches.Current) > 0 {
690		dst = op.addBatchArray(dst)
691	}
692
693	dst, err = op.addReadConcern(dst, desc)
694	if err != nil {
695		return dst, info, err
696	}
697
698	dst, err = op.addWriteConcern(dst, desc)
699	if err != nil {
700		return dst, info, err
701	}
702
703	dst, err = op.addSession(dst, desc)
704	if err != nil {
705		return dst, info, err
706	}
707
708	dst = op.addClusterTime(dst, desc)
709
710	dst, _ = bsoncore.AppendDocumentEnd(dst, idx)
711	// Command monitoring only reports the document inside $query
712	info.cmd = dst[idx:]
713
714	if len(rp) > 0 {
715		var err error
716		dst = bsoncore.AppendDocumentElement(dst, "$readPreference", rp)
717		dst, err = bsoncore.AppendDocumentEnd(dst, wrapper)
718		if err != nil {
719			return dst, info, err
720		}
721	}
722
723	return bsoncore.UpdateLength(dst, wmindex, int32(len(dst[wmindex:]))), info, nil
724}
725
726func (op Operation) createMsgWireMessage(ctx context.Context, dst []byte, desc description.SelectedServer) ([]byte, startedInformation, error) {
727	var info startedInformation
728	var flags wiremessage.MsgFlag
729	var wmindex int32
730	// We set the MoreToCome bit if we have a write concern, it's unacknowledged, and we either
731	// aren't batching or we are encoding the last batch.
732	if op.WriteConcern != nil && !writeconcern.AckWrite(op.WriteConcern) && (op.Batches == nil || len(op.Batches.Documents) == 0) {
733		flags = wiremessage.MoreToCome
734	}
735	info.requestID = wiremessage.NextRequestID()
736	wmindex, dst = wiremessage.AppendHeaderStart(dst, info.requestID, 0, wiremessage.OpMsg)
737	dst = wiremessage.AppendMsgFlags(dst, flags)
738	// Body
739	dst = wiremessage.AppendMsgSectionType(dst, wiremessage.SingleDocument)
740
741	idx, dst := bsoncore.AppendDocumentStart(dst)
742
743	dst, err := op.addCommandFields(ctx, dst, desc)
744	if err != nil {
745		return dst, info, err
746	}
747	dst, err = op.addReadConcern(dst, desc)
748	if err != nil {
749		return dst, info, err
750	}
751	dst, err = op.addWriteConcern(dst, desc)
752	if err != nil {
753		return dst, info, err
754	}
755
756	dst, err = op.addSession(dst, desc)
757	if err != nil {
758		return dst, info, err
759	}
760
761	dst = op.addClusterTime(dst, desc)
762
763	dst = bsoncore.AppendStringElement(dst, "$db", op.Database)
764	rp, err := op.createReadPref(desc.Server.Kind, desc.Kind, false)
765	if err != nil {
766		return dst, info, err
767	}
768	if len(rp) > 0 {
769		dst = bsoncore.AppendDocumentElement(dst, "$readPreference", rp)
770	}
771
772	dst, _ = bsoncore.AppendDocumentEnd(dst, idx)
773	// The command document for monitoring shouldn't include the type 1 payload as a document sequence
774	info.cmd = dst[idx:]
775
776	// add batch as a document sequence if auto encryption is not enabled
777	// if auto encryption is enabled, the batch will already be an array in the command document
778	if !op.shouldEncrypt() && op.Batches != nil && len(op.Batches.Current) > 0 {
779		info.documentSequenceIncluded = true
780		dst = wiremessage.AppendMsgSectionType(dst, wiremessage.DocumentSequence)
781		idx, dst = bsoncore.ReserveLength(dst)
782
783		dst = append(dst, op.Batches.Identifier...)
784		dst = append(dst, 0x00)
785
786		for _, doc := range op.Batches.Current {
787			dst = append(dst, doc...)
788		}
789
790		dst = bsoncore.UpdateLength(dst, idx, int32(len(dst[idx:])))
791	}
792
793	return bsoncore.UpdateLength(dst, wmindex, int32(len(dst[wmindex:]))), info, nil
794}
795
796// addCommandFields adds the fields for a command to the wire message in dst. This assumes that the start of the document
797// has already been added and does not add the final 0 byte.
798func (op Operation) addCommandFields(ctx context.Context, dst []byte, desc description.SelectedServer) ([]byte, error) {
799	if !op.shouldEncrypt() {
800		return op.CommandFn(dst, desc)
801	}
802
803	if desc.WireVersion.Max < cryptMinWireVersion {
804		return dst, errors.New("auto-encryption requires a MongoDB version of 4.2")
805	}
806
807	// create temporary command document
808	cidx, cmdDst := bsoncore.AppendDocumentStart(nil)
809	var err error
810	cmdDst, err = op.CommandFn(cmdDst, desc)
811	if err != nil {
812		return dst, err
813	}
814	// use a BSON array instead of a type 1 payload because mongocryptd will convert to arrays regardless
815	if op.Batches != nil && len(op.Batches.Current) > 0 {
816		cmdDst = op.addBatchArray(cmdDst)
817	}
818	cmdDst, _ = bsoncore.AppendDocumentEnd(cmdDst, cidx)
819
820	// encrypt the command
821	encrypted, err := op.Crypt.Encrypt(ctx, op.Database, cmdDst)
822	if err != nil {
823		return dst, err
824	}
825	// append encrypted command to original destination, removing the first 4 bytes (length) and final byte (terminator)
826	dst = append(dst, encrypted[4:len(encrypted)-1]...)
827	return dst, nil
828}
829
830func (op Operation) addReadConcern(dst []byte, desc description.SelectedServer) ([]byte, error) {
831	if op.MinimumReadConcernWireVersion > 0 && (desc.WireVersion == nil || !desc.WireVersion.Includes(op.MinimumReadConcernWireVersion)) {
832		return dst, nil
833	}
834	rc := op.ReadConcern
835	client := op.Client
836	// Starting transaction's read concern overrides all others
837	if client != nil && client.TransactionStarting() && client.CurrentRc != nil {
838		rc = client.CurrentRc
839	}
840
841	// start transaction must append afterclustertime IF causally consistent and operation time exists
842	if rc == nil && client != nil && client.TransactionStarting() && client.Consistent && client.OperationTime != nil {
843		rc = readconcern.New()
844	}
845
846	if rc == nil {
847		return dst, nil
848	}
849
850	_, data, err := rc.MarshalBSONValue() // always returns a document
851	if err != nil {
852		return dst, err
853	}
854
855	if description.SessionsSupported(desc.WireVersion) && client != nil && client.Consistent && client.OperationTime != nil {
856		data = data[:len(data)-1] // remove the null byte
857		data = bsoncore.AppendTimestampElement(data, "afterClusterTime", client.OperationTime.T, client.OperationTime.I)
858		data, _ = bsoncore.AppendDocumentEnd(data, 0)
859	}
860
861	if len(data) == bsoncore.EmptyDocumentLength {
862		return dst, nil
863	}
864	return bsoncore.AppendDocumentElement(dst, "readConcern", data), nil
865}
866
867func (op Operation) addWriteConcern(dst []byte, desc description.SelectedServer) ([]byte, error) {
868	if op.MinimumWriteConcernWireVersion > 0 && (desc.WireVersion == nil || !desc.WireVersion.Includes(op.MinimumWriteConcernWireVersion)) {
869		return dst, nil
870	}
871	wc := op.WriteConcern
872	if wc == nil {
873		return dst, nil
874	}
875
876	t, data, err := wc.MarshalBSONValue()
877	if err == writeconcern.ErrEmptyWriteConcern {
878		return dst, nil
879	}
880	if err != nil {
881		return dst, err
882	}
883
884	return append(bsoncore.AppendHeader(dst, t, "writeConcern"), data...), nil
885}
886
887func (op Operation) addSession(dst []byte, desc description.SelectedServer) ([]byte, error) {
888	client := op.Client
889	if client == nil || !description.SessionsSupported(desc.WireVersion) || desc.SessionTimeoutMinutes == 0 {
890		return dst, nil
891	}
892	if client.Terminated {
893		return dst, session.ErrSessionEnded
894	}
895	lsid, _ := client.SessionID.MarshalBSON()
896	dst = bsoncore.AppendDocumentElement(dst, "lsid", lsid)
897
898	var addedTxnNumber bool
899	if op.Type == Write && client.RetryWrite {
900		addedTxnNumber = true
901		dst = bsoncore.AppendInt64Element(dst, "txnNumber", op.Client.TxnNumber)
902	}
903	if client.TransactionRunning() || client.RetryingCommit {
904		if !addedTxnNumber {
905			dst = bsoncore.AppendInt64Element(dst, "txnNumber", op.Client.TxnNumber)
906		}
907		if client.TransactionStarting() {
908			dst = bsoncore.AppendBooleanElement(dst, "startTransaction", true)
909		}
910		dst = bsoncore.AppendBooleanElement(dst, "autocommit", false)
911	}
912
913	client.ApplyCommand(desc.Server)
914
915	return dst, nil
916}
917
918func (op Operation) addClusterTime(dst []byte, desc description.SelectedServer) []byte {
919	client, clock := op.Client, op.Clock
920	if (clock == nil && client == nil) || !description.SessionsSupported(desc.WireVersion) {
921		return dst
922	}
923	clusterTime := clock.GetClusterTime()
924	if client != nil {
925		clusterTime = session.MaxClusterTime(clusterTime, client.ClusterTime)
926	}
927	if clusterTime == nil {
928		return dst
929	}
930	val, err := clusterTime.LookupErr("$clusterTime")
931	if err != nil {
932		return dst
933	}
934	return append(bsoncore.AppendHeader(dst, val.Type, "$clusterTime"), val.Value...)
935	// return bsoncore.AppendDocumentElement(dst, "$clusterTime", clusterTime)
936}
937
938// updateClusterTimes updates the cluster times for the session and cluster clock attached to this
939// operation. While the session's AdvanceClusterTime may return an error, this method does not
940// because an error being returned from this method will not be returned further up.
941func (op Operation) updateClusterTimes(response bsoncore.Document) {
942	// Extract cluster time.
943	value, err := response.LookupErr("$clusterTime")
944	if err != nil {
945		// $clusterTime not included by the server
946		return
947	}
948	clusterTime := bsoncore.BuildDocumentFromElements(nil, bsoncore.AppendValueElement(nil, "$clusterTime", value))
949
950	sess, clock := op.Client, op.Clock
951
952	if sess != nil {
953		_ = sess.AdvanceClusterTime(bson.Raw(clusterTime))
954	}
955
956	if clock != nil {
957		clock.AdvanceClusterTime(bson.Raw(clusterTime))
958	}
959}
960
961// updateOperationTime updates the operation time on the session attached to this operation. While
962// the session's AdvanceOperationTime method may return an error, this method does not because an
963// error being returned from this method will not be returned further up.
964func (op Operation) updateOperationTime(response bsoncore.Document) {
965	sess := op.Client
966	if sess == nil {
967		return
968	}
969
970	opTimeElem, err := response.LookupErr("operationTime")
971	if err != nil {
972		// operationTime not included by the server
973		return
974	}
975
976	t, i := opTimeElem.Timestamp()
977	_ = sess.AdvanceOperationTime(&primitive.Timestamp{
978		T: t,
979		I: i,
980	})
981}
982
983func (op Operation) getReadPrefBasedOnTransaction() (*readpref.ReadPref, error) {
984	if op.Client != nil && op.Client.TransactionRunning() {
985		// Transaction's read preference always takes priority
986		rp := op.Client.CurrentRp
987		// Reads in a transaction must have read preference primary
988		// This must not be checked in startTransaction
989		if rp != nil && !op.Client.TransactionStarting() && rp.Mode() != readpref.PrimaryMode {
990			return nil, ErrNonPrimaryReadPref
991		}
992		return rp, nil
993	}
994	return op.ReadPreference, nil
995}
996
997func (op Operation) createReadPref(serverKind description.ServerKind, topologyKind description.TopologyKind, isOpQuery bool) (bsoncore.Document, error) {
998	if serverKind == description.Standalone || (isOpQuery && serverKind != description.Mongos) || op.Type == Write {
999		// Don't send read preference for non-mongos when using OP_QUERY or for all standalones
1000		return nil, nil
1001	}
1002
1003	idx, doc := bsoncore.AppendDocumentStart(nil)
1004	rp, err := op.getReadPrefBasedOnTransaction()
1005	if err != nil {
1006		return nil, err
1007	}
1008
1009	if rp == nil {
1010		if topologyKind == description.Single && serverKind != description.Mongos {
1011			doc = bsoncore.AppendStringElement(doc, "mode", "primaryPreferred")
1012			doc, _ = bsoncore.AppendDocumentEnd(doc, idx)
1013			return doc, nil
1014		}
1015		return nil, nil
1016	}
1017
1018	switch rp.Mode() {
1019	case readpref.PrimaryMode:
1020		if serverKind == description.Mongos {
1021			return nil, nil
1022		}
1023		if topologyKind == description.Single {
1024			doc = bsoncore.AppendStringElement(doc, "mode", "primaryPreferred")
1025			doc, _ = bsoncore.AppendDocumentEnd(doc, idx)
1026			return doc, nil
1027		}
1028		doc = bsoncore.AppendStringElement(doc, "mode", "primary")
1029	case readpref.PrimaryPreferredMode:
1030		doc = bsoncore.AppendStringElement(doc, "mode", "primaryPreferred")
1031	case readpref.SecondaryPreferredMode:
1032		_, ok := rp.MaxStaleness()
1033		if serverKind == description.Mongos && isOpQuery && !ok && len(rp.TagSets()) == 0 {
1034			return nil, nil
1035		}
1036		doc = bsoncore.AppendStringElement(doc, "mode", "secondaryPreferred")
1037	case readpref.SecondaryMode:
1038		doc = bsoncore.AppendStringElement(doc, "mode", "secondary")
1039	case readpref.NearestMode:
1040		doc = bsoncore.AppendStringElement(doc, "mode", "nearest")
1041	}
1042
1043	sets := make([]bsoncore.Document, 0, len(rp.TagSets()))
1044	for _, ts := range rp.TagSets() {
1045		if len(ts) == 0 {
1046			continue
1047		}
1048		i, set := bsoncore.AppendDocumentStart(nil)
1049		for _, t := range ts {
1050			set = bsoncore.AppendStringElement(set, t.Name, t.Value)
1051		}
1052		set, _ = bsoncore.AppendDocumentEnd(set, i)
1053		sets = append(sets, set)
1054	}
1055	if len(sets) > 0 {
1056		var aidx int32
1057		aidx, doc = bsoncore.AppendArrayElementStart(doc, "tags")
1058		for i, set := range sets {
1059			doc = bsoncore.AppendDocumentElement(doc, strconv.Itoa(i), set)
1060		}
1061		doc, _ = bsoncore.AppendArrayEnd(doc, aidx)
1062	}
1063
1064	if d, ok := rp.MaxStaleness(); ok {
1065		doc = bsoncore.AppendInt32Element(doc, "maxStalenessSeconds", int32(d.Seconds()))
1066	}
1067
1068	doc, _ = bsoncore.AppendDocumentEnd(doc, idx)
1069	return doc, nil
1070}
1071
1072func (op Operation) slaveOK(desc description.SelectedServer) wiremessage.QueryFlag {
1073	if desc.Kind == description.Single && desc.Server.Kind != description.Mongos {
1074		return wiremessage.SlaveOK
1075	}
1076
1077	if rp := op.ReadPreference; rp != nil && rp.Mode() != readpref.PrimaryMode {
1078		return wiremessage.SlaveOK
1079	}
1080
1081	return 0
1082}
1083
1084func (Operation) canCompress(cmd string) bool {
1085	if cmd == "isMaster" || cmd == "saslStart" || cmd == "saslContinue" || cmd == "getnonce" || cmd == "authenticate" ||
1086		cmd == "createUser" || cmd == "updateUser" || cmd == "copydbSaslStart" || cmd == "copydbgetnonce" || cmd == "copydb" {
1087		return false
1088	}
1089	return true
1090}
1091
1092// decodeOpReply extracts the necessary information from an OP_REPLY wire message.
1093// includesHeader: specifies whether or not wm includes the message header
1094// Returns the decoded OP_REPLY. If the err field of the returned opReply is non-nil, an error occurred while decoding
1095// or validating the response and the other fields are undefined.
1096func (Operation) decodeOpReply(wm []byte, includesHeader bool) opReply {
1097	var reply opReply
1098	var ok bool
1099
1100	if includesHeader {
1101		wmLength := len(wm)
1102		var length int32
1103		var opcode wiremessage.OpCode
1104		length, _, _, opcode, wm, ok = wiremessage.ReadHeader(wm)
1105		if !ok || int(length) > wmLength {
1106			reply.err = errors.New("malformed wire message: insufficient bytes")
1107			return reply
1108		}
1109		if opcode != wiremessage.OpReply {
1110			reply.err = errors.New("malformed wire message: incorrect opcode")
1111			return reply
1112		}
1113	}
1114
1115	reply.responseFlags, wm, ok = wiremessage.ReadReplyFlags(wm)
1116	if !ok {
1117		reply.err = errors.New("malformed OP_REPLY: missing flags")
1118		return reply
1119	}
1120	reply.cursorID, wm, ok = wiremessage.ReadReplyCursorID(wm)
1121	if !ok {
1122		reply.err = errors.New("malformed OP_REPLY: missing cursorID")
1123		return reply
1124	}
1125	reply.startingFrom, wm, ok = wiremessage.ReadReplyStartingFrom(wm)
1126	if !ok {
1127		reply.err = errors.New("malformed OP_REPLY: missing startingFrom")
1128		return reply
1129	}
1130	reply.numReturned, wm, ok = wiremessage.ReadReplyNumberReturned(wm)
1131	if !ok {
1132		reply.err = errors.New("malformed OP_REPLY: missing numberReturned")
1133		return reply
1134	}
1135	reply.documents, wm, ok = wiremessage.ReadReplyDocuments(wm)
1136	if !ok {
1137		reply.err = errors.New("malformed OP_REPLY: could not read documents from reply")
1138	}
1139
1140	if reply.responseFlags&wiremessage.QueryFailure == wiremessage.QueryFailure {
1141		reply.err = QueryFailureError{
1142			Message:  "command failure",
1143			Response: reply.documents[0],
1144		}
1145		return reply
1146	}
1147	if reply.responseFlags&wiremessage.CursorNotFound == wiremessage.CursorNotFound {
1148		reply.err = ErrCursorNotFound
1149		return reply
1150	}
1151	if reply.numReturned != int32(len(reply.documents)) {
1152		reply.err = ErrReplyDocumentMismatch
1153		return reply
1154	}
1155
1156	return reply
1157}
1158
1159func (op Operation) decodeResult(wm []byte) (bsoncore.Document, error) {
1160	wmLength := len(wm)
1161	length, _, _, opcode, wm, ok := wiremessage.ReadHeader(wm)
1162	if !ok || int(length) > wmLength {
1163		return nil, errors.New("malformed wire message: insufficient bytes")
1164	}
1165
1166	wm = wm[:wmLength-16] // constrain to just this wiremessage, incase there are multiple in the slice
1167
1168	switch opcode {
1169	case wiremessage.OpReply:
1170		reply := op.decodeOpReply(wm, false)
1171		if reply.err != nil {
1172			return nil, reply.err
1173		}
1174		if reply.numReturned == 0 {
1175			return nil, ErrNoDocCommandResponse
1176		}
1177		if reply.numReturned > 1 {
1178			return nil, ErrMultiDocCommandResponse
1179		}
1180		rdr := reply.documents[0]
1181		if err := rdr.Validate(); err != nil {
1182			return nil, NewCommandResponseError("malformed OP_REPLY: invalid document", err)
1183		}
1184
1185		return rdr, extractError(rdr)
1186	case wiremessage.OpMsg:
1187		_, wm, ok = wiremessage.ReadMsgFlags(wm)
1188		if !ok {
1189			return nil, errors.New("malformed wire message: missing OP_MSG flags")
1190		}
1191
1192		var res bsoncore.Document
1193		for len(wm) > 0 {
1194			var stype wiremessage.SectionType
1195			stype, wm, ok = wiremessage.ReadMsgSectionType(wm)
1196			if !ok {
1197				return nil, errors.New("malformed wire message: insuffienct bytes to read section type")
1198			}
1199
1200			switch stype {
1201			case wiremessage.SingleDocument:
1202				res, wm, ok = wiremessage.ReadMsgSectionSingleDocument(wm)
1203				if !ok {
1204					return nil, errors.New("malformed wire message: insufficient bytes to read single document")
1205				}
1206			case wiremessage.DocumentSequence:
1207				// TODO(GODRIVER-617): Implement document sequence returns.
1208				_, _, wm, ok = wiremessage.ReadMsgSectionDocumentSequence(wm)
1209				if !ok {
1210					return nil, errors.New("malformed wire message: insufficient bytes to read document sequence")
1211				}
1212			default:
1213				return nil, fmt.Errorf("malformed wire message: uknown section type %v", stype)
1214			}
1215		}
1216
1217		err := res.Validate()
1218		if err != nil {
1219			return nil, NewCommandResponseError("malformed OP_MSG: invalid document", err)
1220		}
1221
1222		return res, extractError(res)
1223	default:
1224		return nil, fmt.Errorf("cannot decode result from %s", opcode)
1225	}
1226}
1227
1228// getCommandName returns the name of the command from the given BSON document.
1229func (op Operation) getCommandName(doc []byte) string {
1230	// skip 4 bytes for document length and 1 byte for element type
1231	idx := bytes.IndexByte(doc[5:], 0x00) // look for the 0 byte after the command name
1232	return string(doc[5 : idx+5])
1233}
1234
1235func (op *Operation) canMonitor(cmd string) bool {
1236	return !(cmd == "authenticate" || cmd == "saslStart" || cmd == "saslContinue" || cmd == "getnonce" || cmd == "createUser" ||
1237		cmd == "updateUser" || cmd == "copydbgetnonce" || cmd == "copydbsaslstart" || cmd == "copydb")
1238}
1239
1240// publishStartedEvent publishes a CommandStartedEvent to the operation's command monitor if possible. If the command is
1241// an unacknowledged write, a CommandSucceededEvent will be published as well. If started events are not being monitored,
1242// no events are published.
1243func (op Operation) publishStartedEvent(ctx context.Context, info startedInformation) {
1244	if op.CommandMonitor == nil || op.CommandMonitor.Started == nil {
1245		return
1246	}
1247
1248	// Make a copy of the command. Redact if the command is security sensitive and cannot be monitored.
1249	// If there was a type 1 payload for the current batch, convert it to a BSON array.
1250	var cmdCopy []byte
1251	if op.canMonitor(info.cmdName) {
1252		cmdCopy = make([]byte, len(info.cmd))
1253		copy(cmdCopy, info.cmd)
1254		if info.documentSequenceIncluded {
1255			cmdCopy = cmdCopy[:len(info.cmd)-1] // remove 0 byte at end
1256			cmdCopy = op.addBatchArray(cmdCopy)
1257			cmdCopy, _ = bsoncore.AppendDocumentEnd(cmdCopy, 0) // add back 0 byte and update length
1258		}
1259	}
1260
1261	started := &event.CommandStartedEvent{
1262		Command:      cmdCopy,
1263		DatabaseName: op.Database,
1264		CommandName:  info.cmdName,
1265		RequestID:    int64(info.requestID),
1266		ConnectionID: info.connID,
1267	}
1268	op.CommandMonitor.Started(ctx, started)
1269}
1270
1271// publishFinishedEvent publishes either a CommandSucceededEvent or a CommandFailedEvent to the operation's command
1272// monitor if possible. If success/failure events aren't being monitored, no events are published.
1273func (op Operation) publishFinishedEvent(ctx context.Context, info finishedInformation) {
1274	success := info.cmdErr == nil
1275	if _, ok := info.cmdErr.(WriteCommandError); ok {
1276		success = true
1277	}
1278	if op.CommandMonitor == nil || (success && op.CommandMonitor.Succeeded == nil) || (!success && op.CommandMonitor.Failed == nil) {
1279		return
1280	}
1281
1282	var durationNanos int64
1283	var emptyTime time.Time
1284	if info.startTime != emptyTime {
1285		durationNanos = time.Now().Sub(info.startTime).Nanoseconds()
1286	}
1287
1288	finished := event.CommandFinishedEvent{
1289		CommandName:   info.cmdName,
1290		RequestID:     int64(info.requestID),
1291		ConnectionID:  info.connID,
1292		DurationNanos: durationNanos,
1293	}
1294
1295	if success {
1296		res := bson.Raw{}
1297		// Only copy the reply for commands that are not security sensitive
1298		if op.canMonitor(info.cmdName) {
1299			res = make([]byte, len(info.response))
1300			copy(res, info.response)
1301		}
1302		successEvent := &event.CommandSucceededEvent{
1303			Reply:                res,
1304			CommandFinishedEvent: finished,
1305		}
1306		op.CommandMonitor.Succeeded(ctx, successEvent)
1307		return
1308	}
1309
1310	failedEvent := &event.CommandFailedEvent{
1311		Failure:              info.cmdErr.Error(),
1312		CommandFinishedEvent: finished,
1313	}
1314	op.CommandMonitor.Failed(ctx, failedEvent)
1315}
1316