1// Copyright (C) MongoDB, Inc. 2017-present.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may
4// not use this file except in compliance with the License. You may obtain
5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package mongo
8
9import (
10	"context"
11	"errors"
12	"fmt"
13	"io"
14	"reflect"
15
16	"go.mongodb.org/mongo-driver/bson"
17	"go.mongodb.org/mongo-driver/bson/bsoncodec"
18	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
19	"go.mongodb.org/mongo-driver/x/mongo/driver"
20	"go.mongodb.org/mongo-driver/x/mongo/driver/session"
21)
22
23// Cursor is used to iterate over a stream of documents. Each document can be decoded into a Go type via the Decode
24// method or accessed as raw BSON via the Current field.
25type Cursor struct {
26	// Current contains the BSON bytes of the current change document. This property is only valid until the next call
27	// to Next or TryNext. If continued access is required, a copy must be made.
28	Current bson.Raw
29
30	bc            batchCursor
31	batch         *bsoncore.DocumentSequence
32	batchLength   int
33	registry      *bsoncodec.Registry
34	clientSession *session.Client
35
36	err error
37}
38
39func newCursor(bc batchCursor, registry *bsoncodec.Registry) (*Cursor, error) {
40	return newCursorWithSession(bc, registry, nil)
41}
42
43func newCursorWithSession(bc batchCursor, registry *bsoncodec.Registry, clientSession *session.Client) (*Cursor, error) {
44	if registry == nil {
45		registry = bson.DefaultRegistry
46	}
47	if bc == nil {
48		return nil, errors.New("batch cursor must not be nil")
49	}
50	c := &Cursor{
51		bc:            bc,
52		registry:      registry,
53		clientSession: clientSession,
54	}
55	if bc.ID() == 0 {
56		c.closeImplicitSession()
57	}
58
59	// Initialize just the batchLength here so RemainingBatchLength will return an accurate result. The actual batch
60	// will be pulled up by the first Next/TryNext call.
61	c.batchLength = c.bc.Batch().DocumentCount()
62	return c, nil
63}
64
65func newEmptyCursor() *Cursor {
66	return &Cursor{bc: driver.NewEmptyBatchCursor()}
67}
68
69// ID returns the ID of this cursor, or 0 if the cursor has been closed or exhausted.
70func (c *Cursor) ID() int64 { return c.bc.ID() }
71
72// Next gets the next document for this cursor. It returns true if there were no errors and the cursor has not been
73// exhausted.
74//
75// Next blocks until a document is available, an error occurs, or ctx expires. If ctx expires, the
76// error will be set to ctx.Err(). In an error case, Next will return false.
77//
78// If Next returns false, subsequent calls will also return false.
79func (c *Cursor) Next(ctx context.Context) bool {
80	return c.next(ctx, false)
81}
82
83// TryNext attempts to get the next document for this cursor. It returns true if there were no errors and the next
84// document is available. This is only recommended for use with tailable cursors as a non-blocking alternative to
85// Next. See https://docs.mongodb.com/manual/core/tailable-cursors/ for more information about tailable cursors.
86//
87// TryNext returns false if the cursor is exhausted, an error occurs when getting results from the server, the next
88// document is not yet available, or ctx expires. If ctx expires, the error will be set to ctx.Err().
89//
90// If TryNext returns false and an error occurred or the cursor has been exhausted (i.e. c.Err() != nil || c.ID() == 0),
91// subsequent attempts will also return false. Otherwise, it is safe to call TryNext again until a document is
92// available.
93//
94// This method requires driver version >= 1.2.0.
95func (c *Cursor) TryNext(ctx context.Context) bool {
96	return c.next(ctx, true)
97}
98
99func (c *Cursor) next(ctx context.Context, nonBlocking bool) bool {
100	// return false right away if the cursor has already errored.
101	if c.err != nil {
102		return false
103	}
104
105	if ctx == nil {
106		ctx = context.Background()
107	}
108	doc, err := c.batch.Next()
109	switch err {
110	case nil:
111		// Consume the next document in the current batch.
112		c.batchLength--
113		c.Current = bson.Raw(doc)
114		return true
115	case io.EOF: // Need to do a getMore
116	default:
117		c.err = err
118		return false
119	}
120
121	// call the Next method in a loop until at least one document is returned in the next batch or
122	// the context times out.
123	for {
124		// If we don't have a next batch
125		if !c.bc.Next(ctx) {
126			// Do we have an error? If so we return false.
127			c.err = replaceErrors(c.bc.Err())
128			if c.err != nil {
129				return false
130			}
131			// Is the cursor ID zero?
132			if c.bc.ID() == 0 {
133				c.closeImplicitSession()
134				return false
135			}
136			// empty batch, but cursor is still valid.
137			// use nonBlocking to determine if we should continue or return control to the caller.
138			if nonBlocking {
139				return false
140			}
141			continue
142		}
143
144		// close the implicit session if this was the last getMore
145		if c.bc.ID() == 0 {
146			c.closeImplicitSession()
147		}
148
149		// Use the new batch to update the batch and batchLength fields. Consume the first document in the batch.
150		c.batch = c.bc.Batch()
151		c.batchLength = c.batch.DocumentCount()
152		doc, err = c.batch.Next()
153		switch err {
154		case nil:
155			c.batchLength--
156			c.Current = bson.Raw(doc)
157			return true
158		case io.EOF: // Empty batch so we continue
159		default:
160			c.err = err
161			return false
162		}
163	}
164}
165
166// Decode will unmarshal the current document into val and return any errors from the unmarshalling process without any
167// modification. If val is nil or is a typed nil, an error will be returned.
168func (c *Cursor) Decode(val interface{}) error {
169	return bson.UnmarshalWithRegistry(c.registry, c.Current, val)
170}
171
172// Err returns the last error seen by the Cursor, or nil if no error has occurred.
173func (c *Cursor) Err() error { return c.err }
174
175// Close closes this cursor. Next and TryNext must not be called after Close has been called. Close is idempotent. After
176// the first call, any subsequent calls will not change the state.
177func (c *Cursor) Close(ctx context.Context) error {
178	defer c.closeImplicitSession()
179	return replaceErrors(c.bc.Close(ctx))
180}
181
182// All iterates the cursor and decodes each document into results. The results parameter must be a pointer to a slice.
183// The slice pointed to by results will be completely overwritten. This method will close the cursor after retrieving
184// all documents. If the cursor has been iterated, any previously iterated documents will not be included in results.
185//
186// This method requires driver version >= 1.1.0.
187func (c *Cursor) All(ctx context.Context, results interface{}) error {
188	resultsVal := reflect.ValueOf(results)
189	if resultsVal.Kind() != reflect.Ptr {
190		return fmt.Errorf("results argument must be a pointer to a slice, but was a %s", resultsVal.Kind())
191	}
192
193	sliceVal := resultsVal.Elem()
194	if sliceVal.Kind() == reflect.Interface {
195		sliceVal = sliceVal.Elem()
196	}
197
198	if sliceVal.Kind() != reflect.Slice {
199		return fmt.Errorf("results argument must be a pointer to a slice, but was a pointer to %s", sliceVal.Kind())
200	}
201
202	elementType := sliceVal.Type().Elem()
203	var index int
204	var err error
205
206	defer c.Close(ctx)
207
208	batch := c.batch // exhaust the current batch before iterating the batch cursor
209	for {
210		sliceVal, index, err = c.addFromBatch(sliceVal, elementType, batch, index)
211		if err != nil {
212			return err
213		}
214
215		if !c.bc.Next(ctx) {
216			break
217		}
218
219		batch = c.bc.Batch()
220	}
221
222	if err = replaceErrors(c.bc.Err()); err != nil {
223		return err
224	}
225
226	resultsVal.Elem().Set(sliceVal.Slice(0, index))
227	return nil
228}
229
230// RemainingBatchLength returns the number of documents left in the current batch. If this returns zero, the subsequent
231// call to Next or TryNext will do a network request to fetch the next batch.
232func (c *Cursor) RemainingBatchLength() int {
233	return c.batchLength
234}
235
236// addFromBatch adds all documents from batch to sliceVal starting at the given index. It returns the new slice value,
237// the next empty index in the slice, and an error if one occurs.
238func (c *Cursor) addFromBatch(sliceVal reflect.Value, elemType reflect.Type, batch *bsoncore.DocumentSequence,
239	index int) (reflect.Value, int, error) {
240
241	docs, err := batch.Documents()
242	if err != nil {
243		return sliceVal, index, err
244	}
245
246	for _, doc := range docs {
247		if sliceVal.Len() == index {
248			// slice is full
249			newElem := reflect.New(elemType)
250			sliceVal = reflect.Append(sliceVal, newElem.Elem())
251			sliceVal = sliceVal.Slice(0, sliceVal.Cap())
252		}
253
254		currElem := sliceVal.Index(index).Addr().Interface()
255		if err = bson.UnmarshalWithRegistry(c.registry, doc, currElem); err != nil {
256			return sliceVal, index, err
257		}
258
259		index++
260	}
261
262	return sliceVal, index, nil
263}
264
265func (c *Cursor) closeImplicitSession() {
266	if c.clientSession != nil && c.clientSession.SessionType == session.Implicit {
267		c.clientSession.EndSession()
268	}
269}
270
271// BatchCursorFromCursor returns a driver.BatchCursor for the given Cursor. If there is no underlying
272// driver.BatchCursor, nil is returned.
273//
274// Deprecated: This is an unstable function because the driver.BatchCursor type exists in the "x" package. Neither this
275// function nor the driver.BatchCursor type should be used by applications and may be changed or removed in any release.
276func BatchCursorFromCursor(c *Cursor) *driver.BatchCursor {
277	bc, _ := c.bc.(*driver.BatchCursor)
278	return bc
279}
280