1// Copyright (C) MongoDB, Inc. 2017-present.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may
4// not use this file except in compliance with the License. You may obtain
5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package options
8
9import (
10	"time"
11
12	"go.mongodb.org/mongo-driver/bson/primitive"
13)
14
15// ChangeStreamOptions represents options that can be used to configure a Watch operation.
16type ChangeStreamOptions struct {
17	// The maximum number of documents to be included in each batch returned by the server.
18	BatchSize *int32
19
20	// Specifies a collation to use for string comparisons during the operation. This option is only valid for MongoDB
21	// versions >= 3.4. For previous server versions, the driver will return an error if this option is used. The
22	// default value is nil, which means the default collation of the collection will be used.
23	Collation *Collation
24
25	// Specifies whether the updated document should be returned in change notifications for update operations along
26	// with the deltas describing the changes made to the document. The default is options.Default, which means that
27	// the updated document will not be included in the change notification.
28	FullDocument *FullDocument
29
30	// The maximum amount of time that the server should wait for new documents to satisfy a tailable cursor query.
31	MaxAwaitTime *time.Duration
32
33	// A document specifying the logical starting point for the change stream. Only changes corresponding to an oplog
34	// entry immediately after the resume token will be returned. If this is specified, StartAtOperationTime and
35	// StartAfter must not be set.
36	ResumeAfter interface{}
37
38	// If specified, the change stream will only return changes that occurred at or after the given timestamp. This
39	// option is only valid for MongoDB versions >= 4.0. If this is specified, ResumeAfter and StartAfter must not be
40	// set.
41	StartAtOperationTime *primitive.Timestamp
42
43	// A document specifying the logical starting point for the change stream. This is similar to the ResumeAfter
44	// option, but allows a resume token from an "invalidate" notification to be used. This allows a change stream on a
45	// collection to be resumed after the collection has been dropped and recreated or renamed. Only changes
46	// corresponding to an oplog entry immediately after the specified token will be returned. If this is specified,
47	// ResumeAfter and StartAtOperationTime must not be set. This option is only valid for MongoDB versions >= 4.1.1.
48	StartAfter interface{}
49}
50
51// ChangeStream creates a new ChangeStreamOptions instance.
52func ChangeStream() *ChangeStreamOptions {
53	cso := &ChangeStreamOptions{}
54	cso.SetFullDocument(Default)
55	return cso
56}
57
58// SetBatchSize sets the value for the BatchSize field.
59func (cso *ChangeStreamOptions) SetBatchSize(i int32) *ChangeStreamOptions {
60	cso.BatchSize = &i
61	return cso
62}
63
64// SetCollation sets the value for the Collation field.
65func (cso *ChangeStreamOptions) SetCollation(c Collation) *ChangeStreamOptions {
66	cso.Collation = &c
67	return cso
68}
69
70// SetFullDocument sets the value for the FullDocument field.
71func (cso *ChangeStreamOptions) SetFullDocument(fd FullDocument) *ChangeStreamOptions {
72	cso.FullDocument = &fd
73	return cso
74}
75
76// SetMaxAwaitTime sets the value for the MaxAwaitTime field.
77func (cso *ChangeStreamOptions) SetMaxAwaitTime(d time.Duration) *ChangeStreamOptions {
78	cso.MaxAwaitTime = &d
79	return cso
80}
81
82// SetResumeAfter sets the value for the ResumeAfter field.
83func (cso *ChangeStreamOptions) SetResumeAfter(rt interface{}) *ChangeStreamOptions {
84	cso.ResumeAfter = rt
85	return cso
86}
87
88// SetStartAtOperationTime sets the value for the StartAtOperationTime field.
89func (cso *ChangeStreamOptions) SetStartAtOperationTime(t *primitive.Timestamp) *ChangeStreamOptions {
90	cso.StartAtOperationTime = t
91	return cso
92}
93
94// SetStartAfter sets the value for the StartAfter field.
95func (cso *ChangeStreamOptions) SetStartAfter(sa interface{}) *ChangeStreamOptions {
96	cso.StartAfter = sa
97	return cso
98}
99
100// MergeChangeStreamOptions combines the given ChangeStreamOptions instances into a single ChangeStreamOptions in a
101// last-one-wins fashion.
102func MergeChangeStreamOptions(opts ...*ChangeStreamOptions) *ChangeStreamOptions {
103	csOpts := ChangeStream()
104	for _, cso := range opts {
105		if cso == nil {
106			continue
107		}
108		if cso.BatchSize != nil {
109			csOpts.BatchSize = cso.BatchSize
110		}
111		if cso.Collation != nil {
112			csOpts.Collation = cso.Collation
113		}
114		if cso.FullDocument != nil {
115			csOpts.FullDocument = cso.FullDocument
116		}
117		if cso.MaxAwaitTime != nil {
118			csOpts.MaxAwaitTime = cso.MaxAwaitTime
119		}
120		if cso.ResumeAfter != nil {
121			csOpts.ResumeAfter = cso.ResumeAfter
122		}
123		if cso.StartAtOperationTime != nil {
124			csOpts.StartAtOperationTime = cso.StartAtOperationTime
125		}
126		if cso.StartAfter != nil {
127			csOpts.StartAfter = cso.StartAfter
128		}
129	}
130
131	return csOpts
132}
133