1// Copyright (C) MongoDB, Inc. 2017-present. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may 4// not use this file except in compliance with the License. You may obtain 5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 6 7package mongo 8 9import ( 10 "context" 11 "errors" 12 "fmt" 13 "reflect" 14 "strconv" 15 "time" 16 17 "go.mongodb.org/mongo-driver/bson" 18 "go.mongodb.org/mongo-driver/bson/bsoncodec" 19 "go.mongodb.org/mongo-driver/bson/primitive" 20 "go.mongodb.org/mongo-driver/mongo/options" 21 "go.mongodb.org/mongo-driver/mongo/readconcern" 22 "go.mongodb.org/mongo-driver/mongo/readpref" 23 "go.mongodb.org/mongo-driver/x/bsonx/bsoncore" 24 "go.mongodb.org/mongo-driver/x/mongo/driver" 25 "go.mongodb.org/mongo-driver/x/mongo/driver/description" 26 "go.mongodb.org/mongo-driver/x/mongo/driver/operation" 27 "go.mongodb.org/mongo-driver/x/mongo/driver/session" 28) 29 30const errorInterrupted int32 = 11601 31const errorCappedPositionLost int32 = 136 32const errorCursorKilled int32 = 237 33 34// ErrMissingResumeToken indicates that a change stream notification from the server did not contain a resume token. 35var ErrMissingResumeToken = errors.New("cannot provide resume functionality when the resume token is missing") 36 37// ErrNilCursor indicates that the underlying cursor for the change stream is nil. 38var ErrNilCursor = errors.New("cursor is nil") 39 40// ChangeStream is used to iterate over a stream of events. Each event can be decoded into a Go type via the Decode 41// method or accessed as raw BSON via the Current field. For more information about change streams, see 42// https://docs.mongodb.com/manual/changeStreams/. 43type ChangeStream struct { 44 // Current is the BSON bytes of the current event. This property is only valid until the next call to Next or 45 // TryNext. If continued access is required, a copy must be made. 46 Current bson.Raw 47 48 aggregate *operation.Aggregate 49 pipelineSlice []bsoncore.Document 50 cursor changeStreamCursor 51 cursorOptions driver.CursorOptions 52 batch []bsoncore.Document 53 resumeToken bson.Raw 54 err error 55 sess *session.Client 56 client *Client 57 registry *bsoncodec.Registry 58 streamType StreamType 59 options *options.ChangeStreamOptions 60 selector description.ServerSelector 61 operationTime *primitive.Timestamp 62} 63 64type changeStreamConfig struct { 65 readConcern *readconcern.ReadConcern 66 readPreference *readpref.ReadPref 67 client *Client 68 registry *bsoncodec.Registry 69 streamType StreamType 70 collectionName string 71 databaseName string 72} 73 74func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline interface{}, 75 opts ...*options.ChangeStreamOptions) (*ChangeStream, error) { 76 if ctx == nil { 77 ctx = context.Background() 78 } 79 80 cs := &ChangeStream{ 81 client: config.client, 82 registry: config.registry, 83 streamType: config.streamType, 84 options: options.MergeChangeStreamOptions(opts...), 85 selector: description.ReadPrefSelector(config.readPreference), 86 } 87 88 cs.sess = sessionFromContext(ctx) 89 if cs.sess == nil && cs.client.sessionPool != nil { 90 cs.sess, cs.err = session.NewClientSession(cs.client.sessionPool, cs.client.id, session.Implicit) 91 if cs.err != nil { 92 return nil, cs.Err() 93 } 94 } 95 if cs.err = cs.client.validSession(cs.sess); cs.err != nil { 96 closeImplicitSession(cs.sess) 97 return nil, cs.Err() 98 } 99 100 cs.aggregate = operation.NewAggregate(nil). 101 ReadPreference(config.readPreference).ReadConcern(config.readConcern). 102 Deployment(cs.client.deployment).ClusterClock(cs.client.clock). 103 CommandMonitor(cs.client.monitor).Session(cs.sess).ServerSelector(cs.selector).Retry(driver.RetryNone) 104 105 if cs.options.Collation != nil { 106 cs.aggregate.Collation(bsoncore.Document(cs.options.Collation.ToDocument())) 107 } 108 if cs.options.BatchSize != nil { 109 cs.aggregate.BatchSize(*cs.options.BatchSize) 110 cs.cursorOptions.BatchSize = *cs.options.BatchSize 111 } 112 if cs.options.MaxAwaitTime != nil { 113 cs.cursorOptions.MaxTimeMS = int64(time.Duration(*cs.options.MaxAwaitTime) / time.Millisecond) 114 } 115 cs.cursorOptions.CommandMonitor = cs.client.monitor 116 117 switch cs.streamType { 118 case ClientStream: 119 cs.aggregate.Database("admin") 120 case DatabaseStream: 121 cs.aggregate.Database(config.databaseName) 122 case CollectionStream: 123 cs.aggregate.Collection(config.collectionName).Database(config.databaseName) 124 default: 125 closeImplicitSession(cs.sess) 126 return nil, fmt.Errorf("must supply a valid StreamType in config, instead of %v", cs.streamType) 127 } 128 129 // When starting a change stream, cache startAfter as the first resume token if it is set. If not, cache 130 // resumeAfter. If neither is set, do not cache a resume token. 131 resumeToken := cs.options.StartAfter 132 if resumeToken == nil { 133 resumeToken = cs.options.ResumeAfter 134 } 135 var marshaledToken bson.Raw 136 if resumeToken != nil { 137 if marshaledToken, cs.err = bson.Marshal(resumeToken); cs.err != nil { 138 closeImplicitSession(cs.sess) 139 return nil, cs.Err() 140 } 141 } 142 cs.resumeToken = marshaledToken 143 144 if cs.err = cs.buildPipelineSlice(pipeline); cs.err != nil { 145 closeImplicitSession(cs.sess) 146 return nil, cs.Err() 147 } 148 var pipelineArr bsoncore.Document 149 pipelineArr, cs.err = cs.pipelineToBSON() 150 cs.aggregate.Pipeline(pipelineArr) 151 152 if cs.err = cs.executeOperation(ctx, false); cs.err != nil { 153 closeImplicitSession(cs.sess) 154 return nil, cs.Err() 155 } 156 157 return cs, cs.Err() 158} 159 160func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) error { 161 var server driver.Server 162 var conn driver.Connection 163 var err error 164 165 if server, cs.err = cs.client.deployment.SelectServer(ctx, cs.selector); cs.err != nil { 166 return cs.Err() 167 } 168 if conn, cs.err = server.Connection(ctx); cs.err != nil { 169 return cs.Err() 170 } 171 172 defer conn.Close() 173 174 cs.aggregate.Deployment(driver.SingleConnectionDeployment{ 175 C: conn, 176 }) 177 178 if resuming { 179 cs.replaceOptions(ctx, conn.Description().WireVersion) // pass wire version 180 181 csOptDoc := cs.createPipelineOptionsDoc() 182 pipIdx, pipDoc := bsoncore.AppendDocumentStart(nil) 183 pipDoc = bsoncore.AppendDocumentElement(pipDoc, "$changeStream", csOptDoc) 184 if pipDoc, cs.err = bsoncore.AppendDocumentEnd(pipDoc, pipIdx); cs.err != nil { 185 return cs.Err() 186 } 187 cs.pipelineSlice[0] = pipDoc 188 189 var plArr bsoncore.Document 190 if plArr, cs.err = cs.pipelineToBSON(); cs.err != nil { 191 return cs.Err() 192 } 193 cs.aggregate.Pipeline(plArr) 194 } 195 196 if original := cs.aggregate.Execute(ctx); original != nil { 197 wireVersion := conn.Description().WireVersion 198 retryableRead := cs.client.retryReads && wireVersion != nil && wireVersion.Max >= 6 199 if !retryableRead { 200 cs.err = replaceErrors(original) 201 return cs.err 202 } 203 204 cs.err = original 205 switch tt := original.(type) { 206 case driver.Error: 207 if !tt.Retryable() { 208 break 209 } 210 211 server, err = cs.client.deployment.SelectServer(ctx, cs.selector) 212 if err != nil { 213 break 214 } 215 216 conn.Close() 217 conn, err = server.Connection(ctx) 218 if err != nil { 219 break 220 } 221 defer conn.Close() 222 223 wireVersion := conn.Description().WireVersion 224 if wireVersion == nil || wireVersion.Max < 6 { 225 break 226 } 227 228 cs.aggregate.Deployment(driver.SingleConnectionDeployment{ 229 C: conn, 230 }) 231 cs.err = cs.aggregate.Execute(ctx) 232 } 233 234 if cs.err != nil { 235 cs.err = replaceErrors(cs.err) 236 return cs.Err() 237 } 238 239 } 240 cs.err = nil 241 242 cr := cs.aggregate.ResultCursorResponse() 243 cr.Server = server 244 245 cs.cursor, cs.err = driver.NewBatchCursor(cr, cs.sess, cs.client.clock, cs.cursorOptions) 246 if cs.err = replaceErrors(cs.err); cs.err != nil { 247 return cs.Err() 248 } 249 250 cs.updatePbrtFromCommand() 251 if cs.options.StartAtOperationTime == nil && cs.options.ResumeAfter == nil && 252 cs.options.StartAfter == nil && conn.Description().WireVersion.Max >= 7 && 253 cs.emptyBatch() && cs.resumeToken == nil { 254 cs.operationTime = cs.sess.OperationTime 255 } 256 257 return cs.Err() 258} 259 260// Updates the post batch resume token after a successful aggregate or getMore operation. 261func (cs *ChangeStream) updatePbrtFromCommand() { 262 // Only cache the pbrt if an empty batch was returned and a pbrt was included 263 if pbrt := cs.cursor.PostBatchResumeToken(); cs.emptyBatch() && pbrt != nil { 264 cs.resumeToken = bson.Raw(pbrt) 265 } 266} 267 268func (cs *ChangeStream) storeResumeToken() error { 269 // If cs.Current is the last document in the batch and a pbrt is included, cache the pbrt 270 // Otherwise, cache the _id of the document 271 var tokenDoc bson.Raw 272 if len(cs.batch) == 0 { 273 if pbrt := cs.cursor.PostBatchResumeToken(); pbrt != nil { 274 tokenDoc = bson.Raw(pbrt) 275 } 276 } 277 278 if tokenDoc == nil { 279 var ok bool 280 tokenDoc, ok = cs.Current.Lookup("_id").DocumentOK() 281 if !ok { 282 _ = cs.Close(context.Background()) 283 return ErrMissingResumeToken 284 } 285 } 286 287 cs.resumeToken = tokenDoc 288 return nil 289} 290 291func (cs *ChangeStream) buildPipelineSlice(pipeline interface{}) error { 292 val := reflect.ValueOf(pipeline) 293 if !val.IsValid() || !(val.Kind() == reflect.Slice) { 294 cs.err = errors.New("can only transform slices and arrays into aggregation pipelines, but got invalid") 295 return cs.err 296 } 297 298 cs.pipelineSlice = make([]bsoncore.Document, 0, val.Len()+1) 299 300 csIdx, csDoc := bsoncore.AppendDocumentStart(nil) 301 csDocTemp := cs.createPipelineOptionsDoc() 302 if cs.err != nil { 303 return cs.err 304 } 305 csDoc = bsoncore.AppendDocumentElement(csDoc, "$changeStream", csDocTemp) 306 csDoc, cs.err = bsoncore.AppendDocumentEnd(csDoc, csIdx) 307 if cs.err != nil { 308 return cs.err 309 } 310 cs.pipelineSlice = append(cs.pipelineSlice, csDoc) 311 312 for i := 0; i < val.Len(); i++ { 313 var elem []byte 314 elem, cs.err = transformBsoncoreDocument(cs.registry, val.Index(i).Interface()) 315 if cs.err != nil { 316 return cs.err 317 } 318 319 cs.pipelineSlice = append(cs.pipelineSlice, elem) 320 } 321 322 return cs.err 323} 324 325func (cs *ChangeStream) createPipelineOptionsDoc() bsoncore.Document { 326 plDocIdx, plDoc := bsoncore.AppendDocumentStart(nil) 327 328 if cs.streamType == ClientStream { 329 plDoc = bsoncore.AppendBooleanElement(plDoc, "allChangesForCluster", true) 330 } 331 332 if cs.options.FullDocument != nil { 333 plDoc = bsoncore.AppendStringElement(plDoc, "fullDocument", string(*cs.options.FullDocument)) 334 } 335 336 if cs.options.ResumeAfter != nil { 337 var raDoc bsoncore.Document 338 raDoc, cs.err = transformBsoncoreDocument(cs.registry, cs.options.ResumeAfter) 339 if cs.err != nil { 340 return nil 341 } 342 343 plDoc = bsoncore.AppendDocumentElement(plDoc, "resumeAfter", raDoc) 344 } 345 346 if cs.options.StartAfter != nil { 347 var saDoc bsoncore.Document 348 saDoc, cs.err = transformBsoncoreDocument(cs.registry, cs.options.StartAfter) 349 if cs.err != nil { 350 return nil 351 } 352 353 plDoc = bsoncore.AppendDocumentElement(plDoc, "startAfter", saDoc) 354 } 355 356 if cs.options.StartAtOperationTime != nil { 357 plDoc = bsoncore.AppendTimestampElement(plDoc, "startAtOperationTime", cs.options.StartAtOperationTime.T, cs.options.StartAtOperationTime.I) 358 } 359 360 if plDoc, cs.err = bsoncore.AppendDocumentEnd(plDoc, plDocIdx); cs.err != nil { 361 return nil 362 } 363 364 return plDoc 365} 366 367func (cs *ChangeStream) pipelineToBSON() (bsoncore.Document, error) { 368 pipelineDocIdx, pipelineArr := bsoncore.AppendArrayStart(nil) 369 for i, doc := range cs.pipelineSlice { 370 pipelineArr = bsoncore.AppendDocumentElement(pipelineArr, strconv.Itoa(i), doc) 371 } 372 if pipelineArr, cs.err = bsoncore.AppendArrayEnd(pipelineArr, pipelineDocIdx); cs.err != nil { 373 return nil, cs.err 374 } 375 return pipelineArr, cs.err 376} 377 378func (cs *ChangeStream) replaceOptions(ctx context.Context, wireVersion *description.VersionRange) { 379 // Cached resume token: use the resume token as the resumeAfter option and set no other resume options 380 if cs.resumeToken != nil { 381 cs.options.SetResumeAfter(cs.resumeToken) 382 cs.options.SetStartAfter(nil) 383 cs.options.SetStartAtOperationTime(nil) 384 return 385 } 386 387 // No cached resume token but cached operation time: use the operation time as the startAtOperationTime option and 388 // set no other resume options 389 if (cs.sess.OperationTime != nil || cs.options.StartAtOperationTime != nil) && wireVersion.Max >= 7 { 390 opTime := cs.options.StartAtOperationTime 391 if cs.operationTime != nil { 392 opTime = cs.sess.OperationTime 393 } 394 395 cs.options.SetStartAtOperationTime(opTime) 396 cs.options.SetResumeAfter(nil) 397 cs.options.SetStartAfter(nil) 398 return 399 } 400 401 // No cached resume token or operation time: set none of the resume options 402 cs.options.SetResumeAfter(nil) 403 cs.options.SetStartAfter(nil) 404 cs.options.SetStartAtOperationTime(nil) 405} 406 407// ID returns the ID for this change stream, or 0 if the cursor has been closed or exhausted. 408func (cs *ChangeStream) ID() int64 { 409 if cs.cursor == nil { 410 return 0 411 } 412 return cs.cursor.ID() 413} 414 415// Decode will unmarshal the current event document into val and return any errors from the unmarshalling process 416// without any modification. If val is nil or is a typed nil, an error will be returned. 417func (cs *ChangeStream) Decode(val interface{}) error { 418 if cs.cursor == nil { 419 return ErrNilCursor 420 } 421 422 return bson.UnmarshalWithRegistry(cs.registry, cs.Current, val) 423} 424 425// Err returns the last error seen by the change stream, or nil if no errors has occurred. 426func (cs *ChangeStream) Err() error { 427 if cs.err != nil { 428 return replaceErrors(cs.err) 429 } 430 if cs.cursor == nil { 431 return nil 432 } 433 434 return replaceErrors(cs.cursor.Err()) 435} 436 437// Close closes this change stream and the underlying cursor. Next and TryNext must not be called after Close has been 438// called. Close is idempotent. After the first call, any subsequent calls will not change the state. 439func (cs *ChangeStream) Close(ctx context.Context) error { 440 if ctx == nil { 441 ctx = context.Background() 442 } 443 444 defer closeImplicitSession(cs.sess) 445 446 if cs.cursor == nil { 447 return nil // cursor is already closed 448 } 449 450 cs.err = replaceErrors(cs.cursor.Close(ctx)) 451 cs.cursor = nil 452 return cs.Err() 453} 454 455// ResumeToken returns the last cached resume token for this change stream, or nil if a resume token has not been 456// stored. 457func (cs *ChangeStream) ResumeToken() bson.Raw { 458 return cs.resumeToken 459} 460 461// Next gets the next event for this change stream. It returns true if there were no errors and the next event document 462// is available. 463// 464// Next blocks until an event is available, an error occurs, or ctx expires. If ctx expires, the error 465// will be set to ctx.Err(). In an error case, Next will return false. 466// 467// If Next returns false, subsequent calls will also return false. 468func (cs *ChangeStream) Next(ctx context.Context) bool { 469 return cs.next(ctx, false) 470} 471 472// TryNext attempts to get the next event for this change stream. It returns true if there were no errors and the next 473// event document is available. 474// 475// TryNext returns false if the change stream is closed by the server, an error occurs when getting changes from the 476// server, the next change is not yet available, or ctx expires. If ctx expires, the error will be set to ctx.Err(). 477// 478// If TryNext returns false and an error occurred or the change stream was closed 479// (i.e. cs.Err() != nil || cs.ID() == 0), subsequent attempts will also return false. Otherwise, it is safe to call 480// TryNext again until a change is available. 481// 482// This method requires driver version >= 1.2.0. 483func (cs *ChangeStream) TryNext(ctx context.Context) bool { 484 return cs.next(ctx, true) 485} 486 487func (cs *ChangeStream) next(ctx context.Context, nonBlocking bool) bool { 488 // return false right away if the change stream has already errored. 489 if cs.err != nil { 490 return false 491 } 492 493 if ctx == nil { 494 ctx = context.Background() 495 } 496 497 if len(cs.batch) == 0 { 498 cs.loopNext(ctx, nonBlocking) 499 if cs.err != nil { 500 cs.err = replaceErrors(cs.err) 501 return false 502 } 503 if len(cs.batch) == 0 { 504 return false 505 } 506 } 507 508 // successfully got non-empty batch 509 cs.Current = bson.Raw(cs.batch[0]) 510 cs.batch = cs.batch[1:] 511 if cs.err = cs.storeResumeToken(); cs.err != nil { 512 return false 513 } 514 return true 515} 516 517func (cs *ChangeStream) loopNext(ctx context.Context, nonBlocking bool) { 518 for { 519 if cs.cursor == nil { 520 return 521 } 522 523 if cs.cursor.Next(ctx) { 524 // non-empty batch returned 525 cs.batch, cs.err = cs.cursor.Batch().Documents() 526 return 527 } 528 529 cs.err = replaceErrors(cs.cursor.Err()) 530 if cs.err == nil { 531 // If a getMore was done but the batch was empty, the batch cursor will return false with no error. 532 // Update the tracked resume token to catch the post batch resume token from the server response. 533 cs.updatePbrtFromCommand() 534 if nonBlocking { 535 // stop after a successful getMore, even though the batch was empty 536 return 537 } 538 continue // loop getMore until a non-empty batch is returned or an error occurs 539 } 540 541 switch t := cs.err.(type) { 542 case CommandError: 543 if t.Code == errorInterrupted || t.Code == errorCappedPositionLost || t.Code == errorCursorKilled || t.HasErrorLabel("NonResumableChangeStreamError") { 544 return 545 } 546 } 547 548 // ignore error from cursor close because if the cursor is deleted or errors we tried to close it and will remake and try to get next batch 549 _ = cs.cursor.Close(ctx) 550 if cs.err = cs.executeOperation(ctx, true); cs.err != nil { 551 return 552 } 553 } 554} 555 556// Returns true if the underlying cursor's batch is empty 557func (cs *ChangeStream) emptyBatch() bool { 558 return cs.cursor.Batch().Empty() 559} 560 561// StreamType represents the cluster type against which a ChangeStream was created. 562type StreamType uint8 563 564// These constants represent valid change stream types. A change stream can be initialized over a collection, all 565// collections in a database, or over a cluster. 566const ( 567 CollectionStream StreamType = iota 568 DatabaseStream 569 ClientStream 570) 571