1// Copyright (C) MongoDB, Inc. 2017-present. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may 4// not use this file except in compliance with the License. You may obtain 5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 6 7package options 8 9import ( 10 "time" 11 12 "go.mongodb.org/mongo-driver/bson/primitive" 13) 14 15// ChangeStreamOptions represents options that can be used to configure a Watch operation. 16type ChangeStreamOptions struct { 17 // The maximum number of documents to be included in each batch returned by the server. 18 BatchSize *int32 19 20 // Specifies a collation to use for string comparisons during the operation. This option is only valid for MongoDB 21 // versions >= 3.4. For previous server versions, the driver will return an error if this option is used. The 22 // default value is nil, which means the default collation of the collection will be used. 23 Collation *Collation 24 25 // Specifies whether the updated document should be returned in change notifications for update operations along 26 // with the deltas describing the changes made to the document. The default is options.Default, which means that 27 // the updated document will not be included in the change notification. 28 FullDocument *FullDocument 29 30 // The maximum amount of time that the server should wait for new documents to satisfy a tailable cursor query. 31 MaxAwaitTime *time.Duration 32 33 // A document specifying the logical starting point for the change stream. Only changes corresponding to an oplog 34 // entry immediately after the resume token will be returned. If this is specified, StartAtOperationTime and 35 // StartAfter must not be set. 36 ResumeAfter interface{} 37 38 // If specified, the change stream will only return changes that occurred at or after the given timestamp. This 39 // option is only valid for MongoDB versions >= 4.0. If this is specified, ResumeAfter and StartAfter must not be 40 // set. 41 StartAtOperationTime *primitive.Timestamp 42 43 // A document specifying the logical starting point for the change stream. This is similar to the ResumeAfter 44 // option, but allows a resume token from an "invalidate" notification to be used. This allows a change stream on a 45 // collection to be resumed after the collection has been dropped and recreated or renamed. Only changes 46 // corresponding to an oplog entry immediately after the specified token will be returned. If this is specified, 47 // ResumeAfter and StartAtOperationTime must not be set. This option is only valid for MongoDB versions >= 4.1.1. 48 StartAfter interface{} 49} 50 51// ChangeStream creates a new ChangeStreamOptions instance. 52func ChangeStream() *ChangeStreamOptions { 53 cso := &ChangeStreamOptions{} 54 cso.SetFullDocument(Default) 55 return cso 56} 57 58// SetBatchSize sets the value for the BatchSize field. 59func (cso *ChangeStreamOptions) SetBatchSize(i int32) *ChangeStreamOptions { 60 cso.BatchSize = &i 61 return cso 62} 63 64// SetCollation sets the value for the Collation field. 65func (cso *ChangeStreamOptions) SetCollation(c Collation) *ChangeStreamOptions { 66 cso.Collation = &c 67 return cso 68} 69 70// SetFullDocument sets the value for the FullDocument field. 71func (cso *ChangeStreamOptions) SetFullDocument(fd FullDocument) *ChangeStreamOptions { 72 cso.FullDocument = &fd 73 return cso 74} 75 76// SetMaxAwaitTime sets the value for the MaxAwaitTime field. 77func (cso *ChangeStreamOptions) SetMaxAwaitTime(d time.Duration) *ChangeStreamOptions { 78 cso.MaxAwaitTime = &d 79 return cso 80} 81 82// SetResumeAfter sets the value for the ResumeAfter field. 83func (cso *ChangeStreamOptions) SetResumeAfter(rt interface{}) *ChangeStreamOptions { 84 cso.ResumeAfter = rt 85 return cso 86} 87 88// SetStartAtOperationTime sets the value for the StartAtOperationTime field. 89func (cso *ChangeStreamOptions) SetStartAtOperationTime(t *primitive.Timestamp) *ChangeStreamOptions { 90 cso.StartAtOperationTime = t 91 return cso 92} 93 94// SetStartAfter sets the value for the StartAfter field. 95func (cso *ChangeStreamOptions) SetStartAfter(sa interface{}) *ChangeStreamOptions { 96 cso.StartAfter = sa 97 return cso 98} 99 100// MergeChangeStreamOptions combines the given ChangeStreamOptions instances into a single ChangeStreamOptions in a 101// last-one-wins fashion. 102func MergeChangeStreamOptions(opts ...*ChangeStreamOptions) *ChangeStreamOptions { 103 csOpts := ChangeStream() 104 for _, cso := range opts { 105 if cso == nil { 106 continue 107 } 108 if cso.BatchSize != nil { 109 csOpts.BatchSize = cso.BatchSize 110 } 111 if cso.Collation != nil { 112 csOpts.Collation = cso.Collation 113 } 114 if cso.FullDocument != nil { 115 csOpts.FullDocument = cso.FullDocument 116 } 117 if cso.MaxAwaitTime != nil { 118 csOpts.MaxAwaitTime = cso.MaxAwaitTime 119 } 120 if cso.ResumeAfter != nil { 121 csOpts.ResumeAfter = cso.ResumeAfter 122 } 123 if cso.StartAtOperationTime != nil { 124 csOpts.StartAtOperationTime = cso.StartAtOperationTime 125 } 126 if cso.StartAfter != nil { 127 csOpts.StartAfter = cso.StartAfter 128 } 129 } 130 131 return csOpts 132} 133