1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #pragma once
32 
33 #include <memory>
34 #include <queue>
35 #include <utility>
36 #include <vector>
37 
38 #include "mongo/base/disallow_copying.h"
39 #include "mongo/executor/network_interface.h"
40 #include "mongo/rpc/metadata/metadata_hook.h"
41 #include "mongo/stdx/condition_variable.h"
42 #include "mongo/stdx/list.h"
43 #include "mongo/stdx/mutex.h"
44 #include "mongo/stdx/unordered_map.h"
45 #include "mongo/stdx/unordered_set.h"
46 #include "mongo/util/clock_source.h"
47 #include "mongo/util/time_support.h"
48 
49 namespace mongo {
50 
51 class BSONObj;
52 
53 namespace executor {
54 
55 using ResponseStatus = TaskExecutor::ResponseStatus;
56 class NetworkConnectionHook;
57 
58 /**
59  * Mock network implementation for use in unit tests.
60  *
61  * To use, construct a new instance on the heap, and keep a pointer to it.  Pass
62  * the pointer to the instance into the TaskExecutor constructor, transferring
63  * ownership.  Start the executor's run() method in a separate thread, schedule the
64  * work you want to test into the executor, then while the test is still going, iterate
65  * through the ready network requests, servicing them and advancing time as needed.
66  *
67  * The mock has a fully virtualized notion of time and the network.  When the
68  * executor under test schedules a network operation, the startCommand
69  * method of this class adds an entry to the _unscheduled queue for immediate consideration.
70  * The test driver loop, when it examines the request, may schedule a response, ask the
71  * interface to redeliver the request at a later virtual time, or to swallow the virtual
72  * request until the end of the simulation.  The test driver loop can also instruct the
73  * interface to run forward through virtual time until there are operations ready to
74  * consider, via runUntil.
75  *
76  * The thread acting as the "network" and the executor run thread are highly synchronized
77  * by this code, allowing for deterministic control of operation interleaving.
78  */
79 class NetworkInterfaceMock : public NetworkInterface {
80 public:
81     class NetworkOperation;
82     using NetworkOperationList = stdx::list<NetworkOperation>;
83     using NetworkOperationIterator = NetworkOperationList::iterator;
84 
85     NetworkInterfaceMock();
86     virtual ~NetworkInterfaceMock();
87     virtual void appendConnectionStats(ConnectionPoolStats* stats) const;
88     virtual std::string getDiagnosticString();
89 
90     /**
91      * Logs the contents of the queues for diagnostics.
92      */
93     virtual void logQueues();
94 
95     ////////////////////////////////////////////////////////////////////////////////
96     //
97     // NetworkInterface methods
98     //
99     ////////////////////////////////////////////////////////////////////////////////
100 
101     virtual void startup();
102     virtual void shutdown();
103     virtual bool inShutdown() const;
104     virtual void waitForWork();
105     virtual void waitForWorkUntil(Date_t when);
106     virtual void setConnectionHook(std::unique_ptr<NetworkConnectionHook> hook);
107     virtual void setEgressMetadataHook(std::unique_ptr<rpc::EgressMetadataHook> metadataHook);
108     virtual void signalWorkAvailable();
109     virtual Date_t now();
110     virtual std::string getHostName();
111     virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
112                                 RemoteCommandRequest& request,
113                                 const RemoteCommandCompletionFn& onFinish);
114 
115     /**
116      * If the network operation is in the _unscheduled or _processing queues, moves the operation
117      * into the _scheduled queue with ErrorCodes::CallbackCanceled. If the operation is already in
118      * the _scheduled queue, does nothing. The latter simulates the case where cancelCommand() is
119      * called after the task has already completed, but its callback has not yet been run.
120      */
121     virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle);
122 
123     /**
124      * Not implemented.
125      */
126     virtual Status setAlarm(Date_t when, const stdx::function<void()>& action);
127 
128     virtual bool onNetworkThread();
129 
dropConnections(const HostAndPort &)130     void dropConnections(const HostAndPort&) override {}
131 
132 
133     ////////////////////////////////////////////////////////////////////////////////
134     //
135     // Methods for simulating network operations and the passage of time.
136     //
137     // Methods in this section are to be called by the thread currently simulating
138     // the network.
139     //
140     ////////////////////////////////////////////////////////////////////////////////
141 
142     /**
143      * RAII-style class for entering and exiting network.
144      */
145     class InNetworkGuard;
146 
147     /**
148      * Causes the currently running (non-executor) thread to assume the mantle of the network
149      * simulation thread.
150      *
151      * Call this before calling any of the other methods in this section.
152      */
153     void enterNetwork();
154 
155     /**
156      * Causes the currently running thread to drop the mantle of "network simulation thread".
157      *
158      * Call this before calling any methods that might block waiting for the
159      * executor thread.
160      *
161      * It is safe to call exitNetwork() even if enterNetwork() has not been called - it will just
162      * be a no-op.
163      */
164     void exitNetwork();
165 
166     /**
167      * Returns true if there are unscheduled network requests to be processed.
168      */
169     bool hasReadyRequests();
170 
171     /**
172      * Gets the next unscheduled request to process, blocking until one is available.
173      *
174      * Will not return until the executor thread is blocked in waitForWorkUntil or waitForWork.
175      */
176     NetworkOperationIterator getNextReadyRequest();
177 
178     /**
179      * Gets the first unscheduled request. There must be at least one unscheduled request in the
180      * queue.
181      */
182     NetworkOperationIterator getFrontOfUnscheduledQueue();
183 
184     /**
185      * Schedules "response" in response to "noi" at virtual time "when".
186      */
187     void scheduleResponse(NetworkOperationIterator noi,
188                           Date_t when,
189                           const ResponseStatus& response);
190 
191     /**
192      * Schedules a successful "response" to "noi" at virtual time "when".
193      * "noi" defaults to next ready request.
194      * "when" defaults to now().
195      * Returns the "request" that the response was scheduled for.
196      */
197     RemoteCommandRequest scheduleSuccessfulResponse(const BSONObj& response);
198     RemoteCommandRequest scheduleSuccessfulResponse(const RemoteCommandResponse& response);
199     RemoteCommandRequest scheduleSuccessfulResponse(NetworkOperationIterator noi,
200                                                     const RemoteCommandResponse& response);
201     RemoteCommandRequest scheduleSuccessfulResponse(NetworkOperationIterator noi,
202                                                     Date_t when,
203                                                     const RemoteCommandResponse& response);
204 
205     /**
206      * Schedules an error "response" to "noi" at virtual time "when".
207      * "noi" defaults to next ready request.
208      * "when" defaults to now().
209      */
210     RemoteCommandRequest scheduleErrorResponse(const Status& response);
211     RemoteCommandRequest scheduleErrorResponse(const ResponseStatus response);
212     RemoteCommandRequest scheduleErrorResponse(NetworkOperationIterator noi,
213                                                const Status& response);
214     RemoteCommandRequest scheduleErrorResponse(NetworkOperationIterator noi,
215                                                Date_t when,
216                                                const Status& response);
217 
218 
219     /**
220      * Swallows "noi", causing the network interface to not respond to it until
221      * shutdown() is called.
222      */
223     void blackHole(NetworkOperationIterator noi);
224 
225     /**
226      * Defers decision making on "noi" until virtual time "dontAskUntil".  Use
227      * this when getNextReadyRequest() returns a request you want to deal with
228      * after looking at other requests.
229      */
230     void requeueAt(NetworkOperationIterator noi, Date_t dontAskUntil);
231 
232     /**
233      * Runs the simulator forward until now() == until or hasReadyRequests() is true.
234      * Returns now().
235      *
236      * Will not return until the executor thread is blocked in waitForWorkUntil or waitForWork.
237      */
238     Date_t runUntil(Date_t until);
239 
240     /**
241      * Processes all ready, scheduled network operations.
242      *
243      * Will not return until the executor thread is blocked in waitForWorkUntil or waitForWork.
244      */
245     void runReadyNetworkOperations();
246 
247     /**
248      * Sets the reply of the 'isMaster' handshake for a specific host. This reply will only
249      * be given to the 'validateHost' method of the ConnectionHook set on this object - NOT
250      * to the completion handlers of any 'isMaster' commands scheduled with 'startCommand'.
251      *
252      * This reply will persist until it is changed again using this method.
253      *
254      * If the NetworkInterfaceMock conducts a handshake with a simulated host which has not
255      * had a handshake reply set, a default constructed RemoteCommandResponse will be passed
256      * to validateHost if a hook is set.
257      */
258     void setHandshakeReplyForHost(const HostAndPort& host, RemoteCommandResponse&& reply);
259 
260     /**
261      * Deliver the response to the callback handle if the handle is present in queuesToCheck.
262      * This represents interrupting the regular flow with, for example, a NetworkTimeout or
263      * CallbackCanceled error.
264      */
265     void _interruptWithResponse_inlock(const TaskExecutor::CallbackHandle& cbHandle,
266                                        const std::vector<NetworkOperationList*> queuesToCheck,
267                                        const ResponseStatus& response);
268 
269 private:
270     /**
271      * Information describing a scheduled alarm.
272      */
273     struct AlarmInfo {
274         using AlarmAction = stdx::function<void()>;
AlarmInfoAlarmInfo275         AlarmInfo(Date_t inWhen, AlarmAction inAction)
276             : when(inWhen), action(std::move(inAction)) {}
277         bool operator>(const AlarmInfo& rhs) const {
278             return when > rhs.when;
279         }
280 
281         Date_t when;
282         AlarmAction action;
283     };
284 
285     /**
286      * Type used to identify which thread (network mock or executor) is currently executing.
287      *
288      * Values are used in a bitmask, as well.
289      */
290     enum ThreadType { kNoThread = 0, kExecutorThread = 1, kNetworkThread = 2 };
291 
292     /**
293      * Implementation of startup behavior.
294      */
295     void _startup_inlock();
296 
297     /**
298      * Returns information about the state of this mock for diagnostic purposes.
299      */
300     std::string _getDiagnosticString_inlock() const;
301 
302     /**
303      * Logs the contents of the queues for diagnostics.
304      */
305     void _logQueues_inlock() const;
306     /**
307      * Returns the current virtualized time.
308      */
_now_inlock()309     Date_t _now_inlock() const {
310         return _now;
311     }
312 
313     /**
314      * Implementation of waitForWork*.
315      */
316     void _waitForWork_inlock(stdx::unique_lock<stdx::mutex>* lk);
317 
318     /**
319      * Returns true if there are ready requests for the network thread to service.
320      */
321     bool _hasReadyRequests_inlock();
322 
323     /**
324      * Returns true if the network thread could run right now.
325      */
326     bool _isNetworkThreadRunnable_inlock();
327 
328     /**
329      * Returns true if the executor thread could run right now.
330      */
331     bool _isExecutorThreadRunnable_inlock();
332 
333     /**
334      * Enqueues a network operation to run in order of 'consideration date'.
335      */
336     void _enqueueOperation_inlock(NetworkOperation&& op);
337 
338     /**
339      * "Connects" to a remote host, and then enqueues the provided operation.
340      */
341     void _connectThenEnqueueOperation_inlock(const HostAndPort& target, NetworkOperation&& op);
342 
343     /**
344      * Runs all ready network operations, called while holding "lk".  May drop and
345      * reaquire "lk" several times, but will not return until the executor has blocked
346      * in waitFor*.
347      */
348     void _runReadyNetworkOperations_inlock(stdx::unique_lock<stdx::mutex>* lk);
349 
350     // Mutex that synchronizes access to mutable data in this class and its subclasses.
351     // Fields guarded by the mutex are labled (M), below, and those that are read-only
352     // in multi-threaded execution, and so unsynchronized, are labeled (R).
353     stdx::mutex _mutex;
354 
355     // Condition signaled to indicate that the network processing thread should wake up.
356     stdx::condition_variable _shouldWakeNetworkCondition;  // (M)
357 
358     // Condition signaled to indicate that the executor run thread should wake up.
359     stdx::condition_variable _shouldWakeExecutorCondition;  // (M)
360 
361     // Bitmask indicating which threads are runnable.
362     int _waitingToRunMask;  // (M)
363 
364     // Indicator of which thread, if any, is currently running.
365     ThreadType _currentlyRunning;  // (M)
366 
367     // The current time reported by this instance of NetworkInterfaceMock.
368     Date_t _now;  // (M)
369 
370     // Set to true by "startUp()"
371     bool _hasStarted;  // (M)
372 
373     // Set to true by "shutDown()".
374     AtomicWord<bool> _inShutdown;  // (M)
375 
376     // Next date that the executor expects to wake up at (due to a scheduleWorkAt() call).
377     Date_t _executorNextWakeupDate;  // (M)
378 
379     // List of network operations whose responses haven't been scheduled or blackholed.  This is
380     // where network requests are first queued.  It is sorted by
381     // NetworkOperation::_nextConsiderationDate, which is set to now() when startCommand() is
382     // called, and adjusted by requeueAt().
383     NetworkOperationList _unscheduled;  // (M)
384 
385     // List of network operations that have been returned by getNextReadyRequest() but not
386     // yet scheudled, black-holed or requeued.
387     NetworkOperationList _processing;  // (M)
388 
389     // List of network operations whose responses have been scheduled but not delivered, sorted
390     // by NetworkOperation::_responseDate.  These operations will have their responses delivered
391     // when now() == getResponseDate().
392     NetworkOperationList _scheduled;  // (M)
393 
394     // List of network operations that will not be responded to until shutdown() is called.
395     NetworkOperationList _blackHoled;  // (M)
396 
397     // Heap of alarms, with the next alarm always on top.
398     std::priority_queue<AlarmInfo, std::vector<AlarmInfo>, std::greater<AlarmInfo>> _alarms;  // (M)
399 
400     // The connection hook.
401     std::unique_ptr<NetworkConnectionHook> _hook;  // (R)
402 
403     // The metadata hook.
404     std::unique_ptr<rpc::EgressMetadataHook> _metadataHook;  // (R)
405 
406     // The set of hosts we have seen so far. If we see a new host, we will execute the
407     // ConnectionHook's validation and post-connection logic.
408     //
409     // TODO: provide a way to simulate disconnections.
410     stdx::unordered_set<HostAndPort> _connections;  // (M)
411 
412     // The handshake replies set for each host.
413     stdx::unordered_map<HostAndPort, RemoteCommandResponse> _handshakeReplies;  // (M)
414 };
415 
416 /**
417  * Representation of an in-progress network operation.
418  */
419 class NetworkInterfaceMock::NetworkOperation {
420 public:
421     NetworkOperation();
422     NetworkOperation(const TaskExecutor::CallbackHandle& cbHandle,
423                      const RemoteCommandRequest& theRequest,
424                      Date_t theRequestDate,
425                      const RemoteCommandCompletionFn& onFinish);
426     ~NetworkOperation();
427 
428     /**
429      * Adjusts the stored virtual time at which this entry will be subject to consideration
430      * by the test harness.
431      */
432     void setNextConsiderationDate(Date_t nextConsiderationDate);
433 
434     /**
435      * Sets the response and thet virtual time at which it will be delivered.
436      */
437     void setResponse(Date_t responseDate, const ResponseStatus& response);
438 
439     /**
440      * Predicate that returns true if cbHandle equals the executor's handle for this network
441      * operation.  Used for searching lists of NetworkOperations.
442      */
isForCallback(const TaskExecutor::CallbackHandle & cbHandle)443     bool isForCallback(const TaskExecutor::CallbackHandle& cbHandle) const {
444         return cbHandle == _cbHandle;
445     }
446 
getCallbackHandle()447     const TaskExecutor::CallbackHandle& getCallbackHandle() const {
448         return _cbHandle;
449     }
450 
451     /**
452      * Gets the request that initiated this operation.
453      */
getRequest()454     const RemoteCommandRequest& getRequest() const {
455         return _request;
456     }
457 
458     /**
459      * Gets the virtual time at which the operation was started.
460      */
getRequestDate()461     Date_t getRequestDate() const {
462         return _requestDate;
463     }
464 
465     /**
466      * Gets the virtual time at which the test harness should next consider what to do
467      * with this request.
468      */
getNextConsiderationDate()469     Date_t getNextConsiderationDate() const {
470         return _nextConsiderationDate;
471     }
472 
473     /**
474      * After setResponse() has been called, returns the virtual time at which
475      * the response should be delivered.
476      */
getResponseDate()477     Date_t getResponseDate() const {
478         return _responseDate;
479     }
480 
481     /**
482      * Delivers the response, by invoking the onFinish callback passed into the constructor.
483      */
484     void finishResponse();
485 
486     /**
487      * Returns a printable diagnostic string.
488      */
489     std::string getDiagnosticString() const;
490 
491 private:
492     Date_t _requestDate;
493     Date_t _nextConsiderationDate;
494     Date_t _responseDate;
495     TaskExecutor::CallbackHandle _cbHandle;
496     RemoteCommandRequest _request;
497     ResponseStatus _response;
498     RemoteCommandCompletionFn _onFinish;
499 };
500 
501 /**
502  * RAII type to enter and exit network on construction/destruction.
503  *
504  * Calls enterNetwork on construction, and exitNetwork during destruction,
505  * unless dismissed.
506  *
507  * Not thread-safe.
508  */
509 class NetworkInterfaceMock::InNetworkGuard {
510     MONGO_DISALLOW_COPYING(InNetworkGuard);
511 
512 public:
513     /**
514      * Calls enterNetwork.
515      */
516     explicit InNetworkGuard(NetworkInterfaceMock* net);
517     /**
518      * Calls exitNetwork, and disables the destructor from calling.
519      */
520     void dismiss();
521     /**
522      * Calls exitNetwork, unless dismiss has been called.
523      */
524     ~InNetworkGuard();
525 
526     /**
527      * Returns network interface mock pointer.
528      */
529     NetworkInterfaceMock* operator->() const;
530 
531 private:
532     NetworkInterfaceMock* _net;
533     bool _callExitNetwork = true;
534 };
535 
536 class NetworkInterfaceMockClockSource : public ClockSource {
537 public:
538     explicit NetworkInterfaceMockClockSource(NetworkInterfaceMock* net);
539 
getPrecision()540     Milliseconds getPrecision() override {
541         return Milliseconds{1};
542     }
now()543     Date_t now() override {
544         return _net->now();
545     }
setAlarm(Date_t when,stdx::function<void ()> action)546     Status setAlarm(Date_t when, stdx::function<void()> action) override {
547         return _net->setAlarm(when, action);
548     }
549 
550 private:
551     NetworkInterfaceMock* _net;
552 };
553 
554 }  // namespace executor
555 }  // namespace mongo
556