package driver

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"strconv"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/bsontype"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.mongodb.org/mongo-driver/event"
	"go.mongodb.org/mongo-driver/mongo/readconcern"
	"go.mongodb.org/mongo-driver/mongo/readpref"
	"go.mongodb.org/mongo-driver/mongo/writeconcern"
	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
	"go.mongodb.org/mongo-driver/x/mongo/driver/description"
	"go.mongodb.org/mongo-driver/x/mongo/driver/session"
	"go.mongodb.org/mongo-driver/x/mongo/driver/wiremessage"
)

// defaultLocalThreshold is the latency window handed to description.LatencySelector when an
// Operation builds its own server selector.
const defaultLocalThreshold = 15 * time.Millisecond

// dollarCmd is the ".$cmd" suffix appended to the database name to form the full collection name
// of an OP_QUERY command.
var dollarCmd = [...]byte{'.', '$', 'c', 'm', 'd'}

var (
	// ErrNoDocCommandResponse occurs when the server indicated a response existed, but none was found.
	ErrNoDocCommandResponse = errors.New("command returned no documents")
	// ErrMultiDocCommandResponse occurs when the server sent multiple documents in response to a command.
	ErrMultiDocCommandResponse = errors.New("command returned multiple documents")
	// ErrReplyDocumentMismatch occurs when the number of documents returned in an OP_QUERY does not match the numberReturned field.
	ErrReplyDocumentMismatch = errors.New("number of documents returned does not match numberReturned field")
	// ErrNonPrimaryReadPref is returned when a read is attempted in a transaction with a non-primary read preference.
	ErrNonPrimaryReadPref = errors.New("read preference in a transaction must be primary")
)

const (
	// maximum BSON object size when client side encryption is enabled
	cryptMaxBsonObjectSize uint32 = 2097152
	// minimum wire version necessary to use automatic encryption
	cryptMinWireVersion int32 = 8
)

