1 // dbmessage.h
2 
3 
4 /**
5  *    Copyright (C) 2018-present MongoDB, Inc.
6  *
7  *    This program is free software: you can redistribute it and/or modify
8  *    it under the terms of the Server Side Public License, version 1,
9  *    as published by MongoDB, Inc.
10  *
11  *    This program is distributed in the hope that it will be useful,
12  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *    Server Side Public License for more details.
15  *
16  *    You should have received a copy of the Server Side Public License
17  *    along with this program. If not, see
18  *    <http://www.mongodb.com/licensing/server-side-public-license>.
19  *
20  *    As a special exception, the copyright holders give permission to link the
21  *    code of portions of this program with the OpenSSL library under certain
22  *    conditions as described in each individual source file and distribute
23  *    linked combinations including the program with the OpenSSL library. You
24  *    must comply with the Server Side Public License in all respects for
25  *    all of the code used other than as permitted herein. If you modify file(s)
26  *    with this exception, you may extend this exception to your version of the
27  *    file(s), but you are not obligated to do so. If you do not wish to do so,
28  *    delete this exception statement from your version. If you delete this
29  *    exception statement from all source files in the program, then also delete
30  *    it in the license file.
31  */
32 
33 #pragma once
34 
35 #include "mongo/base/static_assert.h"
36 #include "mongo/bson/bson_validate.h"
37 #include "mongo/client/constants.h"
38 #include "mongo/db/jsobj.h"
39 #include "mongo/db/server_options.h"
40 #include "mongo/util/net/message.h"
41 
42 namespace mongo {
43 
44 class OperationContext;
45 
46 /* db response format
47 
48    Query or GetMore: // see struct QueryResult
49       int resultFlags;
50       int64 cursorID;
51       int startingFrom;
52       int nReturned;
53       list of marshalled JSObjects;
54 */
55 
56 /* db request message format
57 
   unsigned opid;         // arbitrary; will be echoed back
59    byte operation;
60    int options;
61 
62    then for:
63 
64    dbInsert:
65       std::string collection;
66       a series of JSObjects
67    dbDelete:
68       std::string collection;
69       int flags=0; // 1=DeleteSingle
70       JSObject query;
71    dbUpdate:
72       std::string collection;
73       int flags; // 1=upsert
74       JSObject query;
75       JSObject objectToUpdate;
76         objectToUpdate may include { $inc: <field> } or { $set: ... }, see struct Mod.
77    dbQuery:
78       std::string collection;
79       int nToSkip;
80       int nToReturn; // how many you want back as the beginning of the cursor data (0=no limit)
81                      // greater than zero is simply a hint on how many objects to send back per
82                      // "cursor batch".
83                      // a negative number indicates a hard limit.
84       JSObject query;
85       [JSObject fieldsToReturn]
86    dbGetMore:
87       std::string collection; // redundant, might use for security.
88       int nToReturn;
89       int64 cursorID;
90    dbKillCursors=2007:
91       int n;
92       int64 cursorIDs[n];
93 
   Note that an update carries only one object to apply, unlike insert, where
   you can pass a list of objects to insert into the db.
   Note that the update field layout is very similar to the query layout.
97 */
98 
namespace QueryResult {
#pragma pack(1)
/* see http://dochub.mongodb.org/core/mongowireprotocol
*/
/**
 * Fixed-size header of a reply to a Query or GetMore, as it appears on the wire: the generic
 * message header followed by the reply-specific fields. #pragma pack(1) removes padding so that
 * offsetof() on each member equals that field's byte offset in the encoded message.
 *
 * resultFlags is not a separate member here; View reads and writes it at msgdata().data().
 */
struct Layout {
    MsgData::Layout msgdata;
    int64_t cursorId;
    int32_t startingFrom;
    int32_t nReturned;
};
#pragma pack()

/**
 * Read-only accessor over an encoded reply header. The caller's pointer is stored in a
 * ConstDataView; NOTE(review): presumably non-owning, so the buffer must outlive this view.
 * All integer fields are decoded as little-endian.
 */
class ConstView {
public:
    ConstView(const char* storage) : _storage(storage) {}

    // Pointer to the start of the encoded reply.
    const char* view2ptr() const {
        return storage().view();
    }

    // The generic message header at the front of the reply.
    MsgData::ConstView msgdata() const {
        return storage().view(offsetof(Layout, msgdata));
    }

    int64_t getCursorId() const {
        return storage().read<LittleEndian<int64_t>>(offsetof(Layout, cursorId));
    }

    int32_t getStartingFrom() const {
        return storage().read<LittleEndian<int32_t>>(offsetof(Layout, startingFrom));
    }

    int32_t getNReturned() const {
        return storage().read<LittleEndian<int32_t>>(offsetof(Layout, nReturned));
    }

    // First byte after the fixed-size Layout header, where the returned documents begin.
    const char* data() const {
        return storage().view(sizeof(Layout));
    }

    // Number of bytes after the header: msgdata().getLen() minus sizeof(Layout).
    // NOTE(review): nothing here checks getLen() >= sizeof(Layout); a short/malformed message
    // would produce a negative (wrapped) result.
    int32_t dataLen() const {
        return msgdata().getLen() - sizeof(Layout);
    }

protected:
    const ConstDataView& storage() const {
        return _storage;
    }

private:
    ConstDataView _storage;
};

/**
 * Mutable accessor over an encoded reply header; adds setters on top of ConstView.
 */
class View : public ConstView {
public:
    View(char* data) : ConstView(data) {}

    // Keep the const overload visible next to the non-const one declared here.
    using ConstView::view2ptr;
    char* view2ptr() {
        return storage().view();
    }

    // Keep the const overload visible next to the non-const one declared here.
    using ConstView::msgdata;
    MsgData::View msgdata() {
        return storage().view(offsetof(Layout, msgdata));
    }

    void setCursorId(int64_t value) {
        storage().write(tagLittleEndian(value), offsetof(Layout, cursorId));
    }

    void setStartingFrom(int32_t value) {
        storage().write(tagLittleEndian(value), offsetof(Layout, startingFrom));
    }

    void setNReturned(int32_t value) {
        storage().write(tagLittleEndian(value), offsetof(Layout, nReturned));
    }

    // Result flags are the little-endian int32 stored at the start of msgdata().data().
    int32_t getResultFlags() {
        return DataView(msgdata().data()).read<LittleEndian<int32_t>>();
    }

    void setResultFlags(int32_t value) {
        DataView(msgdata().data()).write(tagLittleEndian(value));
    }

    // The "OK" flag set for a reply is exactly ResultFlag_AwaitCapable.
    void setResultFlagsToOk() {
        setResultFlags(ResultFlag_AwaitCapable);
    }

    // Clears every result flag.
    void initializeResultFlags() {
        setResultFlags(0);
    }

private:
    // Hides ConstView::storage() with a writable DataView over the same bytes. The const_cast
    // is sound only because a View is always constructed from a non-const char*.
    DataView storage() const {
        return const_cast<char*>(ConstView::view2ptr());
    }
};

/**
 * Storage for one encoded reply header (via EncodedValueStorage), exposing ConstView/View.
 */
class Value : public EncodedValueStorage<Layout, ConstView, View> {
public:
    Value() {
        // Value must add nothing beyond the encoded Layout bytes.
        MONGO_STATIC_ASSERT(sizeof(Value) == sizeof(Layout));
    }

    // Forwards the zero-init tag to EncodedValueStorage (presumably zero-fills the bytes).
    Value(ZeroInitTag_t zit) : EncodedValueStorage<Layout, ConstView, View>(zit) {}
};

}  // namespace QueryResult
210 
211 /* For the database/server protocol, these objects and functions encapsulate
212    the various messages transmitted over the connection.
213 
214    See http://dochub.mongodb.org/core/mongowireprotocol
215 */
216 class DbMessage {
217     // Assume sizeof(int) == 4 bytes
218     MONGO_STATIC_ASSERT(sizeof(int) == 4);
219 
220 public:
221     // Note: DbMessage constructor reads the first 4 bytes and stores it in reserved
222     DbMessage(const Message& msg);
223 
224     // Indicates whether this message is expected to have a ns
messageShouldHaveNs()225     bool messageShouldHaveNs() const {
226         return (_msg.operation() >= dbUpdate) & (_msg.operation() <= dbDelete);
227     }
228 
229     /** the 32 bit field before the ns
230      * track all bit usage here as its cross op
231      * 0: InsertOption_ContinueOnError
232      * 1: fromWriteback
233      */
reservedField()234     int reservedField() const {
235         return _reserved;
236     }
237 
238     const char* getns() const;
239     int getQueryNToReturn() const;
240 
241     int pullInt();
242     long long pullInt64();
243     const char* getArray(size_t count) const;
244 
245     /* for insert and update msgs */
moreJSObjs()246     bool moreJSObjs() const {
247         return _nextjsobj != 0 && _nextjsobj != _theEnd;
248     }
249 
250     BSONObj nextJsObj();
251 
msg()252     const Message& msg() const {
253         return _msg;
254     }
255 
markGet()256     const char* markGet() const {
257         return _nextjsobj;
258     }
259 
markSet()260     void markSet() {
261         _mark = _nextjsobj;
262     }
263 
264     void markReset(const char* toMark);
265 
266 private:
267     // Check if we have enough data to read
268     template <typename T>
269     void checkRead(const char* start, size_t count = 0) const;
270 
271     // Read some type without advancing our position
272     template <typename T>
273     T read() const;
274 
275     // Read some type, and advance our position
276     template <typename T>
277     T readAndAdvance();
278 
279     const Message& _msg;
280     int _reserved;  // flags or zero depending on packet, starts the packet
281 
282     const char* _nsStart;    // start of namespace string, +4 from message start
283     const char* _nextjsobj;  // current position reading packet
284     const char* _theEnd;     // end of packet
285 
286     const char* _mark;
287 
288     unsigned int _nsLen;
289 };
290 
/**
 * Bits that may be set in a query's 'options' field. Bit 0 is not assigned by this enum.
 */
enum QueryOptions {
    /**
     * Tailable cursor: the cursor is not closed after the last document is returned. Instead it
     * remembers the position of the final object, so the client can resume from there later if
     * more data arrives. Set on dbQuery and dbGetMore.
     *
     * Like any "latent cursor" it can become invalid at some point (for example, if the final
     * object it references is deleted), so clients should be prepared to re-query when they get
     * back ResultFlag_CursorNotFound.
     */
    QueryOption_CursorTailable = 1 << 1,

    /**
     * Allow querying a replica slave. Such queries normally return an error, except for the
     * "local" namespace.
     */
    QueryOption_SlaveOk = 1 << 2,

    /**
     * "findingStart" mode: locate the first operation of interest while scanning a repl log.
     * Because that operation is usually closer to the tail than the head, the scan starts at the
     * tail and walks backwards until it finds it, then scans forward from there, returning
     * results to the client. During the findingStart phase the db mutex is released
     * occasionally so the db process is not blocked for an extended period.
     */
    QueryOption_OplogReplay = 1 << 3,

    /**
     * Idle cursors are normally timed out by the server after a period of inactivity to avoid
     * excess memory use. Set this option to prevent that.
     */
    QueryOption_NoCursorTimeout = 1 << 4,

    /**
     * Use together with QueryOption_CursorTailable. At the end of the data, block for a while
     * instead of returning no data; after a timeout period, return as normal.
     */
    QueryOption_AwaitData = 1 << 5,

    /**
     * Stream all of the data at full speed in multiple "more" packages, assuming the client will
     * read everything queried. Faster when pulling a lot of data that is all wanted. Note: the
     * client must read all of the data unless it closes the connection.
     *
     * Use the query( stdx::function<void(const BSONObj&)> f, ... ) overload of the connection's
     * query() method, which takes care of all the details.
     */
    QueryOption_Exhaust = 1 << 6,

    /**
     * When sharded, it is acceptable to return partial results. Normally a query fails if any
     * required shard is down; with this set, the result set may be partial instead.
     */
    QueryOption_PartialResults = 1 << 7,

    // DBClientCursor reserves flag 1 << 30 to force the use of OP_QUERY.

    /** Union of every option defined above. */
    QueryOption_AllSupported = QueryOption_CursorTailable | QueryOption_SlaveOk |
        QueryOption_OplogReplay | QueryOption_NoCursorTimeout | QueryOption_AwaitData |
        QueryOption_Exhaust | QueryOption_PartialResults,

    /** Every supported option except QueryOption_Exhaust. */
    QueryOption_AllSupportedForSharding = QueryOption_AllSupported & ~QueryOption_Exhaust,
};
355 
356 /* a request to run a query, received from the database */
357 class QueryMessage {
358 public:
359     const char* ns;
360     int ntoskip;
361     int ntoreturn;
362     int queryOptions;
363     BSONObj query;
364     BSONObj fields;
365 
366     /**
367      * parses the message into the above fields
368      * Warning: constructor mutates DbMessage.
369      */
QueryMessage(DbMessage & d)370     explicit QueryMessage(DbMessage& d) {
371         ns = d.getns();
372         ntoskip = d.pullInt();
373         ntoreturn = d.pullInt();
374         query = d.nextJsObj();
375         if (d.moreJSObjs()) {
376             fields = d.nextJsObj();
377         }
378         queryOptions = DataView(d.msg().header().data()).read<LittleEndian<int32_t>>();
379     }
380 
381     /**
382      * A non-muting constructor from the whole message.
383      */
QueryMessage(const Message & message)384     explicit QueryMessage(const Message& message) {
385         DbMessage dbm(message);
386         *this = QueryMessage(dbm);
387     }
388 };
389 
/** Option bits for legacy insert messages. */
enum InsertOptions {
    /** For a multi-document insert, keep processing the remaining inserts if one fails. */
    InsertOption_ContinueOnError = 0x1
};
394 
395 /**
396  * Builds a legacy OP_INSERT message.
397  */
398 Message makeInsertMessage(StringData ns, const BSONObj* objs, size_t count, int flags = 0);
399 inline Message makeInsertMessage(StringData ns, const BSONObj& obj, int flags = 0) {
400     return makeInsertMessage(ns, &obj, 1, flags);
401 }
402 
/** Option bits for legacy update messages. */
enum UpdateOptions {
    /** Upsert: insert the item when no existing item matches the query. */
    UpdateOption_Upsert = 0x1,

    /**
     * Update every document that matches the query expression. Without this flag, a single
     * document is updated and processing stops.
     */
    UpdateOption_Multi = 0x2,

    /** Flag from mongo saying this update went everywhere. */
    UpdateOption_Broadcast = 0x4
};
414 
/**
 * Builds a legacy OP_UPDATE message.
 *
 * @param ns      namespace (collection) the update targets.
 * @param query   selector document.
 * @param update  update document to apply.
 * @param flags   presumably a bitwise OR of UpdateOptions values — confirm in the definition.
 */
Message makeUpdateMessage(StringData ns, BSONObj query, BSONObj update, int flags = 0);
419 
/** Option bits for legacy remove (delete) messages. */
enum RemoveOptions {
    /** Delete at most one matching document. */
    RemoveOption_JustOne = 0x1,

    /** Flag from mongo saying this delete went everywhere. */
    RemoveOption_Broadcast = 0x2
};
427 
/**
 * Builds a legacy OP_REMOVE message.
 *
 * @param ns     namespace (collection) to delete from.
 * @param query  selector for the documents to delete.
 * @param flags  presumably a bitwise OR of RemoveOptions values — confirm in the definition.
 */
Message makeRemoveMessage(StringData ns, BSONObj query, int flags = 0);

/**
 * Builds a legacy OP_KILLCURSORS message for the single cursor `cursorId`.
 */
Message makeKillCursorsMessage(long long cursorId);

/**
 * Builds a legacy OP_GETMORE message.
 *
 * @param ns         namespace the cursor was opened on.
 * @param cursorId   cursor to fetch more results from.
 * @param nToReturn  requested batch size.
 * @param flags      NOTE(review): meaning is not visible from this header — check the definition.
 */
Message makeGetMoreMessage(StringData ns, long long cursorId, int nToReturn, int flags = 0);
442 
443 /**
444  * A response to a DbMessage.
445  *
446  * Order of fields makes DbResponse{funcReturningMessage()} valid.
447  */
448 struct DbResponse {
449     Message response;       // If empty, nothing will be returned to the client.
450     std::string exhaustNS;  // Namespace of cursor if exhaust mode, else "".
451 };
452 
453 /**
454  * Prepares query replies to legacy finds (opReply to dbQuery) in place. This is also used for
455  * command responses that don't use the new dbCommand protocol.
456  */
457 class OpQueryReplyBuilder {
458     MONGO_DISALLOW_COPYING(OpQueryReplyBuilder);
459 
460 public:
461     OpQueryReplyBuilder();
462 
463     /**
464      * Returns the BufBuilder that should be used for placing result objects. It will be positioned
465      * where the first (or next) object should go.
466      *
467      * You must finish the BSONObjBuilder that uses this (by destruction or calling doneFast())
468      * before calling any more methods on this object.
469      */
bufBuilderForResults()470     BufBuilder& bufBuilderForResults() {
471         return _buffer;
472     }
473 
474     /**
475      * Finishes the reply and returns the message buffer.
476      */
477     Message toQueryReply(int queryResultFlags,
478                          int nReturned,
479                          int startingFrom = 0,
480                          long long cursorId = 0);
481 
482     /**
483      * Similar to toQueryReply() but used for replying to a command.
484      */
toCommandReply()485     Message toCommandReply() {
486         return toQueryReply(0, 1);
487     }
488 
489 private:
490     BufBuilder _buffer;
491 };
492 
493 /**
494  * Helper to build a DbResponse from a buffer containing an OP_QUERY response.
495  */
496 DbResponse replyToQuery(int queryResultFlags,
497                         const void* data,
498                         int size,
499                         int nReturned,
500                         int startingFrom = 0,
501                         long long cursorId = 0);
502 
503 
504 /**
505  * Helper to build a DbRespose for OP_QUERY with a single reply object.
506  */
507 inline DbResponse replyToQuery(const BSONObj& obj, int queryResultFlags = 0) {
508     return replyToQuery(queryResultFlags, obj.objdata(), obj.objsize(), /*nReturned*/ 1);
509 }
510 }  // namespace mongo
511