1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #pragma once
32 
33 #include <algorithm>
34 #include <string>
35 #include <vector>
36 
37 #include "mongo/base/string_data.h"
38 #include "mongo/db/jsobj.h"
39 #include "mongo/util/net/message.h"
40 
41 namespace mongo {
42 
43 struct OpMsg {
44     struct DocumentSequence {
45         std::string name;
46         std::vector<BSONObj> objs;
47     };
48 
49     static constexpr uint32_t kChecksumPresent = 1 << 0;
50     static constexpr uint32_t kMoreToCome = 1 << 1;
51 
52     /**
53      * Returns the unvalidated flags for the given message if it is an OP_MSG message.
54      * Returns 0 for other message kinds since they are the equivalent of no flags set.
55      * Throws if the message is too small to hold flags.
56      */
57     static uint32_t flags(const Message& message);
isFlagSetOpMsg58     static bool isFlagSet(const Message& message, uint32_t flag) {
59         return flags(message) & flag;
60     }
61 
62     /**
63      * Replaces the flags in message with the supplied flags.
64      * Only legal on an otherwise valid OP_MSG message.
65      */
66     static void replaceFlags(Message* message, uint32_t flags);
67 
68     /**
69      * Adds flag to the list of set flags in message.
70      * Only legal on an otherwise valid OP_MSG message.
71      */
setFlagOpMsg72     static void setFlag(Message* message, uint32_t flag) {
73         replaceFlags(message, flags(*message) | flag);
74     }
75 
76     /**
77      * Parses and returns an OpMsg containing unowned BSON.
78      */
79     static OpMsg parse(const Message& message);
80 
81     /**
82      * Parses and returns an OpMsg containing owned BSON.
83      */
parseOwnedOpMsg84     static OpMsg parseOwned(const Message& message) {
85         auto msg = parse(message);
86         msg.shareOwnershipWith(message.sharedBuffer());
87         return msg;
88     }
89 
90     Message serialize() const;
91 
92     /**
93      * Makes all BSONObjs in this object share ownership with buffer.
94      */
95     void shareOwnershipWith(const ConstSharedBuffer& buffer);
96 
97     /**
98      * Returns a pointer to the sequence with the given name or nullptr if there are none.
99      */
getSequenceOpMsg100     const DocumentSequence* getSequence(StringData name) const {
101         // Getting N sequences is technically O(N**2) but because there currently is at most 2
102         // sequences, this does either 1 or 2 comparisons. Consider making sequences a StringMap if
103         // there will be many sequences. This problem may also just go away with the IDL project.
104         auto it = std::find_if(
105             sequences.begin(), sequences.end(), [&](const auto& seq) { return seq.name == name; });
106         return it == sequences.end() ? nullptr : &*it;
107     }
108 
109     BSONObj body;
110     std::vector<DocumentSequence> sequences;
111 };
112 
113 /**
114  * An OpMsg that represents a request. This is a separate type from OpMsg only to provide better
115  * type-safety along with a place to hang request-specific methods.
116  */
117 struct OpMsgRequest : public OpMsg {
118     // TODO in C++17 remove constructors so we can use aggregate initialization.
119     OpMsgRequest() = default;
OpMsgRequestOpMsgRequest120     explicit OpMsgRequest(OpMsg&& generic) : OpMsg(std::move(generic)) {}
121 
parseOpMsgRequest122     static OpMsgRequest parse(const Message& message) {
123         return OpMsgRequest(OpMsg::parse(message));
124     }
125 
126     static OpMsgRequest fromDBAndBody(StringData db,
127                                       BSONObj body,
128                                       const BSONObj& extraFields = {}) {
129         OpMsgRequest request;
130         request.body = ([&] {
131             BSONObjBuilder bodyBuilder(std::move(body));
132             bodyBuilder.appendElements(extraFields);
133             bodyBuilder.append("$db", db);
134             return bodyBuilder.obj();
135         }());
136         return request;
137     }
138 
getDatabaseOpMsgRequest139     StringData getDatabase() const {
140         if (auto elem = body["$db"])
141             return elem.checkAndGetStringData();
142         uasserted(40571, "OP_MSG requests require a $db argument");
143     }
144 
getCommandNameOpMsgRequest145     StringData getCommandName() const {
146         return body.firstElementFieldName();
147     }
148 
149     // DO NOT ADD MEMBERS!  Since this type is essentially a strong typedef (see the class comment),
150     // it should not hold more data than an OpMsg. It should be freely interconvertible with OpMsg
151     // without issues like slicing.
152 };
153 
154 /**
155  * Builds an OP_MSG message in-place in a Message buffer.
156  *
157  * While the OP_MSG format imposes no ordering of sections, in order to efficiently support our
158  * usage patterns, this class requires that all document sequences (if any) are built before the
159  * body. This allows repeatedly appending fields to the body until right before it is ready to be
160  * sent.
161  */
162 class OpMsgBuilder {
163     MONGO_DISALLOW_COPYING(OpMsgBuilder);
164 
165 public:
OpMsgBuilder()166     OpMsgBuilder() {
167         skipHeaderAndFlags();
168     }
169 
170     /**
171      * See the documentation for DocSequenceBuilder below.
172      */
173     class DocSequenceBuilder;
174     DocSequenceBuilder beginDocSequence(StringData name);
175 
176     /**
177      * Returns an empty builder for the body.
178      * It is an error to call this if a body has already been begun.  You must destroy or call
179      * done() on the returned builder before calling any methods on this object.
180      */
181     BSONObjBuilder beginBody();
setBody(const BSONObj & body)182     void setBody(const BSONObj& body) {
183         beginBody().appendElements(body);
184     }
185 
186     /**
187      * Returns a builder that can be used to append new fields to the body.
188      * It is an error to call this if beginBody() hasn't been called yet. It is an error to append
189      * elements with field names that already exist in the body. You must destroy or call done() on
190      * the returned builder before calling any methods on this object.
191      *
192      * TODO decide if it is worth keeping the begin/resume distinction in the public API.
193      */
194     BSONObjBuilder resumeBody();
appendElementsToBody(const BSONObj & body)195     void appendElementsToBody(const BSONObj& body) {
196         resumeBody().appendElements(body);
197     }
198 
199     /**
200      * Finish building and return a Message ready to give to the networking layer for transmission.
201      * It is illegal to call any methods on this object after calling this.
202      */
203     Message finish();
204 
205     /**
206      * Reset this object to its initial empty state. All previously appended data is lost.
207      */
reset()208     void reset() {
209         invariant(!_openBuilder);
210 
211         _buf.reset();
212         skipHeaderAndFlags();
213         _bodyStart = 0;
214         _state = kEmpty;
215         _openBuilder = false;
216     }
217 
218     /**
219      * Set to true in tests that need to be able to generate duplicate top-level fields to see how
220      * the server handles them. Is false by default, although the check only happens in debug
221      * builds.
222      */
223     static AtomicBool disableDupeFieldCheck_forTest;
224 
225 private:
226     friend class DocSequenceBuilder;
227 
228     enum State {
229         kEmpty,
230         kDocSequence,
231         kBody,
232         kDone,
233     };
234 
235     void finishDocumentStream(DocSequenceBuilder* docSequenceBuilder);
236 
skipHeaderAndFlags()237     void skipHeaderAndFlags() {
238         _buf.skip(sizeof(MSGHEADER::Layout));  // This is filled in by finish().
239         _buf.appendNum(uint32_t(0));           // flags (currently always 0).
240     }
241 
242     // When adding members, remember to update reset().
243     BufBuilder _buf;
244     int _bodyStart = 0;
245     State _state = kEmpty;
246     bool _openBuilder = false;
247 };
248 
249 /**
250  * Builds a document sequence in an OpMsgBuilder.
251  *
252  * Example:
253  *
254  * auto docSeq = msgBuilder.beginDocSequence("some.sequence");
255  *
256  * docSeq.append(BSON("a" << 1)); // Copy an obj into the sequence
257  *
258  * auto bob = docSeq.appendBuilder(); // Build an obj in-place
259  * bob.append("a", 2);
260  * bob.doneFast();
261  *
262  * docSeq.done(); // Or just let it go out of scope.
263  */
264 class OpMsgBuilder::DocSequenceBuilder {
265     MONGO_DISALLOW_COPYING(DocSequenceBuilder);
266 
267 public:
DocSequenceBuilder(DocSequenceBuilder && other)268     DocSequenceBuilder(DocSequenceBuilder&& other)
269         : _buf(other._buf), _msgBuilder(other._msgBuilder), _sizeOffset(other._sizeOffset) {
270         other._buf = nullptr;
271     }
272 
~DocSequenceBuilder()273     ~DocSequenceBuilder() {
274         if (_buf)
275             done();
276     }
277 
278     /**
279      * Indicates that the caller is done with this stream prior to destruction.
280      * Following this call, it is illegal to call any methods on this object.
281      */
done()282     void done() {
283         invariant(_buf);
284         _msgBuilder->finishDocumentStream(this);
285         _buf = nullptr;
286     }
287 
288     /**
289      * Appends a single document to this sequence.
290      */
append(const BSONObj & obj)291     void append(const BSONObj& obj) {
292         _buf->appendBuf(obj.objdata(), obj.objsize());
293     }
294 
295     /**
296      * Returns a BSONObjBuilder that appends a single document to this sequence in place.
297      * It is illegal to call any methods on this DocSequenceBuilder until the returned builder
298      * is destroyed or done()/doneFast() is called on it.
299      */
appendBuilder()300     BSONObjBuilder appendBuilder() {
301         return BSONObjBuilder(*_buf);
302     }
303 
304 private:
305     friend OpMsgBuilder;
306 
DocSequenceBuilder(OpMsgBuilder * msgBuilder,BufBuilder * buf,int sizeOffset)307     DocSequenceBuilder(OpMsgBuilder* msgBuilder, BufBuilder* buf, int sizeOffset)
308         : _buf(buf), _msgBuilder(msgBuilder), _sizeOffset(sizeOffset) {}
309 
310     BufBuilder* _buf;
311     OpMsgBuilder* const _msgBuilder;
312     const int _sizeOffset;
313 };
314 
315 }  // namespace mongo
316