1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
32 
33 #include "mongo/platform/basic.h"
34 
35 #include "mongo/executor/network_connection_hook.h"
36 #include "mongo/executor/network_interface_mock.h"
37 
38 #include <algorithm>
39 #include <iterator>
40 
41 #include "mongo/executor/connection_pool_stats.h"
42 #include "mongo/stdx/functional.h"
43 #include "mongo/util/log.h"
44 #include "mongo/util/mongoutils/str.h"
45 #include "mongo/util/time_support.h"
46 
47 namespace mongo {
48 namespace executor {
49 
// Shorthand aliases for the TaskExecutor types referenced throughout this file.
using CallbackHandle = TaskExecutor::CallbackHandle;
using ResponseStatus = TaskExecutor::ResponseStatus;
52 
NetworkInterfaceMock()53 NetworkInterfaceMock::NetworkInterfaceMock()
54     : _waitingToRunMask(0),
55       _currentlyRunning(kNoThread),
56       _now(fassertStatusOK(18653, dateFromISOString("2014-08-01T00:00:00Z"))),
57       _hasStarted(false),
58       _inShutdown(false),
59       _executorNextWakeupDate(Date_t::max()) {}
60 
~NetworkInterfaceMock()61 NetworkInterfaceMock::~NetworkInterfaceMock() {
62     stdx::unique_lock<stdx::mutex> lk(_mutex);
63     invariant(!_hasStarted || inShutdown());
64     invariant(_scheduled.empty());
65     invariant(_blackHoled.empty());
66 }
67 
logQueues()68 void NetworkInterfaceMock::logQueues() {
69     stdx::unique_lock<stdx::mutex> lk(_mutex);
70     _logQueues_inlock();
71 }
72 
getDiagnosticString()73 std::string NetworkInterfaceMock::getDiagnosticString() {
74     stdx::unique_lock<stdx::mutex> lk(_mutex);
75     return _getDiagnosticString_inlock();
76 }
77 
_getDiagnosticString_inlock() const78 std::string NetworkInterfaceMock::_getDiagnosticString_inlock() const {
79     return str::stream() << "NetworkInterfaceMock -- waitingToRunMask:" << _waitingToRunMask
80                          << ", now:" << _now_inlock().toString() << ", hasStarted:" << _hasStarted
81                          << ", inShutdown: " << _inShutdown.load()
82                          << ", processing: " << _processing.size()
83                          << ", scheduled: " << _scheduled.size()
84                          << ", blackHoled: " << _blackHoled.size()
85                          << ", unscheduled: " << _unscheduled.size();
86 }
87 
_logQueues_inlock() const88 void NetworkInterfaceMock::_logQueues_inlock() const {
89     std::vector<std::pair<std::string, const NetworkOperationList*>> queues{
90         {"unscheduled", &_unscheduled},
91         {"scheduled", &_scheduled},
92         {"processing", &_processing},
93         {"blackholes", &_blackHoled}};
94     for (auto&& queue : queues) {
95         if (queue.second->empty()) {
96             continue;
97         }
98         log() << "**** queue: " << queue.first << " ****";
99         for (auto&& item : *queue.second) {
100             log() << "\t\t " << item.getDiagnosticString();
101         }
102     }
103 }
104 
appendConnectionStats(ConnectionPoolStats * stats) const105 void NetworkInterfaceMock::appendConnectionStats(ConnectionPoolStats* stats) const {}
106 
now()107 Date_t NetworkInterfaceMock::now() {
108     stdx::lock_guard<stdx::mutex> lk(_mutex);
109     return _now_inlock();
110 }
111 
getHostName()112 std::string NetworkInterfaceMock::getHostName() {
113     return "thisisourhostname";
114 }
115 
startCommand(const CallbackHandle & cbHandle,RemoteCommandRequest & request,const RemoteCommandCompletionFn & onFinish)116 Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle,
117                                           RemoteCommandRequest& request,
118                                           const RemoteCommandCompletionFn& onFinish) {
119     if (inShutdown()) {
120         return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"};
121     }
122 
123     stdx::lock_guard<stdx::mutex> lk(_mutex);
124 
125     const Date_t now = _now_inlock();
126     auto op = NetworkOperation(cbHandle, request, now, onFinish);
127 
128     // If we don't have a hook, or we have already 'connected' to this host, enqueue the op.
129     if (!_hook || _connections.count(request.target)) {
130         _enqueueOperation_inlock(std::move(op));
131     } else {
132         _connectThenEnqueueOperation_inlock(request.target, std::move(op));
133     }
134 
135     return Status::OK();
136 }
137 
setHandshakeReplyForHost(const mongo::HostAndPort & host,mongo::executor::RemoteCommandResponse && reply)138 void NetworkInterfaceMock::setHandshakeReplyForHost(
139     const mongo::HostAndPort& host, mongo::executor::RemoteCommandResponse&& reply) {
140     stdx::lock_guard<stdx::mutex> lk(_mutex);
141     auto it = _handshakeReplies.find(host);
142     if (it == std::end(_handshakeReplies)) {
143         auto res = _handshakeReplies.emplace(host, std::move(reply));
144         invariant(res.second);
145     } else {
146         it->second = std::move(reply);
147     }
148 }
149 
cancelCommand(const CallbackHandle & cbHandle)150 void NetworkInterfaceMock::cancelCommand(const CallbackHandle& cbHandle) {
151     invariant(!inShutdown());
152 
153     stdx::lock_guard<stdx::mutex> lk(_mutex);
154     ResponseStatus rs(ErrorCodes::CallbackCanceled, "Network operation canceled", Milliseconds(0));
155 
156     // We mimic the real NetworkInterface by only delivering the CallbackCanceled status if the
157     // operation has not already received a response (i.e., is not already in the _scheduled queue).
158     std::vector<NetworkOperationList*> queuesToCheck{&_unscheduled, &_blackHoled, &_processing};
159     _interruptWithResponse_inlock(cbHandle, queuesToCheck, rs);
160 }
161 
_interruptWithResponse_inlock(const CallbackHandle & cbHandle,const std::vector<NetworkOperationList * > queuesToCheck,const ResponseStatus & response)162 void NetworkInterfaceMock::_interruptWithResponse_inlock(
163     const CallbackHandle& cbHandle,
164     const std::vector<NetworkOperationList*> queuesToCheck,
165     const ResponseStatus& response) {
166     auto matchFn = stdx::bind(&NetworkOperation::isForCallback, stdx::placeholders::_1, cbHandle);
167     for (auto list : queuesToCheck) {
168         auto noi = std::find_if(list->begin(), list->end(), matchFn);
169         if (noi == list->end()) {
170             continue;
171         }
172         _scheduled.splice(_scheduled.begin(), *list, noi);
173         noi->setResponse(_now_inlock(), response);
174         return;
175     }
176 }
177 
setAlarm(const Date_t when,const stdx::function<void ()> & action)178 Status NetworkInterfaceMock::setAlarm(const Date_t when, const stdx::function<void()>& action) {
179     if (inShutdown()) {
180         return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"};
181     }
182 
183     stdx::unique_lock<stdx::mutex> lk(_mutex);
184 
185     if (when <= _now_inlock()) {
186         lk.unlock();
187         action();
188         return Status::OK();
189     }
190     _alarms.emplace(when, action);
191 
192     return Status::OK();
193 }
194 
onNetworkThread()195 bool NetworkInterfaceMock::onNetworkThread() {
196     return _currentlyRunning == kNetworkThread;
197 }
198 
startup()199 void NetworkInterfaceMock::startup() {
200     stdx::lock_guard<stdx::mutex> lk(_mutex);
201     _startup_inlock();
202 }
203 
_startup_inlock()204 void NetworkInterfaceMock::_startup_inlock() {
205     invariant(!_hasStarted);
206     _hasStarted = true;
207     _inShutdown.store(false);
208     invariant(_currentlyRunning == kNoThread);
209     _currentlyRunning = kExecutorThread;
210 }
211 
shutdown()212 void NetworkInterfaceMock::shutdown() {
213     invariant(!inShutdown());
214 
215     stdx::unique_lock<stdx::mutex> lk(_mutex);
216     if (!_hasStarted) {
217         _startup_inlock();
218     }
219     _inShutdown.store(true);
220     NetworkOperationList todo;
221     todo.splice(todo.end(), _scheduled);
222     todo.splice(todo.end(), _unscheduled);
223     todo.splice(todo.end(), _processing);
224     todo.splice(todo.end(), _blackHoled);
225 
226     const Date_t now = _now_inlock();
227     _waitingToRunMask |= kExecutorThread;  // Prevents network thread from scheduling.
228     lk.unlock();
229     for (NetworkOperationIterator iter = todo.begin(); iter != todo.end(); ++iter) {
230         iter->setResponse(
231             now, {ErrorCodes::ShutdownInProgress, "Shutting down mock network", Milliseconds(0)});
232         iter->finishResponse();
233     }
234     lk.lock();
235     invariant(_currentlyRunning == kExecutorThread);
236     _currentlyRunning = kNoThread;
237     _waitingToRunMask = kNetworkThread;
238     _shouldWakeNetworkCondition.notify_one();
239 }
240 
inShutdown() const241 bool NetworkInterfaceMock::inShutdown() const {
242     return _inShutdown.load();
243 }
244 
enterNetwork()245 void NetworkInterfaceMock::enterNetwork() {
246     stdx::unique_lock<stdx::mutex> lk(_mutex);
247     while (!_isNetworkThreadRunnable_inlock()) {
248         _shouldWakeNetworkCondition.wait(lk);
249     }
250     _currentlyRunning = kNetworkThread;
251     _waitingToRunMask &= ~kNetworkThread;
252 }
253 
exitNetwork()254 void NetworkInterfaceMock::exitNetwork() {
255     stdx::lock_guard<stdx::mutex> lk(_mutex);
256     if (_currentlyRunning != kNetworkThread) {
257         return;
258     }
259     _currentlyRunning = kNoThread;
260     if (_isExecutorThreadRunnable_inlock()) {
261         _shouldWakeExecutorCondition.notify_one();
262     }
263     _waitingToRunMask |= kNetworkThread;
264 }
265 
hasReadyRequests()266 bool NetworkInterfaceMock::hasReadyRequests() {
267     stdx::lock_guard<stdx::mutex> lk(_mutex);
268     invariant(_currentlyRunning == kNetworkThread);
269     return _hasReadyRequests_inlock();
270 }
271 
_hasReadyRequests_inlock()272 bool NetworkInterfaceMock::_hasReadyRequests_inlock() {
273     if (_unscheduled.empty())
274         return false;
275     if (_unscheduled.front().getNextConsiderationDate() > _now_inlock()) {
276         return false;
277     }
278     return true;
279 }
280 
getNextReadyRequest()281 NetworkInterfaceMock::NetworkOperationIterator NetworkInterfaceMock::getNextReadyRequest() {
282     stdx::unique_lock<stdx::mutex> lk(_mutex);
283     invariant(_currentlyRunning == kNetworkThread);
284     while (!_hasReadyRequests_inlock()) {
285         _waitingToRunMask |= kExecutorThread;
286         _runReadyNetworkOperations_inlock(&lk);
287     }
288     invariant(_hasReadyRequests_inlock());
289     _processing.splice(_processing.begin(), _unscheduled, _unscheduled.begin());
290     return _processing.begin();
291 }
292 
getFrontOfUnscheduledQueue()293 NetworkInterfaceMock::NetworkOperationIterator NetworkInterfaceMock::getFrontOfUnscheduledQueue() {
294     stdx::unique_lock<stdx::mutex> lk(_mutex);
295     invariant(_currentlyRunning == kNetworkThread);
296     invariant(_hasReadyRequests_inlock());
297     return _unscheduled.begin();
298 }
299 
scheduleResponse(NetworkOperationIterator noi,Date_t when,const ResponseStatus & response)300 void NetworkInterfaceMock::scheduleResponse(NetworkOperationIterator noi,
301                                             Date_t when,
302                                             const ResponseStatus& response) {
303     stdx::lock_guard<stdx::mutex> lk(_mutex);
304     invariant(_currentlyRunning == kNetworkThread);
305     NetworkOperationIterator insertBefore = _scheduled.begin();
306     while ((insertBefore != _scheduled.end()) && (insertBefore->getResponseDate() <= when)) {
307         ++insertBefore;
308     }
309 
310     // If no RemoteCommandResponse was returned (for example, on a simulated network error), then
311     // do not attempt to run the metadata hook, since there is no returned metadata.
312     if (_metadataHook && response.isOK()) {
313         _metadataHook
314             ->readReplyMetadata(
315                 noi->getRequest().opCtx, noi->getRequest().target.toString(), response.metadata)
316             .transitional_ignore();
317     }
318 
319     noi->setResponse(when, response);
320     _scheduled.splice(insertBefore, _processing, noi);
321 }
322 
scheduleSuccessfulResponse(const BSONObj & response)323 RemoteCommandRequest NetworkInterfaceMock::scheduleSuccessfulResponse(const BSONObj& response) {
324     BSONObj metadata;
325     return scheduleSuccessfulResponse(RemoteCommandResponse(response, metadata, Milliseconds(0)));
326 }
327 
scheduleSuccessfulResponse(const RemoteCommandResponse & response)328 RemoteCommandRequest NetworkInterfaceMock::scheduleSuccessfulResponse(
329     const RemoteCommandResponse& response) {
330     return scheduleSuccessfulResponse(getNextReadyRequest(), response);
331 }
332 
scheduleSuccessfulResponse(NetworkOperationIterator noi,const RemoteCommandResponse & response)333 RemoteCommandRequest NetworkInterfaceMock::scheduleSuccessfulResponse(
334     NetworkOperationIterator noi, const RemoteCommandResponse& response) {
335     return scheduleSuccessfulResponse(noi, now(), response);
336 }
337 
scheduleSuccessfulResponse(NetworkOperationIterator noi,Date_t when,const RemoteCommandResponse & response)338 RemoteCommandRequest NetworkInterfaceMock::scheduleSuccessfulResponse(
339     NetworkOperationIterator noi, Date_t when, const RemoteCommandResponse& response) {
340     scheduleResponse(noi, when, response);
341     return noi->getRequest();
342 }
343 
scheduleErrorResponse(const Status & response)344 RemoteCommandRequest NetworkInterfaceMock::scheduleErrorResponse(const Status& response) {
345     return scheduleErrorResponse(getNextReadyRequest(), response);
346 }
347 
scheduleErrorResponse(const ResponseStatus response)348 RemoteCommandRequest NetworkInterfaceMock::scheduleErrorResponse(const ResponseStatus response) {
349     auto noi = getNextReadyRequest();
350     scheduleResponse(noi, now(), response);
351     return noi->getRequest();
352 }
353 
scheduleErrorResponse(NetworkOperationIterator noi,const Status & response)354 RemoteCommandRequest NetworkInterfaceMock::scheduleErrorResponse(NetworkOperationIterator noi,
355                                                                  const Status& response) {
356     return scheduleErrorResponse(noi, now(), response);
357 }
358 
scheduleErrorResponse(NetworkOperationIterator noi,Date_t when,const Status & response)359 RemoteCommandRequest NetworkInterfaceMock::scheduleErrorResponse(NetworkOperationIterator noi,
360                                                                  Date_t when,
361                                                                  const Status& response) {
362     scheduleResponse(noi, when, response);
363     return noi->getRequest();
364 }
365 
blackHole(NetworkOperationIterator noi)366 void NetworkInterfaceMock::blackHole(NetworkOperationIterator noi) {
367     stdx::lock_guard<stdx::mutex> lk(_mutex);
368     invariant(_currentlyRunning == kNetworkThread);
369     _blackHoled.splice(_blackHoled.end(), _processing, noi);
370 }
371 
requeueAt(NetworkOperationIterator noi,Date_t dontAskUntil)372 void NetworkInterfaceMock::requeueAt(NetworkOperationIterator noi, Date_t dontAskUntil) {
373     stdx::lock_guard<stdx::mutex> lk(_mutex);
374     invariant(_currentlyRunning == kNetworkThread);
375     invariant(noi->getNextConsiderationDate() < dontAskUntil);
376     invariant(_now_inlock() < dontAskUntil);
377     NetworkOperationIterator insertBefore = _unscheduled.begin();
378     for (; insertBefore != _unscheduled.end(); ++insertBefore) {
379         if (insertBefore->getNextConsiderationDate() >= dontAskUntil) {
380             break;
381         }
382     }
383     noi->setNextConsiderationDate(dontAskUntil);
384     _unscheduled.splice(insertBefore, _processing, noi);
385 }
386 
runUntil(Date_t until)387 Date_t NetworkInterfaceMock::runUntil(Date_t until) {
388     stdx::unique_lock<stdx::mutex> lk(_mutex);
389     invariant(_currentlyRunning == kNetworkThread);
390     invariant(until > _now_inlock());
391     while (until > _now_inlock()) {
392         _runReadyNetworkOperations_inlock(&lk);
393         if (_hasReadyRequests_inlock()) {
394             break;
395         }
396         Date_t newNow = _executorNextWakeupDate;
397         if (!_alarms.empty() && _alarms.top().when < newNow) {
398             newNow = _alarms.top().when;
399         }
400         if (!_scheduled.empty() && _scheduled.front().getResponseDate() < newNow) {
401             newNow = _scheduled.front().getResponseDate();
402         }
403         if (until < newNow) {
404             newNow = until;
405         }
406         invariant(_now_inlock() <= newNow);
407         _now = newNow;
408         _waitingToRunMask |= kExecutorThread;
409     }
410     _runReadyNetworkOperations_inlock(&lk);
411     return _now_inlock();
412 }
413 
runReadyNetworkOperations()414 void NetworkInterfaceMock::runReadyNetworkOperations() {
415     stdx::unique_lock<stdx::mutex> lk(_mutex);
416     invariant(_currentlyRunning == kNetworkThread);
417     _runReadyNetworkOperations_inlock(&lk);
418 }
419 
waitForWork()420 void NetworkInterfaceMock::waitForWork() {
421     stdx::unique_lock<stdx::mutex> lk(_mutex);
422     invariant(_currentlyRunning == kExecutorThread);
423     _waitForWork_inlock(&lk);
424 }
425 
waitForWorkUntil(Date_t when)426 void NetworkInterfaceMock::waitForWorkUntil(Date_t when) {
427     stdx::unique_lock<stdx::mutex> lk(_mutex);
428     invariant(_currentlyRunning == kExecutorThread);
429     _executorNextWakeupDate = when;
430     if (_executorNextWakeupDate <= _now_inlock()) {
431         return;
432     }
433     _waitForWork_inlock(&lk);
434 }
435 
_enqueueOperation_inlock(mongo::executor::NetworkInterfaceMock::NetworkOperation && op)436 void NetworkInterfaceMock::_enqueueOperation_inlock(
437     mongo::executor::NetworkInterfaceMock::NetworkOperation&& op) {
438     auto insertBefore =
439         std::upper_bound(std::begin(_unscheduled),
440                          std::end(_unscheduled),
441                          op,
442                          [](const NetworkOperation& a, const NetworkOperation& b) {
443                              return a.getNextConsiderationDate() < b.getNextConsiderationDate();
444                          });
445 
446     _unscheduled.emplace(insertBefore, std::move(op));
447 
448     if (op.getRequest().timeout != RemoteCommandRequest::kNoTimeout) {
449         invariant(op.getRequest().timeout >= Milliseconds(0));
450         ResponseStatus rs(ErrorCodes::NetworkTimeout, "Network timeout", Milliseconds(0));
451         std::vector<NetworkOperationList*> queuesToCheck{&_unscheduled, &_blackHoled, &_scheduled};
452         auto action = stdx::bind(&NetworkInterfaceMock::_interruptWithResponse_inlock,
453                                  this,
454                                  op.getCallbackHandle(),
455                                  queuesToCheck,
456                                  rs);
457         _alarms.emplace(_now_inlock() + op.getRequest().timeout, action);
458     }
459 }
460 
_connectThenEnqueueOperation_inlock(const HostAndPort & target,NetworkOperation && op)461 void NetworkInterfaceMock::_connectThenEnqueueOperation_inlock(const HostAndPort& target,
462                                                                NetworkOperation&& op) {
463     invariant(_hook);  // if there is no hook, we shouldn't even hit this codepath
464     invariant(!_connections.count(target));
465 
466     auto handshakeReplyIter = _handshakeReplies.find(target);
467 
468     auto handshakeReply = (handshakeReplyIter != std::end(_handshakeReplies))
469         ? handshakeReplyIter->second
470         : RemoteCommandResponse(BSONObj(), BSONObj(), Milliseconds(0));
471 
472     auto valid = _hook->validateHost(target, handshakeReply);
473     if (!valid.isOK()) {
474         op.setResponse(_now_inlock(), valid);
475         op.finishResponse();
476         return;
477     }
478 
479     auto swHookPostconnectCommand = _hook->makeRequest(target);
480 
481     if (!swHookPostconnectCommand.isOK()) {
482         op.setResponse(_now_inlock(), swHookPostconnectCommand.getStatus());
483         op.finishResponse();
484         return;
485     }
486 
487     boost::optional<RemoteCommandRequest> hookPostconnectCommand =
488         std::move(swHookPostconnectCommand.getValue());
489 
490     if (!hookPostconnectCommand) {
491         // If we don't have a post connect command, enqueue the actual command.
492         _enqueueOperation_inlock(std::move(op));
493         _connections.emplace(op.getRequest().target);
494         return;
495     }
496 
497     // The completion handler for the postconnect command schedules the original command.
498     auto postconnectCompletionHandler = [this, op](ResponseStatus rs) mutable {
499         stdx::lock_guard<stdx::mutex> lk(_mutex);
500         if (!rs.isOK()) {
501             op.setResponse(_now_inlock(), rs);
502             op.finishResponse();
503             return;
504         }
505 
506         auto handleStatus = _hook->handleReply(op.getRequest().target, std::move(rs));
507 
508         if (!handleStatus.isOK()) {
509             op.setResponse(_now_inlock(), handleStatus);
510             op.finishResponse();
511             return;
512         }
513 
514         _enqueueOperation_inlock(std::move(op));
515         _connections.emplace(op.getRequest().target);
516     };
517 
518     auto postconnectOp = NetworkOperation(op.getCallbackHandle(),
519                                           std::move(*hookPostconnectCommand),
520                                           _now_inlock(),
521                                           std::move(postconnectCompletionHandler));
522 
523     _enqueueOperation_inlock(std::move(postconnectOp));
524 }
525 
setConnectionHook(std::unique_ptr<NetworkConnectionHook> hook)526 void NetworkInterfaceMock::setConnectionHook(std::unique_ptr<NetworkConnectionHook> hook) {
527     stdx::lock_guard<stdx::mutex> lk(_mutex);
528     invariant(!_hasStarted);
529     invariant(!_hook);
530     _hook = std::move(hook);
531 }
532 
setEgressMetadataHook(std::unique_ptr<rpc::EgressMetadataHook> metadataHook)533 void NetworkInterfaceMock::setEgressMetadataHook(
534     std::unique_ptr<rpc::EgressMetadataHook> metadataHook) {
535     stdx::lock_guard<stdx::mutex> lk(_mutex);
536     invariant(!_hasStarted);
537     invariant(!_metadataHook);
538     _metadataHook = std::move(metadataHook);
539 }
540 
signalWorkAvailable()541 void NetworkInterfaceMock::signalWorkAvailable() {
542     stdx::lock_guard<stdx::mutex> lk(_mutex);
543     _waitingToRunMask |= kExecutorThread;
544     if (_currentlyRunning == kNoThread) {
545         _shouldWakeExecutorCondition.notify_one();
546     }
547 }
548 
_runReadyNetworkOperations_inlock(stdx::unique_lock<stdx::mutex> * lk)549 void NetworkInterfaceMock::_runReadyNetworkOperations_inlock(stdx::unique_lock<stdx::mutex>* lk) {
550     while (!_alarms.empty() && _now_inlock() >= _alarms.top().when) {
551         auto fn = _alarms.top().action;
552         _alarms.pop();
553         lk->unlock();
554         fn();
555         lk->lock();
556     }
557     while (!_scheduled.empty() && _scheduled.front().getResponseDate() <= _now_inlock()) {
558         invariant(_currentlyRunning == kNetworkThread);
559         NetworkOperation op = _scheduled.front();
560         _scheduled.pop_front();
561         _waitingToRunMask |= kExecutorThread;
562         lk->unlock();
563         op.finishResponse();
564         lk->lock();
565     }
566     invariant(_currentlyRunning == kNetworkThread);
567     if (!(_waitingToRunMask & kExecutorThread)) {
568         return;
569     }
570     _shouldWakeExecutorCondition.notify_one();
571     _currentlyRunning = kNoThread;
572     while (!_isNetworkThreadRunnable_inlock()) {
573         _shouldWakeNetworkCondition.wait(*lk);
574     }
575     _currentlyRunning = kNetworkThread;
576     _waitingToRunMask &= ~kNetworkThread;
577 }
578 
_waitForWork_inlock(stdx::unique_lock<stdx::mutex> * lk)579 void NetworkInterfaceMock::_waitForWork_inlock(stdx::unique_lock<stdx::mutex>* lk) {
580     if (_waitingToRunMask & kExecutorThread) {
581         _waitingToRunMask &= ~kExecutorThread;
582         return;
583     }
584     _currentlyRunning = kNoThread;
585     while (!_isExecutorThreadRunnable_inlock()) {
586         _waitingToRunMask |= kNetworkThread;
587         _shouldWakeNetworkCondition.notify_one();
588         _shouldWakeExecutorCondition.wait(*lk);
589     }
590     _currentlyRunning = kExecutorThread;
591     _waitingToRunMask &= ~kExecutorThread;
592 }
593 
_isNetworkThreadRunnable_inlock()594 bool NetworkInterfaceMock::_isNetworkThreadRunnable_inlock() {
595     if (_currentlyRunning != kNoThread) {
596         return false;
597     }
598     if (_waitingToRunMask != kNetworkThread) {
599         return false;
600     }
601     return true;
602 }
603 
_isExecutorThreadRunnable_inlock()604 bool NetworkInterfaceMock::_isExecutorThreadRunnable_inlock() {
605     if (_currentlyRunning != kNoThread) {
606         return false;
607     }
608     return _waitingToRunMask & kExecutorThread;
609 }
610 
611 static const ResponseStatus kUnsetResponse(ErrorCodes::InternalError,
612                                            "NetworkOperation::_response never set");
613 
NetworkOperation()614 NetworkInterfaceMock::NetworkOperation::NetworkOperation()
615     : _requestDate(),
616       _nextConsiderationDate(),
617       _responseDate(),
618       _request(),
619       _response(kUnsetResponse),
620       _onFinish() {}
621 
NetworkOperation(const CallbackHandle & cbHandle,const RemoteCommandRequest & theRequest,Date_t theRequestDate,const RemoteCommandCompletionFn & onFinish)622 NetworkInterfaceMock::NetworkOperation::NetworkOperation(const CallbackHandle& cbHandle,
623                                                          const RemoteCommandRequest& theRequest,
624                                                          Date_t theRequestDate,
625                                                          const RemoteCommandCompletionFn& onFinish)
626     : _requestDate(theRequestDate),
627       _nextConsiderationDate(theRequestDate),
628       _responseDate(),
629       _cbHandle(cbHandle),
630       _request(theRequest),
631       _response(kUnsetResponse),
632       _onFinish(onFinish) {}
633 
~NetworkOperation()634 NetworkInterfaceMock::NetworkOperation::~NetworkOperation() {}
635 
getDiagnosticString() const636 std::string NetworkInterfaceMock::NetworkOperation::getDiagnosticString() const {
637     return str::stream() << "NetworkOperation -- request:'" << _request.toString()
638                          << "', responseStatus: '" << _response.status.toString()
639                          << "', responseBody: '" << (_response.isOK() ? _response.toString() : "")
640                          << "', reqDate: " << _requestDate.toString()
641                          << ", nextConsiderDate: " << _nextConsiderationDate.toString()
642                          << ", respDate: " << _responseDate.toString();
643 }
644 
setNextConsiderationDate(Date_t nextConsiderationDate)645 void NetworkInterfaceMock::NetworkOperation::setNextConsiderationDate(
646     Date_t nextConsiderationDate) {
647     invariant(nextConsiderationDate > _nextConsiderationDate);
648     _nextConsiderationDate = nextConsiderationDate;
649 }
650 
setResponse(Date_t responseDate,const ResponseStatus & response)651 void NetworkInterfaceMock::NetworkOperation::setResponse(Date_t responseDate,
652                                                          const ResponseStatus& response) {
653     invariant(responseDate >= _requestDate);
654     _responseDate = responseDate;
655     _response = response;
656 }
657 
finishResponse()658 void NetworkInterfaceMock::NetworkOperation::finishResponse() {
659     invariant(_onFinish);
660     _onFinish(_response);
661     _onFinish = RemoteCommandCompletionFn();
662 }
663 
InNetworkGuard(NetworkInterfaceMock * net)664 NetworkInterfaceMock::InNetworkGuard::InNetworkGuard(NetworkInterfaceMock* net) : _net(net) {
665     _net->enterNetwork();
666 }
667 
dismiss()668 void NetworkInterfaceMock::InNetworkGuard::dismiss() {
669     _callExitNetwork = false;
670     _net->exitNetwork();
671 }
672 
~InNetworkGuard()673 NetworkInterfaceMock::InNetworkGuard::~InNetworkGuard() {
674     if (_callExitNetwork)
675         _net->exitNetwork();
676 }
677 
operator ->() const678 NetworkInterfaceMock* NetworkInterfaceMock::InNetworkGuard::operator->() const {
679     return _net;
680 }
681 
NetworkInterfaceMockClockSource(NetworkInterfaceMock * net)682 NetworkInterfaceMockClockSource::NetworkInterfaceMockClockSource(NetworkInterfaceMock* net)
683     : _net(net) {
684     _tracksSystemClock = false;
685 }
686 
687 }  // namespace executor
688 }  // namespace mongo
689