1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  * All rights reserved.
4  *
5  * This source code is licensed under the BSD-style license found in the
6  * LICENSE file in the root directory of this source tree.
7  */
8 
9 #include "proxygen/lib/http/connpool/test/SessionPoolTestFixture.h"
10 
11 #include "proxygen/lib/http/connpool/ServerIdleSessionController.h"
12 #include "proxygen/lib/http/connpool/SessionHolder.h"
13 #include "proxygen/lib/http/connpool/SessionPool.h"
14 #include "proxygen/lib/http/connpool/ThreadIdleSessionController.h"
15 
16 #include <folly/io/async/EventBaseManager.h>
17 #include <folly/portability/GFlags.h>
18 #include <folly/synchronization/Baton.h>
19 #include <wangle/acceptor/ConnectionManager.h>
20 
21 using namespace proxygen;
22 using namespace std;
23 using namespace testing;
24 
TEST_F(SessionPoolFixture,ParallelPoolChangedMaxSessions)25 TEST_F(SessionPoolFixture, ParallelPoolChangedMaxSessions) {
26   SessionPool p(this, 1);
27   HTTPCodec::Callback* cb = nullptr;
28   auto codec = makeParallelCodec();
29   EXPECT_CALL(*codec, setCallback(_)).WillRepeatedly(SaveArg<0>(&cb));
30   p.putSession(makeSession(std::move(codec)));
31 
32   EXPECT_EQ(p.getNumSessions(), 1);
33   cb->onSettings({{SettingsId::MAX_CONCURRENT_STREAMS, 0}});
34   evb_.loop();
35   EXPECT_EQ(p.getNumSessions(), 0);
36 
37   // Clear the pool
38   p.setMaxIdleSessions(0);
39   evb_.loop();
40 }
41 
TEST_F(SessionPoolFixture,SerialPoolBasic)42 TEST_F(SessionPoolFixture, SerialPoolBasic) {
43   SessionPool p(this, 1);
44   p.putSession(makeSerialSession());
45   auto txn = p.getTransaction(this);
46   ASSERT_TRUE(txn != nullptr);
47   ASSERT_TRUE(p.getTransaction(this) == nullptr);
48 
49   // Clear the pool
50   p.setMaxIdleSessions(0);
51 
52   // Drop the transaction. All transactions on the sessions in the pool
53   // must be completed before the pool can be destroyed
54   txn->sendAbort();
55 
56   ASSERT_EQ(activated_, 1);
57   ASSERT_EQ(deactivated_, 1);
58   evb_.loop();
59   ASSERT_EQ(closed_, 1);
60 }
61 
TEST_F(SessionPoolFixture,ParallelPoolBasic)62 TEST_F(SessionPoolFixture, ParallelPoolBasic) {
63   const int numTxns = 32;
64   HTTPTransaction* txns[numTxns];
65 
66   SessionPool p(this, 1);
67   p.putSession(makeParallelSession());
68 
69   for (int i = 0; i < numTxns; ++i) {
70     txns[i] = p.getTransaction(this);
71     ASSERT_TRUE(txns[i] != nullptr);
72   }
73 
74   // Clear the pool
75   p.setMaxIdleSessions(0);
76 
77   // Drop the transactions. All transactions on the sessions in the pool
78   // must be completed before the pool can be destroyed
79   for (int i = 0; i < numTxns; ++i) {
80     txns[i]->sendAbort();
81   }
82 
83   ASSERT_EQ(activated_, 1);
84   ASSERT_EQ(deactivated_, 1);
85   evb_.loop();
86   ASSERT_EQ(closed_, 1);
87 }
88 
TEST_F(SessionPoolFixture,SerialPoolPurge)89 TEST_F(SessionPoolFixture, SerialPoolPurge) {
90   // Put more sessions into the pool than can fit. Then open several
91   // transactions on this pool and make sure we can't get out more
92   // transactions than the size of the pool.
93   const int sessionLimit = 10;
94   const int sessionPut = 12;
95   HTTPTransaction* txns[sessionPut];
96 
97   SessionPool p(this, sessionLimit);
98   for (int i = 0; i < sessionPut; ++i) {
99     p.putSession(makeSerialSession());
100   }
101 
102   evb_.loop();
103   ASSERT_EQ(activated_, 0);
104   ASSERT_EQ(deactivated_, 0);
105   ASSERT_EQ(closed_, sessionPut - sessionLimit);
106 
107   for (int i = 0; i < sessionPut; ++i) {
108     txns[i] = p.getTransaction(this);
109     if (i < sessionLimit) {
110       ASSERT_TRUE(txns[i] != nullptr);
111     } else {
112       ASSERT_TRUE(txns[i] == nullptr);
113     }
114   }
115   ASSERT_EQ(activated_, sessionLimit);
116 
117   // Clear the pool
118   p.setMaxIdleSessions(0);
119   // The txns are still active, so nothing should have been deactivated yet.
120   ASSERT_EQ(deactivated_, 0);
121 
122   // Drop the transactions. All transactions on the sessions in the pool
123   // must be completed before the pool can be destroyed
124   for (int i = 0; i < sessionPut; ++i) {
125     if (txns[i]) {
126       txns[i]->sendAbort();
127     }
128   }
129   evb_.loop();
130   ASSERT_EQ(activated_, sessionLimit);
131   ASSERT_EQ(deactivated_, sessionLimit);
132   ASSERT_EQ(closed_, sessionPut);
133 }
134 
TEST_F(SessionPoolFixture,ParallelPoolLists)135 TEST_F(SessionPoolFixture, ParallelPoolLists) {
136   // Test where we put in 2 parallel sessions into the pool. Ensure that
137   // the first one fills up before we start using the second.
138   SessionPool p(this, 2);
139   std::vector<HTTPTransaction*> txnsSess1;
140   std::vector<HTTPTransaction*> txnsSess2;
141 
142   auto sess1 = makeParallelSession();
143   auto sess2 = makeParallelSession();
144   p.putSession(sess1);
145   p.putSession(sess2);
146   txnsSess1.push_back(CHECK_NOTNULL(p.getTransaction(this)));
147   // Since these two sessions are equally old, it's ok for either one to
148   // be selected to become "active"
149   if (sess2->getNumOutgoingStreams() > sess1->getNumOutgoingStreams()) {
150     std::swap(sess1, sess2);
151   }
152   ASSERT_EQ(sess1->getNumOutgoingStreams(), 1);
153   ASSERT_EQ(sess2->getNumOutgoingStreams(), 0);
154   ASSERT_EQ(activated_, 1);
155   ASSERT_EQ(deactivated_, 0);
156   ASSERT_EQ(closed_, 0);
157 
158   // sess1 should be filled completely before we add any transactions to sess2
159   while (sess1->supportsMoreTransactions()) {
160     txnsSess1.push_back(CHECK_NOTNULL(p.getTransaction(this)));
161   }
162   ASSERT_EQ(sess1->getNumOutgoingStreams(),
163             sess1->getMaxConcurrentOutgoingStreams());
164   ASSERT_EQ(sess2->getNumOutgoingStreams(), 0);
165   ASSERT_EQ(activated_, 1);
166   ASSERT_EQ(deactivated_, 0);
167   ASSERT_EQ(closed_, 0);
168 
169   // Now fill sess2
170   while (sess2->supportsMoreTransactions()) {
171     txnsSess2.push_back(CHECK_NOTNULL(p.getTransaction(this)));
172   }
173   // The two sessions should be completely full
174   ASSERT_EQ(sess1->getNumOutgoingStreams(),
175             sess1->getMaxConcurrentOutgoingStreams());
176   ASSERT_EQ(sess2->getNumOutgoingStreams(),
177             sess2->getMaxConcurrentOutgoingStreams());
178   ASSERT_EQ(activated_, 2);
179   ASSERT_EQ(deactivated_, 0);
180   ASSERT_EQ(closed_, 0);
181 
182   // Adding 1 more txn should fail since both sessions are full
183   CHECK(nullptr == p.getTransaction(this));
184 
185   evb_.loop();
186   ASSERT_EQ(sess1->getNumOutgoingStreams(),
187             sess1->getMaxConcurrentOutgoingStreams());
188   ASSERT_EQ(sess2->getNumOutgoingStreams(),
189             sess2->getMaxConcurrentOutgoingStreams());
190   ASSERT_EQ(activated_, 2);
191   ASSERT_EQ(deactivated_, 0);
192   ASSERT_EQ(closed_, 0);
193 
194   // Dropping all the txns on sess1 should "deactivate" that session
195   for (auto& txn : txnsSess1) {
196     txn->sendAbort();
197   }
198   ASSERT_EQ(activated_, 2);
199   ASSERT_EQ(deactivated_, 1);
200   ASSERT_EQ(closed_, 0);
201 
202   // Just for kicks, ensure that no session is scheduled to be closed
203   // right now
204   evb_.loop();
205   ASSERT_EQ(closed_, 0);
206 
207   // Decrease the session pool limit to 0 and purge the idle sessions.
208   // This should close sess1 (the only idle session at this time)
209   p.setMaxIdleSessions(0);
210   evb_.loop();
211   ASSERT_EQ(closed_, 1);
212 
213   // Drop 1 txn from the completely full sess2, then get another txn. This
214   // should succeed. This shouldn't "deactivate" sess2.
215   txnsSess2.back()->sendAbort();
216   txnsSess2.pop_back();
217   ASSERT_EQ(deactivated_, 1);
218   txnsSess2.push_back(CHECK_NOTNULL(p.getTransaction(this)));
219 
220   // Set the max pooled sessions to zero and drop all txns from
221   // sess2. The session should be automatically closed when it hits
222   // idle (i.e., no transactions open)
223   for (auto& txn : txnsSess2) {
224     ASSERT_EQ(closed_, 1);
225     txn->sendAbort();
226   }
227   ASSERT_EQ(activated_, 2);
228   ASSERT_EQ(deactivated_, 2);
229   ASSERT_EQ(closed_, 2);
230   evb_.loop();
231 }
232 
TEST_F(SessionPoolFixture,OutstandingWrites)233 TEST_F(SessionPoolFixture, OutstandingWrites) {
234   auto codec = makeSerialCodec();
235   EXPECT_CALL(*codec, generateHeader(_, _, _, _, _, _))
236       .WillOnce(Invoke([](folly::IOBufQueue& writeBuf,
237                           HTTPCodec::StreamID /*id*/,
238                           const HTTPMessage& /*msg*/,
239                           bool /*eom*/,
240                           HTTPHeaderSize* size,
241                           folly::Optional<HTTPHeaders>) {
242         writeBuf.append("somedata");
243         if (size) {
244           size->uncompressed = 8;
245         }
246       }));
247   auto sess = makeSession(std::move(codec));
248 
249   SessionPool p(this, 1);
250   p.putSession(sess);
251   ASSERT_FALSE(attached_);
252   ASSERT_FALSE(sess->isClosing());
253   auto txn = CHECK_NOTNULL(p.getTransaction(this));
254   ASSERT_TRUE(attached_);
255   txn->sendHeaders(HTTPMessage());
256   txn->sendAbort();
257   // Because the session has outstanding writes and is not parallel, the
258   // session is not reusable, and so it will be drained and closed.
259   ASSERT_TRUE(sess->isClosing());
260 
261   evb_.loop();
262   // The session's writes are now drained
263   ASSERT_FALSE(attached_);
264   ASSERT_EQ(deactivated_, 1);
265   ASSERT_EQ(closed_, 1);
266 }
267 
TEST_F(SessionPoolFixture,OutstandingTransaction)268 TEST_F(SessionPoolFixture, OutstandingTransaction) {
269   // Similar to the previous test, but even stricter. When the pool is
270   // reset, there are outstanding transactions. So, the pool should stay
271   // open longer to let the transaction finish.
272   auto sess = makeSerialSession();
273   HTTPTransaction* txn = nullptr;
274   {
275     SessionPool p(this, 1);
276     p.putSession(sess);
277     ASSERT_FALSE(attached_);
278     txn = p.getTransaction(this);
279     CHECK_NOTNULL(txn);
280     ASSERT_TRUE(attached_);
281     ASSERT_EQ(deactivated_, 0);
282     ASSERT_EQ(closed_, 0);
283     ASSERT_EQ(p.getNumSessions(), 1);
284   }
285   // Destroying the SessionPool starts draining all the sessions.
286   // We get the stats for the deactivation and closing early since later
287   // there is no guarantee the stats object will be around.
288   ASSERT_EQ(deactivated_, 1);
289   ASSERT_EQ(closed_, 1);
290 
291   // Dropping the transaction will let the session close
292   ASSERT_TRUE(attached_);
293   txn->sendAbort();
294   ASSERT_FALSE(attached_);
295 }
296 
TEST_F(SessionPoolFixture,DroppedRequestNotPooled)297 TEST_F(SessionPoolFixture, DroppedRequestNotPooled) {
298   // Let the session pool have 10 max
299   SessionPool p(this, 10, std::chrono::seconds(4));
300   auto codec = makeSerialCodec();
301   // In this test, the codec starts off not busy, but then we do some
302   // processing on it, and it should remain busy for the remainder of the
303   // test since the response never arrives
304   bool shouldBeBusy = false;
305   EXPECT_CALL(*codec, isBusy()).WillRepeatedly(Invoke([&]() {
306     return shouldBeBusy;
307   }));
308   auto sess = makeSession(std::move(codec));
309   p.putSession(sess);
310   auto txn1 = p.getTransaction(this);
311   ASSERT_EQ(p.getNumIdleSessions(), 0);
312   ASSERT_EQ(p.getNumActiveSessions(), 1);
313   ASSERT_EQ(p.getNumSessions(), 1);
314 
315   HTTPMessage req;
316   req.setMethod("GET");
317   req.setURL<string>("/");
318   req.setHTTPVersion(1, 1);
319   txn1->sendHeaders(req);
320   txn1->sendEOM();
321   shouldBeBusy = true;
322   ASSERT_TRUE(attached_);
323   evb_.loop();
324   // Writes have now succeeded. We should still be attached since we
325   // terminated the loop before the timeouts fired
326   CHECK(!timeout_);
327   ASSERT_TRUE(attached_);
328 
329   // Now drop the txn before the response comes back
330   ASSERT_EQ(p.getNumIdleSessions(), 0);
331   ASSERT_EQ(p.getNumActiveSessions(), 1);
332   ASSERT_EQ(p.getNumSessions(), 1);
333   txn1->sendAbort();
334   // The drop should have closed the session since the codec was busy when
335   // the transaction count went to zero
336 
337   // We should fail to get another txn out of the pool
338   ASSERT_EQ(p.getNumIdleSessions(), 0);
339   ASSERT_EQ(p.getNumActiveSessions(), 0);
340   ASSERT_EQ(p.getNumSessions(), 0);
341   ASSERT_TRUE(nullptr == p.getTransaction(this));
342   evb_.loop();
343 }
344 
TEST_F(SessionPoolFixture,InsertIntoZeroSizePool)345 TEST_F(SessionPoolFixture, InsertIntoZeroSizePool) {
346   // Let the session pool have 0 max sessions
347   SessionPool p(this, 0, std::chrono::seconds(4));
348   p.putSession(makeSerialSession());
349   CHECK(nullptr == p.getTransaction(this));
350   evb_.loop();
351 }
352 
TEST_F(SessionPoolFixture,DrainSessionLater)353 TEST_F(SessionPoolFixture, DrainSessionLater) {
354   // Mark a session in the pool to drain, then make sure the pool works.
355   SessionPool p(this, 10, std::chrono::seconds(4));
356   auto session = makeParallelSession();
357   p.putSession(session);
358 
359   auto txn1 = CHECK_NOTNULL(p.getTransaction(this));
360   auto txn2 = CHECK_NOTNULL(p.getTransaction(this));
361   session->drain();
362   ASSERT_EQ(p.getNumSessions(), 1); // don't detect until getTransaction()
363   CHECK(nullptr == p.getTransaction(this));
364   ASSERT_EQ(p.getNumSessions(), 0);
365   // now let the session be destroyed
366   txn1->sendAbort();
367   txn2->sendAbort();
368 }
369 
TEST_F(SessionPoolFixture,InsertDrainedSession)370 TEST_F(SessionPoolFixture, InsertDrainedSession) {
371   // Put a draining session into the pool. Make sure the pool ignores it.
372   auto session = makeParallelSession();
373   CHECK_NOTNULL(session->newTransaction(this));
374   session->drain();
375 
376   auto cm = wangle::ConnectionManager::makeUnique(
377       &evb_, std::chrono::milliseconds(1), nullptr);
378   SessionPool p(this, 10, std::chrono::seconds(4), std::chrono::seconds(0));
379   cm->addConnection(session);
380   p.putSession(session);
381   ASSERT_EQ(p.getNumSessions(), 0);
382   CHECK(nullptr == p.getTransaction(this));
383 
384   // this session needs to be owned by the cm
385   cm->dropAllConnections();
386   EXPECT_EQ(numSessions_, 0);
387 }
388 
TEST_F(SessionPoolFixture,CloseNotReusable)389 TEST_F(SessionPoolFixture, CloseNotReusable) {
390   // Make sure if a connection becomes not reusable, that it is removed
391   // from the pool after the txn completes
392   SessionPool p(this, 10, std::chrono::seconds(4));
393 
394   // Codec expectations
395   bool reusable = true;
396   auto codec = std::make_unique<NiceMock<MockHTTPCodec>>();
397   EXPECT_CALL(*codec, getTransportDirection())
398       .WillRepeatedly(Return(TransportDirection::UPSTREAM));
399   EXPECT_CALL(*codec, createStream()).WillOnce(Return(1));
400   EXPECT_CALL(*codec, isReusable()).WillRepeatedly(ReturnPointee(&reusable));
401   EXPECT_CALL(*codec, supportsParallelRequests()).WillRepeatedly(Return(false));
402   EXPECT_CALL(*codec, getProtocol())
403       .WillRepeatedly(Return(CodecProtocol::SPDY_3_1));
404 
405   p.putSession(makeSession(std::move(codec)));
406   ASSERT_EQ(p.getNumSessions(), 1);
407   auto txn = CHECK_NOTNULL(p.getTransaction(this));
408   reusable = false; // Mark the session as not reusable, e.g. if it got
409                     // Connection: close
410   txn->sendAbort();
411   ASSERT_EQ(p.getNumSessions(), 0); // Non-reusable conn should be dropped
412 }
413 
TEST_F(SessionPoolFixture,InsertOldSession)414 TEST_F(SessionPoolFixture, InsertOldSession) {
415   // Put a session into the pool that is over max-age. Make sure the
416   // pool ignores it.
417   auto session = makeParallelSession();
418 
419   SessionPool p(
420       this, 10, std::chrono::seconds(4), std::chrono::milliseconds(50));
421   usleep(70000); // exceeds max jitter (currently 65ms)
422   p.putSession(session);
423   ASSERT_EQ(p.getNumSessions(), 0);
424   CHECK(nullptr == p.getTransaction(this));
425 }
426 
TEST_F(SessionPoolFixture,IdleTmeout)427 TEST_F(SessionPoolFixture, IdleTmeout) {
428   // Put a session into the pool, let it timeout and ask for a new txn
429   // pool ignores it.
430   auto session = makeParallelSession();
431 
432   SessionPool p(
433       this, 10, std::chrono::milliseconds(250), std::chrono::seconds(4));
434   p.putSession(session);
435   ASSERT_TRUE(p.getNumSessions() == 1);
436   auto txn = CHECK_NOTNULL(p.getTransaction(this));
437   txn->sendAbort();
438   /* sleep override */ usleep(260000); // > 250ms
439   CHECK(nullptr == p.getTransaction(this));
440   ASSERT_EQ(p.getNumSessions(), 0);
441 }
442 
TEST_F(SessionPoolFixture,AgeOut)443 TEST_F(SessionPoolFixture, AgeOut) {
444   // Put a session into the pool, let it age out and ask for a new txn
445   // pool ignores it.
446   auto session = makeParallelSession();
447 
448   SessionPool p(
449       this, 10, std::chrono::seconds(4), std::chrono::milliseconds(250));
450   p.putSession(session);
451   // possible flake if this process takes a dirtnap for >= 175ms
452   ASSERT_TRUE(p.getNumSessions() == 1);
453   auto txn = CHECK_NOTNULL(p.getTransaction(this));
454   txn->sendAbort();
455   usleep(350000); // over max jitter (325 ms)
456   CHECK(nullptr == p.getTransaction(this));
457   ASSERT_EQ(p.getNumSessions(), 0);
458 }
459 
TEST_F(SessionPoolFixture,DrainOnShutdown)460 TEST_F(SessionPoolFixture, DrainOnShutdown) {
461   // Verify that the session is drained if the socket is closing/closed
462   auto session = makeParallelSession();
463 
464   SessionPool p(this, 10, std::chrono::milliseconds(4));
465   p.putSession(session);
466   ASSERT_EQ(p.getNumSessions(), 1);
467   ASSERT_EQ(p.getNumIdleSessions(), 1);
468   ASSERT_EQ(p.getNumActiveSessions(), 0);
469 
470   auto txn = p.getTransaction(this);
471   ASSERT_TRUE(txn != nullptr);
472   ASSERT_EQ(p.getNumSessions(), 1);
473   ASSERT_EQ(p.getNumIdleSessions(), 0);
474   ASSERT_EQ(p.getNumActiveSessions(), 1);
475   ASSERT_EQ(p.getNumActiveNonFullSessions(), 1);
476   ASSERT_EQ(session->getNumOutgoingStreams(), 1);
477 
478   // Manually close the socket
479   session->getTransport()->closeNow();
480   ASSERT_EQ(session->getConnectionCloseReason(),
481             ConnectionCloseReason::kMAX_REASON);
482 
483   // New transaction creation fails and drains the session
484   ASSERT_TRUE(p.getTransaction(this) == nullptr);
485   ASSERT_EQ(session->getConnectionCloseReason(),
486             ConnectionCloseReason::SHUTDOWN);
487   ASSERT_EQ(p.getNumSessions(), 0);
488   ASSERT_EQ(session->getNumOutgoingStreams(), 1);
489 
490   // Now let the session be destroyed
491   txn->sendAbort();
492 }
493 
494 class TestIdleController : public ServerIdleSessionController {
495  public:
496   // expose this method as public for tests.
popBestIdlePool()497   SessionPool* popBestIdlePool() {
498     return ServerIdleSessionController::popBestIdlePool();
499   }
500 };
501 
TEST_F(SessionPoolFixture,MoveIdleSessionBetweenThreadsTest)502 TEST_F(SessionPoolFixture, MoveIdleSessionBetweenThreadsTest) {
503   TestIdleController ctrl;
504   HTTPUpstreamSession* session = nullptr;
505 
506   folly::Baton<> t1InitBaton, t2InitBaton, transferBaton;
507   // Create two threads, each looping on their own event base
508   std::thread t1([&] {
509     folly::EventBaseManager::get()->setEventBase(&evb_, false);
510     SessionPool p1(this,
511                    10,
512                    std::chrono::seconds(30),
513                    std::chrono::milliseconds(0),
514                    nullptr,
515                    &ctrl);
516     // Put an (idle) session on p1.
517     session = makeParallelSession();
518     p1.putSession(session);
519     t1InitBaton.post();
520     evb_.loopForever();
521   });
522 
523   // Wait for t1 to start before starting t2
524   // to ensure it is picked by popBestIdlePool()
525   t1InitBaton.wait();
526 
527   folly::EventBase evb2;
528   std::thread t2([&] {
529     folly::EventBaseManager::get()->setEventBase(&evb2, false);
530     SessionPool p2(this,
531                    10,
532                    std::chrono::seconds(30),
533                    std::chrono::milliseconds(0),
534                    nullptr,
535                    &ctrl);
536     t2InitBaton.post();
537     evb2.loopForever();
538   });
539 
540   t2InitBaton.wait();
541   // Simulate thread2 asking thread1 for an idle session
542   evb2.runInEventBaseThread([&] {
543     ctrl.getIdleSession().via(&evb2).thenValue(
544         [&](HTTPSessionBase* idleSession) {
545           ASSERT_EQ(idleSession, session);
546           // Not re-attaching it to thread2 so ctrl will be empty
547           transferBaton.post();
548         });
549   });
550   transferBaton.wait();
551   EXPECT_EQ(ctrl.popBestIdlePool(), nullptr);
552 
553   session->drain();
554   evb_.terminateLoopSoon();
555   evb2.terminateLoopSoon();
556   t1.join();
557   t2.join();
558 }
559 
TEST_F(SessionPoolFixture,PurgeAddedSessionTest)560 TEST_F(SessionPoolFixture, PurgeAddedSessionTest) {
561   TestIdleController ctrl;
562   HTTPUpstreamSession* session = makeParallelSession();
563   SessionPool p1(this,
564                  1,
565                  /*idleTimeout=*/std::chrono::milliseconds(0),
566                  std::chrono::milliseconds(0),
567                  nullptr,
568                  &ctrl);
569 
570   // This is about to purge session immediately and should not crash.
571   p1.putSession(session);
572 }
573 
TEST_F(SessionPoolFixture,ServerIdleSessionControllerTest)574 TEST_F(SessionPoolFixture, ServerIdleSessionControllerTest) {
575   TestIdleController ctrl;
576   SessionPool p1, p2;
577   auto s1 = makeParallelSession(), s2 = makeParallelSession(),
578        s3 = makeParallelSession();
579   EXPECT_EQ(ctrl.popBestIdlePool(), nullptr);
580 
581   ctrl.addIdleSession(s1, &p1);
582   EXPECT_EQ(ctrl.popBestIdlePool(), &p1);
583   EXPECT_EQ(ctrl.popBestIdlePool(), nullptr);
584 
585   ctrl.addIdleSession(s3, &p2);
586   ctrl.addIdleSession(s1, &p1);
587   EXPECT_EQ(ctrl.popBestIdlePool(), &p2);
588   EXPECT_EQ(ctrl.popBestIdlePool(), &p1);
589   EXPECT_EQ(ctrl.popBestIdlePool(), nullptr);
590 
591   ctrl.addIdleSession(s1, &p1);
592   ctrl.addIdleSession(s2, &p1);
593   ctrl.removeIdleSession(s1);
594   ctrl.addIdleSession(s3, &p2);
595   EXPECT_EQ(ctrl.popBestIdlePool(), &p1);
596   EXPECT_EQ(ctrl.popBestIdlePool(), &p2);
597   EXPECT_EQ(ctrl.popBestIdlePool(), nullptr);
598 
599   s1->drain();
600   s2->drain();
601   s3->drain();
602 }
603 
TEST_F(SessionPoolFixture,WritePausedSessionNotMarkedAsIdle)604 TEST_F(SessionPoolFixture, WritePausedSessionNotMarkedAsIdle) {
605   auto codec = makeParallelCodec();
606   EXPECT_CALL(*codec, generateHeader(_, _, _, _, _, _))
607       .WillOnce(Invoke([](folly::IOBufQueue& writeBuf,
608                           HTTPCodec::StreamID /*id*/,
609                           const HTTPMessage& /*msg*/,
610                           bool /*eom*/,
611                           HTTPHeaderSize* size,
612                           folly::Optional<HTTPHeaders>) {
613         writeBuf.append("somedata");
614         if (size) {
615           size->uncompressed = 8;
616         }
617       }));
618   auto session = makeSession(std::move(codec));
619 
620   TestIdleController ctrl;
621   SessionPool p1(this,
622                  10,
623                  std::chrono::seconds(30),
624                  std::chrono::milliseconds(0),
625                  nullptr,
626                  &ctrl);
627   p1.putSession(session);
628 
629   auto txn = p1.getTransaction(this);
630   ASSERT_NE(txn, (HTTPTransaction*)nullptr);
631   TestAsyncTransport* transport =
632       session->getTransport()->getUnderlyingTransport<TestAsyncTransport>();
633   transport->pauseWrites();
634   HTTPMessage req;
635   req.setMethod("GET");
636   req.setURL<string>("/");
637   req.setHTTPVersion(1, 1);
638   txn->sendHeaders(req);
639   evb_.loopOnce();
640   txn->sendAbort();
641 
642   transport->resumeWrites();
643   evb_.loopOnce();
644 
645   // Session should not be marked as idle.
646   EXPECT_EQ(ctrl.popBestIdlePool(), nullptr);
647   EXPECT_EQ(p1.getNumSessions(), 1);
648   EXPECT_EQ(p1.getNumIdleSessions(), 0);
649   EXPECT_EQ(p1.getNumActiveSessions(), 1);
650   EXPECT_EQ(p1.getNumActiveNonFullSessions(), 1);
651 }
652 
TEST_F(SessionPoolFixture,ThreadIdleSessionControllerLimitsTotalIdle)653 TEST_F(SessionPoolFixture, ThreadIdleSessionControllerLimitsTotalIdle) {
654   ThreadIdleSessionController controller(3);
655   SessionPool p1(this,
656                  2,
657                  std::chrono::milliseconds(50),
658                  std::chrono::milliseconds(50),
659                  &controller);
660   SessionPool p2(this,
661                  2,
662                  std::chrono::milliseconds(50),
663                  std::chrono::milliseconds(50),
664                  &controller);
665 
666   // Add two sessions on each pool.
667   p1.putSession(makeSerialSession());
668   p1.putSession(makeSerialSession());
669 
670   p2.putSession(makeSerialSession());
671   p2.putSession(makeSerialSession());
672 
673   // Validate ThreadIdleSessionController limited it to 3 at most.
674   EXPECT_EQ(controller.getTotalIdleSessions(), 3);
675 
676   auto txn = p1.getTransaction(this);
677   ASSERT_TRUE(txn != nullptr);
678 
679   // Drop the transaction. All transactions on the sessions in the pool
680   // must be completed before the pool can be destroyed
681   txn->sendAbort();
682 
683   evb_.loop();
684   ASSERT_EQ(closed_, 2);
685   EXPECT_EQ(controller.getTotalIdleSessions(), 2);
686   EXPECT_EQ(p1.getNumIdleSessions(), 0);
687   EXPECT_EQ(p2.getNumIdleSessions(), 2);
688 }
689 
TEST_F(SessionPoolFixture,ThreadIdleSessionControllerTrackSessionPoolChanges)690 TEST_F(SessionPoolFixture, ThreadIdleSessionControllerTrackSessionPoolChanges) {
691   ThreadIdleSessionController controller(5);
692   SessionPool p1(this,
693                  2,
694                  std::chrono::milliseconds(50),
695                  std::chrono::milliseconds(50),
696                  &controller);
697   SessionPool p2(this,
698                  2,
699                  std::chrono::milliseconds(50),
700                  std::chrono::milliseconds(50),
701                  &controller);
702 
703   // Add two sessions on each pool.
704   p1.putSession(makeSerialSession());
705   p1.putSession(makeSerialSession());
706 
707   p2.putSession(makeSerialSession());
708   p2.putSession(makeSerialSession());
709 
710   // Validate ThreadIdleSessionController sees all sessions.
711   EXPECT_EQ(controller.getTotalIdleSessions(), 4);
712 
713   // Drop a number of sessions.
714   p1.setMaxIdleSessions(1);
715   p2.setMaxIdleSessions(1);
716 
717   EXPECT_EQ(controller.getTotalIdleSessions(), 2);
718   EXPECT_EQ(p1.getNumIdleSessions(), 1);
719   EXPECT_EQ(p2.getNumIdleSessions(), 1);
720 }
721 
722 // So we can have -v work
main(int argc,char ** argv)723 int main(int argc, char** argv) {
724   testing::InitGoogleTest(&argc, argv);
725   gflags::ParseCommandLineFlags(&argc, &argv, true);
726   google::InitGoogleLogging(argv[0]);
727   return RUN_ALL_TESTS();
728 }
729