1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
32 
33 #include "mongo/platform/basic.h"
34 
35 #include "mongo/db/query/cursor_response.h"
36 
37 #include "mongo/bson/bsontypes.h"
38 #include "mongo/rpc/get_status_from_command_result.h"
39 #include "mongo/s/chunk_version.h"
40 
41 namespace mongo {
42 
namespace {

// Name of the sub-object that holds the cursor description inside a command response.
constexpr char kCursorField[] = "cursor";

// Fields stored inside the cursor sub-object.
constexpr char kIdField[] = "id";
constexpr char kNsField[] = "ns";

// Batch array names: 'firstBatch' is used for an initial response, 'nextBatch' otherwise.
constexpr char kBatchField[] = "nextBatch";
constexpr char kBatchFieldInitial[] = "firstBatch";

// Top-level response field carrying the latest oplog timestamp, when one is present.
constexpr char kInternalLatestOplogTimestampField[] = "$_internalLatestOplogTimestamp";

}  // namespace
53 
CursorResponseBuilder(bool isInitialResponse,BSONObjBuilder * commandResponse)54 CursorResponseBuilder::CursorResponseBuilder(bool isInitialResponse,
55                                              BSONObjBuilder* commandResponse)
56     : _responseInitialLen(commandResponse->bb().len()),
57       _commandResponse(commandResponse),
58       _cursorObject(commandResponse->subobjStart(kCursorField)),
59       _batch(_cursorObject.subarrayStart(isInitialResponse ? kBatchFieldInitial : kBatchField)) {}
60 
done(CursorId cursorId,StringData cursorNamespace)61 void CursorResponseBuilder::done(CursorId cursorId, StringData cursorNamespace) {
62     invariant(_active);
63     _batch.doneFast();
64     _cursorObject.append(kIdField, cursorId);
65     _cursorObject.append(kNsField, cursorNamespace);
66     _cursorObject.doneFast();
67     if (!_latestOplogTimestamp.isNull()) {
68         _commandResponse->append(kInternalLatestOplogTimestampField, _latestOplogTimestamp);
69     }
70     _active = false;
71 }
72 
abandon()73 void CursorResponseBuilder::abandon() {
74     invariant(_active);
75     _batch.doneFast();
76     _cursorObject.doneFast();
77     _commandResponse->bb().setlen(_responseInitialLen);  // Removes everything we've added.
78     _active = false;
79 }
80 
appendCursorResponseObject(long long cursorId,StringData cursorNamespace,BSONArray firstBatch,BSONObjBuilder * builder)81 void appendCursorResponseObject(long long cursorId,
82                                 StringData cursorNamespace,
83                                 BSONArray firstBatch,
84                                 BSONObjBuilder* builder) {
85     BSONObjBuilder cursorObj(builder->subobjStart(kCursorField));
86     cursorObj.append(kIdField, cursorId);
87     cursorObj.append(kNsField, cursorNamespace);
88     cursorObj.append(kBatchFieldInitial, firstBatch);
89     cursorObj.done();
90 }
91 
appendGetMoreResponseObject(long long cursorId,StringData cursorNamespace,BSONArray nextBatch,BSONObjBuilder * builder)92 void appendGetMoreResponseObject(long long cursorId,
93                                  StringData cursorNamespace,
94                                  BSONArray nextBatch,
95                                  BSONObjBuilder* builder) {
96     BSONObjBuilder cursorObj(builder->subobjStart(kCursorField));
97     cursorObj.append(kIdField, cursorId);
98     cursorObj.append(kNsField, cursorNamespace);
99     cursorObj.append(kBatchField, nextBatch);
100     cursorObj.done();
101 }
102 
CursorResponse(NamespaceString nss,CursorId cursorId,std::vector<BSONObj> batch,boost::optional<long long> numReturnedSoFar,boost::optional<Timestamp> latestOplogTimestamp,boost::optional<BSONObj> writeConcernError)103 CursorResponse::CursorResponse(NamespaceString nss,
104                                CursorId cursorId,
105                                std::vector<BSONObj> batch,
106                                boost::optional<long long> numReturnedSoFar,
107                                boost::optional<Timestamp> latestOplogTimestamp,
108                                boost::optional<BSONObj> writeConcernError)
109     : _nss(std::move(nss)),
110       _cursorId(cursorId),
111       _batch(std::move(batch)),
112       _numReturnedSoFar(numReturnedSoFar),
113       _latestOplogTimestamp(latestOplogTimestamp),
114       _writeConcernError(std::move(writeConcernError)) {}
115 
parseFromBSON(const BSONObj & cmdResponse)116 StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdResponse) {
117     Status cmdStatus = getStatusFromCommandResult(cmdResponse);
118     if (!cmdStatus.isOK()) {
119         if (ErrorCodes::isStaleShardingError(cmdStatus.code())) {
120             auto vWanted = ChunkVersion::fromBSON(cmdResponse, "vWanted");
121             auto vReceived = ChunkVersion::fromBSON(cmdResponse, "vReceived");
122             if (!vWanted.hasEqualEpoch(vReceived)) {
123                 return Status(ErrorCodes::StaleEpoch, cmdStatus.reason());
124             }
125         }
126         return cmdStatus;
127     }
128 
129     std::string fullns;
130     BSONObj batchObj;
131     CursorId cursorId;
132 
133     BSONElement cursorElt = cmdResponse[kCursorField];
134     if (cursorElt.type() != BSONType::Object) {
135         return {ErrorCodes::TypeMismatch,
136                 str::stream() << "Field '" << kCursorField << "' must be a nested object in: "
137                               << cmdResponse};
138     }
139     BSONObj cursorObj = cursorElt.Obj();
140 
141     BSONElement idElt = cursorObj[kIdField];
142     if (idElt.type() != BSONType::NumberLong) {
143         return {
144             ErrorCodes::TypeMismatch,
145             str::stream() << "Field '" << kIdField << "' must be of type long in: " << cmdResponse};
146     }
147     cursorId = idElt.Long();
148 
149     BSONElement nsElt = cursorObj[kNsField];
150     if (nsElt.type() != BSONType::String) {
151         return {ErrorCodes::TypeMismatch,
152                 str::stream() << "Field '" << kNsField << "' must be of type string in: "
153                               << cmdResponse};
154     }
155     fullns = nsElt.String();
156 
157     BSONElement batchElt = cursorObj[kBatchField];
158     if (batchElt.eoo()) {
159         batchElt = cursorObj[kBatchFieldInitial];
160     }
161 
162     if (batchElt.type() != BSONType::Array) {
163         return {ErrorCodes::TypeMismatch,
164                 str::stream() << "Must have array field '" << kBatchFieldInitial << "' or '"
165                               << kBatchField
166                               << "' in: "
167                               << cmdResponse};
168     }
169     batchObj = batchElt.Obj();
170 
171     std::vector<BSONObj> batch;
172     for (BSONElement elt : batchObj) {
173         if (elt.type() != BSONType::Object) {
174             return {ErrorCodes::BadValue,
175                     str::stream() << "getMore response batch contains a non-object element: "
176                                   << elt};
177         }
178 
179         batch.push_back(elt.Obj());
180     }
181 
182     for (auto& doc : batch) {
183         doc.shareOwnershipWith(cmdResponse);
184     }
185 
186     auto latestOplogTimestampElem = cmdResponse[kInternalLatestOplogTimestampField];
187     if (latestOplogTimestampElem && latestOplogTimestampElem.type() != BSONType::bsonTimestamp) {
188         return {
189             ErrorCodes::BadValue,
190             str::stream()
191                 << "invalid _internalLatestOplogTimestamp format; expected timestamp but found: "
192                 << latestOplogTimestampElem.type()};
193     }
194 
195     auto writeConcernError = cmdResponse["writeConcernError"];
196 
197     if (writeConcernError && writeConcernError.type() != BSONType::Object) {
198         return {ErrorCodes::BadValue,
199                 str::stream() << "invalid writeConcernError format; expected object but found: "
200                               << writeConcernError.type()};
201     }
202 
203     return {{NamespaceString(fullns),
204              cursorId,
205              std::move(batch),
206              boost::none,
207              latestOplogTimestampElem ? latestOplogTimestampElem.timestamp()
208                                       : boost::optional<Timestamp>{},
209              writeConcernError ? writeConcernError.Obj().getOwned() : boost::optional<BSONObj>{}}};
210 }
211 
addToBSON(CursorResponse::ResponseType responseType,BSONObjBuilder * builder) const212 void CursorResponse::addToBSON(CursorResponse::ResponseType responseType,
213                                BSONObjBuilder* builder) const {
214     BSONObjBuilder cursorBuilder(builder->subobjStart(kCursorField));
215 
216     cursorBuilder.append(kIdField, _cursorId);
217     cursorBuilder.append(kNsField, _nss.ns());
218 
219     const char* batchFieldName =
220         (responseType == ResponseType::InitialResponse) ? kBatchFieldInitial : kBatchField;
221     BSONArrayBuilder batchBuilder(cursorBuilder.subarrayStart(batchFieldName));
222     for (const BSONObj& obj : _batch) {
223         batchBuilder.append(obj);
224     }
225     batchBuilder.doneFast();
226 
227     cursorBuilder.doneFast();
228 
229     if (_latestOplogTimestamp) {
230         builder->append(kInternalLatestOplogTimestampField, *_latestOplogTimestamp);
231     }
232     builder->append("ok", 1.0);
233 
234     if (_writeConcernError) {
235         builder->append("writeConcernError", *_writeConcernError);
236     }
237 }
238 
toBSON(CursorResponse::ResponseType responseType) const239 BSONObj CursorResponse::toBSON(CursorResponse::ResponseType responseType) const {
240     BSONObjBuilder builder;
241     addToBSON(responseType, &builder);
242     return builder.obj();
243 }
244 
245 }  // namespace mongo
246