1 // dbmessage.h 2 3 4 /** 5 * Copyright (C) 2018-present MongoDB, Inc. 6 * 7 * This program is free software: you can redistribute it and/or modify 8 * it under the terms of the Server Side Public License, version 1, 9 * as published by MongoDB, Inc. 10 * 11 * This program is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14 * Server Side Public License for more details. 15 * 16 * You should have received a copy of the Server Side Public License 17 * along with this program. If not, see 18 * <http://www.mongodb.com/licensing/server-side-public-license>. 19 * 20 * As a special exception, the copyright holders give permission to link the 21 * code of portions of this program with the OpenSSL library under certain 22 * conditions as described in each individual source file and distribute 23 * linked combinations including the program with the OpenSSL library. You 24 * must comply with the Server Side Public License in all respects for 25 * all of the code used other than as permitted herein. If you modify file(s) 26 * with this exception, you may extend this exception to your version of the 27 * file(s), but you are not obligated to do so. If you do not wish to do so, 28 * delete this exception statement from your version. If you delete this 29 * exception statement from all source files in the program, then also delete 30 * it in the license file. 
*/

#pragma once

#include "mongo/base/static_assert.h"
#include "mongo/bson/bson_validate.h"
#include "mongo/client/constants.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/server_options.h"
#include "mongo/util/net/message.h"

