1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <folly/CPortability.h>
18 #include <folly/DefaultKeepAliveExecutor.h>
19 #include <folly/executors/CPUThreadPoolExecutor.h>
20 #include <folly/executors/ThreadPoolExecutor.h>
21 #include <folly/lang/Keep.h>
22 #include <folly/synchronization/Latch.h>
23
24 #include <atomic>
25 #include <memory>
26 #include <thread>
27
28 #include <boost/thread.hpp>
29
30 #include <folly/Exception.h>
31 #include <folly/VirtualExecutor.h>
32 #include <folly/executors/CPUThreadPoolExecutor.h>
33 #include <folly/executors/EDFThreadPoolExecutor.h>
34 #include <folly/executors/FutureExecutor.h>
35 #include <folly/executors/IOThreadPoolExecutor.h>
36 #include <folly/executors/task_queue/LifoSemMPMCQueue.h>
37 #include <folly/executors/task_queue/UnboundedBlockingQueue.h>
38 #include <folly/executors/thread_factory/InitThreadFactory.h>
39 #include <folly/executors/thread_factory/PriorityThreadFactory.h>
40 #include <folly/portability/GTest.h>
41 #include <folly/portability/PThread.h>
42 #include <folly/synchronization/detail/Spin.h>
43
44 using namespace folly;
45 using namespace std::chrono;
46
// Like ASSERT_NEAR, for chrono duration types: asserts that durations `a` and
// `b` differ by at most `c`, after normalizing all three to nanoseconds.
#define ASSERT_NEAR_NS(a, b, c) \
  do {                          \
    ASSERT_NEAR(                \
        nanoseconds(a).count(), \
        nanoseconds(b).count(), \
        nanoseconds(c).count()); \
  } while (0)
