1// Copyright (C) MongoDB, Inc. 2017-present.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may
4// not use this file except in compliance with the License. You may obtain
5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package mongo
8
9import (
10	"context"
11	"errors"
12	"io"
13	"reflect"
14
15	"go.mongodb.org/mongo-driver/bson"
16	"go.mongodb.org/mongo-driver/bson/bsoncodec"
17	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
18	"go.mongodb.org/mongo-driver/x/mongo/driver"
19	"go.mongodb.org/mongo-driver/x/mongo/driver/session"
20)
21
22// Cursor is used to iterate over a stream of documents. Each document can be decoded into a Go type via the Decode
23// method or accessed as raw BSON via the Current field.
24type Cursor struct {
25	// Current contains the BSON bytes of the current change document. This property is only valid until the next call
26	// to Next or TryNext. If continued access is required, a copy must be made.
27	Current bson.Raw
28
29	bc            batchCursor
30	batch         *bsoncore.DocumentSequence
31	registry      *bsoncodec.Registry
32	clientSession *session.Client
33
34	err error
35}
36
37func newCursor(bc batchCursor, registry *bsoncodec.Registry) (*Cursor, error) {
38	return newCursorWithSession(bc, registry, nil)
39}
40
41func newCursorWithSession(bc batchCursor, registry *bsoncodec.Registry, clientSession *session.Client) (*Cursor, error) {
42	if registry == nil {
43		registry = bson.DefaultRegistry
44	}
45	if bc == nil {
46		return nil, errors.New("batch cursor must not be nil")
47	}
48	c := &Cursor{
49		bc:            bc,
50		registry:      registry,
51		clientSession: clientSession,
52	}
53	if bc.ID() == 0 {
54		c.closeImplicitSession()
55	}
56	return c, nil
57}
58
59func newEmptyCursor() *Cursor {
60	return &Cursor{bc: driver.NewEmptyBatchCursor()}
61}
62
63// ID returns the ID of this cursor, or 0 if the cursor has been closed or exhausted.
64func (c *Cursor) ID() int64 { return c.bc.ID() }
65
66// Next gets the next document for this cursor. It returns true if there were no errors and the cursor has not been
67// exhausted.
68//
69// Next blocks until a document is available, an error occurs, or ctx expires. If ctx expires, the
70// error will be set to ctx.Err(). In an error case, Next will return false.
71//
72// If Next returns false, subsequent calls will also return false.
73func (c *Cursor) Next(ctx context.Context) bool {
74	return c.next(ctx, false)
75}
76
77// TryNext attempts to get the next document for this cursor. It returns true if there were no errors and the next
78// document is available. This is only recommended for use with tailable cursors as a non-blocking alternative to
79// Next. See https://docs.mongodb.com/manual/core/tailable-cursors/ for more information about tailable cursors.
80//
81// TryNext returns false if the cursor is exhausted, an error occurs when getting results from the server, the next
82// document is not yet available, or ctx expires. If ctx expires, the error will be set to ctx.Err().
83//
84// If TryNext returns false and an error occurred or the cursor has been exhausted (i.e. c.Err() != nil || c.ID() == 0),
85// subsequent attempts will also return false. Otherwise, it is safe to call TryNext again until a document is
86// available.
87//
88// This method requires driver version >= 1.2.0.
89func (c *Cursor) TryNext(ctx context.Context) bool {
90	return c.next(ctx, true)
91}
92
93func (c *Cursor) next(ctx context.Context, nonBlocking bool) bool {
94	// return false right away if the cursor has already errored.
95	if c.err != nil {
96		return false
97	}
98
99	if ctx == nil {
100		ctx = context.Background()
101	}
102	doc, err := c.batch.Next()
103	switch err {
104	case nil:
105		c.Current = bson.Raw(doc)
106		return true
107	case io.EOF: // Need to do a getMore
108	default:
109		c.err = err
110		return false
111	}
112
113	// call the Next method in a loop until at least one document is returned in the next batch or
114	// the context times out.
115	for {
116		// If we don't have a next batch
117		if !c.bc.Next(ctx) {
118			// Do we have an error? If so we return false.
119			c.err = c.bc.Err()
120			if c.err != nil {
121				return false
122			}
123			// Is the cursor ID zero?
124			if c.bc.ID() == 0 {
125				c.closeImplicitSession()
126				return false
127			}
128			// empty batch, but cursor is still valid.
129			// use nonBlocking to determine if we should continue or return control to the caller.
130			if nonBlocking {
131				return false
132			}
133			continue
134		}
135
136		// close the implicit session if this was the last getMore
137		if c.bc.ID() == 0 {
138			c.closeImplicitSession()
139		}
140
141		c.batch = c.bc.Batch()
142		doc, err = c.batch.Next()
143		switch err {
144		case nil:
145			c.Current = bson.Raw(doc)
146			return true
147		case io.EOF: // Empty batch so we continue
148		default:
149			c.err = err
150			return false
151		}
152	}
153}
154
155// Decode will unmarshal the current document into val and return any errors from the unmarshalling process without any
156// modification. If val is nil or is a typed nil, an error will be returned.
157func (c *Cursor) Decode(val interface{}) error {
158	return bson.UnmarshalWithRegistry(c.registry, c.Current, val)
159}
160
161// Err returns the last error seen by the Cursor, or nil if no error has occurred.
162func (c *Cursor) Err() error { return c.err }
163
164// Close closes this cursor. Next and TryNext must not be called after Close has been called. Close is idempotent. After
165// the first call, any subsequent calls will not change the state.
166func (c *Cursor) Close(ctx context.Context) error {
167	defer c.closeImplicitSession()
168	return c.bc.Close(ctx)
169}
170
171// All iterates the cursor and decodes each document into results. The results parameter must be a pointer to a slice.
172// The slice pointed to by results will be completely overwritten. This method will close the cursor after retrieving
173// all documents. If the cursor has been iterated, any previously iterated documents will not be included in results.
174//
175// This method requires driver version >= 1.1.0.
176func (c *Cursor) All(ctx context.Context, results interface{}) error {
177	resultsVal := reflect.ValueOf(results)
178	if resultsVal.Kind() != reflect.Ptr {
179		return errors.New("results argument must be a pointer to a slice")
180	}
181
182	sliceVal := resultsVal.Elem()
183	elementType := sliceVal.Type().Elem()
184	var index int
185	var err error
186
187	defer c.Close(ctx)
188
189	batch := c.batch // exhaust the current batch before iterating the batch cursor
190	for {
191		sliceVal, index, err = c.addFromBatch(sliceVal, elementType, batch, index)
192		if err != nil {
193			return err
194		}
195
196		if !c.bc.Next(ctx) {
197			break
198		}
199
200		batch = c.bc.Batch()
201	}
202
203	if err = c.bc.Err(); err != nil {
204		return err
205	}
206
207	resultsVal.Elem().Set(sliceVal.Slice(0, index))
208	return nil
209}
210
211// addFromBatch adds all documents from batch to sliceVal starting at the given index. It returns the new slice value,
212// the next empty index in the slice, and an error if one occurs.
213func (c *Cursor) addFromBatch(sliceVal reflect.Value, elemType reflect.Type, batch *bsoncore.DocumentSequence,
214	index int) (reflect.Value, int, error) {
215
216	docs, err := batch.Documents()
217	if err != nil {
218		return sliceVal, index, err
219	}
220
221	for _, doc := range docs {
222		if sliceVal.Len() == index {
223			// slice is full
224			newElem := reflect.New(elemType)
225			sliceVal = reflect.Append(sliceVal, newElem.Elem())
226			sliceVal = sliceVal.Slice(0, sliceVal.Cap())
227		}
228
229		currElem := sliceVal.Index(index).Addr().Interface()
230		if err = bson.UnmarshalWithRegistry(c.registry, doc, currElem); err != nil {
231			return sliceVal, index, err
232		}
233
234		index++
235	}
236
237	return sliceVal, index, nil
238}
239
240func (c *Cursor) closeImplicitSession() {
241	if c.clientSession != nil && c.clientSession.SessionType == session.Implicit {
242		c.clientSession.EndSession()
243	}
244}
245
246// BatchCursorFromCursor returns a driver.BatchCursor for the given Cursor. If there is no underlying
247// driver.BatchCursor, nil is returned. This method is deprecated and does not have any stability guarantees. It may be
248// removed in the future.
249func BatchCursorFromCursor(c *Cursor) *driver.BatchCursor {
250	bc, _ := c.bc.(*driver.BatchCursor)
251	return bc
252}
253