namespace mongo {

class OperationContext;

/* db response format (legacy OP_REPLY)

   Query or GetMore: // see QueryResult::Layout in namespace QueryResult below
      int resultFlags;   // stored as the first 32-bit word of the MsgData payload,
                         // see QueryResult::View::getResultFlags()
      int64 cursorID;
      int startingFrom;
      int nReturned;
      list of marshalled JSObjects;
*/

/* db request message format

   unsigned opid;         // arbitrary; will be echoed back
   byte operation;
   int options;

   then for:

   dbInsert:
      std::string collection;
      a series of JSObjects
   dbDelete:
      std::string collection;
      int flags=0; // 1=DeleteSingle
      JSObject query;
   dbUpdate:
      std::string collection;
      int flags; // 1=upsert
      JSObject query;
      JSObject objectToUpdate;
        objectToUpdate may include { $inc: <field> } or { $set: ... }, see struct Mod.
   dbQuery:
      std::string collection;
      int nToSkip;
      int nToReturn; // how many you want back as the beginning of the cursor data (0=no limit)
                     // greater than zero is simply a hint on how many objects to send back per
                     // "cursor batch".
                     // a negative number indicates a hard limit.
      JSObject query;
      [JSObject fieldsToReturn]
   dbGetMore:
      std::string collection; // redundant, might use for security.
      int nToReturn;
      int64 cursorID;
   dbKillCursors=2007:
      int n;
      int64 cursorIDs[n];

   Note that on Update, there is only one object, which is different
   from insert where you can pass a list of objects to insert in the db.
   Note that the update field layout is very similar to the Query field layout.
97 */ 98 99 namespace QueryResult { 100 #pragma pack(1) 101 /* see http://dochub.mongodb.org/core/mongowireprotocol 102 */ 103 struct Layout { 104 MsgData::Layout msgdata; 105 int64_t cursorId; 106 int32_t startingFrom; 107 int32_t nReturned; 108 }; 109 #pragma pack() 110 111 class ConstView { 112 public: ConstView(const char * storage)113 ConstView(const char* storage) : _storage(storage) {} 114 view2ptr()115 const char* view2ptr() const { 116 return storage().view(); 117 } 118 msgdata()119 MsgData::ConstView msgdata() const { 120 return storage().view(offsetof(Layout, msgdata)); 121 } 122 getCursorId()123 int64_t getCursorId() const { 124 return storage().read<LittleEndian<int64_t>>(offsetof(Layout, cursorId)); 125 } 126 getStartingFrom()127 int32_t getStartingFrom() const { 128 return storage().read<LittleEndian<int32_t>>(offsetof(Layout, startingFrom)); 129 } 130 getNReturned()131 int32_t getNReturned() const { 132 return storage().read<LittleEndian<int32_t>>(offsetof(Layout, nReturned)); 133 } 134 data()135 const char* data() const { 136 return storage().view(sizeof(Layout)); 137 } 138 dataLen()139 int32_t dataLen() const { 140 return msgdata().getLen() - sizeof(Layout); 141 } 142 143 protected: storage()144 const ConstDataView& storage() const { 145 return _storage; 146 } 147 148 private: 149 ConstDataView _storage; 150 }; 151 152 class View : public ConstView { 153 public: View(char * data)154 View(char* data) : ConstView(data) {} 155 156 using ConstView::view2ptr; view2ptr()157 char* view2ptr() { 158 return storage().view(); 159 } 160 161 using ConstView::msgdata; msgdata()162 MsgData::View msgdata() { 163 return storage().view(offsetof(Layout, msgdata)); 164 } 165 setCursorId(int64_t value)166 void setCursorId(int64_t value) { 167 storage().write(tagLittleEndian(value), offsetof(Layout, cursorId)); 168 } 169 setStartingFrom(int32_t value)170 void setStartingFrom(int32_t value) { 171 storage().write(tagLittleEndian(value), offsetof(Layout, startingFrom)); 
172 } 173 setNReturned(int32_t value)174 void setNReturned(int32_t value) { 175 storage().write(tagLittleEndian(value), offsetof(Layout, nReturned)); 176 } 177 getResultFlags()178 int32_t getResultFlags() { 179 return DataView(msgdata().data()).read<LittleEndian<int32_t>>(); 180 } 181 setResultFlags(int32_t value)182 void setResultFlags(int32_t value) { 183 DataView(msgdata().data()).write(tagLittleEndian(value)); 184 } 185 setResultFlagsToOk()186 void setResultFlagsToOk() { 187 setResultFlags(ResultFlag_AwaitCapable); 188 } 189 initializeResultFlags()190 void initializeResultFlags() { 191 setResultFlags(0); 192 } 193 194 private: storage()195 DataView storage() const { 196 return const_cast<char*>(ConstView::view2ptr()); 197 } 198 }; 199 200 class Value : public EncodedValueStorage<Layout, ConstView, View> { 201 public: Value()202 Value() { 203 MONGO_STATIC_ASSERT(sizeof(Value) == sizeof(Layout)); 204 } 205 Value(ZeroInitTag_t zit)206 Value(ZeroInitTag_t zit) : EncodedValueStorage<Layout, ConstView, View>(zit) {} 207 }; 208 209 } // namespace QueryResult 210 211 /* For the database/server protocol, these objects and functions encapsulate 212 the various messages transmitted over the connection. 
213 214 See http://dochub.mongodb.org/core/mongowireprotocol 215 */ 216 class DbMessage { 217 // Assume sizeof(int) == 4 bytes 218 MONGO_STATIC_ASSERT(sizeof(int) == 4); 219 220 public: 221 // Note: DbMessage constructor reads the first 4 bytes and stores it in reserved 222 DbMessage(const Message& msg); 223 224 // Indicates whether this message is expected to have a ns messageShouldHaveNs()225 bool messageShouldHaveNs() const { 226 return (_msg.operation() >= dbUpdate) & (_msg.operation() <= dbDelete); 227 } 228 229 /** the 32 bit field before the ns 230 * track all bit usage here as its cross op 231 * 0: InsertOption_ContinueOnError 232 * 1: fromWriteback 233 */ reservedField()234 int reservedField() const { 235 return _reserved; 236 } 237 238 const char* getns() const; 239 int getQueryNToReturn() const; 240 241 int pullInt(); 242 long long pullInt64(); 243 const char* getArray(size_t count) const; 244 245 /* for insert and update msgs */ moreJSObjs()246 bool moreJSObjs() const { 247 return _nextjsobj != 0 && _nextjsobj != _theEnd; 248 } 249 250 BSONObj nextJsObj(); 251 msg()252 const Message& msg() const { 253 return _msg; 254 } 255 markGet()256 const char* markGet() const { 257 return _nextjsobj; 258 } 259 markSet()260 void markSet() { 261 _mark = _nextjsobj; 262 } 263 264 void markReset(const char* toMark); 265 266 private: 267 // Check if we have enough data to read 268 template <typename T> 269 void checkRead(const char* start, size_t count = 0) const; 270 271 // Read some type without advancing our position 272 template <typename T> 273 T read() const; 274 275 // Read some type, and advance our position 276 template <typename T> 277 T readAndAdvance(); 278 279 const Message& _msg; 280 int _reserved; // flags or zero depending on packet, starts the packet 281 282 const char* _nsStart; // start of namespace string, +4 from message start 283 const char* _nextjsobj; // current position reading packet 284 const char* _theEnd; // end of packet 285 286 const char* 
_mark; 287 288 unsigned int _nsLen; 289 }; 290 291 /** the query field 'options' can have these bits set: */ 292 enum QueryOptions { 293 /** Tailable means cursor is not closed when the last data is retrieved. rather, the cursor 294 * marks the final object's position. you can resume using the cursor later, from where it was 295 located, if more data were received. Set on dbQuery and dbGetMore. 296 297 like any "latent cursor", the cursor may become invalid at some point -- for example if that 298 final object it references were deleted. Thus, you should be prepared to requery if you get 299 back ResultFlag_CursorNotFound. 300 */ 301 QueryOption_CursorTailable = 1 << 1, 302 303 /** allow query of replica slave. normally these return an error except for namespace "local". 304 */ 305 QueryOption_SlaveOk = 1 << 2, 306 307 // findingStart mode is used to find the first operation of interest when 308 // we are scanning through a repl log. For efficiency in the common case, 309 // where the first operation of interest is closer to the tail than the head, 310 // we start from the tail of the log and work backwards until we find the 311 // first operation of interest. Then we scan forward from that first operation, 312 // actually returning results to the client. During the findingStart phase, 313 // we release the db mutex occasionally to avoid blocking the db process for 314 // an extended period of time. 315 QueryOption_OplogReplay = 1 << 3, 316 317 /** The server normally times out idle cursors after an inactivity period to prevent excess 318 * memory uses 319 Set this option to prevent that. 320 */ 321 QueryOption_NoCursorTimeout = 1 << 4, 322 323 /** Use with QueryOption_CursorTailable. If we are at the end of the data, block for a while 324 * rather than returning no data. After a timeout period, we do return as normal. 
325 */ 326 QueryOption_AwaitData = 1 << 5, 327 328 /** Stream the data down full blast in multiple "more" packages, on the assumption that the 329 * client will fully read all data queried. Faster when you are pulling a lot of data and know 330 * you want to pull it all down. Note: it is not allowed to not read all the data unless you 331 * close the connection. 332 333 Use the query( stdx::function<void(const BSONObj&)> f, ... ) version of the connection's 334 query() 335 method, and it will take care of all the details for you. 336 */ 337 QueryOption_Exhaust = 1 << 6, 338 339 /** When sharded, this means its ok to return partial results 340 Usually we will fail a query if all required shards aren't up 341 If this is set, it'll be a partial result set 342 */ 343 QueryOption_PartialResults = 1 << 7, 344 345 // DBClientCursor reserves flag 1 << 30 to force the use of OP_QUERY. 346 347 QueryOption_AllSupported = QueryOption_CursorTailable | QueryOption_SlaveOk | 348 QueryOption_OplogReplay | QueryOption_NoCursorTimeout | QueryOption_AwaitData | 349 QueryOption_Exhaust | QueryOption_PartialResults, 350 351 QueryOption_AllSupportedForSharding = QueryOption_CursorTailable | QueryOption_SlaveOk | 352 QueryOption_OplogReplay | QueryOption_NoCursorTimeout | QueryOption_AwaitData | 353 QueryOption_PartialResults, 354 }; 355 356 /* a request to run a query, received from the database */ 357 class QueryMessage { 358 public: 359 const char* ns; 360 int ntoskip; 361 int ntoreturn; 362 int queryOptions; 363 BSONObj query; 364 BSONObj fields; 365 366 /** 367 * parses the message into the above fields 368 * Warning: constructor mutates DbMessage. 
369 */ QueryMessage(DbMessage & d)370 explicit QueryMessage(DbMessage& d) { 371 ns = d.getns(); 372 ntoskip = d.pullInt(); 373 ntoreturn = d.pullInt(); 374 query = d.nextJsObj(); 375 if (d.moreJSObjs()) { 376 fields = d.nextJsObj(); 377 } 378 queryOptions = DataView(d.msg().header().data()).read<LittleEndian<int32_t>>(); 379 } 380 381 /** 382 * A non-muting constructor from the whole message. 383 */ QueryMessage(const Message & message)384 explicit QueryMessage(const Message& message) { 385 DbMessage dbm(message); 386 *this = QueryMessage(dbm); 387 } 388 }; 389 390 enum InsertOptions { 391 /** With muli-insert keep processing inserts if one fails */ 392 InsertOption_ContinueOnError = 1 << 0 393 }; 394 395 /** 396 * Builds a legacy OP_INSERT message. 397 */ 398 Message makeInsertMessage(StringData ns, const BSONObj* objs, size_t count, int flags = 0); 399 inline Message makeInsertMessage(StringData ns, const BSONObj& obj, int flags = 0) { 400 return makeInsertMessage(ns, &obj, 1, flags); 401 } 402 403 enum UpdateOptions { 404 /** Upsert - that is, insert the item if no matching item is found. */ 405 UpdateOption_Upsert = 1 << 0, 406 407 /** Update multiple documents (if multiple documents match query expression). 408 (Default is update a single document and stop.) */ 409 UpdateOption_Multi = 1 << 1, 410 411 /** flag from mongo saying this update went everywhere */ 412 UpdateOption_Broadcast = 1 << 2 413 }; 414 415 /** 416 * Builds a legacy OP_UPDATE message. 417 */ 418 Message makeUpdateMessage(StringData ns, BSONObj query, BSONObj update, int flags = 0); 419 420 enum RemoveOptions { 421 /** only delete one option */ 422 RemoveOption_JustOne = 1 << 0, 423 424 /** flag from mongo saying this update went everywhere */ 425 RemoveOption_Broadcast = 1 << 1 426 }; 427 428 /** 429 * Builds a legacy OP_REMOVE message. 430 */ 431 Message makeRemoveMessage(StringData ns, BSONObj query, int flags = 0); 432 433 /** 434 * Builds a legacy OP_KILLCURSORS message. 
435 */ 436 Message makeKillCursorsMessage(long long cursorId); 437 438 /** 439 * Builds a legacy OP_GETMORE message. 440 */ 441 Message makeGetMoreMessage(StringData ns, long long cursorId, int nToReturn, int flags = 0); 442 443 /** 444 * A response to a DbMessage. 445 * 446 * Order of fields makes DbResponse{funcReturningMessage()} valid. 447 */ 448 struct DbResponse { 449 Message response; // If empty, nothing will be returned to the client. 450 std::string exhaustNS; // Namespace of cursor if exhaust mode, else "". 451 }; 452 453 /** 454 * Prepares query replies to legacy finds (opReply to dbQuery) in place. This is also used for 455 * command responses that don't use the new dbCommand protocol. 456 */ 457 class OpQueryReplyBuilder { 458 MONGO_DISALLOW_COPYING(OpQueryReplyBuilder); 459 460 public: 461 OpQueryReplyBuilder(); 462 463 /** 464 * Returns the BufBuilder that should be used for placing result objects. It will be positioned 465 * where the first (or next) object should go. 466 * 467 * You must finish the BSONObjBuilder that uses this (by destruction or calling doneFast()) 468 * before calling any more methods on this object. 469 */ bufBuilderForResults()470 BufBuilder& bufBuilderForResults() { 471 return _buffer; 472 } 473 474 /** 475 * Finishes the reply and returns the message buffer. 476 */ 477 Message toQueryReply(int queryResultFlags, 478 int nReturned, 479 int startingFrom = 0, 480 long long cursorId = 0); 481 482 /** 483 * Similar to toQueryReply() but used for replying to a command. 484 */ toCommandReply()485 Message toCommandReply() { 486 return toQueryReply(0, 1); 487 } 488 489 private: 490 BufBuilder _buffer; 491 }; 492 493 /** 494 * Helper to build a DbResponse from a buffer containing an OP_QUERY response. 
495 */ 496 DbResponse replyToQuery(int queryResultFlags, 497 const void* data, 498 int size, 499 int nReturned, 500 int startingFrom = 0, 501 long long cursorId = 0); 502 503 504 /** 505 * Helper to build a DbRespose for OP_QUERY with a single reply object. 506 */ 507 inline DbResponse replyToQuery(const BSONObj& obj, int queryResultFlags = 0) { 508 return replyToQuery(queryResultFlags, obj.objdata(), obj.objsize(), /*nReturned*/ 1); 509 } 510 } // namespace mongo 511