55
burnMs(uint64_t ms)56 static Func burnMs(uint64_t ms) {
57 return [ms]() { std::this_thread::sleep_for(milliseconds(ms)); };
58 }
59
60 #ifdef __linux__
// Returns the CPU time consumed so far by the calling thread (Linux only).
static std::chrono::nanoseconds thread_clock_now() {
  timespec tp;
  clockid_t clockid;
  // CHECK aborts on failure; these calls should not fail for our own thread.
  CHECK(!pthread_getcpuclockid(pthread_self(), &clockid));
  CHECK(!clock_gettime(clockid, &tp));
  return std::chrono::nanoseconds(tp.tv_nsec) + std::chrono::seconds(tp.tv_sec);
}
68
69 // Loop and burn cpu cycles
burnThreadCpu(milliseconds ms)70 static void burnThreadCpu(milliseconds ms) {
71 auto expires = thread_clock_now() + ms;
72 while (thread_clock_now() < expires) {
73 }
74 }
75
76 // Loop without using much cpu time
idleLoopFor(milliseconds ms)77 static void idleLoopFor(milliseconds ms) {
78 using clock = high_resolution_clock;
79 auto expires = clock::now() + ms;
80 while (clock::now() < expires) {
81 /* sleep override */ std::this_thread::sleep_for(100ms);
82 }
83 }
84 #endif
85
// Captured by the make_queue_observer_factory hook in this file; cleared by
// the ExecutorWorkerProviderTest fixture. NOTE(review): only populated when
// FOLLY_HAVE_WEAK_SYMBOLS is enabled.
static WorkerProvider* kWorkerProviderGlobal = nullptr;
87
namespace folly {

#if FOLLY_HAVE_WEAK_SYMBOLS
// Weak-symbol override of folly's queue-observer factory hook. The tests use
// it solely to capture the executor's WorkerProvider so they can later query
// the ids of the worker threads.
FOLLY_KEEP std::unique_ptr<QueueObserverFactory> make_queue_observer_factory(
    const std::string&, size_t, WorkerProvider* workerProvider) {
  kWorkerProviderGlobal = workerProvider;
  // Return null: no actual queue observer is installed.
  return {};
}
#endif

} // namespace folly
99
// Smoke test: constructing a 10-thread pool and letting it go out of scope
// must tear down cleanly without any work ever being enqueued.
template <class TPE>
static void basic() {
  TPE pool(10);
}
105
// Instantiate the construct/destruct smoke test for each executor flavor.
TEST(ThreadPoolExecutorTest, CPUBasic) {
  basic<CPUThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, IOBasic) {
  basic<IOThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, EDFBasic) {
  basic<EDFThreadPoolExecutor>();
}
117
// An idle pool must report exactly the thread count it was last resized to,
// whether shrinking below or growing above the construction-time size.
template <class TPE>
static void resize() {
  TPE pool(100);
  EXPECT_EQ(100, pool.numThreads());
  pool.setNumThreads(50);
  EXPECT_EQ(50, pool.numThreads());
  pool.setNumThreads(150);
  EXPECT_EQ(150, pool.numThreads());
}
127
// Instantiate the idle-resize test for each executor flavor.
TEST(ThreadPoolExecutorTest, CPUResize) {
  resize<CPUThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, IOResize) {
  resize<IOThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, EDFResize) {
  resize<EDFThreadPoolExecutor>();
}
139
140 template <class TPE>
stop()141 static void stop() {
142 TPE tpe(1);
143 std::atomic<int> completed(0);
144 auto f = [&]() {
145 burnMs(10)();
146 completed++;
147 };
148 for (int i = 0; i < 1000; i++) {
149 tpe.add(f);
150 }
151 tpe.stop();
152 EXPECT_GT(1000, completed);
153 }
154
155 // IOThreadPoolExecutor's stop() behaves like join(). Outstanding tasks belong
156 // to the event base, will be executed upon its destruction, and cannot be
157 // taken back.
158 template <>
stop()159 void stop<IOThreadPoolExecutor>() {
160 IOThreadPoolExecutor tpe(1);
161 std::atomic<int> completed(0);
162 auto f = [&]() {
163 burnMs(10)();
164 completed++;
165 };
166 for (int i = 0; i < 10; i++) {
167 tpe.add(f);
168 }
169 tpe.stop();
170 EXPECT_EQ(10, completed);
171 }
172
// Instantiate the stop() test for each executor flavor (IO uses the
// join-like specialization above).
TEST(ThreadPoolExecutorTest, CPUStop) {
  stop<CPUThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, IOStop) {
  stop<IOThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, EDFStop) {
  stop<EDFThreadPoolExecutor>();
}
184
185 template <class TPE>
join()186 static void join() {
187 TPE tpe(10);
188 std::atomic<int> completed(0);
189 auto f = [&]() {
190 burnMs(1)();
191 completed++;
192 };
193 for (int i = 0; i < 1000; i++) {
194 tpe.add(f);
195 }
196 tpe.join();
197 EXPECT_EQ(1000, completed);
198 }
199
// Instantiate the join() drain test for each executor flavor.
TEST(ThreadPoolExecutorTest, CPUJoin) {
  join<CPUThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, IOJoin) {
  join<IOThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, EDFJoin) {
  join<EDFThreadPoolExecutor>();
}
211
template <class TPE>
static void destroy() {
  // Same flow as stop(): overload a single slow worker, stop, and verify some
  // tasks were discarded. The destructor itself runs when `tpe` leaves scope
  // at the end of this function, after the expectation.
  TPE tpe(1);
  std::atomic<int> completed(0);
  auto f = [&]() {
    burnMs(10)();
    completed++;
  };
  for (int i = 0; i < 1000; i++) {
    tpe.add(f);
  }
  tpe.stop();
  EXPECT_GT(1000, completed);
}
226
// IOThreadPoolExecutor's destuctor joins all tasks. Outstanding tasks belong
// to the event base, will be executed upon its destruction, and cannot be
// taken back.
template <>
void destroy<IOThreadPoolExecutor>() {
  // Wrap in Optional so destruction can be triggered explicitly via reset().
  Optional<IOThreadPoolExecutor> tpe(in_place, 1);
  std::atomic<int> completed(0);
  auto f = [&]() {
    burnMs(10)();
    completed++;
  };
  for (int i = 0; i < 10; i++) {
    tpe->add(f);
  }
  tpe.reset();
  // The destructor joins, so every queued task must have run.
  EXPECT_EQ(10, completed);
}
244
// Instantiate the destruction test for each executor flavor (IO uses the
// Optional-based specialization above).
TEST(ThreadPoolExecutorTest, CPUDestroy) {
  destroy<CPUThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, IODestroy) {
  destroy<IOThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, EDFDestroy) {
  destroy<EDFThreadPoolExecutor>();
}
256
257 template <class TPE>
resizeUnderLoad()258 static void resizeUnderLoad() {
259 TPE tpe(10);
260 std::atomic<int> completed(0);
261 auto f = [&]() {
262 burnMs(1)();
263 completed++;
264 };
265 for (int i = 0; i < 1000; i++) {
266 tpe.add(f);
267 }
268 tpe.setNumThreads(5);
269 tpe.setNumThreads(15);
270 tpe.join();
271 EXPECT_EQ(1000, completed);
272 }
273
// Instantiate the resize-under-load test for each executor flavor.
TEST(ThreadPoolExecutorTest, CPUResizeUnderLoad) {
  resizeUnderLoad<CPUThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) {
  resizeUnderLoad<IOThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, EDFResizeUnderLoad) {
  resizeUnderLoad<EDFThreadPoolExecutor>();
}
285
template <class TPE>
static void poolStats() {
  folly::Baton<> startBaton, endBaton;
  TPE tpe(1);
  // Freshly constructed single-thread pool: nothing active, nothing pending.
  auto stats = tpe.getPoolStats();
  EXPECT_GE(1, stats.threadCount);
  EXPECT_GE(1, stats.idleThreadCount);
  EXPECT_EQ(0, stats.activeThreadCount);
  EXPECT_EQ(0, stats.pendingTaskCount);
  EXPECT_EQ(0, tpe.getPendingTaskCount());
  EXPECT_EQ(0, stats.totalTaskCount);
  // The first task blocks the only worker on endBaton; the second task must
  // therefore sit in the queue while we sample the stats.
  tpe.add([&]() {
    startBaton.post();
    endBaton.wait();
  });
  tpe.add([&]() {});
  startBaton.wait();
  stats = tpe.getPoolStats();
  EXPECT_EQ(1, stats.threadCount);
  EXPECT_EQ(0, stats.idleThreadCount);
  EXPECT_EQ(1, stats.activeThreadCount);
  EXPECT_EQ(1, stats.pendingTaskCount);
  EXPECT_EQ(1, tpe.getPendingTaskCount());
  EXPECT_EQ(2, stats.totalTaskCount);
  // Release the blocked worker so the pool can shut down.
  endBaton.post();
}
312
// Instantiate the pool-stats test for the CPU and IO executors.
TEST(ThreadPoolExecutorTest, CPUPoolStats) {
  poolStats<CPUThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, IOPoolStats) {
  poolStats<IOThreadPoolExecutor>();
}
320
template <class TPE>
static void taskStats() {
  TPE tpe(1);
  std::atomic<int> c(0);
  auto now = std::chrono::steady_clock::now();
  tpe.subscribeToTaskStats([&](ThreadPoolExecutor::TaskStats stats) {
    int i = c++;
    EXPECT_LT(now, stats.enqueueTime);
    EXPECT_LT(milliseconds(0), stats.runTime);
    if (i == 1) {
      // The second task queued behind the first on the single worker, so it
      // must have waited, and it was enqueued under the RequestContext scope
      // below, so it carries a nonzero request id.
      EXPECT_LT(milliseconds(0), stats.waitTime);
      EXPECT_NE(0, stats.requestId);
    } else {
      EXPECT_EQ(0, stats.requestId);
    }
  });
  tpe.add(burnMs(10));
  RequestContextScopeGuard rctx;
  tpe.add(burnMs(10));
  tpe.join();
  EXPECT_EQ(2, c);
}
343
// Instantiate the per-task stats test for each executor flavor.
TEST(ThreadPoolExecutorTest, CPUTaskStats) {
  taskStats<CPUThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, IOTaskStats) {
  taskStats<IOThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, EDFTaskStats) {
  taskStats<EDFThreadPoolExecutor>();
}
355
// Verifies getUsedCpuTime(): on Linux it tracks the total CPU time burned by
// pool threads (and the total survives thread churn); on other platforms it
// always reports zero.
TEST(ThreadPoolExecutorTest, GetUsedCpuTime) {
#ifdef __linux__
  CPUThreadPoolExecutor e(4);
  ASSERT_EQ(e.numActiveThreads(), 0);
  ASSERT_EQ(e.getUsedCpuTime(), nanoseconds(0));
  // get busy
  Latch latch(4);
  auto busy_loop = [&] {
    burnThreadCpu(1s);
    latch.count_down();
  };
  auto idle_loop = [&] {
    idleLoopFor(1s);
    latch.count_down();
  };
  e.add(busy_loop); // +1s cpu time
  e.add(busy_loop); // +1s cpu time
  e.add(idle_loop); // +0s cpu time
  e.add(idle_loop); // +0s cpu time
  latch.wait();
  // pool should have used 2s cpu time (in 1s wall clock time)
  auto elapsed0 = e.getUsedCpuTime();
  ASSERT_NEAR_NS(elapsed0, 2s, 100ms);
  // stop all threads
  e.setNumThreads(0);
  ASSERT_EQ(e.numActiveThreads(), 0);
  // total pool CPU time should not have changed
  auto elapsed1 = e.getUsedCpuTime();
  ASSERT_NEAR_NS(elapsed0, elapsed1, 100ms);
  // add a thread, do nothing, cpu time should stay the same
  e.setNumThreads(1);
  Baton<> baton;
  e.add([&] { baton.post(); });
  baton.wait();
  ASSERT_EQ(e.numActiveThreads(), 1);
  auto elapsed2 = e.getUsedCpuTime();
  ASSERT_NEAR_NS(elapsed1, elapsed2, 100ms);
  // now burn some more cycles
  baton.reset();
  e.add([&] {
    burnThreadCpu(500ms);
    baton.post();
  });
  baton.wait();
  auto elapsed3 = e.getUsedCpuTime();
  ASSERT_NEAR_NS(elapsed3, elapsed2 + 500ms, 100ms);
#else
  CPUThreadPoolExecutor e(1);
  // Just make sure 0 is returned
  ASSERT_EQ(e.getUsedCpuTime(), nanoseconds(0));
  Baton<> baton;
  e.add([&] {
    // Spin on wall-clock time; must not be reflected in getUsedCpuTime().
    auto expires = steady_clock::now() + 500ms;
    while (steady_clock::now() < expires) {
    }
    baton.post();
  });
  baton.wait();
  ASSERT_EQ(e.getUsedCpuTime(), nanoseconds(0));
#endif
}
417
template <class TPE>
static void expiration() {
  TPE tpe(1);
  std::atomic<int> statCbCount(0);
  tpe.subscribeToTaskStats([&](ThreadPoolExecutor::TaskStats stats) {
    int i = statCbCount++;
    if (i == 0) {
      EXPECT_FALSE(stats.expired);
    } else if (i == 1) {
      EXPECT_TRUE(stats.expired);
    } else {
      FAIL();
    }
  });
  std::atomic<int> expireCbCount(0);
  auto expireCb = [&]() { expireCbCount++; };
  // The first task has a generous 60s expiration and runs normally; the
  // second (10ms) expires while the single worker is busy with the first, so
  // its expire callback fires instead of the task body.
  tpe.add(burnMs(10), seconds(60), expireCb);
  tpe.add(burnMs(10), milliseconds(10), expireCb);
  tpe.join();
  EXPECT_EQ(2, statCbCount);
  EXPECT_EQ(1, expireCbCount);
}
440
// Instantiate the task-expiration test for the CPU and IO executors.
TEST(ThreadPoolExecutorTest, CPUExpiration) {
  expiration<CPUThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, IOExpiration) {
  expiration<IOThreadPoolExecutor>();
}
448
// Exercises FutureExecutor::addFuture with every supported callable shape:
// returning a Future<int>, a plain value, Future<Unit>, void, a throwing
// callable, and a future completed asynchronously from another thread.
template <typename TPE>
static void futureExecutor() {
  FutureExecutor<TPE> fe(2);
  std::atomic<int> c{0};
  fe.addFuture([]() { return makeFuture<int>(42); }).then([&](Try<int>&& t) {
    c++;
    EXPECT_EQ(42, t.value());
  });
  fe.addFuture([]() { return 100; }).then([&](Try<int>&& t) {
    c++;
    EXPECT_EQ(100, t.value());
  });
  fe.addFuture([]() { return makeFuture(); }).then([&](Try<Unit>&& t) {
    c++;
    EXPECT_NO_THROW(t.value());
  });
  fe.addFuture([]() { return; }).then([&](Try<Unit>&& t) {
    c++;
    EXPECT_NO_THROW(t.value());
  });
  // Exceptions thrown by the callable surface through the Try.
  fe.addFuture([]() {
    throw std::runtime_error("oops");
  }).then([&](Try<Unit>&& t) {
    c++;
    EXPECT_THROW(t.value(), std::runtime_error);
  });
  // Test doing actual async work
  folly::Baton<> baton;
  fe.addFuture([&]() {
    auto p = std::make_shared<Promise<int>>();
    std::thread t([p]() {
      burnMs(10)();
      p->setValue(42);
    });
    t.detach();
    return p->getFuture();
  }).then([&](Try<int>&& t) {
    EXPECT_EQ(42, t.value());
    c++;
    baton.post();
  });
  baton.wait();
  fe.join();
  EXPECT_EQ(6, c);
}
494
// Instantiate the FutureExecutor test for each executor flavor.
TEST(ThreadPoolExecutorTest, CPUFuturePool) {
  futureExecutor<CPUThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, IOFuturePool) {
  futureExecutor<IOThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, EDFFuturePool) {
  futureExecutor<EDFThreadPoolExecutor>();
}
506
// All tasks are queued while the pool has zero threads; once the single
// worker starts, every HI_PRI task must run before any LO_PRI task (the
// hipri lambda asserts no lopri task has run yet).
TEST(ThreadPoolExecutorTest, PriorityPreemptionTest) {
  bool tookLopri = false;
  auto completed = 0;
  auto hipri = [&] {
    EXPECT_FALSE(tookLopri);
    completed++;
  };
  auto lopri = [&] {
    tookLopri = true;
    completed++;
  };
  CPUThreadPoolExecutor pool(0, 2);
  {
    VirtualExecutor ve(pool);
    for (int i = 0; i < 50; i++) {
      ve.addWithPriority(lopri, Executor::LO_PRI);
    }
    for (int i = 0; i < 50; i++) {
      ve.addWithPriority(hipri, Executor::HI_PRI);
    }
    pool.setNumThreads(1);
  }
  EXPECT_EQ(100, completed);
}
531
// Observer that counts thread lifecycle notifications: start-type callbacks
// increment, stop-type callbacks decrement. After the pool is destroyed every
// started thread must have been reported stopped, so the count returns to 0.
class TestObserver : public ThreadPoolExecutor::Observer {
 public:
  void threadStarted(ThreadPoolExecutor::ThreadHandle*) override { threads_++; }
  void threadStopped(ThreadPoolExecutor::ThreadHandle*) override { threads_--; }
  void threadPreviouslyStarted(ThreadPoolExecutor::ThreadHandle*) override {
    threads_++;
  }
  void threadNotYetStopped(ThreadPoolExecutor::ThreadHandle*) override {
    threads_--;
  }
  // Asserts that starts and stops balanced out.
  void checkCalls() { ASSERT_EQ(threads_, 0); }

