1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
32 
33 #include "mongo/platform/basic.h"
34 
35 #include "mongo/transport/service_entry_point_test_suite.h"
36 
37 #include <boost/optional.hpp>
38 
39 #include "mongo/bson/bsonmisc.h"
40 #include "mongo/bson/bsonobjbuilder.h"
41 #include "mongo/platform/atomic_word.h"
42 #include "mongo/stdx/condition_variable.h"
43 #include "mongo/stdx/functional.h"
44 #include "mongo/stdx/future.h"
45 #include "mongo/stdx/memory.h"
46 #include "mongo/stdx/mutex.h"
47 #include "mongo/stdx/unordered_map.h"
48 #include "mongo/stdx/unordered_set.h"
49 #include "mongo/transport/service_entry_point.h"
50 #include "mongo/transport/session.h"
51 #include "mongo/transport/ticket.h"
52 #include "mongo/transport/ticket_impl.h"
53 #include "mongo/transport/transport_layer.h"
54 #include "mongo/unittest/unittest.h"
55 #include "mongo/util/net/message.h"
56 #include "mongo/util/net/ssl_types.h"
57 
58 namespace mongo {
59 
60 using namespace transport;
61 using namespace stdx::placeholders;
62 
63 using TicketCallback = TransportLayer::TicketCallback;
64 using SEPTestSession = ServiceEntryPointTestSuite::SEPTestSession;
65 
66 namespace {
67 
68 // Helper function to populate a message with { ping : 1 } command
setPingCommand(Message * m)69 void setPingCommand(Message* m) {
70     BufBuilder b{};
71 
72     // Leave room for the message header.
73     b.skip(mongo::MsgData::MsgDataHeaderSize);
74 
75     b.appendStr("admin");
76     b.appendStr("ping");
77 
78     auto commandObj = BSON("ping" << 1);
79     commandObj.appendSelfToBufBuilder(b);
80 
81     auto metadata = BSONObj();
82     metadata.appendSelfToBufBuilder(b);
83 
84     // Set Message header fields.
85     MsgData::View msg = b.buf();
86     msg.setLen(b.len());
87     msg.setOperation(dbCommand);
88 
89     m->reset();
90 
91     // Transfer buffer ownership to the Message.
92     m->setData(b.release());
93 }
94 
95 // Some default method implementations
__anonc7f51c580202(const SessionHandle& session) 96 const auto kDefaultEnd = [](const SessionHandle& session) { return; };
__anonc7f51c580302(SEPTestSession& session) 97 const auto kDefaultDestroyHook = [](SEPTestSession& session) { return; };
__anonc7f51c580402(Ticket, TicketCallback cb) 98 const auto kDefaultAsyncWait = [](Ticket, TicketCallback cb) { cb(Status::OK()); };
__anonc7f51c580502null99 const auto kNoopFunction = [] { return; };
100 
101 // "End connection" error status
102 const auto kEndConnectionStatus = Status(ErrorCodes::HostUnreachable, "connection closed");
103 
104 }  // namespace
105 
MockTLHarness()106 ServiceEntryPointTestSuite::MockTLHarness::MockTLHarness()
107     : _sourceMessage(
108           stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_defaultSource, this, _1, _2, _3)),
109       _sinkMessage(
110           stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_defaultSink, this, _1, _2, _3)),
111       _wait(stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_defaultWait, this, _1)),
112       _asyncWait(kDefaultAsyncWait),
113       _end(kDefaultEnd) {}
114 
sourceMessage(const SessionHandle & session,Message * message,Date_t expiration)115 Ticket ServiceEntryPointTestSuite::MockTLHarness::sourceMessage(const SessionHandle& session,
116                                                                 Message* message,
117                                                                 Date_t expiration) {
118     return _sourceMessage(session, message, expiration);
119 }
120 
sinkMessage(const SessionHandle & session,const Message & message,Date_t expiration)121 Ticket ServiceEntryPointTestSuite::MockTLHarness::sinkMessage(const SessionHandle& session,
122                                                               const Message& message,
123                                                               Date_t expiration) {
124     return _sinkMessage(session, message, expiration);
125 }
126 
wait(Ticket && ticket)127 Status ServiceEntryPointTestSuite::MockTLHarness::wait(Ticket&& ticket) {
128     return _wait(std::move(ticket));
129 }
130 
asyncWait(Ticket && ticket,TicketCallback callback)131 void ServiceEntryPointTestSuite::MockTLHarness::asyncWait(Ticket&& ticket,
132                                                           TicketCallback callback) {
133     return _asyncWait(std::move(ticket), std::move(callback));
134 }
135 
end(const SessionHandle & session)136 void ServiceEntryPointTestSuite::MockTLHarness::end(const SessionHandle& session) {
137     return _end(session);
138 }
139 
setup()140 Status ServiceEntryPointTestSuite::MockTLHarness::setup() {
141     return Status::OK();
142 }
143 
start()144 Status ServiceEntryPointTestSuite::MockTLHarness::start() {
145     return _start();
146 }
147 
shutdown()148 void ServiceEntryPointTestSuite::MockTLHarness::shutdown() {
149     return _shutdown();
150 }
151 
_defaultWait(transport::Ticket ticket)152 Status ServiceEntryPointTestSuite::MockTLHarness::_defaultWait(transport::Ticket ticket) {
153     auto mockTicket = getMockTicket(ticket);
154     if (mockTicket->message()) {
155         setPingCommand(*(mockTicket->message()));
156     }
157     return Status::OK();
158 }
159 
_waitError(transport::Ticket ticket)160 Status ServiceEntryPointTestSuite::MockTLHarness::_waitError(transport::Ticket ticket) {
161     return kEndConnectionStatus;
162 }
163 
_waitOnceThenError(transport::Ticket ticket)164 Status ServiceEntryPointTestSuite::MockTLHarness::_waitOnceThenError(transport::Ticket ticket) {
165     _wait = stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_waitError, this, _1);
166     return _defaultWait(std::move(ticket));
167 }
168 
_defaultSource(const SessionHandle & s,Message * m,Date_t d)169 Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSource(const SessionHandle& s,
170                                                                  Message* m,
171                                                                  Date_t d) {
172     return Ticket(this, stdx::make_unique<transport::MockTicket>(s, m, d));
173 }
174 
_defaultSink(const SessionHandle & s,const Message &,Date_t d)175 Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSink(const SessionHandle& s,
176                                                                const Message&,
177                                                                Date_t d) {
178     return Ticket(this, stdx::make_unique<transport::MockTicket>(s, d));
179 }
180 
_sinkThenErrorOnWait(const SessionHandle & s,const Message & m,Date_t d)181 Ticket ServiceEntryPointTestSuite::MockTLHarness::_sinkThenErrorOnWait(const SessionHandle& s,
182                                                                        const Message& m,
183                                                                        Date_t d) {
184     _wait = stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_waitOnceThenError, this, _1);
185     return _defaultSink(s, m, d);
186 }
187 
_resetHooks()188 void ServiceEntryPointTestSuite::MockTLHarness::_resetHooks() {
189     _sourceMessage =
190         stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_defaultSource, this, _1, _2, _3);
191     _sinkMessage =
192         stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_defaultSink, this, _1, _2, _3);
193     _wait = stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_defaultWait, this, _1);
194     _asyncWait = kDefaultAsyncWait;
195     _end = kDefaultEnd;
196     _destroy_hook = kDefaultDestroyHook;
197 }
198 
// Returns the ticket's implementation viewed as a MockTicket. The conversion is a dynamic_cast,
// so the result is nullptr when the implementation is not a MockTicket.
transport::MockTicket* ServiceEntryPointTestSuite::MockTLHarness::getMockTicket(
    const transport::Ticket& ticket) {
    auto* impl = getTicketImpl(ticket);
    return dynamic_cast<transport::MockTicket*>(impl);
}
203 
_destroy(SEPTestSession & session)204 void ServiceEntryPointTestSuite::MockTLHarness::_destroy(SEPTestSession& session) {
205     return _destroy_hook(session);
206 }
207 
setUp()208 void ServiceEntryPointTestSuite::setUp() {
209     _tl = stdx::make_unique<MockTLHarness>();
210 }
211 
setServiceEntryPoint(ServiceEntryPointFactory factory)212 void ServiceEntryPointTestSuite::setServiceEntryPoint(ServiceEntryPointFactory factory) {
213     _sep = factory(_tl.get());
214 }
215 
216 // Start a Session and error on get-Message
noLifeCycleTest()217 void ServiceEntryPointTestSuite::noLifeCycleTest() {
218     stdx::promise<void> testComplete;
219     auto testFuture = testComplete.get_future();
220 
221     _tl->_resetHooks();
222 
223     // Step 1: SEP gets a ticket to source a Message
224     // Step 2: SEP calls wait() on the ticket and receives an error
225     _tl->_wait = stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_waitError, _tl.get(), _1);
226 
227     // Step 3: SEP destroys the session, which calls end()
228     _tl->_destroy_hook = [&testComplete](SEPTestSession&) { testComplete.set_value(); };
229 
230     // Kick off the SEP
231     auto s = SEPTestSession::create(_tl.get());
232     _sep->startSession(std::move(s));
233 
234     testFuture.wait();
235 }
236 
237 // Partial cycle: get-Message, handle-Message, error on send-Message
halfLifeCycleTest()238 void ServiceEntryPointTestSuite::halfLifeCycleTest() {
239     stdx::promise<void> testComplete;
240     auto testFuture = testComplete.get_future();
241 
242     _tl->_resetHooks();
243 
244     // Step 1: SEP gets a ticket to source a Message
245     // Step 2: SEP calls wait() on the ticket and receives a Message
246     // Step 3: SEP gets a ticket to sink a Message
247     _tl->_sinkMessage = [this](const SessionHandle& session, const Message& m, Date_t expiration) {
248 
249         // Step 4: SEP calls wait() on the ticket and receives an error
250         _tl->_wait =
251             stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_waitError, _tl.get(), _1);
252 
253         return _tl->_defaultSink(session, m, expiration);
254     };
255 
256     // Step 5: SEP destroys the session, which calls _destroy()
257     _tl->_destroy_hook = [&testComplete](SEPTestSession&) { testComplete.set_value(); };
258 
259     // Kick off the SEP
260     auto s = SEPTestSession::create(_tl.get());
261     _sep->startSession(std::move(s));
262 
263     testFuture.wait();
264 }
265 
266 // Perform a full get-Message, handle-Message, send-Message cycle
fullLifeCycleTest()267 void ServiceEntryPointTestSuite::fullLifeCycleTest() {
268     stdx::promise<void> testComplete;
269     auto testFuture = testComplete.get_future();
270 
271     _tl->_resetHooks();
272 
273     // Step 1: SEP gets a ticket to source a Message
274     // Step 2: SEP calls wait() on the ticket and receives a Message
275     _tl->_sinkMessage = stdx::bind(
276         &ServiceEntryPointTestSuite::MockTLHarness::_sinkThenErrorOnWait, _tl.get(), _1, _2, _3);
277 
278     // Step 3: SEP gets a ticket to sink a Message
279     // Step 4: SEP calls wait() on the ticket and receives Status::OK()
280     // Step 5: SEP gets a ticket to source a Message
281     // Step 6: SEP calls wait() on the ticket and receives and error
282     // Step 7: SEP destroys the session, which calls _destroy()
283     _tl->_destroy_hook = [&testComplete](SEPTestSession& session) { testComplete.set_value(); };
284 
285     // Kick off the SEP
286     auto s = SEPTestSession::create(_tl.get());
287     _sep->startSession(std::move(s));
288 
289     testFuture.wait();
290 }
291 
interruptingSessionTest()292 void ServiceEntryPointTestSuite::interruptingSessionTest() {
293     auto sA = SEPTestSession::create(_tl.get());
294     auto sB = SEPTestSession::create(_tl.get());
295     auto idA = sA->id();
296     auto idB = sB->id();
297     int waitCountB = 0;
298 
299     stdx::promise<void> startB;
300     auto startBFuture = startB.get_future();
301 
302     stdx::promise<void> resumeA;
303     auto resumeAFuture = resumeA.get_future();
304 
305     stdx::promise<void> testComplete;
306     auto testFuture = testComplete.get_future();
307 
308     _tl->_resetHooks();
309 
310     // Start Session A
311     // Step 1: SEP calls sourceMessage() for A
312     // Step 2: SEP calls wait() for A and we block...
313     // Start Session B
314     _tl->_wait = [this, idA, &startB, &resumeAFuture, &waitCountB](Ticket t) -> Status {
315         // If we're handling B, just do a default wait
316         if (t.sessionId() != idA) {
317             if (waitCountB < 2) {
318                 ++waitCountB;
319                 return _tl->_defaultWait(std::move(t));
320             } else {
321                 //  If we've done a full round trip, time to end session B
322                 return kEndConnectionStatus;
323             }
324         }
325 
326         // Otherwise, we need to start B and block A
327         startB.set_value();
328         resumeAFuture.wait();
329 
330         _tl->_wait = stdx::bind(
331             &ServiceEntryPointTestSuite::MockTLHarness::_waitOnceThenError, _tl.get(), _1);
332 
333         return Status::OK();
334     };
335 
336     // Step 3: SEP calls sourceMessage() for B, gets tB
337     // Step 4: SEP calls wait() for tB, gets { ping : 1 }
338     // Step 5: SEP calls sinkMessage() for B, gets tB2
339     // Step 6: SEP calls wait() for tB2, gets Status::OK()
340     // Step 7: SEP calls sourceMessage() for B, gets tB3
341     // Step 8: SEP calls wait() for tB3, gets an error
342     // Step 9: SEP calls end(B)
343     _tl->_destroy_hook = [this, idA, idB, &resumeA, &testComplete](SEPTestSession& session) {
344         // When end(B) is called, time to resume session A
345         if (session.id() == idB) {
346             // Resume session A
347             resumeA.set_value();
348         } else {
349             // Else our test is over when end(A) is called
350             invariant(session.id() == idA);
351             testComplete.set_value();
352         }
353     };
354 
355     // Resume Session A
356     // Step 10: SEP calls sinkMessage() for A, gets tA
357     // Step 11: SEP calls wait() for tA, gets Status::OK()
358     // Step 12: SEP calls sourceMessage() for A, get tA2
359     // Step 13: SEP calls wait() for tA2, receives an error
360     // Step 14: SEP calls end(A)
361 
362     // Kick off the test
363     _sep->startSession(std::move(sA));
364 
365     startBFuture.wait();
366     _sep->startSession(std::move(sB));
367 
368     testFuture.wait();
369 }
370 
burstStressTest(int numSessions,int numCycles,Milliseconds delay)371 void ServiceEntryPointTestSuite::burstStressTest(int numSessions,
372                                                  int numCycles,
373                                                  Milliseconds delay) {
374     AtomicWord<int> ended{0};
375     stdx::promise<void> allSessionsComplete;
376 
377     auto allCompleteFuture = allSessionsComplete.get_future();
378 
379     stdx::mutex cyclesLock;
380     stdx::unordered_map<Session::Id, int> completedCycles;
381 
382     _tl->_resetHooks();
383 
384     // Same wait() callback for all sessions.
385     _tl->_wait = [this, &completedCycles, &cyclesLock, numSessions, numCycles, &delay](
386         Ticket ticket) -> Status {
387         auto id = ticket.sessionId();
388         int cycleCount;
389 
390         {
391             stdx::lock_guard<stdx::mutex> lock(cyclesLock);
392             auto item = completedCycles.find(id);
393             invariant(item != completedCycles.end());
394             cycleCount = item->second;
395         }
396 
397         auto mockTicket = _tl->getMockTicket(ticket);
398         // If we are sourcing:
399         if (mockTicket->message()) {
400             // If we've completed enough cycles, done.
401             if (cycleCount == numCycles) {
402                 return kEndConnectionStatus;
403             }
404 
405             // Otherwise, source another { ping : 1 }
406             invariant(mockTicket->message());
407             setPingCommand(*(mockTicket->message()));
408 
409             // Wait a bit before returning
410             sleepmillis(delay.count());
411 
412             return Status::OK();
413         }
414 
415         // We are sinking, increment numCycles and return OK.
416         {
417             stdx::lock_guard<stdx::mutex> lock(cyclesLock);
418             auto item = completedCycles.find(id);
419             invariant(item != completedCycles.end());
420             ++(item->second);
421         }
422 
423         return Status::OK();
424     };
425 
426     // When we end the last session, end the test.
427     _tl->_destroy_hook = [&allSessionsComplete, numSessions, &ended](SEPTestSession& session) {
428         if (ended.fetchAndAdd(1) == (numSessions - 1)) {
429             allSessionsComplete.set_value();
430         }
431     };
432 
433     for (int i = 0; i < numSessions; i++) {
434         auto s = SEPTestSession::create(_tl.get());
435         {
436             // This operation may cause a re-hash.
437             stdx::lock_guard<stdx::mutex> lock(cyclesLock);
438             completedCycles.emplace(s->id(), 0);
439         }
440         _sep->startSession(std::move(s));
441     }
442 
443     // Block and wait for all sessions to finish.
444     allCompleteFuture.wait();
445 }
446 
longSessionStressTest()447 void ServiceEntryPointTestSuite::longSessionStressTest() {
448     return burstStressTest(1000, 100, Milliseconds(100));
449 }
450 
451 }  // namespace mongo
452