1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <atomic>
18 #include <future>
19 #include <iostream>
20 #include <memory>
21 #include <thread>
22 
23 #include <folly/Memory.h>
24 #include <folly/ScopeGuard.h>
25 #include <folly/futures/Promise.h>
26 #include <folly/io/async/AsyncTimeout.h>
27 #include <folly/io/async/EventBase.h>
28 #include <folly/io/async/EventHandler.h>
29 #include <folly/io/async/test/SocketPair.h>
30 #include <folly/io/async/test/Util.h>
31 #include <folly/portability/Stdlib.h>
32 #include <folly/portability/Unistd.h>
33 #include <folly/synchronization/Baton.h>
34 #include <folly/system/ThreadName.h>
35 
36 #define FOLLY_SKIP_IF_NULLPTR_BACKEND_WITH_OPTS(evb, opts)  \
37   std::unique_ptr<EventBase> evb##Ptr;                      \
38   try {                                                     \
39     auto factory = [] {                                     \
40       auto backend = TypeParam::getBackend();               \
41       if (!backend) {                                       \
42         throw std::runtime_error("backend not available");  \
43       }                                                     \
44       return backend;                                       \
45     };                                                      \
46     auto evbOpts = opts;                                    \
47     evb##Ptr = std::make_unique<EventBase>(                 \
48         opts.setBackendFactory(std::move(factory)));        \
49   } catch (const std::runtime_error& e) {                   \
50     if (std::string("backend not available") == e.what()) { \
51       SKIP() << "Backend not available";                    \
52     }                                                       \
53   }                                                         \
54   EventBase& evb = *evb##Ptr.get()
55 
56 #define FOLLY_SKIP_IF_NULLPTR_BACKEND(evb) \
57   FOLLY_SKIP_IF_NULLPTR_BACKEND_WITH_OPTS(evb, EventBase::Options())
58 
59 ///////////////////////////////////////////////////////////////////////////
60 // Tests for read and write events
61 ///////////////////////////////////////////////////////////////////////////
62 
63 namespace folly {
64 namespace test {
65 class EventBaseTestBase : public ::testing::Test {
66  public:
EventBaseTestBase()67   EventBaseTestBase() {
68     // libevent 2.x uses a coarse monotonic timer by default on Linux.
69     // This timer is imprecise enough to cause several of our tests to fail.
70     //
71     // Set an environment variable that causes libevent to use a non-coarse
72     // timer. This can be controlled programmatically by using the
73     // EVENT_BASE_FLAG_PRECISE_TIMER flag with event_base_new_with_config().
74     // However, this would require more compile-time #ifdefs to tell if we are
75     // using libevent 2.1+ or not.  Simply using the environment variable is
76     // the easiest option for now.
77     setenv("EVENT_PRECISE_TIMER", "1", 1);
78   }
79 };
80 
81 template <typename T>
82 class EventBaseTest : public EventBaseTestBase {
83  public:
84   EventBaseTest() = default;
85 };
86 
87 TYPED_TEST_SUITE_P(EventBaseTest);
88 
89 template <typename T>
90 class EventBaseTest1 : public EventBaseTestBase {
91  public:
92   EventBaseTest1() = default;
93 };
94 
95 template <class Factory>
96 std::unique_ptr<EventBase> getEventBase(
97     folly::EventBase::Options opts = folly::EventBase::Options()) {
98   try {
99     auto factory = [] {
100       auto backend = Factory::getBackend();
101       if (!backend) {
102         throw std::runtime_error("backend not available");
103       }
104       return backend;
105     };
106     return std::make_unique<EventBase>(
107         opts.setBackendFactory(std::move(factory)));
catch(const std::runtime_error & e)108   } catch (const std::runtime_error& e) {
109     if (std::string("backend not available") == e.what()) {
110       return nullptr;
111     }
112     throw;
113   }
114 }
115 
116 TYPED_TEST_SUITE_P(EventBaseTest1);
117 
118 enum { BUF_SIZE = 4096 };
119 
writeToFD(int fd,size_t length)120 FOLLY_ALWAYS_INLINE ssize_t writeToFD(int fd, size_t length) {
121   // write an arbitrary amount of data to the fd
122   auto bufv = std::vector<char>(length);
123   auto buf = bufv.data();
124   memset(buf, 'a', length);
125   ssize_t rc = write(fd, buf, length);
126   CHECK_EQ(rc, length);
127   return rc;
128 }
129 
writeUntilFull(int fd)130 FOLLY_ALWAYS_INLINE size_t writeUntilFull(int fd) {
131   // Write to the fd until EAGAIN is returned
132   size_t bytesWritten = 0;
133   char buf[BUF_SIZE];
134   memset(buf, 'a', sizeof(buf));
135   while (true) {
136     ssize_t rc = write(fd, buf, sizeof(buf));
137     if (rc < 0) {
138       CHECK_EQ(errno, EAGAIN);
139       break;
140     } else {
141       bytesWritten += rc;
142     }
143   }
144   return bytesWritten;
145 }
146 
readFromFD(int fd,size_t length)147 FOLLY_ALWAYS_INLINE ssize_t readFromFD(int fd, size_t length) {
148   // write an arbitrary amount of data to the fd
149   auto buf = std::vector<char>(length);
150   return read(fd, buf.data(), length);
151 }
152 
readUntilEmpty(int fd)153 FOLLY_ALWAYS_INLINE size_t readUntilEmpty(int fd) {
154   // Read from the fd until EAGAIN is returned
155   char buf[BUF_SIZE];
156   size_t bytesRead = 0;
157   while (true) {
158     int rc = read(fd, buf, sizeof(buf));
159     if (rc == 0) {
160       CHECK(false) << "unexpected EOF";
161     } else if (rc < 0) {
162       CHECK_EQ(errno, EAGAIN);
163       break;
164     } else {
165       bytesRead += rc;
166     }
167   }
168   return bytesRead;
169 }
170 
checkReadUntilEmpty(int fd,size_t expectedLength)171 FOLLY_ALWAYS_INLINE void checkReadUntilEmpty(int fd, size_t expectedLength) {
172   ASSERT_EQ(readUntilEmpty(fd), expectedLength);
173 }
174 
175 struct ScheduledEvent {
176   int milliseconds;
177   uint16_t events;
178   size_t length;
179   ssize_t result;
180 
performScheduledEvent181   void perform(int fd) {
182     if (events & folly::EventHandler::READ) {
183       if (length == 0) {
184         result = readUntilEmpty(fd);
185       } else {
186         result = readFromFD(fd, length);
187       }
188     }
189     if (events & folly::EventHandler::WRITE) {
190       if (length == 0) {
191         result = writeUntilFull(fd);
192       } else {
193         result = writeToFD(fd, length);
194       }
195     }
196   }
197 };
198 
scheduleEvents(EventBase * eventBase,int fd,ScheduledEvent * events)199 FOLLY_ALWAYS_INLINE void scheduleEvents(
200     EventBase* eventBase, int fd, ScheduledEvent* events) {
201   for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
202     eventBase->tryRunAfterDelay(
203         std::bind(&ScheduledEvent::perform, ev, fd), ev->milliseconds);
204   }
205 }
206 
207 class TestObserver : public folly::ExecutionObserver {
208  public:
starting(uintptr_t)209   virtual void starting(uintptr_t /* id */) noexcept override {
210     if (nestedStart_ == 0) {
211       nestedStart_ = 1;
212     }
213     numStartingCalled_++;
214   }
stopped(uintptr_t)215   virtual void stopped(uintptr_t /* id */) noexcept override {
216     nestedStart_--;
217     numStoppedCalled_++;
218   }
runnable(uintptr_t)219   virtual void runnable(uintptr_t /* id */) noexcept override {
220     // Unused
221   }
222 
223   int nestedStart_{0};
224   int numStartingCalled_{0};
225   int numStoppedCalled_{0};
226 };
227 
228 class TestHandler : public folly::EventHandler {
229  public:
TestHandler(folly::EventBase * eventBase,int fd)230   TestHandler(folly::EventBase* eventBase, int fd)
231       : EventHandler(eventBase, folly::NetworkSocket::fromFd(fd)), fd_(fd) {}
232 
handlerReady(uint16_t events)233   void handlerReady(uint16_t events) noexcept override {
234     ssize_t bytesRead = 0;
235     ssize_t bytesWritten = 0;
236     if (events & READ) {
237       // Read all available data, so EventBase will stop calling us
238       // until new data becomes available
239       bytesRead = readUntilEmpty(fd_);
240     }
241     if (events & WRITE) {
242       // Write until the pipe buffer is full, so EventBase will stop calling
243       // us until the other end has read some data
244       bytesWritten = writeUntilFull(fd_);
245     }
246 
247     log.emplace_back(events, bytesRead, bytesWritten);
248   }
249 
250   struct EventRecord {
EventRecordEventRecord251     EventRecord(uint16_t events_, size_t bytesRead_, size_t bytesWritten_)
252         : events(events_),
253           timestamp(),
254           bytesRead(bytesRead_),
255           bytesWritten(bytesWritten_) {}
256 
257     uint16_t events;
258     folly::TimePoint timestamp;
259     ssize_t bytesRead;
260     ssize_t bytesWritten;
261   };
262 
263   std::deque<EventRecord> log;
264 
265  private:
266   int fd_;
267 };
268 
269 /**
270  * Test a READ event
271  */
TYPED_TEST_P(EventBaseTest,ReadEvent)272 TYPED_TEST_P(EventBaseTest, ReadEvent) {
273   auto evbPtr = getEventBase<TypeParam>();
274   SKIP_IF(!evbPtr) << "Backend not available";
275   folly::EventBase& eb = *evbPtr;
276   SocketPair sp;
277 
278   // Register for read events
279   TestHandler handler(&eb, sp[0]);
280   handler.registerHandler(EventHandler::READ);
281 
282   // Register timeouts to perform two write events
283   ScheduledEvent events[] = {
284       {10, EventHandler::WRITE, 2345, 0},
285       {160, EventHandler::WRITE, 99, 0},
286       {0, 0, 0, 0},
287   };
288   TimePoint start;
289   scheduleEvents(&eb, sp[1], events);
290 
291   // Loop
292   eb.loop();
293   TimePoint end;
294 
295   // Since we didn't use the EventHandler::PERSIST flag, the handler should
296   // have received the first read, then unregistered itself.  Check that only
297   // the first chunk of data was received.
298   ASSERT_EQ(handler.log.size(), 1);
299   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
300   T_CHECK_TIMEOUT(
301       start,
302       handler.log[0].timestamp,
303       std::chrono::milliseconds(events[0].milliseconds),
304       std::chrono::milliseconds(90));
305   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
306   ASSERT_EQ(handler.log[0].bytesWritten, 0);
307   T_CHECK_TIMEOUT(
308       start,
309       end,
310       std::chrono::milliseconds(events[1].milliseconds),
311       std::chrono::milliseconds(30));
312 
313   // Make sure the second chunk of data is still waiting to be read.
314   size_t bytesRemaining = readUntilEmpty(sp[0]);
315   ASSERT_EQ(bytesRemaining, events[1].length);
316 }
317 
318 /**
319  * Test (READ | PERSIST)
320  */
TYPED_TEST_P(EventBaseTest,ReadPersist)321 TYPED_TEST_P(EventBaseTest, ReadPersist) {
322   auto evbPtr = getEventBase<TypeParam>();
323   SKIP_IF(!evbPtr) << "Backend not available";
324   folly::EventBase& eb = *evbPtr;
325   SocketPair sp;
326 
327   // Register for read events
328   TestHandler handler(&eb, sp[0]);
329   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
330 
331   // Register several timeouts to perform writes
332   ScheduledEvent events[] = {
333       {10, EventHandler::WRITE, 1024, 0},
334       {20, EventHandler::WRITE, 2211, 0},
335       {30, EventHandler::WRITE, 4096, 0},
336       {100, EventHandler::WRITE, 100, 0},
337       {0, 0, 0, 0},
338   };
339   TimePoint start;
340   scheduleEvents(&eb, sp[1], events);
341 
342   // Schedule a timeout to unregister the handler after the third write
343   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
344 
345   // Loop
346   eb.loop();
347   TimePoint end;
348 
349   // The handler should have received the first 3 events,
350   // then been unregistered after that.
351   ASSERT_EQ(handler.log.size(), 3);
352   for (int n = 0; n < 3; ++n) {
353     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
354     T_CHECK_TIMEOUT(
355         start,
356         handler.log[n].timestamp,
357         std::chrono::milliseconds(events[n].milliseconds));
358     ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
359     ASSERT_EQ(handler.log[n].bytesWritten, 0);
360   }
361   T_CHECK_TIMEOUT(
362       start, end, std::chrono::milliseconds(events[3].milliseconds));
363 
364   // Make sure the data from the last write is still waiting to be read
365   size_t bytesRemaining = readUntilEmpty(sp[0]);
366   ASSERT_EQ(bytesRemaining, events[3].length);
367 }
368 
369 /**
370  * Test registering for READ when the socket is immediately readable
371  */
TYPED_TEST_P(EventBaseTest,ReadImmediate)372 TYPED_TEST_P(EventBaseTest, ReadImmediate) {
373   auto evbPtr = getEventBase<TypeParam>();
374   SKIP_IF(!evbPtr) << "Backend not available";
375   folly::EventBase& eb = *evbPtr;
376   SocketPair sp;
377 
378   // Write some data to the socket so the other end will
379   // be immediately readable
380   size_t dataLength = 1234;
381   writeToFD(sp[1], dataLength);
382 
383   // Register for read events
384   TestHandler handler(&eb, sp[0]);
385   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
386 
387   // Register a timeout to perform another write
388   ScheduledEvent events[] = {
389       {10, EventHandler::WRITE, 2345, 0},
390       {0, 0, 0, 0},
391   };
392   TimePoint start;
393   scheduleEvents(&eb, sp[1], events);
394 
395   // Schedule a timeout to unregister the handler
396   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
397 
398   // Loop
399   eb.loop();
400   TimePoint end;
401 
402   ASSERT_EQ(handler.log.size(), 2);
403 
404   // There should have been 1 event for immediate readability
405   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
406   T_CHECK_TIMEOUT(
407       start, handler.log[0].timestamp, std::chrono::milliseconds(0));
408   ASSERT_EQ(handler.log[0].bytesRead, dataLength);
409   ASSERT_EQ(handler.log[0].bytesWritten, 0);
410 
411   // There should be another event after the timeout wrote more data
412   ASSERT_EQ(handler.log[1].events, EventHandler::READ);
413   T_CHECK_TIMEOUT(
414       start,
415       handler.log[1].timestamp,
416       std::chrono::milliseconds(events[0].milliseconds));
417   ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
418   ASSERT_EQ(handler.log[1].bytesWritten, 0);
419 
420   T_CHECK_TIMEOUT(start, end, std::chrono::milliseconds(20));
421 }
422 
423 /**
424  * Test a WRITE event
425  */
TYPED_TEST_P(EventBaseTest,WriteEvent)426 TYPED_TEST_P(EventBaseTest, WriteEvent) {
427   auto evbPtr = getEventBase<TypeParam>();
428   SKIP_IF(!evbPtr) << "Backend not available";
429   folly::EventBase& eb = *evbPtr;
430   SocketPair sp;
431 
432   // Fill up the write buffer before starting
433   size_t initialBytesWritten = writeUntilFull(sp[0]);
434 
435   // Register for write events
436   TestHandler handler(&eb, sp[0]);
437   handler.registerHandler(EventHandler::WRITE);
438 
439   // Register timeouts to perform two reads
440   ScheduledEvent events[] = {
441       {10, EventHandler::READ, 0, 0},
442       {60, EventHandler::READ, 0, 0},
443       {0, 0, 0, 0},
444   };
445   TimePoint start;
446   scheduleEvents(&eb, sp[1], events);
447 
448   // Loop
449   eb.loop();
450   TimePoint end;
451 
452   // Since we didn't use the EventHandler::PERSIST flag, the handler should
453   // have only been able to write once, then unregistered itself.
454   ASSERT_EQ(handler.log.size(), 1);
455   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
456   T_CHECK_TIMEOUT(
457       start,
458       handler.log[0].timestamp,
459       std::chrono::milliseconds(events[0].milliseconds));
460   ASSERT_EQ(handler.log[0].bytesRead, 0);
461   ASSERT_GT(handler.log[0].bytesWritten, 0);
462   T_CHECK_TIMEOUT(
463       start, end, std::chrono::milliseconds(events[1].milliseconds));
464 
465   ASSERT_EQ(events[0].result, initialBytesWritten);
466   ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
467 }
468 
469 /**
470  * Test (WRITE | PERSIST)
471  */
TYPED_TEST_P(EventBaseTest,WritePersist)472 TYPED_TEST_P(EventBaseTest, WritePersist) {
473   auto evbPtr = getEventBase<TypeParam>();
474   SKIP_IF(!evbPtr) << "Backend not available";
475   folly::EventBase& eb = *evbPtr;
476   SocketPair sp;
477 
478   // Fill up the write buffer before starting
479   size_t initialBytesWritten = writeUntilFull(sp[0]);
480 
481   // Register for write events
482   TestHandler handler(&eb, sp[0]);
483   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
484 
485   // Register several timeouts to read from the socket at several intervals
486   ScheduledEvent events[] = {
487       {10, EventHandler::READ, 0, 0},
488       {40, EventHandler::READ, 0, 0},
489       {70, EventHandler::READ, 0, 0},
490       {100, EventHandler::READ, 0, 0},
491       {0, 0, 0, 0},
492   };
493   TimePoint start;
494   scheduleEvents(&eb, sp[1], events);
495 
496   // Schedule a timeout to unregister the handler after the third read
497   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
498 
499   // Loop
500   eb.loop();
501   TimePoint end;
502 
503   // The handler should have received the first 3 events,
504   // then been unregistered after that.
505   ASSERT_EQ(handler.log.size(), 3);
506   ASSERT_EQ(events[0].result, initialBytesWritten);
507   for (int n = 0; n < 3; ++n) {
508     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
509     T_CHECK_TIMEOUT(
510         start,
511         handler.log[n].timestamp,
512         std::chrono::milliseconds(events[n].milliseconds));
513     ASSERT_EQ(handler.log[n].bytesRead, 0);
514     ASSERT_GT(handler.log[n].bytesWritten, 0);
515     ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
516   }
517   T_CHECK_TIMEOUT(
518       start, end, std::chrono::milliseconds(events[3].milliseconds));
519 }
520 
521 /**
522  * Test registering for WRITE when the socket is immediately writable
523  */
TYPED_TEST_P(EventBaseTest,WriteImmediate)524 TYPED_TEST_P(EventBaseTest, WriteImmediate) {
525   auto evbPtr = getEventBase<TypeParam>();
526   SKIP_IF(!evbPtr) << "Backend not available";
527   folly::EventBase& eb = *evbPtr;
528   SocketPair sp;
529 
530   // Register for write events
531   TestHandler handler(&eb, sp[0]);
532   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
533 
534   // Register a timeout to perform a read
535   ScheduledEvent events[] = {
536       {10, EventHandler::READ, 0, 0},
537       {0, 0, 0, 0},
538   };
539   TimePoint start;
540   scheduleEvents(&eb, sp[1], events);
541 
542   // Schedule a timeout to unregister the handler
543   int64_t unregisterTimeout = 40;
544   eb.tryRunAfterDelay(
545       std::bind(&TestHandler::unregisterHandler, &handler), unregisterTimeout);
546 
547   // Loop
548   eb.loop();
549   TimePoint end;
550 
551   ASSERT_EQ(handler.log.size(), 2);
552 
553   // Since the socket buffer was initially empty,
554   // there should have been 1 event for immediate writability
555   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
556   T_CHECK_TIMEOUT(
557       start, handler.log[0].timestamp, std::chrono::milliseconds(0));
558   ASSERT_EQ(handler.log[0].bytesRead, 0);
559   ASSERT_GT(handler.log[0].bytesWritten, 0);
560 
561   // There should be another event after the timeout wrote more data
562   ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
563   T_CHECK_TIMEOUT(
564       start,
565       handler.log[1].timestamp,
566       std::chrono::milliseconds(events[0].milliseconds));
567   ASSERT_EQ(handler.log[1].bytesRead, 0);
568   ASSERT_GT(handler.log[1].bytesWritten, 0);
569 
570   T_CHECK_TIMEOUT(start, end, std::chrono::milliseconds(unregisterTimeout));
571 }
572 
573 /**
574  * Test (READ | WRITE) when the socket becomes readable first
575  */
TYPED_TEST_P(EventBaseTest,ReadWrite)576 TYPED_TEST_P(EventBaseTest, ReadWrite) {
577   auto evbPtr = getEventBase<TypeParam>();
578   SKIP_IF(!evbPtr) << "Backend not available";
579   folly::EventBase& eb = *evbPtr;
580   SocketPair sp;
581 
582   // Fill up the write buffer before starting
583   size_t sock0WriteLength = writeUntilFull(sp[0]);
584 
585   // Register for read and write events
586   TestHandler handler(&eb, sp[0]);
587   handler.registerHandler(EventHandler::READ_WRITE);
588 
589   // Register timeouts to perform a write then a read.
590   ScheduledEvent events[] = {
591       {10, EventHandler::WRITE, 2345, 0},
592       {40, EventHandler::READ, 0, 0},
593       {0, 0, 0, 0},
594   };
595   TimePoint start;
596   scheduleEvents(&eb, sp[1], events);
597 
598   // Loop
599   eb.loop();
600   TimePoint end;
601 
602   // Since we didn't use the EventHandler::PERSIST flag, the handler should
603   // have only noticed readability, then unregistered itself.  Check that only
604   // one event was logged.
605   ASSERT_EQ(handler.log.size(), 1);
606   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
607   T_CHECK_TIMEOUT(
608       start,
609       handler.log[0].timestamp,
610       std::chrono::milliseconds(events[0].milliseconds));
611   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
612   ASSERT_EQ(handler.log[0].bytesWritten, 0);
613   ASSERT_EQ(events[1].result, sock0WriteLength);
614   T_CHECK_TIMEOUT(
615       start, end, std::chrono::milliseconds(events[1].milliseconds));
616 }
617 
618 /**
619  * Test (READ | WRITE) when the socket becomes writable first
620  */
TYPED_TEST_P(EventBaseTest,WriteRead)621 TYPED_TEST_P(EventBaseTest, WriteRead) {
622   auto evbPtr = getEventBase<TypeParam>();
623   SKIP_IF(!evbPtr) << "Backend not available";
624   folly::EventBase& eb = *evbPtr;
625   SocketPair sp;
626 
627   // Fill up the write buffer before starting
628   size_t sock0WriteLength = writeUntilFull(sp[0]);
629 
630   // Register for read and write events
631   TestHandler handler(&eb, sp[0]);
632   handler.registerHandler(EventHandler::READ_WRITE);
633 
634   // Register timeouts to perform a read then a write.
635   size_t sock1WriteLength = 2345;
636   ScheduledEvent events[] = {
637       {10, EventHandler::READ, 0, 0},
638       {40, EventHandler::WRITE, sock1WriteLength, 0},
639       {0, 0, 0, 0},
640   };
641   TimePoint start;
642   scheduleEvents(&eb, sp[1], events);
643 
644   // Loop
645   eb.loop();
646   TimePoint end;
647 
648   // Since we didn't use the EventHandler::PERSIST flag, the handler should
649   // have only noticed writability, then unregistered itself.  Check that only
650   // one event was logged.
651   ASSERT_EQ(handler.log.size(), 1);
652   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
653   T_CHECK_TIMEOUT(
654       start,
655       handler.log[0].timestamp,
656       std::chrono::milliseconds(events[0].milliseconds));
657   ASSERT_EQ(handler.log[0].bytesRead, 0);
658   ASSERT_GT(handler.log[0].bytesWritten, 0);
659   ASSERT_EQ(events[0].result, sock0WriteLength);
660   ASSERT_EQ(events[1].result, sock1WriteLength);
661   T_CHECK_TIMEOUT(
662       start, end, std::chrono::milliseconds(events[1].milliseconds));
663 
664   // Make sure the written data is still waiting to be read.
665   size_t bytesRemaining = readUntilEmpty(sp[0]);
666   ASSERT_EQ(bytesRemaining, events[1].length);
667 }
668 
669 /**
670  * Test (READ | WRITE) when the socket becomes readable and writable
671  * at the same time.
672  */
TYPED_TEST_P(EventBaseTest,ReadWriteSimultaneous)673 TYPED_TEST_P(EventBaseTest, ReadWriteSimultaneous) {
674   auto evbPtr = getEventBase<TypeParam>();
675   SKIP_IF(!evbPtr) << "Backend not available";
676   folly::EventBase& eb = *evbPtr;
677   SocketPair sp;
678 
679   // Fill up the write buffer before starting
680   size_t sock0WriteLength = writeUntilFull(sp[0]);
681 
682   // Register for read and write events
683   TestHandler handler(&eb, sp[0]);
684   handler.registerHandler(EventHandler::READ_WRITE);
685 
686   // Register a timeout to perform a read and write together
687   ScheduledEvent events[] = {
688       {10, EventHandler::READ | EventHandler::WRITE, 0, 0},
689       {0, 0, 0, 0},
690   };
691   TimePoint start;
692   scheduleEvents(&eb, sp[1], events);
693 
694   // Loop
695   eb.loop();
696   TimePoint end;
697 
698   // It's not strictly required that the EventBase register us about both
699   // events in the same call or thw read/write notifications are delievered at
700   // the same. So, it's possible that if the EventBase implementation changes
701   // this test could start failing, and it wouldn't be considered breaking the
702   // API.  However for now it's nice to exercise this code path.
703   ASSERT_EQ(handler.log.size(), 1);
704   if (handler.log[0].events & EventHandler::READ) {
705     ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
706     ASSERT_GT(handler.log[0].bytesWritten, 0);
707   }
708   T_CHECK_TIMEOUT(
709       start,
710       handler.log[0].timestamp,
711       std::chrono::milliseconds(events[0].milliseconds));
712   T_CHECK_TIMEOUT(
713       start, end, std::chrono::milliseconds(events[0].milliseconds));
714 }
715 
716 /**
717  * Test (READ | WRITE | PERSIST)
718  */
TYPED_TEST_P(EventBaseTest,ReadWritePersist)719 TYPED_TEST_P(EventBaseTest, ReadWritePersist) {
720   auto evbPtr = getEventBase<TypeParam>();
721   SKIP_IF(!evbPtr) << "Backend not available";
722   folly::EventBase& eb = *evbPtr;
723   SocketPair sp;
724 
725   // Register for read and write events
726   TestHandler handler(&eb, sp[0]);
727   handler.registerHandler(
728       EventHandler::READ | EventHandler::WRITE | EventHandler::PERSIST);
729 
730   // Register timeouts to perform several reads and writes
731   ScheduledEvent events[] = {
732       {10, EventHandler::WRITE, 2345, 0},
733       {20, EventHandler::READ, 0, 0},
734       {35, EventHandler::WRITE, 200, 0},
735       {45, EventHandler::WRITE, 15, 0},
736       {55, EventHandler::READ, 0, 0},
737       {120, EventHandler::WRITE, 2345, 0},
738       {0, 0, 0, 0},
739   };
740   TimePoint start;
741   scheduleEvents(&eb, sp[1], events);
742 
743   // Schedule a timeout to unregister the handler
744   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
745 
746   // Loop
747   eb.loop();
748   TimePoint end;
749 
750   ASSERT_EQ(handler.log.size(), 6);
751 
752   // Since we didn't fill up the write buffer immediately, there should
753   // be an immediate event for writability.
754   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
755   T_CHECK_TIMEOUT(
756       start, handler.log[0].timestamp, std::chrono::milliseconds(0));
757   ASSERT_EQ(handler.log[0].bytesRead, 0);
758   ASSERT_GT(handler.log[0].bytesWritten, 0);
759 
760   // Events 1 through 5 should correspond to the scheduled events
761   for (int n = 1; n < 6; ++n) {
762     ScheduledEvent* event = &events[n - 1];
763     T_CHECK_TIMEOUT(
764         start,
765         handler.log[n].timestamp,
766         std::chrono::milliseconds(event->milliseconds));
767     if (event->events == EventHandler::READ) {
768       ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
769       ASSERT_EQ(handler.log[n].bytesRead, 0);
770       ASSERT_GT(handler.log[n].bytesWritten, 0);
771     } else {
772       ASSERT_EQ(handler.log[n].events, EventHandler::READ);
773       ASSERT_EQ(handler.log[n].bytesRead, event->length);
774       ASSERT_EQ(handler.log[n].bytesWritten, 0);
775     }
776   }
777 
778   // The timeout should have unregistered the handler before the last write.
779   // Make sure that data is still waiting to be read
780   size_t bytesRemaining = readUntilEmpty(sp[0]);
781   ASSERT_EQ(bytesRemaining, events[5].length);
782 }
783 
784 namespace {
785 class PartialReadHandler : public TestHandler {
786  public:
PartialReadHandler(EventBase * eventBase,int fd,size_t readLength)787   PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
788       : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
789 
handlerReady(uint16_t events)790   void handlerReady(uint16_t events) noexcept override {
791     assert(events == EventHandler::READ);
792     ssize_t bytesRead = readFromFD(fd_, readLength_);
793     log.emplace_back(events, bytesRead, 0);
794   }
795 
796  private:
797   int fd_;
798   size_t readLength_;
799 };
800 } // namespace
801 
802 /**
803  * Test reading only part of the available data when a read event is fired.
804  * When PERSIST is used, make sure the handler gets notified again the next
805  * time around the loop.
806  */
TYPED_TEST_P(EventBaseTest,ReadPartial)807 TYPED_TEST_P(EventBaseTest, ReadPartial) {
808   auto evbPtr = getEventBase<TypeParam>();
809   SKIP_IF(!evbPtr) << "Backend not available";
810   folly::EventBase& eb = *evbPtr;
811   SocketPair sp;
812 
813   // Register for read events
814   size_t readLength = 100;
815   PartialReadHandler handler(&eb, sp[0], readLength);
816   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
817 
818   // Register a timeout to perform a single write,
819   // with more data than PartialReadHandler will read at once
820   ScheduledEvent events[] = {
821       {10, EventHandler::WRITE, (3 * readLength) + (readLength / 2), 0},
822       {0, 0, 0, 0},
823   };
824   TimePoint start;
825   scheduleEvents(&eb, sp[1], events);
826 
827   // Schedule a timeout to unregister the handler
828   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
829 
830   // Loop
831   eb.loop();
832   TimePoint end;
833 
834   ASSERT_EQ(handler.log.size(), 4);
835 
836   // The first 3 invocations should read readLength bytes each
837   for (int n = 0; n < 3; ++n) {
838     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
839     T_CHECK_TIMEOUT(
840         start,
841         handler.log[n].timestamp,
842         std::chrono::milliseconds(events[0].milliseconds));
843     ASSERT_EQ(handler.log[n].bytesRead, readLength);
844     ASSERT_EQ(handler.log[n].bytesWritten, 0);
845   }
846   // The last read only has readLength/2 bytes
847   ASSERT_EQ(handler.log[3].events, EventHandler::READ);
848   T_CHECK_TIMEOUT(
849       start,
850       handler.log[3].timestamp,
851       std::chrono::milliseconds(events[0].milliseconds));
852   ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
853   ASSERT_EQ(handler.log[3].bytesWritten, 0);
854 }
855 
856 namespace {
857 class PartialWriteHandler : public TestHandler {
858  public:
PartialWriteHandler(EventBase * eventBase,int fd,size_t writeLength)859   PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
860       : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
861 
handlerReady(uint16_t events)862   void handlerReady(uint16_t events) noexcept override {
863     assert(events == EventHandler::WRITE);
864     ssize_t bytesWritten = writeToFD(fd_, writeLength_);
865     log.emplace_back(events, 0, bytesWritten);
866   }
867 
868  private:
869   int fd_;
870   size_t writeLength_;
871 };
872 } // namespace
873 
874 /**
875  * Test writing without completely filling up the write buffer when the fd
876  * becomes writable.  When PERSIST is used, make sure the handler gets
877  * notified again the next time around the loop.
878  */
TYPED_TEST_P(EventBaseTest,WritePartial)879 TYPED_TEST_P(EventBaseTest, WritePartial) {
880   auto evbPtr = getEventBase<TypeParam>();
881   SKIP_IF(!evbPtr) << "Backend not available";
882   folly::EventBase& eb = *evbPtr;
883   SocketPair sp;
884 
885   // Fill up the write buffer before starting
886   size_t initialBytesWritten = writeUntilFull(sp[0]);
887 
888   // Register for write events
889   size_t writeLength = 100;
890   PartialWriteHandler handler(&eb, sp[0], writeLength);
891   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
892 
893   // Register a timeout to read, so that more data can be written
894   ScheduledEvent events[] = {
895       {10, EventHandler::READ, 0, 0},
896       {0, 0, 0, 0},
897   };
898   TimePoint start;
899   scheduleEvents(&eb, sp[1], events);
900 
901   // Schedule a timeout to unregister the handler
902   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
903 
904   // Loop
905   eb.loop();
906   TimePoint end;
907 
908   // Depending on how big the socket buffer is, there will be multiple writes
909   // Only check the first 5
910   int numChecked = 5;
911   ASSERT_GE(handler.log.size(), numChecked);
912   ASSERT_EQ(events[0].result, initialBytesWritten);
913 
914   // The first 3 invocations should read writeLength bytes each
915   for (int n = 0; n < numChecked; ++n) {
916     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
917     T_CHECK_TIMEOUT(
918         start,
919         handler.log[n].timestamp,
920         std::chrono::milliseconds(events[0].milliseconds));
921     ASSERT_EQ(handler.log[n].bytesRead, 0);
922     ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
923   }
924 }
925 
926 namespace {
927 class DestroyHandler : public AsyncTimeout {
928  public:
DestroyHandler(EventBase * eb,EventHandler * h)929   DestroyHandler(EventBase* eb, EventHandler* h)
930       : AsyncTimeout(eb), handler_(h) {}
931 
timeoutExpired()932   void timeoutExpired() noexcept override { delete handler_; }
933 
934  private:
935   EventHandler* handler_;
936 };
937 } // namespace
938 
939 /**
940  * Test destroying a registered EventHandler
941  */
TYPED_TEST_P(EventBaseTest,DestroyingHandler)942 TYPED_TEST_P(EventBaseTest, DestroyingHandler) {
943   auto evbPtr = getEventBase<TypeParam>();
944   SKIP_IF(!evbPtr) << "Backend not available";
945   folly::EventBase& eb = *evbPtr;
946   SocketPair sp;
947 
948   // Fill up the write buffer before starting
949   size_t initialBytesWritten = writeUntilFull(sp[0]);
950 
951   // Register for write events
952   TestHandler* handler = new TestHandler(&eb, sp[0]);
953   handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
954 
955   // After 10ms, read some data, so that the handler
956   // will be notified that it can write.
957   eb.tryRunAfterDelay(
958       std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten), 10);
959 
960   // Start a timer to destroy the handler after 25ms
961   // This mainly just makes sure the code doesn't break or assert
962   DestroyHandler dh(&eb, handler);
963   dh.scheduleTimeout(25);
964 
965   TimePoint start;
966   eb.loop();
967   TimePoint end;
968 
969   // Make sure the EventHandler was uninstalled properly when it was
970   // destroyed, and the EventBase loop exited
971   T_CHECK_TIMEOUT(start, end, std::chrono::milliseconds(25));
972 
973   // Make sure that the handler wrote data to the socket
974   // before it was destroyed
975   size_t bytesRemaining = readUntilEmpty(sp[1]);
976   ASSERT_GT(bytesRemaining, 0);
977 }
978 
979 ///////////////////////////////////////////////////////////////////////////
980 // Tests for timeout events
981 ///////////////////////////////////////////////////////////////////////////
982 
TYPED_TEST_P(EventBaseTest,RunAfterDelay)983 TYPED_TEST_P(EventBaseTest, RunAfterDelay) {
984   auto evbPtr = getEventBase<TypeParam>();
985   SKIP_IF(!evbPtr) << "Backend not available";
986   folly::EventBase& eb = *evbPtr;
987 
988   TimePoint timestamp1(false);
989   TimePoint timestamp2(false);
990   TimePoint timestamp3(false);
991   auto fn1 = std::bind(&TimePoint::reset, &timestamp1);
992   auto fn2 = std::bind(&TimePoint::reset, &timestamp2);
993   auto fn3 = std::bind(&TimePoint::reset, &timestamp3);
994 
995   TimePoint start;
996   eb.tryRunAfterDelay(std::move(fn1), 10);
997   eb.tryRunAfterDelay(std::move(fn2), 20);
998   eb.tryRunAfterDelay(std::move(fn3), 40);
999 
1000   eb.loop();
1001   TimePoint end;
1002 
1003   T_CHECK_TIMEOUT(start, timestamp1, std::chrono::milliseconds(10));
1004   T_CHECK_TIMEOUT(start, timestamp2, std::chrono::milliseconds(20));
1005   T_CHECK_TIMEOUT(start, timestamp3, std::chrono::milliseconds(40));
1006   T_CHECK_TIMEOUT(start, end, std::chrono::milliseconds(40));
1007 }
1008 
1009 /**
1010  * Test the behavior of tryRunAfterDelay() when some timeouts are
1011  * still scheduled when the EventBase is destroyed.
1012  */
TYPED_TEST_P(EventBaseTest,RunAfterDelayDestruction)1013 TYPED_TEST_P(EventBaseTest, RunAfterDelayDestruction) {
1014   TimePoint timestamp1(false);
1015   TimePoint timestamp2(false);
1016   TimePoint timestamp3(false);
1017   TimePoint timestamp4(false);
1018   TimePoint start(false);
1019   TimePoint end(false);
1020 
1021   {
1022     auto evbPtr = getEventBase<TypeParam>();
1023     SKIP_IF(!evbPtr) << "Backend not available";
1024     folly::EventBase& eb = *evbPtr;
1025     start.reset();
1026 
1027     // Run two normal timeouts
1028     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
1029     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
1030 
1031     // Schedule a timeout to stop the event loop after 40ms
1032     eb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
1033 
1034     // Schedule 2 timeouts that would fire after the event loop stops
1035     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 80);
1036     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp4), 160);
1037 
1038     eb.loop();
1039     end.reset();
1040   }
1041 
1042   T_CHECK_TIMEOUT(start, timestamp1, std::chrono::milliseconds(10));
1043   T_CHECK_TIMEOUT(start, timestamp2, std::chrono::milliseconds(20));
1044   T_CHECK_TIMEOUT(start, end, std::chrono::milliseconds(40));
1045 
1046   ASSERT_TRUE(timestamp3.isUnset());
1047   ASSERT_TRUE(timestamp4.isUnset());
1048 
1049   // Ideally this test should be run under valgrind to ensure that no
1050   // memory is leaked.
1051 }
1052 
1053 namespace {
1054 class TestTimeout : public AsyncTimeout {
1055  public:
TestTimeout(EventBase * eventBase)1056   explicit TestTimeout(EventBase* eventBase)
1057       : AsyncTimeout(eventBase), timestamp(false) {}
1058 
timeoutExpired()1059   void timeoutExpired() noexcept override { timestamp.reset(); }
1060 
1061   TimePoint timestamp;
1062 };
1063 } // namespace
1064 
TYPED_TEST_P(EventBaseTest,BasicTimeouts)1065 TYPED_TEST_P(EventBaseTest, BasicTimeouts) {
1066   auto evbPtr = getEventBase<TypeParam>();
1067   SKIP_IF(!evbPtr) << "Backend not available";
1068   folly::EventBase& eb = *evbPtr;
1069 
1070   TestTimeout t1(&eb);
1071   TestTimeout t2(&eb);
1072   TestTimeout t3(&eb);
1073   TimePoint start;
1074   t1.scheduleTimeout(10);
1075   t2.scheduleTimeout(20);
1076   t3.scheduleTimeout(40);
1077 
1078   eb.loop();
1079   TimePoint end;
1080 
1081   T_CHECK_TIMEOUT(start, t1.timestamp, std::chrono::milliseconds(10));
1082   T_CHECK_TIMEOUT(start, t2.timestamp, std::chrono::milliseconds(20));
1083   T_CHECK_TIMEOUT(start, t3.timestamp, std::chrono::milliseconds(40));
1084   T_CHECK_TIMEOUT(start, end, std::chrono::milliseconds(40));
1085 }
1086 
1087 namespace {
1088 class ReschedulingTimeout : public AsyncTimeout {
1089  public:
ReschedulingTimeout(EventBase * evb,const std::vector<uint32_t> & timeouts)1090   ReschedulingTimeout(EventBase* evb, const std::vector<uint32_t>& timeouts)
1091       : AsyncTimeout(evb), timeouts_(timeouts), iterator_(timeouts_.begin()) {}
1092 
start()1093   void start() { reschedule(); }
1094 
timeoutExpired()1095   void timeoutExpired() noexcept override {
1096     timestamps.emplace_back();
1097     reschedule();
1098   }
1099 
reschedule()1100   void reschedule() {
1101     if (iterator_ != timeouts_.end()) {
1102       uint32_t timeout = *iterator_;
1103       ++iterator_;
1104       scheduleTimeout(timeout);
1105     }
1106   }
1107 
1108   std::vector<TimePoint> timestamps;
1109 
1110  private:
1111   std::vector<uint32_t> timeouts_;
1112   std::vector<uint32_t>::const_iterator iterator_;
1113 };
1114 } // namespace
1115 
1116 /**
1117  * Test rescheduling the same timeout multiple times
1118  */
TYPED_TEST_P(EventBaseTest,ReuseTimeout)1119 TYPED_TEST_P(EventBaseTest, ReuseTimeout) {
1120   auto evbPtr = getEventBase<TypeParam>();
1121   SKIP_IF(!evbPtr) << "Backend not available";
1122   folly::EventBase& eb = *evbPtr;
1123 
1124   std::vector<uint32_t> timeouts;
1125   timeouts.push_back(10);
1126   timeouts.push_back(30);
1127   timeouts.push_back(15);
1128 
1129   ReschedulingTimeout t(&eb, timeouts);
1130   TimePoint start;
1131   t.start();
1132   eb.loop();
1133   TimePoint end;
1134 
1135   // Use a higher tolerance than usual.  We're waiting on 3 timeouts
1136   // consecutively.  In general, each timeout may go over by a few
1137   // milliseconds, and we're tripling this error by witing on 3 timeouts.
1138   std::chrono::milliseconds tolerance{6};
1139 
1140   ASSERT_EQ(timeouts.size(), t.timestamps.size());
1141   uint32_t total = 0;
1142   for (size_t n = 0; n < timeouts.size(); ++n) {
1143     total += timeouts[n];
1144     T_CHECK_TIMEOUT(
1145         start, t.timestamps[n], std::chrono::milliseconds(total), tolerance);
1146   }
1147   T_CHECK_TIMEOUT(start, end, std::chrono::milliseconds(total), tolerance);
1148 }
1149 
1150 /**
1151  * Test rescheduling a timeout before it has fired
1152  */
TYPED_TEST_P(EventBaseTest,RescheduleTimeout)1153 TYPED_TEST_P(EventBaseTest, RescheduleTimeout) {
1154   auto evbPtr = getEventBase<TypeParam>();
1155   SKIP_IF(!evbPtr) << "Backend not available";
1156   folly::EventBase& eb = *evbPtr;
1157 
1158   TestTimeout t1(&eb);
1159   TestTimeout t2(&eb);
1160   TestTimeout t3(&eb);
1161 
1162   TimePoint start;
1163   t1.scheduleTimeout(15);
1164   t2.scheduleTimeout(30);
1165   t3.scheduleTimeout(30);
1166 
1167   // after 10ms, reschedule t2 to run sooner than originally scheduled
1168   eb.tryRunAfterDelay([&] { t2.scheduleTimeout(10); }, 10);
1169   // after 10ms, reschedule t3 to run later than originally scheduled
1170   eb.tryRunAfterDelay([&] { t3.scheduleTimeout(40); }, 10);
1171 
1172   eb.loop();
1173   TimePoint end;
1174 
1175   T_CHECK_TIMEOUT(start, t1.timestamp, std::chrono::milliseconds(15));
1176   T_CHECK_TIMEOUT(start, t2.timestamp, std::chrono::milliseconds(20));
1177   T_CHECK_TIMEOUT(start, t3.timestamp, std::chrono::milliseconds(50));
1178   T_CHECK_TIMEOUT(start, end, std::chrono::milliseconds(50));
1179 }
1180 
1181 /**
1182  * Test cancelling a timeout
1183  */
TYPED_TEST_P(EventBaseTest,CancelTimeout)1184 TYPED_TEST_P(EventBaseTest, CancelTimeout) {
1185   auto evbPtr = getEventBase<TypeParam>();
1186   SKIP_IF(!evbPtr) << "Backend not available";
1187   folly::EventBase& eb = *evbPtr;
1188 
1189   std::vector<uint32_t> timeouts;
1190   timeouts.push_back(10);
1191   timeouts.push_back(30);
1192   timeouts.push_back(25);
1193 
1194   ReschedulingTimeout t(&eb, timeouts);
1195   TimePoint start;
1196   t.start();
1197   eb.tryRunAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
1198 
1199   eb.loop();
1200   TimePoint end;
1201 
1202   ASSERT_EQ(t.timestamps.size(), 2);
1203   T_CHECK_TIMEOUT(start, t.timestamps[0], std::chrono::milliseconds(10));
1204   T_CHECK_TIMEOUT(start, t.timestamps[1], std::chrono::milliseconds(40));
1205   T_CHECK_TIMEOUT(start, end, std::chrono::milliseconds(50));
1206 }
1207 
1208 namespace {
1209 class DestroyTimeout : public AsyncTimeout {
1210  public:
DestroyTimeout(EventBase * eb,AsyncTimeout * t)1211   DestroyTimeout(EventBase* eb, AsyncTimeout* t)
1212       : AsyncTimeout(eb), timeout_(t) {}
1213 
timeoutExpired()1214   void timeoutExpired() noexcept override { delete timeout_; }
1215 
1216  private:
1217   AsyncTimeout* timeout_;
1218 };
1219 } // namespace
1220 
1221 /**
1222  * Test destroying a scheduled timeout object
1223  */
TYPED_TEST_P(EventBaseTest,DestroyingTimeout)1224 TYPED_TEST_P(EventBaseTest, DestroyingTimeout) {
1225   auto evbPtr = getEventBase<TypeParam>();
1226   SKIP_IF(!evbPtr) << "Backend not available";
1227   folly::EventBase& eb = *evbPtr;
1228 
1229   TestTimeout* t1 = new TestTimeout(&eb);
1230   TimePoint start;
1231   t1->scheduleTimeout(30);
1232 
1233   DestroyTimeout dt(&eb, t1);
1234   dt.scheduleTimeout(10);
1235 
1236   eb.loop();
1237   TimePoint end;
1238 
1239   T_CHECK_TIMEOUT(start, end, std::chrono::milliseconds(10));
1240 }
1241 
1242 /**
1243  * Test the scheduled executor impl
1244  */
TYPED_TEST_P(EventBaseTest,ScheduledFn)1245 TYPED_TEST_P(EventBaseTest, ScheduledFn) {
1246   auto evbPtr = getEventBase<TypeParam>();
1247   SKIP_IF(!evbPtr) << "Backend not available";
1248   folly::EventBase& eb = *evbPtr;
1249 
1250   TimePoint timestamp1(false);
1251   TimePoint timestamp2(false);
1252   TimePoint timestamp3(false);
1253   auto fn1 = std::bind(&TimePoint::reset, &timestamp1);
1254   auto fn2 = std::bind(&TimePoint::reset, &timestamp2);
1255   auto fn3 = std::bind(&TimePoint::reset, &timestamp3);
1256   TimePoint start;
1257   eb.schedule(std::move(fn1), std::chrono::milliseconds(9));
1258   eb.schedule(std::move(fn2), std::chrono::milliseconds(19));
1259   eb.schedule(std::move(fn3), std::chrono::milliseconds(39));
1260 
1261   eb.loop();
1262   TimePoint end;
1263 
1264   T_CHECK_TIMEOUT(start, timestamp1, std::chrono::milliseconds(9));
1265   T_CHECK_TIMEOUT(start, timestamp2, std::chrono::milliseconds(19));
1266   T_CHECK_TIMEOUT(start, timestamp3, std::chrono::milliseconds(39));
1267   T_CHECK_TIMEOUT(start, end, std::chrono::milliseconds(39));
1268 }
1269 
TYPED_TEST_P(EventBaseTest,ScheduledFnAt)1270 TYPED_TEST_P(EventBaseTest, ScheduledFnAt) {
1271   auto evbPtr = getEventBase<TypeParam>();
1272   SKIP_IF(!evbPtr) << "Backend not available";
1273   folly::EventBase& eb = *evbPtr;
1274 
1275   TimePoint timestamp0(false);
1276   TimePoint timestamp1(false);
1277   TimePoint timestamp2(false);
1278   TimePoint timestamp3(false);
1279   auto fn0 = std::bind(&TimePoint::reset, &timestamp0);
1280   auto fn1 = std::bind(&TimePoint::reset, &timestamp1);
1281   auto fn2 = std::bind(&TimePoint::reset, &timestamp2);
1282   auto fn3 = std::bind(&TimePoint::reset, &timestamp3);
1283   TimePoint start;
1284   eb.scheduleAt(fn0, eb.now() - std::chrono::milliseconds(5));
1285   eb.scheduleAt(fn1, eb.now() + std::chrono::milliseconds(9));
1286   eb.scheduleAt(fn2, eb.now() + std::chrono::milliseconds(19));
1287   eb.scheduleAt(fn3, eb.now() + std::chrono::milliseconds(39));
1288 
1289   TimePoint loopStart;
1290   eb.loop();
1291   TimePoint end;
1292 
1293   // Even though we asked to schedule the first function in the past,
1294   // in practice it doesn't run until after 1 iteration of the HHWheelTimer tick
1295   // interval.
1296   T_CHECK_TIMEOUT(start, timestamp0, eb.timer().getTickInterval());
1297 
1298   T_CHECK_TIMEOUT(start, timestamp1, std::chrono::milliseconds(9));
1299   T_CHECK_TIMEOUT(start, timestamp2, std::chrono::milliseconds(19));
1300   T_CHECK_TIMEOUT(start, timestamp3, std::chrono::milliseconds(39));
1301   T_CHECK_TIMEOUT(start, end, std::chrono::milliseconds(39));
1302 }
1303 
1304 ///////////////////////////////////////////////////////////////////////////
1305 // Test for runInThreadTestFunc()
1306 ///////////////////////////////////////////////////////////////////////////
1307 
1308 namespace {
1309 
1310 struct RunInThreadData {
RunInThreadDataRunInThreadData1311   RunInThreadData(
1312       folly::EventBaseBackendBase::FactoryFunc backendFactory,
1313       int numThreads,
1314       int opsPerThread_)
1315       : evb(folly::EventBase::Options().setBackendFactory(
1316             std::move(backendFactory))),
1317         opsPerThread(opsPerThread_),
1318         opsToGo(numThreads * opsPerThread) {}
1319 
1320   EventBase evb;
1321   std::deque<std::pair<int, int>> values;
1322 
1323   int opsPerThread;
1324   int opsToGo;
1325 };
1326 
1327 struct RunInThreadArg {
RunInThreadArgRunInThreadArg1328   RunInThreadArg(RunInThreadData* data_, int threadId, int value_)
1329       : data(data_), thread(threadId), value(value_) {}
1330 
1331   RunInThreadData* data;
1332   int thread;
1333   int value;
1334 };
1335 
runInThreadTestFunc(RunInThreadArg * arg)1336 static inline void runInThreadTestFunc(RunInThreadArg* arg) {
1337   arg->data->values.emplace_back(arg->thread, arg->value);
1338   RunInThreadData* data = arg->data;
1339   delete arg;
1340 
1341   if (--data->opsToGo == 0) {
1342     // Break out of the event base loop if we are the last thread running
1343     data->evb.terminateLoopSoon();
1344   }
1345 }
1346 
1347 } // namespace
1348 
TYPED_TEST_P(EventBaseTest,RunInThread)1349 TYPED_TEST_P(EventBaseTest, RunInThread) {
1350   constexpr uint32_t numThreads = 50;
1351   constexpr uint32_t opsPerThread = 100;
1352   auto backend = TypeParam::getBackend();
1353   SKIP_IF(!backend) << "Backend not available";
1354   RunInThreadData data(
1355       [] { return TypeParam::getBackend(); }, numThreads, opsPerThread);
1356 
1357   std::deque<std::thread> threads;
1358   SCOPE_EXIT {
1359     // Wait on all of the threads.
1360     for (auto& thread : threads) {
1361       thread.join();
1362     }
1363   };
1364 
1365   for (uint32_t i = 0; i < numThreads; ++i) {
1366     threads.emplace_back([i, &data] {
1367       for (int n = 0; n < data.opsPerThread; ++n) {
1368         RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
1369         data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
1370         usleep(10);
1371       }
1372     });
1373   }
1374 
1375   // Add a timeout event to run after 3 seconds.
1376   // Otherwise loop() will return immediately since there are no events to run.
1377   // Once the last thread exits, it will stop the loop().  However, this
1378   // timeout also stops the loop in case there is a bug performing the normal
1379   // stop.
1380   data.evb.tryRunAfterDelay(
1381       std::bind(&EventBase::terminateLoopSoon, &data.evb), 3000);
1382 
1383   TimePoint start;
1384   data.evb.loop();
1385   TimePoint end;
1386 
1387   // Verify that the loop exited because all threads finished and requested it
1388   // to stop.  This should happen much sooner than the 3 second timeout.
1389   // Assert that it happens in under a second.  (This is still tons of extra
1390   // padding.)
1391 
1392   auto timeTaken = std::chrono::duration_cast<std::chrono::milliseconds>(
1393       end.getTime() - start.getTime());
1394   ASSERT_LT(timeTaken.count(), 1000);
1395   VLOG(11) << "Time taken: " << timeTaken.count();
1396 
1397   // Verify that we have all of the events from every thread
1398   int expectedValues[numThreads];
1399   for (uint32_t n = 0; n < numThreads; ++n) {
1400     expectedValues[n] = 0;
1401   }
1402   for (const auto& dataValue : data.values) {
1403     int threadID = dataValue.first;
1404     int value = dataValue.second;
1405     ASSERT_EQ(expectedValues[threadID], value);
1406     ++expectedValues[threadID];
1407   }
1408   for (uint32_t n = 0; n < numThreads; ++n) {
1409     ASSERT_EQ(expectedValues[n], opsPerThread);
1410   }
1411 }
1412 
1413 //  This test simulates some calls, and verifies that the waiting happens by
1414 //  triggering what otherwise would be race conditions, and trying to detect
1415 //  whether any of the race conditions happened.
TYPED_TEST_P(EventBaseTest,RunInEventBaseThreadAndWait)1416 TYPED_TEST_P(EventBaseTest, RunInEventBaseThreadAndWait) {
1417   const size_t c = 256;
1418   std::vector<std::unique_ptr<EventBase>> evbs;
1419   for (size_t i = 0; i < c; ++i) {
1420     auto evbPtr = getEventBase<TypeParam>();
1421     SKIP_IF(!evbPtr) << "Backend not available";
1422     evbs.push_back(std::move(evbPtr));
1423   }
1424 
1425   std::vector<std::unique_ptr<std::atomic<size_t>>> atoms(c);
1426   for (size_t i = 0; i < c; ++i) {
1427     auto& atom = atoms.at(i);
1428     atom = std::make_unique<std::atomic<size_t>>(0);
1429   }
1430   std::vector<std::thread> threads;
1431   for (size_t i = 0; i < c; ++i) {
1432     threads.emplace_back([&atoms, i, evb = std::move(evbs[i])] {
1433       folly::EventBase& eb = *evb;
1434       auto& atom = *atoms.at(i);
1435       auto ebth = std::thread([&] { eb.loopForever(); });
1436       eb.waitUntilRunning();
1437       eb.runInEventBaseThreadAndWait([&] {
1438         size_t x = 0;
1439         atom.compare_exchange_weak(
1440             x, 1, std::memory_order_release, std::memory_order_relaxed);
1441       });
1442       size_t x = 0;
1443       atom.compare_exchange_weak(
1444           x, 2, std::memory_order_release, std::memory_order_relaxed);
1445       eb.terminateLoopSoon();
1446       ebth.join();
1447     });
1448   }
1449   for (size_t i = 0; i < c; ++i) {
1450     auto& th = threads.at(i);
1451     th.join();
1452   }
1453   size_t sum = 0;
1454   for (auto& atom : atoms) {
1455     sum += *atom;
1456   }
1457   EXPECT_EQ(c, sum);
1458 }
1459 
TYPED_TEST_P(EventBaseTest,RunImmediatelyOrRunInEventBaseThreadAndWaitCross)1460 TYPED_TEST_P(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitCross) {
1461   auto evbPtr = getEventBase<TypeParam>();
1462   SKIP_IF(!evbPtr) << "Backend not available";
1463   folly::EventBase& eb = *evbPtr;
1464   std::thread th(&EventBase::loopForever, &eb);
1465   SCOPE_EXIT {
1466     eb.terminateLoopSoon();
1467     th.join();
1468   };
1469   auto mutated = false;
1470   eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { mutated = true; });
1471   EXPECT_TRUE(mutated);
1472 }
1473 
TYPED_TEST_P(EventBaseTest,RunImmediatelyOrRunInEventBaseThreadAndWaitWithin)1474 TYPED_TEST_P(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitWithin) {
1475   auto evbPtr = getEventBase<TypeParam>();
1476   SKIP_IF(!evbPtr) << "Backend not available";
1477   folly::EventBase& eb = *evbPtr;
1478   std::thread th(&EventBase::loopForever, &eb);
1479   SCOPE_EXIT {
1480     eb.terminateLoopSoon();
1481     th.join();
1482   };
1483   eb.runInEventBaseThreadAndWait([&] {
1484     auto mutated = false;
1485     eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { mutated = true; });
1486     EXPECT_TRUE(mutated);
1487   });
1488 }
1489 
TYPED_TEST_P(EventBaseTest,RunImmediatelyOrRunInEventBaseThreadNotLooping)1490 TYPED_TEST_P(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadNotLooping) {
1491   auto evbPtr = getEventBase<TypeParam>();
1492   SKIP_IF(!evbPtr) << "Backend not available";
1493   folly::EventBase& eb = *evbPtr;
1494   auto mutated = false;
1495   eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { mutated = true; });
1496   EXPECT_TRUE(mutated);
1497 }
1498 
1499 ///////////////////////////////////////////////////////////////////////////
1500 // Tests for runInLoop()
1501 ///////////////////////////////////////////////////////////////////////////
1502 
1503 namespace {
1504 class CountedLoopCallback : public EventBase::LoopCallback {
1505  public:
1506   CountedLoopCallback(
1507       EventBase* eventBase,
1508       unsigned int count,
1509       std::function<void()> action = std::function<void()>())
eventBase_(eventBase)1510       : eventBase_(eventBase), count_(count), action_(action) {}
1511 
runLoopCallback()1512   void runLoopCallback() noexcept override {
1513     --count_;
1514     if (count_ > 0) {
1515       eventBase_->runInLoop(this);
1516     } else if (action_) {
1517       action_();
1518     }
1519   }
1520 
getCount()1521   unsigned int getCount() const { return count_; }
1522 
1523  private:
1524   EventBase* eventBase_;
1525   unsigned int count_;
1526   std::function<void()> action_;
1527 };
1528 } // namespace
1529 
1530 // Test that EventBase::loop() doesn't exit while there are
1531 // still LoopCallbacks remaining to be invoked.
TYPED_TEST_P(EventBaseTest,RepeatedRunInLoop)1532 TYPED_TEST_P(EventBaseTest, RepeatedRunInLoop) {
1533   auto evbPtr = getEventBase<TypeParam>();
1534   SKIP_IF(!evbPtr) << "Backend not available";
1535   folly::EventBase& eventBase = *evbPtr;
1536 
1537   CountedLoopCallback c(&eventBase, 10);
1538   eventBase.runInLoop(&c);
1539   // The callback shouldn't have run immediately
1540   ASSERT_EQ(c.getCount(), 10);
1541   eventBase.loop();
1542 
1543   // loop() should loop until the CountedLoopCallback stops
1544   // re-installing itself.
1545   ASSERT_EQ(c.getCount(), 0);
1546 }
1547 
1548 // Test that EventBase::loop() works as expected without time measurements.
TYPED_TEST_P(EventBaseTest,RunInLoopNoTimeMeasurement)1549 TYPED_TEST_P(EventBaseTest, RunInLoopNoTimeMeasurement) {
1550   auto evbPtr = getEventBase<TypeParam>(
1551       EventBase::Options().setSkipTimeMeasurement(true));
1552   SKIP_IF(!evbPtr) << "Backend not available";
1553   folly::EventBase& eventBase = *evbPtr;
1554 
1555   CountedLoopCallback c(&eventBase, 10);
1556   eventBase.runInLoop(&c);
1557   // The callback shouldn't have run immediately
1558   ASSERT_EQ(c.getCount(), 10);
1559   eventBase.loop();
1560 
1561   // loop() should loop until the CountedLoopCallback stops
1562   // re-installing itself.
1563   ASSERT_EQ(c.getCount(), 0);
1564 }
1565 
1566 // Test runInLoop() calls with terminateLoopSoon()
TYPED_TEST_P(EventBaseTest,RunInLoopStopLoop)1567 TYPED_TEST_P(EventBaseTest, RunInLoopStopLoop) {
1568   auto evbPtr = getEventBase<TypeParam>();
1569   SKIP_IF(!evbPtr) << "Backend not available";
1570   folly::EventBase& eventBase = *evbPtr;
1571 
1572   CountedLoopCallback c1(&eventBase, 20);
1573   CountedLoopCallback c2(
1574       &eventBase, 10, std::bind(&EventBase::terminateLoopSoon, &eventBase));
1575 
1576   eventBase.runInLoop(&c1);
1577   eventBase.runInLoop(&c2);
1578   ASSERT_EQ(c1.getCount(), 20);
1579   ASSERT_EQ(c2.getCount(), 10);
1580 
1581   eventBase.loopForever();
1582 
1583   // c2 should have stopped the loop after 10 iterations
1584   ASSERT_EQ(c2.getCount(), 0);
1585 
1586   // We allow the EventBase to run the loop callbacks in whatever order it
1587   // chooses.  We'll accept c1's count being either 10 (if the loop terminated
1588   // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
1589   // before c1 ran).
1590   //
1591   // (With the current code, c1 will always run 10 times, but we don't consider
1592   // this a hard API requirement.)
1593   ASSERT_GE(c1.getCount(), 10);
1594   ASSERT_LE(c1.getCount(), 11);
1595 }
1596 
TYPED_TEST_P(EventBaseTest1,pidCheck)1597 TYPED_TEST_P(EventBaseTest1, pidCheck) {
1598   auto evbPtr = getEventBase<TypeParam>();
1599   SKIP_IF(!evbPtr) << "Backend not available";
1600 
1601   auto deadManWalking = [&]() { evbPtr->loopForever(); };
1602   EXPECT_DEATH(deadManWalking(), "pid");
1603 }
1604 
TYPED_TEST_P(EventBaseTest,messageAvailableException)1605 TYPED_TEST_P(EventBaseTest, messageAvailableException) {
1606   auto evbPtr = getEventBase<TypeParam>();
1607   SKIP_IF(!evbPtr) << "Backend not available";
1608 
1609   auto deadManWalking = []() {
1610     auto evb = getEventBase<TypeParam>();
1611     std::thread t([&] {
1612       // Call this from another thread to force use of NotificationQueue in
1613       // runInEventBaseThread
1614       evb->runInEventBaseThread([]() { throw std::runtime_error("boom"); });
1615     });
1616     t.join();
1617     evb->loopForever();
1618   };
1619   EXPECT_DEATH(deadManWalking(), "boom");
1620 }
1621 
TYPED_TEST_P(EventBaseTest,TryRunningAfterTerminate)1622 TYPED_TEST_P(EventBaseTest, TryRunningAfterTerminate) {
1623   auto eventBasePtr = getEventBase<TypeParam>();
1624   SKIP_IF(!eventBasePtr) << "Backend not available";
1625   folly::EventBase& eventBase = *eventBasePtr;
1626 
1627   bool ran = false;
1628   CountedLoopCallback c1(
1629       &eventBase, 1, std::bind(&EventBase::terminateLoopSoon, &eventBase));
1630   eventBase.runInLoop(&c1);
1631   eventBase.loopForever();
1632   eventBase.runInEventBaseThread([&]() { ran = true; });
1633 
1634   ASSERT_FALSE(ran);
1635 
1636   eventBasePtr.reset();
1637   // Loop callbacks are triggered on EventBase destruction
1638   ASSERT_TRUE(ran);
1639 }
1640 
1641 // Test cancelling runInLoop() callbacks
TYPED_TEST_P(EventBaseTest,CancelRunInLoop)1642 TYPED_TEST_P(EventBaseTest, CancelRunInLoop) {
1643   auto eventBasePtr = getEventBase<TypeParam>();
1644   SKIP_IF(!eventBasePtr) << "Backend not available";
1645   folly::EventBase& eventBase = *eventBasePtr;
1646 
1647   CountedLoopCallback c1(&eventBase, 20);
1648   CountedLoopCallback c2(&eventBase, 20);
1649   CountedLoopCallback c3(&eventBase, 20);
1650 
1651   std::function<void()> cancelC1Action =
1652       std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
1653   std::function<void()> cancelC2Action =
1654       std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
1655 
1656   CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
1657   CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
1658 
1659   // Install cancelC1 after c1
1660   eventBase.runInLoop(&c1);
1661   eventBase.runInLoop(&cancelC1);
1662 
1663   // Install cancelC2 before c2
1664   eventBase.runInLoop(&cancelC2);
1665   eventBase.runInLoop(&c2);
1666 
1667   // Install c3
1668   eventBase.runInLoop(&c3);
1669 
1670   ASSERT_EQ(c1.getCount(), 20);
1671   ASSERT_EQ(c2.getCount(), 20);
1672   ASSERT_EQ(c3.getCount(), 20);
1673   ASSERT_EQ(cancelC1.getCount(), 10);
1674   ASSERT_EQ(cancelC2.getCount(), 10);
1675 
1676   // Run the loop
1677   eventBase.loop();
1678 
1679   // cancelC1 and cancelC2 should have both fired after 10 iterations and
1680   // stopped re-installing themselves
1681   ASSERT_EQ(cancelC1.getCount(), 0);
1682   ASSERT_EQ(cancelC2.getCount(), 0);
1683   // c3 should have continued on for the full 20 iterations
1684   ASSERT_EQ(c3.getCount(), 0);
1685 
1686   // c1 and c2 should have both been cancelled on the 10th iteration.
1687   //
1688   // Callbacks are always run in the order they are installed,
1689   // so c1 should have fired 10 times, and been canceled after it ran on the
1690   // 10th iteration.  c2 should have only fired 9 times, because cancelC2 will
1691   // have run before it on the 10th iteration, and cancelled it before it
1692   // fired.
1693   ASSERT_EQ(c1.getCount(), 10);
1694   ASSERT_EQ(c2.getCount(), 11);
1695 }
1696 
1697 namespace {
1698 class TerminateTestCallback : public EventBase::LoopCallback,
1699                               public EventHandler {
1700  public:
TerminateTestCallback(EventBase * eventBase,int fd)1701   TerminateTestCallback(EventBase* eventBase, int fd)
1702       : EventHandler(eventBase, NetworkSocket::fromFd(fd)),
1703         eventBase_(eventBase),
1704         loopInvocations_(0),
1705         maxLoopInvocations_(0),
1706         eventInvocations_(0),
1707         maxEventInvocations_(0) {}
1708 
reset(uint32_t maxLoopInvocations,uint32_t maxEventInvocations)1709   void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
1710     loopInvocations_ = 0;
1711     maxLoopInvocations_ = maxLoopInvocations;
1712     eventInvocations_ = 0;
1713     maxEventInvocations_ = maxEventInvocations;
1714 
1715     cancelLoopCallback();
1716     unregisterHandler();
1717   }
1718 
handlerReady(uint16_t)1719   void handlerReady(uint16_t /* events */) noexcept override {
1720     // We didn't register with PERSIST, so we will have been automatically
1721     // unregistered already.
1722     ASSERT_FALSE(isHandlerRegistered());
1723 
1724     ++eventInvocations_;
1725     if (eventInvocations_ >= maxEventInvocations_) {
1726       return;
1727     }
1728 
1729     eventBase_->runInLoop(this);
1730   }
runLoopCallback()1731   void runLoopCallback() noexcept override {
1732     ++loopInvocations_;
1733     if (loopInvocations_ >= maxLoopInvocations_) {
1734       return;
1735     }
1736 
1737     registerHandler(READ);
1738   }
1739 
getLoopInvocations()1740   uint32_t getLoopInvocations() const { return loopInvocations_; }
getEventInvocations()1741   uint32_t getEventInvocations() const { return eventInvocations_; }
1742 
1743  private:
1744   EventBase* eventBase_;
1745   uint32_t loopInvocations_;
1746   uint32_t maxLoopInvocations_;
1747   uint32_t eventInvocations_;
1748   uint32_t maxEventInvocations_;
1749 };
1750 } // namespace
1751 
1752 /**
1753  * Test that EventBase::loop() correctly detects when there are no more events
1754  * left to run.
1755  *
1756  * This uses a single callback, which alternates registering itself as a loop
1757  * callback versus a EventHandler callback.  This exercises a regression where
1758  * EventBase::loop() incorrectly exited if there were no more fd handlers
1759  * registered, but a loop callback installed a new fd handler.
1760  */
TYPED_TEST_P(EventBaseTest,LoopTermination)1761 TYPED_TEST_P(EventBaseTest, LoopTermination) {
1762   auto eventBasePtr = getEventBase<TypeParam>();
1763   SKIP_IF(!eventBasePtr) << "Backend not available";
1764   folly::EventBase& eventBase = *eventBasePtr;
1765 
1766   // Open a pipe and close the write end,
1767   // so the read endpoint will be readable
1768   int pipeFds[2];
1769   int rc = pipe(pipeFds);
1770   ASSERT_EQ(rc, 0);
1771   close(pipeFds[1]);
1772   TerminateTestCallback callback(&eventBase, pipeFds[0]);
1773 
1774   // Test once where the callback will exit after a loop callback
1775   callback.reset(10, 100);
1776   eventBase.runInLoop(&callback);
1777   eventBase.loop();
1778   ASSERT_EQ(callback.getLoopInvocations(), 10);
1779   ASSERT_EQ(callback.getEventInvocations(), 9);
1780 
1781   // Test once where the callback will exit after an fd event callback
1782   callback.reset(100, 7);
1783   eventBase.runInLoop(&callback);
1784   eventBase.loop();
1785   ASSERT_EQ(callback.getLoopInvocations(), 7);
1786   ASSERT_EQ(callback.getEventInvocations(), 7);
1787 
1788   close(pipeFds[0]);
1789 }
1790 
TYPED_TEST_P(EventBaseTest,CallbackOrderTest)1791 TYPED_TEST_P(EventBaseTest, CallbackOrderTest) {
1792   size_t num = 0;
1793   auto eventBasePtr = getEventBase<TypeParam>();
1794   SKIP_IF(!eventBasePtr) << "Backend not available";
1795   folly::EventBase& evb = *eventBasePtr;
1796 
1797   evb.runInEventBaseThread([&]() {
1798     std::thread t([&]() {
1799       evb.runInEventBaseThread([&]() {
1800         num++;
1801         EXPECT_EQ(num, 2);
1802       });
1803     });
1804     t.join();
1805 
1806     // this callback will run first
1807     // even if it is scheduled after the first one
1808     evb.runInEventBaseThread([&]() {
1809       num++;
1810       EXPECT_EQ(num, 1);
1811     });
1812   });
1813 
1814   evb.loop();
1815   EXPECT_EQ(num, 2);
1816 }
1817 
TYPED_TEST_P(EventBaseTest,AlwaysEnqueueCallbackOrderTest)1818 TYPED_TEST_P(EventBaseTest, AlwaysEnqueueCallbackOrderTest) {
1819   size_t num = 0;
1820   auto eventBasePtr = getEventBase<TypeParam>();
1821   SKIP_IF(!eventBasePtr) << "Backend not available";
1822   folly::EventBase& evb = *eventBasePtr;
1823 
1824   evb.runInEventBaseThread([&]() {
1825     std::thread t([&]() {
1826       evb.runInEventBaseThreadAlwaysEnqueue([&]() {
1827         num++;
1828         EXPECT_EQ(num, 1);
1829       });
1830     });
1831     t.join();
1832 
1833     // this callback will run second
1834     // since it was enqueued after the first one
1835     evb.runInEventBaseThreadAlwaysEnqueue([&]() {
1836       num++;
1837       EXPECT_EQ(num, 2);
1838     });
1839   });
1840 
1841   evb.loop();
1842   EXPECT_EQ(num, 2);
1843 }
1844 
TYPED_TEST_P(EventBaseTest1,InternalExternalCallbackOrderTest)1845 TYPED_TEST_P(EventBaseTest1, InternalExternalCallbackOrderTest) {
1846   size_t counter = 0;
1847 
1848   auto eventBasePtr = getEventBase<TypeParam>();
1849   SKIP_IF(!eventBasePtr) << "Backend not available";
1850   folly::EventBase& evb = *eventBasePtr;
1851 
1852   std::vector<size_t> calls;
1853 
1854   folly::Function<void(size_t)> runInLoopRecursive = [&](size_t left) {
1855     evb.runInLoop([&, left]() mutable {
1856       calls.push_back(counter++);
1857       if (--left == 0) {
1858         evb.terminateLoopSoon();
1859         return;
1860       }
1861       runInLoopRecursive(left);
1862     });
1863   };
1864 
1865   evb.runInEventBaseThread([&] { runInLoopRecursive(5); });
1866   for (size_t i = 0; i < 49; ++i) {
1867     evb.runInEventBaseThread([&] { ++counter; });
1868   }
1869   evb.loopForever();
1870 
1871   EXPECT_EQ(std::vector<size_t>({9, 20, 31, 42, 53}), calls);
1872 }
1873 
1874 ///////////////////////////////////////////////////////////////////////////
1875 // Tests for latency calculations
1876 ///////////////////////////////////////////////////////////////////////////
1877 
1878 namespace {
1879 class IdleTimeTimeoutSeries : public AsyncTimeout {
1880  public:
IdleTimeTimeoutSeries(EventBase * base,std::deque<std::size_t> & timeout)1881   explicit IdleTimeTimeoutSeries(
1882       EventBase* base, std::deque<std::size_t>& timeout)
1883       : AsyncTimeout(base), timeouts_(0), timeout_(timeout) {
1884     scheduleTimeout(1);
1885   }
1886 
~IdleTimeTimeoutSeries()1887   ~IdleTimeTimeoutSeries() override {}
1888 
timeoutExpired()1889   void timeoutExpired() noexcept override {
1890     ++timeouts_;
1891 
1892     if (timeout_.empty()) {
1893       cancelTimeout();
1894     } else {
1895       std::size_t sleepTime = timeout_.front();
1896       timeout_.pop_front();
1897       if (sleepTime) {
1898         usleep(sleepTime);
1899       }
1900       scheduleTimeout(1);
1901     }
1902   }
1903 
getTimeouts()1904   int getTimeouts() const { return timeouts_; }
1905 
1906  private:
1907   int timeouts_;
1908   std::deque<std::size_t>& timeout_;
1909 };
1910 } // namespace
1911 
1912 /**
1913  * Verify that idle time is correctly accounted for when decaying our loop
1914  * time.
1915  *
1916  * This works by creating a high loop time (via usleep), expecting a latency
1917  * callback with known value, and then scheduling a timeout for later. This
1918  * later timeout is far enough in the future that the idle time should have
1919  * caused the loop time to decay.
1920  */
TYPED_TEST_P(EventBaseTest,IdleTime)1921 TYPED_TEST_P(EventBaseTest, IdleTime) {
1922   auto eventBasePtr = getEventBase<TypeParam>();
1923   SKIP_IF(!eventBasePtr) << "Backend not available";
1924   folly::EventBase& eventBase = *eventBasePtr;
1925   std::deque<std::size_t> timeouts0(4, 8080);
1926   timeouts0.push_front(8000);
1927   timeouts0.push_back(14000);
1928   IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
1929   std::deque<std::size_t> timeouts(20, 20);
1930   std::unique_ptr<IdleTimeTimeoutSeries> tos;
1931   bool hostOverloaded = false;
1932 
1933   // Loop once before starting the main test.  This will run NotificationQueue
1934   // callbacks that get automatically installed when the EventBase is first
1935   // created.  We want to make sure they don't interfere with the timing
1936   // operations below.
1937   eventBase.loopOnce(EVLOOP_NONBLOCK);
1938   eventBase.setLoadAvgMsec(std::chrono::milliseconds(1000));
1939   eventBase.resetLoadAvg(5900.0);
1940 
1941   int latencyCallbacks = 0;
1942   eventBase.setMaxLatency(std::chrono::microseconds(6000), [&]() {
1943     ++latencyCallbacks;
1944     if (latencyCallbacks != 1 || tos0.getTimeouts() < 6) {
1945       // This could only happen if the host this test is running
1946       // on is heavily loaded.
1947       hostOverloaded = true;
1948       return;
1949     }
1950 
1951     EXPECT_EQ(6, tos0.getTimeouts());
1952     EXPECT_GE(6100, eventBase.getAvgLoopTime() - 1200);
1953     EXPECT_LE(6100, eventBase.getAvgLoopTime() + 1200);
1954     tos = std::make_unique<IdleTimeTimeoutSeries>(&eventBase, timeouts);
1955   });
1956 
1957   // Kick things off with an "immediate" timeout
1958   tos0.scheduleTimeout(1);
1959 
1960   eventBase.loop();
1961 
1962   if (hostOverloaded) {
1963     SKIP() << "host too heavily loaded to execute test";
1964   }
1965 
1966   ASSERT_EQ(1, latencyCallbacks);
1967   ASSERT_EQ(7, tos0.getTimeouts());
1968   ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
1969   ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
1970   ASSERT_TRUE(!!tos);
1971   ASSERT_EQ(21, tos->getTimeouts());
1972 }
1973 
TYPED_TEST_P(EventBaseTest,MaxLatencyUndamped)1974 TYPED_TEST_P(EventBaseTest, MaxLatencyUndamped) {
1975   auto eventBasePtr = getEventBase<TypeParam>();
1976   folly::EventBase& eb = *eventBasePtr;
1977   int maxDurationViolations = 0;
1978   eb.setMaxLatency(
1979       std::chrono::milliseconds{1}, [&]() { maxDurationViolations++; }, false);
1980   eb.runInLoop(
1981       [&]() {
1982         /* sleep override */ std::this_thread::sleep_for(
1983             std::chrono::microseconds{1001});
1984         eb.terminateLoopSoon();
1985       },
1986       true);
1987   eb.loop();
1988   ASSERT_EQ(maxDurationViolations, 1);
1989 }
1990 
TYPED_TEST_P(EventBaseTest,UnsetMaxLatencyUndamped)1991 TYPED_TEST_P(EventBaseTest, UnsetMaxLatencyUndamped) {
1992   auto eventBasePtr = getEventBase<TypeParam>();
1993   folly::EventBase& eb = *eventBasePtr;
1994   int maxDurationViolations = 0;
1995   eb.setMaxLatency(
1996       std::chrono::milliseconds{1}, [&]() { maxDurationViolations++; }, false);
1997   // Immediately unset it and make sure the counter isn't incremented. If the
1998   // function gets called, this will raise an std::bad_function_call.
1999   std::function<void()> bad_func = nullptr;
2000   eb.setMaxLatency(std::chrono::milliseconds{0}, bad_func, false);
2001   eb.runInLoop(
2002       [&]() {
2003         /* sleep override */ std::this_thread::sleep_for(
2004             std::chrono::microseconds{1001});
2005         eb.terminateLoopSoon();
2006       },
2007       true);
2008   eb.loop();
2009   ASSERT_EQ(maxDurationViolations, 0);
2010 }
2011 
2012 /**
2013  * Test that thisLoop functionality works with terminateLoopSoon
2014  */
TYPED_TEST_P(EventBaseTest,ThisLoop)2015 TYPED_TEST_P(EventBaseTest, ThisLoop) {
2016   bool runInLoop = false;
2017   bool runThisLoop = false;
2018 
2019   {
2020     auto eventBasePtr = getEventBase<TypeParam>();
2021     SKIP_IF(!eventBasePtr) << "Backend not available";
2022     folly::EventBase& eb = *eventBasePtr;
2023     eb.runInLoop(
2024         [&]() {
2025           eb.terminateLoopSoon();
2026           eb.runInLoop([&]() { runInLoop = true; });
2027           eb.runInLoop([&]() { runThisLoop = true; }, true);
2028         },
2029         true);
2030     eb.loopForever();
2031 
2032     // Should not work
2033     ASSERT_FALSE(runInLoop);
2034     // Should work with thisLoop
2035     ASSERT_TRUE(runThisLoop);
2036   }
2037 
2038   // Pending loop callbacks will be run when the EventBase is destroyed.
2039   ASSERT_TRUE(runInLoop);
2040 }
2041 
TYPED_TEST_P(EventBaseTest,EventBaseThreadLoop)2042 TYPED_TEST_P(EventBaseTest, EventBaseThreadLoop) {
2043   auto eventBasePtr = getEventBase<TypeParam>();
2044   SKIP_IF(!eventBasePtr) << "Backend not available";
2045   folly::EventBase& base = *eventBasePtr;
2046   bool ran = false;
2047   base.runInEventBaseThread([&]() { ran = true; });
2048   base.loop();
2049 
2050   ASSERT_TRUE(ran);
2051 }
2052 
TYPED_TEST_P(EventBaseTest,EventBaseThreadName)2053 TYPED_TEST_P(EventBaseTest, EventBaseThreadName) {
2054   auto eventBasePtr = getEventBase<TypeParam>();
2055   SKIP_IF(!eventBasePtr) << "Backend not available";
2056   folly::EventBase& base = *eventBasePtr;
2057   base.setName("foo");
2058   base.loop();
2059 
2060   ASSERT_EQ("foo", *getCurrentThreadName());
2061 }
2062 
TYPED_TEST_P(EventBaseTest,RunBeforeLoop)2063 TYPED_TEST_P(EventBaseTest, RunBeforeLoop) {
2064   auto eventBasePtr = getEventBase<TypeParam>();
2065   SKIP_IF(!eventBasePtr) << "Backend not available";
2066   folly::EventBase& base = *eventBasePtr;
2067   CountedLoopCallback cb(&base, 1, [&]() { base.terminateLoopSoon(); });
2068   base.runBeforeLoop(&cb);
2069   base.loopForever();
2070   ASSERT_EQ(cb.getCount(), 0);
2071 }
2072 
TYPED_TEST_P(EventBaseTest,RunBeforeLoopWait)2073 TYPED_TEST_P(EventBaseTest, RunBeforeLoopWait) {
2074   auto eventBasePtr = getEventBase<TypeParam>();
2075   SKIP_IF(!eventBasePtr) << "Backend not available";
2076   folly::EventBase& base = *eventBasePtr;
2077   CountedLoopCallback cb(&base, 1);
2078   base.tryRunAfterDelay([&]() { base.terminateLoopSoon(); }, 500);
2079   base.runBeforeLoop(&cb);
2080   base.loopForever();
2081 
2082   // Check that we only ran once, and did not loop multiple times.
2083   ASSERT_EQ(cb.getCount(), 0);
2084 }
2085 
2086 namespace {
2087 class PipeHandler : public EventHandler {
2088  public:
PipeHandler(EventBase * eventBase,int fd)2089   PipeHandler(EventBase* eventBase, int fd)
2090       : EventHandler(eventBase, NetworkSocket::fromFd(fd)) {}
2091 
handlerReady(uint16_t)2092   void handlerReady(uint16_t /* events */) noexcept override { abort(); }
2093 };
2094 } // namespace
2095 
TYPED_TEST_P(EventBaseTest,StopBeforeLoop)2096 TYPED_TEST_P(EventBaseTest, StopBeforeLoop) {
2097   auto eventBasePtr = getEventBase<TypeParam>();
2098   SKIP_IF(!eventBasePtr) << "Backend not available";
2099   folly::EventBase& evb = *eventBasePtr;
2100 
2101   // Give the evb something to do.
2102   int p[2];
2103   ASSERT_EQ(0, pipe(p));
2104   PipeHandler handler(&evb, p[0]);
2105   handler.registerHandler(EventHandler::READ);
2106 
2107   // It's definitely not running yet
2108   evb.terminateLoopSoon();
2109 
2110   // let it run, it should exit quickly.
2111   std::thread t([&] { evb.loop(); });
2112   t.join();
2113 
2114   handler.unregisterHandler();
2115   close(p[0]);
2116   close(p[1]);
2117 
2118   SUCCEED();
2119 }
2120 
TYPED_TEST_P(EventBaseTest,RunCallbacksOnDestruction)2121 TYPED_TEST_P(EventBaseTest, RunCallbacksOnDestruction) {
2122   bool ran = false;
2123 
2124   {
2125     auto eventBasePtr = getEventBase<TypeParam>();
2126     SKIP_IF(!eventBasePtr) << "Backend not available";
2127     eventBasePtr->runInEventBaseThread([&]() { ran = true; });
2128   }
2129 
2130   ASSERT_TRUE(ran);
2131 }
2132 
TYPED_TEST_P(EventBaseTest,LoopKeepAlive)2133 TYPED_TEST_P(EventBaseTest, LoopKeepAlive) {
2134   auto evbPtr = getEventBase<TypeParam>();
2135   SKIP_IF(!evbPtr) << "Backend not available";
2136 
2137   bool done = false;
2138   std::thread t([&, loopKeepAlive = getKeepAliveToken(*evbPtr)]() mutable {
2139     /* sleep override */ std::this_thread::sleep_for(
2140         std::chrono::milliseconds(100));
2141     evbPtr->runInEventBaseThread(
2142         [&done, loopKeepAlive = std::move(loopKeepAlive)] { done = true; });
2143   });
2144 
2145   evbPtr->loop();
2146 
2147   ASSERT_TRUE(done);
2148 
2149   t.join();
2150 }
2151 
TYPED_TEST_P(EventBaseTest,LoopKeepAliveInLoop)2152 TYPED_TEST_P(EventBaseTest, LoopKeepAliveInLoop) {
2153   auto evbPtr = getEventBase<TypeParam>();
2154   SKIP_IF(!evbPtr) << "Backend not available";
2155 
2156   bool done = false;
2157   std::thread t;
2158 
2159   evbPtr->runInEventBaseThread([&] {
2160     t = std::thread([&, loopKeepAlive = getKeepAliveToken(*evbPtr)]() mutable {
2161       /* sleep override */ std::this_thread::sleep_for(
2162           std::chrono::milliseconds(100));
2163       evbPtr->runInEventBaseThread(
2164           [&done, loopKeepAlive = std::move(loopKeepAlive)] { done = true; });
2165     });
2166   });
2167 
2168   evbPtr->loop();
2169 
2170   ASSERT_TRUE(done);
2171 
2172   t.join();
2173 }
2174 
TYPED_TEST_P(EventBaseTest,LoopKeepAliveWithLoopForever)2175 TYPED_TEST_P(EventBaseTest, LoopKeepAliveWithLoopForever) {
2176   auto evbPtr = getEventBase<TypeParam>();
2177   SKIP_IF(!evbPtr) << "Backend not available";
2178 
2179   bool done = false;
2180 
2181   std::thread evThread([&] {
2182     evbPtr->loopForever();
2183     evbPtr.reset();
2184     done = true;
2185   });
2186 
2187   {
2188     auto* ev = evbPtr.get();
2189     Executor::KeepAlive<EventBase> keepAlive;
2190     ev->runInEventBaseThreadAndWait(
2191         [&ev, &keepAlive] { keepAlive = getKeepAliveToken(ev); });
2192     ASSERT_FALSE(done) << "Loop finished before we asked it to";
2193     ev->terminateLoopSoon();
2194     /* sleep override */
2195     std::this_thread::sleep_for(std::chrono::milliseconds(30));
2196     ASSERT_FALSE(done) << "Loop terminated early";
2197     ev->runInEventBaseThread([keepAlive = std::move(keepAlive)] {});
2198   }
2199 
2200   evThread.join();
2201   ASSERT_TRUE(done);
2202 }
2203 
TYPED_TEST_P(EventBaseTest,LoopKeepAliveShutdown)2204 TYPED_TEST_P(EventBaseTest, LoopKeepAliveShutdown) {
2205   auto evbPtr = getEventBase<TypeParam>();
2206   SKIP_IF(!evbPtr) << "Backend not available";
2207 
2208   bool done = false;
2209 
2210   std::thread t([&done,
2211                  loopKeepAlive = getKeepAliveToken(evbPtr.get()),
2212                  evbPtrRaw = evbPtr.get()]() mutable {
2213     /* sleep override */ std::this_thread::sleep_for(
2214         std::chrono::milliseconds(100));
2215     evbPtrRaw->runInEventBaseThread(
2216         [&done, loopKeepAlive = std::move(loopKeepAlive)] { done = true; });
2217   });
2218 
2219   evbPtr.reset();
2220 
2221   ASSERT_TRUE(done);
2222 
2223   t.join();
2224 }
2225 
TYPED_TEST_P(EventBaseTest,LoopKeepAliveAtomic)2226 TYPED_TEST_P(EventBaseTest, LoopKeepAliveAtomic) {
2227   auto evbPtr = getEventBase<TypeParam>();
2228   SKIP_IF(!evbPtr) << "Backend not available";
2229 
2230   static constexpr size_t kNumThreads = 100;
2231   static constexpr size_t kNumTasks = 100;
2232 
2233   std::vector<std::thread> ts;
2234   std::vector<std::unique_ptr<Baton<>>> batons;
2235   size_t done{0};
2236 
2237   for (size_t i = 0; i < kNumThreads; ++i) {
2238     batons.emplace_back(std::make_unique<Baton<>>());
2239   }
2240 
2241   for (size_t i = 0; i < kNumThreads; ++i) {
2242     ts.emplace_back([evbPtrRaw = evbPtr.get(),
2243                      batonPtr = batons[i].get(),
2244                      &done] {
2245       std::vector<Executor::KeepAlive<EventBase>> keepAlives;
2246       for (size_t j = 0; j < kNumTasks; ++j) {
2247         keepAlives.emplace_back(getKeepAliveToken(evbPtrRaw));
2248       }
2249 
2250       batonPtr->post();
2251 
2252       /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds(1));
2253 
2254       for (auto& keepAlive : keepAlives) {
2255         evbPtrRaw->runInEventBaseThread(
2256             [&done, keepAlive = std::move(keepAlive)]() { ++done; });
2257       }
2258     });
2259   }
2260 
2261   for (auto& baton : batons) {
2262     baton->wait();
2263   }
2264 
2265   evbPtr.reset();
2266 
2267   EXPECT_EQ(kNumThreads * kNumTasks, done);
2268 
2269   for (auto& t : ts) {
2270     t.join();
2271   }
2272 }
2273 
TYPED_TEST_P(EventBaseTest,LoopKeepAliveCast)2274 TYPED_TEST_P(EventBaseTest, LoopKeepAliveCast) {
2275   auto evbPtr = getEventBase<TypeParam>();
2276   SKIP_IF(!evbPtr) << "Backend not available";
2277   Executor::KeepAlive<> keepAlive = getKeepAliveToken(*evbPtr);
2278 }
2279 
TYPED_TEST_P(EventBaseTest1,DrivableExecutorTest)2280 TYPED_TEST_P(EventBaseTest1, DrivableExecutorTest) {
2281   folly::Promise<bool> p;
2282   auto f = p.getFuture();
2283   auto eventBasePtr = getEventBase<TypeParam>();
2284   SKIP_IF(!eventBasePtr) << "Backend not available";
2285   folly::EventBase& base = *eventBasePtr;
2286   bool finished = false;
2287 
2288   Baton baton;
2289 
2290   std::thread t([&] {
2291     baton.wait();
2292     /* sleep override */
2293     std::this_thread::sleep_for(std::chrono::milliseconds(500));
2294     finished = true;
2295     base.runInEventBaseThread([&]() { p.setValue(true); });
2296   });
2297 
2298   // Ensure drive does not busy wait
2299   base.drive(); // TODO: fix notification queue init() extra wakeup
2300   baton.post();
2301   base.drive();
2302   EXPECT_TRUE(finished);
2303 
2304   folly::Promise<bool> p2;
2305   auto f2 = p2.getFuture();
2306   // Ensure waitVia gets woken up properly, even from
2307   // a separate thread.
2308   base.runAfterDelay([&]() { p2.setValue(true); }, 10);
2309   f2.waitVia(&base);
2310   EXPECT_TRUE(f2.isReady());
2311 
2312   t.join();
2313 }
2314 
TYPED_TEST_P(EventBaseTest1,IOExecutorTest)2315 TYPED_TEST_P(EventBaseTest1, IOExecutorTest) {
2316   auto evbPtr = getEventBase<TypeParam>();
2317   SKIP_IF(!evbPtr) << "Backend not available";
2318 
2319   // Ensure EventBase manages itself as an IOExecutor.
2320   EXPECT_EQ(evbPtr->getEventBase(), evbPtr.get());
2321 }
2322 
TYPED_TEST_P(EventBaseTest1,RequestContextTest)2323 TYPED_TEST_P(EventBaseTest1, RequestContextTest) {
2324   auto evbPtr = getEventBase<TypeParam>();
2325   SKIP_IF(!evbPtr) << "Backend not available";
2326   auto defaultCtx = RequestContext::get();
2327   std::weak_ptr<RequestContext> rctx_weak_ptr;
2328 
2329   {
2330     RequestContextScopeGuard rctx;
2331     rctx_weak_ptr = RequestContext::saveContext();
2332     auto context = RequestContext::get();
2333     EXPECT_NE(defaultCtx, context);
2334     evbPtr->runInLoop([context] { EXPECT_EQ(context, RequestContext::get()); });
2335     evbPtr->loop();
2336   }
2337 
2338   // Ensure that RequestContext created for the scope has been released and
2339   // deleted.
2340   EXPECT_EQ(rctx_weak_ptr.expired(), true);
2341 
2342   EXPECT_EQ(defaultCtx, RequestContext::get());
2343 }
2344 
TYPED_TEST_P(EventBaseTest1,CancelLoopCallbackRequestContextTest)2345 TYPED_TEST_P(EventBaseTest1, CancelLoopCallbackRequestContextTest) {
2346   auto evbPtr = getEventBase<TypeParam>();
2347   SKIP_IF(!evbPtr) << "Backend not available";
2348   CountedLoopCallback c(evbPtr.get(), 1);
2349 
2350   auto defaultCtx = RequestContext::get();
2351   EXPECT_EQ(defaultCtx, RequestContext::get());
2352   std::weak_ptr<RequestContext> rctx_weak_ptr;
2353 
2354   {
2355     RequestContextScopeGuard rctx;
2356     rctx_weak_ptr = RequestContext::saveContext();
2357     auto context = RequestContext::get();
2358     EXPECT_NE(defaultCtx, context);
2359     evbPtr->runInLoop(&c);
2360     c.cancelLoopCallback();
2361   }
2362 
2363   // Ensure that RequestContext created for the scope has been released and
2364   // deleted.
2365   EXPECT_EQ(rctx_weak_ptr.expired(), true);
2366 
2367   EXPECT_EQ(defaultCtx, RequestContext::get());
2368 }
2369 
TYPED_TEST_P(EventBaseTest1,TestStarvation)2370 TYPED_TEST_P(EventBaseTest1, TestStarvation) {
2371   auto evbPtr = getEventBase<TypeParam>();
2372   SKIP_IF(!evbPtr) << "Backend not available";
2373 
2374   std::promise<void> stopRequested;
2375   std::promise<void> stopScheduled;
2376   bool stopping{false};
2377   std::thread t{[&] {
2378     stopRequested.get_future().get();
2379     evbPtr->add([&]() { stopping = true; });
2380     stopScheduled.set_value();
2381   }};
2382 
2383   size_t num{0};
2384   std::function<void()> fn;
2385   fn = [&]() {
2386     if (stopping || num >= 2000) {
2387       return;
2388     }
2389 
2390     if (++num == 1000) {
2391       stopRequested.set_value();
2392       stopScheduled.get_future().get();
2393     }
2394 
2395     evbPtr->add(fn);
2396   };
2397 
2398   evbPtr->add(fn);
2399   evbPtr->loop();
2400 
2401   EXPECT_EQ(1000, num);
2402   t.join();
2403 }
2404 
TYPED_TEST_P(EventBaseTest1,RunOnDestructionBasic)2405 TYPED_TEST_P(EventBaseTest1, RunOnDestructionBasic) {
2406   bool ranOnDestruction = false;
2407   {
2408     auto evbPtr = getEventBase<TypeParam>();
2409     SKIP_IF(!evbPtr) << "Backend not available";
2410     evbPtr->runOnDestruction([&ranOnDestruction] { ranOnDestruction = true; });
2411   }
2412   EXPECT_TRUE(ranOnDestruction);
2413 }
2414 
TYPED_TEST_P(EventBaseTest1,RunOnDestructionCancelled)2415 TYPED_TEST_P(EventBaseTest1, RunOnDestructionCancelled) {
2416   struct Callback : EventBase::OnDestructionCallback {
2417     bool ranOnDestruction{false};
2418 
2419     void onEventBaseDestruction() noexcept final { ranOnDestruction = true; }
2420   };
2421 
2422   auto cb = std::make_unique<Callback>();
2423   {
2424     auto evbPtr = getEventBase<TypeParam>();
2425     SKIP_IF(!evbPtr) << "Backend not available";
2426     evbPtr->runOnDestruction(*cb);
2427     EXPECT_TRUE(cb->cancel());
2428   }
2429   EXPECT_FALSE(cb->ranOnDestruction);
2430   EXPECT_FALSE(cb->cancel());
2431 }
2432 
TYPED_TEST_P(EventBaseTest1,RunOnDestructionAfterHandleDestroyed)2433 TYPED_TEST_P(EventBaseTest1, RunOnDestructionAfterHandleDestroyed) {
2434   auto evbPtr = getEventBase<TypeParam>();
2435   SKIP_IF(!evbPtr) << "Backend not available";
2436   {
2437     bool ranOnDestruction = false;
2438     auto* cb = new EventBase::FunctionOnDestructionCallback(
2439         [&ranOnDestruction] { ranOnDestruction = true; });
2440     evbPtr->runOnDestruction(*cb);
2441     EXPECT_TRUE(cb->cancel());
2442     delete cb;
2443   }
2444 }
2445 
TYPED_TEST_P(EventBaseTest1,RunOnDestructionAddCallbackWithinCallback)2446 TYPED_TEST_P(EventBaseTest1, RunOnDestructionAddCallbackWithinCallback) {
2447   size_t callbacksCalled = 0;
2448   {
2449     auto evbPtr = getEventBase<TypeParam>();
2450     SKIP_IF(!evbPtr) << "Backend not available";
2451     evbPtr->runOnDestruction([&] {
2452       ++callbacksCalled;
2453       evbPtr->runOnDestruction([&] { ++callbacksCalled; });
2454     });
2455   }
2456   EXPECT_EQ(2, callbacksCalled);
2457 }
2458 
TYPED_TEST_P(EventBaseTest1,EventBaseExecutionObserver)2459 TYPED_TEST_P(EventBaseTest1, EventBaseExecutionObserver) {
2460   auto eventBasePtr = getEventBase<TypeParam>();
2461   SKIP_IF(!eventBasePtr) << "Backend not available";
2462   folly::EventBase& base = *eventBasePtr;
2463   bool ranBeforeLoop = false;
2464   bool ran = false;
2465   TestObserver observer;
2466   base.setExecutionObserver(&observer);
2467 
2468   CountedLoopCallback cb(&base, 1, [&]() { ranBeforeLoop = true; });
2469   base.runBeforeLoop(&cb);
2470 
2471   base.runInEventBaseThread(
2472       [&]() { base.runInEventBaseThread([&]() { ran = true; }); });
2473   base.loop();
2474 
2475   ASSERT_TRUE(ranBeforeLoop);
2476   ASSERT_TRUE(ran);
2477   ASSERT_EQ(0, observer.nestedStart_);
2478   ASSERT_EQ(4, observer.numStartingCalled_);
2479   ASSERT_EQ(4, observer.numStoppedCalled_);
2480 }
2481 } // namespace test
2482 } // namespace folly
2483