 private:
  std::atomic<int> threads_{0};
};
547
548 template <typename TPE>
testObserver()549 static void testObserver() {
550 auto observer = std::make_shared<TestObserver>();
551
552 {
553 TPE exe(10);
554 exe.addObserver(observer);
555 exe.setNumThreads(3);
556 exe.setNumThreads(0);
557 exe.setNumThreads(7);
558 exe.removeObserver(observer);
559 exe.setNumThreads(10);
560 }
561
562 observer->checkCalls();
563 }
564
// Instantiate the observer test for each executor flavor.
TEST(ThreadPoolExecutorTest, IOObserver) {
  testObserver<IOThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, CPUObserver) {
  testObserver<CPUThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, EDFObserver) {
  testObserver<EDFThreadPoolExecutor>();
}
576
// IO and EDF executors reject addWithPriority(); a CPU pool configured with 3
// priorities accepts them, clamping out-of-range values to the extremes.
TEST(ThreadPoolExecutorTest, AddWithPriority) {
  std::atomic_int c{0};
  auto f = [&] { c++; };

  // IO exe doesn't support priorities
  IOThreadPoolExecutor ioExe(10);
  EXPECT_THROW(ioExe.addWithPriority(f, 0), std::runtime_error);

  // EDF exe doesn't support priorities
  EDFThreadPoolExecutor edfExe(10);
  EXPECT_THROW(edfExe.addWithPriority(f, 0), std::runtime_error);

  CPUThreadPoolExecutor cpuExe(10, 3);
  cpuExe.addWithPriority(f, -1);
  cpuExe.addWithPriority(f, 0);
  cpuExe.addWithPriority(f, 1);
  cpuExe.addWithPriority(f, -2); // will add at the lowest priority
  cpuExe.addWithPriority(f, 2); // will add at the highest priority
  cpuExe.addWithPriority(f, Executor::LO_PRI);
  cpuExe.addWithPriority(f, Executor::HI_PRI);
  cpuExe.join();

  EXPECT_EQ(7, c);
}
601
// With a capacity-1 BLOCK queue, add() must block (not throw) when the queue
// is full, and all tasks must still complete.
TEST(ThreadPoolExecutorTest, BlockingQueue) {
  std::atomic_int c{0};
  auto f = [&] {
    burnMs(1)();
    c++;
  };
  const int kQueueCapacity = 1;
  const int kThreads = 1;

  auto queue = std::make_unique<LifoSemMPMCQueue<
      CPUThreadPoolExecutor::CPUTask,
      QueueBehaviorIfFull::BLOCK>>(kQueueCapacity);

  CPUThreadPoolExecutor cpuExe(
      kThreads,
      std::move(queue),
      std::make_shared<NamedThreadFactory>("CPUThreadPool"));

  // Add `f` five times. It sleeps for 1ms every time. Calling
  // `cppExec.add()` is *almost* guaranteed to block because there's
  // only 1 cpu worker thread.
  for (int i = 0; i < 5; i++) {
    EXPECT_NO_THROW(cpuExe.add(f));
  }
  cpuExe.join();

  EXPECT_EQ(5, c);
}
630
// Verifies PriorityThreadFactory applies the requested nice value to the
// threads it creates.
TEST(PriorityThreadFactoryTest, ThreadPriority) {
  errno = 0;
  auto currentPriority = getpriority(PRIO_PROCESS, 0);
  if (errno != 0) {
    throwSystemError("failed to get current priority");
  }

  // Non-root users can only increase the priority value. Make sure we are
  // trying to go to a higher priority than we are currently running as, up to
  // the maximum allowed of 20.
  int desiredPriority = std::min(20, currentPriority + 1);

  PriorityThreadFactory factory(
      std::make_shared<NamedThreadFactory>("stuff"), desiredPriority);
  int actualPriority = -21; // sentinel outside the valid nice range
  factory.newThread([&]() { actualPriority = getpriority(PRIO_PROCESS, 0); })
      .join();
  EXPECT_EQ(desiredPriority, actualPriority);
}
650
// The factory's initializer must run on the new thread before the task body.
TEST(InitThreadFactoryTest, InitializerCalled) {
  int initializerCalledCount = 0;
  InitThreadFactory factory(
      std::make_shared<NamedThreadFactory>("test"),
      [&initializerCalledCount] { initializerCalledCount++; });
  factory
      .newThread(
          [&initializerCalledCount]() { EXPECT_EQ(initializerCalledCount, 1); })
      .join();
  EXPECT_EQ(initializerCalledCount, 1);
}
662
// Ordering contract: initializer runs before the task body on the new thread,
// and the finalizer runs after the body, observing its side effects.
TEST(InitThreadFactoryTest, InitializerAndFinalizerCalled) {
  bool initializerCalled = false;
  bool taskBodyCalled = false;
  bool finalizerCalled = false;

  InitThreadFactory factory(
      std::make_shared<NamedThreadFactory>("test"),
      [&] {
        // thread initializer
        EXPECT_FALSE(initializerCalled);
        EXPECT_FALSE(taskBodyCalled);
        EXPECT_FALSE(finalizerCalled);
        initializerCalled = true;
      },
      [&] {
        // thread finalizer
        EXPECT_TRUE(initializerCalled);
        EXPECT_TRUE(taskBodyCalled);
        EXPECT_FALSE(finalizerCalled);
        finalizerCalled = true;
      });

  factory
      .newThread([&]() {
        EXPECT_TRUE(initializerCalled);
        EXPECT_FALSE(taskBodyCalled);
        EXPECT_FALSE(finalizerCalled);
        taskBodyCalled = true;
      })
      .join();

  EXPECT_TRUE(initializerCalled);
  EXPECT_TRUE(taskBodyCalled);
  EXPECT_TRUE(finalizerCalled);
}
698
// Minimal RequestData holding an int; used to verify that RequestContext
// contents propagate onto pool threads.
class TestData : public folly::RequestData {
 public:
  explicit TestData(int data) : data_(data) {}
  ~TestData() override {}

