1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
32
33 #include "mongo/platform/basic.h"
34
35 #include "mongo/db/query/cursor_response.h"
36
37 #include "mongo/bson/bsontypes.h"
38 #include "mongo/rpc/get_status_from_command_result.h"
39 #include "mongo/s/chunk_version.h"
40
41 namespace mongo {
42
namespace {

// Names of the fields that make up the nested cursor object of a command response.
constexpr char kCursorField[] = "cursor";
constexpr char kIdField[] = "id";
constexpr char kNsField[] = "ns";

// Name of the batch array: 'firstBatch' is used for an initial response, 'nextBatch' otherwise.
constexpr char kBatchField[] = "nextBatch";
constexpr char kBatchFieldInitial[] = "firstBatch";

// Top-level (outside the cursor object) field carrying the latest oplog timestamp.
constexpr char kInternalLatestOplogTimestampField[] = "$_internalLatestOplogTimestamp";

}  // namespace
53
CursorResponseBuilder(bool isInitialResponse,BSONObjBuilder * commandResponse)54 CursorResponseBuilder::CursorResponseBuilder(bool isInitialResponse,
55 BSONObjBuilder* commandResponse)
56 : _responseInitialLen(commandResponse->bb().len()),
57 _commandResponse(commandResponse),
58 _cursorObject(commandResponse->subobjStart(kCursorField)),
59 _batch(_cursorObject.subarrayStart(isInitialResponse ? kBatchFieldInitial : kBatchField)) {}
60
done(CursorId cursorId,StringData cursorNamespace)61 void CursorResponseBuilder::done(CursorId cursorId, StringData cursorNamespace) {
62 invariant(_active);
63 _batch.doneFast();
64 _cursorObject.append(kIdField, cursorId);
65 _cursorObject.append(kNsField, cursorNamespace);
66 _cursorObject.doneFast();
67 if (!_latestOplogTimestamp.isNull()) {
68 _commandResponse->append(kInternalLatestOplogTimestampField, _latestOplogTimestamp);
69 }
70 _active = false;
71 }
72
abandon()73 void CursorResponseBuilder::abandon() {
74 invariant(_active);
75 _batch.doneFast();
76 _cursorObject.doneFast();
77 _commandResponse->bb().setlen(_responseInitialLen); // Removes everything we've added.
78 _active = false;
79 }
80
appendCursorResponseObject(long long cursorId,StringData cursorNamespace,BSONArray firstBatch,BSONObjBuilder * builder)81 void appendCursorResponseObject(long long cursorId,
82 StringData cursorNamespace,
83 BSONArray firstBatch,
84 BSONObjBuilder* builder) {
85 BSONObjBuilder cursorObj(builder->subobjStart(kCursorField));
86 cursorObj.append(kIdField, cursorId);
87 cursorObj.append(kNsField, cursorNamespace);
88 cursorObj.append(kBatchFieldInitial, firstBatch);
89 cursorObj.done();
90 }
91
appendGetMoreResponseObject(long long cursorId,StringData cursorNamespace,BSONArray nextBatch,BSONObjBuilder * builder)92 void appendGetMoreResponseObject(long long cursorId,
93 StringData cursorNamespace,
94 BSONArray nextBatch,
95 BSONObjBuilder* builder) {
96 BSONObjBuilder cursorObj(builder->subobjStart(kCursorField));
97 cursorObj.append(kIdField, cursorId);
98 cursorObj.append(kNsField, cursorNamespace);
99 cursorObj.append(kBatchField, nextBatch);
100 cursorObj.done();
101 }
102
CursorResponse(NamespaceString nss,CursorId cursorId,std::vector<BSONObj> batch,boost::optional<long long> numReturnedSoFar,boost::optional<Timestamp> latestOplogTimestamp,boost::optional<BSONObj> writeConcernError)103 CursorResponse::CursorResponse(NamespaceString nss,
104 CursorId cursorId,
105 std::vector<BSONObj> batch,
106 boost::optional<long long> numReturnedSoFar,
107 boost::optional<Timestamp> latestOplogTimestamp,
108 boost::optional<BSONObj> writeConcernError)
109 : _nss(std::move(nss)),
110 _cursorId(cursorId),
111 _batch(std::move(batch)),
112 _numReturnedSoFar(numReturnedSoFar),
113 _latestOplogTimestamp(latestOplogTimestamp),
114 _writeConcernError(std::move(writeConcernError)) {}
115
parseFromBSON(const BSONObj & cmdResponse)116 StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdResponse) {
117 Status cmdStatus = getStatusFromCommandResult(cmdResponse);
118 if (!cmdStatus.isOK()) {
119 if (ErrorCodes::isStaleShardingError(cmdStatus.code())) {
120 auto vWanted = ChunkVersion::fromBSON(cmdResponse, "vWanted");
121 auto vReceived = ChunkVersion::fromBSON(cmdResponse, "vReceived");
122 if (!vWanted.hasEqualEpoch(vReceived)) {
123 return Status(ErrorCodes::StaleEpoch, cmdStatus.reason());
124 }
125 }
126 return cmdStatus;
127 }
128
129 std::string fullns;
130 BSONObj batchObj;
131 CursorId cursorId;
132
133 BSONElement cursorElt = cmdResponse[kCursorField];
134 if (cursorElt.type() != BSONType::Object) {
135 return {ErrorCodes::TypeMismatch,
136 str::stream() << "Field '" << kCursorField << "' must be a nested object in: "
137 << cmdResponse};
138 }
139 BSONObj cursorObj = cursorElt.Obj();
140
141 BSONElement idElt = cursorObj[kIdField];
142 if (idElt.type() != BSONType::NumberLong) {
143 return {
144 ErrorCodes::TypeMismatch,
145 str::stream() << "Field '" << kIdField << "' must be of type long in: " << cmdResponse};
146 }
147 cursorId = idElt.Long();
148
149 BSONElement nsElt = cursorObj[kNsField];
150 if (nsElt.type() != BSONType::String) {
151 return {ErrorCodes::TypeMismatch,
152 str::stream() << "Field '" << kNsField << "' must be of type string in: "
153 << cmdResponse};
154 }
155 fullns = nsElt.String();
156
157 BSONElement batchElt = cursorObj[kBatchField];
158 if (batchElt.eoo()) {
159 batchElt = cursorObj[kBatchFieldInitial];
160 }
161
162 if (batchElt.type() != BSONType::Array) {
163 return {ErrorCodes::TypeMismatch,
164 str::stream() << "Must have array field '" << kBatchFieldInitial << "' or '"
165 << kBatchField
166 << "' in: "
167 << cmdResponse};
168 }
169 batchObj = batchElt.Obj();
170
171 std::vector<BSONObj> batch;
172 for (BSONElement elt : batchObj) {
173 if (elt.type() != BSONType::Object) {
174 return {ErrorCodes::BadValue,
175 str::stream() << "getMore response batch contains a non-object element: "
176 << elt};
177 }
178
179 batch.push_back(elt.Obj());
180 }
181
182 for (auto& doc : batch) {
183 doc.shareOwnershipWith(cmdResponse);
184 }
185
186 auto latestOplogTimestampElem = cmdResponse[kInternalLatestOplogTimestampField];
187 if (latestOplogTimestampElem && latestOplogTimestampElem.type() != BSONType::bsonTimestamp) {
188 return {
189 ErrorCodes::BadValue,
190 str::stream()
191 << "invalid _internalLatestOplogTimestamp format; expected timestamp but found: "
192 << latestOplogTimestampElem.type()};
193 }
194
195 auto writeConcernError = cmdResponse["writeConcernError"];
196
197 if (writeConcernError && writeConcernError.type() != BSONType::Object) {
198 return {ErrorCodes::BadValue,
199 str::stream() << "invalid writeConcernError format; expected object but found: "
200 << writeConcernError.type()};
201 }
202
203 return {{NamespaceString(fullns),
204 cursorId,
205 std::move(batch),
206 boost::none,
207 latestOplogTimestampElem ? latestOplogTimestampElem.timestamp()
208 : boost::optional<Timestamp>{},
209 writeConcernError ? writeConcernError.Obj().getOwned() : boost::optional<BSONObj>{}}};
210 }
211
addToBSON(CursorResponse::ResponseType responseType,BSONObjBuilder * builder) const212 void CursorResponse::addToBSON(CursorResponse::ResponseType responseType,
213 BSONObjBuilder* builder) const {
214 BSONObjBuilder cursorBuilder(builder->subobjStart(kCursorField));
215
216 cursorBuilder.append(kIdField, _cursorId);
217 cursorBuilder.append(kNsField, _nss.ns());
218
219 const char* batchFieldName =
220 (responseType == ResponseType::InitialResponse) ? kBatchFieldInitial : kBatchField;
221 BSONArrayBuilder batchBuilder(cursorBuilder.subarrayStart(batchFieldName));
222 for (const BSONObj& obj : _batch) {
223 batchBuilder.append(obj);
224 }
225 batchBuilder.doneFast();
226
227 cursorBuilder.doneFast();
228
229 if (_latestOplogTimestamp) {
230 builder->append(kInternalLatestOplogTimestampField, *_latestOplogTimestamp);
231 }
232 builder->append("ok", 1.0);
233
234 if (_writeConcernError) {
235 builder->append("writeConcernError", *_writeConcernError);
236 }
237 }
238
toBSON(CursorResponse::ResponseType responseType) const239 BSONObj CursorResponse::toBSON(CursorResponse::ResponseType responseType) const {
240 BSONObjBuilder builder;
241 addToBSON(responseType, &builder);
242 return builder.obj();
243 }
244
245 } // namespace mongo
246