1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #include "mongo/platform/basic.h"
32 
33 #include <memory>
34 #include <string>
35 #include <vector>
36 
37 #include "mongo/base/disallow_copying.h"
38 #include "mongo/base/status_with.h"
39 #include "mongo/client/remote_command_retry_scheduler.h"
40 #include "mongo/db/jsobj.h"
41 #include "mongo/executor/remote_command_response.h"
42 #include "mongo/executor/task_executor.h"
43 #include "mongo/executor/thread_pool_task_executor_test_fixture.h"
44 #include "mongo/stdx/memory.h"
45 #include "mongo/unittest/task_executor_proxy.h"
46 #include "mongo/unittest/unittest.h"
47 #include "mongo/util/assert_util.h"
48 #include "mongo/util/net/hostandport.h"
49 
50 namespace {
51 
52 using namespace mongo;
53 using ResponseStatus = executor::TaskExecutor::ResponseStatus;
54 
55 class CallbackResponseSaver;
56 
57 class RemoteCommandRetrySchedulerTest : public executor::ThreadPoolExecutorTest {
58 public:
59     void start(RemoteCommandRetryScheduler* scheduler);
60     void checkCompletionStatus(RemoteCommandRetryScheduler* scheduler,
61                                const CallbackResponseSaver& callbackResponseSaver,
62                                const ResponseStatus& response);
63     void processNetworkResponse(const ResponseStatus& response);
64     void runReadyNetworkOperations();
65 
66 protected:
67     void setUp() override;
68 };
69 
70 class CallbackResponseSaver {
71     MONGO_DISALLOW_COPYING(CallbackResponseSaver);
72 
73 public:
74     CallbackResponseSaver();
75 
76     /**
77      * Use this for scheduler callback.
78      */
79     void operator()(const executor::TaskExecutor::RemoteCommandCallbackArgs& rcba);
80 
81     std::vector<ResponseStatus> getResponses() const;
82 
83 private:
84     std::vector<ResponseStatus> _responses;
85 };
86 
87 /**
88  * Task executor proxy with fail point for scheduleRemoteCommand().
89  */
90 class TaskExecutorWithFailureInScheduleRemoteCommand : public unittest::TaskExecutorProxy {
91 public:
TaskExecutorWithFailureInScheduleRemoteCommand(executor::TaskExecutor * executor)92     TaskExecutorWithFailureInScheduleRemoteCommand(executor::TaskExecutor* executor)
93         : unittest::TaskExecutorProxy(executor) {}
scheduleRemoteCommand(const executor::RemoteCommandRequest & request,const RemoteCommandCallbackFn & cb)94     virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleRemoteCommand(
95         const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) override {
96         if (scheduleRemoteCommandFailPoint) {
97             return Status(ErrorCodes::ShutdownInProgress,
98                           "failed to send remote command - shutdown in progress");
99         }
100         return getExecutor()->scheduleRemoteCommand(request, cb);
101     }
102 
103     bool scheduleRemoteCommandFailPoint = false;
104 };
105 
start(RemoteCommandRetryScheduler * scheduler)106 void RemoteCommandRetrySchedulerTest::start(RemoteCommandRetryScheduler* scheduler) {
107     ASSERT_FALSE(scheduler->isActive());
108 
109     ASSERT_OK(scheduler->startup());
110     ASSERT_TRUE(scheduler->isActive());
111 
112     // Starting an already active scheduler should fail.
113     ASSERT_EQUALS(ErrorCodes::IllegalOperation, scheduler->startup());
114     ASSERT_TRUE(scheduler->isActive());
115 
116     auto net = getNet();
117     executor::NetworkInterfaceMock::InNetworkGuard guard(net);
118     ASSERT_TRUE(net->hasReadyRequests());
119 }
120 
checkCompletionStatus(RemoteCommandRetryScheduler * scheduler,const CallbackResponseSaver & callbackResponseSaver,const ResponseStatus & response)121 void RemoteCommandRetrySchedulerTest::checkCompletionStatus(
122     RemoteCommandRetryScheduler* scheduler,
123     const CallbackResponseSaver& callbackResponseSaver,
124     const ResponseStatus& response) {
125     ASSERT_FALSE(scheduler->isActive());
126     auto responses = callbackResponseSaver.getResponses();
127     ASSERT_EQUALS(1U, responses.size());
128     if (response.isOK()) {
129         ASSERT_OK(responses.front().status);
130         ASSERT_EQUALS(response, responses.front());
131     } else {
132         ASSERT_EQUALS(response.status, responses.front().status);
133     }
134 }
135 
processNetworkResponse(const ResponseStatus & response)136 void RemoteCommandRetrySchedulerTest::processNetworkResponse(const ResponseStatus& response) {
137     auto net = getNet();
138     executor::NetworkInterfaceMock::InNetworkGuard guard(net);
139     ASSERT_TRUE(net->hasReadyRequests());
140     auto noi = net->getNextReadyRequest();
141     net->scheduleResponse(noi, net->now(), response);
142     net->runReadyNetworkOperations();
143 }
144 
runReadyNetworkOperations()145 void RemoteCommandRetrySchedulerTest::runReadyNetworkOperations() {
146     auto net = getNet();
147     executor::NetworkInterfaceMock::InNetworkGuard guard(net);
148     net->runReadyNetworkOperations();
149 }
150 
setUp()151 void RemoteCommandRetrySchedulerTest::setUp() {
152     executor::ThreadPoolExecutorTest::setUp();
153     launchExecutorThread();
154 }
155 
156 CallbackResponseSaver::CallbackResponseSaver() = default;
157 
operator ()(const executor::TaskExecutor::RemoteCommandCallbackArgs & rcba)158 void CallbackResponseSaver::operator()(
159     const executor::TaskExecutor::RemoteCommandCallbackArgs& rcba) {
160     _responses.push_back(rcba.response);
161 }
162 
getResponses() const163 std::vector<ResponseStatus> CallbackResponseSaver::getResponses() const {
164     return _responses;
165 }
166 
167 const executor::RemoteCommandRequest request(HostAndPort("h1:12345"),
168                                              "db1",
169                                              BSON("ping" << 1),
170                                              nullptr);
171 
// The no-retry policy allows a single attempt, has no elapsed-time limit and never asks for a
// retry.
TEST_F(RemoteCommandRetrySchedulerTest, MakeSingleShotRetryPolicy) {
    auto noRetryPolicy = RemoteCommandRetryScheduler::makeNoRetryPolicy();
    ASSERT_TRUE(noRetryPolicy);

    ASSERT_EQUALS(1U, noRetryPolicy->getMaximumAttempts());
    ASSERT_EQUALS(executor::RemoteCommandRequest::kNoTimeout,
                  noRetryPolicy->getMaximumResponseElapsedTotal());

    // Doesn't matter what "shouldRetryOnError()" returns since we won't be retrying the remote
    // command.
    const int numErrorCodes = static_cast<int>(ErrorCodes::MaxError);
    for (int code = 0; code < numErrorCodes; ++code) {
        ASSERT_FALSE(noRetryPolicy->shouldRetryOnError(static_cast<ErrorCodes::Error>(code)));
    }
}
185 
// makeRetryPolicy() must return a policy that reports the configured limits and asks for a
// retry only on the error codes it was given.
TEST_F(RemoteCommandRetrySchedulerTest, MakeRetryPolicy) {
    auto policy = RemoteCommandRetryScheduler::makeRetryPolicy(
        5U,
        Milliseconds(100),
        {ErrorCodes::FailedToParse, ErrorCodes::InvalidNamespace, ErrorCodes::InternalError});
    // Fail the test cleanly, rather than dereferencing a null pointer, if no policy was created.
    ASSERT_TRUE(policy);

    ASSERT_EQUALS(5U, policy->getMaximumAttempts());
    ASSERT_EQUALS(Milliseconds(100), policy->getMaximumResponseElapsedTotal());

    for (int i = 0; i < static_cast<int>(ErrorCodes::MaxError); ++i) {
        const auto error = static_cast<ErrorCodes::Error>(i);
        const bool isConfiguredAsRetryable = error == ErrorCodes::InternalError ||
            error == ErrorCodes::FailedToParse || error == ErrorCodes::InvalidNamespace;
        if (isConfiguredAsRetryable) {
            ASSERT_TRUE(policy->shouldRetryOnError(error));
        } else {
            ASSERT_FALSE(policy->shouldRetryOnError(error));
        }
    }
}
203 
TEST_F(RemoteCommandRetrySchedulerTest,InvalidConstruction)204 TEST_F(RemoteCommandRetrySchedulerTest, InvalidConstruction) {
205     auto callback = [](const executor::TaskExecutor::RemoteCommandCallbackArgs&) {};
206     auto makeRetryPolicy = [] { return RemoteCommandRetryScheduler::makeNoRetryPolicy(); };
207 
208     // Null executor.
209     ASSERT_THROWS_CODE_AND_WHAT(
210         RemoteCommandRetryScheduler(nullptr, request, callback, makeRetryPolicy()),
211         AssertionException,
212         ErrorCodes::BadValue,
213         "task executor cannot be null");
214 
215     // Empty source in remote command request.
216     ASSERT_THROWS_CODE_AND_WHAT(
217         RemoteCommandRetryScheduler(
218             &getExecutor(),
219             executor::RemoteCommandRequest(HostAndPort(), request.dbname, request.cmdObj, nullptr),
220             callback,
221             makeRetryPolicy()),
222         AssertionException,
223         ErrorCodes::BadValue,
224         "source in remote command request cannot be empty");
225 
226     // Empty source in remote command request.
227     ASSERT_THROWS_CODE_AND_WHAT(
228         RemoteCommandRetryScheduler(
229             &getExecutor(),
230             executor::RemoteCommandRequest(request.target, "", request.cmdObj, nullptr),
231             callback,
232             makeRetryPolicy()),
233         AssertionException,
234         ErrorCodes::BadValue,
235         "database name in remote command request cannot be empty");
236 
237     // Empty command object in remote command request.
238     ASSERT_THROWS_CODE_AND_WHAT(
239         RemoteCommandRetryScheduler(
240             &getExecutor(),
241             executor::RemoteCommandRequest(request.target, request.dbname, BSONObj(), nullptr),
242             callback,
243             makeRetryPolicy()),
244         AssertionException,
245         ErrorCodes::BadValue,
246         "command object in remote command request cannot be empty");
247 
248     // Null remote command callback function.
249     ASSERT_THROWS_CODE_AND_WHAT(
250         RemoteCommandRetryScheduler(&getExecutor(),
251                                     request,
252                                     executor::TaskExecutor::RemoteCommandCallbackFn(),
253                                     makeRetryPolicy()),
254         AssertionException,
255         ErrorCodes::BadValue,
256         "remote command callback function cannot be null");
257 
258     // Null retry policy.
259     ASSERT_THROWS_CODE_AND_WHAT(
260         RemoteCommandRetryScheduler(&getExecutor(),
261                                     request,
262                                     callback,
263                                     std::unique_ptr<RemoteCommandRetryScheduler::RetryPolicy>()),
264         AssertionException,
265         ErrorCodes::BadValue,
266         "retry policy cannot be null");
267 
268     // Policy max attempts should be positive.
269     ASSERT_THROWS_CODE_AND_WHAT(
270         RemoteCommandRetryScheduler(
271             &getExecutor(),
272             request,
273             callback,
274             RemoteCommandRetryScheduler::makeRetryPolicy(0, Milliseconds(100), {})),
275         AssertionException,
276         ErrorCodes::BadValue,
277         "policy max attempts cannot be zero");
278 
279     // Policy max response elapsed total cannot be negative.
280     ASSERT_THROWS_CODE_AND_WHAT(
281         RemoteCommandRetryScheduler(
282             &getExecutor(),
283             request,
284             callback,
285             RemoteCommandRetryScheduler::makeRetryPolicy(1U, Milliseconds(-100), {})),
286         AssertionException,
287         ErrorCodes::BadValue,
288         "policy max response elapsed total cannot be negative");
289 }
290 
// startup() must fail with ShutdownInProgress, and leave the scheduler inactive, when the
// executor was shut down beforehand.
TEST_F(RemoteCommandRetrySchedulerTest, StartupFailsWhenExecutorIsShutDown) {
    auto noopCallback = [](const executor::TaskExecutor::RemoteCommandCallbackArgs&) {};
    auto noRetryPolicy = RemoteCommandRetryScheduler::makeNoRetryPolicy();
    RemoteCommandRetryScheduler scheduler(
        &getExecutor(), request, noopCallback, std::move(noRetryPolicy));
    ASSERT_FALSE(scheduler.isActive());

    // Shut the executor down before the scheduler gets a chance to start.
    getExecutor().shutdown();

    const auto startupStatus = scheduler.startup();
    ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, startupStatus);
    ASSERT_FALSE(scheduler.isActive());
}
303 
// startup() must fail with ShutdownInProgress, and leave the scheduler inactive, when the
// scheduler itself was shut down beforehand.
TEST_F(RemoteCommandRetrySchedulerTest, StartupFailsWhenSchedulerIsShutDown) {
    auto noopCallback = [](const executor::TaskExecutor::RemoteCommandCallbackArgs&) {};
    auto noRetryPolicy = RemoteCommandRetryScheduler::makeNoRetryPolicy();
    RemoteCommandRetryScheduler scheduler(
        &getExecutor(), request, noopCallback, std::move(noRetryPolicy));
    ASSERT_FALSE(scheduler.isActive());

    // Shut the scheduler down before it is ever started.
    scheduler.shutdown();

    const auto startupStatus = scheduler.startup();
    ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, startupStatus);
    ASSERT_FALSE(scheduler.isActive());
}
316 
// Shutting down the executor while the first request is outstanding must complete the
// scheduler with CallbackCanceled.
TEST_F(RemoteCommandRetrySchedulerTest,
       ShuttingDownExecutorAfterSchedulerStartupInvokesCallbackWithCallbackCanceledError) {
    CallbackResponseSaver responseSaver;
    auto retryPolicy = RemoteCommandRetryScheduler::makeRetryPolicy(
        10U, Milliseconds(1), {ErrorCodes::HostNotFound});
    RemoteCommandRetryScheduler scheduler(
        &getExecutor(), request, stdx::ref(responseSaver), std::move(retryPolicy));
    start(&scheduler);

    // The request waiting on the mock network must be the one given to the scheduler.
    const auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);
        ASSERT_EQUALS(request, network->getNextReadyRequest()->getRequest());
    }

    getExecutor().shutdown();
    runReadyNetworkOperations();

    const ResponseStatus expected{ErrorCodes::CallbackCanceled, "executor shutdown"};
    checkCompletionStatus(&scheduler, responseSaver, expected);
}
338 
// Shutting down the scheduler while the first request is outstanding must complete it with
// CallbackCanceled.
TEST_F(RemoteCommandRetrySchedulerTest,
       ShuttingDownSchedulerAfterSchedulerStartupInvokesCallbackWithCallbackCanceledError) {
    CallbackResponseSaver responseSaver;
    auto retryPolicy = RemoteCommandRetryScheduler::makeRetryPolicy(
        10U, Milliseconds(1), {ErrorCodes::HostNotFound});
    RemoteCommandRetryScheduler scheduler(
        &getExecutor(), request, stdx::ref(responseSaver), std::move(retryPolicy));
    start(&scheduler);

    scheduler.shutdown();
    runReadyNetworkOperations();

    const ResponseStatus expected{ErrorCodes::CallbackCanceled, "scheduler shutdown"};
    checkCompletionStatus(&scheduler, responseSaver, expected);
}
354 
// An error response that the policy does not list as retryable must be handed straight to the
// callback without any retry.
TEST_F(RemoteCommandRetrySchedulerTest, SchedulerInvokesCallbackOnNonRetryableErrorInResponse) {
    CallbackResponseSaver callback;
    auto policy = RemoteCommandRetryScheduler::makeRetryPolicy(
        10U, Milliseconds(1), RemoteCommandRetryScheduler::kNotMasterErrors);
    RemoteCommandRetryScheduler scheduler(
        &getExecutor(), request, stdx::ref(callback), std::move(policy));
    start(&scheduler);

    // This error is expected NOT to match any of the retryable error codes in the policy, so the
    // scheduler must give up after the first response.
    ResponseStatus rs(ErrorCodes::OperationFailed, "injected error", Milliseconds(0));
    processNetworkResponse(rs);
    checkCompletionStatus(&scheduler, callback, rs);

    // Scheduler cannot be restarted once it has run to completion.
    ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, scheduler.startup());
    ASSERT_FALSE(scheduler.isActive());
}
371 
// A successful first response completes the scheduler and is passed to the callback as is.
TEST_F(RemoteCommandRetrySchedulerTest, SchedulerInvokesCallbackOnFirstSuccessfulResponse) {
    CallbackResponseSaver responseSaver;
    auto retryPolicy = RemoteCommandRetryScheduler::makeRetryPolicy(
        10U, Milliseconds(1), {ErrorCodes::HostNotFound});
    RemoteCommandRetryScheduler scheduler(
        &getExecutor(), request, stdx::ref(responseSaver), std::move(retryPolicy));
    start(&scheduler);

    // Elapsed time in response is ignored on successful responses.
    const ResponseStatus successResponse(
        BSON("ok" << 1 << "x" << 123), BSON("z" << 456), Milliseconds(100));
    processNetworkResponse(successResponse);
    checkCompletionStatus(&scheduler, responseSaver, successResponse);

    // Scheduler cannot be restarted once it has run to completion.
    const auto restartStatus = scheduler.startup();
    ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, restartStatus);
    ASSERT_FALSE(scheduler.isActive());
}
390 
// A response delivered with an OK status is final even if its document describes an error.
TEST_F(RemoteCommandRetrySchedulerTest, SchedulerIgnoresEmbeddedErrorInSuccessfulResponse) {
    CallbackResponseSaver responseSaver;
    auto retryPolicy = RemoteCommandRetryScheduler::makeRetryPolicy(
        10U, Milliseconds(1), {ErrorCodes::HostNotFound});
    RemoteCommandRetryScheduler scheduler(
        &getExecutor(), request, stdx::ref(responseSaver), std::move(retryPolicy));
    start(&scheduler);

    // Scheduler does not parse document in a successful response for embedded errors.
    // This is the case with some commands (e.g. find) which do not always return errors using the
    // wire protocol.
    const ResponseStatus responseWithEmbeddedError(
        BSON("ok" << 0 << "code" << int(ErrorCodes::FailedToParse) << "errmsg"
                  << "injected error"),
        BSON("z" << 456),
        Milliseconds(100));

    processNetworkResponse(responseWithEmbeddedError);
    checkCompletionStatus(&scheduler, responseSaver, responseWithEmbeddedError);
}
410 
// If the executor refuses to schedule a retry, the scheduler must report the executor's error
// to the callback.
TEST_F(RemoteCommandRetrySchedulerTest,
       SchedulerInvokesCallbackWithErrorFromExecutorIfScheduleRemoteCommandFailsOnRetry) {
    CallbackResponseSaver responseSaver;
    auto retryPolicy = RemoteCommandRetryScheduler::makeRetryPolicy(
        3U, executor::RemoteCommandRequest::kNoTimeout, {ErrorCodes::HostNotFound});
    TaskExecutorWithFailureInScheduleRemoteCommand failingExecutor(&getExecutor());
    RemoteCommandRetryScheduler scheduler(
        &failingExecutor, request, stdx::ref(responseSaver), std::move(retryPolicy));
    start(&scheduler);

    // The fail point is still off here, so the first retry is scheduled normally.
    const ResponseStatus firstFailure{ErrorCodes::HostNotFound, "first", Milliseconds(0)};
    processNetworkResponse(firstFailure);

    // scheduleRemoteCommand() will fail with ErrorCodes::ShutdownInProgress when trying to send
    // third remote command request after processing second failed response.
    failingExecutor.scheduleRemoteCommandFailPoint = true;
    const ResponseStatus secondFailure{ErrorCodes::HostNotFound, "second", Milliseconds(0)};
    processNetworkResponse(secondFailure);

    const ResponseStatus expected{ErrorCodes::ShutdownInProgress, "", Milliseconds(0)};
    checkCompletionStatus(&scheduler, responseSaver, expected);
}
431 
// With at most three attempts allowed, the third failed response ends the scheduler and is the
// one reported to the callback.
TEST_F(RemoteCommandRetrySchedulerTest,
       SchedulerEnforcesPolicyMaximumAttemptsAndReturnsErrorOfLastFailedRequest) {
    CallbackResponseSaver responseSaver;
    auto retryPolicy = RemoteCommandRetryScheduler::makeRetryPolicy(
        3U,
        executor::RemoteCommandRequest::kNoTimeout,
        RemoteCommandRetryScheduler::kAllRetriableErrors);
    RemoteCommandRetryScheduler scheduler(
        &getExecutor(), request, stdx::ref(responseSaver), std::move(retryPolicy));
    start(&scheduler);

    // First and second attempts fail.
    const ResponseStatus firstFailure{ErrorCodes::HostNotFound, "first", Milliseconds(0)};
    processNetworkResponse(firstFailure);
    const ResponseStatus secondFailure{ErrorCodes::HostUnreachable, "second", Milliseconds(0)};
    processNetworkResponse(secondFailure);

    // Third and last attempt fails as well; its error is what the callback must see.
    const ResponseStatus lastFailure(ErrorCodes::NetworkTimeout, "last", Milliseconds(0));
    processNetworkResponse(lastFailure);
    checkCompletionStatus(&scheduler, responseSaver, lastFailure);
}
450 
// A retryable failure followed by a success: the callback must receive the successful response.
TEST_F(RemoteCommandRetrySchedulerTest, SchedulerShouldRetryUntilSuccessfulResponseIsReceived) {
    CallbackResponseSaver responseSaver;
    auto retryPolicy = RemoteCommandRetryScheduler::makeRetryPolicy(
        3U, executor::RemoteCommandRequest::kNoTimeout, {ErrorCodes::HostNotFound});
    RemoteCommandRetryScheduler scheduler(
        &getExecutor(), request, stdx::ref(responseSaver), std::move(retryPolicy));
    start(&scheduler);

    // First attempt fails with a retryable error.
    const ResponseStatus firstFailure{ErrorCodes::HostNotFound, "first", Milliseconds(0)};
    processNetworkResponse(firstFailure);

    // The retry succeeds.
    const ResponseStatus successResponse(
        BSON("ok" << 1 << "x" << 123), BSON("z" << 456), Milliseconds(100));
    processNetworkResponse(successResponse);
    checkCompletionStatus(&scheduler, responseSaver, successResponse);
}
465 
466 /**
467  * Retry policy that shuts down the scheduler whenever it is consulted by the scheduler.
468  * Results from getMaximumAttempts() and shouldRetryOnError() must cause the scheduler
469  * to resend the request.
470  */
471 class ShutdownSchedulerRetryPolicy : public RemoteCommandRetryScheduler::RetryPolicy {
472 public:
getMaximumAttempts() const473     std::size_t getMaximumAttempts() const override {
474         if (scheduler) {
475             scheduler->shutdown();
476         }
477         return 2U;
478     }
getMaximumResponseElapsedTotal() const479     Milliseconds getMaximumResponseElapsedTotal() const override {
480         return executor::RemoteCommandRequest::kNoTimeout;
481     }
shouldRetryOnError(ErrorCodes::Error) const482     bool shouldRetryOnError(ErrorCodes::Error) const override {
483         if (scheduler) {
484             scheduler->shutdown();
485         }
486         return true;
487     }
toString() const488     std::string toString() const override {
489         return "";
490     }
491 
492     // This must be set before starting the scheduler.
493     RemoteCommandRetryScheduler* scheduler = nullptr;
494 };
495 
// If the scheduler is shut down while deciding whether to retry, the callback must receive
// CallbackCanceled instead of a retry being sent.
TEST_F(RemoteCommandRetrySchedulerTest,
       SchedulerReturnsCallbackCanceledIfShutdownBeforeSendingRetryCommand) {
    CallbackResponseSaver responseSaver;

    // Keep a raw pointer to the policy so it can be pointed at the scheduler after ownership
    // has moved into the scheduler.
    auto shutdownPolicy = stdx::make_unique<ShutdownSchedulerRetryPolicy>();
    auto shutdownPolicyPtr = shutdownPolicy.get();

    TaskExecutorWithFailureInScheduleRemoteCommand proxyExecutor(&getExecutor());
    RemoteCommandRetryScheduler scheduler(
        &proxyExecutor, request, stdx::ref(responseSaver), std::move(shutdownPolicy));
    shutdownPolicyPtr->scheduler = &scheduler;
    start(&scheduler);

    const ResponseStatus firstFailure{ErrorCodes::HostNotFound, "first", Milliseconds(0)};
    processNetworkResponse(firstFailure);

    const ResponseStatus expected{ErrorCodes::CallbackCanceled,
                                  "scheduler was shut down before retrying command",
                                  Milliseconds(0)};
    checkCompletionStatus(&scheduler, responseSaver, expected);
}
515 
516 bool sharedCallbackStateDestroyed = false;
517 class SharedCallbackState {
518     MONGO_DISALLOW_COPYING(SharedCallbackState);
519 
520 public:
SharedCallbackState()521     SharedCallbackState() {}
~SharedCallbackState()522     ~SharedCallbackState() {
523         sharedCallbackStateDestroyed = true;
524     }
525 };
526 
// Once the scheduler has completed it must release its completion callback, which destroys any
// state captured by that callback.
TEST_F(RemoteCommandRetrySchedulerTest,
       SchedulerResetsOnCompletionCallbackFunctionAfterCompletion) {
    sharedCallbackStateDestroyed = false;
    auto sharedState = std::make_shared<SharedCallbackState>();

    Status callbackStatus = getDetectableErrorStatus();
    auto noRetryPolicy = RemoteCommandRetryScheduler::makeNoRetryPolicy();

    // The callback holds its own reference to the shared state through its capture list.
    auto onCompletion = [&callbackStatus, sharedState](
        const executor::TaskExecutor::RemoteCommandCallbackArgs& rcba) {
        unittest::log() << "setting result to " << rcba.response.status;
        callbackStatus = rcba.response.status;
    };

    RemoteCommandRetryScheduler scheduler(
        &getExecutor(), request, onCompletion, std::move(noRetryPolicy));
    start(&scheduler);

    // Drop the test's references. The copy of the callback stored by the scheduler must still
    // keep the shared state alive.
    onCompletion = {};
    sharedState.reset();
    ASSERT_FALSE(sharedCallbackStateDestroyed);

    const ResponseStatus failure{ErrorCodes::OperationFailed, "command failed", Milliseconds(0)};
    processNetworkResponse(failure);

    scheduler.join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, callbackStatus);
    ASSERT_TRUE(sharedCallbackStateDestroyed);
}
555 
556 }  // namespace
557