1// Copyright (C) MongoDB, Inc. 2017-present. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may 4// not use this file except in compliance with the License. You may obtain 5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 6 7package mongo 8 9import ( 10 "context" 11 "errors" 12 "fmt" 13 "io" 14 "reflect" 15 16 "go.mongodb.org/mongo-driver/bson" 17 "go.mongodb.org/mongo-driver/bson/bsoncodec" 18 "go.mongodb.org/mongo-driver/x/bsonx/bsoncore" 19 "go.mongodb.org/mongo-driver/x/mongo/driver" 20 "go.mongodb.org/mongo-driver/x/mongo/driver/session" 21) 22 23// Cursor is used to iterate over a stream of documents. Each document can be decoded into a Go type via the Decode 24// method or accessed as raw BSON via the Current field. 25type Cursor struct { 26 // Current contains the BSON bytes of the current change document. This property is only valid until the next call 27 // to Next or TryNext. If continued access is required, a copy must be made. 28 Current bson.Raw 29 30 bc batchCursor 31 batch *bsoncore.DocumentSequence 32 batchLength int 33 registry *bsoncodec.Registry 34 clientSession *session.Client 35 36 err error 37} 38 39func newCursor(bc batchCursor, registry *bsoncodec.Registry) (*Cursor, error) { 40 return newCursorWithSession(bc, registry, nil) 41} 42 43func newCursorWithSession(bc batchCursor, registry *bsoncodec.Registry, clientSession *session.Client) (*Cursor, error) { 44 if registry == nil { 45 registry = bson.DefaultRegistry 46 } 47 if bc == nil { 48 return nil, errors.New("batch cursor must not be nil") 49 } 50 c := &Cursor{ 51 bc: bc, 52 registry: registry, 53 clientSession: clientSession, 54 } 55 if bc.ID() == 0 { 56 c.closeImplicitSession() 57 } 58 59 // Initialize just the batchLength here so RemainingBatchLength will return an accurate result. The actual batch 60 // will be pulled up by the first Next/TryNext call. 61 c.batchLength = c.bc.Batch().DocumentCount() 62 return c, nil 63} 64 65func newEmptyCursor() *Cursor { 66 return &Cursor{bc: driver.NewEmptyBatchCursor()} 67} 68 69// ID returns the ID of this cursor, or 0 if the cursor has been closed or exhausted. 70func (c *Cursor) ID() int64 { return c.bc.ID() } 71 72// Next gets the next document for this cursor. It returns true if there were no errors and the cursor has not been 73// exhausted. 74// 75// Next blocks until a document is available, an error occurs, or ctx expires. If ctx expires, the 76// error will be set to ctx.Err(). In an error case, Next will return false. 77// 78// If Next returns false, subsequent calls will also return false. 79func (c *Cursor) Next(ctx context.Context) bool { 80 return c.next(ctx, false) 81} 82 83// TryNext attempts to get the next document for this cursor. It returns true if there were no errors and the next 84// document is available. This is only recommended for use with tailable cursors as a non-blocking alternative to 85// Next. See https://docs.mongodb.com/manual/core/tailable-cursors/ for more information about tailable cursors. 86// 87// TryNext returns false if the cursor is exhausted, an error occurs when getting results from the server, the next 88// document is not yet available, or ctx expires. If ctx expires, the error will be set to ctx.Err(). 89// 90// If TryNext returns false and an error occurred or the cursor has been exhausted (i.e. c.Err() != nil || c.ID() == 0), 91// subsequent attempts will also return false. Otherwise, it is safe to call TryNext again until a document is 92// available. 93// 94// This method requires driver version >= 1.2.0. 95func (c *Cursor) TryNext(ctx context.Context) bool { 96 return c.next(ctx, true) 97} 98 99func (c *Cursor) next(ctx context.Context, nonBlocking bool) bool { 100 // return false right away if the cursor has already errored. 101 if c.err != nil { 102 return false 103 } 104 105 if ctx == nil { 106 ctx = context.Background() 107 } 108 doc, err := c.batch.Next() 109 switch err { 110 case nil: 111 // Consume the next document in the current batch. 112 c.batchLength-- 113 c.Current = bson.Raw(doc) 114 return true 115 case io.EOF: // Need to do a getMore 116 default: 117 c.err = err 118 return false 119 } 120 121 // call the Next method in a loop until at least one document is returned in the next batch or 122 // the context times out. 123 for { 124 // If we don't have a next batch 125 if !c.bc.Next(ctx) { 126 // Do we have an error? If so we return false. 127 c.err = replaceErrors(c.bc.Err()) 128 if c.err != nil { 129 return false 130 } 131 // Is the cursor ID zero? 132 if c.bc.ID() == 0 { 133 c.closeImplicitSession() 134 return false 135 } 136 // empty batch, but cursor is still valid. 137 // use nonBlocking to determine if we should continue or return control to the caller. 138 if nonBlocking { 139 return false 140 } 141 continue 142 } 143 144 // close the implicit session if this was the last getMore 145 if c.bc.ID() == 0 { 146 c.closeImplicitSession() 147 } 148 149 // Use the new batch to update the batch and batchLength fields. Consume the first document in the batch. 150 c.batch = c.bc.Batch() 151 c.batchLength = c.batch.DocumentCount() 152 doc, err = c.batch.Next() 153 switch err { 154 case nil: 155 c.batchLength-- 156 c.Current = bson.Raw(doc) 157 return true 158 case io.EOF: // Empty batch so we continue 159 default: 160 c.err = err 161 return false 162 } 163 } 164} 165 166// Decode will unmarshal the current document into val and return any errors from the unmarshalling process without any 167// modification. If val is nil or is a typed nil, an error will be returned. 168func (c *Cursor) Decode(val interface{}) error { 169 return bson.UnmarshalWithRegistry(c.registry, c.Current, val) 170} 171 172// Err returns the last error seen by the Cursor, or nil if no error has occurred. 173func (c *Cursor) Err() error { return c.err } 174 175// Close closes this cursor. Next and TryNext must not be called after Close has been called. Close is idempotent. After 176// the first call, any subsequent calls will not change the state. 177func (c *Cursor) Close(ctx context.Context) error { 178 defer c.closeImplicitSession() 179 return replaceErrors(c.bc.Close(ctx)) 180} 181 182// All iterates the cursor and decodes each document into results. The results parameter must be a pointer to a slice. 183// The slice pointed to by results will be completely overwritten. This method will close the cursor after retrieving 184// all documents. If the cursor has been iterated, any previously iterated documents will not be included in results. 185// 186// This method requires driver version >= 1.1.0. 187func (c *Cursor) All(ctx context.Context, results interface{}) error { 188 resultsVal := reflect.ValueOf(results) 189 if resultsVal.Kind() != reflect.Ptr { 190 return fmt.Errorf("results argument must be a pointer to a slice, but was a %s", resultsVal.Kind()) 191 } 192 193 sliceVal := resultsVal.Elem() 194 if sliceVal.Kind() == reflect.Interface { 195 sliceVal = sliceVal.Elem() 196 } 197 198 if sliceVal.Kind() != reflect.Slice { 199 return fmt.Errorf("results argument must be a pointer to a slice, but was a pointer to %s", sliceVal.Kind()) 200 } 201 202 elementType := sliceVal.Type().Elem() 203 var index int 204 var err error 205 206 defer c.Close(ctx) 207 208 batch := c.batch // exhaust the current batch before iterating the batch cursor 209 for { 210 sliceVal, index, err = c.addFromBatch(sliceVal, elementType, batch, index) 211 if err != nil { 212 return err 213 } 214 215 if !c.bc.Next(ctx) { 216 break 217 } 218 219 batch = c.bc.Batch() 220 } 221 222 if err = replaceErrors(c.bc.Err()); err != nil { 223 return err 224 } 225 226 resultsVal.Elem().Set(sliceVal.Slice(0, index)) 227 return nil 228} 229 230// RemainingBatchLength returns the number of documents left in the current batch. If this returns zero, the subsequent 231// call to Next or TryNext will do a network request to fetch the next batch. 232func (c *Cursor) RemainingBatchLength() int { 233 return c.batchLength 234} 235 236// addFromBatch adds all documents from batch to sliceVal starting at the given index. It returns the new slice value, 237// the next empty index in the slice, and an error if one occurs. 238func (c *Cursor) addFromBatch(sliceVal reflect.Value, elemType reflect.Type, batch *bsoncore.DocumentSequence, 239 index int) (reflect.Value, int, error) { 240 241 docs, err := batch.Documents() 242 if err != nil { 243 return sliceVal, index, err 244 } 245 246 for _, doc := range docs { 247 if sliceVal.Len() == index { 248 // slice is full 249 newElem := reflect.New(elemType) 250 sliceVal = reflect.Append(sliceVal, newElem.Elem()) 251 sliceVal = sliceVal.Slice(0, sliceVal.Cap()) 252 } 253 254 currElem := sliceVal.Index(index).Addr().Interface() 255 if err = bson.UnmarshalWithRegistry(c.registry, doc, currElem); err != nil { 256 return sliceVal, index, err 257 } 258 259 index++ 260 } 261 262 return sliceVal, index, nil 263} 264 265func (c *Cursor) closeImplicitSession() { 266 if c.clientSession != nil && c.clientSession.SessionType == session.Implicit { 267 c.clientSession.EndSession() 268 } 269} 270 271// BatchCursorFromCursor returns a driver.BatchCursor for the given Cursor. If there is no underlying 272// driver.BatchCursor, nil is returned. 273// 274// Deprecated: This is an unstable function because the driver.BatchCursor type exists in the "x" package. Neither this 275// function nor the driver.BatchCursor type should be used by applications and may be changed or removed in any release. 276func BatchCursorFromCursor(c *Cursor) *driver.BatchCursor { 277 bc, _ := c.bc.(*driver.BatchCursor) 278 return bc 279} 280