1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kExecutor
32 
33 #include "mongo/platform/basic.h"
34 
35 #include <algorithm>
36 #include <vector>
37 
38 #include "mongo/client/remote_command_retry_scheduler.h"
39 #include "mongo/stdx/memory.h"
40 #include "mongo/util/assert_util.h"
41 #include "mongo/util/destructor_guard.h"
42 #include "mongo/util/log.h"
43 
44 namespace mongo {
45 
46 namespace {
47 
48 class RetryPolicyImpl : public RemoteCommandRetryScheduler::RetryPolicy {
49 public:
50     RetryPolicyImpl(std::size_t maximumAttempts,
51                     Milliseconds maximumResponseElapsedTotal,
52                     const std::initializer_list<ErrorCodes::Error>& retryableErrors);
53     std::size_t getMaximumAttempts() const override;
54     Milliseconds getMaximumResponseElapsedTotal() const override;
55     bool shouldRetryOnError(ErrorCodes::Error error) const override;
56     std::string toString() const override;
57 
58 private:
59     std::size_t _maximumAttempts;
60     Milliseconds _maximumResponseElapsedTotal;
61     std::vector<ErrorCodes::Error> _retryableErrors;
62 };
63 
RetryPolicyImpl(std::size_t maximumAttempts,Milliseconds maximumResponseElapsedTotal,const std::initializer_list<ErrorCodes::Error> & retryableErrors)64 RetryPolicyImpl::RetryPolicyImpl(std::size_t maximumAttempts,
65                                  Milliseconds maximumResponseElapsedTotal,
66                                  const std::initializer_list<ErrorCodes::Error>& retryableErrors)
67     : _maximumAttempts(maximumAttempts),
68       _maximumResponseElapsedTotal(maximumResponseElapsedTotal),
69       _retryableErrors(retryableErrors) {
70     std::sort(_retryableErrors.begin(), _retryableErrors.end());
71 }
72 
toString() const73 std::string RetryPolicyImpl::toString() const {
74     str::stream output;
75     output << "RetryPolicyImpl";
76     output << " maxAttempts: " << _maximumAttempts;
77     output << " maxTimeMillis: " << _maximumResponseElapsedTotal;
78 
79     if (_retryableErrors.size() > 0) {
80         output << "Retryable Errors: ";
81         for (auto error : _retryableErrors) {
82             output << error;
83         }
84     }
85     return output;
86 }
87 
getMaximumAttempts() const88 std::size_t RetryPolicyImpl::getMaximumAttempts() const {
89     return _maximumAttempts;
90 }
91 
getMaximumResponseElapsedTotal() const92 Milliseconds RetryPolicyImpl::getMaximumResponseElapsedTotal() const {
93     return _maximumResponseElapsedTotal;
94 }
95 
shouldRetryOnError(ErrorCodes::Error error) const96 bool RetryPolicyImpl::shouldRetryOnError(ErrorCodes::Error error) const {
97     return std::binary_search(_retryableErrors.cbegin(), _retryableErrors.cend(), error);
98 }
99 
100 }  // namespace
101 
102 const std::initializer_list<ErrorCodes::Error> RemoteCommandRetryScheduler::kNotMasterErrors{
103     ErrorCodes::NotMaster, ErrorCodes::NotMasterNoSlaveOk, ErrorCodes::NotMasterOrSecondary};
104 
105 const std::initializer_list<ErrorCodes::Error> RemoteCommandRetryScheduler::kAllRetriableErrors{
106     ErrorCodes::NotMaster,
107     ErrorCodes::NotMasterNoSlaveOk,
108     ErrorCodes::NotMasterOrSecondary,
109     // If write concern failed to be satisfied on the remote server, this most probably means that
110     // some of the secondary nodes were unreachable or otherwise unresponsive, so the call is safe
111     // to be retried if idempotency can be guaranteed.
112     ErrorCodes::WriteConcernFailed,
113     ErrorCodes::HostUnreachable,
114     ErrorCodes::HostNotFound,
115     ErrorCodes::NetworkTimeout,
116     ErrorCodes::PrimarySteppedDown,
117     ErrorCodes::InterruptedDueToReplStateChange,
118     ErrorCodes::BalancerInterrupted};
119 
120 std::unique_ptr<RemoteCommandRetryScheduler::RetryPolicy>
makeNoRetryPolicy()121 RemoteCommandRetryScheduler::makeNoRetryPolicy() {
122     return makeRetryPolicy(1U, executor::RemoteCommandRequest::kNoTimeout, {});
123 }
124 
125 std::unique_ptr<RemoteCommandRetryScheduler::RetryPolicy>
makeRetryPolicy(std::size_t maxAttempts,Milliseconds maxResponseElapsedTotal,const std::initializer_list<ErrorCodes::Error> & retryableErrors)126 RemoteCommandRetryScheduler::makeRetryPolicy(
127     std::size_t maxAttempts,
128     Milliseconds maxResponseElapsedTotal,
129     const std::initializer_list<ErrorCodes::Error>& retryableErrors) {
130     std::unique_ptr<RetryPolicy> policy =
131         stdx::make_unique<RetryPolicyImpl>(maxAttempts, maxResponseElapsedTotal, retryableErrors);
132     return policy;
133 }
134 
RemoteCommandRetryScheduler(executor::TaskExecutor * executor,const executor::RemoteCommandRequest & request,const executor::TaskExecutor::RemoteCommandCallbackFn & callback,std::unique_ptr<RetryPolicy> retryPolicy)135 RemoteCommandRetryScheduler::RemoteCommandRetryScheduler(
136     executor::TaskExecutor* executor,
137     const executor::RemoteCommandRequest& request,
138     const executor::TaskExecutor::RemoteCommandCallbackFn& callback,
139     std::unique_ptr<RetryPolicy> retryPolicy)
140     : _executor(executor),
141       _request(request),
142       _callback(callback),
143       _retryPolicy(std::move(retryPolicy)) {
144     uassert(ErrorCodes::BadValue, "task executor cannot be null", executor);
145     uassert(ErrorCodes::BadValue,
146             "source in remote command request cannot be empty",
147             !request.target.empty());
148     uassert(ErrorCodes::BadValue,
149             "database name in remote command request cannot be empty",
150             !request.dbname.empty());
151     uassert(ErrorCodes::BadValue,
152             "command object in remote command request cannot be empty",
153             !request.cmdObj.isEmpty());
154     uassert(ErrorCodes::BadValue, "remote command callback function cannot be null", callback);
155     uassert(ErrorCodes::BadValue, "retry policy cannot be null", _retryPolicy);
156     uassert(ErrorCodes::BadValue,
157             "policy max attempts cannot be zero",
158             _retryPolicy->getMaximumAttempts() != 0);
159     uassert(ErrorCodes::BadValue,
160             "policy max response elapsed total cannot be negative",
161             !(_retryPolicy->getMaximumResponseElapsedTotal() !=
162                   executor::RemoteCommandRequest::kNoTimeout &&
163               _retryPolicy->getMaximumResponseElapsedTotal() < Milliseconds(0)));
164 }
165 
~RemoteCommandRetryScheduler()166 RemoteCommandRetryScheduler::~RemoteCommandRetryScheduler() {
167     DESTRUCTOR_GUARD(shutdown(); join(););
168 }
169 
isActive() const170 bool RemoteCommandRetryScheduler::isActive() const {
171     stdx::lock_guard<stdx::mutex> lock(_mutex);
172     return _isActive_inlock();
173 }
174 
_isActive_inlock() const175 bool RemoteCommandRetryScheduler::_isActive_inlock() const {
176     return State::kRunning == _state || State::kShuttingDown == _state;
177 }
178 
startup()179 Status RemoteCommandRetryScheduler::startup() {
180     stdx::lock_guard<stdx::mutex> lock(_mutex);
181 
182     switch (_state) {
183         case State::kPreStart:
184             _state = State::kRunning;
185             break;
186         case State::kRunning:
187             return Status(ErrorCodes::IllegalOperation, "scheduler already started");
188         case State::kShuttingDown:
189             return Status(ErrorCodes::ShutdownInProgress, "scheduler shutting down");
190         case State::kComplete:
191             return Status(ErrorCodes::ShutdownInProgress, "scheduler completed");
192     }
193 
194     auto scheduleStatus = _schedule_inlock();
195     if (!scheduleStatus.isOK()) {
196         _state = State::kComplete;
197         return scheduleStatus;
198     }
199 
200     return Status::OK();
201 }
202 
shutdown()203 void RemoteCommandRetryScheduler::shutdown() {
204     executor::TaskExecutor::CallbackHandle remoteCommandCallbackHandle;
205     {
206         stdx::lock_guard<stdx::mutex> lock(_mutex);
207         switch (_state) {
208             case State::kPreStart:
209                 // Transition directly from PreStart to Complete if not started yet.
210                 _state = State::kComplete;
211                 return;
212             case State::kRunning:
213                 _state = State::kShuttingDown;
214                 break;
215             case State::kShuttingDown:
216             case State::kComplete:
217                 // Nothing to do if we are already in ShuttingDown or Complete state.
218                 return;
219         }
220 
221         remoteCommandCallbackHandle = _remoteCommandCallbackHandle;
222     }
223 
224     invariant(remoteCommandCallbackHandle.isValid());
225     _executor->cancel(remoteCommandCallbackHandle);
226 }
227 
join()228 void RemoteCommandRetryScheduler::join() {
229     stdx::unique_lock<stdx::mutex> lock(_mutex);
230     _condition.wait(lock, [this]() { return !_isActive_inlock(); });
231 }
232 
toString() const233 std::string RemoteCommandRetryScheduler::toString() const {
234     stdx::lock_guard<stdx::mutex> lock(_mutex);
235     str::stream output;
236     output << "RemoteCommandRetryScheduler";
237     output << " request: " << _request.toString();
238     output << " active: " << _isActive_inlock();
239     if (_remoteCommandCallbackHandle.isValid()) {
240         output << " callbackHandle.valid: " << _remoteCommandCallbackHandle.isValid();
241         output << " callbackHandle.cancelled: " << _remoteCommandCallbackHandle.isCanceled();
242     }
243     output << " attempt: " << _currentAttempt;
244     output << " retryPolicy: " << _retryPolicy->toString();
245     return output;
246 }
247 
_schedule_inlock()248 Status RemoteCommandRetryScheduler::_schedule_inlock() {
249     ++_currentAttempt;
250     auto scheduleResult = _executor->scheduleRemoteCommand(
251         _request,
252         stdx::bind(
253             &RemoteCommandRetryScheduler::_remoteCommandCallback, this, stdx::placeholders::_1));
254 
255     if (!scheduleResult.isOK()) {
256         return scheduleResult.getStatus();
257     }
258 
259     _remoteCommandCallbackHandle = scheduleResult.getValue();
260     return Status::OK();
261 }
262 
_remoteCommandCallback(const executor::TaskExecutor::RemoteCommandCallbackArgs & rcba)263 void RemoteCommandRetryScheduler::_remoteCommandCallback(
264     const executor::TaskExecutor::RemoteCommandCallbackArgs& rcba) {
265     const auto& status = rcba.response.status;
266 
267     // Use a lambda to avoid unnecessary lock acquisition when checking conditions for termination.
268     auto getCurrentAttempt = [this]() {
269         stdx::lock_guard<stdx::mutex> lock(_mutex);
270         return _currentAttempt;
271     };
272 
273     if (status.isOK() || status == ErrorCodes::CallbackCanceled ||
274         !_retryPolicy->shouldRetryOnError(status.code()) ||
275         getCurrentAttempt() == _retryPolicy->getMaximumAttempts()) {
276         _onComplete(rcba);
277         return;
278     }
279 
280     // TODO(benety): Check cumulative elapsed time of failed responses received against retry
281     // policy. Requires SERVER-24067.
282     auto scheduleStatus = [this]() {
283         stdx::lock_guard<stdx::mutex> lock(_mutex);
284         if (State::kShuttingDown == _state) {
285             return Status(ErrorCodes::CallbackCanceled,
286                           "scheduler was shut down before retrying command");
287         }
288         return _schedule_inlock();
289     }();
290 
291     if (!scheduleStatus.isOK()) {
292         _onComplete({rcba.executor, rcba.myHandle, rcba.request, scheduleStatus});
293         return;
294     }
295 }
296 
_onComplete(const executor::TaskExecutor::RemoteCommandCallbackArgs & rcba)297 void RemoteCommandRetryScheduler::_onComplete(
298     const executor::TaskExecutor::RemoteCommandCallbackArgs& rcba) {
299 
300     invariant(_callback);
301     _callback(rcba);
302 
303     // This will release the resources held by the '_callback' function object. To avoid any issues
304     // with destruction logic in the function object's resources accessing this
305     // RemoteCommandRetryScheduler, we release this function object outside the lock.
306     _callback = {};
307 
308     stdx::lock_guard<stdx::mutex> lock(_mutex);
309     invariant(_isActive_inlock());
310     _state = State::kComplete;
311     _condition.notify_all();
312 }
313 
314 }  // namespace mongo
315