1 // file dbclientcursor.h
2 
3 
4 /**
5  *    Copyright (C) 2018-present MongoDB, Inc.
6  *
7  *    This program is free software: you can redistribute it and/or modify
8  *    it under the terms of the Server Side Public License, version 1,
9  *    as published by MongoDB, Inc.
10  *
11  *    This program is distributed in the hope that it will be useful,
12  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *    Server Side Public License for more details.
15  *
16  *    You should have received a copy of the Server Side Public License
17  *    along with this program. If not, see
18  *    <http://www.mongodb.com/licensing/server-side-public-license>.
19  *
20  *    As a special exception, the copyright holders give permission to link the
21  *    code of portions of this program with the OpenSSL library under certain
22  *    conditions as described in each individual source file and distribute
23  *    linked combinations including the program with the OpenSSL library. You
24  *    must comply with the Server Side Public License in all respects for
25  *    all of the code used other than as permitted herein. If you modify file(s)
26  *    with this exception, you may extend this exception to your version of the
27  *    file(s), but you are not obligated to do so. If you do not wish to do so,
28  *    delete this exception statement from your version. If you delete this
29  *    exception statement from all source files in the program, then also delete
30  *    it in the license file.
31  */
32 
33 #pragma once
34 
35 #include <stack>
36 
37 #include "mongo/base/disallow_copying.h"
38 #include "mongo/client/dbclientinterface.h"
39 #include "mongo/db/jsobj.h"
40 #include "mongo/db/json.h"
41 #include "mongo/util/net/message.h"
42 
43 namespace mongo {
44 
45 class AScopedConnection;
46 
47 /** Queries return a cursor object */
48 class DBClientCursor {
49     MONGO_DISALLOW_COPYING(DBClientCursor);
50 
51 public:
52     /** If true, safe to call next().  Requests more from server if necessary. */
53     virtual bool more();
54 
55     /** If true, there is more in our local buffers to be fetched via next(). Returns
56         false when a getMore request back to server would be required.  You can use this
57         if you want to exhaust whatever data has been fetched to the client already but
58         then perhaps stop.
59     */
objsLeftInBatch()60     int objsLeftInBatch() const {
61         return _putBack.size() + batch.objs.size() - batch.pos;
62     }
moreInCurrentBatch()63     bool moreInCurrentBatch() {
64         return objsLeftInBatch() > 0;
65     }
66 
67     /** next
68        @return next object in the result cursor.
69        on an error at the remote server, you will get back:
70          { $err: <std::string> }
71        if you do not want to handle that yourself, call nextSafe().
72     */
73     virtual BSONObj next();
74 
75     /**
76         restore an object previously returned by next() to the cursor
77      */
putBack(const BSONObj & o)78     void putBack(const BSONObj& o) {
79         _putBack.push(o.getOwned());
80     }
81 
82     /** throws AssertionException if get back { $err : ... } */
83     BSONObj nextSafe();
84 
85     /** peek ahead at items buffered for future next() calls.
86         never requests new data from the server.  so peek only effective
87         with what is already buffered.
88         WARNING: no support for _putBack yet!
89     */
90     void peek(std::vector<BSONObj>&, int atMost);
91 
92     // Peeks at first element, if exists
93     BSONObj peekFirst();
94 
95     /**
96      * peek ahead and see if an error occurred, and get the error if so.
97      */
98     bool peekError(BSONObj* error = NULL);
99 
100     /**
101        iterate the rest of the cursor and return the number if items
102      */
itcount()103     int itcount() {
104         int c = 0;
105         while (more()) {
106             next();
107             c++;
108         }
109         return c;
110     }
111 
112     /** cursor no longer valid -- use with tailable cursors.
113        note you should only rely on this once more() returns false;
114        'dead' may be preset yet some data still queued and locally
115        available from the dbclientcursor.
116     */
isDead()117     bool isDead() const {
118         return cursorId == 0;
119     }
120 
tailable()121     bool tailable() const {
122         return (opts & QueryOption_CursorTailable) != 0;
123     }
124 
125     /** see ResultFlagType (constants.h) for flag values
126         mostly these flags are for internal purposes -
127         ResultFlag_ErrSet is the possible exception to that
128     */
hasResultFlag(int flag)129     bool hasResultFlag(int flag) {
130         return (resultFlags & flag) != 0;
131     }
132 
133     /// Change batchSize after construction. Can change after requesting first batch.
setBatchSize(int newBatchSize)134     void setBatchSize(int newBatchSize) {
135         batchSize = newBatchSize;
136     }
137 
138 
139     /**
140      * Fold this in with queryOptions to force the use of legacy query operations.
141      * This flag is never sent over the wire and is only used locally.
142      */
143     enum { QueryOptionLocal_forceOpQuery = 1 << 30 };
144 
145     DBClientCursor(DBClientBase* client,
146                    const std::string& ns,
147                    const BSONObj& query,
148                    int nToReturn,
149                    int nToSkip,
150                    const BSONObj* fieldsToReturn,
151                    int queryOptions,
152                    int bs);
153 
154     DBClientCursor(DBClientBase* client,
155                    const std::string& ns,
156                    long long cursorId,
157                    int nToReturn,
158                    int options);
159 
160     virtual ~DBClientCursor();
161 
getCursorId()162     long long getCursorId() const {
163         return cursorId;
164     }
165 
166     /** by default we "own" the cursor and will send the server a KillCursor
167         message when ~DBClientCursor() is called. This function overrides that.
168     */
decouple()169     void decouple() {
170         _ownCursor = false;
171     }
172 
173     void attach(AScopedConnection* conn);
174 
originalHost()175     std::string originalHost() const {
176         return _originalHost;
177     }
178 
getns()179     std::string getns() const {
180         return ns.ns();
181     }
182 
183     /**
184      * actually does the query
185      */
186     bool init();
187 
188     void initLazy(bool isRetry = false);
189     bool initLazyFinish(bool& retry);
190 
191     /**
192      * For exhaust. Used in DBClientConnection.
193      */
194     void exhaustReceiveMore();
195 
196     /**
197      * Marks this object as dead and sends the KillCursors message to the server.
198      *
199      * Any errors that result from this are swallowed since this is typically performed as part of
200      * cleanup and a failure to kill the cursor should not result in a failure of the operation
201      * using the cursor.
202      *
203      * Killing an already killed or exhausted cursor does nothing, so it is safe to always call this
204      * if you want to ensure that a cursor is killed.
205      */
206     void kill();
207 
208     /**
209      * Returns true if the connection this cursor is using has pending replies.
210      *
211      * If true, you should not try to use the connection for any other purpose or return it to a
212      * pool.
213      *
214      * This can happen if either initLazy() was called without initLazyFinish() or an exhaust query
215      * was started but not completed.
216      */
connectionHasPendingReplies()217     bool connectionHasPendingReplies() const {
218         return _connectionHasPendingReplies;
219     }
220 
221 private:
222     DBClientCursor(DBClientBase* client,
223                    const std::string& ns,
224                    const BSONObj& query,
225                    long long cursorId,
226                    int nToReturn,
227                    int nToSkip,
228                    const BSONObj* fieldsToReturn,
229                    int queryOptions,
230                    int bs);
231 
232     int nextBatchSize();
233 
234     struct Batch {
235         std::vector<BSONObj> objs;
236         size_t pos = 0;
237     };
238 
239     Batch batch;
240     DBClientBase* _client;
241     std::string _originalHost;
242     NamespaceString ns;
243     const bool _isCommand;
244     BSONObj query;
245     int nToReturn;
246     bool haveLimit;
247     int nToSkip;
248     const BSONObj* fieldsToReturn;
249     int opts;
250     int batchSize;
251     std::stack<BSONObj> _putBack;
252     int resultFlags;
253     long long cursorId;
254     bool _ownCursor;  // see decouple()
255     std::string _scopedHost;
256     std::string _lazyHost;
257     bool wasError;
258     BSONVersion _enabledBSONVersion;
259     bool _useFindCommand = true;
260     bool _connectionHasPendingReplies = false;
261     int _lastRequestId = 0;
262 
dataReceived(const Message & reply)263     void dataReceived(const Message& reply) {
264         bool retry;
265         std::string lazyHost;
266         dataReceived(reply, retry, lazyHost);
267     }
268     void dataReceived(const Message& reply, bool& retry, std::string& lazyHost);
269 
270     /**
271      * Parses and returns command replies regardless of which command protocol was used.
272      * Does *not* parse replies from non-command OP_QUERY finds.
273      */
274     BSONObj commandDataReceived(const Message& reply);
275 
276     void requestMore();
277 
278     // init pieces
279     Message _assembleInit();
280     Message _assembleGetMore();
281 };
282 
283 /** iterate over objects in current batch only - will not cause a network call
284  */
285 class DBClientCursorBatchIterator {
286 public:
DBClientCursorBatchIterator(DBClientCursor & c)287     DBClientCursorBatchIterator(DBClientCursor& c) : _c(c), _n() {}
moreInCurrentBatch()288     bool moreInCurrentBatch() {
289         return _c.moreInCurrentBatch();
290     }
nextSafe()291     BSONObj nextSafe() {
292         massert(13383, "BatchIterator empty", moreInCurrentBatch());
293         ++_n;
294         return _c.nextSafe();
295     }
n()296     int n() const {
297         return _n;
298     }
299 
300 private:
301     DBClientCursor& _c;
302     int _n;
303 };
304 
305 }  // namespace mongo
306