1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <folly/CPortability.h>
18 #include <folly/DefaultKeepAliveExecutor.h>
19 #include <folly/executors/CPUThreadPoolExecutor.h>
20 #include <folly/executors/ThreadPoolExecutor.h>
21 #include <folly/lang/Keep.h>
22 #include <folly/synchronization/Latch.h>
23 
24 #include <atomic>
25 #include <memory>
26 #include <thread>
27 
28 #include <boost/thread.hpp>
29 
30 #include <folly/Exception.h>
31 #include <folly/VirtualExecutor.h>
32 #include <folly/executors/CPUThreadPoolExecutor.h>
33 #include <folly/executors/EDFThreadPoolExecutor.h>
34 #include <folly/executors/FutureExecutor.h>
35 #include <folly/executors/IOThreadPoolExecutor.h>
36 #include <folly/executors/task_queue/LifoSemMPMCQueue.h>
37 #include <folly/executors/task_queue/UnboundedBlockingQueue.h>
38 #include <folly/executors/thread_factory/InitThreadFactory.h>
39 #include <folly/executors/thread_factory/PriorityThreadFactory.h>
40 #include <folly/portability/GTest.h>
41 #include <folly/portability/PThread.h>
42 #include <folly/synchronization/detail/Spin.h>
43 
44 using namespace folly;
45 using namespace std::chrono;
46 
// Duration-aware ASSERT_NEAR: each argument is converted to
// std::chrono::nanoseconds and the tick counts are compared, with `c` acting
// as the allowed absolute difference.
#define ASSERT_NEAR_NS(a, b, c)               \
  do {                                        \
    ASSERT_NEAR(                              \
        std::chrono::nanoseconds(a).count(),  \
        std::chrono::nanoseconds(b).count(),  \
        std::chrono::nanoseconds(c).count()); \
  } while (0)