// InvalidOperationError is returned from Validate and indicates that a required field is missing
// from an instance of Operation.
48type InvalidOperationError struct{ MissingField string } 49 50func (err InvalidOperationError) Error() string { 51 return "the " + err.MissingField + " field must be set on Operation" 52} 53 54// opReply stores information returned in an OP_REPLY response from the server. 55// The err field stores any error that occurred when decoding or validating the OP_REPLY response. 56type opReply struct { 57 responseFlags wiremessage.ReplyFlag 58 cursorID int64 59 startingFrom int32 60 numReturned int32 61 documents []bsoncore.Document 62 err error 63} 64 65// startedInformation keeps track of all of the information necessary for monitoring started events. 66type startedInformation struct { 67 cmd bsoncore.Document 68 requestID int32 69 cmdName string 70 documentSequenceIncluded bool 71 connID string 72} 73 74// finishedInformation keeps track of all of the information necessary for monitoring success and failure events. 75type finishedInformation struct { 76 cmdName string 77 requestID int32 78 response bsoncore.Document 79 cmdErr error 80 connID string 81 startTime time.Time 82} 83 84// Operation is used to execute an operation. It contains all of the common code required to 85// select a server, transform an operation into a command, write the command to a connection from 86// the selected server, read a response from that connection, process the response, and potentially 87// retry. 88// 89// The required fields are Database, CommandFn, and Deployment. All other fields are optional. 90// 91// While an Operation can be constructed manually, drivergen should be used to generate an 92// implementation of an operation instead. This will ensure that there are helpers for constructing 93// the operation and that this type isn't configured incorrectly. 94type Operation struct { 95 // CommandFn is used to create the command that will be wrapped in a wire message and sent to 96 // the server. 
This function should only add the elements of the command and not start or end 97 // the enclosing BSON document. Per the command API, the first element must be the name of the 98 // command to run. This field is required. 99 CommandFn func(dst []byte, desc description.SelectedServer) ([]byte, error) 100 101 // Database is the database that the command will be run against. This field is required. 102 Database string 103 104 // Deployment is the MongoDB Deployment to use. While most of the time this will be multiple 105 // servers, commands that need to run against a single, preselected server can use the 106 // SingleServerDeployment type. Commands that need to run on a preselected connection can use 107 // the SingleConnectionDeployment type. 108 Deployment Deployment 109 110 // ProcessResponseFn is called after a response to the command is returned. The server is 111 // provided for types like Cursor that are required to run subsequent commands using the same 112 // server. 113 ProcessResponseFn func(response bsoncore.Document, srvr Server, desc description.Server) error 114 115 // Selector is the server selector that's used during both initial server selection and 116 // subsequent selection for retries. Depending on the Deployment implementation, the 117 // SelectServer method may not actually be called. 118 Selector description.ServerSelector 119 120 // ReadPreference is the read preference that will be attached to the command. If this field is 121 // not specified a default read preference of primary will be used. 122 ReadPreference *readpref.ReadPref 123 124 // ReadConcern is the read concern used when running read commands. This field should not be set 125 // for write operations. If this field is set, it will be encoded onto the commands sent to the 126 // server. 127 ReadConcern *readconcern.ReadConcern 128 129 // MinimumReadConcernWireVersion specifies the minimum wire version to add the read concern to 130 // the command being executed. 
131 MinimumReadConcernWireVersion int32 132 133 // WriteConcern is the write concern used when running write commands. This field should not be 134 // set for read operations. If this field is set, it will be encoded onto the commands sent to 135 // the server. 136 WriteConcern *writeconcern.WriteConcern 137 138 // MinimumWriteConcernWireVersion specifies the minimum wire version to add the write concern to 139 // the command being executed. 140 MinimumWriteConcernWireVersion int32 141 142 // Client is the session used with this operation. This can be either an implicit or explicit 143 // session. If the server selected does not support sessions and Client is specified the 144 // behavior depends on the session type. If the session is implicit, the session fields will not 145 // be encoded onto the command. If the session is explicit, an error will be returned. The 146 // caller is responsible for ensuring that this field is nil if the Deployment does not support 147 // sessions. 148 Client *session.Client 149 150 // Clock is a cluster clock, different from the one contained within a session.Client. This 151 // allows updating cluster times for a global cluster clock while allowing individual session's 152 // cluster clocks to be only updated as far as the last command that's been run. 153 Clock *session.ClusterClock 154 155 // RetryMode specifies how to retry. There are three modes that enable retry: RetryOnce, 156 // RetryOncePerCommand, and RetryContext. For more information about what these modes do, please 157 // refer to their definitions. Both RetryMode and Type must be set for retryability to be enabled. 158 RetryMode *RetryMode 159 160 // Type specifies the kind of operation this is. There is only one mode that enables retry: Write. 161 // For more information about what this mode does, please refer to it's definition. Both Type and 162 // RetryMode must be set for retryability to be enabled. 
163 Type Type 164 165 // Batches contains the documents that are split when executing a write command that potentially 166 // has more documents than can fit in a single command. This should only be specified for 167 // commands that are batch compatible. For more information, please refer to the definition of 168 // Batches. 169 Batches *Batches 170 171 // Legacy sets the legacy type for this operation. There are only 3 types that require legacy 172 // support: find, getMore, and killCursors. For more information about LegacyOperationKind, 173 // please refer to it's definition. 174 Legacy LegacyOperationKind 175 176 // CommandMonitor specifies the monitor to use for APM events. If this field is not set, 177 // no events will be reported. 178 CommandMonitor *event.CommandMonitor 179 180 // Crypt specifies a Crypt object to use for automatic client side encryption and decryption. 181 Crypt *Crypt 182} 183 184// shouldEncrypt returns true if this operation should automatically be encrypted. 185func (op Operation) shouldEncrypt() bool { 186 return op.Crypt != nil && !op.Crypt.BypassAutoEncryption 187} 188 189// selectServer handles performing server selection for an operation. 190func (op Operation) selectServer(ctx context.Context) (Server, error) { 191 if err := op.Validate(); err != nil { 192 return nil, err 193 } 194 195 select { 196 case <-ctx.Done(): 197 return nil, ctx.Err() 198 default: 199 } 200 201 selector := op.Selector 202 if selector == nil { 203 rp := op.ReadPreference 204 if rp == nil { 205 rp = readpref.Primary() 206 } 207 selector = description.CompositeSelector([]description.ServerSelector{ 208 description.ReadPrefSelector(rp), 209 description.LatencySelector(defaultLocalThreshold), 210 }) 211 } 212 213 return op.Deployment.SelectServer(ctx, selector) 214} 215 216// Validate validates this operation, ensuring the fields are set properly. 
217func (op Operation) Validate() error { 218 if op.CommandFn == nil { 219 return InvalidOperationError{MissingField: "CommandFn"} 220 } 221 if op.Deployment == nil { 222 return InvalidOperationError{MissingField: "Deployment"} 223 } 224 if op.Database == "" { 225 return InvalidOperationError{MissingField: "Database"} 226 } 227 if op.Client != nil && !writeconcern.AckWrite(op.WriteConcern) { 228 return errors.New("session provided for an unacknowledged write") 229 } 230 return nil 231} 232 233// Execute runs this operation. The scratch parameter will be used and overwritten (potentially many 234// times), this should mainly be used to enable pooling of byte slices. 235func (op Operation) Execute(ctx context.Context, scratch []byte) error { 236 err := op.Validate() 237 if err != nil { 238 return err 239 } 240 241 srvr, err := op.selectServer(ctx) 242 if err != nil { 243 return err 244 } 245 246 conn, err := srvr.Connection(ctx) 247 if err != nil { 248 return err 249 } 250 defer conn.Close() 251 252 desc := description.SelectedServer{Server: conn.Description(), Kind: op.Deployment.Kind()} 253 scratch = scratch[:0] 254 255 if desc.WireVersion == nil || desc.WireVersion.Max < 4 { 256 switch op.Legacy { 257 case LegacyFind: 258 return op.legacyFind(ctx, scratch, srvr, conn, desc) 259 case LegacyGetMore: 260 return op.legacyGetMore(ctx, scratch, srvr, conn, desc) 261 case LegacyKillCursors: 262 return op.legacyKillCursors(ctx, scratch, srvr, conn, desc) 263 } 264 } 265 if desc.WireVersion == nil || desc.WireVersion.Max < 3 { 266 switch op.Legacy { 267 case LegacyListCollections: 268 return op.legacyListCollections(ctx, scratch, srvr, conn, desc) 269 case LegacyListIndexes: 270 return op.legacyListIndexes(ctx, scratch, srvr, conn, desc) 271 } 272 } 273 274 var res bsoncore.Document 275 var operationErr WriteCommandError 276 var original error 277 var retries int 278 retryable := op.retryable(desc.Server) 279 if retryable && op.RetryMode != nil { 280 switch op.Type { 281 
case Write: 282 if op.Client == nil { 283 break 284 } 285 switch *op.RetryMode { 286 case RetryOnce, RetryOncePerCommand: 287 retries = 1 288 case RetryContext: 289 retries = -1 290 } 291 292 op.Client.RetryWrite = false 293 if *op.RetryMode > RetryNone { 294 op.Client.RetryWrite = true 295 if !op.Client.Committing && !op.Client.Aborting { 296 op.Client.IncrementTxnNumber() 297 } 298 } 299 300 case Read: 301 switch *op.RetryMode { 302 case RetryOnce, RetryOncePerCommand: 303 retries = 1 304 case RetryContext: 305 retries = -1 306 } 307 } 308 } 309 batching := op.Batches.Valid() 310 for { 311 if batching { 312 targetBatchSize := desc.MaxDocumentSize 313 maxDocSize := desc.MaxDocumentSize 314 if op.shouldEncrypt() { 315 // For client-side encryption, we want the batch to be split at 2 MiB instead of 16MiB. 316 // If there's only one document in the batch, it can be up to 16MiB, so we set target batch size to 317 // 2MiB but max document size to 16MiB. This will allow the AdvanceBatch call to create a batch 318 // with a single large document. 319 targetBatchSize = cryptMaxBsonObjectSize 320 } 321 322 err = op.Batches.AdvanceBatch(int(desc.MaxBatchCount), int(targetBatchSize), int(maxDocSize)) 323 if err != nil { 324 // TODO(GODRIVER-982): Should we also be returning operationErr? 
325 return err 326 } 327 } 328 329 // convert to wire message 330 if len(scratch) > 0 { 331 scratch = scratch[:0] 332 } 333 wm, startedInfo, err := op.createWireMessage(ctx, scratch, desc) 334 if err != nil { 335 return err 336 } 337 338 // set extra data and send event if possible 339 startedInfo.connID = conn.ID() 340 startedInfo.cmdName = op.getCommandName(startedInfo.cmd) 341 op.publishStartedEvent(ctx, startedInfo) 342 343 // get the moreToCome flag information before we compress 344 moreToCome := wiremessage.IsMsgMoreToCome(wm) 345 346 // compress wiremessage if allowed 347 if compressor, ok := conn.(Compressor); ok && op.canCompress(startedInfo.cmdName) { 348 wm, err = compressor.CompressWireMessage(wm, nil) 349 if err != nil { 350 return err 351 } 352 } 353 354 finishedInfo := finishedInformation{ 355 cmdName: startedInfo.cmdName, 356 requestID: startedInfo.requestID, 357 startTime: time.Now(), 358 connID: startedInfo.connID, 359 } 360 361 // roundtrip using either the full roundTripper or a special one for when the moreToCome 362 // flag is set 363 var roundTrip = op.roundTrip 364 if moreToCome { 365 roundTrip = op.moreToComeRoundTrip 366 } 367 res, err = roundTrip(ctx, conn, wm) 368 if ep, ok := srvr.(ErrorProcessor); ok { 369 ep.ProcessError(err) 370 } 371 372 finishedInfo.response = res 373 finishedInfo.cmdErr = err 374 op.publishFinishedEvent(ctx, finishedInfo) 375 376 // Pull out $clusterTime and operationTime and update session and clock. We handle this before 377 // handling the error to ensure we are properly gossiping the cluster time. 
378 op.updateClusterTimes(res) 379 op.updateOperationTime(res) 380 op.Client.UpdateRecoveryToken(bson.Raw(res)) 381 382 // automatically attempt to decrypt all results if client side encryption enabled 383 if op.Crypt != nil { 384 // use decryptErr isntead of err because err is used below for retrying 385 var decryptErr error 386 res, decryptErr = op.Crypt.Decrypt(ctx, res) 387 if decryptErr != nil { 388 return decryptErr 389 } 390 } 391 var perr error 392 if op.ProcessResponseFn != nil { 393 perr = op.ProcessResponseFn(res, srvr, desc.Server) 394 } 395 switch tt := err.(type) { 396 case WriteCommandError: 397 if e := err.(WriteCommandError); retryable && op.Type == Write && e.UnsupportedStorageEngine() { 398 return ErrUnsupportedStorageEngine 399 } 400 if retryable && tt.Retryable() && retries != 0 { 401 retries-- 402 original, err = err, nil 403 conn.Close() // Avoid leaking the connection. 404 srvr, err = op.selectServer(ctx) 405 if err != nil { 406 return original 407 } 408 conn, err = srvr.Connection(ctx) 409 if err != nil || conn == nil || !op.retryable(conn.Description()) { 410 if conn != nil { 411 conn.Close() 412 } 413 return original 414 } 415 defer conn.Close() // Avoid leaking the new connection. 416 if op.Client != nil && op.Client.Committing { 417 // Apply majority write concern for retries 418 op.Client.UpdateCommitTransactionWriteConcern() 419 op.WriteConcern = op.Client.CurrentWc 420 } 421 continue 422 } 423 // If batching is enabled and either ordered is the default (which is true) or 424 // explicitly set to true and we have write errors, return the errors. 425 if batching && (op.Batches.Ordered == nil || *op.Batches.Ordered == true) && len(tt.WriteErrors) > 0 { 426 return tt 427 } 428 if op.Client != nil && op.Client.Committing && tt.WriteConcernError != nil { 429 // When running commitTransaction we return WriteConcernErrors as an Error. 
430 err := Error{ 431 Name: tt.WriteConcernError.Name, 432 Code: int32(tt.WriteConcernError.Code), 433 Message: tt.WriteConcernError.Message, 434 } 435 if err.Code == 64 || err.Code == 50 || tt.WriteConcernError.Retryable() { 436 err.Labels = []string{UnknownTransactionCommitResult} 437 } 438 return err 439 } 440 operationErr.WriteConcernError = tt.WriteConcernError 441 operationErr.WriteErrors = append(operationErr.WriteErrors, tt.WriteErrors...) 442 case Error: 443 if tt.HasErrorLabel(TransientTransactionError) || tt.HasErrorLabel(UnknownTransactionCommitResult) { 444 op.Client.ClearPinnedServer() 445 } 446 if e := err.(Error); retryable && op.Type == Write && e.UnsupportedStorageEngine() { 447 return ErrUnsupportedStorageEngine 448 } 449 if retryable && tt.Retryable() && retries != 0 { 450 retries-- 451 original, err = err, nil 452 conn.Close() // Avoid leaking the connection. 453 srvr, err = op.selectServer(ctx) 454 if err != nil { 455 return original 456 } 457 conn, err = srvr.Connection(ctx) 458 if err != nil || conn == nil || !op.retryable(conn.Description()) { 459 if conn != nil { 460 conn.Close() 461 } 462 return original 463 } 464 defer conn.Close() // Avoid leaking the new connection. 465 if op.Client != nil && op.Client.Committing { 466 // Apply majority write concern for retries 467 op.Client.UpdateCommitTransactionWriteConcern() 468 op.WriteConcern = op.Client.CurrentWc 469 } 470 continue 471 } 472 if op.Client != nil && op.Client.Committing && (tt.Retryable() || tt.Code == 50) { 473 // If we got a retryable error or MaxTimeMSExpired error, we add UnknownTransactionCommitResult. 
474 tt.Labels = append(tt.Labels, UnknownTransactionCommitResult) 475 } 476 return tt 477 case nil: 478 if moreToCome { 479 return ErrUnacknowledgedWrite 480 } 481 if perr != nil { 482 return perr 483 } 484 default: 485 return err 486 } 487 488 if batching && len(op.Batches.Documents) > 0 { 489 if retryable && op.Client != nil && op.RetryMode != nil { 490 if *op.RetryMode > RetryNone { 491 op.Client.IncrementTxnNumber() 492 } 493 if *op.RetryMode == RetryOncePerCommand { 494 retries = 1 495 } 496 } 497 op.Batches.ClearBatch() 498 continue 499 } 500 break 501 } 502 if len(operationErr.WriteErrors) > 0 || operationErr.WriteConcernError != nil { 503 return operationErr 504 } 505 return nil 506} 507 508// Retryable writes are supported if the server supports sessions, the operation is not 509// within a transaction, and the write is acknowledged 510func (op Operation) retryable(desc description.Server) bool { 511 switch op.Type { 512 case Write: 513 if op.Client != nil && (op.Client.Committing || op.Client.Aborting) { 514 return true 515 } 516 if op.Deployment.SupportsRetryWrites() && 517 desc.WireVersion != nil && desc.WireVersion.Max >= 6 && 518 op.Client != nil && !(op.Client.TransactionInProgress() || op.Client.TransactionStarting()) && 519 writeconcern.AckWrite(op.WriteConcern) { 520 return true 521 } 522 case Read: 523 if op.Client != nil && (op.Client.Committing || op.Client.Aborting) { 524 return true 525 } 526 if desc.WireVersion != nil && desc.WireVersion.Max >= 6 && 527 (op.Client == nil || !(op.Client.TransactionInProgress() || op.Client.TransactionStarting())) { 528 return true 529 } 530 } 531 return false 532} 533 534// roundTrip writes a wiremessage to the connection and then reads a wiremessage. The wm parameter 535// is reused when reading the wiremessage. 
536func (op Operation) roundTrip(ctx context.Context, conn Connection, wm []byte) ([]byte, error) { 537 err := conn.WriteWireMessage(ctx, wm) 538 if err != nil { 539 labels := []string{NetworkError} 540 if op.Client != nil { 541 op.Client.MarkDirty() 542 } 543 if op.Client != nil && op.Client.TransactionRunning() && !op.Client.Committing { 544 labels = append(labels, TransientTransactionError) 545 } 546 if op.Client != nil && op.Client.Committing { 547 labels = append(labels, UnknownTransactionCommitResult) 548 } 549 return nil, Error{Message: err.Error(), Labels: labels, Wrapped: err} 550 } 551 552 wm, err = conn.ReadWireMessage(ctx, wm[:0]) 553 if err != nil { 554 labels := []string{NetworkError} 555 if op.Client != nil { 556 op.Client.MarkDirty() 557 } 558 if op.Client != nil && op.Client.TransactionRunning() && !op.Client.Committing { 559 labels = append(labels, TransientTransactionError) 560 } 561 if op.Client != nil && op.Client.Committing { 562 labels = append(labels, UnknownTransactionCommitResult) 563 } 564 return nil, Error{Message: err.Error(), Labels: labels, Wrapped: err} 565 } 566 567 // decompress wiremessage 568 wm, err = op.decompressWireMessage(wm) 569 if err != nil { 570 return nil, err 571 } 572 573 // decode 574 res, err := op.decodeResult(wm) 575 // Pull out $clusterTime and operationTime and update session and clock. We handle this before 576 // handling the error to ensure we are properly gossiping the cluster time. 577 op.updateClusterTimes(res) 578 op.updateOperationTime(res) 579 580 return res, err 581} 582 583// moreToComeRoundTrip writes a wiremessage to the provided connection. This is used when an OP_MSG is 584// being sent with the moreToCome bit set. 
585func (op *Operation) moreToComeRoundTrip(ctx context.Context, conn Connection, wm []byte) ([]byte, error) { 586 err := conn.WriteWireMessage(ctx, wm) 587 if err != nil { 588 if op.Client != nil { 589 op.Client.MarkDirty() 590 } 591 err = Error{Message: err.Error(), Labels: []string{TransientTransactionError, NetworkError}, Wrapped: err} 592 } 593 return bsoncore.BuildDocument(nil, bsoncore.AppendInt32Element(nil, "ok", 1)), err 594} 595 596// decompressWireMessage handles decompressing a wiremessage. If the wiremessage 597// is not compressed, this method will return the wiremessage. 598func (Operation) decompressWireMessage(wm []byte) ([]byte, error) { 599 // read the header and ensure this is a compressed wire message 600 length, reqid, respto, opcode, rem, ok := wiremessage.ReadHeader(wm) 601 if !ok || len(wm) < int(length) { 602 return nil, errors.New("malformed wire message: insufficient bytes") 603 } 604 if opcode != wiremessage.OpCompressed { 605 return wm, nil 606 } 607 // get the original opcode and uncompressed size 608 opcode, rem, ok = wiremessage.ReadCompressedOriginalOpCode(rem) 609 if !ok { 610 return nil, errors.New("malformed OP_COMPRESSED: missing original opcode") 611 } 612 uncompressedSize, rem, ok := wiremessage.ReadCompressedUncompressedSize(rem) 613 if !ok { 614 return nil, errors.New("malformed OP_COMPRESSED: missing uncompressed size") 615 } 616 // get the compressor ID and decompress the message 617 compressorID, rem, ok := wiremessage.ReadCompressedCompressorID(rem) 618 if !ok { 619 return nil, errors.New("malformed OP_COMPRESSED: missing compressor ID") 620 } 621 compressedSize := length - 25 // header (16) + original opcode (4) + uncompressed size (4) + compressor ID (1) 622 // return the original wiremessage 623 msg, rem, ok := wiremessage.ReadCompressedCompressedMessage(rem, compressedSize) 624 if !ok { 625 return nil, errors.New("malformed OP_COMPRESSED: insufficient bytes for compressed wiremessage") 626 } 627 628 header := 
make([]byte, 0, uncompressedSize+16) 629 header = wiremessage.AppendHeader(header, uncompressedSize+16, reqid, respto, opcode) 630 opts := CompressionOpts{ 631 Compressor: compressorID, 632 UncompressedSize: uncompressedSize, 633 } 634 uncompressed, err := DecompressPayload(msg, opts) 635 if err != nil { 636 return nil, err 637 } 638 639 return append(header, uncompressed...), nil 640} 641 642func (op Operation) createWireMessage(ctx context.Context, dst []byte, 643 desc description.SelectedServer) ([]byte, startedInformation, error) { 644 645 if desc.WireVersion == nil || desc.WireVersion.Max < wiremessage.OpmsgWireVersion { 646 return op.createQueryWireMessage(dst, desc) 647 } 648 return op.createMsgWireMessage(ctx, dst, desc) 649} 650 651func (op Operation) addBatchArray(dst []byte) []byte { 652 aidx, dst := bsoncore.AppendArrayElementStart(dst, op.Batches.Identifier) 653 for i, doc := range op.Batches.Current { 654 dst = bsoncore.AppendDocumentElement(dst, strconv.Itoa(i), doc) 655 } 656 dst, _ = bsoncore.AppendArrayEnd(dst, aidx) 657 return dst 658} 659 660func (op Operation) createQueryWireMessage(dst []byte, desc description.SelectedServer) ([]byte, startedInformation, error) { 661 var info startedInformation 662 flags := op.slaveOK(desc) 663 var wmindex int32 664 info.requestID = wiremessage.NextRequestID() 665 wmindex, dst = wiremessage.AppendHeaderStart(dst, info.requestID, 0, wiremessage.OpQuery) 666 dst = wiremessage.AppendQueryFlags(dst, flags) 667 // FullCollectionName 668 dst = append(dst, op.Database...) 669 dst = append(dst, dollarCmd[:]...) 
670 dst = append(dst, 0x00) 671 dst = wiremessage.AppendQueryNumberToSkip(dst, 0) 672 dst = wiremessage.AppendQueryNumberToReturn(dst, -1) 673 674 wrapper := int32(-1) 675 rp, err := op.createReadPref(desc.Server.Kind, desc.Kind, true) 676 if err != nil { 677 return dst, info, err 678 } 679 if len(rp) > 0 { 680 wrapper, dst = bsoncore.AppendDocumentStart(dst) 681 dst = bsoncore.AppendHeader(dst, bsontype.EmbeddedDocument, "$query") 682 } 683 idx, dst := bsoncore.AppendDocumentStart(dst) 684 dst, err = op.CommandFn(dst, desc) 685 if err != nil { 686 return dst, info, err 687 } 688 689 if op.Batches != nil && len(op.Batches.Current) > 0 { 690 dst = op.addBatchArray(dst) 691 } 692 693 dst, err = op.addReadConcern(dst, desc) 694 if err != nil { 695 return dst, info, err 696 } 697 698 dst, err = op.addWriteConcern(dst, desc) 699 if err != nil { 700 return dst, info, err 701 } 702 703 dst, err = op.addSession(dst, desc) 704 if err != nil { 705 return dst, info, err 706 } 707 708 dst = op.addClusterTime(dst, desc) 709 710 dst, _ = bsoncore.AppendDocumentEnd(dst, idx) 711 // Command monitoring only reports the document inside $query 712 info.cmd = dst[idx:] 713 714 if len(rp) > 0 { 715 var err error 716 dst = bsoncore.AppendDocumentElement(dst, "$readPreference", rp) 717 dst, err = bsoncore.AppendDocumentEnd(dst, wrapper) 718 if err != nil { 719 return dst, info, err 720 } 721 } 722 723 return bsoncore.UpdateLength(dst, wmindex, int32(len(dst[wmindex:]))), info, nil 724} 725 726func (op Operation) createMsgWireMessage(ctx context.Context, dst []byte, desc description.SelectedServer) ([]byte, startedInformation, error) { 727 var info startedInformation 728 var flags wiremessage.MsgFlag 729 var wmindex int32 730 // We set the MoreToCome bit if we have a write concern, it's unacknowledged, and we either 731 // aren't batching or we are encoding the last batch. 
732 if op.WriteConcern != nil && !writeconcern.AckWrite(op.WriteConcern) && (op.Batches == nil || len(op.Batches.Documents) == 0) { 733 flags = wiremessage.MoreToCome 734 } 735 info.requestID = wiremessage.NextRequestID() 736 wmindex, dst = wiremessage.AppendHeaderStart(dst, info.requestID, 0, wiremessage.OpMsg) 737 dst = wiremessage.AppendMsgFlags(dst, flags) 738 // Body 739 dst = wiremessage.AppendMsgSectionType(dst, wiremessage.SingleDocument) 740 741 idx, dst := bsoncore.AppendDocumentStart(dst) 742 743 dst, err := op.addCommandFields(ctx, dst, desc) 744 if err != nil { 745 return dst, info, err 746 } 747 dst, err = op.addReadConcern(dst, desc) 748 if err != nil { 749 return dst, info, err 750 } 751 dst, err = op.addWriteConcern(dst, desc) 752 if err != nil { 753 return dst, info, err 754 } 755 756 dst, err = op.addSession(dst, desc) 757 if err != nil { 758 return dst, info, err 759 } 760 761 dst = op.addClusterTime(dst, desc) 762 763 dst = bsoncore.AppendStringElement(dst, "$db", op.Database) 764 rp, err := op.createReadPref(desc.Server.Kind, desc.Kind, false) 765 if err != nil { 766 return dst, info, err 767 } 768 if len(rp) > 0 { 769 dst = bsoncore.AppendDocumentElement(dst, "$readPreference", rp) 770 } 771 772 dst, _ = bsoncore.AppendDocumentEnd(dst, idx) 773 // The command document for monitoring shouldn't include the type 1 payload as a document sequence 774 info.cmd = dst[idx:] 775 776 // add batch as a document sequence if auto encryption is not enabled 777 // if auto encryption is enabled, the batch will already be an array in the command document 778 if !op.shouldEncrypt() && op.Batches != nil && len(op.Batches.Current) > 0 { 779 info.documentSequenceIncluded = true 780 dst = wiremessage.AppendMsgSectionType(dst, wiremessage.DocumentSequence) 781 idx, dst = bsoncore.ReserveLength(dst) 782 783 dst = append(dst, op.Batches.Identifier...) 784 dst = append(dst, 0x00) 785 786 for _, doc := range op.Batches.Current { 787 dst = append(dst, doc...) 
788 } 789 790 dst = bsoncore.UpdateLength(dst, idx, int32(len(dst[idx:]))) 791 } 792 793 return bsoncore.UpdateLength(dst, wmindex, int32(len(dst[wmindex:]))), info, nil 794} 795 796// addCommandFields adds the fields for a command to the wire message in dst. This assumes that the start of the document 797// has already been added and does not add the final 0 byte. 798func (op Operation) addCommandFields(ctx context.Context, dst []byte, desc description.SelectedServer) ([]byte, error) { 799 if !op.shouldEncrypt() { 800 return op.CommandFn(dst, desc) 801 } 802 803 if desc.WireVersion.Max < cryptMinWireVersion { 804 return dst, errors.New("auto-encryption requires a MongoDB version of 4.2") 805 } 806 807 // create temporary command document 808 cidx, cmdDst := bsoncore.AppendDocumentStart(nil) 809 var err error 810 cmdDst, err = op.CommandFn(cmdDst, desc) 811 if err != nil { 812 return dst, err 813 } 814 // use a BSON array instead of a type 1 payload because mongocryptd will convert to arrays regardless 815 if op.Batches != nil && len(op.Batches.Current) > 0 { 816 cmdDst = op.addBatchArray(cmdDst) 817 } 818 cmdDst, _ = bsoncore.AppendDocumentEnd(cmdDst, cidx) 819 820 // encrypt the command 821 encrypted, err := op.Crypt.Encrypt(ctx, op.Database, cmdDst) 822 if err != nil { 823 return dst, err 824 } 825 // append encrypted command to original destination, removing the first 4 bytes (length) and final byte (terminator) 826 dst = append(dst, encrypted[4:len(encrypted)-1]...) 
827 return dst, nil 828} 829 830func (op Operation) addReadConcern(dst []byte, desc description.SelectedServer) ([]byte, error) { 831 if op.MinimumReadConcernWireVersion > 0 && (desc.WireVersion == nil || !desc.WireVersion.Includes(op.MinimumReadConcernWireVersion)) { 832 return dst, nil 833 } 834 rc := op.ReadConcern 835 client := op.Client 836 // Starting transaction's read concern overrides all others 837 if client != nil && client.TransactionStarting() && client.CurrentRc != nil { 838 rc = client.CurrentRc 839 } 840 841 // start transaction must append afterclustertime IF causally consistent and operation time exists 842 if rc == nil && client != nil && client.TransactionStarting() && client.Consistent && client.OperationTime != nil { 843 rc = readconcern.New() 844 } 845 846 if rc == nil { 847 return dst, nil 848 } 849 850 _, data, err := rc.MarshalBSONValue() // always returns a document 851 if err != nil { 852 return dst, err 853 } 854 855 if description.SessionsSupported(desc.WireVersion) && client != nil && client.Consistent && client.OperationTime != nil { 856 data = data[:len(data)-1] // remove the null byte 857 data = bsoncore.AppendTimestampElement(data, "afterClusterTime", client.OperationTime.T, client.OperationTime.I) 858 data, _ = bsoncore.AppendDocumentEnd(data, 0) 859 } 860 861 if len(data) == bsoncore.EmptyDocumentLength { 862 return dst, nil 863 } 864 return bsoncore.AppendDocumentElement(dst, "readConcern", data), nil 865} 866 867func (op Operation) addWriteConcern(dst []byte, desc description.SelectedServer) ([]byte, error) { 868 if op.MinimumWriteConcernWireVersion > 0 && (desc.WireVersion == nil || !desc.WireVersion.Includes(op.MinimumWriteConcernWireVersion)) { 869 return dst, nil 870 } 871 wc := op.WriteConcern 872 if wc == nil { 873 return dst, nil 874 } 875 876 t, data, err := wc.MarshalBSONValue() 877 if err == writeconcern.ErrEmptyWriteConcern { 878 return dst, nil 879 } 880 if err != nil { 881 return dst, err 882 } 883 884 return 
append(bsoncore.AppendHeader(dst, t, "writeConcern"), data...), nil 885} 886 887func (op Operation) addSession(dst []byte, desc description.SelectedServer) ([]byte, error) { 888 client := op.Client 889 if client == nil || !description.SessionsSupported(desc.WireVersion) || desc.SessionTimeoutMinutes == 0 { 890 return dst, nil 891 } 892 if client.Terminated { 893 return dst, session.ErrSessionEnded 894 } 895 lsid, _ := client.SessionID.MarshalBSON() 896 dst = bsoncore.AppendDocumentElement(dst, "lsid", lsid) 897 898 var addedTxnNumber bool 899 if op.Type == Write && client.RetryWrite { 900 addedTxnNumber = true 901 dst = bsoncore.AppendInt64Element(dst, "txnNumber", op.Client.TxnNumber) 902 } 903 if client.TransactionRunning() || client.RetryingCommit { 904 if !addedTxnNumber { 905 dst = bsoncore.AppendInt64Element(dst, "txnNumber", op.Client.TxnNumber) 906 } 907 if client.TransactionStarting() { 908 dst = bsoncore.AppendBooleanElement(dst, "startTransaction", true) 909 } 910 dst = bsoncore.AppendBooleanElement(dst, "autocommit", false) 911 } 912 913 client.ApplyCommand(desc.Server) 914 915 return dst, nil 916} 917 918func (op Operation) addClusterTime(dst []byte, desc description.SelectedServer) []byte { 919 client, clock := op.Client, op.Clock 920 if (clock == nil && client == nil) || !description.SessionsSupported(desc.WireVersion) { 921 return dst 922 } 923 clusterTime := clock.GetClusterTime() 924 if client != nil { 925 clusterTime = session.MaxClusterTime(clusterTime, client.ClusterTime) 926 } 927 if clusterTime == nil { 928 return dst 929 } 930 val, err := clusterTime.LookupErr("$clusterTime") 931 if err != nil { 932 return dst 933 } 934 return append(bsoncore.AppendHeader(dst, val.Type, "$clusterTime"), val.Value...) 935 // return bsoncore.AppendDocumentElement(dst, "$clusterTime", clusterTime) 936} 937 938// updateClusterTimes updates the cluster times for the session and cluster clock attached to this 939// operation. 
While the session's AdvanceClusterTime may return an error, this method does not 940// because an error being returned from this method will not be returned further up. 941func (op Operation) updateClusterTimes(response bsoncore.Document) { 942 // Extract cluster time. 943 value, err := response.LookupErr("$clusterTime") 944 if err != nil { 945 // $clusterTime not included by the server 946 return 947 } 948 clusterTime := bsoncore.BuildDocumentFromElements(nil, bsoncore.AppendValueElement(nil, "$clusterTime", value)) 949 950 sess, clock := op.Client, op.Clock 951 952 if sess != nil { 953 _ = sess.AdvanceClusterTime(bson.Raw(clusterTime)) 954 } 955 956 if clock != nil { 957 clock.AdvanceClusterTime(bson.Raw(clusterTime)) 958 } 959} 960 961// updateOperationTime updates the operation time on the session attached to this operation. While 962// the session's AdvanceOperationTime method may return an error, this method does not because an 963// error being returned from this method will not be returned further up. 
964func (op Operation) updateOperationTime(response bsoncore.Document) { 965 sess := op.Client 966 if sess == nil { 967 return 968 } 969 970 opTimeElem, err := response.LookupErr("operationTime") 971 if err != nil { 972 // operationTime not included by the server 973 return 974 } 975 976 t, i := opTimeElem.Timestamp() 977 _ = sess.AdvanceOperationTime(&primitive.Timestamp{ 978 T: t, 979 I: i, 980 }) 981} 982 983func (op Operation) getReadPrefBasedOnTransaction() (*readpref.ReadPref, error) { 984 if op.Client != nil && op.Client.TransactionRunning() { 985 // Transaction's read preference always takes priority 986 rp := op.Client.CurrentRp 987 // Reads in a transaction must have read preference primary 988 // This must not be checked in startTransaction 989 if rp != nil && !op.Client.TransactionStarting() && rp.Mode() != readpref.PrimaryMode { 990 return nil, ErrNonPrimaryReadPref 991 } 992 return rp, nil 993 } 994 return op.ReadPreference, nil 995} 996 997func (op Operation) createReadPref(serverKind description.ServerKind, topologyKind description.TopologyKind, isOpQuery bool) (bsoncore.Document, error) { 998 if serverKind == description.Standalone || (isOpQuery && serverKind != description.Mongos) || op.Type == Write { 999 // Don't send read preference for non-mongos when using OP_QUERY or for all standalones 1000 return nil, nil 1001 } 1002 1003 idx, doc := bsoncore.AppendDocumentStart(nil) 1004 rp, err := op.getReadPrefBasedOnTransaction() 1005 if err != nil { 1006 return nil, err 1007 } 1008 1009 if rp == nil { 1010 if topologyKind == description.Single && serverKind != description.Mongos { 1011 doc = bsoncore.AppendStringElement(doc, "mode", "primaryPreferred") 1012 doc, _ = bsoncore.AppendDocumentEnd(doc, idx) 1013 return doc, nil 1014 } 1015 return nil, nil 1016 } 1017 1018 switch rp.Mode() { 1019 case readpref.PrimaryMode: 1020 if serverKind == description.Mongos { 1021 return nil, nil 1022 } 1023 if topologyKind == description.Single { 1024 doc = 
bsoncore.AppendStringElement(doc, "mode", "primaryPreferred") 1025 doc, _ = bsoncore.AppendDocumentEnd(doc, idx) 1026 return doc, nil 1027 } 1028 doc = bsoncore.AppendStringElement(doc, "mode", "primary") 1029 case readpref.PrimaryPreferredMode: 1030 doc = bsoncore.AppendStringElement(doc, "mode", "primaryPreferred") 1031 case readpref.SecondaryPreferredMode: 1032 _, ok := rp.MaxStaleness() 1033 if serverKind == description.Mongos && isOpQuery && !ok && len(rp.TagSets()) == 0 { 1034 return nil, nil 1035 } 1036 doc = bsoncore.AppendStringElement(doc, "mode", "secondaryPreferred") 1037 case readpref.SecondaryMode: 1038 doc = bsoncore.AppendStringElement(doc, "mode", "secondary") 1039 case readpref.NearestMode: 1040 doc = bsoncore.AppendStringElement(doc, "mode", "nearest") 1041 } 1042 1043 sets := make([]bsoncore.Document, 0, len(rp.TagSets())) 1044 for _, ts := range rp.TagSets() { 1045 if len(ts) == 0 { 1046 continue 1047 } 1048 i, set := bsoncore.AppendDocumentStart(nil) 1049 for _, t := range ts { 1050 set = bsoncore.AppendStringElement(set, t.Name, t.Value) 1051 } 1052 set, _ = bsoncore.AppendDocumentEnd(set, i) 1053 sets = append(sets, set) 1054 } 1055 if len(sets) > 0 { 1056 var aidx int32 1057 aidx, doc = bsoncore.AppendArrayElementStart(doc, "tags") 1058 for i, set := range sets { 1059 doc = bsoncore.AppendDocumentElement(doc, strconv.Itoa(i), set) 1060 } 1061 doc, _ = bsoncore.AppendArrayEnd(doc, aidx) 1062 } 1063 1064 if d, ok := rp.MaxStaleness(); ok { 1065 doc = bsoncore.AppendInt32Element(doc, "maxStalenessSeconds", int32(d.Seconds())) 1066 } 1067 1068 doc, _ = bsoncore.AppendDocumentEnd(doc, idx) 1069 return doc, nil 1070} 1071 1072func (op Operation) slaveOK(desc description.SelectedServer) wiremessage.QueryFlag { 1073 if desc.Kind == description.Single && desc.Server.Kind != description.Mongos { 1074 return wiremessage.SlaveOK 1075 } 1076 1077 if rp := op.ReadPreference; rp != nil && rp.Mode() != readpref.PrimaryMode { 1078 return wiremessage.SlaveOK 
1079 } 1080 1081 return 0 1082} 1083 1084func (Operation) canCompress(cmd string) bool { 1085 if cmd == "isMaster" || cmd == "saslStart" || cmd == "saslContinue" || cmd == "getnonce" || cmd == "authenticate" || 1086 cmd == "createUser" || cmd == "updateUser" || cmd == "copydbSaslStart" || cmd == "copydbgetnonce" || cmd == "copydb" { 1087 return false 1088 } 1089 return true 1090} 1091 1092// decodeOpReply extracts the necessary information from an OP_REPLY wire message. 1093// includesHeader: specifies whether or not wm includes the message header 1094// Returns the decoded OP_REPLY. If the err field of the returned opReply is non-nil, an error occurred while decoding 1095// or validating the response and the other fields are undefined. 1096func (Operation) decodeOpReply(wm []byte, includesHeader bool) opReply { 1097 var reply opReply 1098 var ok bool 1099 1100 if includesHeader { 1101 wmLength := len(wm) 1102 var length int32 1103 var opcode wiremessage.OpCode 1104 length, _, _, opcode, wm, ok = wiremessage.ReadHeader(wm) 1105 if !ok || int(length) > wmLength { 1106 reply.err = errors.New("malformed wire message: insufficient bytes") 1107 return reply 1108 } 1109 if opcode != wiremessage.OpReply { 1110 reply.err = errors.New("malformed wire message: incorrect opcode") 1111 return reply 1112 } 1113 } 1114 1115 reply.responseFlags, wm, ok = wiremessage.ReadReplyFlags(wm) 1116 if !ok { 1117 reply.err = errors.New("malformed OP_REPLY: missing flags") 1118 return reply 1119 } 1120 reply.cursorID, wm, ok = wiremessage.ReadReplyCursorID(wm) 1121 if !ok { 1122 reply.err = errors.New("malformed OP_REPLY: missing cursorID") 1123 return reply 1124 } 1125 reply.startingFrom, wm, ok = wiremessage.ReadReplyStartingFrom(wm) 1126 if !ok { 1127 reply.err = errors.New("malformed OP_REPLY: missing startingFrom") 1128 return reply 1129 } 1130 reply.numReturned, wm, ok = wiremessage.ReadReplyNumberReturned(wm) 1131 if !ok { 1132 reply.err = errors.New("malformed OP_REPLY: missing 
numberReturned") 1133 return reply 1134 } 1135 reply.documents, wm, ok = wiremessage.ReadReplyDocuments(wm) 1136 if !ok { 1137 reply.err = errors.New("malformed OP_REPLY: could not read documents from reply") 1138 } 1139 1140 if reply.responseFlags&wiremessage.QueryFailure == wiremessage.QueryFailure { 1141 reply.err = QueryFailureError{ 1142 Message: "command failure", 1143 Response: reply.documents[0], 1144 } 1145 return reply 1146 } 1147 if reply.responseFlags&wiremessage.CursorNotFound == wiremessage.CursorNotFound { 1148 reply.err = ErrCursorNotFound 1149 return reply 1150 } 1151 if reply.numReturned != int32(len(reply.documents)) { 1152 reply.err = ErrReplyDocumentMismatch 1153 return reply 1154 } 1155 1156 return reply 1157} 1158 1159func (op Operation) decodeResult(wm []byte) (bsoncore.Document, error) { 1160 wmLength := len(wm) 1161 length, _, _, opcode, wm, ok := wiremessage.ReadHeader(wm) 1162 if !ok || int(length) > wmLength { 1163 return nil, errors.New("malformed wire message: insufficient bytes") 1164 } 1165 1166 wm = wm[:wmLength-16] // constrain to just this wiremessage, incase there are multiple in the slice 1167 1168 switch opcode { 1169 case wiremessage.OpReply: 1170 reply := op.decodeOpReply(wm, false) 1171 if reply.err != nil { 1172 return nil, reply.err 1173 } 1174 if reply.numReturned == 0 { 1175 return nil, ErrNoDocCommandResponse 1176 } 1177 if reply.numReturned > 1 { 1178 return nil, ErrMultiDocCommandResponse 1179 } 1180 rdr := reply.documents[0] 1181 if err := rdr.Validate(); err != nil { 1182 return nil, NewCommandResponseError("malformed OP_REPLY: invalid document", err) 1183 } 1184 1185 return rdr, extractError(rdr) 1186 case wiremessage.OpMsg: 1187 _, wm, ok = wiremessage.ReadMsgFlags(wm) 1188 if !ok { 1189 return nil, errors.New("malformed wire message: missing OP_MSG flags") 1190 } 1191 1192 var res bsoncore.Document 1193 for len(wm) > 0 { 1194 var stype wiremessage.SectionType 1195 stype, wm, ok = 
wiremessage.ReadMsgSectionType(wm) 1196 if !ok { 1197 return nil, errors.New("malformed wire message: insuffienct bytes to read section type") 1198 } 1199 1200 switch stype { 1201 case wiremessage.SingleDocument: 1202 res, wm, ok = wiremessage.ReadMsgSectionSingleDocument(wm) 1203 if !ok { 1204 return nil, errors.New("malformed wire message: insufficient bytes to read single document") 1205 } 1206 case wiremessage.DocumentSequence: 1207 // TODO(GODRIVER-617): Implement document sequence returns. 1208 _, _, wm, ok = wiremessage.ReadMsgSectionDocumentSequence(wm) 1209 if !ok { 1210 return nil, errors.New("malformed wire message: insufficient bytes to read document sequence") 1211 } 1212 default: 1213 return nil, fmt.Errorf("malformed wire message: uknown section type %v", stype) 1214 } 1215 } 1216 1217 err := res.Validate() 1218 if err != nil { 1219 return nil, NewCommandResponseError("malformed OP_MSG: invalid document", err) 1220 } 1221 1222 return res, extractError(res) 1223 default: 1224 return nil, fmt.Errorf("cannot decode result from %s", opcode) 1225 } 1226} 1227 1228// getCommandName returns the name of the command from the given BSON document. 1229func (op Operation) getCommandName(doc []byte) string { 1230 // skip 4 bytes for document length and 1 byte for element type 1231 idx := bytes.IndexByte(doc[5:], 0x00) // look for the 0 byte after the command name 1232 return string(doc[5 : idx+5]) 1233} 1234 1235func (op *Operation) canMonitor(cmd string) bool { 1236 return !(cmd == "authenticate" || cmd == "saslStart" || cmd == "saslContinue" || cmd == "getnonce" || cmd == "createUser" || 1237 cmd == "updateUser" || cmd == "copydbgetnonce" || cmd == "copydbsaslstart" || cmd == "copydb") 1238} 1239 1240// publishStartedEvent publishes a CommandStartedEvent to the operation's command monitor if possible. If the command is 1241// an unacknowledged write, a CommandSucceededEvent will be published as well. 
If started events are not being monitored, 1242// no events are published. 1243func (op Operation) publishStartedEvent(ctx context.Context, info startedInformation) { 1244 if op.CommandMonitor == nil || op.CommandMonitor.Started == nil { 1245 return 1246 } 1247 1248 // Make a copy of the command. Redact if the command is security sensitive and cannot be monitored. 1249 // If there was a type 1 payload for the current batch, convert it to a BSON array. 1250 var cmdCopy []byte 1251 if op.canMonitor(info.cmdName) { 1252 cmdCopy = make([]byte, len(info.cmd)) 1253 copy(cmdCopy, info.cmd) 1254 if info.documentSequenceIncluded { 1255 cmdCopy = cmdCopy[:len(info.cmd)-1] // remove 0 byte at end 1256 cmdCopy = op.addBatchArray(cmdCopy) 1257 cmdCopy, _ = bsoncore.AppendDocumentEnd(cmdCopy, 0) // add back 0 byte and update length 1258 } 1259 } 1260 1261 started := &event.CommandStartedEvent{ 1262 Command: cmdCopy, 1263 DatabaseName: op.Database, 1264 CommandName: info.cmdName, 1265 RequestID: int64(info.requestID), 1266 ConnectionID: info.connID, 1267 } 1268 op.CommandMonitor.Started(ctx, started) 1269} 1270 1271// publishFinishedEvent publishes either a CommandSucceededEvent or a CommandFailedEvent to the operation's command 1272// monitor if possible. If success/failure events aren't being monitored, no events are published. 
1273func (op Operation) publishFinishedEvent(ctx context.Context, info finishedInformation) { 1274 success := info.cmdErr == nil 1275 if _, ok := info.cmdErr.(WriteCommandError); ok { 1276 success = true 1277 } 1278 if op.CommandMonitor == nil || (success && op.CommandMonitor.Succeeded == nil) || (!success && op.CommandMonitor.Failed == nil) { 1279 return 1280 } 1281 1282 var durationNanos int64 1283 var emptyTime time.Time 1284 if info.startTime != emptyTime { 1285 durationNanos = time.Now().Sub(info.startTime).Nanoseconds() 1286 } 1287 1288 finished := event.CommandFinishedEvent{ 1289 CommandName: info.cmdName, 1290 RequestID: int64(info.requestID), 1291 ConnectionID: info.connID, 1292 DurationNanos: durationNanos, 1293 } 1294 1295 if success { 1296 res := bson.Raw{} 1297 // Only copy the reply for commands that are not security sensitive 1298 if op.canMonitor(info.cmdName) { 1299 res = make([]byte, len(info.response)) 1300 copy(res, info.response) 1301 } 1302 successEvent := &event.CommandSucceededEvent{ 1303 Reply: res, 1304 CommandFinishedEvent: finished, 1305 } 1306 op.CommandMonitor.Succeeded(ctx, successEvent) 1307 return 1308 } 1309 1310 failedEvent := &event.CommandFailedEvent{ 1311 Failure: info.cmdErr.Error(), 1312 CommandFinishedEvent: finished, 1313 } 1314 op.CommandMonitor.Failed(ctx, failedEvent) 1315} 1316