1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <atomic>
18 #include <future>
19 #include <iostream>
20 #include <memory>
21 #include <thread>
22
23 #include <folly/Memory.h>
24 #include <folly/ScopeGuard.h>
25 #include <folly/futures/Promise.h>
26 #include <folly/io/async/AsyncTimeout.h>
27 #include <folly/io/async/EventBase.h>
28 #include <folly/io/async/EventHandler.h>
29 #include <folly/io/async/test/SocketPair.h>
30 #include <folly/io/async/test/Util.h>
31 #include <folly/portability/Stdlib.h>
32 #include <folly/portability/Unistd.h>
33 #include <folly/synchronization/Baton.h>
34 #include <folly/system/ThreadName.h>
35
36 #define FOLLY_SKIP_IF_NULLPTR_BACKEND_WITH_OPTS(evb, opts) \
37 std::unique_ptr<EventBase> evb##Ptr; \
38 try { \
39 auto factory = [] { \
40 auto backend = TypeParam::getBackend(); \
41 if (!backend) { \
42 throw std::runtime_error("backend not available"); \
43 } \
44 return backend; \
45 }; \
46 auto evbOpts = opts; \
47 evb##Ptr = std::make_unique<EventBase>( \
48 opts.setBackendFactory(std::move(factory))); \
49 } catch (const std::runtime_error& e) { \
50 if (std::string("backend not available") == e.what()) { \
51 SKIP() << "Backend not available"; \
52 } \
53 } \
54 EventBase& evb = *evb##Ptr.get()
55
56 #define FOLLY_SKIP_IF_NULLPTR_BACKEND(evb) \
57 FOLLY_SKIP_IF_NULLPTR_BACKEND_WITH_OPTS(evb, EventBase::Options())
58
59 ///////////////////////////////////////////////////////////////////////////
60 // Tests for read and write events
61 ///////////////////////////////////////////////////////////////////////////
62
63 namespace folly {
64 namespace test {
65 class EventBaseTestBase : public ::testing::Test {
66 public:
EventBaseTestBase()67 EventBaseTestBase() {
68 // libevent 2.x uses a coarse monotonic timer by default on Linux.
69 // This timer is imprecise enough to cause several of our tests to fail.
70 //
71 // Set an environment variable that causes libevent to use a non-coarse
72 // timer. This can be controlled programmatically by using the
73 // EVENT_BASE_FLAG_PRECISE_TIMER flag with event_base_new_with_config().
74 // However, this would require more compile-time #ifdefs to tell if we are
75 // using libevent 2.1+ or not. Simply using the environment variable is
76 // the easiest option for now.
77 setenv("EVENT_PRECISE_TIMER", "1", 1);
78 }
79 };
80
81 template <typename T>
82 class EventBaseTest : public EventBaseTestBase {
83 public:
84 EventBaseTest() = default;
85 };
86
87 TYPED_TEST_SUITE_P(EventBaseTest);
88
89 template <typename T>
90 class EventBaseTest1 : public EventBaseTestBase {
91 public:
92 EventBaseTest1() = default;
93 };
94
95 template <class Factory>
96 std::unique_ptr<EventBase> getEventBase(
97 folly::EventBase::Options opts = folly::EventBase::Options()) {
98 try {
99 auto factory = [] {
100 auto backend = Factory::getBackend();
101 if (!backend) {
102 throw std::runtime_error("backend not available");
103 }
104 return backend;
105 };
106 return std::make_unique<EventBase>(
107 opts.setBackendFactory(std::move(factory)));
catch(const std::runtime_error & e)108 } catch (const std::runtime_error& e) {
109 if (std::string("backend not available") == e.what()) {
110 return nullptr;
111 }
112 throw;
113 }
114 }
115
116 TYPED_TEST_SUITE_P(EventBaseTest1);
117
118 enum { BUF_SIZE = 4096 };
119
writeToFD(int fd,size_t length)120 FOLLY_ALWAYS_INLINE ssize_t writeToFD(int fd, size_t length) {
121 // write an arbitrary amount of data to the fd
122 auto bufv = std::vector<char>(length);
123 auto buf = bufv.data();
124 memset(buf, 'a', length);
125 ssize_t rc = write(fd, buf, length);
126 CHECK_EQ(rc, length);
127 return rc;
128 }
129
writeUntilFull(int fd)130 FOLLY_ALWAYS_INLINE size_t writeUntilFull(int fd) {
131 // Write to the fd until EAGAIN is returned
132 size_t bytesWritten = 0;
133 char buf[BUF_SIZE];
134 memset(buf, 'a', sizeof(buf));
135 while (true) {
136 ssize_t rc = write(fd, buf, sizeof(buf));
137 if (rc < 0) {
138 CHECK_EQ(errno, EAGAIN);
139 break;
140 } else {
141 bytesWritten += rc;
142 }
143 }
144 return bytesWritten;
145 }
146
readFromFD(int fd,size_t length)147 FOLLY_ALWAYS_INLINE ssize_t readFromFD(int fd, size_t length) {
148 // write an arbitrary amount of data to the fd
149 auto buf = std::vector<char>(length);
150 return read(fd, buf.data(), length);
151 }
152
readUntilEmpty(int fd)153 FOLLY_ALWAYS_INLINE size_t readUntilEmpty(int fd) {
154 // Read from the fd until EAGAIN is returned
155 char buf[BUF_SIZE];
156 size_t bytesRead = 0;
157 while (true) {
158 int rc = read(fd, buf, sizeof(buf));
159 if (rc == 0) {
160 CHECK(false) << "unexpected EOF";
161 } else if (rc < 0) {
162 CHECK_EQ(errno, EAGAIN);
163 break;
164 } else {
165 bytesRead += rc;
166 }
167 }
168 return bytesRead;
169 }
170
checkReadUntilEmpty(int fd,size_t expectedLength)171 FOLLY_ALWAYS_INLINE void checkReadUntilEmpty(int fd, size_t expectedLength) {
172 ASSERT_EQ(readUntilEmpty(fd), expectedLength);
173 }
174
175 struct ScheduledEvent {
176 int milliseconds;
177 uint16_t events;
178 size_t length;
179 ssize_t result;
180
performScheduledEvent181 void perform(int fd) {
182 if (events & folly::EventHandler::READ) {
183 if (length == 0) {
184 result = readUntilEmpty(fd);
185 } else {
186 result = readFromFD(fd, length);
187 }
188 }
189 if (events & folly::EventHandler::WRITE) {
190 if (length == 0) {
191 result = writeUntilFull(fd);
192 } else {
193 result = writeToFD(fd, length);
194 }
195 }
196 }
197 };
198
scheduleEvents(EventBase * eventBase,int fd,ScheduledEvent * events)199 FOLLY_ALWAYS_INLINE void scheduleEvents(
200 EventBase* eventBase, int fd, ScheduledEvent* events) {
201 for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
202 eventBase->tryRunAfterDelay(
203 std::bind(&ScheduledEvent::perform, ev, fd), ev->milliseconds);
204 }
205 }
206
207 class TestObserver : public folly::ExecutionObserver {
208 public:
starting(uintptr_t)209 virtual void starting(uintptr_t /* id */) noexcept override {
210 if (nestedStart_ == 0) {
211 nestedStart_ = 1;
212 }
213 numStartingCalled_++;
214 }
stopped(uintptr_t)215 virtual void stopped(uintptr_t /* id */) noexcept override {
216 nestedStart_--;
217 numStoppedCalled_++;
218 }
runnable(uintptr_t)219 virtual void runnable(uintptr_t /* id */) noexcept override {
220 // Unused
221 }
222
223 int nestedStart_{0};
224 int numStartingCalled_{0};
225 int numStoppedCalled_{0};
226 };
227
228 class TestHandler : public folly::EventHandler {
229 public:
TestHandler(folly::EventBase * eventBase,int fd)230 TestHandler(folly::EventBase* eventBase, int fd)
231 : EventHandler(eventBase, folly::NetworkSocket::fromFd(fd)), fd_(fd) {}
232
handlerReady(uint16_t events)233 void handlerReady(uint16_t events) noexcept override {
234 ssize_t bytesRead = 0;
235 ssize_t bytesWritten = 0;
236 if (events & READ) {
237 // Read all available data, so EventBase will stop calling us
238 // until new data becomes available
239 bytesRead = readUntilEmpty(fd_);
240 }
241 if (events & WRITE) {
242 // Write until the pipe buffer is full, so EventBase will stop calling
243 // us until the other end has read some data
244 bytesWritten = writeUntilFull(fd_);
245 }
246
247 log.emplace_back(events, bytesRead, bytesWritten);
248 }
249
250 struct EventRecord {
EventRecordEventRecord251 EventRecord(uint16_t events_, size_t bytesRead_, size_t bytesWritten_)
252 : events(events_),
253 timestamp(),
254 bytesRead(bytesRead_),
255 bytesWritten(bytesWritten_) {}
256
257 uint16_t events;
258 folly::TimePoint timestamp;
259 ssize_t bytesRead;
260 ssize_t bytesWritten;
261 };
262
263 std::deque<EventRecord> log;
264
265 private:
266 int fd_;
267 };
268
269 /**
270 * Test a READ event
271 */
TYPED_TEST_P(EventBaseTest,ReadEvent)272 TYPED_TEST_P(EventBaseTest, ReadEvent) {
273 auto evbPtr = getEventBase<TypeParam>();
274 SKIP_IF(!evbPtr) << "Backend not available";
275 folly::EventBase& eb = *evbPtr;
276 SocketPair sp;
277
278 // Register for read events
279 TestHandler handler(&eb, sp[0]);
280 handler.registerHandler(EventHandler::READ);
281
282 // Register timeouts to perform two write events
283 ScheduledEvent events[] = {
284 {10, EventHandler::WRITE, 2345, 0},
285 {160, EventHandler::WRITE, 99, 0},
286 {0, 0, 0, 0},
287 };
288 TimePoint start;
289 scheduleEvents(&eb, sp[1], events);
290
291 // Loop
292 eb.loop();
293 TimePoint end;
294
295 // Since we didn't use the EventHandler::PERSIST flag, the handler should
296 // have received the first read, then unregistered itself. Check that only
297 // the first chunk of data was received.
298 ASSERT_EQ(handler.log.size(), 1);
299 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
300 T_CHECK_TIMEOUT(
301 start,
302 handler.log[0].timestamp,
303 std::chrono::milliseconds(events[0].milliseconds),
304 std::chrono::milliseconds(90));
305 ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
306 ASSERT_EQ(handler.log[0].bytesWritten, 0);
307 T_CHECK_TIMEOUT(
308 start,
309 end,
310 std::chrono::milliseconds(events[1].milliseconds),
311 std::chrono::milliseconds(30));
312
313 // Make sure the second chunk of data is still waiting to be read.
314 size_t bytesRemaining = readUntilEmpty(sp[0]);
315 ASSERT_EQ(bytesRemaining, events[1].length);
316 }
317
318 /**
319 * Test (READ | PERSIST)
320 */
TYPED_TEST_P(EventBaseTest,ReadPersist)321 TYPED_TEST_P(EventBaseTest, ReadPersist) {
322 auto evbPtr = getEventBase<TypeParam>();
323 SKIP_IF(!evbPtr) << "Backend not available";
324 folly::EventBase& eb = *evbPtr;
325 SocketPair sp;
326
327 // Register for read events
328 TestHandler handler(&eb, sp[0]);
329 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
330
331 // Register several timeouts to perform writes
332 ScheduledEvent events[] = {
333 {10, EventHandler::WRITE, 1024, 0},
334 {20, EventHandler::WRITE, 2211, 0},
335 {30, EventHandler::WRITE, 4096, 0},
336 {100, EventHandler::WRITE, 100, 0},
337 {0, 0, 0, 0},
338 };
339 TimePoint start;
340 scheduleEvents(&eb, sp[1], events);
341
342 // Schedule a timeout to unregister the handler after the third write
343 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
344
345 // Loop
346 eb.loop();
347 TimePoint end;
348
349 // The handler should have received the first 3 events,
350 // then been unregistered after that.
351 ASSERT_EQ(handler.log.size(), 3);
352 for (int n = 0; n < 3; ++n) {
353 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
354 T_CHECK_TIMEOUT(
355 start,
356 handler.log[n].timestamp,
357 std::chrono::milliseconds(events[n].milliseconds));
358 ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
359 ASSERT_EQ(handler.log[n].bytesWritten, 0);
360 }
361 T_CHECK_TIMEOUT(
362 start, end, std::chrono::milliseconds(events[3].milliseconds));
363
364 // Make sure the data from the last write is still waiting to be read
365 size_t bytesRemaining = readUntilEmpty(sp[0]);
366 ASSERT_EQ(bytesRemaining, events[3].length);
367 }
368
369 /**
370 * Test registering for READ when the socket is immediately readable
371 */
TYPED_TEST_P(EventBaseTest,ReadImmediate)372 TYPED_TEST_P(EventBaseTest, ReadImmediate) {
373 auto evbPtr = getEventBase<TypeParam>();
374 SKIP_IF(!evbPtr) << "Backend not available";
375 folly::EventBase& eb = *evbPtr;
376 SocketPair sp;
377
378 // Write some data to the socket so the other end will
379 // be immediately readable
380 size_t dataLength = 1234;
381 writeToFD(sp[1], dataLength);
382
383 // Register for read events
384 TestHandler handler(&eb, sp[0]);
385 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
386
387 // Register a timeout to perform another write
388 ScheduledEvent events[] = {
389 {10, EventHandler::WRITE, 2345, 0},
390 {0, 0, 0, 0},
391 };
392 TimePoint start;
393 scheduleEvents(&eb, sp[1], events);
394
395 // Schedule a timeout to unregister the handler
396 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
397
398 // Loop
399 eb.loop();
400 TimePoint end;
401
402 ASSERT_EQ(handler.log.size(), 2);
403
404 // There should have been 1 event for immediate readability
405 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
406 T_CHECK_TIMEOUT(
407 start, handler.log[0].timestamp, std::chrono::milliseconds(0));
408 ASSERT_EQ(handler.log[0].bytesRead, dataLength);
409 ASSERT_EQ(handler.log[0].bytesWritten, 0);
410
411 // There should be another event after the timeout wrote more data
412 ASSERT_EQ(handler.log[1].events, EventHandler::READ);
413 T_CHECK_TIMEOUT(
414 start,
415 handler.log[1].timestamp,
416 std::chrono::milliseconds(events[0].milliseconds));
417 ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
418 ASSERT_EQ(handler.log[1].bytesWritten, 0);
419
420 T_CHECK_TIMEOUT(start, end, std::chrono::milliseconds(20));
421 }
422
423 /**
424 * Test a WRITE event
425 */
TYPED_TEST_P(EventBaseTest,WriteEvent)426 TYPED_TEST_P(EventBaseTest, WriteEvent) {
427 auto evbPtr = getEventBase<TypeParam>();
428 SKIP_IF(!evbPtr) << "Backend not available";
429 folly::EventBase& eb = *evbPtr;
430 SocketPair sp;
431
432 // Fill up the write buffer before starting
433 size_t initialBytesWritten = writeUntilFull(sp[0]);
434
435 // Register for write events
436 TestHandler handler(&eb, sp[0]);
437 handler.registerHandler(EventHandler::WRITE);
438
439 // Register timeouts to perform two reads
440 ScheduledEvent events[] = {
441 {10, EventHandler::READ, 0, 0},
442 {60, EventHandler::READ, 0, 0},
443 {0, 0, 0, 0},
444 };
445 TimePoint start;
446 scheduleEvents(&eb, sp[1], events);
447
448 // Loop
449 eb.loop();
450 TimePoint end;
451
452 // Since we didn't use the EventHandler::PERSIST flag, the handler should
453 // have only been able to write once, then unregistered itself.
454 ASSERT_EQ(handler.log.size(), 1);
455 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
456 T_CHECK_TIMEOUT(
457 start,
458 handler.log[0].timestamp,
459 std::chrono::milliseconds(events[0].milliseconds));
460 ASSERT_EQ(handler.log[0].bytesRead, 0);
461 ASSERT_GT(handler.log[0].bytesWritten, 0);
462 T_CHECK_TIMEOUT(
463 start, end, std::chrono::milliseconds(events[1].milliseconds));
464
465 ASSERT_EQ(events[0].result, initialBytesWritten);
466 ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
467 }
468
469 /**
470 * Test (WRITE | PERSIST)
471 */
TYPED_TEST_P(EventBaseTest,WritePersist)472 TYPED_TEST_P(EventBaseTest, WritePersist) {
473 auto evbPtr = getEventBase<TypeParam>();
474 SKIP_IF(!evbPtr) << "Backend not available";
475 folly::EventBase& eb = *evbPtr;
476 SocketPair sp;
477
478 // Fill up the write buffer before starting
479 size_t initialBytesWritten = writeUntilFull(sp[0]);
480
481 // Register for write events
482 TestHandler handler(&eb, sp[0]);
483 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
484
485 // Register several timeouts to read from the socket at several intervals
486 ScheduledEvent events[] = {
487 {10, EventHandler::READ, 0, 0},
488 {40, EventHandler::READ, 0, 0},
489 {70, EventHandler::READ, 0, 0},
490 {100, EventHandler::READ, 0, 0},
491 {0, 0, 0, 0},
492 };
493 TimePoint start;
494 scheduleEvents(&eb, sp[1], events);
495
496 // Schedule a timeout to unregister the handler after the third read
497 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
498
499 // Loop
500 eb.loop();
501 TimePoint end;
502
503 // The handler should have received the first 3 events,
504 // then been unregistered after that.
505 ASSERT_EQ(handler.log.size(), 3);
506 ASSERT_EQ(events[0].result, initialBytesWritten);
507 for (int n = 0; n < 3; ++n) {
508 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
509 T_CHECK_TIMEOUT(
510 start,
511 handler.log[n].timestamp,
512 std::chrono::milliseconds(events[n].milliseconds));
513 ASSERT_EQ(handler.log[n].bytesRead, 0);
514 ASSERT_GT(handler.log[n].bytesWritten, 0);
515 ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
516 }
517 T_CHECK_TIMEOUT(
518 start, end, std::chrono::milliseconds(events[3].milliseconds));
519 }
520
521 /**
522 * Test registering for WRITE when the socket is immediately writable
523 */
TYPED_TEST_P(EventBaseTest,WriteImmediate)524 TYPED_TEST_P(EventBaseTest, WriteImmediate) {
525 auto evbPtr = getEventBase<TypeParam>();
526 SKIP_IF(!evbPtr) << "Backend not available";
527 folly::EventBase& eb = *evbPtr;
528 SocketPair sp;
529
530 // Register for write events
531 TestHandler handler(&eb, sp[0]);
532 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
533
534 // Register a timeout to perform a read
535 ScheduledEvent events[] = {
536 {10, EventHandler::READ, 0, 0},
537 {0, 0, 0, 0},
538 };
539 TimePoint start;
540 scheduleEvents(&eb, sp[1], events);
541
542 // Schedule a timeout to unregister the handler
543 int64_t unregisterTimeout = 40;
544 eb.tryRunAfterDelay(
545 std::bind(&TestHandler::unregisterHandler, &handler), unregisterTimeout);
546
547 // Loop
548 eb.loop();
549 TimePoint end;
550
551 ASSERT_EQ(handler.log.size(), 2);
552
553 // Since the socket buffer was initially empty,
554 // there should have been 1 event for immediate writability
555 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
556 T_CHECK_TIMEOUT(
557 start, handler.log[0].timestamp, std::chrono::milliseconds(0));
558 ASSERT_EQ(handler.log[0].bytesRead, 0);
559 ASSERT_GT(handler.log[0].bytesWritten, 0);
560
561 // There should be another event after the timeout wrote more data
562 ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
563 T_CHECK_TIMEOUT(
564 start,
565 handler.log[1].timestamp,
566 std::chrono::milliseconds(events[0].milliseconds));
567 ASSERT_EQ(handler.log[1].bytesRead, 0);
568 ASSERT_GT(handler.log[1].bytesWritten, 0);
569
570 T_CHECK_TIMEOUT(start, end, std::chrono::milliseconds(unregisterTimeout));
571 }
572
573 /**
574 * Test (READ | WRITE) when the socket becomes readable first
575 */
TYPED_TEST_P(EventBaseTest,ReadWrite)576 TYPED_TEST_P(EventBaseTest, ReadWrite) {
577 auto evbPtr = getEventBase<TypeParam>();
578 SKIP_IF(!evbPtr) << "Backend not available";
579 folly::EventBase& eb = *evbPtr;
580 SocketPair sp;
581
582 // Fill up the write buffer before starting
583 size_t sock0WriteLength = writeUntilFull(sp[0]);
584
585 // Register for read and write events
586 TestHandler handler(&eb, sp[0]);
587 handler.registerHandler(EventHandler::READ_WRITE);
588
589 // Register timeouts to perform a write then a read.
590 ScheduledEvent events[] = {
591 {10, EventHandler::WRITE, 2345, 0},
592 {40, EventHandler::READ, 0, 0},
593 {0, 0, 0, 0},
594 };
595 TimePoint start;
596 scheduleEvents(&eb, sp[1], events);
597
598 // Loop
599 eb.loop();
600 TimePoint end;
601
602 // Since we didn't use the EventHandler::PERSIST flag, the handler should
603 // have only noticed readability, then unregistered itself. Check that only
604 // one event was logged.
605 ASSERT_EQ(handler.log.size(), 1);
606 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
607 T_CHECK_TIMEOUT(
608 start,
609 handler.log[0].timestamp,
610 std::chrono::milliseconds(events[0].milliseconds));
611 ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
612 ASSERT_EQ(handler.log[0].bytesWritten, 0);
613 ASSERT_EQ(events[1].result, sock0WriteLength);
614 T_CHECK_TIMEOUT(
615 start, end, std::chrono::milliseconds(events[1].milliseconds));
616 }
617
618 /**
619 * Test (READ | WRITE) when the socket becomes writable first
620 */
TYPED_TEST_P(EventBaseTest,WriteRead)621 TYPED_TEST_P(EventBaseTest, WriteRead) {
622 auto evbPtr = getEventBase<TypeParam>();
623 SKIP_IF(!evbPtr) << "Backend not available";
624 folly::EventBase& eb = *evbPtr;
625 SocketPair sp;
626
627 // Fill up the write buffer before starting
628 size_t sock0WriteLength = writeUntilFull(sp[0]);
629
630 // Register for read and write events
631 TestHandler handler(&eb, sp[0]);
632 handler.registerHandler(EventHandler::READ_WRITE);
633
634 // Register timeouts to perform a read then a write.
635 size_t sock1WriteLength = 2345;
636 ScheduledEvent events[] = {
637 {10, EventHandler::READ, 0, 0},
638 {40, EventHandler::WRITE, sock1WriteLength, 0},
639 {0, 0, 0, 0},
640 };
641 TimePoint start;
642 scheduleEvents(&eb, sp[1], events);
643
644 // Loop
645 eb.loop();
646 TimePoint end;
647
648 // Since we didn't use the EventHandler::PERSIST flag, the handler should
649 // have only noticed writability, then unregistered itself. Check that only
650 // one event was logged.
651 ASSERT_EQ(handler.log.size(), 1);
652 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
653 T_CHECK_TIMEOUT(
654 start,
655 handler.log[0].timestamp,
656 std::chrono::milliseconds(events[0].milliseconds));
657 ASSERT_EQ(handler.log[0].bytesRead, 0);
658 ASSERT_GT(handler.log[0].bytesWritten, 0);
659 ASSERT_EQ(events[0].result, sock0WriteLength);
660 ASSERT_EQ(events[1].result, sock1WriteLength);
661 T_CHECK_TIMEOUT(
662 start, end, std::chrono::milliseconds(events[1].milliseconds));
663
664 // Make sure the written data is still waiting to be read.
665 size_t bytesRemaining = readUntilEmpty(sp[0]);
666 ASSERT_EQ(bytesRemaining, events[1].length);
667 }
668
669 /**
670 * Test (READ | WRITE) when the socket becomes readable and writable
671 * at the same time.
672 */
TYPED_TEST_P(EventBaseTest,ReadWriteSimultaneous)673 TYPED_TEST_P(EventBaseTest, ReadWriteSimultaneous) {
674 auto evbPtr = getEventBase<TypeParam>();
675 SKIP_IF(!evbPtr) << "Backend not available";
676 folly::EventBase& eb = *evbPtr;
677 SocketPair sp;
678
679 // Fill up the write buffer before starting
680 size_t sock0WriteLength = writeUntilFull(sp[0]);
681
682 // Register for read and write events
683 TestHandler handler(&eb, sp[0]);
684 handler.registerHandler(EventHandler::READ_WRITE);
685
686 // Register a timeout to perform a read and write together
687 ScheduledEvent events[] = {
688 {10, EventHandler::READ | EventHandler::WRITE, 0, 0},
689 {0, 0, 0, 0},
690 };
691 TimePoint start;
692 scheduleEvents(&eb, sp[1], events);
693
694 // Loop
695 eb.loop();
696 TimePoint end;
697
698 // It's not strictly required that the EventBase register us about both
699 // events in the same call or thw read/write notifications are delievered at
700 // the same. So, it's possible that if the EventBase implementation changes
701 // this test could start failing, and it wouldn't be considered breaking the
702 // API. However for now it's nice to exercise this code path.
703 ASSERT_EQ(handler.log.size(), 1);
704 if (handler.log[0].events & EventHandler::READ) {
705 ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
706 ASSERT_GT(handler.log[0].bytesWritten, 0);
707 }
708 T_CHECK_TIMEOUT(
709 start,
710 handler.log[0].timestamp,
711 std::chrono::milliseconds(events[0].milliseconds));
712 T_CHECK_TIMEOUT(
713 start, end, std::chrono::milliseconds(events[0].milliseconds));
714 }
715
716 /**
717 * Test (READ | WRITE | PERSIST)
718 */
TYPED_TEST_P(EventBaseTest,ReadWritePersist)719 TYPED_TEST_P(EventBaseTest, ReadWritePersist) {
720 auto evbPtr = getEventBase<TypeParam>();
721 SKIP_IF(!evbPtr) << "Backend not available";
722 folly::EventBase& eb = *evbPtr;
723 SocketPair sp;
724
725 // Register for read and write events
726 TestHandler handler(&eb, sp[0]);
727 handler.registerHandler(
728 EventHandler::READ | EventHandler::WRITE | EventHandler::PERSIST);
729
730 // Register timeouts to perform several reads and writes
731 ScheduledEvent events[] = {
732 {10, EventHandler::WRITE, 2345, 0},
733 {20, EventHandler::READ, 0, 0},
734 {35, EventHandler::WRITE, 200, 0},
735 {45, EventHandler::WRITE, 15, 0},
736 {55, EventHandler::READ, 0, 0},
737 {120, EventHandler::WRITE, 2345, 0},
738 {0, 0, 0, 0},
739 };
740 TimePoint start;
741 scheduleEvents(&eb, sp[1], events);
742
743 // Schedule a timeout to unregister the handler
744 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
745
746 // Loop
747 eb.loop();
748 TimePoint end;
749
750 ASSERT_EQ(handler.log.size(), 6);
751
752 // Since we didn't fill up the write buffer immediately, there should
753 // be an immediate event for writability.
754 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
755 T_CHECK_TIMEOUT(
756 start, handler.log[0].timestamp, std::chrono::milliseconds(0));
757 ASSERT_EQ(handler.log[0].bytesRead, 0);
758 ASSERT_GT(handler.log[0].bytesWritten, 0);
759
760 // Events 1 through 5 should correspond to the scheduled events
761 for (int n = 1; n < 6; ++n) {
762 ScheduledEvent* event = &events[n - 1];
763 T_CHECK_TIMEOUT(
764 start,
765 handler.log[n].timestamp,
766 std::chrono::milliseconds(event->milliseconds));
767 if (event->events == EventHandler::READ) {
768 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
769 ASSERT_EQ(handler.log[n].bytesRead, 0);
770 ASSERT_GT(handler.log[n].bytesWritten, 0);
771 } else {
772 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
773 ASSERT_EQ(handler.log[n].bytesRead, event->length);
774 ASSERT_EQ(handler.log[n].bytesWritten, 0);
775 }
776 }
777
778 // The timeout should have unregistered the handler before the last write.
779 // Make sure that data is still waiting to be read
780 size_t bytesRemaining = readUntilEmpty(sp[0]);
781 ASSERT_EQ(bytesRemaining, events[5].length);
782 }
783
784 namespace {
785 class PartialReadHandler : public TestHandler {
786 public:
PartialReadHandler(EventBase * eventBase,int fd,size_t readLength)787 PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
788 : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
789
handlerReady(uint16_t events)790 void handlerReady(uint16_t events) noexcept override {
791 assert(events == EventHandler::READ);
792 ssize_t bytesRead = readFromFD(fd_, readLength_);
793 log.emplace_back(events, bytesRead, 0);
794 }
795
796 private:
797 int fd_;
798 size_t readLength_;
799 };
800 } // namespace
801
802 /**
803 * Test reading only part of the available data when a read event is fired.
804 * When PERSIST is used, make sure the handler gets notified again the next
805 * time around the loop.
806 */
TYPED_TEST_P(EventBaseTest,ReadPartial)807 TYPED_TEST_P(EventBaseTest, ReadPartial) {
808 auto evbPtr = getEventBase<TypeParam>();
809 SKIP_IF(!evbPtr) << "Backend not available";
810 folly::EventBase& eb = *evbPtr;
811 SocketPair sp;
812
813 // Register for read events
814 size_t readLength = 100;
815 PartialReadHandler handler(&eb, sp[0], readLength);
816 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
817
818 // Register a timeout to perform a single write,
819 // with more data than PartialReadHandler will read at once
820 ScheduledEvent events[] = {
821 {10, EventHandler::WRITE, (3 * readLength) + (readLength / 2), 0},
822 {0, 0, 0, 0},
823 };
824 TimePoint start;
825 scheduleEvents(&eb, sp[1], events);
826
827 // Schedule a timeout to unregister the handler
828 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
829
830 // Loop
831 eb.loop();
832 TimePoint end;
833
834 ASSERT_EQ(handler.log.size(), 4);
835
836 // The first 3 invocations should read readLength bytes each
837 for (int n = 0; n < 3; ++n) {
838 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
839 T_CHECK_TIMEOUT(
840 start,
841 handler.log[n].timestamp,
842 std::chrono::milliseconds(events[0].milliseconds));
843 ASSERT_EQ(handler.log[n].bytesRead, readLength);
844 ASSERT_EQ(handler.log[n].bytesWritten, 0);
845 }
846 // The last read only has readLength/2 bytes
847 ASSERT_EQ(handler.log[3].events, EventHandler::READ);
848 T_CHECK_TIMEOUT(
849 start,
850 handler.log[3].timestamp,
851 std::chrono::milliseconds(events[0].milliseconds));
852 ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
853 ASSERT_EQ(handler.log[3].bytesWritten, 0);
854 }
855
856 namespace {
857 class PartialWriteHandler : public TestHandler {
858 public:
PartialWriteHandler(EventBase * eventBase,int fd,size_t writeLength)859 PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
860 : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
861
handlerReady(uint16_t events)862 void handlerReady(uint16_t events) noexcept override {
863 assert(events == EventHandler::WRITE);
864 ssize_t bytesWritten = writeToFD(fd_, writeLength_);
865 log.emplace_back(events, 0, bytesWritten);
866 }
867
868 private:
869 int fd_;
870 size_t writeLength_;
871 };
872 } // namespace
873
874 /**
875 * Test writing without completely filling up the write buffer when the fd
876 * becomes writable. When PERSIST is used, make sure the handler gets
877 * notified again the next time around the loop.
878 */
TYPED_TEST_P(EventBaseTest,WritePartial)879 TYPED_TEST_P(EventBaseTest, WritePartial) {
880 auto evbPtr = getEventBase<TypeParam>();
881 SKIP_IF(!evbPtr) << "Backend not available";
882 folly::EventBase& eb = *evbPtr;
883 SocketPair sp;
884
885 // Fill up the write buffer before starting
886 size_t initialBytesWritten = writeUntilFull(sp[0]);
887
888 // Register for write events
889 size_t writeLength = 100;
890 PartialWriteHandler handler(&eb, sp[0], writeLength);
891 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
892
893 // Register a timeout to read, so that more data can be written
894 ScheduledEvent events[] = {
895 {10, EventHandler::READ, 0, 0},
896 {0, 0, 0, 0},
897 };
898 TimePoint start;
899 scheduleEvents(&eb, sp[1], events);
900
901 // Schedule a timeout to unregister the handler
902 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
903
904 // Loop
905 eb.loop();
906 TimePoint end;
907
908 // Depending on how big the socket buffer is, there will be multiple writes
909 // Only check the first 5
910 int numChecked = 5;
911 ASSERT_GE(handler.log.size(), numChecked);
912 ASSERT_EQ(events[0].result, initialBytesWritten);
913
914 // The first 3 invocations should read writeLength bytes each
915 for (int n = 0; n < numChecked; ++n) {
916 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
917 T_CHECK_TIMEOUT(
918 start,
919 handler.log[n].timestamp,
920 std::chrono::milliseconds(events[0].milliseconds));
921 ASSERT_EQ(handler.log[n].bytesRead, 0);
922 ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
923 }
924 }
925
926 namespace {
927 class DestroyHandler : public AsyncTimeout {
928 public:
DestroyHandler(EventBase * eb,EventHandler * h)929 DestroyHandler(EventBase* eb, EventHandler* h)
930 : AsyncTimeout(eb), handler_(h) {}
931
timeoutExpired()932 void timeoutExpired() noexcept override { delete handler_; }
933
934 private:
935 EventHandler* handler_;
936 };
937 } // namespace
938
939 /**
940 * Test destroying a registered EventHandler
941 */
TYPED_TEST_P(EventBaseTest,DestroyingHandler)942 TYPED_TEST_P(EventBaseTest, DestroyingHandler) {
943 auto evbPtr = getEventBase<TypeParam>();
944 SKIP_IF(!evbPtr) << "Backend not available";
945 folly::EventBase& eb = *evbPtr;
946 SocketPair sp;
947
948 // Fill up the write buffer before starting
949 size_t initialBytesWritten = writeUntilFull(sp[0]);
950
951 // Register for write events
952 TestHandler* handler = new TestHandler(&eb, sp[0]);
953 handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
954
955 // After 10ms, read some data, so that the handler
956 // will be notified that it can write.
957 eb.tryRunAfterDelay(
958 std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten), 10);
959
960 // Start a timer to destroy the handler after 25ms
961 // This mainly just makes sure the code doesn't break or assert
962 DestroyHandler dh(&eb, handler);
963 dh.scheduleTimeout(25);
964
965 TimePoint start;
966 eb.loop();
967 TimePoint end;
968
969 // Make sure the EventHandler was uninstalled properly when it was
970 // destroyed, and the EventBase loop exited
971 T_CHECK_TIMEOUT(start, end, std::chrono::milliseconds(25));
972
973 // Make sure that the handler wrote data to the socket
974 // before it was destroyed
975 size_t bytesRemaining = readUntilEmpty(sp[1]);
976 ASSERT_GT(bytesRemaining, 0);
977 }
978
979 ///////////////////////////////////////////////////////////////////////////
980 // Tests for timeout events
981 ///////////////////////////////////////////////////////////////////////////
982
TYPED_TEST_P(EventBaseTest,RunAfterDelay)983 TYPED_TEST_P(EventBaseTest, RunAfterDelay) {
984 auto evbPtr = getEventBase<TypeParam>();
985 SKIP_IF(!evbPtr) << "Backend not available";
986 folly::EventBase& eb = *evbPtr;
987
988 TimePoint timestamp1(false);
989 TimePoint timestamp2(false);
990 TimePoint timestamp3(false);
991 auto fn1 = std::bind(&TimePoint::reset, ×tamp1);
992 auto fn2 = std::bind(&TimePoint::reset, ×tamp2);
993 auto fn3 = std::bind(&TimePoint::reset, ×tamp3);
994
995 TimePoint start;
996 eb.tryRunAfterDelay(std::move(fn1), 10);
997 eb.tryRunAfterDelay(std::move(fn2), 20);
998 eb.tryRunAfterDelay(std::move(fn3), 40);
999
1000 eb.loop();
1001 TimePoint end;
1002
1003 T_CHECK_TIMEOUT(start, timestamp1, std::chrono::milliseconds(10));
1004 T_CHECK_TIMEOUT(start, timestamp2, std::chrono::milliseconds(20));
1005 T_CHECK_TIMEOUT(start, timestamp3, std::chrono::milliseconds(40));
1006 T_CHECK_TIMEOUT(start, end, std::chrono::milliseconds(40));
1007 }
1008
1009 /**
1010 * Test the behavior of tryRunAfterDelay() when some timeouts are
1011 * still scheduled when the EventBase is destroyed.
1012 */
TYPED_TEST_P(EventBaseTest,RunAfterDelayDestruction)1013 TYPED_TEST_P(EventBaseTest, RunAfterDelayDestruction) {
1014 TimePoint timestamp1(false);
1015 TimePoint timestamp2(false);
1016 TimePoint timestamp3(false);
1017 TimePoint timestamp4(false);
1018 TimePoint start(false);
1019 TimePoint end(false);
1020
1021 {
1022 auto evbPtr = getEventBase<TypeParam>();
1023 SKIP_IF(!evbPtr) << "Backend not available";
1024 folly::EventBase& eb = *evbPtr;
1025 start.reset();
1026
1027 // Run two normal timeouts
1028 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10);
1029 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20);
1030
1031 // Schedule a timeout to stop the event loop after 40ms
1032 eb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
1033
1034 // Schedule 2 timeouts that would fire after the event loop stops
1035 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 80);
1036 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp4), 160);
1037
1038 eb.loop();
1039 end.reset();
1040 }
1041
1042 T_CHECK_TIMEOUT(start, timestamp1, std::chrono::milliseconds(10));
1043 T_CHECK_TIMEOUT(start, timestamp2, std::chrono::milliseconds(20));
1044 T_CHECK_TIMEOUT(start, end, std::chrono::milliseconds(40));
1045
1046 ASSERT_TRUE(timestamp3.isUnset());
1047 ASSERT_TRUE(timestamp4.isUnset());
1048
1049 // Ideally this test should be run under valgrind to ensure that no
1050 // memory is leaked.
1051 }
1052
1053 namespace {
1054 class TestTimeout : public AsyncTimeout {
1055 public:
TestTimeout(EventBase * eventBase)1056 explicit TestTimeout(EventBase* eventBase)
1057 : AsyncTimeout(eventBase), timestamp(false) {}
1058
timeoutExpired()1059 void timeoutExpired() noexcept override { timestamp.reset(); }
1060
1061 TimePoint timestamp;
1062 };
1063 } // namespace
1064
TYPED_TEST_P(EventBaseTest,BasicTimeouts)1065 TYPED_TEST_P(EventBaseTest, BasicTimeouts) {
1066 auto evbPtr = getEventBase<TypeParam>();
1067 SKIP_IF(!evbPtr) << "Backend not available";
1068 folly::EventBase& eb = *evbPtr;
1069
1070 TestTimeout t1(&eb);
1071 TestTimeout t2(&eb);
1072 TestTimeout t3(&eb);
1073 TimePoint start;
1074 t1.scheduleTimeout(10);
1075 t2.scheduleTimeout(20);
1076 t3.scheduleTimeout(40);
1077
1078 eb.loop();
1079 TimePoint end;
1080
1081 T_CHECK_TIMEOUT(start, t1.timestamp, std::chrono::milliseconds(10));
1082 T_CHECK_TIMEOUT(start, t2.timestamp, std::chrono::milliseconds(20));
1083 T_CHECK_TIMEOUT(start, t3.timestamp, std::chrono::milliseconds(40));
1084 T_CHECK_TIMEOUT(start, end, std::chrono::milliseconds(40));
1085 }
1086
1087 namespace {
1088 class ReschedulingTimeout : public AsyncTimeout {
1089 public:
ReschedulingTimeout(EventBase * evb,const std::vector<uint32_t> & timeouts)1090 ReschedulingTimeout(EventBase* evb, const std::vector<uint32_t>& timeouts)
1091 : AsyncTimeout(evb), timeouts_(timeouts), iterator_(timeouts_.begin()) {}
1092
start()1093 void start() { reschedule(); }
1094
timeoutExpired()1095 void timeoutExpired() noexcept override {
1096 timestamps.emplace_back();
1097 reschedule();
1098 }
1099
reschedule()1100 void reschedule() {
1101 if (iterator_ != timeouts_.end()) {
1102 uint32_t timeout = *iterator_;
1103 ++iterator_;
1104 scheduleTimeout(timeout);
1105 }
1106 }
1107
1108 std::vector<TimePoint> timestamps;
1109
1110 private:
1111 std::vector<uint32_t> timeouts_;
1112 std::vector<uint32_t>::const_iterator iterator_;
1113 };
1114 } // namespace
1115
1116 /**
1117 * Test rescheduling the same timeout multiple times
1118 */
TYPED_TEST_P(EventBaseTest,ReuseTimeout)1119 TYPED_TEST_P(EventBaseTest, ReuseTimeout) {
1120 auto evbPtr = getEventBase<TypeParam>();
1121 SKIP_IF(!evbPtr) << "Backend not available";
1122 folly::EventBase& eb = *evbPtr;
1123
1124 std::vector<uint32_t> timeouts;
1125 timeouts.push_back(10);
1126 timeouts.push_back(30);
1127 timeouts.push_back(15);
1128
1129 ReschedulingTimeout t(&eb, timeouts);
1130 TimePoint start;
1131 t.start();
1132 eb.loop();
1133 TimePoint end;
1134
1135 // Use a higher tolerance than usual. We're waiting on 3 timeouts
1136 // consecutively. In general, each timeout may go over by a few
1137 // milliseconds, and we're tripling this error by witing on 3 timeouts.
1138 std::chrono::milliseconds tolerance{6};
1139
1140 ASSERT_EQ(timeouts.size(), t.timestamps.size());
1141 uint32_t total = 0;
1142 for (size_t n = 0; n < timeouts.size(); ++n) {
1143 total += timeouts[n];
1144 T_CHECK_TIMEOUT(
1145 start, t.timestamps[n], std::chrono::milliseconds(total), tolerance);
1146 }
1147 T_CHECK_TIMEOUT(start, end, std::chrono::milliseconds(total), tolerance);
1148 }
1149
1150 /**
1151 * Test rescheduling a timeout before it has fired
1152 */
TYPED_TEST_P(EventBaseTest,RescheduleTimeout)1153 TYPED_TEST_P(EventBaseTest, RescheduleTimeout) {
1154 auto evbPtr = getEventBase<TypeParam>();
1155 SKIP_IF(!evbPtr) << "Backend not available";
1156 folly::EventBase& eb = *evbPtr;
1157
1158 TestTimeout t1(&eb);
1159 TestTimeout t2(&eb);
1160 TestTimeout t3(&eb);
1161
1162 TimePoint start;
1163 t1.scheduleTimeout(15);
1164 t2.scheduleTimeout(30);
1165 t3.scheduleTimeout(30);
1166
1167 // after 10ms, reschedule t2 to run sooner than originally scheduled
1168 eb.tryRunAfterDelay([&] { t2.scheduleTimeout(10); }, 10);
1169 // after 10ms, reschedule t3 to run later than originally scheduled
1170 eb.tryRunAfterDelay([&] { t3.scheduleTimeout(40); }, 10);
1171
1172 eb.loop();
1173 TimePoint end;
1174
1175 T_CHECK_TIMEOUT(start, t1.timestamp, std::chrono::milliseconds(15));
1176 T_CHECK_TIMEOUT(start, t2.timestamp, std::chrono::milliseconds(20));
1177 T_CHECK_TIMEOUT(start, t3.timestamp, std::chrono::milliseconds(50));
1178 T_CHECK_TIMEOUT(start, end, std::chrono::milliseconds(50));
1179 }
1180
1181 /**
1182 * Test cancelling a timeout
1183 */
TYPED_TEST_P(EventBaseTest,CancelTimeout)1184 TYPED_TEST_P(EventBaseTest, CancelTimeout) {
1185 auto evbPtr = getEventBase<TypeParam>();
1186 SKIP_IF(!evbPtr) << "Backend not available";
1187 folly::EventBase& eb = *evbPtr;
1188
1189 std::vector<uint32_t> timeouts;
1190 timeouts.push_back(10);
1191 timeouts.push_back(30);
1192 timeouts.push_back(25);
1193
1194 ReschedulingTimeout t(&eb, timeouts);
1195 TimePoint start;
1196 t.start();
1197 eb.tryRunAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
1198
1199 eb.loop();
1200 TimePoint end;
1201
1202 ASSERT_EQ(t.timestamps.size(), 2);
1203 T_CHECK_TIMEOUT(start, t.timestamps[0], std::chrono::milliseconds(10));
1204 T_CHECK_TIMEOUT(start, t.timestamps[1], std::chrono::milliseconds(40));
1205 T_CHECK_TIMEOUT(start, end, std::chrono::milliseconds(50));
1206 }
1207
1208 namespace {
1209 class DestroyTimeout : public AsyncTimeout {
1210 public:
DestroyTimeout(EventBase * eb,AsyncTimeout * t)1211 DestroyTimeout(EventBase* eb, AsyncTimeout* t)
1212 : AsyncTimeout(eb), timeout_(t) {}
1213
timeoutExpired()1214 void timeoutExpired() noexcept override { delete timeout_; }
1215
1216 private:
1217 AsyncTimeout* timeout_;
1218 };
1219 } // namespace
1220
1221 /**
1222 * Test destroying a scheduled timeout object
1223 */
TYPED_TEST_P(EventBaseTest,DestroyingTimeout)1224 TYPED_TEST_P(EventBaseTest, DestroyingTimeout) {
1225 auto evbPtr = getEventBase<TypeParam>();
1226 SKIP_IF(!evbPtr) << "Backend not available";
1227 folly::EventBase& eb = *evbPtr;
1228
1229 TestTimeout* t1 = new TestTimeout(&eb);
1230 TimePoint start;
1231 t1->scheduleTimeout(30);
1232
1233 DestroyTimeout dt(&eb, t1);
1234 dt.scheduleTimeout(10);
1235
1236 eb.loop();
1237 TimePoint end;
1238
1239 T_CHECK_TIMEOUT(start, end, std::chrono::milliseconds(10));
1240 }
1241
1242 /**
1243 * Test the scheduled executor impl
1244 */
TYPED_TEST_P(EventBaseTest,ScheduledFn)1245 TYPED_TEST_P(EventBaseTest, ScheduledFn) {
1246 auto evbPtr = getEventBase<TypeParam>();
1247 SKIP_IF(!evbPtr) << "Backend not available";
1248 folly::EventBase& eb = *evbPtr;
1249
1250 TimePoint timestamp1(false);
1251 TimePoint timestamp2(false);
1252 TimePoint timestamp3(false);
1253 auto fn1 = std::bind(&TimePoint::reset, ×tamp1);
1254 auto fn2 = std::bind(&TimePoint::reset, ×tamp2);
1255 auto fn3 = std::bind(&TimePoint::reset, ×tamp3);
1256 TimePoint start;
1257 eb.schedule(std::move(fn1), std::chrono::milliseconds(9));
1258 eb.schedule(std::move(fn2), std::chrono::milliseconds(19));
1259 eb.schedule(std::move(fn3), std::chrono::milliseconds(39));
1260
1261 eb.loop();
1262 TimePoint end;
1263
1264 T_CHECK_TIMEOUT(start, timestamp1, std::chrono::milliseconds(9));
1265 T_CHECK_TIMEOUT(start, timestamp2, std::chrono::milliseconds(19));
1266 T_CHECK_TIMEOUT(start, timestamp3, std::chrono::milliseconds(39));
1267 T_CHECK_TIMEOUT(start, end, std::chrono::milliseconds(39));
1268 }
1269
TYPED_TEST_P(EventBaseTest,ScheduledFnAt)1270 TYPED_TEST_P(EventBaseTest, ScheduledFnAt) {
1271 auto evbPtr = getEventBase<TypeParam>();
1272 SKIP_IF(!evbPtr) << "Backend not available";
1273 folly::EventBase& eb = *evbPtr;
1274
1275 TimePoint timestamp0(false);
1276 TimePoint timestamp1(false);
1277 TimePoint timestamp2(false);
1278 TimePoint timestamp3(false);
1279 auto fn0 = std::bind(&TimePoint::reset, ×tamp0);
1280 auto fn1 = std::bind(&TimePoint::reset, ×tamp1);
1281 auto fn2 = std::bind(&TimePoint::reset, ×tamp2);
1282 auto fn3 = std::bind(&TimePoint::reset, ×tamp3);
1283 TimePoint start;
1284 eb.scheduleAt(fn0, eb.now() - std::chrono::milliseconds(5));
1285 eb.scheduleAt(fn1, eb.now() + std::chrono::milliseconds(9));
1286 eb.scheduleAt(fn2, eb.now() + std::chrono::milliseconds(19));
1287 eb.scheduleAt(fn3, eb.now() + std::chrono::milliseconds(39));
1288
1289 TimePoint loopStart;
1290 eb.loop();
1291 TimePoint end;
1292
1293 // Even though we asked to schedule the first function in the past,
1294 // in practice it doesn't run until after 1 iteration of the HHWheelTimer tick
1295 // interval.
1296 T_CHECK_TIMEOUT(start, timestamp0, eb.timer().getTickInterval());
1297
1298 T_CHECK_TIMEOUT(start, timestamp1, std::chrono::milliseconds(9));
1299 T_CHECK_TIMEOUT(start, timestamp2, std::chrono::milliseconds(19));
1300 T_CHECK_TIMEOUT(start, timestamp3, std::chrono::milliseconds(39));
1301 T_CHECK_TIMEOUT(start, end, std::chrono::milliseconds(39));
1302 }
1303
1304 ///////////////////////////////////////////////////////////////////////////
1305 // Test for runInThreadTestFunc()
1306 ///////////////////////////////////////////////////////////////////////////
1307
1308 namespace {
1309
1310 struct RunInThreadData {
RunInThreadDataRunInThreadData1311 RunInThreadData(
1312 folly::EventBaseBackendBase::FactoryFunc backendFactory,
1313 int numThreads,
1314 int opsPerThread_)
1315 : evb(folly::EventBase::Options().setBackendFactory(
1316 std::move(backendFactory))),
1317 opsPerThread(opsPerThread_),
1318 opsToGo(numThreads * opsPerThread) {}
1319
1320 EventBase evb;
1321 std::deque<std::pair<int, int>> values;
1322
1323 int opsPerThread;
1324 int opsToGo;
1325 };
1326
1327 struct RunInThreadArg {
RunInThreadArgRunInThreadArg1328 RunInThreadArg(RunInThreadData* data_, int threadId, int value_)
1329 : data(data_), thread(threadId), value(value_) {}
1330
1331 RunInThreadData* data;
1332 int thread;
1333 int value;
1334 };
1335
runInThreadTestFunc(RunInThreadArg * arg)1336 static inline void runInThreadTestFunc(RunInThreadArg* arg) {
1337 arg->data->values.emplace_back(arg->thread, arg->value);
1338 RunInThreadData* data = arg->data;
1339 delete arg;
1340
1341 if (--data->opsToGo == 0) {
1342 // Break out of the event base loop if we are the last thread running
1343 data->evb.terminateLoopSoon();
1344 }
1345 }
1346
1347 } // namespace
1348
TYPED_TEST_P(EventBaseTest,RunInThread)1349 TYPED_TEST_P(EventBaseTest, RunInThread) {
1350 constexpr uint32_t numThreads = 50;
1351 constexpr uint32_t opsPerThread = 100;
1352 auto backend = TypeParam::getBackend();
1353 SKIP_IF(!backend) << "Backend not available";
1354 RunInThreadData data(
1355 [] { return TypeParam::getBackend(); }, numThreads, opsPerThread);
1356
1357 std::deque<std::thread> threads;
1358 SCOPE_EXIT {
1359 // Wait on all of the threads.
1360 for (auto& thread : threads) {
1361 thread.join();
1362 }
1363 };
1364
1365 for (uint32_t i = 0; i < numThreads; ++i) {
1366 threads.emplace_back([i, &data] {
1367 for (int n = 0; n < data.opsPerThread; ++n) {
1368 RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
1369 data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
1370 usleep(10);
1371 }
1372 });
1373 }
1374
1375 // Add a timeout event to run after 3 seconds.
1376 // Otherwise loop() will return immediately since there are no events to run.
1377 // Once the last thread exits, it will stop the loop(). However, this
1378 // timeout also stops the loop in case there is a bug performing the normal
1379 // stop.
1380 data.evb.tryRunAfterDelay(
1381 std::bind(&EventBase::terminateLoopSoon, &data.evb), 3000);
1382
1383 TimePoint start;
1384 data.evb.loop();
1385 TimePoint end;
1386
1387 // Verify that the loop exited because all threads finished and requested it
1388 // to stop. This should happen much sooner than the 3 second timeout.
1389 // Assert that it happens in under a second. (This is still tons of extra
1390 // padding.)
1391
1392 auto timeTaken = std::chrono::duration_cast<std::chrono::milliseconds>(
1393 end.getTime() - start.getTime());
1394 ASSERT_LT(timeTaken.count(), 1000);
1395 VLOG(11) << "Time taken: " << timeTaken.count();
1396
1397 // Verify that we have all of the events from every thread
1398 int expectedValues[numThreads];
1399 for (uint32_t n = 0; n < numThreads; ++n) {
1400 expectedValues[n] = 0;
1401 }
1402 for (const auto& dataValue : data.values) {
1403 int threadID = dataValue.first;
1404 int value = dataValue.second;
1405 ASSERT_EQ(expectedValues[threadID], value);
1406 ++expectedValues[threadID];
1407 }
1408 for (uint32_t n = 0; n < numThreads; ++n) {
1409 ASSERT_EQ(expectedValues[n], opsPerThread);
1410 }
1411 }
1412
1413 // This test simulates some calls, and verifies that the waiting happens by
1414 // triggering what otherwise would be race conditions, and trying to detect
1415 // whether any of the race conditions happened.
TYPED_TEST_P(EventBaseTest,RunInEventBaseThreadAndWait)1416 TYPED_TEST_P(EventBaseTest, RunInEventBaseThreadAndWait) {
1417 const size_t c = 256;
1418 std::vector<std::unique_ptr<EventBase>> evbs;
1419 for (size_t i = 0; i < c; ++i) {
1420 auto evbPtr = getEventBase<TypeParam>();
1421 SKIP_IF(!evbPtr) << "Backend not available";
1422 evbs.push_back(std::move(evbPtr));
1423 }
1424
1425 std::vector<std::unique_ptr<std::atomic<size_t>>> atoms(c);
1426 for (size_t i = 0; i < c; ++i) {
1427 auto& atom = atoms.at(i);
1428 atom = std::make_unique<std::atomic<size_t>>(0);
1429 }
1430 std::vector<std::thread> threads;
1431 for (size_t i = 0; i < c; ++i) {
1432 threads.emplace_back([&atoms, i, evb = std::move(evbs[i])] {
1433 folly::EventBase& eb = *evb;
1434 auto& atom = *atoms.at(i);
1435 auto ebth = std::thread([&] { eb.loopForever(); });
1436 eb.waitUntilRunning();
1437 eb.runInEventBaseThreadAndWait([&] {
1438 size_t x = 0;
1439 atom.compare_exchange_weak(
1440 x, 1, std::memory_order_release, std::memory_order_relaxed);
1441 });
1442 size_t x = 0;
1443 atom.compare_exchange_weak(
1444 x, 2, std::memory_order_release, std::memory_order_relaxed);
1445 eb.terminateLoopSoon();
1446 ebth.join();
1447 });
1448 }
1449 for (size_t i = 0; i < c; ++i) {
1450 auto& th = threads.at(i);
1451 th.join();
1452 }
1453 size_t sum = 0;
1454 for (auto& atom : atoms) {
1455 sum += *atom;
1456 }
1457 EXPECT_EQ(c, sum);
1458 }
1459
TYPED_TEST_P(EventBaseTest,RunImmediatelyOrRunInEventBaseThreadAndWaitCross)1460 TYPED_TEST_P(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitCross) {
1461 auto evbPtr = getEventBase<TypeParam>();
1462 SKIP_IF(!evbPtr) << "Backend not available";
1463 folly::EventBase& eb = *evbPtr;
1464 std::thread th(&EventBase::loopForever, &eb);
1465 SCOPE_EXIT {
1466 eb.terminateLoopSoon();
1467 th.join();
1468 };
1469 auto mutated = false;
1470 eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { mutated = true; });
1471 EXPECT_TRUE(mutated);
1472 }
1473
TYPED_TEST_P(EventBaseTest,RunImmediatelyOrRunInEventBaseThreadAndWaitWithin)1474 TYPED_TEST_P(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitWithin) {
1475 auto evbPtr = getEventBase<TypeParam>();
1476 SKIP_IF(!evbPtr) << "Backend not available";
1477 folly::EventBase& eb = *evbPtr;
1478 std::thread th(&EventBase::loopForever, &eb);
1479 SCOPE_EXIT {
1480 eb.terminateLoopSoon();
1481 th.join();
1482 };
1483 eb.runInEventBaseThreadAndWait([&] {
1484 auto mutated = false;
1485 eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { mutated = true; });
1486 EXPECT_TRUE(mutated);
1487 });
1488 }
1489
TYPED_TEST_P(EventBaseTest,RunImmediatelyOrRunInEventBaseThreadNotLooping)1490 TYPED_TEST_P(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadNotLooping) {
1491 auto evbPtr = getEventBase<TypeParam>();
1492 SKIP_IF(!evbPtr) << "Backend not available";
1493 folly::EventBase& eb = *evbPtr;
1494 auto mutated = false;
1495 eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { mutated = true; });
1496 EXPECT_TRUE(mutated);
1497 }
1498
1499 ///////////////////////////////////////////////////////////////////////////
1500 // Tests for runInLoop()
1501 ///////////////////////////////////////////////////////////////////////////
1502
1503 namespace {
1504 class CountedLoopCallback : public EventBase::LoopCallback {
1505 public:
1506 CountedLoopCallback(
1507 EventBase* eventBase,
1508 unsigned int count,
1509 std::function<void()> action = std::function<void()>())
eventBase_(eventBase)1510 : eventBase_(eventBase), count_(count), action_(action) {}
1511
runLoopCallback()1512 void runLoopCallback() noexcept override {
1513 --count_;
1514 if (count_ > 0) {
1515 eventBase_->runInLoop(this);
1516 } else if (action_) {
1517 action_();
1518 }
1519 }
1520
getCount()1521 unsigned int getCount() const { return count_; }
1522
1523 private:
1524 EventBase* eventBase_;
1525 unsigned int count_;
1526 std::function<void()> action_;
1527 };
1528 } // namespace
1529
1530 // Test that EventBase::loop() doesn't exit while there are
1531 // still LoopCallbacks remaining to be invoked.
TYPED_TEST_P(EventBaseTest,RepeatedRunInLoop)1532 TYPED_TEST_P(EventBaseTest, RepeatedRunInLoop) {
1533 auto evbPtr = getEventBase<TypeParam>();
1534 SKIP_IF(!evbPtr) << "Backend not available";
1535 folly::EventBase& eventBase = *evbPtr;
1536
1537 CountedLoopCallback c(&eventBase, 10);
1538 eventBase.runInLoop(&c);
1539 // The callback shouldn't have run immediately
1540 ASSERT_EQ(c.getCount(), 10);
1541 eventBase.loop();
1542
1543 // loop() should loop until the CountedLoopCallback stops
1544 // re-installing itself.
1545 ASSERT_EQ(c.getCount(), 0);
1546 }
1547
1548 // Test that EventBase::loop() works as expected without time measurements.
TYPED_TEST_P(EventBaseTest,RunInLoopNoTimeMeasurement)1549 TYPED_TEST_P(EventBaseTest, RunInLoopNoTimeMeasurement) {
1550 auto evbPtr = getEventBase<TypeParam>(
1551 EventBase::Options().setSkipTimeMeasurement(true));
1552 SKIP_IF(!evbPtr) << "Backend not available";
1553 folly::EventBase& eventBase = *evbPtr;
1554
1555 CountedLoopCallback c(&eventBase, 10);
1556 eventBase.runInLoop(&c);
1557 // The callback shouldn't have run immediately
1558 ASSERT_EQ(c.getCount(), 10);
1559 eventBase.loop();
1560
1561 // loop() should loop until the CountedLoopCallback stops
1562 // re-installing itself.
1563 ASSERT_EQ(c.getCount(), 0);
1564 }
1565
1566 // Test runInLoop() calls with terminateLoopSoon()
TYPED_TEST_P(EventBaseTest,RunInLoopStopLoop)1567 TYPED_TEST_P(EventBaseTest, RunInLoopStopLoop) {
1568 auto evbPtr = getEventBase<TypeParam>();
1569 SKIP_IF(!evbPtr) << "Backend not available";
1570 folly::EventBase& eventBase = *evbPtr;
1571
1572 CountedLoopCallback c1(&eventBase, 20);
1573 CountedLoopCallback c2(
1574 &eventBase, 10, std::bind(&EventBase::terminateLoopSoon, &eventBase));
1575
1576 eventBase.runInLoop(&c1);
1577 eventBase.runInLoop(&c2);
1578 ASSERT_EQ(c1.getCount(), 20);
1579 ASSERT_EQ(c2.getCount(), 10);
1580
1581 eventBase.loopForever();
1582
1583 // c2 should have stopped the loop after 10 iterations
1584 ASSERT_EQ(c2.getCount(), 0);
1585
1586 // We allow the EventBase to run the loop callbacks in whatever order it
1587 // chooses. We'll accept c1's count being either 10 (if the loop terminated
1588 // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
1589 // before c1 ran).
1590 //
1591 // (With the current code, c1 will always run 10 times, but we don't consider
1592 // this a hard API requirement.)
1593 ASSERT_GE(c1.getCount(), 10);
1594 ASSERT_LE(c1.getCount(), 11);
1595 }
1596
TYPED_TEST_P(EventBaseTest1,pidCheck)1597 TYPED_TEST_P(EventBaseTest1, pidCheck) {
1598 auto evbPtr = getEventBase<TypeParam>();
1599 SKIP_IF(!evbPtr) << "Backend not available";
1600
1601 auto deadManWalking = [&]() { evbPtr->loopForever(); };
1602 EXPECT_DEATH(deadManWalking(), "pid");
1603 }
1604
TYPED_TEST_P(EventBaseTest,messageAvailableException)1605 TYPED_TEST_P(EventBaseTest, messageAvailableException) {
1606 auto evbPtr = getEventBase<TypeParam>();
1607 SKIP_IF(!evbPtr) << "Backend not available";
1608
1609 auto deadManWalking = []() {
1610 auto evb = getEventBase<TypeParam>();
1611 std::thread t([&] {
1612 // Call this from another thread to force use of NotificationQueue in
1613 // runInEventBaseThread
1614 evb->runInEventBaseThread([]() { throw std::runtime_error("boom"); });
1615 });
1616 t.join();
1617 evb->loopForever();
1618 };
1619 EXPECT_DEATH(deadManWalking(), "boom");
1620 }
1621
TYPED_TEST_P(EventBaseTest,TryRunningAfterTerminate)1622 TYPED_TEST_P(EventBaseTest, TryRunningAfterTerminate) {
1623 auto eventBasePtr = getEventBase<TypeParam>();
1624 SKIP_IF(!eventBasePtr) << "Backend not available";
1625 folly::EventBase& eventBase = *eventBasePtr;
1626
1627 bool ran = false;
1628 CountedLoopCallback c1(
1629 &eventBase, 1, std::bind(&EventBase::terminateLoopSoon, &eventBase));
1630 eventBase.runInLoop(&c1);
1631 eventBase.loopForever();
1632 eventBase.runInEventBaseThread([&]() { ran = true; });
1633
1634 ASSERT_FALSE(ran);
1635
1636 eventBasePtr.reset();
1637 // Loop callbacks are triggered on EventBase destruction
1638 ASSERT_TRUE(ran);
1639 }
1640
1641 // Test cancelling runInLoop() callbacks
TYPED_TEST_P(EventBaseTest,CancelRunInLoop)1642 TYPED_TEST_P(EventBaseTest, CancelRunInLoop) {
1643 auto eventBasePtr = getEventBase<TypeParam>();
1644 SKIP_IF(!eventBasePtr) << "Backend not available";
1645 folly::EventBase& eventBase = *eventBasePtr;
1646
1647 CountedLoopCallback c1(&eventBase, 20);
1648 CountedLoopCallback c2(&eventBase, 20);
1649 CountedLoopCallback c3(&eventBase, 20);
1650
1651 std::function<void()> cancelC1Action =
1652 std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
1653 std::function<void()> cancelC2Action =
1654 std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
1655
1656 CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
1657 CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
1658
1659 // Install cancelC1 after c1
1660 eventBase.runInLoop(&c1);
1661 eventBase.runInLoop(&cancelC1);
1662
1663 // Install cancelC2 before c2
1664 eventBase.runInLoop(&cancelC2);
1665 eventBase.runInLoop(&c2);
1666
1667 // Install c3
1668 eventBase.runInLoop(&c3);
1669
1670 ASSERT_EQ(c1.getCount(), 20);
1671 ASSERT_EQ(c2.getCount(), 20);
1672 ASSERT_EQ(c3.getCount(), 20);
1673 ASSERT_EQ(cancelC1.getCount(), 10);
1674 ASSERT_EQ(cancelC2.getCount(), 10);
1675
1676 // Run the loop
1677 eventBase.loop();
1678
1679 // cancelC1 and cancelC2 should have both fired after 10 iterations and
1680 // stopped re-installing themselves
1681 ASSERT_EQ(cancelC1.getCount(), 0);
1682 ASSERT_EQ(cancelC2.getCount(), 0);
1683 // c3 should have continued on for the full 20 iterations
1684 ASSERT_EQ(c3.getCount(), 0);
1685
1686 // c1 and c2 should have both been cancelled on the 10th iteration.
1687 //
1688 // Callbacks are always run in the order they are installed,
1689 // so c1 should have fired 10 times, and been canceled after it ran on the
1690 // 10th iteration. c2 should have only fired 9 times, because cancelC2 will
1691 // have run before it on the 10th iteration, and cancelled it before it
1692 // fired.
1693 ASSERT_EQ(c1.getCount(), 10);
1694 ASSERT_EQ(c2.getCount(), 11);
1695 }
1696
1697 namespace {
1698 class TerminateTestCallback : public EventBase::LoopCallback,
1699 public EventHandler {
1700 public:
TerminateTestCallback(EventBase * eventBase,int fd)1701 TerminateTestCallback(EventBase* eventBase, int fd)
1702 : EventHandler(eventBase, NetworkSocket::fromFd(fd)),
1703 eventBase_(eventBase),
1704 loopInvocations_(0),
1705 maxLoopInvocations_(0),
1706 eventInvocations_(0),
1707 maxEventInvocations_(0) {}
1708
reset(uint32_t maxLoopInvocations,uint32_t maxEventInvocations)1709 void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
1710 loopInvocations_ = 0;
1711 maxLoopInvocations_ = maxLoopInvocations;
1712 eventInvocations_ = 0;
1713 maxEventInvocations_ = maxEventInvocations;
1714
1715 cancelLoopCallback();
1716 unregisterHandler();
1717 }
1718
handlerReady(uint16_t)1719 void handlerReady(uint16_t /* events */) noexcept override {
1720 // We didn't register with PERSIST, so we will have been automatically
1721 // unregistered already.
1722 ASSERT_FALSE(isHandlerRegistered());
1723
1724 ++eventInvocations_;
1725 if (eventInvocations_ >= maxEventInvocations_) {
1726 return;
1727 }
1728
1729 eventBase_->runInLoop(this);
1730 }
runLoopCallback()1731 void runLoopCallback() noexcept override {
1732 ++loopInvocations_;
1733 if (loopInvocations_ >= maxLoopInvocations_) {
1734 return;
1735 }
1736
1737 registerHandler(READ);
1738 }
1739
getLoopInvocations()1740 uint32_t getLoopInvocations() const { return loopInvocations_; }
getEventInvocations()1741 uint32_t getEventInvocations() const { return eventInvocations_; }
1742
1743 private:
1744 EventBase* eventBase_;
1745 uint32_t loopInvocations_;
1746 uint32_t maxLoopInvocations_;
1747 uint32_t eventInvocations_;
1748 uint32_t maxEventInvocations_;
1749 };
1750 } // namespace
1751
1752 /**
1753 * Test that EventBase::loop() correctly detects when there are no more events
1754 * left to run.
1755 *
1756 * This uses a single callback, which alternates registering itself as a loop
1757 * callback versus a EventHandler callback. This exercises a regression where
1758 * EventBase::loop() incorrectly exited if there were no more fd handlers
1759 * registered, but a loop callback installed a new fd handler.
1760 */
TYPED_TEST_P(EventBaseTest,LoopTermination)1761 TYPED_TEST_P(EventBaseTest, LoopTermination) {
1762 auto eventBasePtr = getEventBase<TypeParam>();
1763 SKIP_IF(!eventBasePtr) << "Backend not available";
1764 folly::EventBase& eventBase = *eventBasePtr;
1765
1766 // Open a pipe and close the write end,
1767 // so the read endpoint will be readable
1768 int pipeFds[2];
1769 int rc = pipe(pipeFds);
1770 ASSERT_EQ(rc, 0);
1771 close(pipeFds[1]);
1772 TerminateTestCallback callback(&eventBase, pipeFds[0]);
1773
1774 // Test once where the callback will exit after a loop callback
1775 callback.reset(10, 100);
1776 eventBase.runInLoop(&callback);
1777 eventBase.loop();
1778 ASSERT_EQ(callback.getLoopInvocations(), 10);
1779 ASSERT_EQ(callback.getEventInvocations(), 9);
1780
1781 // Test once where the callback will exit after an fd event callback
1782 callback.reset(100, 7);
1783 eventBase.runInLoop(&callback);
1784 eventBase.loop();
1785 ASSERT_EQ(callback.getLoopInvocations(), 7);
1786 ASSERT_EQ(callback.getEventInvocations(), 7);
1787
1788 close(pipeFds[0]);
1789 }
1790
TYPED_TEST_P(EventBaseTest,CallbackOrderTest)1791 TYPED_TEST_P(EventBaseTest, CallbackOrderTest) {
1792 size_t num = 0;
1793 auto eventBasePtr = getEventBase<TypeParam>();
1794 SKIP_IF(!eventBasePtr) << "Backend not available";
1795 folly::EventBase& evb = *eventBasePtr;
1796
1797 evb.runInEventBaseThread([&]() {
1798 std::thread t([&]() {
1799 evb.runInEventBaseThread([&]() {
1800 num++;
1801 EXPECT_EQ(num, 2);
1802 });
1803 });
1804 t.join();
1805
1806 // this callback will run first
1807 // even if it is scheduled after the first one
1808 evb.runInEventBaseThread([&]() {
1809 num++;
1810 EXPECT_EQ(num, 1);
1811 });
1812 });
1813
1814 evb.loop();
1815 EXPECT_EQ(num, 2);
1816 }
1817
TYPED_TEST_P(EventBaseTest,AlwaysEnqueueCallbackOrderTest)1818 TYPED_TEST_P(EventBaseTest, AlwaysEnqueueCallbackOrderTest) {
1819 size_t num = 0;
1820 auto eventBasePtr = getEventBase<TypeParam>();
1821 SKIP_IF(!eventBasePtr) << "Backend not available";
1822 folly::EventBase& evb = *eventBasePtr;
1823
1824 evb.runInEventBaseThread([&]() {
1825 std::thread t([&]() {
1826 evb.runInEventBaseThreadAlwaysEnqueue([&]() {
1827 num++;
1828 EXPECT_EQ(num, 1);
1829 });
1830 });
1831 t.join();
1832
1833 // this callback will run second
1834 // since it was enqueued after the first one
1835 evb.runInEventBaseThreadAlwaysEnqueue([&]() {
1836 num++;
1837 EXPECT_EQ(num, 2);
1838 });
1839 });
1840
1841 evb.loop();
1842 EXPECT_EQ(num, 2);
1843 }
1844
TYPED_TEST_P(EventBaseTest1,InternalExternalCallbackOrderTest)1845 TYPED_TEST_P(EventBaseTest1, InternalExternalCallbackOrderTest) {
1846 size_t counter = 0;
1847
1848 auto eventBasePtr = getEventBase<TypeParam>();
1849 SKIP_IF(!eventBasePtr) << "Backend not available";
1850 folly::EventBase& evb = *eventBasePtr;
1851
1852 std::vector<size_t> calls;
1853
1854 folly::Function<void(size_t)> runInLoopRecursive = [&](size_t left) {
1855 evb.runInLoop([&, left]() mutable {
1856 calls.push_back(counter++);
1857 if (--left == 0) {
1858 evb.terminateLoopSoon();
1859 return;
1860 }
1861 runInLoopRecursive(left);
1862 });
1863 };
1864
1865 evb.runInEventBaseThread([&] { runInLoopRecursive(5); });
1866 for (size_t i = 0; i < 49; ++i) {
1867 evb.runInEventBaseThread([&] { ++counter; });
1868 }
1869 evb.loopForever();
1870
1871 EXPECT_EQ(std::vector<size_t>({9, 20, 31, 42, 53}), calls);
1872 }
1873
1874 ///////////////////////////////////////////////////////////////////////////
1875 // Tests for latency calculations
1876 ///////////////////////////////////////////////////////////////////////////
1877
1878 namespace {
1879 class IdleTimeTimeoutSeries : public AsyncTimeout {
1880 public:
IdleTimeTimeoutSeries(EventBase * base,std::deque<std::size_t> & timeout)1881 explicit IdleTimeTimeoutSeries(
1882 EventBase* base, std::deque<std::size_t>& timeout)
1883 : AsyncTimeout(base), timeouts_(0), timeout_(timeout) {
1884 scheduleTimeout(1);
1885 }
1886
~IdleTimeTimeoutSeries()1887 ~IdleTimeTimeoutSeries() override {}
1888
timeoutExpired()1889 void timeoutExpired() noexcept override {
1890 ++timeouts_;
1891
1892 if (timeout_.empty()) {
1893 cancelTimeout();
1894 } else {
1895 std::size_t sleepTime = timeout_.front();
1896 timeout_.pop_front();
1897 if (sleepTime) {
1898 usleep(sleepTime);
1899 }
1900 scheduleTimeout(1);
1901 }
1902 }
1903
getTimeouts()1904 int getTimeouts() const { return timeouts_; }
1905
1906 private:
1907 int timeouts_;
1908 std::deque<std::size_t>& timeout_;
1909 };
1910 } // namespace
1911
1912 /**
1913 * Verify that idle time is correctly accounted for when decaying our loop
1914 * time.
1915 *
1916 * This works by creating a high loop time (via usleep), expecting a latency
1917 * callback with known value, and then scheduling a timeout for later. This
1918 * later timeout is far enough in the future that the idle time should have
1919 * caused the loop time to decay.
1920 */
TYPED_TEST_P(EventBaseTest,IdleTime)1921 TYPED_TEST_P(EventBaseTest, IdleTime) {
1922 auto eventBasePtr = getEventBase<TypeParam>();
1923 SKIP_IF(!eventBasePtr) << "Backend not available";
1924 folly::EventBase& eventBase = *eventBasePtr;
1925 std::deque<std::size_t> timeouts0(4, 8080);
1926 timeouts0.push_front(8000);
1927 timeouts0.push_back(14000);
1928 IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
1929 std::deque<std::size_t> timeouts(20, 20);
1930 std::unique_ptr<IdleTimeTimeoutSeries> tos;
1931 bool hostOverloaded = false;
1932
1933 // Loop once before starting the main test. This will run NotificationQueue
1934 // callbacks that get automatically installed when the EventBase is first
1935 // created. We want to make sure they don't interfere with the timing
1936 // operations below.
1937 eventBase.loopOnce(EVLOOP_NONBLOCK);
1938 eventBase.setLoadAvgMsec(std::chrono::milliseconds(1000));
1939 eventBase.resetLoadAvg(5900.0);
1940
1941 int latencyCallbacks = 0;
1942 eventBase.setMaxLatency(std::chrono::microseconds(6000), [&]() {
1943 ++latencyCallbacks;
1944 if (latencyCallbacks != 1 || tos0.getTimeouts() < 6) {
1945 // This could only happen if the host this test is running
1946 // on is heavily loaded.
1947 hostOverloaded = true;
1948 return;
1949 }
1950
1951 EXPECT_EQ(6, tos0.getTimeouts());
1952 EXPECT_GE(6100, eventBase.getAvgLoopTime() - 1200);
1953 EXPECT_LE(6100, eventBase.getAvgLoopTime() + 1200);
1954 tos = std::make_unique<IdleTimeTimeoutSeries>(&eventBase, timeouts);
1955 });
1956
1957 // Kick things off with an "immediate" timeout
1958 tos0.scheduleTimeout(1);
1959
1960 eventBase.loop();
1961
1962 if (hostOverloaded) {
1963 SKIP() << "host too heavily loaded to execute test";
1964 }
1965
1966 ASSERT_EQ(1, latencyCallbacks);
1967 ASSERT_EQ(7, tos0.getTimeouts());
1968 ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
1969 ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
1970 ASSERT_TRUE(!!tos);
1971 ASSERT_EQ(21, tos->getTimeouts());
1972 }
1973
TYPED_TEST_P(EventBaseTest,MaxLatencyUndamped)1974 TYPED_TEST_P(EventBaseTest, MaxLatencyUndamped) {
1975 auto eventBasePtr = getEventBase<TypeParam>();
1976 folly::EventBase& eb = *eventBasePtr;
1977 int maxDurationViolations = 0;
1978 eb.setMaxLatency(
1979 std::chrono::milliseconds{1}, [&]() { maxDurationViolations++; }, false);
1980 eb.runInLoop(
1981 [&]() {
1982 /* sleep override */ std::this_thread::sleep_for(
1983 std::chrono::microseconds{1001});
1984 eb.terminateLoopSoon();
1985 },
1986 true);
1987 eb.loop();
1988 ASSERT_EQ(maxDurationViolations, 1);
1989 }
1990
TYPED_TEST_P(EventBaseTest,UnsetMaxLatencyUndamped)1991 TYPED_TEST_P(EventBaseTest, UnsetMaxLatencyUndamped) {
1992 auto eventBasePtr = getEventBase<TypeParam>();
1993 folly::EventBase& eb = *eventBasePtr;
1994 int maxDurationViolations = 0;
1995 eb.setMaxLatency(
1996 std::chrono::milliseconds{1}, [&]() { maxDurationViolations++; }, false);
1997 // Immediately unset it and make sure the counter isn't incremented. If the
1998 // function gets called, this will raise an std::bad_function_call.
1999 std::function<void()> bad_func = nullptr;
2000 eb.setMaxLatency(std::chrono::milliseconds{0}, bad_func, false);
2001 eb.runInLoop(
2002 [&]() {
2003 /* sleep override */ std::this_thread::sleep_for(
2004 std::chrono::microseconds{1001});
2005 eb.terminateLoopSoon();
2006 },
2007 true);
2008 eb.loop();
2009 ASSERT_EQ(maxDurationViolations, 0);
2010 }
2011
2012 /**
2013 * Test that thisLoop functionality works with terminateLoopSoon
2014 */
TYPED_TEST_P(EventBaseTest,ThisLoop)2015 TYPED_TEST_P(EventBaseTest, ThisLoop) {
2016 bool runInLoop = false;
2017 bool runThisLoop = false;
2018
2019 {
2020 auto eventBasePtr = getEventBase<TypeParam>();
2021 SKIP_IF(!eventBasePtr) << "Backend not available";
2022 folly::EventBase& eb = *eventBasePtr;
2023 eb.runInLoop(
2024 [&]() {
2025 eb.terminateLoopSoon();
2026 eb.runInLoop([&]() { runInLoop = true; });
2027 eb.runInLoop([&]() { runThisLoop = true; }, true);
2028 },
2029 true);
2030 eb.loopForever();
2031
2032 // Should not work
2033 ASSERT_FALSE(runInLoop);
2034 // Should work with thisLoop
2035 ASSERT_TRUE(runThisLoop);
2036 }
2037
2038 // Pending loop callbacks will be run when the EventBase is destroyed.
2039 ASSERT_TRUE(runInLoop);
2040 }
2041
TYPED_TEST_P(EventBaseTest,EventBaseThreadLoop)2042 TYPED_TEST_P(EventBaseTest, EventBaseThreadLoop) {
2043 auto eventBasePtr = getEventBase<TypeParam>();
2044 SKIP_IF(!eventBasePtr) << "Backend not available";
2045 folly::EventBase& base = *eventBasePtr;
2046 bool ran = false;
2047 base.runInEventBaseThread([&]() { ran = true; });
2048 base.loop();
2049
2050 ASSERT_TRUE(ran);
2051 }
2052
TYPED_TEST_P(EventBaseTest,EventBaseThreadName)2053 TYPED_TEST_P(EventBaseTest, EventBaseThreadName) {
2054 auto eventBasePtr = getEventBase<TypeParam>();
2055 SKIP_IF(!eventBasePtr) << "Backend not available";
2056 folly::EventBase& base = *eventBasePtr;
2057 base.setName("foo");
2058 base.loop();
2059
2060 ASSERT_EQ("foo", *getCurrentThreadName());
2061 }
2062
TYPED_TEST_P(EventBaseTest,RunBeforeLoop)2063 TYPED_TEST_P(EventBaseTest, RunBeforeLoop) {
2064 auto eventBasePtr = getEventBase<TypeParam>();
2065 SKIP_IF(!eventBasePtr) << "Backend not available";
2066 folly::EventBase& base = *eventBasePtr;
2067 CountedLoopCallback cb(&base, 1, [&]() { base.terminateLoopSoon(); });
2068 base.runBeforeLoop(&cb);
2069 base.loopForever();
2070 ASSERT_EQ(cb.getCount(), 0);
2071 }
2072
TYPED_TEST_P(EventBaseTest,RunBeforeLoopWait)2073 TYPED_TEST_P(EventBaseTest, RunBeforeLoopWait) {
2074 auto eventBasePtr = getEventBase<TypeParam>();
2075 SKIP_IF(!eventBasePtr) << "Backend not available";
2076 folly::EventBase& base = *eventBasePtr;
2077 CountedLoopCallback cb(&base, 1);
2078 base.tryRunAfterDelay([&]() { base.terminateLoopSoon(); }, 500);
2079 base.runBeforeLoop(&cb);
2080 base.loopForever();
2081
2082 // Check that we only ran once, and did not loop multiple times.
2083 ASSERT_EQ(cb.getCount(), 0);
2084 }
2085
2086 namespace {
2087 class PipeHandler : public EventHandler {
2088 public:
PipeHandler(EventBase * eventBase,int fd)2089 PipeHandler(EventBase* eventBase, int fd)
2090 : EventHandler(eventBase, NetworkSocket::fromFd(fd)) {}
2091
handlerReady(uint16_t)2092 void handlerReady(uint16_t /* events */) noexcept override { abort(); }
2093 };
2094 } // namespace
2095
TYPED_TEST_P(EventBaseTest,StopBeforeLoop)2096 TYPED_TEST_P(EventBaseTest, StopBeforeLoop) {
2097 auto eventBasePtr = getEventBase<TypeParam>();
2098 SKIP_IF(!eventBasePtr) << "Backend not available";
2099 folly::EventBase& evb = *eventBasePtr;
2100
2101 // Give the evb something to do.
2102 int p[2];
2103 ASSERT_EQ(0, pipe(p));
2104 PipeHandler handler(&evb, p[0]);
2105 handler.registerHandler(EventHandler::READ);
2106
2107 // It's definitely not running yet
2108 evb.terminateLoopSoon();
2109
2110 // let it run, it should exit quickly.
2111 std::thread t([&] { evb.loop(); });
2112 t.join();
2113
2114 handler.unregisterHandler();
2115 close(p[0]);
2116 close(p[1]);
2117
2118 SUCCEED();
2119 }
2120
TYPED_TEST_P(EventBaseTest,RunCallbacksOnDestruction)2121 TYPED_TEST_P(EventBaseTest, RunCallbacksOnDestruction) {
2122 bool ran = false;
2123
2124 {
2125 auto eventBasePtr = getEventBase<TypeParam>();
2126 SKIP_IF(!eventBasePtr) << "Backend not available";
2127 eventBasePtr->runInEventBaseThread([&]() { ran = true; });
2128 }
2129
2130 ASSERT_TRUE(ran);
2131 }
2132
TYPED_TEST_P(EventBaseTest,LoopKeepAlive)2133 TYPED_TEST_P(EventBaseTest, LoopKeepAlive) {
2134 auto evbPtr = getEventBase<TypeParam>();
2135 SKIP_IF(!evbPtr) << "Backend not available";
2136
2137 bool done = false;
2138 std::thread t([&, loopKeepAlive = getKeepAliveToken(*evbPtr)]() mutable {
2139 /* sleep override */ std::this_thread::sleep_for(
2140 std::chrono::milliseconds(100));
2141 evbPtr->runInEventBaseThread(
2142 [&done, loopKeepAlive = std::move(loopKeepAlive)] { done = true; });
2143 });
2144
2145 evbPtr->loop();
2146
2147 ASSERT_TRUE(done);
2148
2149 t.join();
2150 }
2151
TYPED_TEST_P(EventBaseTest,LoopKeepAliveInLoop)2152 TYPED_TEST_P(EventBaseTest, LoopKeepAliveInLoop) {
2153 auto evbPtr = getEventBase<TypeParam>();
2154 SKIP_IF(!evbPtr) << "Backend not available";
2155
2156 bool done = false;
2157 std::thread t;
2158
2159 evbPtr->runInEventBaseThread([&] {
2160 t = std::thread([&, loopKeepAlive = getKeepAliveToken(*evbPtr)]() mutable {
2161 /* sleep override */ std::this_thread::sleep_for(
2162 std::chrono::milliseconds(100));
2163 evbPtr->runInEventBaseThread(
2164 [&done, loopKeepAlive = std::move(loopKeepAlive)] { done = true; });
2165 });
2166 });
2167
2168 evbPtr->loop();
2169
2170 ASSERT_TRUE(done);
2171
2172 t.join();
2173 }
2174
TYPED_TEST_P(EventBaseTest,LoopKeepAliveWithLoopForever)2175 TYPED_TEST_P(EventBaseTest, LoopKeepAliveWithLoopForever) {
2176 auto evbPtr = getEventBase<TypeParam>();
2177 SKIP_IF(!evbPtr) << "Backend not available";
2178
2179 bool done = false;
2180
2181 std::thread evThread([&] {
2182 evbPtr->loopForever();
2183 evbPtr.reset();
2184 done = true;
2185 });
2186
2187 {
2188 auto* ev = evbPtr.get();
2189 Executor::KeepAlive<EventBase> keepAlive;
2190 ev->runInEventBaseThreadAndWait(
2191 [&ev, &keepAlive] { keepAlive = getKeepAliveToken(ev); });
2192 ASSERT_FALSE(done) << "Loop finished before we asked it to";
2193 ev->terminateLoopSoon();
2194 /* sleep override */
2195 std::this_thread::sleep_for(std::chrono::milliseconds(30));
2196 ASSERT_FALSE(done) << "Loop terminated early";
2197 ev->runInEventBaseThread([keepAlive = std::move(keepAlive)] {});
2198 }
2199
2200 evThread.join();
2201 ASSERT_TRUE(done);
2202 }
2203
TYPED_TEST_P(EventBaseTest,LoopKeepAliveShutdown)2204 TYPED_TEST_P(EventBaseTest, LoopKeepAliveShutdown) {
2205 auto evbPtr = getEventBase<TypeParam>();
2206 SKIP_IF(!evbPtr) << "Backend not available";
2207
2208 bool done = false;
2209
2210 std::thread t([&done,
2211 loopKeepAlive = getKeepAliveToken(evbPtr.get()),
2212 evbPtrRaw = evbPtr.get()]() mutable {
2213 /* sleep override */ std::this_thread::sleep_for(
2214 std::chrono::milliseconds(100));
2215 evbPtrRaw->runInEventBaseThread(
2216 [&done, loopKeepAlive = std::move(loopKeepAlive)] { done = true; });
2217 });
2218
2219 evbPtr.reset();
2220
2221 ASSERT_TRUE(done);
2222
2223 t.join();
2224 }
2225
TYPED_TEST_P(EventBaseTest,LoopKeepAliveAtomic)2226 TYPED_TEST_P(EventBaseTest, LoopKeepAliveAtomic) {
2227 auto evbPtr = getEventBase<TypeParam>();
2228 SKIP_IF(!evbPtr) << "Backend not available";
2229
2230 static constexpr size_t kNumThreads = 100;
2231 static constexpr size_t kNumTasks = 100;
2232
2233 std::vector<std::thread> ts;
2234 std::vector<std::unique_ptr<Baton<>>> batons;
2235 size_t done{0};
2236
2237 for (size_t i = 0; i < kNumThreads; ++i) {
2238 batons.emplace_back(std::make_unique<Baton<>>());
2239 }
2240
2241 for (size_t i = 0; i < kNumThreads; ++i) {
2242 ts.emplace_back([evbPtrRaw = evbPtr.get(),
2243 batonPtr = batons[i].get(),
2244 &done] {
2245 std::vector<Executor::KeepAlive<EventBase>> keepAlives;
2246 for (size_t j = 0; j < kNumTasks; ++j) {
2247 keepAlives.emplace_back(getKeepAliveToken(evbPtrRaw));
2248 }
2249
2250 batonPtr->post();
2251
2252 /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds(1));
2253
2254 for (auto& keepAlive : keepAlives) {
2255 evbPtrRaw->runInEventBaseThread(
2256 [&done, keepAlive = std::move(keepAlive)]() { ++done; });
2257 }
2258 });
2259 }
2260
2261 for (auto& baton : batons) {
2262 baton->wait();
2263 }
2264
2265 evbPtr.reset();
2266
2267 EXPECT_EQ(kNumThreads * kNumTasks, done);
2268
2269 for (auto& t : ts) {
2270 t.join();
2271 }
2272 }
2273
TYPED_TEST_P(EventBaseTest,LoopKeepAliveCast)2274 TYPED_TEST_P(EventBaseTest, LoopKeepAliveCast) {
2275 auto evbPtr = getEventBase<TypeParam>();
2276 SKIP_IF(!evbPtr) << "Backend not available";
2277 Executor::KeepAlive<> keepAlive = getKeepAliveToken(*evbPtr);
2278 }
2279
TYPED_TEST_P(EventBaseTest1,DrivableExecutorTest)2280 TYPED_TEST_P(EventBaseTest1, DrivableExecutorTest) {
2281 folly::Promise<bool> p;
2282 auto f = p.getFuture();
2283 auto eventBasePtr = getEventBase<TypeParam>();
2284 SKIP_IF(!eventBasePtr) << "Backend not available";
2285 folly::EventBase& base = *eventBasePtr;
2286 bool finished = false;
2287
2288 Baton baton;
2289
2290 std::thread t([&] {
2291 baton.wait();
2292 /* sleep override */
2293 std::this_thread::sleep_for(std::chrono::milliseconds(500));
2294 finished = true;
2295 base.runInEventBaseThread([&]() { p.setValue(true); });
2296 });
2297
2298 // Ensure drive does not busy wait
2299 base.drive(); // TODO: fix notification queue init() extra wakeup
2300 baton.post();
2301 base.drive();
2302 EXPECT_TRUE(finished);
2303
2304 folly::Promise<bool> p2;
2305 auto f2 = p2.getFuture();
2306 // Ensure waitVia gets woken up properly, even from
2307 // a separate thread.
2308 base.runAfterDelay([&]() { p2.setValue(true); }, 10);
2309 f2.waitVia(&base);
2310 EXPECT_TRUE(f2.isReady());
2311
2312 t.join();
2313 }
2314
TYPED_TEST_P(EventBaseTest1,IOExecutorTest)2315 TYPED_TEST_P(EventBaseTest1, IOExecutorTest) {
2316 auto evbPtr = getEventBase<TypeParam>();
2317 SKIP_IF(!evbPtr) << "Backend not available";
2318
2319 // Ensure EventBase manages itself as an IOExecutor.
2320 EXPECT_EQ(evbPtr->getEventBase(), evbPtr.get());
2321 }
2322
TYPED_TEST_P(EventBaseTest1,RequestContextTest)2323 TYPED_TEST_P(EventBaseTest1, RequestContextTest) {
2324 auto evbPtr = getEventBase<TypeParam>();
2325 SKIP_IF(!evbPtr) << "Backend not available";
2326 auto defaultCtx = RequestContext::get();
2327 std::weak_ptr<RequestContext> rctx_weak_ptr;
2328
2329 {
2330 RequestContextScopeGuard rctx;
2331 rctx_weak_ptr = RequestContext::saveContext();
2332 auto context = RequestContext::get();
2333 EXPECT_NE(defaultCtx, context);
2334 evbPtr->runInLoop([context] { EXPECT_EQ(context, RequestContext::get()); });
2335 evbPtr->loop();
2336 }
2337
2338 // Ensure that RequestContext created for the scope has been released and
2339 // deleted.
2340 EXPECT_EQ(rctx_weak_ptr.expired(), true);
2341
2342 EXPECT_EQ(defaultCtx, RequestContext::get());
2343 }
2344
TYPED_TEST_P(EventBaseTest1,CancelLoopCallbackRequestContextTest)2345 TYPED_TEST_P(EventBaseTest1, CancelLoopCallbackRequestContextTest) {
2346 auto evbPtr = getEventBase<TypeParam>();
2347 SKIP_IF(!evbPtr) << "Backend not available";
2348 CountedLoopCallback c(evbPtr.get(), 1);
2349
2350 auto defaultCtx = RequestContext::get();
2351 EXPECT_EQ(defaultCtx, RequestContext::get());
2352 std::weak_ptr<RequestContext> rctx_weak_ptr;
2353
2354 {
2355 RequestContextScopeGuard rctx;
2356 rctx_weak_ptr = RequestContext::saveContext();
2357 auto context = RequestContext::get();
2358 EXPECT_NE(defaultCtx, context);
2359 evbPtr->runInLoop(&c);
2360 c.cancelLoopCallback();
2361 }
2362
2363 // Ensure that RequestContext created for the scope has been released and
2364 // deleted.
2365 EXPECT_EQ(rctx_weak_ptr.expired(), true);
2366
2367 EXPECT_EQ(defaultCtx, RequestContext::get());
2368 }
2369
TYPED_TEST_P(EventBaseTest1,TestStarvation)2370 TYPED_TEST_P(EventBaseTest1, TestStarvation) {
2371 auto evbPtr = getEventBase<TypeParam>();
2372 SKIP_IF(!evbPtr) << "Backend not available";
2373
2374 std::promise<void> stopRequested;
2375 std::promise<void> stopScheduled;
2376 bool stopping{false};
2377 std::thread t{[&] {
2378 stopRequested.get_future().get();
2379 evbPtr->add([&]() { stopping = true; });
2380 stopScheduled.set_value();
2381 }};
2382
2383 size_t num{0};
2384 std::function<void()> fn;
2385 fn = [&]() {
2386 if (stopping || num >= 2000) {
2387 return;
2388 }
2389
2390 if (++num == 1000) {
2391 stopRequested.set_value();
2392 stopScheduled.get_future().get();
2393 }
2394
2395 evbPtr->add(fn);
2396 };
2397
2398 evbPtr->add(fn);
2399 evbPtr->loop();
2400
2401 EXPECT_EQ(1000, num);
2402 t.join();
2403 }
2404
TYPED_TEST_P(EventBaseTest1,RunOnDestructionBasic)2405 TYPED_TEST_P(EventBaseTest1, RunOnDestructionBasic) {
2406 bool ranOnDestruction = false;
2407 {
2408 auto evbPtr = getEventBase<TypeParam>();
2409 SKIP_IF(!evbPtr) << "Backend not available";
2410 evbPtr->runOnDestruction([&ranOnDestruction] { ranOnDestruction = true; });
2411 }
2412 EXPECT_TRUE(ranOnDestruction);
2413 }
2414
TYPED_TEST_P(EventBaseTest1,RunOnDestructionCancelled)2415 TYPED_TEST_P(EventBaseTest1, RunOnDestructionCancelled) {
2416 struct Callback : EventBase::OnDestructionCallback {
2417 bool ranOnDestruction{false};
2418
2419 void onEventBaseDestruction() noexcept final { ranOnDestruction = true; }
2420 };
2421
2422 auto cb = std::make_unique<Callback>();
2423 {
2424 auto evbPtr = getEventBase<TypeParam>();
2425 SKIP_IF(!evbPtr) << "Backend not available";
2426 evbPtr->runOnDestruction(*cb);
2427 EXPECT_TRUE(cb->cancel());
2428 }
2429 EXPECT_FALSE(cb->ranOnDestruction);
2430 EXPECT_FALSE(cb->cancel());
2431 }
2432
TYPED_TEST_P(EventBaseTest1,RunOnDestructionAfterHandleDestroyed)2433 TYPED_TEST_P(EventBaseTest1, RunOnDestructionAfterHandleDestroyed) {
2434 auto evbPtr = getEventBase<TypeParam>();
2435 SKIP_IF(!evbPtr) << "Backend not available";
2436 {
2437 bool ranOnDestruction = false;
2438 auto* cb = new EventBase::FunctionOnDestructionCallback(
2439 [&ranOnDestruction] { ranOnDestruction = true; });
2440 evbPtr->runOnDestruction(*cb);
2441 EXPECT_TRUE(cb->cancel());
2442 delete cb;
2443 }
2444 }
2445
TYPED_TEST_P(EventBaseTest1,RunOnDestructionAddCallbackWithinCallback)2446 TYPED_TEST_P(EventBaseTest1, RunOnDestructionAddCallbackWithinCallback) {
2447 size_t callbacksCalled = 0;
2448 {
2449 auto evbPtr = getEventBase<TypeParam>();
2450 SKIP_IF(!evbPtr) << "Backend not available";
2451 evbPtr->runOnDestruction([&] {
2452 ++callbacksCalled;
2453 evbPtr->runOnDestruction([&] { ++callbacksCalled; });
2454 });
2455 }
2456 EXPECT_EQ(2, callbacksCalled);
2457 }
2458
TYPED_TEST_P(EventBaseTest1,EventBaseExecutionObserver)2459 TYPED_TEST_P(EventBaseTest1, EventBaseExecutionObserver) {
2460 auto eventBasePtr = getEventBase<TypeParam>();
2461 SKIP_IF(!eventBasePtr) << "Backend not available";
2462 folly::EventBase& base = *eventBasePtr;
2463 bool ranBeforeLoop = false;
2464 bool ran = false;
2465 TestObserver observer;
2466 base.setExecutionObserver(&observer);
2467
2468 CountedLoopCallback cb(&base, 1, [&]() { ranBeforeLoop = true; });
2469 base.runBeforeLoop(&cb);
2470
2471 base.runInEventBaseThread(
2472 [&]() { base.runInEventBaseThread([&]() { ran = true; }); });
2473 base.loop();
2474
2475 ASSERT_TRUE(ranBeforeLoop);
2476 ASSERT_TRUE(ran);
2477 ASSERT_EQ(0, observer.nestedStart_);
2478 ASSERT_EQ(4, observer.numStartingCalled_);
2479 ASSERT_EQ(4, observer.numStoppedCalled_);
2480 }
2481 } // namespace test
2482 } // namespace folly
2483