55 
burnMs(uint64_t ms)56 static Func burnMs(uint64_t ms) {
57   return [ms]() { std::this_thread::sleep_for(milliseconds(ms)); };
58 }
59 
60 #ifdef __linux__
thread_clock_now()61 static std::chrono::nanoseconds thread_clock_now() {
62   timespec tp;
63   clockid_t clockid;
64   CHECK(!pthread_getcpuclockid(pthread_self(), &clockid));
65   CHECK(!clock_gettime(clockid, &tp));
66   return std::chrono::nanoseconds(tp.tv_nsec) + std::chrono::seconds(tp.tv_sec);
67 }
68 
69 // Loop and burn cpu cycles
burnThreadCpu(milliseconds ms)70 static void burnThreadCpu(milliseconds ms) {
71   auto expires = thread_clock_now() + ms;
72   while (thread_clock_now() < expires) {
73   }
74 }
75 
76 // Loop without using much cpu time
idleLoopFor(milliseconds ms)77 static void idleLoopFor(milliseconds ms) {
78   using clock = high_resolution_clock;
79   auto expires = clock::now() + ms;
80   while (clock::now() < expires) {
81     /* sleep override */ std::this_thread::sleep_for(100ms);
82   }
83 }
84 #endif
85 
86 static WorkerProvider* kWorkerProviderGlobal = nullptr;
87 
88 namespace folly {
89 
90 #if FOLLY_HAVE_WEAK_SYMBOLS
make_queue_observer_factory(const std::string &,size_t,WorkerProvider * workerProvider)91 FOLLY_KEEP std::unique_ptr<QueueObserverFactory> make_queue_observer_factory(
92     const std::string&, size_t, WorkerProvider* workerProvider) {
93   kWorkerProviderGlobal = workerProvider;
94   return {};
95 }
96 #endif
97 
98 } // namespace folly
99 
100 template <class TPE>
basic()101 static void basic() {
102   // Create and destroy
103   TPE tpe(10);
104 }
105 
TEST(ThreadPoolExecutorTest,CPUBasic)106 TEST(ThreadPoolExecutorTest, CPUBasic) {
107   basic<CPUThreadPoolExecutor>();
108 }
109 
TEST(ThreadPoolExecutorTest,IOBasic)110 TEST(ThreadPoolExecutorTest, IOBasic) {
111   basic<IOThreadPoolExecutor>();
112 }
113 
TEST(ThreadPoolExecutorTest,EDFBasic)114 TEST(ThreadPoolExecutorTest, EDFBasic) {
115   basic<EDFThreadPoolExecutor>();
116 }
117 
118 template <class TPE>
resize()119 static void resize() {
120   TPE tpe(100);
121   EXPECT_EQ(100, tpe.numThreads());
122   tpe.setNumThreads(50);
123   EXPECT_EQ(50, tpe.numThreads());
124   tpe.setNumThreads(150);
125   EXPECT_EQ(150, tpe.numThreads());
126 }
127 
TEST(ThreadPoolExecutorTest,CPUResize)128 TEST(ThreadPoolExecutorTest, CPUResize) {
129   resize<CPUThreadPoolExecutor>();
130 }
131 
TEST(ThreadPoolExecutorTest,IOResize)132 TEST(ThreadPoolExecutorTest, IOResize) {
133   resize<IOThreadPoolExecutor>();
134 }
135 
TEST(ThreadPoolExecutorTest,EDFResize)136 TEST(ThreadPoolExecutorTest, EDFResize) {
137   resize<EDFThreadPoolExecutor>();
138 }
139 
140 template <class TPE>
stop()141 static void stop() {
142   TPE tpe(1);
143   std::atomic<int> completed(0);
144   auto f = [&]() {
145     burnMs(10)();
146     completed++;
147   };
148   for (int i = 0; i < 1000; i++) {
149     tpe.add(f);
150   }
151   tpe.stop();
152   EXPECT_GT(1000, completed);
153 }
154 
155 // IOThreadPoolExecutor's stop() behaves like join(). Outstanding tasks belong
156 // to the event base, will be executed upon its destruction, and cannot be
157 // taken back.
158 template <>
stop()159 void stop<IOThreadPoolExecutor>() {
160   IOThreadPoolExecutor tpe(1);
161   std::atomic<int> completed(0);
162   auto f = [&]() {
163     burnMs(10)();
164     completed++;
165   };
166   for (int i = 0; i < 10; i++) {
167     tpe.add(f);
168   }
169   tpe.stop();
170   EXPECT_EQ(10, completed);
171 }
172 
TEST(ThreadPoolExecutorTest,CPUStop)173 TEST(ThreadPoolExecutorTest, CPUStop) {
174   stop<CPUThreadPoolExecutor>();
175 }
176 
TEST(ThreadPoolExecutorTest,IOStop)177 TEST(ThreadPoolExecutorTest, IOStop) {
178   stop<IOThreadPoolExecutor>();
179 }
180 
TEST(ThreadPoolExecutorTest,EDFStop)181 TEST(ThreadPoolExecutorTest, EDFStop) {
182   stop<EDFThreadPoolExecutor>();
183 }
184 
185 template <class TPE>
join()186 static void join() {
187   TPE tpe(10);
188   std::atomic<int> completed(0);
189   auto f = [&]() {
190     burnMs(1)();
191     completed++;
192   };
193   for (int i = 0; i < 1000; i++) {
194     tpe.add(f);
195   }
196   tpe.join();
197   EXPECT_EQ(1000, completed);
198 }
199 
TEST(ThreadPoolExecutorTest,CPUJoin)200 TEST(ThreadPoolExecutorTest, CPUJoin) {
201   join<CPUThreadPoolExecutor>();
202 }
203 
TEST(ThreadPoolExecutorTest,IOJoin)204 TEST(ThreadPoolExecutorTest, IOJoin) {
205   join<IOThreadPoolExecutor>();
206 }
207 
TEST(ThreadPoolExecutorTest,EDFJoin)208 TEST(ThreadPoolExecutorTest, EDFJoin) {
209   join<EDFThreadPoolExecutor>();
210 }
211 
212 template <class TPE>
destroy()213 static void destroy() {
214   TPE tpe(1);
215   std::atomic<int> completed(0);
216   auto f = [&]() {
217     burnMs(10)();
218     completed++;
219   };
220   for (int i = 0; i < 1000; i++) {
221     tpe.add(f);
222   }
223   tpe.stop();
224   EXPECT_GT(1000, completed);
225 }
226 
227 // IOThreadPoolExecutor's destuctor joins all tasks. Outstanding tasks belong
228 // to the event base, will be executed upon its destruction, and cannot be
229 // taken back.
230 template <>
destroy()231 void destroy<IOThreadPoolExecutor>() {
232   Optional<IOThreadPoolExecutor> tpe(in_place, 1);
233   std::atomic<int> completed(0);
234   auto f = [&]() {
235     burnMs(10)();
236     completed++;
237   };
238   for (int i = 0; i < 10; i++) {
239     tpe->add(f);
240   }
241   tpe.reset();
242   EXPECT_EQ(10, completed);
243 }
244 
TEST(ThreadPoolExecutorTest,CPUDestroy)245 TEST(ThreadPoolExecutorTest, CPUDestroy) {
246   destroy<CPUThreadPoolExecutor>();
247 }
248 
TEST(ThreadPoolExecutorTest,IODestroy)249 TEST(ThreadPoolExecutorTest, IODestroy) {
250   destroy<IOThreadPoolExecutor>();
251 }
252 
TEST(ThreadPoolExecutorTest,EDFDestroy)253 TEST(ThreadPoolExecutorTest, EDFDestroy) {
254   destroy<EDFThreadPoolExecutor>();
255 }
256 
257 template <class TPE>
resizeUnderLoad()258 static void resizeUnderLoad() {
259   TPE tpe(10);
260   std::atomic<int> completed(0);
261   auto f = [&]() {
262     burnMs(1)();
263     completed++;
264   };
265   for (int i = 0; i < 1000; i++) {
266     tpe.add(f);
267   }
268   tpe.setNumThreads(5);
269   tpe.setNumThreads(15);
270   tpe.join();
271   EXPECT_EQ(1000, completed);
272 }
273 
TEST(ThreadPoolExecutorTest,CPUResizeUnderLoad)274 TEST(ThreadPoolExecutorTest, CPUResizeUnderLoad) {
275   resizeUnderLoad<CPUThreadPoolExecutor>();
276 }
277 
TEST(ThreadPoolExecutorTest,IOResizeUnderLoad)278 TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) {
279   resizeUnderLoad<IOThreadPoolExecutor>();
280 }
281 
TEST(ThreadPoolExecutorTest,EDFResizeUnderLoad)282 TEST(ThreadPoolExecutorTest, EDFResizeUnderLoad) {
283   resizeUnderLoad<EDFThreadPoolExecutor>();
284 }
285 
286 template <class TPE>
poolStats()287 static void poolStats() {
288   folly::Baton<> startBaton, endBaton;
289   TPE tpe(1);
290   auto stats = tpe.getPoolStats();
291   EXPECT_GE(1, stats.threadCount);
292   EXPECT_GE(1, stats.idleThreadCount);
293   EXPECT_EQ(0, stats.activeThreadCount);
294   EXPECT_EQ(0, stats.pendingTaskCount);
295   EXPECT_EQ(0, tpe.getPendingTaskCount());
296   EXPECT_EQ(0, stats.totalTaskCount);
297   tpe.add([&]() {
298     startBaton.post();
299     endBaton.wait();
300   });
301   tpe.add([&]() {});
302   startBaton.wait();
303   stats = tpe.getPoolStats();
304   EXPECT_EQ(1, stats.threadCount);
305   EXPECT_EQ(0, stats.idleThreadCount);
306   EXPECT_EQ(1, stats.activeThreadCount);
307   EXPECT_EQ(1, stats.pendingTaskCount);
308   EXPECT_EQ(1, tpe.getPendingTaskCount());
309   EXPECT_EQ(2, stats.totalTaskCount);
310   endBaton.post();
311 }
312 
TEST(ThreadPoolExecutorTest,CPUPoolStats)313 TEST(ThreadPoolExecutorTest, CPUPoolStats) {
314   poolStats<CPUThreadPoolExecutor>();
315 }
316 
TEST(ThreadPoolExecutorTest,IOPoolStats)317 TEST(ThreadPoolExecutorTest, IOPoolStats) {
318   poolStats<IOThreadPoolExecutor>();
319 }
320 
321 template <class TPE>
taskStats()322 static void taskStats() {
323   TPE tpe(1);
324   std::atomic<int> c(0);
325   auto now = std::chrono::steady_clock::now();
326   tpe.subscribeToTaskStats([&](ThreadPoolExecutor::TaskStats stats) {
327     int i = c++;
328     EXPECT_LT(now, stats.enqueueTime);
329     EXPECT_LT(milliseconds(0), stats.runTime);
330     if (i == 1) {
331       EXPECT_LT(milliseconds(0), stats.waitTime);
332       EXPECT_NE(0, stats.requestId);
333     } else {
334       EXPECT_EQ(0, stats.requestId);
335     }
336   });
337   tpe.add(burnMs(10));
338   RequestContextScopeGuard rctx;
339   tpe.add(burnMs(10));
340   tpe.join();
341   EXPECT_EQ(2, c);
342 }
343 
TEST(ThreadPoolExecutorTest,CPUTaskStats)344 TEST(ThreadPoolExecutorTest, CPUTaskStats) {
345   taskStats<CPUThreadPoolExecutor>();
346 }
347 
TEST(ThreadPoolExecutorTest,IOTaskStats)348 TEST(ThreadPoolExecutorTest, IOTaskStats) {
349   taskStats<IOThreadPoolExecutor>();
350 }
351 
TEST(ThreadPoolExecutorTest,EDFTaskStats)352 TEST(ThreadPoolExecutorTest, EDFTaskStats) {
353   taskStats<EDFThreadPoolExecutor>();
354 }
355 
TEST(ThreadPoolExecutorTest,GetUsedCpuTime)356 TEST(ThreadPoolExecutorTest, GetUsedCpuTime) {
357 #ifdef __linux__
358   CPUThreadPoolExecutor e(4);
359   ASSERT_EQ(e.numActiveThreads(), 0);
360   ASSERT_EQ(e.getUsedCpuTime(), nanoseconds(0));
361   // get busy
362   Latch latch(4);
363   auto busy_loop = [&] {
364     burnThreadCpu(1s);
365     latch.count_down();
366   };
367   auto idle_loop = [&] {
368     idleLoopFor(1s);
369     latch.count_down();
370   };
371   e.add(busy_loop); // +1s cpu time
372   e.add(busy_loop); // +1s cpu time
373   e.add(idle_loop); // +0s cpu time
374   e.add(idle_loop); // +0s cpu time
375   latch.wait();
376   // pool should have used 2s cpu time (in 1s wall clock time)
377   auto elapsed0 = e.getUsedCpuTime();
378   ASSERT_NEAR_NS(elapsed0, 2s, 100ms);
379   // stop all threads
380   e.setNumThreads(0);
381   ASSERT_EQ(e.numActiveThreads(), 0);
382   // total pool CPU time should not have changed
383   auto elapsed1 = e.getUsedCpuTime();
384   ASSERT_NEAR_NS(elapsed0, elapsed1, 100ms);
385   // add a thread, do nothing, cpu time should stay the same
386   e.setNumThreads(1);
387   Baton<> baton;
388   e.add([&] { baton.post(); });
389   baton.wait();
390   ASSERT_EQ(e.numActiveThreads(), 1);
391   auto elapsed2 = e.getUsedCpuTime();
392   ASSERT_NEAR_NS(elapsed1, elapsed2, 100ms);
393   // now burn some more cycles
394   baton.reset();
395   e.add([&] {
396     burnThreadCpu(500ms);
397     baton.post();
398   });
399   baton.wait();
400   auto elapsed3 = e.getUsedCpuTime();
401   ASSERT_NEAR_NS(elapsed3, elapsed2 + 500ms, 100ms);
402 #else
403   CPUThreadPoolExecutor e(1);
404   // Just make sure 0 is returned
405   ASSERT_EQ(e.getUsedCpuTime(), nanoseconds(0));
406   Baton<> baton;
407   e.add([&] {
408     auto expires = steady_clock::now() + 500ms;
409     while (steady_clock::now() < expires) {
410     }
411     baton.post();
412   });
413   baton.wait();
414   ASSERT_EQ(e.getUsedCpuTime(), nanoseconds(0));
415 #endif
416 }
417 
418 template <class TPE>
expiration()419 static void expiration() {
420   TPE tpe(1);
421   std::atomic<int> statCbCount(0);
422   tpe.subscribeToTaskStats([&](ThreadPoolExecutor::TaskStats stats) {
423     int i = statCbCount++;
424     if (i == 0) {
425       EXPECT_FALSE(stats.expired);
426     } else if (i == 1) {
427       EXPECT_TRUE(stats.expired);
428     } else {
429       FAIL();
430     }
431   });
432   std::atomic<int> expireCbCount(0);
433   auto expireCb = [&]() { expireCbCount++; };
434   tpe.add(burnMs(10), seconds(60), expireCb);
435   tpe.add(burnMs(10), milliseconds(10), expireCb);
436   tpe.join();
437   EXPECT_EQ(2, statCbCount);
438   EXPECT_EQ(1, expireCbCount);
439 }
440 
TEST(ThreadPoolExecutorTest,CPUExpiration)441 TEST(ThreadPoolExecutorTest, CPUExpiration) {
442   expiration<CPUThreadPoolExecutor>();
443 }
444 
TEST(ThreadPoolExecutorTest,IOExpiration)445 TEST(ThreadPoolExecutorTest, IOExpiration) {
446   expiration<IOThreadPoolExecutor>();
447 }
448 
449 template <typename TPE>
futureExecutor()450 static void futureExecutor() {
451   FutureExecutor<TPE> fe(2);
452   std::atomic<int> c{0};
453   fe.addFuture([]() { return makeFuture<int>(42); }).then([&](Try<int>&& t) {
454     c++;
455     EXPECT_EQ(42, t.value());
456   });
457   fe.addFuture([]() { return 100; }).then([&](Try<int>&& t) {
458     c++;
459     EXPECT_EQ(100, t.value());
460   });
461   fe.addFuture([]() { return makeFuture(); }).then([&](Try<Unit>&& t) {
462     c++;
463     EXPECT_NO_THROW(t.value());
464   });
465   fe.addFuture([]() { return; }).then([&](Try<Unit>&& t) {
466     c++;
467     EXPECT_NO_THROW(t.value());
468   });
469   fe.addFuture([]() {
470       throw std::runtime_error("oops");
471     }).then([&](Try<Unit>&& t) {
472     c++;
473     EXPECT_THROW(t.value(), std::runtime_error);
474   });
475   // Test doing actual async work
476   folly::Baton<> baton;
477   fe.addFuture([&]() {
478       auto p = std::make_shared<Promise<int>>();
479       std::thread t([p]() {
480         burnMs(10)();
481         p->setValue(42);
482       });
483       t.detach();
484       return p->getFuture();
485     }).then([&](Try<int>&& t) {
486     EXPECT_EQ(42, t.value());
487     c++;
488     baton.post();
489   });
490   baton.wait();
491   fe.join();
492   EXPECT_EQ(6, c);
493 }
494 
TEST(ThreadPoolExecutorTest,CPUFuturePool)495 TEST(ThreadPoolExecutorTest, CPUFuturePool) {
496   futureExecutor<CPUThreadPoolExecutor>();
497 }
498 
TEST(ThreadPoolExecutorTest,IOFuturePool)499 TEST(ThreadPoolExecutorTest, IOFuturePool) {
500   futureExecutor<IOThreadPoolExecutor>();
501 }
502 
TEST(ThreadPoolExecutorTest,EDFFuturePool)503 TEST(ThreadPoolExecutorTest, EDFFuturePool) {
504   futureExecutor<EDFThreadPoolExecutor>();
505 }
506 
// Queues 50 low-priority and then 50 high-priority tasks on a pool that
// starts with zero threads and two priority levels. Once a single thread is
// added, every high-priority task must run before any low-priority one.
TEST(ThreadPoolExecutorTest, PriorityPreemptionTest) {
  constexpr int kPerPriority = 50;
  bool lowPriorityRan = false;
  auto tasksRun = 0;
  auto highTask = [&] {
    EXPECT_FALSE(lowPriorityRan);
    ++tasksRun;
  };
  auto lowTask = [&] {
    lowPriorityRan = true;
    ++tasksRun;
  };

  CPUThreadPoolExecutor pool(0, 2);
  {
    // The VirtualExecutor is destroyed at the end of this scope, before the
    // completion count is checked.
    VirtualExecutor ve(pool);
    for (int n = 0; n != kPerPriority; ++n) {
      ve.addWithPriority(lowTask, Executor::LO_PRI);
    }
    for (int n = 0; n != kPerPriority; ++n) {
      ve.addWithPriority(highTask, Executor::HI_PRI);
    }
    // Only now give the pool a worker, so both batches are already queued.
    pool.setNumThreads(1);
  }
  EXPECT_EQ(100, tasksRun);
}
531 
532 class TestObserver : public ThreadPoolExecutor::Observer {
533  public:
threadStarted(ThreadPoolExecutor::ThreadHandle *)534   void threadStarted(ThreadPoolExecutor::ThreadHandle*) override { threads_++; }
threadStopped(ThreadPoolExecutor::ThreadHandle *)535   void threadStopped(ThreadPoolExecutor::ThreadHandle*) override { threads_--; }
threadPreviouslyStarted(ThreadPoolExecutor::ThreadHandle *)536   void threadPreviouslyStarted(ThreadPoolExecutor::ThreadHandle*) override {
537     threads_++;
538   }
threadNotYetStopped(ThreadPoolExecutor::ThreadHandle *)539   void threadNotYetStopped(ThreadPoolExecutor::ThreadHandle*) override {
540     threads_--;
541   }
checkCalls()542   void checkCalls() { ASSERT_EQ(threads_, 0); }
543 
544  private:
545   std::atomic<int> threads_{0};
546 };
547 
548 template <typename TPE>
testObserver()549 static void testObserver() {
550   auto observer = std::make_shared<TestObserver>();
551 
552   {
553     TPE exe(10);
554     exe.addObserver(observer);
555     exe.setNumThreads(3);
556     exe.setNumThreads(0);
557     exe.setNumThreads(7);
558     exe.removeObserver(observer);
559     exe.setNumThreads(10);
560   }
561 
562   observer->checkCalls();
563 }
564 
TEST(ThreadPoolExecutorTest,IOObserver)565 TEST(ThreadPoolExecutorTest, IOObserver) {
566   testObserver<IOThreadPoolExecutor>();
567 }
568 
TEST(ThreadPoolExecutorTest,CPUObserver)569 TEST(ThreadPoolExecutorTest, CPUObserver) {
570   testObserver<CPUThreadPoolExecutor>();
571 }
572 
TEST(ThreadPoolExecutorTest,EDFObserver)573 TEST(ThreadPoolExecutorTest, EDFObserver) {
574   testObserver<EDFThreadPoolExecutor>();
575 }
576 
// addWithPriority() must throw std::runtime_error on the IO and EDF pools and
// accept any priority on a CPU pool created with three priority levels.
TEST(ThreadPoolExecutorTest, AddWithPriority) {
  std::atomic_int ran{0};
  auto bump = [&ran] { ++ran; };

  // IO exe doesn't support priorities
  IOThreadPoolExecutor ioExe(10);
  EXPECT_THROW(ioExe.addWithPriority(bump, 0), std::runtime_error);

  // EDF exe doesn't support priorities
  EDFThreadPoolExecutor edfExe(10);
  EXPECT_THROW(edfExe.addWithPriority(bump, 0), std::runtime_error);

  CPUThreadPoolExecutor cpuExe(10, 3);
  const int8_t priorities[] = {
      -1,
      0,
      1,
      -2, // will add at the lowest priority
      2, // will add at the highest priority
      Executor::LO_PRI,
      Executor::HI_PRI,
  };
  for (const int8_t priority : priorities) {
    cpuExe.addWithPriority(bump, priority);
  }
  cpuExe.join();

  EXPECT_EQ(7, ran);
}
601 
// A CPU pool backed by a capacity-1 LifoSemMPMCQueue configured with
// QueueBehaviorIfFull::BLOCK must accept more tasks than fit in the queue
// without throwing, and must run all of them.
TEST(ThreadPoolExecutorTest, BlockingQueue) {
  constexpr int kSubmitted = 5;
  const int kQueueCapacity = 1;
  const int kThreads = 1;

  std::atomic_int ran{0};
  auto shortTask = [&ran] {
    burnMs(1)();
    ++ran;
  };

  using BlockingTaskQueue = LifoSemMPMCQueue<
      CPUThreadPoolExecutor::CPUTask,
      QueueBehaviorIfFull::BLOCK>;
  auto taskQueue = std::make_unique<BlockingTaskQueue>(kQueueCapacity);

  CPUThreadPoolExecutor cpuExe(
      kThreads,
      std::move(taskQueue),
      std::make_shared<NamedThreadFactory>("CPUThreadPool"));

  // Add the task five times. It sleeps for 1ms every time. Calling
  // `cpuExe.add()` is *almost* guaranteed to block because there's
  // only 1 cpu worker thread.
  for (int n = 0; n != kSubmitted; ++n) {
    EXPECT_NO_THROW(cpuExe.add(shortTask));
  }
  cpuExe.join();

  EXPECT_EQ(5, ran);
}
630 
// A thread created through PriorityThreadFactory must report, via
// getpriority(), the priority value the factory was configured with.
TEST(PriorityThreadFactoryTest, ThreadPriority) {
  // getpriority() can legitimately return -1, so errors are detected through
  // errno, which is cleared first.
  errno = 0;
  const auto inheritedPriority = getpriority(PRIO_PROCESS, 0);
  if (errno != 0) {
    throwSystemError("failed to get current priority");
  }

  // Non-root users can only increase the priority value.  Make sure we are
  // trying to go to a higher priority than we are currently running as, up to
  // the maximum allowed of 20.
  const int desiredPriority = std::min(20, inheritedPriority + 1);

  PriorityThreadFactory factory(
      std::make_shared<NamedThreadFactory>("stuff"), desiredPriority);

  // -21 is a sentinel the thread overwrites with the value it observes.
  int observedPriority = -21;
  auto worker = factory.newThread(
      [&observedPriority]() { observedPriority = getpriority(PRIO_PROCESS, 0); });
  worker.join();

  EXPECT_EQ(desiredPriority, observedPriority);
}
650 
// The initializer given to InitThreadFactory must have run exactly once by
// the time the thread body executes, and not again afterwards.
TEST(InitThreadFactoryTest, InitializerCalled) {
  int initRuns = 0;
  InitThreadFactory factory(
      std::make_shared<NamedThreadFactory>("test"), [&initRuns] { ++initRuns; });

  auto worker = factory.newThread([&initRuns]() { EXPECT_EQ(initRuns, 1); });
  worker.join();

  EXPECT_EQ(initRuns, 1);
}
662 
// With both an initializer and a finalizer configured, InitThreadFactory must
// run them around the thread body in the order initializer -> body ->
// finalizer, each exactly once. Every stage checks the flags of the others.
TEST(InitThreadFactoryTest, InitializerAndFinalizerCalled) {
  bool initRan = false;
  bool bodyRan = false;
  bool finalRan = false;

  auto initializer = [&] {
    // Nothing may have happened yet.
    EXPECT_FALSE(initRan);
    EXPECT_FALSE(bodyRan);
    EXPECT_FALSE(finalRan);
    initRan = true;
  };
  auto finalizer = [&] {
    // Initializer and body are done; the finalizer runs only once.
    EXPECT_TRUE(initRan);
    EXPECT_TRUE(bodyRan);
    EXPECT_FALSE(finalRan);
    finalRan = true;
  };
  auto body = [&]() {
    // Only the initializer has run so far.
    EXPECT_TRUE(initRan);
    EXPECT_FALSE(bodyRan);
    EXPECT_FALSE(finalRan);
    bodyRan = true;
  };

  InitThreadFactory factory(
      std::make_shared<NamedThreadFactory>("test"), initializer, finalizer);

  factory.newThread(body).join();

  EXPECT_TRUE(initRan);
  EXPECT_TRUE(bodyRan);
  EXPECT_TRUE(finalRan);
}
698 
699 class TestData : public folly::RequestData {
700  public:
TestData(int data)701   explicit TestData(int data) : data_(data) {}
~TestData()702   ~TestData() override {}
703 
hasCallback()704   bool hasCallback() override { return false; }
705 
706   int data_;
707 };
708 
// Stores a TestData(42) in a fresh request context and checks that the same
// data is visible whenever a VerifyRequestContext object is destroyed, both
// for one created inside a task and for one captured by a task.
TEST(ThreadPoolExecutorTest, RequestContext) {
  RequestContextScopeGuard rctx; // create new request context for this scope
  EXPECT_EQ(nullptr, RequestContext::get()->getContextData("test"));
  RequestContext::get()->setContextData("test", std::make_unique<TestData>(42));
  auto stored = RequestContext::get()->getContextData("test");
  EXPECT_EQ(42, dynamic_cast<TestData*>(stored)->data_);

  // Performs the check in its destructor, against whatever request context
  // is current at that moment.
  struct VerifyRequestContext {
    ~VerifyRequestContext() {
      auto seen = RequestContext::get()->getContextData("test");
      EXPECT_TRUE(seen != nullptr);
      if (seen == nullptr) {
        return;
      }
      EXPECT_EQ(42, dynamic_cast<TestData*>(seen)->data_);
    }
  };

  {
    CPUThreadPoolExecutor executor(1);
    // Verifier created and destroyed while the task runs.
    executor.add([] { VerifyRequestContext(); });
    // Verifier owned by the task object itself.
    executor.add([x = VerifyRequestContext()] {});
  }
}
732 
// Global step counter; bumped twice by every SlowMover move.
std::atomic<int> g_sequence{};

