1// Copyright (C) MongoDB, Inc. 2017-present.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may
4// not use this file except in compliance with the License. You may obtain
5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package mongo
8
9import (
10	"context"
11	"errors"
12	"fmt"
13	"reflect"
14	"strconv"
15	"time"
16
17	"go.mongodb.org/mongo-driver/bson"
18	"go.mongodb.org/mongo-driver/bson/bsoncodec"
19	"go.mongodb.org/mongo-driver/bson/primitive"
20	"go.mongodb.org/mongo-driver/mongo/options"
21	"go.mongodb.org/mongo-driver/mongo/readconcern"
22	"go.mongodb.org/mongo-driver/mongo/readpref"
23	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
24	"go.mongodb.org/mongo-driver/x/mongo/driver"
25	"go.mongodb.org/mongo-driver/x/mongo/driver/description"
26	"go.mongodb.org/mongo-driver/x/mongo/driver/operation"
27	"go.mongodb.org/mongo-driver/x/mongo/driver/session"
28)
29
// Server error codes after which loopNext gives up instead of trying to resume the change stream.
const (
	errorInterrupted        int32 = 11601
	errorCappedPositionLost int32 = 136
	errorCursorKilled       int32 = 237
)

var (
	// ErrMissingResumeToken indicates that a change stream notification from the server did not contain a resume token.
	ErrMissingResumeToken = errors.New("cannot provide resume functionality when the resume token is missing")

	// ErrNilCursor indicates that the underlying cursor for the change stream is nil.
	ErrNilCursor = errors.New("cursor is nil")
)
39
// ChangeStream is used to iterate over a stream of events. Each event can be decoded into a Go type via the Decode
// method or accessed as raw BSON via the Current field. For more information about change streams, see
// https://docs.mongodb.com/manual/changeStreams/.
type ChangeStream struct {
	// Current is the BSON bytes of the current event. This property is only valid until the next call to Next or
	// TryNext. If continued access is required, a copy must be made.
	Current bson.Raw

	// aggregate is the aggregate command that opens the stream; on resume its pipeline is rebuilt and it is re-run.
	aggregate     *operation.Aggregate
	// pipelineSlice holds the pipeline stages; index 0 is the generated $changeStream stage, which is replaced on
	// resume, followed by the user-supplied stages.
	pipelineSlice []bsoncore.Document
	// cursor iterates the results of the most recent aggregate; nil before the first successful aggregate and after
	// Close.
	cursor        changeStreamCursor
	// cursorOptions (batch size, maxAwaitTime, command monitor) are applied to every cursor created from the aggregate.
	cursorOptions driver.CursorOptions
	// batch holds the documents of the current server batch that have not yet been returned by Next/TryNext.
	batch         []bsoncore.Document
	// resumeToken is the last cached resume token: startAfter/resumeAfter at creation, then an event _id or a
	// post-batch resume token.
	resumeToken   bson.Raw
	// err is the last error seen; once set, next() returns false without contacting the server.
	err           error
	// sess is the session from the caller's context, or one created with session.Implicit when the client has a
	// session pool; it may be nil.
	sess          *session.Client
	// client supplies the deployment, session pool, cluster clock and command monitor.
	client        *Client
	// registry is used to encode pipeline stages and resume options and to decode events.
	registry      *bsoncodec.Registry
	// streamType selects whether a collection, a database, or the whole cluster is watched.
	streamType    StreamType
	// options are the merged user options; replaceOptions rewrites the resume-related ones before each resume.
	options       *options.ChangeStreamOptions
	// selector chooses servers according to the configured read preference.
	selector      description.ServerSelector
	// operationTime is the session operation time cached after the initial aggregate, for resuming via
	// startAtOperationTime.
	operationTime *primitive.Timestamp
}
63
// changeStreamConfig carries the client-, database- or collection-level settings newChangeStream needs to build the
// change stream's aggregate command.
type changeStreamConfig struct {
	// readConcern is applied to the aggregate command.
	readConcern    *readconcern.ReadConcern
	// readPreference is applied to the aggregate and also used to build the server selector.
	readPreference *readpref.ReadPref
	// client owns the deployment and session pool used by the stream.
	client         *Client
	// registry encodes pipeline stages and decodes events.
	registry       *bsoncodec.Registry
	// streamType must be CollectionStream, DatabaseStream or ClientStream; anything else is rejected.
	streamType     StreamType
	// collectionName is only used for CollectionStream.
	collectionName string
	// databaseName is used for DatabaseStream and CollectionStream; ClientStream always targets "admin".
	databaseName   string
}
73
74func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline interface{},
75	opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {
76	if ctx == nil {
77		ctx = context.Background()
78	}
79
80	cs := &ChangeStream{
81		client:     config.client,
82		registry:   config.registry,
83		streamType: config.streamType,
84		options:    options.MergeChangeStreamOptions(opts...),
85		selector:   description.ReadPrefSelector(config.readPreference),
86	}
87
88	cs.sess = sessionFromContext(ctx)
89	if cs.sess == nil && cs.client.sessionPool != nil {
90		cs.sess, cs.err = session.NewClientSession(cs.client.sessionPool, cs.client.id, session.Implicit)
91		if cs.err != nil {
92			return nil, cs.Err()
93		}
94	}
95	if cs.err = cs.client.validSession(cs.sess); cs.err != nil {
96		closeImplicitSession(cs.sess)
97		return nil, cs.Err()
98	}
99
100	cs.aggregate = operation.NewAggregate(nil).
101		ReadPreference(config.readPreference).ReadConcern(config.readConcern).
102		Deployment(cs.client.deployment).ClusterClock(cs.client.clock).
103		CommandMonitor(cs.client.monitor).Session(cs.sess).ServerSelector(cs.selector).Retry(driver.RetryNone)
104
105	if cs.options.Collation != nil {
106		cs.aggregate.Collation(bsoncore.Document(cs.options.Collation.ToDocument()))
107	}
108	if cs.options.BatchSize != nil {
109		cs.aggregate.BatchSize(*cs.options.BatchSize)
110		cs.cursorOptions.BatchSize = *cs.options.BatchSize
111	}
112	if cs.options.MaxAwaitTime != nil {
113		cs.cursorOptions.MaxTimeMS = int64(time.Duration(*cs.options.MaxAwaitTime) / time.Millisecond)
114	}
115	cs.cursorOptions.CommandMonitor = cs.client.monitor
116
117	switch cs.streamType {
118	case ClientStream:
119		cs.aggregate.Database("admin")
120	case DatabaseStream:
121		cs.aggregate.Database(config.databaseName)
122	case CollectionStream:
123		cs.aggregate.Collection(config.collectionName).Database(config.databaseName)
124	default:
125		closeImplicitSession(cs.sess)
126		return nil, fmt.Errorf("must supply a valid StreamType in config, instead of %v", cs.streamType)
127	}
128
129	// When starting a change stream, cache startAfter as the first resume token if it is set. If not, cache
130	// resumeAfter. If neither is set, do not cache a resume token.
131	resumeToken := cs.options.StartAfter
132	if resumeToken == nil {
133		resumeToken = cs.options.ResumeAfter
134	}
135	var marshaledToken bson.Raw
136	if resumeToken != nil {
137		if marshaledToken, cs.err = bson.Marshal(resumeToken); cs.err != nil {
138			closeImplicitSession(cs.sess)
139			return nil, cs.Err()
140		}
141	}
142	cs.resumeToken = marshaledToken
143
144	if cs.err = cs.buildPipelineSlice(pipeline); cs.err != nil {
145		closeImplicitSession(cs.sess)
146		return nil, cs.Err()
147	}
148	var pipelineArr bsoncore.Document
149	pipelineArr, cs.err = cs.pipelineToBSON()
150	cs.aggregate.Pipeline(pipelineArr)
151
152	if cs.err = cs.executeOperation(ctx, false); cs.err != nil {
153		closeImplicitSession(cs.sess)
154		return nil, cs.Err()
155	}
156
157	return cs, cs.Err()
158}
159
160func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) error {
161	var server driver.Server
162	var conn driver.Connection
163	var err error
164
165	if server, cs.err = cs.client.deployment.SelectServer(ctx, cs.selector); cs.err != nil {
166		return cs.Err()
167	}
168	if conn, cs.err = server.Connection(ctx); cs.err != nil {
169		return cs.Err()
170	}
171
172	defer conn.Close()
173
174	cs.aggregate.Deployment(driver.SingleConnectionDeployment{
175		C: conn,
176	})
177
178	if resuming {
179		cs.replaceOptions(ctx, conn.Description().WireVersion) // pass wire version
180
181		csOptDoc := cs.createPipelineOptionsDoc()
182		pipIdx, pipDoc := bsoncore.AppendDocumentStart(nil)
183		pipDoc = bsoncore.AppendDocumentElement(pipDoc, "$changeStream", csOptDoc)
184		if pipDoc, cs.err = bsoncore.AppendDocumentEnd(pipDoc, pipIdx); cs.err != nil {
185			return cs.Err()
186		}
187		cs.pipelineSlice[0] = pipDoc
188
189		var plArr bsoncore.Document
190		if plArr, cs.err = cs.pipelineToBSON(); cs.err != nil {
191			return cs.Err()
192		}
193		cs.aggregate.Pipeline(plArr)
194	}
195
196	if original := cs.aggregate.Execute(ctx); original != nil {
197		wireVersion := conn.Description().WireVersion
198		retryableRead := cs.client.retryReads && wireVersion != nil && wireVersion.Max >= 6
199		if !retryableRead {
200			cs.err = replaceErrors(original)
201			return cs.err
202		}
203
204		cs.err = original
205		switch tt := original.(type) {
206		case driver.Error:
207			if !tt.Retryable() {
208				break
209			}
210
211			server, err = cs.client.deployment.SelectServer(ctx, cs.selector)
212			if err != nil {
213				break
214			}
215
216			conn.Close()
217			conn, err = server.Connection(ctx)
218			if err != nil {
219				break
220			}
221			defer conn.Close()
222
223			wireVersion := conn.Description().WireVersion
224			if wireVersion == nil || wireVersion.Max < 6 {
225				break
226			}
227
228			cs.aggregate.Deployment(driver.SingleConnectionDeployment{
229				C: conn,
230			})
231			cs.err = cs.aggregate.Execute(ctx)
232		}
233
234		if cs.err != nil {
235			cs.err = replaceErrors(cs.err)
236			return cs.Err()
237		}
238
239	}
240	cs.err = nil
241
242	cr := cs.aggregate.ResultCursorResponse()
243	cr.Server = server
244
245	cs.cursor, cs.err = driver.NewBatchCursor(cr, cs.sess, cs.client.clock, cs.cursorOptions)
246	if cs.err = replaceErrors(cs.err); cs.err != nil {
247		return cs.Err()
248	}
249
250	cs.updatePbrtFromCommand()
251	if cs.options.StartAtOperationTime == nil && cs.options.ResumeAfter == nil &&
252		cs.options.StartAfter == nil && conn.Description().WireVersion.Max >= 7 &&
253		cs.emptyBatch() && cs.resumeToken == nil {
254		cs.operationTime = cs.sess.OperationTime
255	}
256
257	return cs.Err()
258}
259
260// Updates the post batch resume token after a successful aggregate or getMore operation.
261func (cs *ChangeStream) updatePbrtFromCommand() {
262	// Only cache the pbrt if an empty batch was returned and a pbrt was included
263	if pbrt := cs.cursor.PostBatchResumeToken(); cs.emptyBatch() && pbrt != nil {
264		cs.resumeToken = bson.Raw(pbrt)
265	}
266}
267
268func (cs *ChangeStream) storeResumeToken() error {
269	// If cs.Current is the last document in the batch and a pbrt is included, cache the pbrt
270	// Otherwise, cache the _id of the document
271	var tokenDoc bson.Raw
272	if len(cs.batch) == 0 {
273		if pbrt := cs.cursor.PostBatchResumeToken(); pbrt != nil {
274			tokenDoc = bson.Raw(pbrt)
275		}
276	}
277
278	if tokenDoc == nil {
279		var ok bool
280		tokenDoc, ok = cs.Current.Lookup("_id").DocumentOK()
281		if !ok {
282			_ = cs.Close(context.Background())
283			return ErrMissingResumeToken
284		}
285	}
286
287	cs.resumeToken = tokenDoc
288	return nil
289}
290
291func (cs *ChangeStream) buildPipelineSlice(pipeline interface{}) error {
292	val := reflect.ValueOf(pipeline)
293	if !val.IsValid() || !(val.Kind() == reflect.Slice) {
294		cs.err = errors.New("can only transform slices and arrays into aggregation pipelines, but got invalid")
295		return cs.err
296	}
297
298	cs.pipelineSlice = make([]bsoncore.Document, 0, val.Len()+1)
299
300	csIdx, csDoc := bsoncore.AppendDocumentStart(nil)
301	csDocTemp := cs.createPipelineOptionsDoc()
302	if cs.err != nil {
303		return cs.err
304	}
305	csDoc = bsoncore.AppendDocumentElement(csDoc, "$changeStream", csDocTemp)
306	csDoc, cs.err = bsoncore.AppendDocumentEnd(csDoc, csIdx)
307	if cs.err != nil {
308		return cs.err
309	}
310	cs.pipelineSlice = append(cs.pipelineSlice, csDoc)
311
312	for i := 0; i < val.Len(); i++ {
313		var elem []byte
314		elem, cs.err = transformBsoncoreDocument(cs.registry, val.Index(i).Interface())
315		if cs.err != nil {
316			return cs.err
317		}
318
319		cs.pipelineSlice = append(cs.pipelineSlice, elem)
320	}
321
322	return cs.err
323}
324
325func (cs *ChangeStream) createPipelineOptionsDoc() bsoncore.Document {
326	plDocIdx, plDoc := bsoncore.AppendDocumentStart(nil)
327
328	if cs.streamType == ClientStream {
329		plDoc = bsoncore.AppendBooleanElement(plDoc, "allChangesForCluster", true)
330	}
331
332	if cs.options.FullDocument != nil {
333		plDoc = bsoncore.AppendStringElement(plDoc, "fullDocument", string(*cs.options.FullDocument))
334	}
335
336	if cs.options.ResumeAfter != nil {
337		var raDoc bsoncore.Document
338		raDoc, cs.err = transformBsoncoreDocument(cs.registry, cs.options.ResumeAfter)
339		if cs.err != nil {
340			return nil
341		}
342
343		plDoc = bsoncore.AppendDocumentElement(plDoc, "resumeAfter", raDoc)
344	}
345
346	if cs.options.StartAfter != nil {
347		var saDoc bsoncore.Document
348		saDoc, cs.err = transformBsoncoreDocument(cs.registry, cs.options.StartAfter)
349		if cs.err != nil {
350			return nil
351		}
352
353		plDoc = bsoncore.AppendDocumentElement(plDoc, "startAfter", saDoc)
354	}
355
356	if cs.options.StartAtOperationTime != nil {
357		plDoc = bsoncore.AppendTimestampElement(plDoc, "startAtOperationTime", cs.options.StartAtOperationTime.T, cs.options.StartAtOperationTime.I)
358	}
359
360	if plDoc, cs.err = bsoncore.AppendDocumentEnd(plDoc, plDocIdx); cs.err != nil {
361		return nil
362	}
363
364	return plDoc
365}
366
367func (cs *ChangeStream) pipelineToBSON() (bsoncore.Document, error) {
368	pipelineDocIdx, pipelineArr := bsoncore.AppendArrayStart(nil)
369	for i, doc := range cs.pipelineSlice {
370		pipelineArr = bsoncore.AppendDocumentElement(pipelineArr, strconv.Itoa(i), doc)
371	}
372	if pipelineArr, cs.err = bsoncore.AppendArrayEnd(pipelineArr, pipelineDocIdx); cs.err != nil {
373		return nil, cs.err
374	}
375	return pipelineArr, cs.err
376}
377
378func (cs *ChangeStream) replaceOptions(ctx context.Context, wireVersion *description.VersionRange) {
379	// Cached resume token: use the resume token as the resumeAfter option and set no other resume options
380	if cs.resumeToken != nil {
381		cs.options.SetResumeAfter(cs.resumeToken)
382		cs.options.SetStartAfter(nil)
383		cs.options.SetStartAtOperationTime(nil)
384		return
385	}
386
387	// No cached resume token but cached operation time: use the operation time as the startAtOperationTime option and
388	// set no other resume options
389	if (cs.sess.OperationTime != nil || cs.options.StartAtOperationTime != nil) && wireVersion.Max >= 7 {
390		opTime := cs.options.StartAtOperationTime
391		if cs.operationTime != nil {
392			opTime = cs.sess.OperationTime
393		}
394
395		cs.options.SetStartAtOperationTime(opTime)
396		cs.options.SetResumeAfter(nil)
397		cs.options.SetStartAfter(nil)
398		return
399	}
400
401	// No cached resume token or operation time: set none of the resume options
402	cs.options.SetResumeAfter(nil)
403	cs.options.SetStartAfter(nil)
404	cs.options.SetStartAtOperationTime(nil)
405}
406
407// ID returns the ID for this change stream, or 0 if the cursor has been closed or exhausted.
408func (cs *ChangeStream) ID() int64 {
409	if cs.cursor == nil {
410		return 0
411	}
412	return cs.cursor.ID()
413}
414
415// Decode will unmarshal the current event document into val and return any errors from the unmarshalling process
416// without any modification. If val is nil or is a typed nil, an error will be returned.
417func (cs *ChangeStream) Decode(val interface{}) error {
418	if cs.cursor == nil {
419		return ErrNilCursor
420	}
421
422	return bson.UnmarshalWithRegistry(cs.registry, cs.Current, val)
423}
424
425// Err returns the last error seen by the change stream, or nil if no errors has occurred.
426func (cs *ChangeStream) Err() error {
427	if cs.err != nil {
428		return replaceErrors(cs.err)
429	}
430	if cs.cursor == nil {
431		return nil
432	}
433
434	return replaceErrors(cs.cursor.Err())
435}
436
437// Close closes this change stream and the underlying cursor. Next and TryNext must not be called after Close has been
438// called. Close is idempotent. After the first call, any subsequent calls will not change the state.
439func (cs *ChangeStream) Close(ctx context.Context) error {
440	if ctx == nil {
441		ctx = context.Background()
442	}
443
444	defer closeImplicitSession(cs.sess)
445
446	if cs.cursor == nil {
447		return nil // cursor is already closed
448	}
449
450	cs.err = replaceErrors(cs.cursor.Close(ctx))
451	cs.cursor = nil
452	return cs.Err()
453}
454
455// ResumeToken returns the last cached resume token for this change stream, or nil if a resume token has not been
456// stored.
457func (cs *ChangeStream) ResumeToken() bson.Raw {
458	return cs.resumeToken
459}
460
461// Next gets the next event for this change stream. It returns true if there were no errors and the next event document
462// is available.
463//
464// Next blocks until an event is available, an error occurs, or ctx expires. If ctx expires, the error
465// will be set to ctx.Err(). In an error case, Next will return false.
466//
467// If Next returns false, subsequent calls will also return false.
468func (cs *ChangeStream) Next(ctx context.Context) bool {
469	return cs.next(ctx, false)
470}
471
472// TryNext attempts to get the next event for this change stream. It returns true if there were no errors and the next
473// event document is available.
474//
475// TryNext returns false if the change stream is closed by the server, an error occurs when getting changes from the
476// server, the next change is not yet available, or ctx expires. If ctx expires, the error will be set to ctx.Err().
477//
478// If TryNext returns false and an error occurred or the change stream was closed
479// (i.e. cs.Err() != nil || cs.ID() == 0), subsequent attempts will also return false. Otherwise, it is safe to call
480// TryNext again until a change is available.
481//
482// This method requires driver version >= 1.2.0.
483func (cs *ChangeStream) TryNext(ctx context.Context) bool {
484	return cs.next(ctx, true)
485}
486
487func (cs *ChangeStream) next(ctx context.Context, nonBlocking bool) bool {
488	// return false right away if the change stream has already errored.
489	if cs.err != nil {
490		return false
491	}
492
493	if ctx == nil {
494		ctx = context.Background()
495	}
496
497	if len(cs.batch) == 0 {
498		cs.loopNext(ctx, nonBlocking)
499		if cs.err != nil {
500			cs.err = replaceErrors(cs.err)
501			return false
502		}
503		if len(cs.batch) == 0 {
504			return false
505		}
506	}
507
508	// successfully got non-empty batch
509	cs.Current = bson.Raw(cs.batch[0])
510	cs.batch = cs.batch[1:]
511	if cs.err = cs.storeResumeToken(); cs.err != nil {
512		return false
513	}
514	return true
515}
516
517func (cs *ChangeStream) loopNext(ctx context.Context, nonBlocking bool) {
518	for {
519		if cs.cursor == nil {
520			return
521		}
522
523		if cs.cursor.Next(ctx) {
524			// non-empty batch returned
525			cs.batch, cs.err = cs.cursor.Batch().Documents()
526			return
527		}
528
529		cs.err = replaceErrors(cs.cursor.Err())
530		if cs.err == nil {
531			// If a getMore was done but the batch was empty, the batch cursor will return false with no error.
532			// Update the tracked resume token to catch the post batch resume token from the server response.
533			cs.updatePbrtFromCommand()
534			if nonBlocking {
535				// stop after a successful getMore, even though the batch was empty
536				return
537			}
538			continue // loop getMore until a non-empty batch is returned or an error occurs
539		}
540
541		switch t := cs.err.(type) {
542		case CommandError:
543			if t.Code == errorInterrupted || t.Code == errorCappedPositionLost || t.Code == errorCursorKilled || t.HasErrorLabel("NonResumableChangeStreamError") {
544				return
545			}
546		}
547
548		// ignore error from cursor close because if the cursor is deleted or errors we tried to close it and will remake and try to get next batch
549		_ = cs.cursor.Close(ctx)
550		if cs.err = cs.executeOperation(ctx, true); cs.err != nil {
551			return
552		}
553	}
554}
555
556// Returns true if the underlying cursor's batch is empty
557func (cs *ChangeStream) emptyBatch() bool {
558	return cs.cursor.Batch().Empty()
559}
560
// StreamType identifies the scope a ChangeStream was created against: one collection, all collections in a database,
// or an entire cluster.
type StreamType uint8

// Valid StreamType values. Their numeric values are fixed, and CollectionStream is the zero value.
const (
	CollectionStream StreamType = 0 // a single collection
	DatabaseStream   StreamType = 1 // every collection in a database
	ClientStream     StreamType = 2 // the whole cluster
)
571