  // No onSet/onUnset callbacks needed for these tests.
  bool hasCallback() override { return false; }

  int data_;
};
708
TEST(ThreadPoolExecutorTest, RequestContext) {
  RequestContextScopeGuard rctx; // create new request context for this scope
  EXPECT_EQ(nullptr, RequestContext::get()->getContextData("test"));
  RequestContext::get()->setContextData("test", std::make_unique<TestData>(42));
  auto data = RequestContext::get()->getContextData("test");
  EXPECT_EQ(42, dynamic_cast<TestData*>(data)->data_);

  // Destructor-based probe: fires when the object (or the lambda capturing
  // it) is destroyed on a pool thread and checks the context is still set.
  struct VerifyRequestContext {
    ~VerifyRequestContext() {
      auto data2 = RequestContext::get()->getContextData("test");
      EXPECT_TRUE(data2 != nullptr);
      if (data2 != nullptr) {
        EXPECT_EQ(42, dynamic_cast<TestData*>(data2)->data_);
      }
    }
  };

  {
    CPUThreadPoolExecutor executor(1);
    executor.add([] { VerifyRequestContext(); });
    executor.add([x = VerifyRequestContext()] {});
  }
}
732
// Global step counter used by SlowMover and the queue race tests below to
// order producer/consumer progress.
std::atomic<int> g_sequence{};
734
// Movable payload whose move-assignment can be made artificially slow (50ms).
// Each move bumps g_sequence once on entry and once on exit so tests can
// observe moves that are still in flight.
struct SlowMover {
  explicit SlowMover(bool slow_ = false) : slow(slow_) {}
  // Move construction delegates to move assignment to share the bookkeeping.
  SlowMover(SlowMover&& other) noexcept { *this = std::move(other); }
  SlowMover& operator=(SlowMover&& other) noexcept {
    ++g_sequence;
    slow = other.slow;
    if (slow) {
      /* sleep override */ std::this_thread::sleep_for(milliseconds(50));
    }
    ++g_sequence;
    return *this;
  }

