1 2 /** 3 * Copyright (C) 2018-present MongoDB, Inc. 4 * 5 * This program is free software: you can redistribute it and/or modify 6 * it under the terms of the Server Side Public License, version 1, 7 * as published by MongoDB, Inc. 8 * 9 * This program is distributed in the hope that it will be useful, 10 * but WITHOUT ANY WARRANTY; without even the implied warranty of 11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 * Server Side Public License for more details. 13 * 14 * You should have received a copy of the Server Side Public License 15 * along with this program. If not, see 16 * <http://www.mongodb.com/licensing/server-side-public-license>. 17 * 18 * As a special exception, the copyright holders give permission to link the 19 * code of portions of this program with the OpenSSL library under certain 20 * conditions as described in each individual source file and distribute 21 * linked combinations including the program with the OpenSSL library. You 22 * must comply with the Server Side Public License in all respects for 23 * all of the code used other than as permitted herein. If you modify file(s) 24 * with this exception, you may extend this exception to your version of the 25 * file(s), but you are not obligated to do so. If you do not wish to do so, 26 * delete this exception statement from your version. If you delete this 27 * exception statement from all source files in the program, then also delete 28 * it in the license file. 29 */ 30 31 #pragma once 32 33 #include <memory> 34 #include <queue> 35 #include <utility> 36 #include <vector> 37 38 #include "mongo/base/disallow_copying.h" 39 #include "mongo/executor/network_interface.h" 40 #include "mongo/rpc/metadata/metadata_hook.h" 41 #include "mongo/stdx/condition_variable.h" 42 #include "mongo/stdx/list.h" 43 #include "mongo/stdx/mutex.h" 44 #include "mongo/stdx/unordered_map.h" 45 #include "mongo/stdx/unordered_set.h" 46 #include "mongo/util/clock_source.h" 47 #include "mongo/util/time_support.h" 48 49 namespace mongo { 50 51 class BSONObj; 52 53 namespace executor { 54 55 using ResponseStatus = TaskExecutor::ResponseStatus; 56 class NetworkConnectionHook; 57 58 /** 59 * Mock network implementation for use in unit tests. 60 * 61 * To use, construct a new instance on the heap, and keep a pointer to it. Pass 62 * the pointer to the instance into the TaskExecutor constructor, transferring 63 * ownership. Start the executor's run() method in a separate thread, schedule the 64 * work you want to test into the executor, then while the test is still going, iterate 65 * through the ready network requests, servicing them and advancing time as needed. 66 * 67 * The mock has a fully virtualized notion of time and the the network. When the 68 * executor under test schedules a network operation, the startCommand 69 * method of this class adds an entry to the _unscheduled queue for immediate consideration. 70 * The test driver loop, when it examines the request, may schedule a response, ask the 71 * interface to redeliver the request at a later virtual time, or to swallow the virtual 72 * request until the end of the simulation. The test driver loop can also instruct the 73 * interface to run forward through virtual time until there are operations ready to 74 * consider, via runUntil. 75 * 76 * The thread acting as the "network" and the executor run thread are highly synchronized 77 * by this code, allowing for deterministic control of operation interleaving. 78 */ 79 class NetworkInterfaceMock : public NetworkInterface { 80 public: 81 class NetworkOperation; 82 using NetworkOperationList = stdx::list<NetworkOperation>; 83 using NetworkOperationIterator = NetworkOperationList::iterator; 84 85 NetworkInterfaceMock(); 86 virtual ~NetworkInterfaceMock(); 87 virtual void appendConnectionStats(ConnectionPoolStats* stats) const; 88 virtual std::string getDiagnosticString(); 89 90 /** 91 * Logs the contents of the queues for diagnostics. 92 */ 93 virtual void logQueues(); 94 95 //////////////////////////////////////////////////////////////////////////////// 96 // 97 // NetworkInterface methods 98 // 99 //////////////////////////////////////////////////////////////////////////////// 100 101 virtual void startup(); 102 virtual void shutdown(); 103 virtual bool inShutdown() const; 104 virtual void waitForWork(); 105 virtual void waitForWorkUntil(Date_t when); 106 virtual void setConnectionHook(std::unique_ptr<NetworkConnectionHook> hook); 107 virtual void setEgressMetadataHook(std::unique_ptr<rpc::EgressMetadataHook> metadataHook); 108 virtual void signalWorkAvailable(); 109 virtual Date_t now(); 110 virtual std::string getHostName(); 111 virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, 112 RemoteCommandRequest& request, 113 const RemoteCommandCompletionFn& onFinish); 114 115 /** 116 * If the network operation is in the _unscheduled or _processing queues, moves the operation 117 * into the _scheduled queue with ErrorCodes::CallbackCanceled. If the operation is already in 118 * the _scheduled queue, does nothing. The latter simulates the case where cancelCommand() is 119 * called after the task has already completed, but its callback has not yet been run. 120 */ 121 virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle); 122 123 /** 124 * Not implemented. 125 */ 126 virtual Status setAlarm(Date_t when, const stdx::function<void()>& action); 127 128 virtual bool onNetworkThread(); 129 dropConnections(const HostAndPort &)130 void dropConnections(const HostAndPort&) override {} 131 132 133 //////////////////////////////////////////////////////////////////////////////// 134 // 135 // Methods for simulating network operations and the passage of time. 136 // 137 // Methods in this section are to be called by the thread currently simulating 138 // the network. 139 // 140 //////////////////////////////////////////////////////////////////////////////// 141 142 /** 143 * RAII-style class for entering and exiting network. 144 */ 145 class InNetworkGuard; 146 147 /** 148 * Causes the currently running (non-executor) thread to assume the mantle of the network 149 * simulation thread. 150 * 151 * Call this before calling any of the other methods in this section. 152 */ 153 void enterNetwork(); 154 155 /** 156 * Causes the currently running thread to drop the mantle of "network simulation thread". 157 * 158 * Call this before calling any methods that might block waiting for the 159 * executor thread. 160 * 161 * It is safe to call exitNetwork() even if enterNetwork() has not been called - it will just 162 * be a no-op. 163 */ 164 void exitNetwork(); 165 166 /** 167 * Returns true if there are unscheduled network requests to be processed. 168 */ 169 bool hasReadyRequests(); 170 171 /** 172 * Gets the next unscheduled request to process, blocking until one is available. 173 * 174 * Will not return until the executor thread is blocked in waitForWorkUntil or waitForWork. 175 */ 176 NetworkOperationIterator getNextReadyRequest(); 177 178 /** 179 * Gets the first unscheduled request. There must be at least one unscheduled request in the 180 * queue. 181 */ 182 NetworkOperationIterator getFrontOfUnscheduledQueue(); 183 184 /** 185 * Schedules "response" in response to "noi" at virtual time "when". 186 */ 187 void scheduleResponse(NetworkOperationIterator noi, 188 Date_t when, 189 const ResponseStatus& response); 190 191 /** 192 * Schedules a successful "response" to "noi" at virtual time "when". 193 * "noi" defaults to next ready request. 194 * "when" defaults to now(). 195 * Returns the "request" that the response was scheduled for. 196 */ 197 RemoteCommandRequest scheduleSuccessfulResponse(const BSONObj& response); 198 RemoteCommandRequest scheduleSuccessfulResponse(const RemoteCommandResponse& response); 199 RemoteCommandRequest scheduleSuccessfulResponse(NetworkOperationIterator noi, 200 const RemoteCommandResponse& response); 201 RemoteCommandRequest scheduleSuccessfulResponse(NetworkOperationIterator noi, 202 Date_t when, 203 const RemoteCommandResponse& response); 204 205 /** 206 * Schedules an error "response" to "noi" at virtual time "when". 207 * "noi" defaults to next ready request. 208 * "when" defaults to now(). 209 */ 210 RemoteCommandRequest scheduleErrorResponse(const Status& response); 211 RemoteCommandRequest scheduleErrorResponse(const ResponseStatus response); 212 RemoteCommandRequest scheduleErrorResponse(NetworkOperationIterator noi, 213 const Status& response); 214 RemoteCommandRequest scheduleErrorResponse(NetworkOperationIterator noi, 215 Date_t when, 216 const Status& response); 217 218 219 /** 220 * Swallows "noi", causing the network interface to not respond to it until 221 * shutdown() is called. 222 */ 223 void blackHole(NetworkOperationIterator noi); 224 225 /** 226 * Defers decision making on "noi" until virtual time "dontAskUntil". Use 227 * this when getNextReadyRequest() returns a request you want to deal with 228 * after looking at other requests. 229 */ 230 void requeueAt(NetworkOperationIterator noi, Date_t dontAskUntil); 231 232 /** 233 * Runs the simulator forward until now() == until or hasReadyRequests() is true. 234 * Returns now(). 235 * 236 * Will not return until the executor thread is blocked in waitForWorkUntil or waitForWork. 237 */ 238 Date_t runUntil(Date_t until); 239 240 /** 241 * Processes all ready, scheduled network operations. 242 * 243 * Will not return until the executor thread is blocked in waitForWorkUntil or waitForWork. 244 */ 245 void runReadyNetworkOperations(); 246 247 /** 248 * Sets the reply of the 'isMaster' handshake for a specific host. This reply will only 249 * be given to the 'validateHost' method of the ConnectionHook set on this object - NOT 250 * to the completion handlers of any 'isMaster' commands scheduled with 'startCommand'. 251 * 252 * This reply will persist until it is changed again using this method. 253 * 254 * If the NetworkInterfaceMock conducts a handshake with a simulated host which has not 255 * had a handshake reply set, a default constructed RemoteCommandResponse will be passed 256 * to validateHost if a hook is set. 257 */ 258 void setHandshakeReplyForHost(const HostAndPort& host, RemoteCommandResponse&& reply); 259 260 /** 261 * Deliver the response to the callback handle if the handle is present in queuesToCheck. 262 * This represents interrupting the regular flow with, for example, a NetworkTimeout or 263 * CallbackCanceled error. 264 */ 265 void _interruptWithResponse_inlock(const TaskExecutor::CallbackHandle& cbHandle, 266 const std::vector<NetworkOperationList*> queuesToCheck, 267 const ResponseStatus& response); 268 269 private: 270 /** 271 * Information describing a scheduled alarm. 272 */ 273 struct AlarmInfo { 274 using AlarmAction = stdx::function<void()>; AlarmInfoAlarmInfo275 AlarmInfo(Date_t inWhen, AlarmAction inAction) 276 : when(inWhen), action(std::move(inAction)) {} 277 bool operator>(const AlarmInfo& rhs) const { 278 return when > rhs.when; 279 } 280 281 Date_t when; 282 AlarmAction action; 283 }; 284 285 /** 286 * Type used to identify which thread (network mock or executor) is currently executing. 287 * 288 * Values are used in a bitmask, as well. 289 */ 290 enum ThreadType { kNoThread = 0, kExecutorThread = 1, kNetworkThread = 2 }; 291 292 /** 293 * Implementation of startup behavior. 294 */ 295 void _startup_inlock(); 296 297 /** 298 * Returns information about the state of this mock for diagnostic purposes. 299 */ 300 std::string _getDiagnosticString_inlock() const; 301 302 /** 303 * Logs the contents of the queues for diagnostics. 304 */ 305 void _logQueues_inlock() const; 306 /** 307 * Returns the current virtualized time. 308 */ _now_inlock()309 Date_t _now_inlock() const { 310 return _now; 311 } 312 313 /** 314 * Implementation of waitForWork*. 315 */ 316 void _waitForWork_inlock(stdx::unique_lock<stdx::mutex>* lk); 317 318 /** 319 * Returns true if there are ready requests for the network thread to service. 320 */ 321 bool _hasReadyRequests_inlock(); 322 323 /** 324 * Returns true if the network thread could run right now. 325 */ 326 bool _isNetworkThreadRunnable_inlock(); 327 328 /** 329 * Returns true if the executor thread could run right now. 330 */ 331 bool _isExecutorThreadRunnable_inlock(); 332 333 /** 334 * Enqueues a network operation to run in order of 'consideration date'. 335 */ 336 void _enqueueOperation_inlock(NetworkOperation&& op); 337 338 /** 339 * "Connects" to a remote host, and then enqueues the provided operation. 340 */ 341 void _connectThenEnqueueOperation_inlock(const HostAndPort& target, NetworkOperation&& op); 342 343 /** 344 * Runs all ready network operations, called while holding "lk". May drop and 345 * reaquire "lk" several times, but will not return until the executor has blocked 346 * in waitFor*. 347 */ 348 void _runReadyNetworkOperations_inlock(stdx::unique_lock<stdx::mutex>* lk); 349 350 // Mutex that synchronizes access to mutable data in this class and its subclasses. 351 // Fields guarded by the mutex are labled (M), below, and those that are read-only 352 // in multi-threaded execution, and so unsynchronized, are labeled (R). 353 stdx::mutex _mutex; 354 355 // Condition signaled to indicate that the network processing thread should wake up. 356 stdx::condition_variable _shouldWakeNetworkCondition; // (M) 357 358 // Condition signaled to indicate that the executor run thread should wake up. 359 stdx::condition_variable _shouldWakeExecutorCondition; // (M) 360 361 // Bitmask indicating which threads are runnable. 362 int _waitingToRunMask; // (M) 363 364 // Indicator of which thread, if any, is currently running. 365 ThreadType _currentlyRunning; // (M) 366 367 // The current time reported by this instance of NetworkInterfaceMock. 368 Date_t _now; // (M) 369 370 // Set to true by "startUp()" 371 bool _hasStarted; // (M) 372 373 // Set to true by "shutDown()". 374 AtomicWord<bool> _inShutdown; // (M) 375 376 // Next date that the executor expects to wake up at (due to a scheduleWorkAt() call). 377 Date_t _executorNextWakeupDate; // (M) 378 379 // List of network operations whose responses haven't been scheduled or blackholed. This is 380 // where network requests are first queued. It is sorted by 381 // NetworkOperation::_nextConsiderationDate, which is set to now() when startCommand() is 382 // called, and adjusted by requeueAt(). 383 NetworkOperationList _unscheduled; // (M) 384 385 // List of network operations that have been returned by getNextReadyRequest() but not 386 // yet scheudled, black-holed or requeued. 387 NetworkOperationList _processing; // (M) 388 389 // List of network operations whose responses have been scheduled but not delivered, sorted 390 // by NetworkOperation::_responseDate. These operations will have their responses delivered 391 // when now() == getResponseDate(). 392 NetworkOperationList _scheduled; // (M) 393 394 // List of network operations that will not be responded to until shutdown() is called. 395 NetworkOperationList _blackHoled; // (M) 396 397 // Heap of alarms, with the next alarm always on top. 398 std::priority_queue<AlarmInfo, std::vector<AlarmInfo>, std::greater<AlarmInfo>> _alarms; // (M) 399 400 // The connection hook. 401 std::unique_ptr<NetworkConnectionHook> _hook; // (R) 402 403 // The metadata hook. 404 std::unique_ptr<rpc::EgressMetadataHook> _metadataHook; // (R) 405 406 // The set of hosts we have seen so far. If we see a new host, we will execute the 407 // ConnectionHook's validation and post-connection logic. 408 // 409 // TODO: provide a way to simulate disconnections. 410 stdx::unordered_set<HostAndPort> _connections; // (M) 411 412 // The handshake replies set for each host. 413 stdx::unordered_map<HostAndPort, RemoteCommandResponse> _handshakeReplies; // (M) 414 }; 415 416 /** 417 * Representation of an in-progress network operation. 418 */ 419 class NetworkInterfaceMock::NetworkOperation { 420 public: 421 NetworkOperation(); 422 NetworkOperation(const TaskExecutor::CallbackHandle& cbHandle, 423 const RemoteCommandRequest& theRequest, 424 Date_t theRequestDate, 425 const RemoteCommandCompletionFn& onFinish); 426 ~NetworkOperation(); 427 428 /** 429 * Adjusts the stored virtual time at which this entry will be subject to consideration 430 * by the test harness. 431 */ 432 void setNextConsiderationDate(Date_t nextConsiderationDate); 433 434 /** 435 * Sets the response and thet virtual time at which it will be delivered. 436 */ 437 void setResponse(Date_t responseDate, const ResponseStatus& response); 438 439 /** 440 * Predicate that returns true if cbHandle equals the executor's handle for this network 441 * operation. Used for searching lists of NetworkOperations. 442 */ isForCallback(const TaskExecutor::CallbackHandle & cbHandle)443 bool isForCallback(const TaskExecutor::CallbackHandle& cbHandle) const { 444 return cbHandle == _cbHandle; 445 } 446 getCallbackHandle()447 const TaskExecutor::CallbackHandle& getCallbackHandle() const { 448 return _cbHandle; 449 } 450 451 /** 452 * Gets the request that initiated this operation. 453 */ getRequest()454 const RemoteCommandRequest& getRequest() const { 455 return _request; 456 } 457 458 /** 459 * Gets the virtual time at which the operation was started. 460 */ getRequestDate()461 Date_t getRequestDate() const { 462 return _requestDate; 463 } 464 465 /** 466 * Gets the virtual time at which the test harness should next consider what to do 467 * with this request. 468 */ getNextConsiderationDate()469 Date_t getNextConsiderationDate() const { 470 return _nextConsiderationDate; 471 } 472 473 /** 474 * After setResponse() has been called, returns the virtual time at which 475 * the response should be delivered. 476 */ getResponseDate()477 Date_t getResponseDate() const { 478 return _responseDate; 479 } 480 481 /** 482 * Delivers the response, by invoking the onFinish callback passed into the constructor. 483 */ 484 void finishResponse(); 485 486 /** 487 * Returns a printable diagnostic string. 488 */ 489 std::string getDiagnosticString() const; 490 491 private: 492 Date_t _requestDate; 493 Date_t _nextConsiderationDate; 494 Date_t _responseDate; 495 TaskExecutor::CallbackHandle _cbHandle; 496 RemoteCommandRequest _request; 497 ResponseStatus _response; 498 RemoteCommandCompletionFn _onFinish; 499 }; 500 501 /** 502 * RAII type to enter and exit network on construction/destruction. 503 * 504 * Calls enterNetwork on construction, and exitNetwork during destruction, 505 * unless dismissed. 506 * 507 * Not thread-safe. 508 */ 509 class NetworkInterfaceMock::InNetworkGuard { 510 MONGO_DISALLOW_COPYING(InNetworkGuard); 511 512 public: 513 /** 514 * Calls enterNetwork. 515 */ 516 explicit InNetworkGuard(NetworkInterfaceMock* net); 517 /** 518 * Calls exitNetwork, and disables the destructor from calling. 519 */ 520 void dismiss(); 521 /** 522 * Calls exitNetwork, unless dismiss has been called. 523 */ 524 ~InNetworkGuard(); 525 526 /** 527 * Returns network interface mock pointer. 528 */ 529 NetworkInterfaceMock* operator->() const; 530 531 private: 532 NetworkInterfaceMock* _net; 533 bool _callExitNetwork = true; 534 }; 535 536 class NetworkInterfaceMockClockSource : public ClockSource { 537 public: 538 explicit NetworkInterfaceMockClockSource(NetworkInterfaceMock* net); 539 getPrecision()540 Milliseconds getPrecision() override { 541 return Milliseconds{1}; 542 } now()543 Date_t now() override { 544 return _net->now(); 545 } setAlarm(Date_t when,stdx::function<void ()> action)546 Status setAlarm(Date_t when, stdx::function<void()> action) override { 547 return _net->setAlarm(when, action); 548 } 549 550 private: 551 NetworkInterfaceMock* _net; 552 }; 553 554 } // namespace executor 555 } // namespace mongo 556