1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kExecutor
32
33 #include "mongo/platform/basic.h"
34
35 #include <algorithm>
36 #include <vector>
37
38 #include "mongo/client/remote_command_retry_scheduler.h"
39 #include "mongo/stdx/memory.h"
40 #include "mongo/util/assert_util.h"
41 #include "mongo/util/destructor_guard.h"
42 #include "mongo/util/log.h"
43
44 namespace mongo {
45
46 namespace {
47
48 class RetryPolicyImpl : public RemoteCommandRetryScheduler::RetryPolicy {
49 public:
50 RetryPolicyImpl(std::size_t maximumAttempts,
51 Milliseconds maximumResponseElapsedTotal,
52 const std::initializer_list<ErrorCodes::Error>& retryableErrors);
53 std::size_t getMaximumAttempts() const override;
54 Milliseconds getMaximumResponseElapsedTotal() const override;
55 bool shouldRetryOnError(ErrorCodes::Error error) const override;
56 std::string toString() const override;
57
58 private:
59 std::size_t _maximumAttempts;
60 Milliseconds _maximumResponseElapsedTotal;
61 std::vector<ErrorCodes::Error> _retryableErrors;
62 };
63
RetryPolicyImpl(std::size_t maximumAttempts,Milliseconds maximumResponseElapsedTotal,const std::initializer_list<ErrorCodes::Error> & retryableErrors)64 RetryPolicyImpl::RetryPolicyImpl(std::size_t maximumAttempts,
65 Milliseconds maximumResponseElapsedTotal,
66 const std::initializer_list<ErrorCodes::Error>& retryableErrors)
67 : _maximumAttempts(maximumAttempts),
68 _maximumResponseElapsedTotal(maximumResponseElapsedTotal),
69 _retryableErrors(retryableErrors) {
70 std::sort(_retryableErrors.begin(), _retryableErrors.end());
71 }
72
toString() const73 std::string RetryPolicyImpl::toString() const {
74 str::stream output;
75 output << "RetryPolicyImpl";
76 output << " maxAttempts: " << _maximumAttempts;
77 output << " maxTimeMillis: " << _maximumResponseElapsedTotal;
78
79 if (_retryableErrors.size() > 0) {
80 output << "Retryable Errors: ";
81 for (auto error : _retryableErrors) {
82 output << error;
83 }
84 }
85 return output;
86 }
87
getMaximumAttempts() const88 std::size_t RetryPolicyImpl::getMaximumAttempts() const {
89 return _maximumAttempts;
90 }
91
getMaximumResponseElapsedTotal() const92 Milliseconds RetryPolicyImpl::getMaximumResponseElapsedTotal() const {
93 return _maximumResponseElapsedTotal;
94 }
95
shouldRetryOnError(ErrorCodes::Error error) const96 bool RetryPolicyImpl::shouldRetryOnError(ErrorCodes::Error error) const {
97 return std::binary_search(_retryableErrors.cbegin(), _retryableErrors.cend(), error);
98 }
99
100 } // namespace
101
// Error codes of the "not master" family: NotMaster, NotMasterNoSlaveOk and NotMasterOrSecondary.
// The same three codes also open kAllRetriableErrors.
const std::initializer_list<ErrorCodes::Error> RemoteCommandRetryScheduler::kNotMasterErrors{
    ErrorCodes::NotMaster, ErrorCodes::NotMasterNoSlaveOk, ErrorCodes::NotMasterOrSecondary};
104
// Broader list of retryable error codes: the "not master" codes followed by write concern,
// host/network and step-down/interruption related codes.
const std::initializer_list<ErrorCodes::Error> RemoteCommandRetryScheduler::kAllRetriableErrors{
    ErrorCodes::NotMaster,
    ErrorCodes::NotMasterNoSlaveOk,
    ErrorCodes::NotMasterOrSecondary,
    // If write concern failed to be satisfied on the remote server, this most probably means that
    // some of the secondary nodes were unreachable or otherwise unresponsive, so the call is safe
    // to be retried if idempotency can be guaranteed.
    ErrorCodes::WriteConcernFailed,
    ErrorCodes::HostUnreachable,
    ErrorCodes::HostNotFound,
    ErrorCodes::NetworkTimeout,
    ErrorCodes::PrimarySteppedDown,
    ErrorCodes::InterruptedDueToReplStateChange,
    ErrorCodes::BalancerInterrupted};
119
120 std::unique_ptr<RemoteCommandRetryScheduler::RetryPolicy>
makeNoRetryPolicy()121 RemoteCommandRetryScheduler::makeNoRetryPolicy() {
122 return makeRetryPolicy(1U, executor::RemoteCommandRequest::kNoTimeout, {});
123 }
124
125 std::unique_ptr<RemoteCommandRetryScheduler::RetryPolicy>
makeRetryPolicy(std::size_t maxAttempts,Milliseconds maxResponseElapsedTotal,const std::initializer_list<ErrorCodes::Error> & retryableErrors)126 RemoteCommandRetryScheduler::makeRetryPolicy(
127 std::size_t maxAttempts,
128 Milliseconds maxResponseElapsedTotal,
129 const std::initializer_list<ErrorCodes::Error>& retryableErrors) {
130 std::unique_ptr<RetryPolicy> policy =
131 stdx::make_unique<RetryPolicyImpl>(maxAttempts, maxResponseElapsedTotal, retryableErrors);
132 return policy;
133 }
134
RemoteCommandRetryScheduler(executor::TaskExecutor * executor,const executor::RemoteCommandRequest & request,const executor::TaskExecutor::RemoteCommandCallbackFn & callback,std::unique_ptr<RetryPolicy> retryPolicy)135 RemoteCommandRetryScheduler::RemoteCommandRetryScheduler(
136 executor::TaskExecutor* executor,
137 const executor::RemoteCommandRequest& request,
138 const executor::TaskExecutor::RemoteCommandCallbackFn& callback,
139 std::unique_ptr<RetryPolicy> retryPolicy)
140 : _executor(executor),
141 _request(request),
142 _callback(callback),
143 _retryPolicy(std::move(retryPolicy)) {
144 uassert(ErrorCodes::BadValue, "task executor cannot be null", executor);
145 uassert(ErrorCodes::BadValue,
146 "source in remote command request cannot be empty",
147 !request.target.empty());
148 uassert(ErrorCodes::BadValue,
149 "database name in remote command request cannot be empty",
150 !request.dbname.empty());
151 uassert(ErrorCodes::BadValue,
152 "command object in remote command request cannot be empty",
153 !request.cmdObj.isEmpty());
154 uassert(ErrorCodes::BadValue, "remote command callback function cannot be null", callback);
155 uassert(ErrorCodes::BadValue, "retry policy cannot be null", _retryPolicy);
156 uassert(ErrorCodes::BadValue,
157 "policy max attempts cannot be zero",
158 _retryPolicy->getMaximumAttempts() != 0);
159 uassert(ErrorCodes::BadValue,
160 "policy max response elapsed total cannot be negative",
161 !(_retryPolicy->getMaximumResponseElapsedTotal() !=
162 executor::RemoteCommandRequest::kNoTimeout &&
163 _retryPolicy->getMaximumResponseElapsedTotal() < Milliseconds(0)));
164 }
165
// Requests shutdown and then waits in join() until the scheduler is no longer active, so that no
// remote command callback is still pending against this object once destruction proceeds.
// NOTE(review): DESTRUCTOR_GUARD presumably prevents exceptions from escaping the destructor --
// confirm against mongo/util/destructor_guard.h.
RemoteCommandRetryScheduler::~RemoteCommandRetryScheduler() {
    DESTRUCTOR_GUARD(shutdown(); join(););
}
169
isActive() const170 bool RemoteCommandRetryScheduler::isActive() const {
171 stdx::lock_guard<stdx::mutex> lock(_mutex);
172 return _isActive_inlock();
173 }
174
_isActive_inlock() const175 bool RemoteCommandRetryScheduler::_isActive_inlock() const {
176 return State::kRunning == _state || State::kShuttingDown == _state;
177 }
178
startup()179 Status RemoteCommandRetryScheduler::startup() {
180 stdx::lock_guard<stdx::mutex> lock(_mutex);
181
182 switch (_state) {
183 case State::kPreStart:
184 _state = State::kRunning;
185 break;
186 case State::kRunning:
187 return Status(ErrorCodes::IllegalOperation, "scheduler already started");
188 case State::kShuttingDown:
189 return Status(ErrorCodes::ShutdownInProgress, "scheduler shutting down");
190 case State::kComplete:
191 return Status(ErrorCodes::ShutdownInProgress, "scheduler completed");
192 }
193
194 auto scheduleStatus = _schedule_inlock();
195 if (!scheduleStatus.isOK()) {
196 _state = State::kComplete;
197 return scheduleStatus;
198 }
199
200 return Status::OK();
201 }
202
shutdown()203 void RemoteCommandRetryScheduler::shutdown() {
204 executor::TaskExecutor::CallbackHandle remoteCommandCallbackHandle;
205 {
206 stdx::lock_guard<stdx::mutex> lock(_mutex);
207 switch (_state) {
208 case State::kPreStart:
209 // Transition directly from PreStart to Complete if not started yet.
210 _state = State::kComplete;
211 return;
212 case State::kRunning:
213 _state = State::kShuttingDown;
214 break;
215 case State::kShuttingDown:
216 case State::kComplete:
217 // Nothing to do if we are already in ShuttingDown or Complete state.
218 return;
219 }
220
221 remoteCommandCallbackHandle = _remoteCommandCallbackHandle;
222 }
223
224 invariant(remoteCommandCallbackHandle.isValid());
225 _executor->cancel(remoteCommandCallbackHandle);
226 }
227
join()228 void RemoteCommandRetryScheduler::join() {
229 stdx::unique_lock<stdx::mutex> lock(_mutex);
230 _condition.wait(lock, [this]() { return !_isActive_inlock(); });
231 }
232
toString() const233 std::string RemoteCommandRetryScheduler::toString() const {
234 stdx::lock_guard<stdx::mutex> lock(_mutex);
235 str::stream output;
236 output << "RemoteCommandRetryScheduler";
237 output << " request: " << _request.toString();
238 output << " active: " << _isActive_inlock();
239 if (_remoteCommandCallbackHandle.isValid()) {
240 output << " callbackHandle.valid: " << _remoteCommandCallbackHandle.isValid();
241 output << " callbackHandle.cancelled: " << _remoteCommandCallbackHandle.isCanceled();
242 }
243 output << " attempt: " << _currentAttempt;
244 output << " retryPolicy: " << _retryPolicy->toString();
245 return output;
246 }
247
_schedule_inlock()248 Status RemoteCommandRetryScheduler::_schedule_inlock() {
249 ++_currentAttempt;
250 auto scheduleResult = _executor->scheduleRemoteCommand(
251 _request,
252 stdx::bind(
253 &RemoteCommandRetryScheduler::_remoteCommandCallback, this, stdx::placeholders::_1));
254
255 if (!scheduleResult.isOK()) {
256 return scheduleResult.getStatus();
257 }
258
259 _remoteCommandCallbackHandle = scheduleResult.getValue();
260 return Status::OK();
261 }
262
_remoteCommandCallback(const executor::TaskExecutor::RemoteCommandCallbackArgs & rcba)263 void RemoteCommandRetryScheduler::_remoteCommandCallback(
264 const executor::TaskExecutor::RemoteCommandCallbackArgs& rcba) {
265 const auto& status = rcba.response.status;
266
267 // Use a lambda to avoid unnecessary lock acquisition when checking conditions for termination.
268 auto getCurrentAttempt = [this]() {
269 stdx::lock_guard<stdx::mutex> lock(_mutex);
270 return _currentAttempt;
271 };
272
273 if (status.isOK() || status == ErrorCodes::CallbackCanceled ||
274 !_retryPolicy->shouldRetryOnError(status.code()) ||
275 getCurrentAttempt() == _retryPolicy->getMaximumAttempts()) {
276 _onComplete(rcba);
277 return;
278 }
279
280 // TODO(benety): Check cumulative elapsed time of failed responses received against retry
281 // policy. Requires SERVER-24067.
282 auto scheduleStatus = [this]() {
283 stdx::lock_guard<stdx::mutex> lock(_mutex);
284 if (State::kShuttingDown == _state) {
285 return Status(ErrorCodes::CallbackCanceled,
286 "scheduler was shut down before retrying command");
287 }
288 return _schedule_inlock();
289 }();
290
291 if (!scheduleStatus.isOK()) {
292 _onComplete({rcba.executor, rcba.myHandle, rcba.request, scheduleStatus});
293 return;
294 }
295 }
296
// Delivers the final result 'rcba' to the user-supplied callback and then marks the scheduler
// complete. The callback is invoked and released before '_mutex' is acquired; only the state
// transition to kComplete and the wakeup of threads waiting on '_condition' happen under the lock.
void RemoteCommandRetryScheduler::_onComplete(
    const executor::TaskExecutor::RemoteCommandCallbackArgs& rcba) {

    invariant(_callback);
    _callback(rcba);

    // This will release the resources held by the '_callback' function object. To avoid any issues
    // with destruction logic in the function object's resources accessing this
    // RemoteCommandRetryScheduler, we release this function object outside the lock.
    _callback = {};

    stdx::lock_guard<stdx::mutex> lock(_mutex);
    // Completion is only expected while running or shutting down.
    invariant(_isActive_inlock());
    _state = State::kComplete;
    // Wake every thread waiting on '_condition' (join() waits on it in this file).
    _condition.notify_all();
}
313
314 } // namespace mongo
315