  bool slow;
};
750
template <typename Q>
void bugD3527722_test() {
  // Test that the queue does not get stuck if writes are completed in
  // order opposite to how they are initiated.
  Q q(1024);
  std::atomic<int> turn{};

  std::thread consumer1([&] {
    ++turn;
    q.take();
  });
  std::thread consumer2([&] {
    ++turn;
    q.take();
  });

  std::thread producer1([&] {
    ++turn;
    // Wait for both consumers and producer2 to be running.
    while (turn < 4) {
      ;
    }
    ++turn;
    // Slow move: this write starts first but finishes second.
    q.add(SlowMover(true));
  });
  std::thread producer2([&] {
    ++turn;
    // Wait until producer1 has started its slow add.
    while (turn < 5) {
      ;
    }
    q.add(SlowMover(false));
  });

  producer1.join();
  producer2.join();
  consumer1.join();
  consumer2.join();
}
788
TEST(ThreadPoolExecutorTest, LifoSemMPMCQueueBugD3527722) {
  bugD3527722_test<LifoSemMPMCQueue<SlowMover>>();
}

// Adapter giving UnboundedBlockingQueue the (ignored) capacity constructor
// that bugD3527722_test expects.
template <typename T>
struct UBQ : public UnboundedBlockingQueue<T> {
  explicit UBQ(int) {}
};

