1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
32
33 #include "mongo/platform/basic.h"
34
35 #include "mongo/transport/service_entry_point_test_suite.h"
36
37 #include <boost/optional.hpp>
38
39 #include "mongo/bson/bsonmisc.h"
40 #include "mongo/bson/bsonobjbuilder.h"
41 #include "mongo/platform/atomic_word.h"
42 #include "mongo/stdx/condition_variable.h"
43 #include "mongo/stdx/functional.h"
44 #include "mongo/stdx/future.h"
45 #include "mongo/stdx/memory.h"
46 #include "mongo/stdx/mutex.h"
47 #include "mongo/stdx/unordered_map.h"
48 #include "mongo/stdx/unordered_set.h"
49 #include "mongo/transport/service_entry_point.h"
50 #include "mongo/transport/session.h"
51 #include "mongo/transport/ticket.h"
52 #include "mongo/transport/ticket_impl.h"
53 #include "mongo/transport/transport_layer.h"
54 #include "mongo/unittest/unittest.h"
55 #include "mongo/util/net/message.h"
56 #include "mongo/util/net/ssl_types.h"
57
58 namespace mongo {
59
60 using namespace transport;
61 using namespace stdx::placeholders;
62
63 using TicketCallback = TransportLayer::TicketCallback;
64 using SEPTestSession = ServiceEntryPointTestSuite::SEPTestSession;
65
66 namespace {
67
68 // Helper function to populate a message with { ping : 1 } command
setPingCommand(Message * m)69 void setPingCommand(Message* m) {
70 BufBuilder b{};
71
72 // Leave room for the message header.
73 b.skip(mongo::MsgData::MsgDataHeaderSize);
74
75 b.appendStr("admin");
76 b.appendStr("ping");
77
78 auto commandObj = BSON("ping" << 1);
79 commandObj.appendSelfToBufBuilder(b);
80
81 auto metadata = BSONObj();
82 metadata.appendSelfToBufBuilder(b);
83
84 // Set Message header fields.
85 MsgData::View msg = b.buf();
86 msg.setLen(b.len());
87 msg.setOperation(dbCommand);
88
89 m->reset();
90
91 // Transfer buffer ownership to the Message.
92 m->setData(b.release());
93 }
94
95 // Some default method implementations
__anonc7f51c580202(const SessionHandle& session) 96 const auto kDefaultEnd = [](const SessionHandle& session) { return; };
__anonc7f51c580302(SEPTestSession& session) 97 const auto kDefaultDestroyHook = [](SEPTestSession& session) { return; };
__anonc7f51c580402(Ticket, TicketCallback cb) 98 const auto kDefaultAsyncWait = [](Ticket, TicketCallback cb) { cb(Status::OK()); };
__anonc7f51c580502null99 const auto kNoopFunction = [] { return; };
100
101 // "End connection" error status
102 const auto kEndConnectionStatus = Status(ErrorCodes::HostUnreachable, "connection closed");
103
104 } // namespace
105
MockTLHarness()106 ServiceEntryPointTestSuite::MockTLHarness::MockTLHarness()
107 : _sourceMessage(
108 stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_defaultSource, this, _1, _2, _3)),
109 _sinkMessage(
110 stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_defaultSink, this, _1, _2, _3)),
111 _wait(stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_defaultWait, this, _1)),
112 _asyncWait(kDefaultAsyncWait),
113 _end(kDefaultEnd) {}
114
sourceMessage(const SessionHandle & session,Message * message,Date_t expiration)115 Ticket ServiceEntryPointTestSuite::MockTLHarness::sourceMessage(const SessionHandle& session,
116 Message* message,
117 Date_t expiration) {
118 return _sourceMessage(session, message, expiration);
119 }
120
sinkMessage(const SessionHandle & session,const Message & message,Date_t expiration)121 Ticket ServiceEntryPointTestSuite::MockTLHarness::sinkMessage(const SessionHandle& session,
122 const Message& message,
123 Date_t expiration) {
124 return _sinkMessage(session, message, expiration);
125 }
126
wait(Ticket && ticket)127 Status ServiceEntryPointTestSuite::MockTLHarness::wait(Ticket&& ticket) {
128 return _wait(std::move(ticket));
129 }
130
asyncWait(Ticket && ticket,TicketCallback callback)131 void ServiceEntryPointTestSuite::MockTLHarness::asyncWait(Ticket&& ticket,
132 TicketCallback callback) {
133 return _asyncWait(std::move(ticket), std::move(callback));
134 }
135
end(const SessionHandle & session)136 void ServiceEntryPointTestSuite::MockTLHarness::end(const SessionHandle& session) {
137 return _end(session);
138 }
139
setup()140 Status ServiceEntryPointTestSuite::MockTLHarness::setup() {
141 return Status::OK();
142 }
143
start()144 Status ServiceEntryPointTestSuite::MockTLHarness::start() {
145 return _start();
146 }
147
shutdown()148 void ServiceEntryPointTestSuite::MockTLHarness::shutdown() {
149 return _shutdown();
150 }
151
_defaultWait(transport::Ticket ticket)152 Status ServiceEntryPointTestSuite::MockTLHarness::_defaultWait(transport::Ticket ticket) {
153 auto mockTicket = getMockTicket(ticket);
154 if (mockTicket->message()) {
155 setPingCommand(*(mockTicket->message()));
156 }
157 return Status::OK();
158 }
159
_waitError(transport::Ticket ticket)160 Status ServiceEntryPointTestSuite::MockTLHarness::_waitError(transport::Ticket ticket) {
161 return kEndConnectionStatus;
162 }
163
_waitOnceThenError(transport::Ticket ticket)164 Status ServiceEntryPointTestSuite::MockTLHarness::_waitOnceThenError(transport::Ticket ticket) {
165 _wait = stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_waitError, this, _1);
166 return _defaultWait(std::move(ticket));
167 }
168
_defaultSource(const SessionHandle & s,Message * m,Date_t d)169 Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSource(const SessionHandle& s,
170 Message* m,
171 Date_t d) {
172 return Ticket(this, stdx::make_unique<transport::MockTicket>(s, m, d));
173 }
174
_defaultSink(const SessionHandle & s,const Message &,Date_t d)175 Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSink(const SessionHandle& s,
176 const Message&,
177 Date_t d) {
178 return Ticket(this, stdx::make_unique<transport::MockTicket>(s, d));
179 }
180
_sinkThenErrorOnWait(const SessionHandle & s,const Message & m,Date_t d)181 Ticket ServiceEntryPointTestSuite::MockTLHarness::_sinkThenErrorOnWait(const SessionHandle& s,
182 const Message& m,
183 Date_t d) {
184 _wait = stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_waitOnceThenError, this, _1);
185 return _defaultSink(s, m, d);
186 }
187
_resetHooks()188 void ServiceEntryPointTestSuite::MockTLHarness::_resetHooks() {
189 _sourceMessage =
190 stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_defaultSource, this, _1, _2, _3);
191 _sinkMessage =
192 stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_defaultSink, this, _1, _2, _3);
193 _wait = stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_defaultWait, this, _1);
194 _asyncWait = kDefaultAsyncWait;
195 _end = kDefaultEnd;
196 _destroy_hook = kDefaultDestroyHook;
197 }
198
getMockTicket(const transport::Ticket & ticket)199 transport::MockTicket* ServiceEntryPointTestSuite::MockTLHarness::getMockTicket(
200 const transport::Ticket& ticket) {
201 return dynamic_cast<transport::MockTicket*>(getTicketImpl(ticket));
202 }
203
_destroy(SEPTestSession & session)204 void ServiceEntryPointTestSuite::MockTLHarness::_destroy(SEPTestSession& session) {
205 return _destroy_hook(session);
206 }
207
setUp()208 void ServiceEntryPointTestSuite::setUp() {
209 _tl = stdx::make_unique<MockTLHarness>();
210 }
211
setServiceEntryPoint(ServiceEntryPointFactory factory)212 void ServiceEntryPointTestSuite::setServiceEntryPoint(ServiceEntryPointFactory factory) {
213 _sep = factory(_tl.get());
214 }
215
216 // Start a Session and error on get-Message
noLifeCycleTest()217 void ServiceEntryPointTestSuite::noLifeCycleTest() {
218 stdx::promise<void> testComplete;
219 auto testFuture = testComplete.get_future();
220
221 _tl->_resetHooks();
222
223 // Step 1: SEP gets a ticket to source a Message
224 // Step 2: SEP calls wait() on the ticket and receives an error
225 _tl->_wait = stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_waitError, _tl.get(), _1);
226
227 // Step 3: SEP destroys the session, which calls end()
228 _tl->_destroy_hook = [&testComplete](SEPTestSession&) { testComplete.set_value(); };
229
230 // Kick off the SEP
231 auto s = SEPTestSession::create(_tl.get());
232 _sep->startSession(std::move(s));
233
234 testFuture.wait();
235 }
236
237 // Partial cycle: get-Message, handle-Message, error on send-Message
halfLifeCycleTest()238 void ServiceEntryPointTestSuite::halfLifeCycleTest() {
239 stdx::promise<void> testComplete;
240 auto testFuture = testComplete.get_future();
241
242 _tl->_resetHooks();
243
244 // Step 1: SEP gets a ticket to source a Message
245 // Step 2: SEP calls wait() on the ticket and receives a Message
246 // Step 3: SEP gets a ticket to sink a Message
247 _tl->_sinkMessage = [this](const SessionHandle& session, const Message& m, Date_t expiration) {
248
249 // Step 4: SEP calls wait() on the ticket and receives an error
250 _tl->_wait =
251 stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_waitError, _tl.get(), _1);
252
253 return _tl->_defaultSink(session, m, expiration);
254 };
255
256 // Step 5: SEP destroys the session, which calls _destroy()
257 _tl->_destroy_hook = [&testComplete](SEPTestSession&) { testComplete.set_value(); };
258
259 // Kick off the SEP
260 auto s = SEPTestSession::create(_tl.get());
261 _sep->startSession(std::move(s));
262
263 testFuture.wait();
264 }
265
266 // Perform a full get-Message, handle-Message, send-Message cycle
fullLifeCycleTest()267 void ServiceEntryPointTestSuite::fullLifeCycleTest() {
268 stdx::promise<void> testComplete;
269 auto testFuture = testComplete.get_future();
270
271 _tl->_resetHooks();
272
273 // Step 1: SEP gets a ticket to source a Message
274 // Step 2: SEP calls wait() on the ticket and receives a Message
275 _tl->_sinkMessage = stdx::bind(
276 &ServiceEntryPointTestSuite::MockTLHarness::_sinkThenErrorOnWait, _tl.get(), _1, _2, _3);
277
278 // Step 3: SEP gets a ticket to sink a Message
279 // Step 4: SEP calls wait() on the ticket and receives Status::OK()
280 // Step 5: SEP gets a ticket to source a Message
281 // Step 6: SEP calls wait() on the ticket and receives and error
282 // Step 7: SEP destroys the session, which calls _destroy()
283 _tl->_destroy_hook = [&testComplete](SEPTestSession& session) { testComplete.set_value(); };
284
285 // Kick off the SEP
286 auto s = SEPTestSession::create(_tl.get());
287 _sep->startSession(std::move(s));
288
289 testFuture.wait();
290 }
291
interruptingSessionTest()292 void ServiceEntryPointTestSuite::interruptingSessionTest() {
293 auto sA = SEPTestSession::create(_tl.get());
294 auto sB = SEPTestSession::create(_tl.get());
295 auto idA = sA->id();
296 auto idB = sB->id();
297 int waitCountB = 0;
298
299 stdx::promise<void> startB;
300 auto startBFuture = startB.get_future();
301
302 stdx::promise<void> resumeA;
303 auto resumeAFuture = resumeA.get_future();
304
305 stdx::promise<void> testComplete;
306 auto testFuture = testComplete.get_future();
307
308 _tl->_resetHooks();
309
310 // Start Session A
311 // Step 1: SEP calls sourceMessage() for A
312 // Step 2: SEP calls wait() for A and we block...
313 // Start Session B
314 _tl->_wait = [this, idA, &startB, &resumeAFuture, &waitCountB](Ticket t) -> Status {
315 // If we're handling B, just do a default wait
316 if (t.sessionId() != idA) {
317 if (waitCountB < 2) {
318 ++waitCountB;
319 return _tl->_defaultWait(std::move(t));
320 } else {
321 // If we've done a full round trip, time to end session B
322 return kEndConnectionStatus;
323 }
324 }
325
326 // Otherwise, we need to start B and block A
327 startB.set_value();
328 resumeAFuture.wait();
329
330 _tl->_wait = stdx::bind(
331 &ServiceEntryPointTestSuite::MockTLHarness::_waitOnceThenError, _tl.get(), _1);
332
333 return Status::OK();
334 };
335
336 // Step 3: SEP calls sourceMessage() for B, gets tB
337 // Step 4: SEP calls wait() for tB, gets { ping : 1 }
338 // Step 5: SEP calls sinkMessage() for B, gets tB2
339 // Step 6: SEP calls wait() for tB2, gets Status::OK()
340 // Step 7: SEP calls sourceMessage() for B, gets tB3
341 // Step 8: SEP calls wait() for tB3, gets an error
342 // Step 9: SEP calls end(B)
343 _tl->_destroy_hook = [this, idA, idB, &resumeA, &testComplete](SEPTestSession& session) {
344 // When end(B) is called, time to resume session A
345 if (session.id() == idB) {
346 // Resume session A
347 resumeA.set_value();
348 } else {
349 // Else our test is over when end(A) is called
350 invariant(session.id() == idA);
351 testComplete.set_value();
352 }
353 };
354
355 // Resume Session A
356 // Step 10: SEP calls sinkMessage() for A, gets tA
357 // Step 11: SEP calls wait() for tA, gets Status::OK()
358 // Step 12: SEP calls sourceMessage() for A, get tA2
359 // Step 13: SEP calls wait() for tA2, receives an error
360 // Step 14: SEP calls end(A)
361
362 // Kick off the test
363 _sep->startSession(std::move(sA));
364
365 startBFuture.wait();
366 _sep->startSession(std::move(sB));
367
368 testFuture.wait();
369 }
370
burstStressTest(int numSessions,int numCycles,Milliseconds delay)371 void ServiceEntryPointTestSuite::burstStressTest(int numSessions,
372 int numCycles,
373 Milliseconds delay) {
374 AtomicWord<int> ended{0};
375 stdx::promise<void> allSessionsComplete;
376
377 auto allCompleteFuture = allSessionsComplete.get_future();
378
379 stdx::mutex cyclesLock;
380 stdx::unordered_map<Session::Id, int> completedCycles;
381
382 _tl->_resetHooks();
383
384 // Same wait() callback for all sessions.
385 _tl->_wait = [this, &completedCycles, &cyclesLock, numSessions, numCycles, &delay](
386 Ticket ticket) -> Status {
387 auto id = ticket.sessionId();
388 int cycleCount;
389
390 {
391 stdx::lock_guard<stdx::mutex> lock(cyclesLock);
392 auto item = completedCycles.find(id);
393 invariant(item != completedCycles.end());
394 cycleCount = item->second;
395 }
396
397 auto mockTicket = _tl->getMockTicket(ticket);
398 // If we are sourcing:
399 if (mockTicket->message()) {
400 // If we've completed enough cycles, done.
401 if (cycleCount == numCycles) {
402 return kEndConnectionStatus;
403 }
404
405 // Otherwise, source another { ping : 1 }
406 invariant(mockTicket->message());
407 setPingCommand(*(mockTicket->message()));
408
409 // Wait a bit before returning
410 sleepmillis(delay.count());
411
412 return Status::OK();
413 }
414
415 // We are sinking, increment numCycles and return OK.
416 {
417 stdx::lock_guard<stdx::mutex> lock(cyclesLock);
418 auto item = completedCycles.find(id);
419 invariant(item != completedCycles.end());
420 ++(item->second);
421 }
422
423 return Status::OK();
424 };
425
426 // When we end the last session, end the test.
427 _tl->_destroy_hook = [&allSessionsComplete, numSessions, &ended](SEPTestSession& session) {
428 if (ended.fetchAndAdd(1) == (numSessions - 1)) {
429 allSessionsComplete.set_value();
430 }
431 };
432
433 for (int i = 0; i < numSessions; i++) {
434 auto s = SEPTestSession::create(_tl.get());
435 {
436 // This operation may cause a re-hash.
437 stdx::lock_guard<stdx::mutex> lock(cyclesLock);
438 completedCycles.emplace(s->id(), 0);
439 }
440 _sep->startSession(std::move(s));
441 }
442
443 // Block and wait for all sessions to finish.
444 allCompleteFuture.wait();
445 }
446
longSessionStressTest()447 void ServiceEntryPointTestSuite::longSessionStressTest() {
448 return burstStressTest(1000, 100, Milliseconds(100));
449 }
450
451 } // namespace mongo
452