1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
32
33 #include "mongo/platform/basic.h"
34
35 #include "mongo/s/client/shard_remote.h"
36
37 #include <algorithm>
38 #include <string>
39
40 #include "mongo/client/fetcher.h"
41 #include "mongo/client/read_preference.h"
42 #include "mongo/client/remote_command_retry_scheduler.h"
43 #include "mongo/client/remote_command_targeter.h"
44 #include "mongo/client/replica_set_monitor.h"
45 #include "mongo/db/jsobj.h"
46 #include "mongo/db/operation_context.h"
47 #include "mongo/db/query/query_request.h"
48 #include "mongo/db/repl/read_concern_args.h"
49 #include "mongo/db/server_parameters.h"
50 #include "mongo/executor/task_executor_pool.h"
51 #include "mongo/rpc/get_status_from_command_result.h"
52 #include "mongo/rpc/metadata/repl_set_metadata.h"
53 #include "mongo/rpc/metadata/tracking_metadata.h"
54 #include "mongo/s/grid.h"
55 #include "mongo/util/log.h"
56 #include "mongo/util/mongoutils/str.h"
57 #include "mongo/util/time_support.h"
58
59 namespace mongo {
60
61 using std::string;
62
63 using executor::RemoteCommandRequest;
64 using executor::RemoteCommandResponse;
65 using executor::TaskExecutor;
66 using rpc::TrackingMetadata;
67 using RemoteCommandCallbackArgs = TaskExecutor::RemoteCommandCallbackArgs;
68
69 namespace {
70 // Include kReplSetMetadataFieldName in a request to get the shard's ReplSetMetadata in the
71 // response.
72 const BSONObj kReplMetadata(BSON(rpc::kReplSetMetadataFieldName << 1));
73
74 constexpr bool internalProhibitShardOperationRetryByDefault = false;
75 MONGO_EXPORT_SERVER_PARAMETER(internalProhibitShardOperationRetry,
76 bool,
77 internalProhibitShardOperationRetryByDefault);
78
79 /**
80 * Returns a new BSONObj describing the same command and arguments as 'cmdObj', but with maxTimeMS
81 * replaced by maxTimeMSOverride (or removed if maxTimeMSOverride is Milliseconds::max()).
82 */
appendMaxTimeToCmdObj(Milliseconds maxTimeMSOverride,const BSONObj & cmdObj)83 BSONObj appendMaxTimeToCmdObj(Milliseconds maxTimeMSOverride, const BSONObj& cmdObj) {
84 BSONObjBuilder updatedCmdBuilder;
85
86 // Remove the user provided maxTimeMS so we can attach the one from the override
87 for (const auto& elem : cmdObj) {
88 if (!str::equals(elem.fieldName(), QueryRequest::cmdOptionMaxTimeMS)) {
89 updatedCmdBuilder.append(elem);
90 }
91 }
92
93 if (maxTimeMSOverride < Milliseconds::max()) {
94 updatedCmdBuilder.append(QueryRequest::cmdOptionMaxTimeMS,
95 durationCount<Milliseconds>(maxTimeMSOverride));
96 }
97
98 return updatedCmdBuilder.obj();
99 }
100
101 } // unnamed namespace
102
ShardRemote(const ShardId & id,const ConnectionString & originalConnString,std::unique_ptr<RemoteCommandTargeter> targeter)103 ShardRemote::ShardRemote(const ShardId& id,
104 const ConnectionString& originalConnString,
105 std::unique_ptr<RemoteCommandTargeter> targeter)
106 : Shard(id), _originalConnString(originalConnString), _targeter(targeter.release()) {}
107
108 ShardRemote::~ShardRemote() = default;
109
isRetriableError(ErrorCodes::Error code,RetryPolicy options)110 bool ShardRemote::isRetriableError(ErrorCodes::Error code, RetryPolicy options) {
111 if (internalProhibitShardOperationRetry.loadRelaxed()) {
112 return false;
113 }
114
115 if (options == RetryPolicy::kNoRetry) {
116 return false;
117 }
118
119 const auto& retriableErrors = options == RetryPolicy::kIdempotent
120 ? RemoteCommandRetryScheduler::kAllRetriableErrors
121 : RemoteCommandRetryScheduler::kNotMasterErrors;
122 return std::find(retriableErrors.begin(), retriableErrors.end(), code) != retriableErrors.end();
123 }
124
getConnString() const125 const ConnectionString ShardRemote::getConnString() const {
126 return _targeter->connectionString();
127 }
128
129 // Any error code changes should possibly also be made to Shard::shouldErrorBePropagated!
updateReplSetMonitor(const HostAndPort & remoteHost,const Status & remoteCommandStatus)130 void ShardRemote::updateReplSetMonitor(const HostAndPort& remoteHost,
131 const Status& remoteCommandStatus) {
132 if (remoteCommandStatus.isOK())
133 return;
134
135 if (ErrorCodes::isNotMasterError(remoteCommandStatus.code())) {
136 _targeter->markHostNotMaster(remoteHost, remoteCommandStatus);
137 } else if (ErrorCodes::isNetworkError(remoteCommandStatus.code())) {
138 _targeter->markHostUnreachable(remoteHost, remoteCommandStatus);
139 } else if (remoteCommandStatus == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
140 _targeter->markHostUnreachable(remoteHost, remoteCommandStatus);
141 }
142 }
143
toString() const144 std::string ShardRemote::toString() const {
145 return getId().toString() + ":" + _originalConnString.toString();
146 }
147
_appendMetadataForCommand(OperationContext * opCtx,const ReadPreferenceSetting & readPref)148 BSONObj ShardRemote::_appendMetadataForCommand(OperationContext* opCtx,
149 const ReadPreferenceSetting& readPref) {
150 BSONObjBuilder builder;
151 if (logger::globalLogDomain()->shouldLog(
152 logger::LogComponent::kTracking,
153 logger::LogSeverity::Debug(1))) { // avoid performance overhead if not logging
154 if (!TrackingMetadata::get(opCtx).getIsLogged()) {
155 if (!TrackingMetadata::get(opCtx).getOperId()) {
156 TrackingMetadata::get(opCtx).initWithOperName("NotSet");
157 }
158 MONGO_LOG_COMPONENT(1, logger::LogComponent::kTracking)
159 << TrackingMetadata::get(opCtx).toString();
160 TrackingMetadata::get(opCtx).setIsLogged(true);
161 }
162
163 TrackingMetadata metadata = TrackingMetadata::get(opCtx).constructChildMetadata();
164 metadata.writeToMetadata(&builder);
165 }
166
167 readPref.toContainingBSON(&builder);
168
169 if (isConfig())
170 builder.appendElements(kReplMetadata);
171
172 return builder.obj();
173 }
174
_runCommand(OperationContext * opCtx,const ReadPreferenceSetting & readPref,const string & dbName,Milliseconds maxTimeMSOverride,const BSONObj & cmdObj)175 StatusWith<Shard::CommandResponse> ShardRemote::_runCommand(OperationContext* opCtx,
176 const ReadPreferenceSetting& readPref,
177 const string& dbName,
178 Milliseconds maxTimeMSOverride,
179 const BSONObj& cmdObj) {
180
181 ReadPreferenceSetting readPrefWithMinOpTime(readPref);
182 if (getId() == "config") {
183 readPrefWithMinOpTime.minOpTime = grid.configOpTime();
184 }
185 const auto swHost = _targeter->findHost(opCtx, readPrefWithMinOpTime);
186 if (!swHost.isOK()) {
187 return swHost.getStatus();
188 }
189 const auto host = std::move(swHost.getValue());
190
191 const Milliseconds requestTimeout =
192 std::min(opCtx->getRemainingMaxTimeMillis(), maxTimeMSOverride);
193
194 const RemoteCommandRequest request(
195 host,
196 dbName,
197 appendMaxTimeToCmdObj(requestTimeout, cmdObj),
198 _appendMetadataForCommand(opCtx, readPrefWithMinOpTime),
199 opCtx,
200 requestTimeout < Milliseconds::max() ? requestTimeout : RemoteCommandRequest::kNoTimeout);
201
202 RemoteCommandResponse response =
203 Status(ErrorCodes::InternalError,
204 str::stream() << "Failed to run remote command request " << request.toString());
205
206 TaskExecutor* executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
207 auto swCallbackHandle = executor->scheduleRemoteCommand(
208 request, [&response](const RemoteCommandCallbackArgs& args) { response = args.response; });
209 if (!swCallbackHandle.isOK()) {
210 return swCallbackHandle.getStatus();
211 }
212
213 // Block until the command is carried out
214 executor->wait(swCallbackHandle.getValue());
215
216 updateReplSetMonitor(host, response.status);
217
218 if (!response.status.isOK()) {
219 if (ErrorCodes::isExceededTimeLimitError(response.status.code())) {
220 LOG(0) << "Operation timed out with status " << redact(response.status);
221 }
222 return response.status;
223 }
224
225 auto result = response.data.getOwned();
226 auto commandStatus = getStatusFromCommandResult(result);
227 auto writeConcernStatus = getWriteConcernStatusFromCommandResult(result);
228
229 updateReplSetMonitor(host, commandStatus);
230 updateReplSetMonitor(host, writeConcernStatus);
231
232 return Shard::CommandResponse(std::move(host),
233 std::move(result),
234 response.metadata.getOwned(),
235 std::move(commandStatus),
236 std::move(writeConcernStatus));
237 }
238
_exhaustiveFindOnConfig(OperationContext * opCtx,const ReadPreferenceSetting & readPref,const repl::ReadConcernLevel & readConcernLevel,const NamespaceString & nss,const BSONObj & query,const BSONObj & sort,boost::optional<long long> limit)239 StatusWith<Shard::QueryResponse> ShardRemote::_exhaustiveFindOnConfig(
240 OperationContext* opCtx,
241 const ReadPreferenceSetting& readPref,
242 const repl::ReadConcernLevel& readConcernLevel,
243 const NamespaceString& nss,
244 const BSONObj& query,
245 const BSONObj& sort,
246 boost::optional<long long> limit) {
247 invariant(getId() == "config");
248 ReadPreferenceSetting readPrefWithMinOpTime(readPref);
249 readPrefWithMinOpTime.minOpTime = grid.configOpTime();
250
251 const auto host = _targeter->findHost(opCtx, readPrefWithMinOpTime);
252 if (!host.isOK()) {
253 return host.getStatus();
254 }
255
256 QueryResponse response;
257
258 // If for some reason the callback never gets invoked, we will return this status in response.
259 Status status = Status(ErrorCodes::InternalError, "Internal error running find command");
260
261 auto fetcherCallback =
262 [this, &status, &response](const Fetcher::QueryResponseStatus& dataStatus,
263 Fetcher::NextAction* nextAction,
264 BSONObjBuilder* getMoreBob) {
265
266 // Throw out any accumulated results on error
267 if (!dataStatus.isOK()) {
268 status = dataStatus.getStatus();
269 response.docs.clear();
270 return;
271 }
272
273 const auto& data = dataStatus.getValue();
274
275 if (data.otherFields.metadata.hasField(rpc::kReplSetMetadataFieldName)) {
276 auto replParseStatus =
277 rpc::ReplSetMetadata::readFromMetadata(data.otherFields.metadata);
278 if (!replParseStatus.isOK()) {
279 status = replParseStatus.getStatus();
280 response.docs.clear();
281 return;
282 }
283
284 const auto& replSetMetadata = replParseStatus.getValue();
285 response.opTime = replSetMetadata.getLastOpCommitted();
286 }
287
288 for (const BSONObj& doc : data.documents) {
289 response.docs.push_back(doc.getOwned());
290 }
291
292 status = Status::OK();
293
294 if (!getMoreBob) {
295 return;
296 }
297 getMoreBob->append("getMore", data.cursorId);
298 getMoreBob->append("collection", data.nss.coll());
299 };
300
301 BSONObj readConcernObj;
302 {
303 invariant(readConcernLevel == repl::ReadConcernLevel::kMajorityReadConcern);
304 const repl::ReadConcernArgs readConcern{grid.configOpTime(), readConcernLevel};
305 BSONObjBuilder bob;
306 readConcern.appendInfo(&bob);
307 readConcernObj =
308 bob.done().getObjectField(repl::ReadConcernArgs::kReadConcernFieldName).getOwned();
309 }
310
311 const Milliseconds maxTimeMS =
312 std::min(opCtx->getRemainingMaxTimeMillis(), kDefaultConfigCommandTimeout);
313
314 BSONObjBuilder findCmdBuilder;
315
316 {
317 QueryRequest qr(nss);
318 qr.setFilter(query);
319 qr.setSort(sort);
320 qr.setReadConcern(readConcernObj);
321 qr.setLimit(limit);
322
323 if (maxTimeMS < Milliseconds::max()) {
324 qr.setMaxTimeMS(durationCount<Milliseconds>(maxTimeMS));
325 }
326
327 qr.asFindCommand(&findCmdBuilder);
328 }
329
330 Fetcher fetcher(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(),
331 host.getValue(),
332 nss.db().toString(),
333 findCmdBuilder.done(),
334 fetcherCallback,
335 _appendMetadataForCommand(opCtx, readPrefWithMinOpTime),
336 maxTimeMS /* find network timeout */,
337 maxTimeMS /* getMore network timeout */);
338 Status scheduleStatus = fetcher.schedule();
339 if (!scheduleStatus.isOK()) {
340 return scheduleStatus;
341 }
342
343 fetcher.join();
344
345 updateReplSetMonitor(host.getValue(), status);
346
347 if (!status.isOK()) {
348 if (ErrorCodes::isExceededTimeLimitError(status.code())) {
349 LOG(0) << "Operation timed out " << causedBy(status);
350 }
351 return status;
352 }
353
354 return response;
355 }
356
createIndexOnConfig(OperationContext * opCtx,const NamespaceString & ns,const BSONObj & keys,bool unique)357 Status ShardRemote::createIndexOnConfig(OperationContext* opCtx,
358 const NamespaceString& ns,
359 const BSONObj& keys,
360 bool unique) {
361 MONGO_UNREACHABLE;
362 }
363
364 } // namespace mongo
365