TEST(ThreadPoolExecutorTest, UnboundedBlockingQueueBugD3527722) {
  bugD3527722_test<UBQ<SlowMover>>();
}
801
// Sequencing is driven by g_sequence, which each SlowMover move increments
// twice (see SlowMover above).
template <typename Q>
void nothrow_not_full_test() {
  /* LifoSemMPMCQueue should not throw when not full when active
     consumers are delayed. */
  Q q(2);
  g_sequence = 0;

  std::thread consumer1([&] {
    while (g_sequence < 4) {
      ;
    }
    q.take(); // ++g_sequence to 5 then slow
  });
  std::thread consumer2([&] {
    while (g_sequence < 5) {
      ;
    }
    q.take(); // ++g_sequence to 6 and 7 - fast
  });

  std::thread producer([&] {
    q.add(SlowMover(true)); // ++g_sequence to 1 and 2
    q.add(SlowMover(false)); // ++g_sequence to 3 and 4
    while (g_sequence < 7) { // g_sequence == 7 implies queue is not full
      ;
    }
    EXPECT_NO_THROW(q.add(SlowMover(false)));
  });

  producer.join();
  consumer1.join();
  consumer2.join();
}
835
// Instantiate the no-throw-when-not-full test for LifoSemMPMCQueue.
TEST(ThreadPoolExecutorTest, LifoSemMPMCQueueNoThrowNotFull) {
  nothrow_not_full_test<LifoSemMPMCQueue<SlowMover>>();
}
839
840 template <typename TPE>
removeThreadTest()841 static void removeThreadTest() {
842 // test that adding a .then() after we have removed some threads
843 // doesn't cause deadlock and they are executed on different threads
844 folly::Optional<folly::Future<int>> f;
845 std::thread::id id1, id2;
846 TPE fe(2);
847 f = folly::makeFuture()
848 .via(&fe)
849 .thenValue([&id1](auto&&) {
850 burnMs(100)();
851 id1 = std::this_thread::get_id();
852 })
853 .thenValue([&id2](auto&&) {
854 return 77;
855 id2 = std::this_thread::get_id();
856 });
857 fe.setNumThreads(1);
858
859 // future::then should be fulfilled because there is other thread available
860 EXPECT_EQ(77, std::move(*f).get());
861 // two thread should be different because then part should be rescheduled to
862 // the other thread
863 EXPECT_NE(id1, id2);
864 }
865
// Instantiate the thread-removal rescheduling test for each executor flavor.
TEST(ThreadPoolExecutorTest, RemoveThreadTestIO) {
  removeThreadTest<IOThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, RemoveThreadTestCPU) {
  removeThreadTest<CPUThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, RemoveThreadTestEDF) {
  removeThreadTest<EDFThreadPoolExecutor>();
}
877
878 template <typename TPE>
resizeThreadWhileExecutingTest()879 static void resizeThreadWhileExecutingTest() {
880 TPE tpe(10);
881 EXPECT_EQ(10, tpe.numThreads());
882
883 std::atomic<int> completed(0);
884 auto f = [&]() {
885 burnMs(10)();
886 completed++;
887 };
888 for (int i = 0; i < 1000; i++) {
889 tpe.add(f);
890 }
891 tpe.setNumThreads(8);
892 EXPECT_EQ(8, tpe.numThreads());
893 tpe.setNumThreads(5);
894 EXPECT_EQ(5, tpe.numThreads());
895 tpe.setNumThreads(15);
896 EXPECT_EQ(15, tpe.numThreads());
897 tpe.join();
898 EXPECT_EQ(1000, completed);
899 }
900
// Instantiate the resize-while-executing test for each executor flavor.
TEST(ThreadPoolExecutorTest, resizeThreadWhileExecutingTestIO) {
  resizeThreadWhileExecutingTest<IOThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, resizeThreadWhileExecutingTestCPU) {
  resizeThreadWhileExecutingTest<CPUThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, resizeThreadWhileExecutingTestEDF) {
  resizeThreadWhileExecutingTest<EDFThreadPoolExecutor>();
}
912
template <typename TPE>
void keepAliveTest() {
  auto executor = std::make_unique<TPE>(4);

  // The continuation holds a KeepAlive token on the executor, so resetting
  // `executor` below must block until the chain has run to completion.
  auto f = futures::sleep(std::chrono::milliseconds{100})
               .via(executor.get())
               .thenValue([keepAlive = getKeepAliveToken(executor.get())](
                              auto&&) { return 42; })
               .semi();

  executor.reset();

  // By the time reset() returns, the future must already be fulfilled.
  EXPECT_TRUE(f.isReady());
  EXPECT_EQ(42, std::move(f).get());
}
928
// Instantiate the keep-alive test for each executor flavor.
TEST(ThreadPoolExecutorTest, KeepAliveTestIO) {
  keepAliveTest<IOThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, KeepAliveTestCPU) {
  keepAliveTest<CPUThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, KeepAliveTestEDF) {
  keepAliveTest<EDFThreadPoolExecutor>();
}
940
getNumThreadPoolExecutors()941 int getNumThreadPoolExecutors() {
942 int count = 0;
943 ThreadPoolExecutor::withAll([&count](ThreadPoolExecutor&) { count++; });
944 return count;
945 }
946
947 template <typename TPE>
registersToExecutorListTest()948 static void registersToExecutorListTest() {
949 EXPECT_EQ(0, getNumThreadPoolExecutors());
950 {
951 TPE tpe(10);
952 EXPECT_EQ(1, getNumThreadPoolExecutors());
953 {
954 TPE tpe2(5);
955 EXPECT_EQ(2, getNumThreadPoolExecutors());
956 }
957 EXPECT_EQ(1, getNumThreadPoolExecutors());
958 }
959 EXPECT_EQ(0, getNumThreadPoolExecutors());
960 }
961
// Instantiate the global-registry test for each executor flavor.
TEST(ThreadPoolExecutorTest, registersToExecutorListTestIO) {
  registersToExecutorListTest<IOThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, registersToExecutorListTestCPU) {
  registersToExecutorListTest<CPUThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, registersToExecutorListTestEDF) {
  registersToExecutorListTest<EDFThreadPoolExecutor>();
}
973
974 template <typename TPE>
testUsesNameFromNamedThreadFactory()975 static void testUsesNameFromNamedThreadFactory() {
976 auto ntf = std::make_shared<NamedThreadFactory>("my_executor");
977 TPE tpe(10, ntf);
978 EXPECT_EQ("my_executor", tpe.getName());
979 }
980
// Instantiate the factory-name test for each executor flavor.
TEST(ThreadPoolExecutorTest, testUsesNameFromNamedThreadFactoryIO) {
  testUsesNameFromNamedThreadFactory<IOThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, testUsesNameFromNamedThreadFactoryCPU) {
  testUsesNameFromNamedThreadFactory<CPUThreadPoolExecutor>();
}

