1// Copyright (C) MongoDB, Inc. 2017-present. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may 4// not use this file except in compliance with the License. You may obtain 5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 6 7package mongo 8 9import ( 10 "context" 11 "errors" 12 "io" 13 "reflect" 14 15 "go.mongodb.org/mongo-driver/bson" 16 "go.mongodb.org/mongo-driver/bson/bsoncodec" 17 "go.mongodb.org/mongo-driver/x/bsonx/bsoncore" 18 "go.mongodb.org/mongo-driver/x/mongo/driver" 19 "go.mongodb.org/mongo-driver/x/mongo/driver/session" 20) 21 22// Cursor is used to iterate over a stream of documents. Each document can be decoded into a Go type via the Decode 23// method or accessed as raw BSON via the Current field. 24type Cursor struct { 25 // Current contains the BSON bytes of the current change document. This property is only valid until the next call 26 // to Next or TryNext. If continued access is required, a copy must be made. 27 Current bson.Raw 28 29 bc batchCursor 30 batch *bsoncore.DocumentSequence 31 registry *bsoncodec.Registry 32 clientSession *session.Client 33 34 err error 35} 36 37func newCursor(bc batchCursor, registry *bsoncodec.Registry) (*Cursor, error) { 38 return newCursorWithSession(bc, registry, nil) 39} 40 41func newCursorWithSession(bc batchCursor, registry *bsoncodec.Registry, clientSession *session.Client) (*Cursor, error) { 42 if registry == nil { 43 registry = bson.DefaultRegistry 44 } 45 if bc == nil { 46 return nil, errors.New("batch cursor must not be nil") 47 } 48 c := &Cursor{ 49 bc: bc, 50 registry: registry, 51 clientSession: clientSession, 52 } 53 if bc.ID() == 0 { 54 c.closeImplicitSession() 55 } 56 return c, nil 57} 58 59func newEmptyCursor() *Cursor { 60 return &Cursor{bc: driver.NewEmptyBatchCursor()} 61} 62 63// ID returns the ID of this cursor, or 0 if the cursor has been closed or exhausted. 64func (c *Cursor) ID() int64 { return c.bc.ID() } 65 66// Next gets the next document for this cursor. It returns true if there were no errors and the cursor has not been 67// exhausted. 68// 69// Next blocks until a document is available, an error occurs, or ctx expires. If ctx expires, the 70// error will be set to ctx.Err(). In an error case, Next will return false. 71// 72// If Next returns false, subsequent calls will also return false. 73func (c *Cursor) Next(ctx context.Context) bool { 74 return c.next(ctx, false) 75} 76 77// TryNext attempts to get the next document for this cursor. It returns true if there were no errors and the next 78// document is available. This is only recommended for use with tailable cursors as a non-blocking alternative to 79// Next. See https://docs.mongodb.com/manual/core/tailable-cursors/ for more information about tailable cursors. 80// 81// TryNext returns false if the cursor is exhausted, an error occurs when getting results from the server, the next 82// document is not yet available, or ctx expires. If ctx expires, the error will be set to ctx.Err(). 83// 84// If TryNext returns false and an error occurred or the cursor has been exhausted (i.e. c.Err() != nil || c.ID() == 0), 85// subsequent attempts will also return false. Otherwise, it is safe to call TryNext again until a document is 86// available. 87// 88// This method requires driver version >= 1.2.0. 89func (c *Cursor) TryNext(ctx context.Context) bool { 90 return c.next(ctx, true) 91} 92 93func (c *Cursor) next(ctx context.Context, nonBlocking bool) bool { 94 // return false right away if the cursor has already errored. 95 if c.err != nil { 96 return false 97 } 98 99 if ctx == nil { 100 ctx = context.Background() 101 } 102 doc, err := c.batch.Next() 103 switch err { 104 case nil: 105 c.Current = bson.Raw(doc) 106 return true 107 case io.EOF: // Need to do a getMore 108 default: 109 c.err = err 110 return false 111 } 112 113 // call the Next method in a loop until at least one document is returned in the next batch or 114 // the context times out. 115 for { 116 // If we don't have a next batch 117 if !c.bc.Next(ctx) { 118 // Do we have an error? If so we return false. 119 c.err = c.bc.Err() 120 if c.err != nil { 121 return false 122 } 123 // Is the cursor ID zero? 124 if c.bc.ID() == 0 { 125 c.closeImplicitSession() 126 return false 127 } 128 // empty batch, but cursor is still valid. 129 // use nonBlocking to determine if we should continue or return control to the caller. 130 if nonBlocking { 131 return false 132 } 133 continue 134 } 135 136 // close the implicit session if this was the last getMore 137 if c.bc.ID() == 0 { 138 c.closeImplicitSession() 139 } 140 141 c.batch = c.bc.Batch() 142 doc, err = c.batch.Next() 143 switch err { 144 case nil: 145 c.Current = bson.Raw(doc) 146 return true 147 case io.EOF: // Empty batch so we continue 148 default: 149 c.err = err 150 return false 151 } 152 } 153} 154 155// Decode will unmarshal the current document into val and return any errors from the unmarshalling process without any 156// modification. If val is nil or is a typed nil, an error will be returned. 157func (c *Cursor) Decode(val interface{}) error { 158 return bson.UnmarshalWithRegistry(c.registry, c.Current, val) 159} 160 161// Err returns the last error seen by the Cursor, or nil if no error has occurred. 162func (c *Cursor) Err() error { return c.err } 163 164// Close closes this cursor. Next and TryNext must not be called after Close has been called. Close is idempotent. After 165// the first call, any subsequent calls will not change the state. 166func (c *Cursor) Close(ctx context.Context) error { 167 defer c.closeImplicitSession() 168 return c.bc.Close(ctx) 169} 170 171// All iterates the cursor and decodes each document into results. The results parameter must be a pointer to a slice. 172// The slice pointed to by results will be completely overwritten. This method will close the cursor after retrieving 173// all documents. If the cursor has been iterated, any previously iterated documents will not be included in results. 174// 175// This method requires driver version >= 1.1.0. 176func (c *Cursor) All(ctx context.Context, results interface{}) error { 177 resultsVal := reflect.ValueOf(results) 178 if resultsVal.Kind() != reflect.Ptr { 179 return errors.New("results argument must be a pointer to a slice") 180 } 181 182 sliceVal := resultsVal.Elem() 183 elementType := sliceVal.Type().Elem() 184 var index int 185 var err error 186 187 defer c.Close(ctx) 188 189 batch := c.batch // exhaust the current batch before iterating the batch cursor 190 for { 191 sliceVal, index, err = c.addFromBatch(sliceVal, elementType, batch, index) 192 if err != nil { 193 return err 194 } 195 196 if !c.bc.Next(ctx) { 197 break 198 } 199 200 batch = c.bc.Batch() 201 } 202 203 if err = c.bc.Err(); err != nil { 204 return err 205 } 206 207 resultsVal.Elem().Set(sliceVal.Slice(0, index)) 208 return nil 209} 210 211// addFromBatch adds all documents from batch to sliceVal starting at the given index. It returns the new slice value, 212// the next empty index in the slice, and an error if one occurs. 213func (c *Cursor) addFromBatch(sliceVal reflect.Value, elemType reflect.Type, batch *bsoncore.DocumentSequence, 214 index int) (reflect.Value, int, error) { 215 216 docs, err := batch.Documents() 217 if err != nil { 218 return sliceVal, index, err 219 } 220 221 for _, doc := range docs { 222 if sliceVal.Len() == index { 223 // slice is full 224 newElem := reflect.New(elemType) 225 sliceVal = reflect.Append(sliceVal, newElem.Elem()) 226 sliceVal = sliceVal.Slice(0, sliceVal.Cap()) 227 } 228 229 currElem := sliceVal.Index(index).Addr().Interface() 230 if err = bson.UnmarshalWithRegistry(c.registry, doc, currElem); err != nil { 231 return sliceVal, index, err 232 } 233 234 index++ 235 } 236 237 return sliceVal, index, nil 238} 239 240func (c *Cursor) closeImplicitSession() { 241 if c.clientSession != nil && c.clientSession.SessionType == session.Implicit { 242 c.clientSession.EndSession() 243 } 244} 245 246// BatchCursorFromCursor returns a driver.BatchCursor for the given Cursor. If there is no underlying 247// driver.BatchCursor, nil is returned. This method is deprecated and does not have any stability guarantees. It may be 248// removed in the future. 249func BatchCursorFromCursor(c *Cursor) *driver.BatchCursor { 250 bc, _ := c.bc.(*driver.BatchCursor) 251 return bc 252} 253