1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
32
33 #include "mongo/platform/basic.h"
34
35 #include "mongo/executor/network_connection_hook.h"
36 #include "mongo/executor/network_interface_mock.h"
37
38 #include <algorithm>
39 #include <iterator>
40
41 #include "mongo/executor/connection_pool_stats.h"
42 #include "mongo/stdx/functional.h"
43 #include "mongo/util/log.h"
44 #include "mongo/util/mongoutils/str.h"
45 #include "mongo/util/time_support.h"
46
47 namespace mongo {
48 namespace executor {
49
50 using CallbackHandle = TaskExecutor::CallbackHandle;
51 using ResponseStatus = TaskExecutor::ResponseStatus;
52
NetworkInterfaceMock()53 NetworkInterfaceMock::NetworkInterfaceMock()
54 : _waitingToRunMask(0),
55 _currentlyRunning(kNoThread),
56 _now(fassertStatusOK(18653, dateFromISOString("2014-08-01T00:00:00Z"))),
57 _hasStarted(false),
58 _inShutdown(false),
59 _executorNextWakeupDate(Date_t::max()) {}
60
~NetworkInterfaceMock()61 NetworkInterfaceMock::~NetworkInterfaceMock() {
62 stdx::unique_lock<stdx::mutex> lk(_mutex);
63 invariant(!_hasStarted || inShutdown());
64 invariant(_scheduled.empty());
65 invariant(_blackHoled.empty());
66 }
67
logQueues()68 void NetworkInterfaceMock::logQueues() {
69 stdx::unique_lock<stdx::mutex> lk(_mutex);
70 _logQueues_inlock();
71 }
72
getDiagnosticString()73 std::string NetworkInterfaceMock::getDiagnosticString() {
74 stdx::unique_lock<stdx::mutex> lk(_mutex);
75 return _getDiagnosticString_inlock();
76 }
77
_getDiagnosticString_inlock() const78 std::string NetworkInterfaceMock::_getDiagnosticString_inlock() const {
79 return str::stream() << "NetworkInterfaceMock -- waitingToRunMask:" << _waitingToRunMask
80 << ", now:" << _now_inlock().toString() << ", hasStarted:" << _hasStarted
81 << ", inShutdown: " << _inShutdown.load()
82 << ", processing: " << _processing.size()
83 << ", scheduled: " << _scheduled.size()
84 << ", blackHoled: " << _blackHoled.size()
85 << ", unscheduled: " << _unscheduled.size();
86 }
87
_logQueues_inlock() const88 void NetworkInterfaceMock::_logQueues_inlock() const {
89 std::vector<std::pair<std::string, const NetworkOperationList*>> queues{
90 {"unscheduled", &_unscheduled},
91 {"scheduled", &_scheduled},
92 {"processing", &_processing},
93 {"blackholes", &_blackHoled}};
94 for (auto&& queue : queues) {
95 if (queue.second->empty()) {
96 continue;
97 }
98 log() << "**** queue: " << queue.first << " ****";
99 for (auto&& item : *queue.second) {
100 log() << "\t\t " << item.getDiagnosticString();
101 }
102 }
103 }
104
appendConnectionStats(ConnectionPoolStats * stats) const105 void NetworkInterfaceMock::appendConnectionStats(ConnectionPoolStats* stats) const {}
106
now()107 Date_t NetworkInterfaceMock::now() {
108 stdx::lock_guard<stdx::mutex> lk(_mutex);
109 return _now_inlock();
110 }
111
getHostName()112 std::string NetworkInterfaceMock::getHostName() {
113 return "thisisourhostname";
114 }
115
startCommand(const CallbackHandle & cbHandle,RemoteCommandRequest & request,const RemoteCommandCompletionFn & onFinish)116 Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle,
117 RemoteCommandRequest& request,
118 const RemoteCommandCompletionFn& onFinish) {
119 if (inShutdown()) {
120 return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"};
121 }
122
123 stdx::lock_guard<stdx::mutex> lk(_mutex);
124
125 const Date_t now = _now_inlock();
126 auto op = NetworkOperation(cbHandle, request, now, onFinish);
127
128 // If we don't have a hook, or we have already 'connected' to this host, enqueue the op.
129 if (!_hook || _connections.count(request.target)) {
130 _enqueueOperation_inlock(std::move(op));
131 } else {
132 _connectThenEnqueueOperation_inlock(request.target, std::move(op));
133 }
134
135 return Status::OK();
136 }
137
setHandshakeReplyForHost(const mongo::HostAndPort & host,mongo::executor::RemoteCommandResponse && reply)138 void NetworkInterfaceMock::setHandshakeReplyForHost(
139 const mongo::HostAndPort& host, mongo::executor::RemoteCommandResponse&& reply) {
140 stdx::lock_guard<stdx::mutex> lk(_mutex);
141 auto it = _handshakeReplies.find(host);
142 if (it == std::end(_handshakeReplies)) {
143 auto res = _handshakeReplies.emplace(host, std::move(reply));
144 invariant(res.second);
145 } else {
146 it->second = std::move(reply);
147 }
148 }
149
cancelCommand(const CallbackHandle & cbHandle)150 void NetworkInterfaceMock::cancelCommand(const CallbackHandle& cbHandle) {
151 invariant(!inShutdown());
152
153 stdx::lock_guard<stdx::mutex> lk(_mutex);
154 ResponseStatus rs(ErrorCodes::CallbackCanceled, "Network operation canceled", Milliseconds(0));
155
156 // We mimic the real NetworkInterface by only delivering the CallbackCanceled status if the
157 // operation has not already received a response (i.e., is not already in the _scheduled queue).
158 std::vector<NetworkOperationList*> queuesToCheck{&_unscheduled, &_blackHoled, &_processing};
159 _interruptWithResponse_inlock(cbHandle, queuesToCheck, rs);
160 }
161
_interruptWithResponse_inlock(const CallbackHandle & cbHandle,const std::vector<NetworkOperationList * > queuesToCheck,const ResponseStatus & response)162 void NetworkInterfaceMock::_interruptWithResponse_inlock(
163 const CallbackHandle& cbHandle,
164 const std::vector<NetworkOperationList*> queuesToCheck,
165 const ResponseStatus& response) {
166 auto matchFn = stdx::bind(&NetworkOperation::isForCallback, stdx::placeholders::_1, cbHandle);
167 for (auto list : queuesToCheck) {
168 auto noi = std::find_if(list->begin(), list->end(), matchFn);
169 if (noi == list->end()) {
170 continue;
171 }
172 _scheduled.splice(_scheduled.begin(), *list, noi);
173 noi->setResponse(_now_inlock(), response);
174 return;
175 }
176 }
177
setAlarm(const Date_t when,const stdx::function<void ()> & action)178 Status NetworkInterfaceMock::setAlarm(const Date_t when, const stdx::function<void()>& action) {
179 if (inShutdown()) {
180 return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"};
181 }
182
183 stdx::unique_lock<stdx::mutex> lk(_mutex);
184
185 if (when <= _now_inlock()) {
186 lk.unlock();
187 action();
188 return Status::OK();
189 }
190 _alarms.emplace(when, action);
191
192 return Status::OK();
193 }
194
onNetworkThread()195 bool NetworkInterfaceMock::onNetworkThread() {
196 return _currentlyRunning == kNetworkThread;
197 }
198
startup()199 void NetworkInterfaceMock::startup() {
200 stdx::lock_guard<stdx::mutex> lk(_mutex);
201 _startup_inlock();
202 }
203
_startup_inlock()204 void NetworkInterfaceMock::_startup_inlock() {
205 invariant(!_hasStarted);
206 _hasStarted = true;
207 _inShutdown.store(false);
208 invariant(_currentlyRunning == kNoThread);
209 _currentlyRunning = kExecutorThread;
210 }
211
shutdown()212 void NetworkInterfaceMock::shutdown() {
213 invariant(!inShutdown());
214
215 stdx::unique_lock<stdx::mutex> lk(_mutex);
216 if (!_hasStarted) {
217 _startup_inlock();
218 }
219 _inShutdown.store(true);
220 NetworkOperationList todo;
221 todo.splice(todo.end(), _scheduled);
222 todo.splice(todo.end(), _unscheduled);
223 todo.splice(todo.end(), _processing);
224 todo.splice(todo.end(), _blackHoled);
225
226 const Date_t now = _now_inlock();
227 _waitingToRunMask |= kExecutorThread; // Prevents network thread from scheduling.
228 lk.unlock();
229 for (NetworkOperationIterator iter = todo.begin(); iter != todo.end(); ++iter) {
230 iter->setResponse(
231 now, {ErrorCodes::ShutdownInProgress, "Shutting down mock network", Milliseconds(0)});
232 iter->finishResponse();
233 }
234 lk.lock();
235 invariant(_currentlyRunning == kExecutorThread);
236 _currentlyRunning = kNoThread;
237 _waitingToRunMask = kNetworkThread;
238 _shouldWakeNetworkCondition.notify_one();
239 }
240
inShutdown() const241 bool NetworkInterfaceMock::inShutdown() const {
242 return _inShutdown.load();
243 }
244
enterNetwork()245 void NetworkInterfaceMock::enterNetwork() {
246 stdx::unique_lock<stdx::mutex> lk(_mutex);
247 while (!_isNetworkThreadRunnable_inlock()) {
248 _shouldWakeNetworkCondition.wait(lk);
249 }
250 _currentlyRunning = kNetworkThread;
251 _waitingToRunMask &= ~kNetworkThread;
252 }
253
exitNetwork()254 void NetworkInterfaceMock::exitNetwork() {
255 stdx::lock_guard<stdx::mutex> lk(_mutex);
256 if (_currentlyRunning != kNetworkThread) {
257 return;
258 }
259 _currentlyRunning = kNoThread;
260 if (_isExecutorThreadRunnable_inlock()) {
261 _shouldWakeExecutorCondition.notify_one();
262 }
263 _waitingToRunMask |= kNetworkThread;
264 }
265
hasReadyRequests()266 bool NetworkInterfaceMock::hasReadyRequests() {
267 stdx::lock_guard<stdx::mutex> lk(_mutex);
268 invariant(_currentlyRunning == kNetworkThread);
269 return _hasReadyRequests_inlock();
270 }
271
_hasReadyRequests_inlock()272 bool NetworkInterfaceMock::_hasReadyRequests_inlock() {
273 if (_unscheduled.empty())
274 return false;
275 if (_unscheduled.front().getNextConsiderationDate() > _now_inlock()) {
276 return false;
277 }
278 return true;
279 }
280
getNextReadyRequest()281 NetworkInterfaceMock::NetworkOperationIterator NetworkInterfaceMock::getNextReadyRequest() {
282 stdx::unique_lock<stdx::mutex> lk(_mutex);
283 invariant(_currentlyRunning == kNetworkThread);
284 while (!_hasReadyRequests_inlock()) {
285 _waitingToRunMask |= kExecutorThread;
286 _runReadyNetworkOperations_inlock(&lk);
287 }
288 invariant(_hasReadyRequests_inlock());
289 _processing.splice(_processing.begin(), _unscheduled, _unscheduled.begin());
290 return _processing.begin();
291 }
292
getFrontOfUnscheduledQueue()293 NetworkInterfaceMock::NetworkOperationIterator NetworkInterfaceMock::getFrontOfUnscheduledQueue() {
294 stdx::unique_lock<stdx::mutex> lk(_mutex);
295 invariant(_currentlyRunning == kNetworkThread);
296 invariant(_hasReadyRequests_inlock());
297 return _unscheduled.begin();
298 }
299
scheduleResponse(NetworkOperationIterator noi,Date_t when,const ResponseStatus & response)300 void NetworkInterfaceMock::scheduleResponse(NetworkOperationIterator noi,
301 Date_t when,
302 const ResponseStatus& response) {
303 stdx::lock_guard<stdx::mutex> lk(_mutex);
304 invariant(_currentlyRunning == kNetworkThread);
305 NetworkOperationIterator insertBefore = _scheduled.begin();
306 while ((insertBefore != _scheduled.end()) && (insertBefore->getResponseDate() <= when)) {
307 ++insertBefore;
308 }
309
310 // If no RemoteCommandResponse was returned (for example, on a simulated network error), then
311 // do not attempt to run the metadata hook, since there is no returned metadata.
312 if (_metadataHook && response.isOK()) {
313 _metadataHook
314 ->readReplyMetadata(
315 noi->getRequest().opCtx, noi->getRequest().target.toString(), response.metadata)
316 .transitional_ignore();
317 }
318
319 noi->setResponse(when, response);
320 _scheduled.splice(insertBefore, _processing, noi);
321 }
322
scheduleSuccessfulResponse(const BSONObj & response)323 RemoteCommandRequest NetworkInterfaceMock::scheduleSuccessfulResponse(const BSONObj& response) {
324 BSONObj metadata;
325 return scheduleSuccessfulResponse(RemoteCommandResponse(response, metadata, Milliseconds(0)));
326 }
327
scheduleSuccessfulResponse(const RemoteCommandResponse & response)328 RemoteCommandRequest NetworkInterfaceMock::scheduleSuccessfulResponse(
329 const RemoteCommandResponse& response) {
330 return scheduleSuccessfulResponse(getNextReadyRequest(), response);
331 }
332
scheduleSuccessfulResponse(NetworkOperationIterator noi,const RemoteCommandResponse & response)333 RemoteCommandRequest NetworkInterfaceMock::scheduleSuccessfulResponse(
334 NetworkOperationIterator noi, const RemoteCommandResponse& response) {
335 return scheduleSuccessfulResponse(noi, now(), response);
336 }
337
scheduleSuccessfulResponse(NetworkOperationIterator noi,Date_t when,const RemoteCommandResponse & response)338 RemoteCommandRequest NetworkInterfaceMock::scheduleSuccessfulResponse(
339 NetworkOperationIterator noi, Date_t when, const RemoteCommandResponse& response) {
340 scheduleResponse(noi, when, response);
341 return noi->getRequest();
342 }
343
scheduleErrorResponse(const Status & response)344 RemoteCommandRequest NetworkInterfaceMock::scheduleErrorResponse(const Status& response) {
345 return scheduleErrorResponse(getNextReadyRequest(), response);
346 }
347
scheduleErrorResponse(const ResponseStatus response)348 RemoteCommandRequest NetworkInterfaceMock::scheduleErrorResponse(const ResponseStatus response) {
349 auto noi = getNextReadyRequest();
350 scheduleResponse(noi, now(), response);
351 return noi->getRequest();
352 }
353
scheduleErrorResponse(NetworkOperationIterator noi,const Status & response)354 RemoteCommandRequest NetworkInterfaceMock::scheduleErrorResponse(NetworkOperationIterator noi,
355 const Status& response) {
356 return scheduleErrorResponse(noi, now(), response);
357 }
358
scheduleErrorResponse(NetworkOperationIterator noi,Date_t when,const Status & response)359 RemoteCommandRequest NetworkInterfaceMock::scheduleErrorResponse(NetworkOperationIterator noi,
360 Date_t when,
361 const Status& response) {
362 scheduleResponse(noi, when, response);
363 return noi->getRequest();
364 }
365
blackHole(NetworkOperationIterator noi)366 void NetworkInterfaceMock::blackHole(NetworkOperationIterator noi) {
367 stdx::lock_guard<stdx::mutex> lk(_mutex);
368 invariant(_currentlyRunning == kNetworkThread);
369 _blackHoled.splice(_blackHoled.end(), _processing, noi);
370 }
371
requeueAt(NetworkOperationIterator noi,Date_t dontAskUntil)372 void NetworkInterfaceMock::requeueAt(NetworkOperationIterator noi, Date_t dontAskUntil) {
373 stdx::lock_guard<stdx::mutex> lk(_mutex);
374 invariant(_currentlyRunning == kNetworkThread);
375 invariant(noi->getNextConsiderationDate() < dontAskUntil);
376 invariant(_now_inlock() < dontAskUntil);
377 NetworkOperationIterator insertBefore = _unscheduled.begin();
378 for (; insertBefore != _unscheduled.end(); ++insertBefore) {
379 if (insertBefore->getNextConsiderationDate() >= dontAskUntil) {
380 break;
381 }
382 }
383 noi->setNextConsiderationDate(dontAskUntil);
384 _unscheduled.splice(insertBefore, _processing, noi);
385 }
386
runUntil(Date_t until)387 Date_t NetworkInterfaceMock::runUntil(Date_t until) {
388 stdx::unique_lock<stdx::mutex> lk(_mutex);
389 invariant(_currentlyRunning == kNetworkThread);
390 invariant(until > _now_inlock());
391 while (until > _now_inlock()) {
392 _runReadyNetworkOperations_inlock(&lk);
393 if (_hasReadyRequests_inlock()) {
394 break;
395 }
396 Date_t newNow = _executorNextWakeupDate;
397 if (!_alarms.empty() && _alarms.top().when < newNow) {
398 newNow = _alarms.top().when;
399 }
400 if (!_scheduled.empty() && _scheduled.front().getResponseDate() < newNow) {
401 newNow = _scheduled.front().getResponseDate();
402 }
403 if (until < newNow) {
404 newNow = until;
405 }
406 invariant(_now_inlock() <= newNow);
407 _now = newNow;
408 _waitingToRunMask |= kExecutorThread;
409 }
410 _runReadyNetworkOperations_inlock(&lk);
411 return _now_inlock();
412 }
413
runReadyNetworkOperations()414 void NetworkInterfaceMock::runReadyNetworkOperations() {
415 stdx::unique_lock<stdx::mutex> lk(_mutex);
416 invariant(_currentlyRunning == kNetworkThread);
417 _runReadyNetworkOperations_inlock(&lk);
418 }
419
waitForWork()420 void NetworkInterfaceMock::waitForWork() {
421 stdx::unique_lock<stdx::mutex> lk(_mutex);
422 invariant(_currentlyRunning == kExecutorThread);
423 _waitForWork_inlock(&lk);
424 }
425
waitForWorkUntil(Date_t when)426 void NetworkInterfaceMock::waitForWorkUntil(Date_t when) {
427 stdx::unique_lock<stdx::mutex> lk(_mutex);
428 invariant(_currentlyRunning == kExecutorThread);
429 _executorNextWakeupDate = when;
430 if (_executorNextWakeupDate <= _now_inlock()) {
431 return;
432 }
433 _waitForWork_inlock(&lk);
434 }
435
_enqueueOperation_inlock(mongo::executor::NetworkInterfaceMock::NetworkOperation && op)436 void NetworkInterfaceMock::_enqueueOperation_inlock(
437 mongo::executor::NetworkInterfaceMock::NetworkOperation&& op) {
438 auto insertBefore =
439 std::upper_bound(std::begin(_unscheduled),
440 std::end(_unscheduled),
441 op,
442 [](const NetworkOperation& a, const NetworkOperation& b) {
443 return a.getNextConsiderationDate() < b.getNextConsiderationDate();
444 });
445
446 _unscheduled.emplace(insertBefore, std::move(op));
447
448 if (op.getRequest().timeout != RemoteCommandRequest::kNoTimeout) {
449 invariant(op.getRequest().timeout >= Milliseconds(0));
450 ResponseStatus rs(ErrorCodes::NetworkTimeout, "Network timeout", Milliseconds(0));
451 std::vector<NetworkOperationList*> queuesToCheck{&_unscheduled, &_blackHoled, &_scheduled};
452 auto action = stdx::bind(&NetworkInterfaceMock::_interruptWithResponse_inlock,
453 this,
454 op.getCallbackHandle(),
455 queuesToCheck,
456 rs);
457 _alarms.emplace(_now_inlock() + op.getRequest().timeout, action);
458 }
459 }
460
_connectThenEnqueueOperation_inlock(const HostAndPort & target,NetworkOperation && op)461 void NetworkInterfaceMock::_connectThenEnqueueOperation_inlock(const HostAndPort& target,
462 NetworkOperation&& op) {
463 invariant(_hook); // if there is no hook, we shouldn't even hit this codepath
464 invariant(!_connections.count(target));
465
466 auto handshakeReplyIter = _handshakeReplies.find(target);
467
468 auto handshakeReply = (handshakeReplyIter != std::end(_handshakeReplies))
469 ? handshakeReplyIter->second
470 : RemoteCommandResponse(BSONObj(), BSONObj(), Milliseconds(0));
471
472 auto valid = _hook->validateHost(target, handshakeReply);
473 if (!valid.isOK()) {
474 op.setResponse(_now_inlock(), valid);
475 op.finishResponse();
476 return;
477 }
478
479 auto swHookPostconnectCommand = _hook->makeRequest(target);
480
481 if (!swHookPostconnectCommand.isOK()) {
482 op.setResponse(_now_inlock(), swHookPostconnectCommand.getStatus());
483 op.finishResponse();
484 return;
485 }
486
487 boost::optional<RemoteCommandRequest> hookPostconnectCommand =
488 std::move(swHookPostconnectCommand.getValue());
489
490 if (!hookPostconnectCommand) {
491 // If we don't have a post connect command, enqueue the actual command.
492 _enqueueOperation_inlock(std::move(op));
493 _connections.emplace(op.getRequest().target);
494 return;
495 }
496
497 // The completion handler for the postconnect command schedules the original command.
498 auto postconnectCompletionHandler = [this, op](ResponseStatus rs) mutable {
499 stdx::lock_guard<stdx::mutex> lk(_mutex);
500 if (!rs.isOK()) {
501 op.setResponse(_now_inlock(), rs);
502 op.finishResponse();
503 return;
504 }
505
506 auto handleStatus = _hook->handleReply(op.getRequest().target, std::move(rs));
507
508 if (!handleStatus.isOK()) {
509 op.setResponse(_now_inlock(), handleStatus);
510 op.finishResponse();
511 return;
512 }
513
514 _enqueueOperation_inlock(std::move(op));
515 _connections.emplace(op.getRequest().target);
516 };
517
518 auto postconnectOp = NetworkOperation(op.getCallbackHandle(),
519 std::move(*hookPostconnectCommand),
520 _now_inlock(),
521 std::move(postconnectCompletionHandler));
522
523 _enqueueOperation_inlock(std::move(postconnectOp));
524 }
525
setConnectionHook(std::unique_ptr<NetworkConnectionHook> hook)526 void NetworkInterfaceMock::setConnectionHook(std::unique_ptr<NetworkConnectionHook> hook) {
527 stdx::lock_guard<stdx::mutex> lk(_mutex);
528 invariant(!_hasStarted);
529 invariant(!_hook);
530 _hook = std::move(hook);
531 }
532
setEgressMetadataHook(std::unique_ptr<rpc::EgressMetadataHook> metadataHook)533 void NetworkInterfaceMock::setEgressMetadataHook(
534 std::unique_ptr<rpc::EgressMetadataHook> metadataHook) {
535 stdx::lock_guard<stdx::mutex> lk(_mutex);
536 invariant(!_hasStarted);
537 invariant(!_metadataHook);
538 _metadataHook = std::move(metadataHook);
539 }
540
signalWorkAvailable()541 void NetworkInterfaceMock::signalWorkAvailable() {
542 stdx::lock_guard<stdx::mutex> lk(_mutex);
543 _waitingToRunMask |= kExecutorThread;
544 if (_currentlyRunning == kNoThread) {
545 _shouldWakeExecutorCondition.notify_one();
546 }
547 }
548
_runReadyNetworkOperations_inlock(stdx::unique_lock<stdx::mutex> * lk)549 void NetworkInterfaceMock::_runReadyNetworkOperations_inlock(stdx::unique_lock<stdx::mutex>* lk) {
550 while (!_alarms.empty() && _now_inlock() >= _alarms.top().when) {
551 auto fn = _alarms.top().action;
552 _alarms.pop();
553 lk->unlock();
554 fn();
555 lk->lock();
556 }
557 while (!_scheduled.empty() && _scheduled.front().getResponseDate() <= _now_inlock()) {
558 invariant(_currentlyRunning == kNetworkThread);
559 NetworkOperation op = _scheduled.front();
560 _scheduled.pop_front();
561 _waitingToRunMask |= kExecutorThread;
562 lk->unlock();
563 op.finishResponse();
564 lk->lock();
565 }
566 invariant(_currentlyRunning == kNetworkThread);
567 if (!(_waitingToRunMask & kExecutorThread)) {
568 return;
569 }
570 _shouldWakeExecutorCondition.notify_one();
571 _currentlyRunning = kNoThread;
572 while (!_isNetworkThreadRunnable_inlock()) {
573 _shouldWakeNetworkCondition.wait(*lk);
574 }
575 _currentlyRunning = kNetworkThread;
576 _waitingToRunMask &= ~kNetworkThread;
577 }
578
_waitForWork_inlock(stdx::unique_lock<stdx::mutex> * lk)579 void NetworkInterfaceMock::_waitForWork_inlock(stdx::unique_lock<stdx::mutex>* lk) {
580 if (_waitingToRunMask & kExecutorThread) {
581 _waitingToRunMask &= ~kExecutorThread;
582 return;
583 }
584 _currentlyRunning = kNoThread;
585 while (!_isExecutorThreadRunnable_inlock()) {
586 _waitingToRunMask |= kNetworkThread;
587 _shouldWakeNetworkCondition.notify_one();
588 _shouldWakeExecutorCondition.wait(*lk);
589 }
590 _currentlyRunning = kExecutorThread;
591 _waitingToRunMask &= ~kExecutorThread;
592 }
593
_isNetworkThreadRunnable_inlock()594 bool NetworkInterfaceMock::_isNetworkThreadRunnable_inlock() {
595 if (_currentlyRunning != kNoThread) {
596 return false;
597 }
598 if (_waitingToRunMask != kNetworkThread) {
599 return false;
600 }
601 return true;
602 }
603
_isExecutorThreadRunnable_inlock()604 bool NetworkInterfaceMock::_isExecutorThreadRunnable_inlock() {
605 if (_currentlyRunning != kNoThread) {
606 return false;
607 }
608 return _waitingToRunMask & kExecutorThread;
609 }
610
611 static const ResponseStatus kUnsetResponse(ErrorCodes::InternalError,
612 "NetworkOperation::_response never set");
613
NetworkOperation()614 NetworkInterfaceMock::NetworkOperation::NetworkOperation()
615 : _requestDate(),
616 _nextConsiderationDate(),
617 _responseDate(),
618 _request(),
619 _response(kUnsetResponse),
620 _onFinish() {}
621
NetworkOperation(const CallbackHandle & cbHandle,const RemoteCommandRequest & theRequest,Date_t theRequestDate,const RemoteCommandCompletionFn & onFinish)622 NetworkInterfaceMock::NetworkOperation::NetworkOperation(const CallbackHandle& cbHandle,
623 const RemoteCommandRequest& theRequest,
624 Date_t theRequestDate,
625 const RemoteCommandCompletionFn& onFinish)
626 : _requestDate(theRequestDate),
627 _nextConsiderationDate(theRequestDate),
628 _responseDate(),
629 _cbHandle(cbHandle),
630 _request(theRequest),
631 _response(kUnsetResponse),
632 _onFinish(onFinish) {}
633
~NetworkOperation()634 NetworkInterfaceMock::NetworkOperation::~NetworkOperation() {}
635
getDiagnosticString() const636 std::string NetworkInterfaceMock::NetworkOperation::getDiagnosticString() const {
637 return str::stream() << "NetworkOperation -- request:'" << _request.toString()
638 << "', responseStatus: '" << _response.status.toString()
639 << "', responseBody: '" << (_response.isOK() ? _response.toString() : "")
640 << "', reqDate: " << _requestDate.toString()
641 << ", nextConsiderDate: " << _nextConsiderationDate.toString()
642 << ", respDate: " << _responseDate.toString();
643 }
644
setNextConsiderationDate(Date_t nextConsiderationDate)645 void NetworkInterfaceMock::NetworkOperation::setNextConsiderationDate(
646 Date_t nextConsiderationDate) {
647 invariant(nextConsiderationDate > _nextConsiderationDate);
648 _nextConsiderationDate = nextConsiderationDate;
649 }
650
setResponse(Date_t responseDate,const ResponseStatus & response)651 void NetworkInterfaceMock::NetworkOperation::setResponse(Date_t responseDate,
652 const ResponseStatus& response) {
653 invariant(responseDate >= _requestDate);
654 _responseDate = responseDate;
655 _response = response;
656 }
657
finishResponse()658 void NetworkInterfaceMock::NetworkOperation::finishResponse() {
659 invariant(_onFinish);
660 _onFinish(_response);
661 _onFinish = RemoteCommandCompletionFn();
662 }
663
InNetworkGuard(NetworkInterfaceMock * net)664 NetworkInterfaceMock::InNetworkGuard::InNetworkGuard(NetworkInterfaceMock* net) : _net(net) {
665 _net->enterNetwork();
666 }
667
dismiss()668 void NetworkInterfaceMock::InNetworkGuard::dismiss() {
669 _callExitNetwork = false;
670 _net->exitNetwork();
671 }
672
~InNetworkGuard()673 NetworkInterfaceMock::InNetworkGuard::~InNetworkGuard() {
674 if (_callExitNetwork)
675 _net->exitNetwork();
676 }
677
operator ->() const678 NetworkInterfaceMock* NetworkInterfaceMock::InNetworkGuard::operator->() const {
679 return _net;
680 }
681
NetworkInterfaceMockClockSource(NetworkInterfaceMock * net)682 NetworkInterfaceMockClockSource::NetworkInterfaceMockClockSource(NetworkInterfaceMock* net)
683 : _net(net) {
684 _tracksSystemClock = false;
685 }
686
687 } // namespace executor
688 } // namespace mongo
689