TEST(ThreadPoolExecutorTest, testUsesNameFromNamedThreadFactoryEDF) {
  testUsesNameFromNamedThreadFactory<EDFThreadPoolExecutor>();
}
992
// After the in-flight tasks finish and the thread-death timeout elapses, the
// pool's active thread count should wind back down to zero.
TEST(ThreadPoolExecutorTest, DynamicThreadsTest) {
  boost::barrier barrier{3};
  auto twice_waiting_task = [&] { barrier.wait(), barrier.wait(); };
  CPUThreadPoolExecutor e(2);
  e.setThreadDeathTimeout(std::chrono::milliseconds(100));
  e.add(twice_waiting_task);
  e.add(twice_waiting_task);
  barrier.wait(); // ensure both tasks are mid-flight
  EXPECT_EQ(2, e.getPoolStats().activeThreadCount) << "sanity check";

  auto pred = [&] { return e.getPoolStats().activeThreadCount == 0; };
  EXPECT_FALSE(pred()) << "sanity check";
  barrier.wait(); // let both mid-flight tasks complete
  // Spin (with yields) for up to 1s waiting for the idle threads to reap.
  EXPECT_EQ(
      folly::detail::spin_result::success,
      folly::detail::spin_yield_until(
          std::chrono::steady_clock::now() + std::chrono::seconds(1), pred));
}
1011
// With a zero death timeout the lone worker can die after every task; hammer
// add() 10k times to race task submission against thread teardown.
TEST(ThreadPoolExecutorTest, DynamicThreadAddRemoveRace) {
  CPUThreadPoolExecutor e(1);
  e.setThreadDeathTimeout(std::chrono::milliseconds(0));
  std::atomic<uint64_t> count{0};
  for (int i = 0; i < 10000; i++) {
    Baton<> b;
    e.add([&]() {
      count.fetch_add(1, std::memory_order_relaxed);
      b.post();
    });
    b.wait();
  }
  e.join();
  EXPECT_EQ(count, 10000);
}
1027
// Stress test: tasks that themselves enqueue more tasks on a large dynamic
// pool with a 1ms death timeout. Primarily checks for crashes/deadlocks under
// heavy thread churn rather than asserting results.
TEST(ThreadPoolExecutorTest, AddPerf) {
  auto queue = std::make_unique<
      UnboundedBlockingQueue<CPUThreadPoolExecutor::CPUTask>>();
  CPUThreadPoolExecutor e(
      1000,
      std::move(queue),
      std::make_shared<NamedThreadFactory>("CPUThreadPool"));
  e.setThreadDeathTimeout(std::chrono::milliseconds(1));
  for (int i = 0; i < 10000; i++) {
    e.add([&]() { e.add([]() { /* sleep override */ usleep(1000); }); });
  }
  e.stop();
}
1041
// Fixture that clears the global worker-provider pointer before and after
// each test, so a provider installed by one test (the tests below read it
// after constructing a CPUThreadPoolExecutor) cannot leak into the next.
class ExecutorWorkerProviderTest : public ::testing::Test {
 protected:
  void SetUp() override { kWorkerProviderGlobal = nullptr; }
  void TearDown() override { kWorkerProviderGlobal = nullptr; }
};
1047
TEST_F(ExecutorWorkerProviderTest, ThreadCollectorBasicTest) {
  // Park four workers in a task that records its OS thread id, then verify
  // that WorkerProvider::collectThreadIds() reports exactly those ids.
  constexpr int kNumWorkers = 4;
  boost::barrier barrier{kNumWorkers + 1};
  Synchronized<std::vector<pid_t>> expectedTids;
  auto recordTidTask = [&]() {
    expectedTids.wlock()->push_back(folly::getOSThreadID());
    barrier.wait();
  };
  CPUThreadPoolExecutor pool(kNumWorkers);
  for (int i = 0; i < kNumWorkers; ++i) {
    pool.add(recordTidTask);
  }
  barrier.wait();
  {
    const auto threadIdsWithKA = kWorkerProviderGlobal->collectThreadIds();
    const auto& collected = threadIdsWithKA.threadIds;
    auto expected = expectedTids.rlock();
    EXPECT_EQ(collected.size(), expected->size());
    EXPECT_TRUE(std::is_permutation(
        collected.begin(), collected.end(), expected->begin()));
  }
  pool.join();
}
1072
TEST_F(ExecutorWorkerProviderTest, ThreadCollectorMultipleInvocationTest) {
  // Invoke WorkerProvider::collectThreadIds() repeatedly — back to back, and
  // while the pool is resized and tasks are scheduled — to make sure that
  // repeated collection cannot deadlock.
  CPUThreadPoolExecutor pool(1);
  pool.add([] {});
  {
    auto firstWithKA = kWorkerProviderGlobal->collectThreadIds();
    auto secondWithKA = kWorkerProviderGlobal->collectThreadIds();
    const auto& first = firstWithKA.threadIds;
    const auto& second = secondWithKA.threadIds;
    EXPECT_EQ(first.size(), 1);
    EXPECT_EQ(first.size(), second.size());
    EXPECT_EQ(first, second);
  }
  // Grow the pool and schedule tasks while a collection is outstanding.
  std::array<folly::Baton<>, 4> batons;
  {
    auto beforeResizeWithKA = kWorkerProviderGlobal->collectThreadIds();
    pool.setNumThreads(4);
    for (size_t i = 0; i < batons.size(); ++i) {
      pool.add([i, &batons]() { batons[i].wait(); });
    }
    for (auto& baton : batons) {
      baton.post();
    }
    auto afterResizeWithKA = kWorkerProviderGlobal->collectThreadIds();
    EXPECT_EQ(beforeResizeWithKA.threadIds.size(), 1);
    EXPECT_EQ(afterResizeWithKA.threadIds.size(), 4);
  }
  pool.join();
}
1108
TEST_F(ExecutorWorkerProviderTest, ThreadCollectorBlocksThreadExitTest) {
  // We need to ensure that the collector's keep alive effectively
  // blocks the executor's threads from exiting. This is done by verifying
  // that a call to reduce the worker count via setNumThreads() does not
  // actually reduce the workers (kills threads) while the keep alive is
  // in scope.
  constexpr size_t kNumThreads = 4;
  std::array<folly::Baton<>, kNumThreads> bats;
  CPUThreadPoolExecutor e(kNumThreads);
  for (size_t i = 0; i < kNumThreads; ++i) {
    e.add([i, &bats]() { bats[i].wait(); });
  }
  Baton<> baton; // posted once the collector thread holds the keep alive
  Baton<> threadCountBaton; // posted by main after setNumThreads() returns
  auto bgCollector = std::thread([&]() {
    {
      auto idsWithKA = kWorkerProviderGlobal->collectThreadIds();
      baton.post();
      // Since this thread is holding the KeepAlive, it should block
      // the main thread's `setNumThreads()` call which is trying to
      // reduce the thread count of the executor. We verify that by
      // checking that the baton isn't posted after a 100ms wait.
      auto posted =
          threadCountBaton.try_wait_for(std::chrono::milliseconds(100));
      EXPECT_FALSE(posted);
      auto& ids = idsWithKA.threadIds;
      // The collected id count should still be 4 since the collector's
      // keep alive is active and no worker can have exited yet.
      EXPECT_EQ(ids.size(), kNumThreads);
    } // keep alive released here, unblocking setNumThreads() on main
  });
  baton.wait();
  // Release the parked tasks so the workers become idle and eligible for
  // removal, then ask the pool to shrink while the keep alive is held.
  for (auto& bat : bats) {
    bat.post();
  }
  e.setNumThreads(2);
  threadCountBaton.post();
  bgCollector.join();
  // The thread count should now be reduced to 2.
  EXPECT_EQ(e.numThreads(), 2);
  e.join();
}
1151
1152 template <typename TPE>
WeakRefTest()1153 static void WeakRefTest() {
1154 // test that adding a .then() after we have
1155 // started shutting down does not deadlock
1156 folly::Optional<folly::Future<folly::Unit>> f;
1157 int counter{0};
1158 {
1159 TPE fe(1);
1160 f = folly::makeFuture()
1161 .via(&fe)
1162 .thenValue([](auto&&) { burnMs(100)(); })
1163 .thenValue([&](auto&&) { ++counter; })
1164 .via(getWeakRef(fe))
1165 .thenValue([](auto&&) { burnMs(100)(); })
1166 .thenValue([&](auto&&) { ++counter; });
1167 }
1168 EXPECT_THROW(std::move(*f).get(), folly::BrokenPromise);
1169 EXPECT_EQ(1, counter);
1170 }
1171
// Exercises VirtualExecutor lifetime semantics on top of a thread pool:
// destroying the VirtualExecutor blocks until tasks submitted through it
// have finished, while work rescheduled onto the underlying pool itself
// keeps running afterwards.
template <typename TPE>
static void virtualExecutorTest() {
  using namespace std::literals;

  folly::Optional<folly::SemiFuture<folly::Unit>> f;
  int counter{0};
  {
    TPE fe(1);
    {
      VirtualExecutor ve(fe);
      f = futures::sleep(100ms)
              .via(&ve)
              .thenValue([&](auto&&) {
                ++counter;
                return futures::sleep(100ms);
              })
              .via(&fe)
              .thenValue([&](auto&&) { ++counter; })
              .semi();
    }
    // ~VirtualExecutor has drained the continuation scheduled via `ve`;
    // the one rescheduled onto `fe` may still be pending at this point.
    EXPECT_EQ(1, counter);

    bool functionDestroyed{false};
    bool functionCalled{false};
    {
      VirtualExecutor ve(fe);
      // The guard delays its own destruction by 100ms so we can observe
      // that ~VirtualExecutor waits for the task AND the destruction of
      // the task's captured state.
      auto guard = makeGuard([&functionDestroyed] {
        std::this_thread::sleep_for(100ms);
        functionDestroyed = true;
      });
      ve.add([&functionCalled, guard = std::move(guard)] {
        functionCalled = true;
      });
    }
    EXPECT_TRUE(functionCalled);
    EXPECT_TRUE(functionDestroyed);
  }
  // Once the pool itself is destroyed, the whole chain has completed.
  EXPECT_TRUE(f->isReady());
  EXPECT_NO_THROW(std::move(*f).get());
  EXPECT_EQ(2, counter);
}
1213
// A CPUThreadPoolExecutor that always uses exactly one worker thread and
// therefore also advertises the SequencedExecutor interface. The size_t
// constructor argument is accepted for signature compatibility but ignored;
// the pool is always built with a single thread.
class SingleThreadedCPUThreadPoolExecutor : public CPUThreadPoolExecutor,
                                            public SequencedExecutor {
 public:
  explicit SingleThreadedCPUThreadPoolExecutor(size_t)
      : CPUThreadPoolExecutor(1) {}
};
1220
// Instantiates the weak-reference shutdown check for the IO pool.
TEST(ThreadPoolExecutorTest, WeakRefTestIO) {
  WeakRefTest<IOThreadPoolExecutor>();
}
1224
// Instantiates the weak-reference shutdown check for the CPU pool.
TEST(ThreadPoolExecutorTest, WeakRefTestCPU) {
  WeakRefTest<CPUThreadPoolExecutor>();
}
1228
// Instantiates the weak-reference shutdown check for the EDF pool.
TEST(ThreadPoolExecutorTest, WeakRefTestEDF) {
  WeakRefTest<EDFThreadPoolExecutor>();
}
1232
// Instantiates the weak-reference shutdown check for the single-threaded
// CPU pool defined above.
TEST(ThreadPoolExecutorTest, WeakRefTestSingleThreadedCPU) {
  WeakRefTest<SingleThreadedCPUThreadPoolExecutor>();
}
1236
// getWeakRef() on an executor that is also a SequencedExecutor must yield a
// keep-alive typed to the SequencedExecutor interface.
TEST(ThreadPoolExecutorTest, WeakRefTestSequential) {
  SingleThreadedCPUThreadPoolExecutor executor(1);
  auto weakRef = getWeakRef(executor);
  using ExpectedKeepAlive = Executor::KeepAlive<SequencedExecutor>;
  constexpr bool isSequencedKeepAlive =
      std::is_same_v<decltype(weakRef), ExpectedKeepAlive>;
  EXPECT_TRUE(isSequencedKeepAlive);
}
1244
// Instantiates the VirtualExecutor lifetime check for the IO pool.
TEST(ThreadPoolExecutorTest, VirtualExecutorTestIO) {
  virtualExecutorTest<IOThreadPoolExecutor>();
}
1248
// Instantiates the VirtualExecutor lifetime check for the CPU pool.
TEST(ThreadPoolExecutorTest, VirtualExecutorTestCPU) {
  virtualExecutorTest<CPUThreadPoolExecutor>();
}
1252
// Instantiates the VirtualExecutor lifetime check for the EDF pool.
TEST(ThreadPoolExecutorTest, VirtualExecutorTestEDF) {
  virtualExecutorTest<EDFThreadPoolExecutor>();
}
1256
1257 // Test use of guard inside executors
1258 template <class TPE>
currentThreadTest(folly::StringPiece executorName)1259 static void currentThreadTest(folly::StringPiece executorName) {
1260 folly::Optional<ExecutorBlockingContext> ctx{};
1261 TPE tpe(1, std::make_shared<NamedThreadFactory>(executorName));
1262 tpe.add([&ctx]() { ctx = getExecutorBlockingContext(); });
1263 tpe.join();
1264 EXPECT_EQ(ctx->tag, executorName);
1265 }
1266
1267 // Test the nesting of the permit guard
1268 template <class TPE>
currentThreadTestDisabled(folly::StringPiece executorName)1269 static void currentThreadTestDisabled(folly::StringPiece executorName) {
1270 folly::Optional<ExecutorBlockingContext> ctxPermit{};
1271 folly::Optional<ExecutorBlockingContext> ctxForbid{};
1272 TPE tpe(1, std::make_shared<NamedThreadFactory>(executorName));
1273 tpe.add([&]() {
1274 {
1275 // Nest the guard that permits blocking
1276 ExecutorBlockingGuard guard{ExecutorBlockingGuard::PermitTag{}};
1277 ctxPermit = getExecutorBlockingContext();
1278 }
1279 ctxForbid = getExecutorBlockingContext();
1280 });
1281 tpe.join();
1282 EXPECT_TRUE(!ctxPermit.has_value());
1283 EXPECT_EQ(ctxForbid->tag, executorName);
1284 }
1285
// Blocking-context tag and permit-guard nesting, checked on the CPU pool.
TEST(ThreadPoolExecutorTest, CPUCurrentThreadExecutor) {
  currentThreadTest<CPUThreadPoolExecutor>("CPU-ExecutorName");
  currentThreadTestDisabled<CPUThreadPoolExecutor>("CPU-ExecutorName");
}
1290
// Blocking-context tag and permit-guard nesting, checked on the IO pool.
TEST(ThreadPoolExecutorTest, IOCurrentThreadExecutor) {
  currentThreadTest<IOThreadPoolExecutor>("IO-ExecutorName");
  currentThreadTestDisabled<IOThreadPoolExecutor>("IO-ExecutorName");
}
1295
// Blocking-context tag and permit-guard nesting, checked on the EDF pool.
TEST(ThreadPoolExecutorTest, EDFCurrentThreadExecutor) {
  currentThreadTest<EDFThreadPoolExecutor>("EDF-ExecutorName");
  currentThreadTestDisabled<EDFThreadPoolExecutor>("EDF-ExecutorName");
}
1300