Lines Matching refs:cs

105 	cs := &ChangeStream{
113 cs.sess = sessionFromContext(ctx)
114 if cs.sess == nil && cs.client.sessionPool != nil {
115 cs.sess, cs.err = session.NewClientSession(cs.client.sessionPool, cs.client.id, session.Implicit)
116 if cs.err != nil {
117 return nil, cs.Err()
120 if cs.err = cs.client.validSession(cs.sess); cs.err != nil {
121 closeImplicitSession(cs.sess)
122 return nil, cs.Err()
125 cs.aggregate = operation.NewAggregate(nil).
127 Deployment(cs.client.deployment).ClusterClock(cs.client.clock).
128 …CommandMonitor(cs.client.monitor).Session(cs.sess).ServerSelector(cs.selector).Retry(driver.RetryN…
132 cs.cursorOptions.Crypt = config.crypt
134 if cs.options.Collation != nil {
135 cs.aggregate.Collation(bsoncore.Document(cs.options.Collation.ToDocument()))
137 if cs.options.BatchSize != nil {
138 cs.aggregate.BatchSize(*cs.options.BatchSize)
139 cs.cursorOptions.BatchSize = *cs.options.BatchSize
141 if cs.options.MaxAwaitTime != nil {
142 cs.cursorOptions.MaxTimeMS = int64(time.Duration(*cs.options.MaxAwaitTime) / time.Millisecond)
144 cs.cursorOptions.CommandMonitor = cs.client.monitor
146 switch cs.streamType {
148 cs.aggregate.Database("admin")
150 cs.aggregate.Database(config.databaseName)
152 cs.aggregate.Collection(config.collectionName).Database(config.databaseName)
154 closeImplicitSession(cs.sess)
155 return nil, fmt.Errorf("must supply a valid StreamType in config, instead of %v", cs.streamType)
160 resumeToken := cs.options.StartAfter
162 resumeToken = cs.options.ResumeAfter
166 if marshaledToken, cs.err = bson.Marshal(resumeToken); cs.err != nil {
167 closeImplicitSession(cs.sess)
168 return nil, cs.Err()
171 cs.resumeToken = marshaledToken
173 if cs.err = cs.buildPipelineSlice(pipeline); cs.err != nil {
174 closeImplicitSession(cs.sess)
175 return nil, cs.Err()
178 pipelineArr, cs.err = cs.pipelineToBSON()
179 cs.aggregate.Pipeline(pipelineArr)
181 if cs.err = cs.executeOperation(ctx, false); cs.err != nil {
182 closeImplicitSession(cs.sess)
183 return nil, cs.Err()
186 return cs, cs.Err()
189 func (cs *ChangeStream) createOperationDeployment(server driver.Server, connection driver.Connectio…
191 topologyKind: cs.client.deployment.Kind(),
197 func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) error {
202 if server, cs.err = cs.client.deployment.SelectServer(ctx, cs.selector); cs.err != nil {
203 return cs.Err()
205 if conn, cs.err = server.Connection(ctx); cs.err != nil {
206 return cs.Err()
209 cs.wireVersion = conn.Description().WireVersion
211 cs.aggregate.Deployment(cs.createOperationDeployment(server, conn))
214 cs.replaceOptions(ctx, cs.wireVersion)
216 csOptDoc := cs.createPipelineOptionsDoc()
219 if pipDoc, cs.err = bsoncore.AppendDocumentEnd(pipDoc, pipIdx); cs.err != nil {
220 return cs.Err()
222 cs.pipelineSlice[0] = pipDoc
225 if plArr, cs.err = cs.pipelineToBSON(); cs.err != nil {
226 return cs.Err()
228 cs.aggregate.Pipeline(plArr)
231 if original := cs.aggregate.Execute(ctx); original != nil {
232 retryableRead := cs.client.retryReads && cs.wireVersion != nil && cs.wireVersion.Max >= 6
234 cs.err = replaceErrors(original)
235 return cs.err
238 cs.err = original
245 server, err = cs.client.deployment.SelectServer(ctx, cs.selector)
256 cs.wireVersion = conn.Description().WireVersion
258 if cs.wireVersion == nil || cs.wireVersion.Max < 6 {
262 cs.aggregate.Deployment(cs.createOperationDeployment(server, conn))
263 cs.err = cs.aggregate.Execute(ctx)
266 if cs.err != nil {
267 cs.err = replaceErrors(cs.err)
268 return cs.Err()
272 cs.err = nil
274 cr := cs.aggregate.ResultCursorResponse()
277 cs.cursor, cs.err = driver.NewBatchCursor(cr, cs.sess, cs.client.clock, cs.cursorOptions)
278 if cs.err = replaceErrors(cs.err); cs.err != nil {
279 return cs.Err()
282 cs.updatePbrtFromCommand()
283 if cs.options.StartAtOperationTime == nil && cs.options.ResumeAfter == nil &&
284 cs.options.StartAfter == nil && cs.wireVersion.Max >= 7 &&
285 cs.emptyBatch() && cs.resumeToken == nil {
286 cs.operationTime = cs.sess.OperationTime
289 return cs.Err()
293 func (cs *ChangeStream) updatePbrtFromCommand() {
295 if pbrt := cs.cursor.PostBatchResumeToken(); cs.emptyBatch() && pbrt != nil {
296 cs.resumeToken = bson.Raw(pbrt)
300 func (cs *ChangeStream) storeResumeToken() error {
304 if len(cs.batch) == 0 {
305 if pbrt := cs.cursor.PostBatchResumeToken(); pbrt != nil {
312 tokenDoc, ok = cs.Current.Lookup("_id").DocumentOK()
314 _ = cs.Close(context.Background())
319 cs.resumeToken = tokenDoc
323 func (cs *ChangeStream) buildPipelineSlice(pipeline interface{}) error {
326cs.err = errors.New("can only transform slices and arrays into aggregation pipelines, but got inva…
327 return cs.err
330 cs.pipelineSlice = make([]bsoncore.Document, 0, val.Len()+1)
333 csDocTemp := cs.createPipelineOptionsDoc()
334 if cs.err != nil {
335 return cs.err
338 csDoc, cs.err = bsoncore.AppendDocumentEnd(csDoc, csIdx)
339 if cs.err != nil {
340 return cs.err
342 cs.pipelineSlice = append(cs.pipelineSlice, csDoc)
346 elem, cs.err = transformBsoncoreDocument(cs.registry, val.Index(i).Interface())
347 if cs.err != nil {
348 return cs.err
351 cs.pipelineSlice = append(cs.pipelineSlice, elem)
354 return cs.err
357 func (cs *ChangeStream) createPipelineOptionsDoc() bsoncore.Document {
360 if cs.streamType == ClientStream {
364 if cs.options.FullDocument != nil {
365 plDoc = bsoncore.AppendStringElement(plDoc, "fullDocument", string(*cs.options.FullDocument))
368 if cs.options.ResumeAfter != nil {
370 raDoc, cs.err = transformBsoncoreDocument(cs.registry, cs.options.ResumeAfter)
371 if cs.err != nil {
378 if cs.options.StartAfter != nil {
380 saDoc, cs.err = transformBsoncoreDocument(cs.registry, cs.options.StartAfter)
381 if cs.err != nil {
388 if cs.options.StartAtOperationTime != nil {
389 …ore.AppendTimestampElement(plDoc, "startAtOperationTime", cs.options.StartAtOperationTime.T, cs.op…
392 if plDoc, cs.err = bsoncore.AppendDocumentEnd(plDoc, plDocIdx); cs.err != nil {
399 func (cs *ChangeStream) pipelineToBSON() (bsoncore.Document, error) {
401 for i, doc := range cs.pipelineSlice {
404 if pipelineArr, cs.err = bsoncore.AppendArrayEnd(pipelineArr, pipelineDocIdx); cs.err != nil {
405 return nil, cs.err
407 return pipelineArr, cs.err
410 func (cs *ChangeStream) replaceOptions(ctx context.Context, wireVersion *description.VersionRange) {
412 if cs.resumeToken != nil {
413 cs.options.SetResumeAfter(cs.resumeToken)
414 cs.options.SetStartAfter(nil)
415 cs.options.SetStartAtOperationTime(nil)
421 …if (cs.sess.OperationTime != nil || cs.options.StartAtOperationTime != nil) && wireVersion.Max >= …
422 opTime := cs.options.StartAtOperationTime
423 if cs.operationTime != nil {
424 opTime = cs.sess.OperationTime
427 cs.options.SetStartAtOperationTime(opTime)
428 cs.options.SetResumeAfter(nil)
429 cs.options.SetStartAfter(nil)
434 cs.options.SetResumeAfter(nil)
435 cs.options.SetStartAfter(nil)
436 cs.options.SetStartAtOperationTime(nil)
440 func (cs *ChangeStream) ID() int64 {
441 if cs.cursor == nil {
444 return cs.cursor.ID()
449 func (cs *ChangeStream) Decode(val interface{}) error {
450 if cs.cursor == nil {
454 return bson.UnmarshalWithRegistry(cs.registry, cs.Current, val)
458 func (cs *ChangeStream) Err() error {
459 if cs.err != nil {
460 return replaceErrors(cs.err)
462 if cs.cursor == nil {
466 return replaceErrors(cs.cursor.Err())
471 func (cs *ChangeStream) Close(ctx context.Context) error {
476 defer closeImplicitSession(cs.sess)
478 if cs.cursor == nil {
482 cs.err = replaceErrors(cs.cursor.Close(ctx))
483 cs.cursor = nil
484 return cs.Err()
489 func (cs *ChangeStream) ResumeToken() bson.Raw {
490 return cs.resumeToken
500 func (cs *ChangeStream) Next(ctx context.Context) bool {
501 return cs.next(ctx, false)
515 func (cs *ChangeStream) TryNext(ctx context.Context) bool {
516 return cs.next(ctx, true)
519 func (cs *ChangeStream) next(ctx context.Context, nonBlocking bool) bool {
521 if cs.err != nil {
529 if len(cs.batch) == 0 {
530 cs.loopNext(ctx, nonBlocking)
531 if cs.err != nil {
532 cs.err = replaceErrors(cs.err)
535 if len(cs.batch) == 0 {
541 cs.Current = bson.Raw(cs.batch[0])
542 cs.batch = cs.batch[1:]
543 if cs.err = cs.storeResumeToken(); cs.err != nil {
549 func (cs *ChangeStream) loopNext(ctx context.Context, nonBlocking bool) {
551 if cs.cursor == nil {
555 if cs.cursor.Next(ctx) {
557 cs.batch, cs.err = cs.cursor.Batch().Documents()
561 cs.err = replaceErrors(cs.cursor.Err())
562 if cs.err == nil {
564 if cs.ID() == 0 {
570 cs.updatePbrtFromCommand()
578 if !cs.isResumableError() {
583 _ = cs.cursor.Close(ctx)
584 if cs.err = cs.executeOperation(ctx, true); cs.err != nil {
590 func (cs *ChangeStream) isResumableError() bool {
591 commandErr, ok := cs.err.(CommandError)
602 if cs.wireVersion != nil && cs.wireVersion.Includes(minResumableLabelWireVersion) {
612 func (cs *ChangeStream) emptyBatch() bool {
613 return cs.cursor.Batch().Empty()