// Move-only helper whose move operations increment g_sequence once on entry
// and once on exit and, when the source is flagged slow, sleep for 50ms in
// between.
struct SlowMover {
  explicit SlowMover(bool slow_ = false) : slow(slow_) {}

  // Move construction delegates to move assignment so both share the same
  // counting and delay behaviour.
  SlowMover(SlowMover&& other) noexcept { operator=(std::move(other)); }

  SlowMover& operator=(SlowMover&& other) noexcept {
    g_sequence.fetch_add(1);
    slow = other.slow;
    if (slow) {
      /* sleep override */ std::this_thread::sleep_for(
          std::chrono::milliseconds(50));
    }
    g_sequence.fetch_add(1);
    return *this;
  }

  bool slow;
};
750 
751 template <typename Q>
bugD3527722_test()752 void bugD3527722_test() {
753   // Test that the queue does not get stuck if writes are completed in
754   // order opposite to how they are initiated.
755   Q q(1024);
756   std::atomic<int> turn{};
757 
758   std::thread consumer1([&] {
759     ++turn;
760     q.take();
761   });
762   std::thread consumer2([&] {
763     ++turn;
764     q.take();
765   });
766 
767   std::thread producer1([&] {
768     ++turn;
769     while (turn < 4) {
770       ;
771     }
772     ++turn;
773     q.add(SlowMover(true));
774   });
775   std::thread producer2([&] {
776     ++turn;
777     while (turn < 5) {
778       ;
779     }
780     q.add(SlowMover(false));
781   });
782 
783   producer1.join();
784   producer2.join();
785   consumer1.join();
786   consumer2.join();
787 }
788 
TEST(ThreadPoolExecutorTest,LifoSemMPMCQueueBugD3527722)789 TEST(ThreadPoolExecutorTest, LifoSemMPMCQueueBugD3527722) {
790   bugD3527722_test<LifoSemMPMCQueue<SlowMover>>();
791 }
792 
793 template <typename T>
794 struct UBQ : public UnboundedBlockingQueue<T> {
UBQUBQ795   explicit UBQ(int) {}
796 };
797 
TEST(ThreadPoolExecutorTest,UnboundedBlockingQueueBugD3527722)798 TEST(ThreadPoolExecutorTest, UnboundedBlockingQueueBugD3527722) {
799   bugD3527722_test<UBQ<SlowMover>>();
800 }
801 
802 template <typename Q>
nothrow_not_full_test()803 void nothrow_not_full_test() {
804   /* LifoSemMPMCQueue should not throw when not full when active
805      consumers are delayed. */
806   Q q(2);
807   g_sequence = 0;
808 
809   std::thread consumer1([&] {
810     while (g_sequence < 4) {
811       ;
812     }
813     q.take(); // ++g_sequence to 5 then slow
814   });
815   std::thread consumer2([&] {
816     while (g_sequence < 5) {
817       ;
818     }
819     q.take(); // ++g_sequence to 6 and 7 - fast
820   });
821 
822   std::thread producer([&] {
823     q.add(SlowMover(true)); // ++g_sequence to 1 and 2
824     q.add(SlowMover(false)); // ++g_sequence to 3 and 4
825     while (g_sequence < 7) { // g_sequence == 7 implies queue is not full
826       ;
827     }
828     EXPECT_NO_THROW(q.add(SlowMover(false)));
829   });
830 
831   producer.join();
832   consumer1.join();
833   consumer2.join();
834 }
835 
TEST(ThreadPoolExecutorTest,LifoSemMPMCQueueNoThrowNotFull)836 TEST(ThreadPoolExecutorTest, LifoSemMPMCQueueNoThrowNotFull) {
837   nothrow_not_full_test<LifoSemMPMCQueue<SlowMover>>();
838 }
839 
840 template <typename TPE>
removeThreadTest()841 static void removeThreadTest() {
842   // test that adding a .then() after we have removed some threads
843   // doesn't cause deadlock and they are executed on different threads
844   folly::Optional<folly::Future<int>> f;
845   std::thread::id id1, id2;
846   TPE fe(2);
847   f = folly::makeFuture()
848           .via(&fe)
849           .thenValue([&id1](auto&&) {
850             burnMs(100)();
851             id1 = std::this_thread::get_id();
852           })
853           .thenValue([&id2](auto&&) {
854             return 77;
855             id2 = std::this_thread::get_id();
856           });
857   fe.setNumThreads(1);
858 
859   // future::then should be fulfilled because there is other thread available
860   EXPECT_EQ(77, std::move(*f).get());
861   // two thread should be different because then part should be rescheduled to
862   // the other thread
863   EXPECT_NE(id1, id2);
864 }
865 
TEST(ThreadPoolExecutorTest,RemoveThreadTestIO)866 TEST(ThreadPoolExecutorTest, RemoveThreadTestIO) {
867   removeThreadTest<IOThreadPoolExecutor>();
868 }
869 
TEST(ThreadPoolExecutorTest,RemoveThreadTestCPU)870 TEST(ThreadPoolExecutorTest, RemoveThreadTestCPU) {
871   removeThreadTest<CPUThreadPoolExecutor>();
872 }
873 
TEST(ThreadPoolExecutorTest,RemoveThreadTestEDF)874 TEST(ThreadPoolExecutorTest, RemoveThreadTestEDF) {
875   removeThreadTest<EDFThreadPoolExecutor>();
876 }
877 
878 template <typename TPE>
resizeThreadWhileExecutingTest()879 static void resizeThreadWhileExecutingTest() {
880   TPE tpe(10);
881   EXPECT_EQ(10, tpe.numThreads());
882 
883   std::atomic<int> completed(0);
884   auto f = [&]() {
885     burnMs(10)();
886     completed++;
887   };
888   for (int i = 0; i < 1000; i++) {
889     tpe.add(f);
890   }
891   tpe.setNumThreads(8);
892   EXPECT_EQ(8, tpe.numThreads());
893   tpe.setNumThreads(5);
894   EXPECT_EQ(5, tpe.numThreads());
895   tpe.setNumThreads(15);
896   EXPECT_EQ(15, tpe.numThreads());
897   tpe.join();
898   EXPECT_EQ(1000, completed);
899 }
900 
TEST(ThreadPoolExecutorTest,resizeThreadWhileExecutingTestIO)901 TEST(ThreadPoolExecutorTest, resizeThreadWhileExecutingTestIO) {
902   resizeThreadWhileExecutingTest<IOThreadPoolExecutor>();
903 }
904 
TEST(ThreadPoolExecutorTest,resizeThreadWhileExecutingTestCPU)905 TEST(ThreadPoolExecutorTest, resizeThreadWhileExecutingTestCPU) {
906   resizeThreadWhileExecutingTest<CPUThreadPoolExecutor>();
907 }
908 
TEST(ThreadPoolExecutorTest,resizeThreadWhileExecutingTestEDF)909 TEST(ThreadPoolExecutorTest, resizeThreadWhileExecutingTestEDF) {
910   resizeThreadWhileExecutingTest<EDFThreadPoolExecutor>();
911 }
912 
913 template <typename TPE>
keepAliveTest()914 void keepAliveTest() {
915   auto executor = std::make_unique<TPE>(4);
916 
917   auto f = futures::sleep(std::chrono::milliseconds{100})
918                .via(executor.get())
919                .thenValue([keepAlive = getKeepAliveToken(executor.get())](
920                               auto&&) { return 42; })
921                .semi();
922 
923   executor.reset();
924 
925   EXPECT_TRUE(f.isReady());
926   EXPECT_EQ(42, std::move(f).get());
927 }
928 
TEST(ThreadPoolExecutorTest,KeepAliveTestIO)929 TEST(ThreadPoolExecutorTest, KeepAliveTestIO) {
930   keepAliveTest<IOThreadPoolExecutor>();
931 }
932 
TEST(ThreadPoolExecutorTest,KeepAliveTestCPU)933 TEST(ThreadPoolExecutorTest, KeepAliveTestCPU) {
934   keepAliveTest<CPUThreadPoolExecutor>();
935 }
936 
TEST(ThreadPoolExecutorTest,KeepAliveTestEDF)937 TEST(ThreadPoolExecutorTest, KeepAliveTestEDF) {
938   keepAliveTest<EDFThreadPoolExecutor>();
939 }
940 
getNumThreadPoolExecutors()941 int getNumThreadPoolExecutors() {
942   int count = 0;
943   ThreadPoolExecutor::withAll([&count](ThreadPoolExecutor&) { count++; });
944   return count;
945 }
946 
947 template <typename TPE>
registersToExecutorListTest()948 static void registersToExecutorListTest() {
949   EXPECT_EQ(0, getNumThreadPoolExecutors());
950   {
951     TPE tpe(10);
952     EXPECT_EQ(1, getNumThreadPoolExecutors());
953     {
954       TPE tpe2(5);
955       EXPECT_EQ(2, getNumThreadPoolExecutors());
956     }
957     EXPECT_EQ(1, getNumThreadPoolExecutors());
958   }
959   EXPECT_EQ(0, getNumThreadPoolExecutors());
960 }
961 
TEST(ThreadPoolExecutorTest,registersToExecutorListTestIO)962 TEST(ThreadPoolExecutorTest, registersToExecutorListTestIO) {
963   registersToExecutorListTest<IOThreadPoolExecutor>();
964 }
965 
TEST(ThreadPoolExecutorTest,registersToExecutorListTestCPU)966 TEST(ThreadPoolExecutorTest, registersToExecutorListTestCPU) {
967   registersToExecutorListTest<CPUThreadPoolExecutor>();
968 }
969 
TEST(ThreadPoolExecutorTest,registersToExecutorListTestEDF)970 TEST(ThreadPoolExecutorTest, registersToExecutorListTestEDF) {
971   registersToExecutorListTest<EDFThreadPoolExecutor>();
972 }
973 
974 template <typename TPE>
testUsesNameFromNamedThreadFactory()975 static void testUsesNameFromNamedThreadFactory() {
976   auto ntf = std::make_shared<NamedThreadFactory>("my_executor");
977   TPE tpe(10, ntf);
978   EXPECT_EQ("my_executor", tpe.getName());
979 }
980 
// Runs testUsesNameFromNamedThreadFactory against IOThreadPoolExecutor.
TEST(ThreadPoolExecutorTest, testUsesNameFromNamedThreadFactoryIO) {
  testUsesNameFromNamedThreadFactory<IOThreadPoolExecutor>();
}
984 
// Runs testUsesNameFromNamedThreadFactory against CPUThreadPoolExecutor.
TEST(ThreadPoolExecutorTest, testUsesNameFromNamedThreadFactoryCPU) {
  testUsesNameFromNamedThreadFactory<CPUThreadPoolExecutor>();
}
988 
// Runs testUsesNameFromNamedThreadFactory against EDFThreadPoolExecutor.
TEST(ThreadPoolExecutorTest, testUsesNameFromNamedThreadFactoryEDF) {
  testUsesNameFromNamedThreadFactory<EDFThreadPoolExecutor>();
}
992 
// Checks that activeThreadCount reads 2 while two tasks are in flight and
// drops to 0 within one second after both tasks are released.
TEST(ThreadPoolExecutorTest, DynamicThreadsTest) {
  // Three parties: the two tasks plus this test thread.
  boost::barrier barrier{3};
  auto twice_waiting_task = [&] {
    barrier.wait();
    barrier.wait();
  };
  CPUThreadPoolExecutor e(2);
  e.setThreadDeathTimeout(std::chrono::milliseconds(100));
  e.add(twice_waiting_task);
  e.add(twice_waiting_task);
  barrier.wait(); // ensure both tasks are mid-flight
  EXPECT_EQ(2, e.getPoolStats().activeThreadCount) << "sanity check";

  auto noActiveThreads = [&] {
    return e.getPoolStats().activeThreadCount == 0;
  };
  EXPECT_FALSE(noActiveThreads()) << "sanity check";
  barrier.wait(); // let both mid-flight tasks complete

  // Spin (yielding) for up to one second waiting for the count to reach 0.
  const auto deadline =
      std::chrono::steady_clock::now() + std::chrono::seconds(1);
  EXPECT_EQ(
      folly::detail::spin_result::success,
      folly::detail::spin_yield_until(deadline, noActiveThreads));
}
1011 
// Submits tasks one at a time to a single-thread executor configured with a
// zero thread-death timeout, waiting for each task before adding the next,
// and checks that every task ran exactly once.
TEST(ThreadPoolExecutorTest, DynamicThreadAddRemoveRace) {
  constexpr int kRounds = 10000;
  CPUThreadPoolExecutor e(1);
  e.setThreadDeathTimeout(std::chrono::milliseconds(0));
  std::atomic<uint64_t> count{0};
  for (int round = 0; round < kRounds; ++round) {
    Baton<> taskDone;
    e.add([&]() {
      count.fetch_add(1, std::memory_order_relaxed);
      taskDone.post();
    });
    // Do not submit the next task until this one has run.
    taskDone.wait();
  }
  e.join();
  EXPECT_EQ(count, kRounds);
}
1027 
// Floods a 1000-thread CPUThreadPoolExecutor (unbounded queue, 1ms thread
// death timeout) with tasks that each enqueue a short sleeping task, then
// stops the executor. There are no assertions; the test only has to finish.
TEST(ThreadPoolExecutorTest, AddPerf) {
  using TaskQueue = UnboundedBlockingQueue<CPUThreadPoolExecutor::CPUTask>;
  constexpr int kOuterTasks = 10000;

  auto queue = std::make_unique<TaskQueue>();
  CPUThreadPoolExecutor e(
      1000,
      std::move(queue),
      std::make_shared<NamedThreadFactory>("CPUThreadPool"));
  e.setThreadDeathTimeout(std::chrono::milliseconds(1));
  for (int n = 0; n < kOuterTasks; ++n) {
    // Each outer task re-enters the executor to add one sleeping task.
    e.add([&]() {
      e.add([]() {
        /* sleep override */ usleep(1000);
      });
    });
  }
  e.stop();
}
1041 
1042 class ExecutorWorkerProviderTest : public ::testing::Test {
1043  protected:
SetUp()1044   void SetUp() override { kWorkerProviderGlobal = nullptr; }
TearDown()1045   void TearDown() override { kWorkerProviderGlobal = nullptr; }
1046 };
1047 
// Verifies that collectThreadIds() reports the same set of OS thread ids
// that the executor's tasks recorded for themselves.
TEST_F(ExecutorWorkerProviderTest, ThreadCollectorBasicTest) {
  // Start 4 threads and have all of them work on a task.
  // Then invoke the ThreadIdCollector::collectThreadIds()
  // method to capture the set of active thread ids.
  boost::barrier barrier{5}; // 4 tasks + this test thread
  Synchronized<std::vector<pid_t>> expectedTids;
  auto task = [&]() {
    expectedTids.wlock()->push_back(folly::getOSThreadID());
    barrier.wait();
  };
  CPUThreadPoolExecutor e(4);
  for (int i = 0; i < 4; ++i) {
    e.add(task);
  }
  // Once this returns, every task has recorded its thread id.
  barrier.wait();
  // Fail cleanly rather than dereferencing a null provider. Checked only
  // after the barrier so the tasks are already released if we return early.
  ASSERT_TRUE(kWorkerProviderGlobal != nullptr);
  {
    const auto threadIdsWithKA = kWorkerProviderGlobal->collectThreadIds();
    const auto& ids = threadIdsWithKA.threadIds;
    auto locked = expectedTids.rlock();
    EXPECT_EQ(ids.size(), locked->size());
    // Use the bounded (four-iterator) overload: the size check above is
    // non-fatal, so the second range must not be assumed to be as long as
    // the first.
    EXPECT_TRUE(std::is_permutation(
        ids.begin(), ids.end(), locked->begin(), locked->end()));
  }
  e.join();
}
1072 
// Calls collectThreadIds() repeatedly, first back-to-back and then around a
// resize of the executor, and checks the number of ids reported each time.
TEST_F(ExecutorWorkerProviderTest, ThreadCollectorMultipleInvocationTest) {
  // Run some tasks via the executor and invoke
  // WorkerProvider::collectThreadIds() at least twice to make sure that there
  // is no deadlock in repeated invocations.
  CPUThreadPoolExecutor e(1);
  e.add([&]() {});
  {
    // Two collections are alive at the same time here; both should report
    // the same single thread id.
    auto idsWithKA1 = kWorkerProviderGlobal->collectThreadIds();
    auto idsWithKA2 = kWorkerProviderGlobal->collectThreadIds();
    auto& ids1 = idsWithKA1.threadIds;
    auto& ids2 = idsWithKA2.threadIds;
    EXPECT_EQ(ids1.size(), 1);
    EXPECT_EQ(ids1.size(), ids2.size());
    EXPECT_EQ(ids1, ids2);
  }
  // Add some more threads and schedule tasks while the collector
  // is capturing thread Ids.
  std::array<folly::Baton<>, 4> bats;
  {
    // First collection is taken before the executor grows to 4 threads.
    auto idsWithKA1 = kWorkerProviderGlobal->collectThreadIds();
    e.setNumThreads(4);
    for (size_t i = 0; i < 4; ++i) {
      e.add([i, &bats]() { bats[i].wait(); });
    }
    // Release every task that is blocked on its baton.
    for (auto& bat : bats) {
      bat.post();
    }
    // Second collection is taken after the resize and the four adds.
    auto idsWithKA2 = kWorkerProviderGlobal->collectThreadIds();
    auto& ids1 = idsWithKA1.threadIds;
    auto& ids2 = idsWithKA2.threadIds;
    EXPECT_EQ(ids1.size(), 1);
    EXPECT_EQ(ids2.size(), 4);
  }
  e.join();
}
1108 
TEST_F(ExecutorWorkerProviderTest, ThreadCollectorBlocksThreadExitTest) {
  // We need to ensure that the collector's keep alive effectively
  // blocks the executor's threads from exiting. This is done by verifying
  // that a call to reduce the worker count via setNumThreads() does not
  // actually reduce the workers (kill threads) while the keep alive is
  // in scope.
  constexpr size_t kNumThreads = 4;
  std::array<folly::Baton<>, kNumThreads> bats;
  CPUThreadPoolExecutor e(kNumThreads);
  // Occupy the executor with one task per baton; each blocks until posted.
  for (size_t i = 0; i < kNumThreads; ++i) {
    e.add([i, &bats]() { bats[i].wait(); });
  }
  // `baton` signals that the background thread holds the collector result;
  // `threadCountBaton` signals that setNumThreads() below has returned.
  Baton<> baton;
  Baton<> threadCountBaton;
  auto bgCollector = std::thread([&]() {
    {
      auto idsWithKA = kWorkerProviderGlobal->collectThreadIds();
      baton.post();
      // Since this thread is holding the KeepAlive, it should block
      // the main thread's `setNumThreads()` call which is trying to
      // reduce the thread count of the executor. We verify that by
      // checking that the baton isn't posted after a 100ms wait.
      auto posted =
          threadCountBaton.try_wait_for(std::chrono::milliseconds(100));
      EXPECT_FALSE(posted);
      auto& ids = idsWithKA.threadIds;
      // The ids were collected before the main thread asked for fewer
      // threads, so the collected set should still contain all kNumThreads
      // workers.
      EXPECT_EQ(ids.size(), kNumThreads);
    }
  });
  // Wait until the background thread holds the collector result.
  baton.wait();
  // Let the blocked tasks finish.
  for (auto& bat : bats) {
    bat.post();
  }
  e.setNumThreads(2);
  threadCountBaton.post();
  bgCollector.join();
  // The thread count should now be reduced to 2.
  EXPECT_EQ(e.numThreads(), 2);
  e.join();
}
1151 
// Builds a future chain whose first half runs via `&fe` and whose second
// half runs via getWeakRef(fe), then destroys `fe` while the chain is
// pending. Expects the first counter increment to happen (counter == 1) and
// the overall future to fail with folly::BrokenPromise.
template <typename TPE>
static void WeakRefTest() {
  // test that adding a .then() after we have
  // started shutting down does not deadlock
  folly::Optional<folly::Future<folly::Unit>> f;
  int counter{0};
  {
    TPE fe(1);
    f = folly::makeFuture()
            .via(&fe)
            .thenValue([](auto&&) { burnMs(100)(); })
            .thenValue([&](auto&&) { ++counter; })
            .via(getWeakRef(fe))
            .thenValue([](auto&&) { burnMs(100)(); })
            .thenValue([&](auto&&) { ++counter; });
  } // fe is destroyed here, before the chain is inspected
  EXPECT_THROW(std::move(*f).get(), folly::BrokenPromise);
  // Only the increment scheduled via `&fe` is expected to have run.
  EXPECT_EQ(1, counter);
}
1171 
// Exercises VirtualExecutor on top of a TPE in two phases:
//  1. a future chain that hops from a VirtualExecutor to the underlying
//     executor, checking how far it has progressed once the VirtualExecutor
//     goes out of scope;
//  2. a task added to a VirtualExecutor, checking that the task has both run
//     and been destroyed once the VirtualExecutor goes out of scope.
template <typename TPE>
static void virtualExecutorTest() {
  using namespace std::literals;

  folly::Optional<folly::SemiFuture<folly::Unit>> f;
  int counter{0};
  {
    TPE fe(1);
    {
      VirtualExecutor ve(fe);
      f = futures::sleep(100ms)
              .via(&ve)
              .thenValue([&](auto&&) {
                ++counter;
                return futures::sleep(100ms);
              })
              .via(&fe)
              .thenValue([&](auto&&) { ++counter; })
              .semi();
    } // ve is destroyed here
    // Only the continuation scheduled on `ve` is expected to have run so far.
    EXPECT_EQ(1, counter);

    bool functionDestroyed{false};
    bool functionCalled{false};
    {
      VirtualExecutor ve(fe);
      // The guard's callback runs when the guard is destroyed; it is moved
      // into the task below, so it fires when the task object is destroyed.
      auto guard = makeGuard([&functionDestroyed] {
        std::this_thread::sleep_for(100ms);
        functionDestroyed = true;
      });
      ve.add([&functionCalled, guard = std::move(guard)] {
        functionCalled = true;
      });
    } // ve is destroyed here
    EXPECT_TRUE(functionCalled);
    EXPECT_TRUE(functionDestroyed);
  } // fe is destroyed here
  // With `fe` gone, the whole chain is expected to have completed.
  EXPECT_TRUE(f->isReady());
  EXPECT_NO_THROW(std::move(*f).get());
  EXPECT_EQ(2, counter);
}
1213 
1214 class SingleThreadedCPUThreadPoolExecutor : public CPUThreadPoolExecutor,
1215                                             public SequencedExecutor {
1216  public:
SingleThreadedCPUThreadPoolExecutor(size_t)1217   explicit SingleThreadedCPUThreadPoolExecutor(size_t)
1218       : CPUThreadPoolExecutor(1) {}
1219 };
1220 
// Runs WeakRefTest against IOThreadPoolExecutor.
TEST(ThreadPoolExecutorTest, WeakRefTestIO) {
  WeakRefTest<IOThreadPoolExecutor>();
}
1224 
// Runs WeakRefTest against CPUThreadPoolExecutor.
TEST(ThreadPoolExecutorTest, WeakRefTestCPU) {
  WeakRefTest<CPUThreadPoolExecutor>();
}
1228 
// Runs WeakRefTest against EDFThreadPoolExecutor.
TEST(ThreadPoolExecutorTest, WeakRefTestEDF) {
  WeakRefTest<EDFThreadPoolExecutor>();
}
1232 
// Runs WeakRefTest against SingleThreadedCPUThreadPoolExecutor.
TEST(ThreadPoolExecutorTest, WeakRefTestSingleThreadedCPU) {
  WeakRefTest<SingleThreadedCPUThreadPoolExecutor>();
}
1236 
// Checks the static type returned by getWeakRef() for an executor that
// derives from SequencedExecutor: it should be a KeepAlive of
// SequencedExecutor.
TEST(ThreadPoolExecutorTest, WeakRefTestSequential) {
  using ExpectedRef = Executor::KeepAlive<SequencedExecutor>;

  SingleThreadedCPUThreadPoolExecutor ex(1);
  auto weakRef = getWeakRef(ex);
  constexpr bool kHasExpectedType =
      std::is_same_v<decltype(weakRef), ExpectedRef>;
  EXPECT_TRUE(kHasExpectedType);
}
1244 
// Runs virtualExecutorTest against IOThreadPoolExecutor.
TEST(ThreadPoolExecutorTest, VirtualExecutorTestIO) {
  virtualExecutorTest<IOThreadPoolExecutor>();
}
1248 
// Runs virtualExecutorTest against CPUThreadPoolExecutor.
TEST(ThreadPoolExecutorTest, VirtualExecutorTestCPU) {
  virtualExecutorTest<CPUThreadPoolExecutor>();
}
1252 
// Runs virtualExecutorTest against EDFThreadPoolExecutor.
TEST(ThreadPoolExecutorTest, VirtualExecutorTestEDF) {
  virtualExecutorTest<EDFThreadPoolExecutor>();
}
1256 
1257 // Test use of guard inside executors
1258 template <class TPE>
currentThreadTest(folly::StringPiece executorName)1259 static void currentThreadTest(folly::StringPiece executorName) {
1260   folly::Optional<ExecutorBlockingContext> ctx{};
1261   TPE tpe(1, std::make_shared<NamedThreadFactory>(executorName));
1262   tpe.add([&ctx]() { ctx = getExecutorBlockingContext(); });
1263   tpe.join();
1264   EXPECT_EQ(ctx->tag, executorName);
1265 }
1266 
1267 // Test the nesting of the permit guard
1268 template <class TPE>
currentThreadTestDisabled(folly::StringPiece executorName)1269 static void currentThreadTestDisabled(folly::StringPiece executorName) {
1270   folly::Optional<ExecutorBlockingContext> ctxPermit{};
1271   folly::Optional<ExecutorBlockingContext> ctxForbid{};
1272   TPE tpe(1, std::make_shared<NamedThreadFactory>(executorName));
1273   tpe.add([&]() {
1274     {
1275       // Nest the guard that permits blocking
1276       ExecutorBlockingGuard guard{ExecutorBlockingGuard::PermitTag{}};
1277       ctxPermit = getExecutorBlockingContext();
1278     }
1279     ctxForbid = getExecutorBlockingContext();
1280   });
1281   tpe.join();
1282   EXPECT_TRUE(!ctxPermit.has_value());
1283   EXPECT_EQ(ctxForbid->tag, executorName);
1284 }
1285 
// Runs both blocking-context helpers against CPUThreadPoolExecutor.
TEST(ThreadPoolExecutorTest, CPUCurrentThreadExecutor) {
  const char* const kExecutorName = "CPU-ExecutorName";
  currentThreadTest<CPUThreadPoolExecutor>(kExecutorName);
  currentThreadTestDisabled<CPUThreadPoolExecutor>(kExecutorName);
}
1290 
// Runs both blocking-context helpers against IOThreadPoolExecutor.
TEST(ThreadPoolExecutorTest, IOCurrentThreadExecutor) {
  const char* const kExecutorName = "IO-ExecutorName";
  currentThreadTest<IOThreadPoolExecutor>(kExecutorName);
  currentThreadTestDisabled<IOThreadPoolExecutor>(kExecutorName);
}
1295 
// Runs both blocking-context helpers against EDFThreadPoolExecutor.
TEST(ThreadPoolExecutorTest, EDFCurrentThreadExecutor) {
  const char* const kExecutorName = "EDF-ExecutorName";
  currentThreadTest<EDFThreadPoolExecutor>(kExecutorName);
  currentThreadTestDisabled<EDFThreadPoolExecutor>(kExecutorName);
}
1300