1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
32 
33 #include "mongo/platform/basic.h"
34 
35 #include "mongo/s/client/shard_remote.h"
36 
37 #include <algorithm>
38 #include <string>
39 
40 #include "mongo/client/fetcher.h"
41 #include "mongo/client/read_preference.h"
42 #include "mongo/client/remote_command_retry_scheduler.h"
43 #include "mongo/client/remote_command_targeter.h"
44 #include "mongo/client/replica_set_monitor.h"
45 #include "mongo/db/jsobj.h"
46 #include "mongo/db/operation_context.h"
47 #include "mongo/db/query/query_request.h"
48 #include "mongo/db/repl/read_concern_args.h"
49 #include "mongo/db/server_parameters.h"
50 #include "mongo/executor/task_executor_pool.h"
51 #include "mongo/rpc/get_status_from_command_result.h"
52 #include "mongo/rpc/metadata/repl_set_metadata.h"
53 #include "mongo/rpc/metadata/tracking_metadata.h"
54 #include "mongo/s/grid.h"
55 #include "mongo/util/log.h"
56 #include "mongo/util/mongoutils/str.h"
57 #include "mongo/util/time_support.h"
58 
59 namespace mongo {
60 
61 using std::string;
62 
63 using executor::RemoteCommandRequest;
64 using executor::RemoteCommandResponse;
65 using executor::TaskExecutor;
66 using rpc::TrackingMetadata;
67 using RemoteCommandCallbackArgs = TaskExecutor::RemoteCommandCallbackArgs;
68 
69 namespace {
70 // Include kReplSetMetadataFieldName in a request to get the shard's ReplSetMetadata in the
71 // response.
72 const BSONObj kReplMetadata(BSON(rpc::kReplSetMetadataFieldName << 1));
73 
74 constexpr bool internalProhibitShardOperationRetryByDefault = false;
75 MONGO_EXPORT_SERVER_PARAMETER(internalProhibitShardOperationRetry,
76                               bool,
77                               internalProhibitShardOperationRetryByDefault);
78 
79 /**
80  * Returns a new BSONObj describing the same command and arguments as 'cmdObj', but with maxTimeMS
81  * replaced by maxTimeMSOverride (or removed if maxTimeMSOverride is Milliseconds::max()).
82  */
appendMaxTimeToCmdObj(Milliseconds maxTimeMSOverride,const BSONObj & cmdObj)83 BSONObj appendMaxTimeToCmdObj(Milliseconds maxTimeMSOverride, const BSONObj& cmdObj) {
84     BSONObjBuilder updatedCmdBuilder;
85 
86     // Remove the user provided maxTimeMS so we can attach the one from the override
87     for (const auto& elem : cmdObj) {
88         if (!str::equals(elem.fieldName(), QueryRequest::cmdOptionMaxTimeMS)) {
89             updatedCmdBuilder.append(elem);
90         }
91     }
92 
93     if (maxTimeMSOverride < Milliseconds::max()) {
94         updatedCmdBuilder.append(QueryRequest::cmdOptionMaxTimeMS,
95                                  durationCount<Milliseconds>(maxTimeMSOverride));
96     }
97 
98     return updatedCmdBuilder.obj();
99 }
100 
101 }  // unnamed namespace
102 
ShardRemote(const ShardId & id,const ConnectionString & originalConnString,std::unique_ptr<RemoteCommandTargeter> targeter)103 ShardRemote::ShardRemote(const ShardId& id,
104                          const ConnectionString& originalConnString,
105                          std::unique_ptr<RemoteCommandTargeter> targeter)
106     : Shard(id), _originalConnString(originalConnString), _targeter(targeter.release()) {}
107 
108 ShardRemote::~ShardRemote() = default;
109 
isRetriableError(ErrorCodes::Error code,RetryPolicy options)110 bool ShardRemote::isRetriableError(ErrorCodes::Error code, RetryPolicy options) {
111     if (internalProhibitShardOperationRetry.loadRelaxed()) {
112         return false;
113     }
114 
115     if (options == RetryPolicy::kNoRetry) {
116         return false;
117     }
118 
119     const auto& retriableErrors = options == RetryPolicy::kIdempotent
120         ? RemoteCommandRetryScheduler::kAllRetriableErrors
121         : RemoteCommandRetryScheduler::kNotMasterErrors;
122     return std::find(retriableErrors.begin(), retriableErrors.end(), code) != retriableErrors.end();
123 }
124 
getConnString() const125 const ConnectionString ShardRemote::getConnString() const {
126     return _targeter->connectionString();
127 }
128 
129 // Any error code changes should possibly also be made to Shard::shouldErrorBePropagated!
updateReplSetMonitor(const HostAndPort & remoteHost,const Status & remoteCommandStatus)130 void ShardRemote::updateReplSetMonitor(const HostAndPort& remoteHost,
131                                        const Status& remoteCommandStatus) {
132     if (remoteCommandStatus.isOK())
133         return;
134 
135     if (ErrorCodes::isNotMasterError(remoteCommandStatus.code())) {
136         _targeter->markHostNotMaster(remoteHost, remoteCommandStatus);
137     } else if (ErrorCodes::isNetworkError(remoteCommandStatus.code())) {
138         _targeter->markHostUnreachable(remoteHost, remoteCommandStatus);
139     } else if (remoteCommandStatus == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
140         _targeter->markHostUnreachable(remoteHost, remoteCommandStatus);
141     }
142 }
143 
toString() const144 std::string ShardRemote::toString() const {
145     return getId().toString() + ":" + _originalConnString.toString();
146 }
147 
_appendMetadataForCommand(OperationContext * opCtx,const ReadPreferenceSetting & readPref)148 BSONObj ShardRemote::_appendMetadataForCommand(OperationContext* opCtx,
149                                                const ReadPreferenceSetting& readPref) {
150     BSONObjBuilder builder;
151     if (logger::globalLogDomain()->shouldLog(
152             logger::LogComponent::kTracking,
153             logger::LogSeverity::Debug(1))) {  // avoid performance overhead if not logging
154         if (!TrackingMetadata::get(opCtx).getIsLogged()) {
155             if (!TrackingMetadata::get(opCtx).getOperId()) {
156                 TrackingMetadata::get(opCtx).initWithOperName("NotSet");
157             }
158             MONGO_LOG_COMPONENT(1, logger::LogComponent::kTracking)
159                 << TrackingMetadata::get(opCtx).toString();
160             TrackingMetadata::get(opCtx).setIsLogged(true);
161         }
162 
163         TrackingMetadata metadata = TrackingMetadata::get(opCtx).constructChildMetadata();
164         metadata.writeToMetadata(&builder);
165     }
166 
167     readPref.toContainingBSON(&builder);
168 
169     if (isConfig())
170         builder.appendElements(kReplMetadata);
171 
172     return builder.obj();
173 }
174 
_runCommand(OperationContext * opCtx,const ReadPreferenceSetting & readPref,const string & dbName,Milliseconds maxTimeMSOverride,const BSONObj & cmdObj)175 StatusWith<Shard::CommandResponse> ShardRemote::_runCommand(OperationContext* opCtx,
176                                                             const ReadPreferenceSetting& readPref,
177                                                             const string& dbName,
178                                                             Milliseconds maxTimeMSOverride,
179                                                             const BSONObj& cmdObj) {
180 
181     ReadPreferenceSetting readPrefWithMinOpTime(readPref);
182     if (getId() == "config") {
183         readPrefWithMinOpTime.minOpTime = grid.configOpTime();
184     }
185     const auto swHost = _targeter->findHost(opCtx, readPrefWithMinOpTime);
186     if (!swHost.isOK()) {
187         return swHost.getStatus();
188     }
189     const auto host = std::move(swHost.getValue());
190 
191     const Milliseconds requestTimeout =
192         std::min(opCtx->getRemainingMaxTimeMillis(), maxTimeMSOverride);
193 
194     const RemoteCommandRequest request(
195         host,
196         dbName,
197         appendMaxTimeToCmdObj(requestTimeout, cmdObj),
198         _appendMetadataForCommand(opCtx, readPrefWithMinOpTime),
199         opCtx,
200         requestTimeout < Milliseconds::max() ? requestTimeout : RemoteCommandRequest::kNoTimeout);
201 
202     RemoteCommandResponse response =
203         Status(ErrorCodes::InternalError,
204                str::stream() << "Failed to run remote command request " << request.toString());
205 
206     TaskExecutor* executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
207     auto swCallbackHandle = executor->scheduleRemoteCommand(
208         request, [&response](const RemoteCommandCallbackArgs& args) { response = args.response; });
209     if (!swCallbackHandle.isOK()) {
210         return swCallbackHandle.getStatus();
211     }
212 
213     // Block until the command is carried out
214     executor->wait(swCallbackHandle.getValue());
215 
216     updateReplSetMonitor(host, response.status);
217 
218     if (!response.status.isOK()) {
219         if (ErrorCodes::isExceededTimeLimitError(response.status.code())) {
220             LOG(0) << "Operation timed out with status " << redact(response.status);
221         }
222         return response.status;
223     }
224 
225     auto result = response.data.getOwned();
226     auto commandStatus = getStatusFromCommandResult(result);
227     auto writeConcernStatus = getWriteConcernStatusFromCommandResult(result);
228 
229     updateReplSetMonitor(host, commandStatus);
230     updateReplSetMonitor(host, writeConcernStatus);
231 
232     return Shard::CommandResponse(std::move(host),
233                                   std::move(result),
234                                   response.metadata.getOwned(),
235                                   std::move(commandStatus),
236                                   std::move(writeConcernStatus));
237 }
238 
_exhaustiveFindOnConfig(OperationContext * opCtx,const ReadPreferenceSetting & readPref,const repl::ReadConcernLevel & readConcernLevel,const NamespaceString & nss,const BSONObj & query,const BSONObj & sort,boost::optional<long long> limit)239 StatusWith<Shard::QueryResponse> ShardRemote::_exhaustiveFindOnConfig(
240     OperationContext* opCtx,
241     const ReadPreferenceSetting& readPref,
242     const repl::ReadConcernLevel& readConcernLevel,
243     const NamespaceString& nss,
244     const BSONObj& query,
245     const BSONObj& sort,
246     boost::optional<long long> limit) {
247     invariant(getId() == "config");
248     ReadPreferenceSetting readPrefWithMinOpTime(readPref);
249     readPrefWithMinOpTime.minOpTime = grid.configOpTime();
250 
251     const auto host = _targeter->findHost(opCtx, readPrefWithMinOpTime);
252     if (!host.isOK()) {
253         return host.getStatus();
254     }
255 
256     QueryResponse response;
257 
258     // If for some reason the callback never gets invoked, we will return this status in response.
259     Status status = Status(ErrorCodes::InternalError, "Internal error running find command");
260 
261     auto fetcherCallback =
262         [this, &status, &response](const Fetcher::QueryResponseStatus& dataStatus,
263                                    Fetcher::NextAction* nextAction,
264                                    BSONObjBuilder* getMoreBob) {
265 
266             // Throw out any accumulated results on error
267             if (!dataStatus.isOK()) {
268                 status = dataStatus.getStatus();
269                 response.docs.clear();
270                 return;
271             }
272 
273             const auto& data = dataStatus.getValue();
274 
275             if (data.otherFields.metadata.hasField(rpc::kReplSetMetadataFieldName)) {
276                 auto replParseStatus =
277                     rpc::ReplSetMetadata::readFromMetadata(data.otherFields.metadata);
278                 if (!replParseStatus.isOK()) {
279                     status = replParseStatus.getStatus();
280                     response.docs.clear();
281                     return;
282                 }
283 
284                 const auto& replSetMetadata = replParseStatus.getValue();
285                 response.opTime = replSetMetadata.getLastOpCommitted();
286             }
287 
288             for (const BSONObj& doc : data.documents) {
289                 response.docs.push_back(doc.getOwned());
290             }
291 
292             status = Status::OK();
293 
294             if (!getMoreBob) {
295                 return;
296             }
297             getMoreBob->append("getMore", data.cursorId);
298             getMoreBob->append("collection", data.nss.coll());
299         };
300 
301     BSONObj readConcernObj;
302     {
303         invariant(readConcernLevel == repl::ReadConcernLevel::kMajorityReadConcern);
304         const repl::ReadConcernArgs readConcern{grid.configOpTime(), readConcernLevel};
305         BSONObjBuilder bob;
306         readConcern.appendInfo(&bob);
307         readConcernObj =
308             bob.done().getObjectField(repl::ReadConcernArgs::kReadConcernFieldName).getOwned();
309     }
310 
311     const Milliseconds maxTimeMS =
312         std::min(opCtx->getRemainingMaxTimeMillis(), kDefaultConfigCommandTimeout);
313 
314     BSONObjBuilder findCmdBuilder;
315 
316     {
317         QueryRequest qr(nss);
318         qr.setFilter(query);
319         qr.setSort(sort);
320         qr.setReadConcern(readConcernObj);
321         qr.setLimit(limit);
322 
323         if (maxTimeMS < Milliseconds::max()) {
324             qr.setMaxTimeMS(durationCount<Milliseconds>(maxTimeMS));
325         }
326 
327         qr.asFindCommand(&findCmdBuilder);
328     }
329 
330     Fetcher fetcher(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(),
331                     host.getValue(),
332                     nss.db().toString(),
333                     findCmdBuilder.done(),
334                     fetcherCallback,
335                     _appendMetadataForCommand(opCtx, readPrefWithMinOpTime),
336                     maxTimeMS /* find network timeout */,
337                     maxTimeMS /* getMore network timeout */);
338     Status scheduleStatus = fetcher.schedule();
339     if (!scheduleStatus.isOK()) {
340         return scheduleStatus;
341     }
342 
343     fetcher.join();
344 
345     updateReplSetMonitor(host.getValue(), status);
346 
347     if (!status.isOK()) {
348         if (ErrorCodes::isExceededTimeLimitError(status.code())) {
349             LOG(0) << "Operation timed out " << causedBy(status);
350         }
351         return status;
352     }
353 
354     return response;
355 }
356 
createIndexOnConfig(OperationContext * opCtx,const NamespaceString & ns,const BSONObj & keys,bool unique)357 Status ShardRemote::createIndexOnConfig(OperationContext* opCtx,
358                                         const NamespaceString& ns,
359                                         const BSONObj& keys,
360                                         bool unique) {
361     MONGO_UNREACHABLE;
362 }
363 
364 }  // namespace mongo
365