1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <random>
#include <thread>
21 
22 #include <boost/thread.hpp>
23 #include <glog/logging.h>
24 
25 #include <folly/Random.h>
26 #include <folly/experimental/FunctionScheduler.h>
27 #include <folly/portability/GTest.h>
28 #include <folly/synchronization/Baton.h>
29 
30 #if defined(__linux__)
31 #include <dlfcn.h>
32 #endif
33 
34 using namespace folly;
35 using std::atomic;
36 using std::chrono::duration_cast;
37 using std::chrono::microseconds;
38 using std::chrono::milliseconds;
39 using std::chrono::steady_clock;
40 
41 namespace {
42 
43 /*
44  * Helper functions for controlling how long this test takes.
45  *
46  * Using larger intervals here will make the tests less flaky when run on
47  * heavily loaded systems.  However, this will also make the tests take longer
48  * to run.
49  */
// Length of one test "tick"; every interval and delay below is a multiple of
// this value.
static const std::chrono::milliseconds timeFactor{400};
testInterval(int n)51 std::chrono::milliseconds testInterval(int n) {
52   return n * timeFactor;
53 }
// Restricts n to the closed range [min, max]. Asserts that min <= max.
int getTicksWithinRange(int n, int min, int max) {
  assert(min <= max);
  const int raisedToFloor = std::max(min, n);
  return std::min(max, raisedToFloor);
}
delay(float n)60 void delay(float n) {
61   microseconds usec(static_cast<microseconds::rep>(
62       duration_cast<microseconds>(timeFactor).count() * n));
63   usleep(usec.count());
64 }
65 
66 } // namespace
67 
// start() and shutdown() are each expected to return true on the first call
// and false on an immediately repeated call, including after a restart.
TEST(FunctionScheduler, StartAndShutdown) {
  FunctionScheduler scheduler;
  // Two identical rounds; the second one covers starting again after a
  // shutdown.
  for (int round = 0; round < 2; ++round) {
    EXPECT_TRUE(scheduler.start());
    EXPECT_FALSE(scheduler.start());
    EXPECT_TRUE(scheduler.shutdown());
    EXPECT_FALSE(scheduler.shutdown());
  }
}
80 
// A single repeating function is expected to have run once one tick after
// start(), and not to run again after shutdown().
TEST(FunctionScheduler, SimpleAdd) {
  std::atomic<int> sum(0);
  FunctionScheduler scheduler;
  scheduler.addFunction([&sum] { sum.fetch_add(2); }, testInterval(2), "add2");
  scheduler.start();

  delay(1);
  EXPECT_EQ(2, sum); // exactly one invocation so far
  scheduler.shutdown();

  delay(2);
  EXPECT_EQ(2, sum); // unchanged after shutdown
}
92 
// Cancels a running function by name, checks it stops contributing, then
// registers a different callback under the same name.
TEST(FunctionScheduler, AddCancel) {
  std::atomic<int> sum(0);
  FunctionScheduler scheduler;
  scheduler.addFunction([&sum] { sum.fetch_add(2); }, testInterval(2), "add2");
  scheduler.start();

  delay(1);
  EXPECT_EQ(2, sum);
  delay(2);
  EXPECT_EQ(4, sum);

  // Cancelling succeeds for the registered name only.
  EXPECT_TRUE(scheduler.cancelFunction("add2"));
  EXPECT_FALSE(scheduler.cancelFunction("NO SUCH FUNC"));
  delay(2);
  EXPECT_EQ(4, sum); // the cancelled function did not run again

  // Reuse the name "add2" for a callback that adds 1 instead of 2.
  scheduler.addFunction([&sum] { sum.fetch_add(1); }, testInterval(2), "add2");
  delay(1);
  EXPECT_EQ(5, sum);
  delay(2);
  EXPECT_EQ(6, sum);
  scheduler.shutdown();
}
113 
// Exercises add/cancel in several situations: while the scheduler is stopped,
// while it is running, from a callback that cancels itself, and from a
// callback that registers another function.
TEST(FunctionScheduler, AddCancel2) {
  atomic<int> total{0};
  FunctionScheduler fs;

  // Test adds and cancels while the scheduler is stopped
  EXPECT_FALSE(fs.cancelFunction("add2"));
  fs.addFunction([&] { total += 1; }, testInterval(2), "add2");
  EXPECT_TRUE(fs.cancelFunction("add2"));
  EXPECT_FALSE(fs.cancelFunction("add2"));
  // The "+1" callback was cancelled above; only the "+2" and "+3" callbacks
  // are registered when the scheduler starts.
  fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
  fs.addFunction([&] { total += 3; }, testInterval(3), "add3");

  // Nothing may run before start().
  EXPECT_EQ(0, total);
  fs.start();
  delay(1);
  // Both functions are expected to have run once: 2 + 3.
  EXPECT_EQ(5, total);

  // Cancel add2 while the scheduler is running
  EXPECT_TRUE(fs.cancelFunction("add2"));
  EXPECT_FALSE(fs.cancelFunction("add2"));
  EXPECT_FALSE(fs.cancelFunction("bogus"));

  // Only add3 remains; one more run of it is expected in this window.
  delay(3);
  EXPECT_EQ(8, total);
  EXPECT_TRUE(fs.cancelFunction("add3"));

  // Test a function that cancels itself
  atomic<int> selfCancelCount{0};
  fs.addFunction(
      [&] {
        ++selfCancelCount;
        // On the third invocation the callback removes itself by name.
        if (selfCancelCount > 2) {
          fs.cancelFunction("selfCancel");
        }
      },
      testInterval(1),
      "selfCancel",
      testInterval(1));
  delay(4);
  EXPECT_EQ(3, selfCancelCount);
  // Already cancelled from inside the callback, so this must report false.
  EXPECT_FALSE(fs.cancelFunction("selfCancel"));

  // Test a function that schedules another function
  atomic<int> adderCount{0};
  atomic<int> fn2Count = 0;
  auto fn2 = [&] { ++fn2Count; };
  auto fnAdder = [&] {
    ++adderCount;
    // The second invocation of the adder registers fn2.
    if (adderCount == 2) {
      fs.addFunction(fn2, testInterval(3), "fn2", testInterval(2));
    }
  };
  fs.addFunction(fnAdder, testInterval(4), "adder");
  // t0: adder fires
  delay(1); // t1
  EXPECT_EQ(1, adderCount);
  EXPECT_EQ(0, fn2Count);
  // t4: adder fires, schedules fn2
  delay(4); // t5
  EXPECT_EQ(2, adderCount);
  EXPECT_EQ(0, fn2Count);
  // t6: fn2 fires
  delay(2); // t7
  EXPECT_EQ(2, adderCount);
  EXPECT_EQ(1, fn2Count);
  // t8: adder fires
  // t9: fn2 fires
  delay(3); // t10
  EXPECT_EQ(3, adderCount);
  EXPECT_EQ(2, fn2Count);
  EXPECT_TRUE(fs.cancelFunction("fn2"));
  EXPECT_TRUE(fs.cancelFunction("adder"));
  // Neither counter may advance once both functions are cancelled.
  delay(5); // t15
  EXPECT_EQ(3, adderCount);
  EXPECT_EQ(2, fn2Count);

  // The earlier phases must not have been affected by the later ones.
  EXPECT_EQ(8, total);
  EXPECT_EQ(3, selfCancelCount);
}
193 
// Two functions with different intervals run side by side; a duplicate name
// is rejected, and cancelling one leaves the other running.
TEST(FunctionScheduler, AddMultiple) {
  std::atomic<int> sum(0);
  FunctionScheduler scheduler;
  scheduler.addFunction([&sum] { sum.fetch_add(2); }, testInterval(2), "add2");
  scheduler.addFunction([&sum] { sum.fetch_add(3); }, testInterval(3), "add3");
  // The name "add2" is already taken, so this registration must throw.
  EXPECT_THROW(
      scheduler.addFunction(
          [&sum] { sum.fetch_add(2); }, testInterval(2), "add2"),
      std::invalid_argument);

  scheduler.start();
  delay(1);
  EXPECT_EQ(5, sum);
  delay(4);
  EXPECT_EQ(12, sum);

  EXPECT_TRUE(scheduler.cancelFunction("add2"));
  delay(2);
  EXPECT_EQ(15, sum); // only add3 contributed in this window

  scheduler.shutdown();
  delay(3);
  EXPECT_EQ(15, sum); // nothing runs after shutdown
  // Second shutdown() call on the already stopped scheduler.
  scheduler.shutdown();
}
216 
// Registers a third function after the scheduler has been running for a
// while and checks the combined total afterwards.
TEST(FunctionScheduler, AddAfterStart) {
  std::atomic<int> sum(0);
  FunctionScheduler scheduler;
  scheduler.addFunction([&sum] { sum.fetch_add(2); }, testInterval(2), "add2");
  scheduler.addFunction([&sum] { sum.fetch_add(3); }, testInterval(2), "add3");
  scheduler.start();

  delay(3);
  EXPECT_EQ(10, sum);

  // Added while running, with a longer interval than the first two.
  scheduler.addFunction([&sum] { sum.fetch_add(2); }, testInterval(3), "add22");
  delay(2);
  EXPECT_EQ(17, sum);
}
229 
// A function registered before a shutdown/start cycle is expected to keep
// running after the scheduler is started again.
TEST(FunctionScheduler, ShutdownStart) {
  std::atomic<int> sum(0);
  FunctionScheduler scheduler;
  scheduler.addFunction([&sum] { sum.fetch_add(2); }, testInterval(2), "add2");

  scheduler.start();
  delay(1);
  scheduler.shutdown();

  scheduler.start();
  delay(1);
  EXPECT_EQ(4, sum);
  // "add3" was never registered in this test.
  EXPECT_FALSE(scheduler.cancelFunction("add3"));
  delay(2);
  EXPECT_EQ(6, sum);
}
244 
// resetFunctionTimer() on a known name is expected to run that function
// right away and restart its interval; an unknown name reports false.
TEST(FunctionScheduler, ResetFunc) {
  std::atomic<int> sum(0);
  FunctionScheduler scheduler;
  scheduler.addFunction([&sum] { sum.fetch_add(2); }, testInterval(3), "add2");
  scheduler.addFunction([&sum] { sum.fetch_add(3); }, testInterval(3), "add3");
  scheduler.start();

  delay(1); // t1
  EXPECT_EQ(5, sum);
  EXPECT_FALSE(scheduler.resetFunctionTimer("NON_EXISTING"));
  EXPECT_TRUE(scheduler.resetFunctionTimer("add2"));

  delay(1); // t2: the reset made add2 run immediately
  EXPECT_EQ(7, sum);

  delay(1.5); // t3.5: add3 has run again, add2 has not
  EXPECT_EQ(10, sum);

  delay(1); // t4.5: add2 has run once more (its timer was reset at t1)
  EXPECT_EQ(12, sum);
}
265 
// Mixes addFunctionOnce() with resetFunctionTimer(). The accumulated value is
// never asserted; only the resetFunctionTimer() results are checked, and the
// sequence has to complete.
TEST(FunctionScheduler, ResetFunc2) {
  std::atomic<int> sum(0);
  FunctionScheduler scheduler;
  scheduler.addFunctionOnce([&sum] { sum.fetch_add(2); }, "add2", testInterval(1));
  scheduler.addFunctionOnce([&sum] { sum.fetch_add(3); }, "add3", testInterval(1));
  scheduler.start();
  delay(2);

  scheduler.addFunctionOnce([&sum] { sum.fetch_add(3); }, "add4", testInterval(2));
  EXPECT_TRUE(scheduler.resetFunctionTimer("add4"));
  scheduler.addFunctionOnce([&sum] { sum.fetch_add(3); }, "add6", testInterval(2));
  delay(1);
  EXPECT_TRUE(scheduler.resetFunctionTimer("add4"));

  delay(3);
  // "add3" is no longer known to the scheduler at this point.
  EXPECT_FALSE(scheduler.resetFunctionTimer("add3"));
  // The name "add4" can be registered again.
  scheduler.addFunctionOnce([&sum] { sum.fetch_add(3); }, "add4", testInterval(1));
}
282 
// Calls resetFunctionTimer() on a function while that function is in the
// middle of executing. Four two-party barriers keep the main thread and the
// callback in lock step, and the callback checks that its captured
// shared_ptr is still non-null afterwards.
TEST(FunctionScheduler, ResetFuncWhileRunning) {
  struct State {
    // Each barrier is a rendezvous between the main thread and the callback.
    boost::barrier barrier_a{2};
    boost::barrier barrier_b{2};
    boost::barrier barrier_c{2};
    boost::barrier barrier_d{2};
    // Flipped by the main thread during the first invocation; later
    // invocations take the barrier-free branch.
    atomic<bool> set = false;
    // Number of completed increments by the callback.
    atomic<size_t> count = 0;
  };

  State state; // held by ref
  auto mv = std::make_shared<size_t>(); // gets moved

  FunctionScheduler fs;
  fs.addFunction(
      [&, mv /* ref + shared_ptr fit in in-situ storage */] {
        if (!state.set) { // first invocation
          state.barrier_a.wait();
          // ensure that resetFunctionTimer is called in this critical section
          state.barrier_b.wait();
          ++state.count;
          EXPECT_TRUE(bool(mv)) << "bug repro: mv was moved-out";
          state.barrier_c.wait();
          // main thread checks count here
          state.barrier_d.wait();
        } else { // subsequent invocations
          ++state.count;
        }
      },
      testInterval(3),
      "nada");
  fs.start();

  // Rendezvous a: the callback has started its first invocation.
  state.barrier_a.wait();
  state.set = true;
  // Reset the timer while the callback is parked between barriers a and b.
  fs.resetFunctionTimer("nada");
  EXPECT_EQ(0, state.count) << "sanity check";
  state.barrier_b.wait();
  // fn thread increments count and checks mv here
  state.barrier_c.wait();
  EXPECT_EQ(1, state.count) << "sanity check";
  state.barrier_d.wait();
  // One further invocation is expected within a tick of the first finishing.
  delay(1);
  EXPECT_EQ(2, state.count) << "sanity check";
}
328 
// A negative interval is rejected with std::invalid_argument, and the
// rejected function cannot be cancelled afterwards.
TEST(FunctionScheduler, AddInvalid) {
  std::atomic<int> sum(0);
  FunctionScheduler scheduler;

  // testInterval(-1) yields a negative duration.
  EXPECT_THROW(
      scheduler.addFunction(
          [&sum] { sum.fetch_add(2); }, testInterval(-1), "add2"),
      std::invalid_argument);

  EXPECT_FALSE(scheduler.cancelFunction("addNoFunc"));
}
339 
// Schedulers without any registered function: one is started and then shut
// down, the other is shut down without ever being started.
TEST(FunctionScheduler, NoFunctions) {
  FunctionScheduler startedScheduler;
  EXPECT_TRUE(startedScheduler.start());
  startedScheduler.shutdown();

  FunctionScheduler idleScheduler;
  idleScheduler.shutdown();
}
347 
// A function added to an already running scheduler is expected to be invoked
// almost immediately, and then again after its interval.
TEST(FunctionScheduler, AddWhileRunning) {
  std::atomic<int> sum(0);
  FunctionScheduler scheduler;
  scheduler.start();
  delay(1);

  scheduler.addFunction([&sum] { sum.fetch_add(2); }, testInterval(2), "add2");
  // Half a tick is enough for the first invocation to have happened.
  delay(0.5);
  int observed = sum.load();
  EXPECT_EQ(2, observed);

  delay(2);
  observed = sum.load();
  EXPECT_EQ(4, observed);
}
363 
// Destroying a running scheduler without an explicit shutdown() must stop
// its functions from being called.
TEST(FunctionScheduler, NoShutdown) {
  std::atomic<int> sum(0);
  {
    FunctionScheduler scheduler;
    scheduler.addFunction(
        [&sum] { sum.fetch_add(2); }, testInterval(1), "add2");
    scheduler.start();
    delay(0.5);
    EXPECT_EQ(2, sum);
  }
  // The scheduler went out of scope above without shutdown() being called.
  // It should have cleaned up after itself, so the counter stays where it was.
  delay(2);
  EXPECT_EQ(2, sum);
}
379 
// Functions registered with a start delay must not run before that delay has
// elapsed; a negative start delay is rejected.
TEST(FunctionScheduler, StartDelay) {
  std::atomic<int> sum(0);
  FunctionScheduler scheduler;
  scheduler.addFunction(
      [&sum] { sum.fetch_add(2); }, testInterval(2), "add2", testInterval(2));
  scheduler.addFunction(
      [&sum] { sum.fetch_add(3); }, testInterval(3), "add3", testInterval(2));
  // testInterval(-1) as the start delay is invalid.
  EXPECT_THROW(
      scheduler.addFunction(
          [&sum] { sum.fetch_add(2); },
          testInterval(3),
          "addX",
          testInterval(-1)),
      std::invalid_argument);
  scheduler.start();

  delay(1); // now at t1
  EXPECT_EQ(0, sum); // still inside the start delay

  // t2: add2 -> 2, add3 -> 5
  delay(2); // now at t3
  EXPECT_EQ(5, sum);

  // t4: add2 -> 7
  // t5: add3 -> 10
  // t6: add2 -> 12
  delay(4); // now at t7
  EXPECT_EQ(12, sum);

  scheduler.cancelFunction("add2");
  // t8: add3 -> 15
  delay(2); // now at t9
  EXPECT_EQ(15, sum);

  scheduler.shutdown();
  delay(3);
  EXPECT_EQ(15, sum); // nothing runs after shutdown
  // Second shutdown() call on the already stopped scheduler.
  scheduler.shutdown();
}
410 
// With the default (non-steady) mode, a slow invocation must not be followed
// by a burst of catch-up invocations.
TEST(FunctionScheduler, NoSteadyCatchup) {
  std::atomic<int> tickCount(0);
  FunctionScheduler scheduler;
  // setSteady(false) is the default, so it is not called here.
  scheduler.addFunction(
      [&tickCount] {
        // Only the second invocation is slow.
        const int current = ++tickCount;
        if (current == 2) {
          std::this_thread::sleep_for(std::chrono::milliseconds(200));
        }
      },
      milliseconds(5));
  scheduler.start();
  std::this_thread::sleep_for(std::chrono::milliseconds(500));

  // Without catch-up: one tick takes 200ms, and the remaining 300ms allow at
  // most 300 / 5 = 60 further ticks.
  EXPECT_LE(tickCount.load(), 61);
}
429 
// In steady mode the scheduler is expected to make up for a slow invocation
// so that the total tick count stays close to the nominal schedule.
TEST(FunctionScheduler, SteadyCatchup) {
  std::atomic<int> tickCount(0);
  FunctionScheduler scheduler;
  scheduler.setSteady(true);
  scheduler.addFunction(
      [&tickCount] {
        // Only the second invocation is slow.
        const int current = ++tickCount;
        if (current == 2) {
          std::this_thread::sleep_for(std::chrono::milliseconds(200));
        }
      },
      milliseconds(5));
  scheduler.start();

  std::this_thread::sleep_for(std::chrono::milliseconds(500));

  // Nominal rate is one tick per 5ms, i.e. 100 ticks in 500ms. The slow
  // second tick should be compensated by quick follow-up ticks.
  EXPECT_NEAR(100, tickCount.load(), 10);
}
449 
// Schedules a function whose interval is drawn from a range of
// +/- (timeFactor / 5) around kTicks ticks, and checks one run per window.
TEST(FunctionScheduler, UniformDistribution) {
  std::atomic<int> sum(0);
  const int kTicks = 2;
  const auto spread = timeFactor / 5;
  std::chrono::milliseconds lowerBound = testInterval(kTicks) - spread;
  std::chrono::milliseconds upperBound = testInterval(kTicks) + spread;

  FunctionScheduler scheduler;
  scheduler.addFunctionUniformDistribution(
      [&sum] { sum.fetch_add(2); },
      lowerBound,
      upperBound,
      "UniformDistribution",
      std::chrono::milliseconds(0));
  scheduler.start();

  delay(1);
  EXPECT_EQ(2, sum);
  delay(kTicks);
  EXPECT_EQ(4, sum);
  delay(kTicks);
  EXPECT_EQ(6, sum);

  scheduler.shutdown();
  delay(2);
  EXPECT_EQ(6, sum); // nothing runs after shutdown
}
475 
// addFunctionConsistentDelay(): after one slow run the following runs are
// expected to return to the original 400ms grid.
TEST(FunctionScheduler, ConsistentDelay) {
  std::atomic<int> runCount(0);
  FunctionScheduler scheduler;

  // Steady-clock timestamp, in milliseconds, taken just before scheduling.
  std::atomic<long long> startMs(0);
  startMs = duration_cast<milliseconds>(steady_clock::now().time_since_epoch())
                .count();

  // Expected run times are t = 0, 600, 800, 1200 (four runs in total).
  // With a constant interval they would be t = 0, 600, 1000 (three runs).
  scheduler.addFunctionConsistentDelay(
      [&runCount, &startMs] {
        auto nowMs =
            duration_cast<milliseconds>(steady_clock::now().time_since_epoch())
                .count();
        int run = ++runCount;
        // Every run except the second should sit on the 400ms grid. This
        // check is sensitive to scheduling delays above 100ms.
        if (run != 2) {
          EXPECT_NEAR((nowMs - startMs) - (run - 1) * 400, 0, 100);
        }
        // The first run overruns its slot.
        if (run == 1) {
          /* sleep override */
          std::this_thread::sleep_for(std::chrono::milliseconds(600));
        }
      },
      milliseconds(400),
      "ConsistentDelay");

  scheduler.start();

  /* sleep override */
  std::this_thread::sleep_for(std::chrono::milliseconds(1300));
  EXPECT_EQ(runCount.load(), 4);
}
510 
// Uses addFunctionGenericDistribution() with an interval functor that
// returns a growing interval on each call (each value is the square of the
// previous one, starting from 2 ticks).
TEST(FunctionScheduler, ExponentialBackoff) {
  std::atomic<int> sum(0);
  // Interval (in ticks) most recently handed to the scheduler.
  std::atomic<int> lastInterval(0);
  // Interval (in ticks) the functor will hand out on its next call.
  std::atomic<int> upcomingInterval(2);

  FunctionScheduler scheduler;
  scheduler.addFunctionGenericDistribution(
      [&sum] { sum.fetch_add(2); },
      [&lastInterval, &upcomingInterval]() mutable {
        const int ticks = upcomingInterval.load();
        lastInterval = ticks;
        upcomingInterval = ticks * ticks;
        return testInterval(ticks);
      },
      "ExponentialBackoff",
      "2^n * 100ms",
      std::chrono::milliseconds(0));
  scheduler.start();

  delay(1);
  EXPECT_EQ(2, sum);
  // Sleep for whatever interval the functor reported last.
  delay(lastInterval);
  EXPECT_EQ(4, sum);
  delay(lastInterval);
  EXPECT_EQ(6, sum);

  scheduler.shutdown();
  delay(2);
  EXPECT_EQ(6, sum); // nothing runs after shutdown
}
538 
// Uses addFunctionGenericDistribution() with intervals drawn from a gamma
// distribution and limited to between 2 and 10 ticks.
TEST(FunctionScheduler, GammaIntervalDistribution) {
  std::atomic<int> sum(0);
  // Interval (in ticks) most recently handed to the scheduler.
  std::atomic<int> lastInterval(0);
  FunctionScheduler scheduler;

  std::default_random_engine rng(folly::Random::rand32());
  // Shape and scale are both 2.0, chosen fairly arbitrarily: the test is not
  // about std::gamma_distribution itself, so the exact values matter little.
  std::gamma_distribution<double> gammaDist(2.0, 2.0);

  scheduler.addFunctionGenericDistribution(
      [&sum] { sum.fetch_add(2); },
      // The engine and the distribution are copied into the functor.
      [&lastInterval, rng, gammaDist]() mutable {
        lastInterval =
            getTicksWithinRange(static_cast<int>(gammaDist(rng)), 2, 10);
        return testInterval(lastInterval);
      },
      "GammaDistribution",
      "gamma(2.0,2.0)*100ms",
      std::chrono::milliseconds(0));
  scheduler.start();

  delay(1);
  EXPECT_EQ(2, sum);
  // Sleep for whatever interval the functor reported last.
  delay(lastInterval);
  EXPECT_EQ(4, sum);
  delay(lastInterval);
  EXPECT_EQ(6, sum);

  scheduler.shutdown();
  delay(2);
  EXPECT_EQ(6, sum); // nothing runs after shutdown
}
569 
// Registers a function with a LatencyDistribution and a very large interval
// (ten 365-day years, in hours) and checks that the first run still happens
// within one tick of start().
TEST(FunctionScheduler, PoissonDistribution) {
  auto hugeInterval = std::chrono::hours(24 * 365 * 10);
  std::atomic<int> sum(0);
  FunctionScheduler scheduler;
  scheduler.addFunction(
      [&sum] { sum.fetch_add(2); },
      hugeInterval,
      folly::FunctionScheduler::LatencyDistribution(true, hugeInterval),
      "PoissonDistribution",
      std::chrono::milliseconds(0));
  scheduler.start();

  delay(1);
  EXPECT_EQ(2, sum);
}
584 
// addFunctionOnce() runs its callback a single time; the same name can be
// used for another one-shot function afterwards.
TEST(FunctionScheduler, AddWithRunOnce) {
  std::atomic<int> sum(0);
  FunctionScheduler scheduler;

  // First one-shot, registered before start().
  scheduler.addFunctionOnce([&sum] { sum.fetch_add(2); }, "add2");
  scheduler.start();
  delay(1);
  EXPECT_EQ(2, sum);
  delay(2);
  EXPECT_EQ(2, sum); // it did not repeat

  // Second one-shot under the same name, registered while running.
  scheduler.addFunctionOnce([&sum] { sum.fetch_add(2); }, "add2");
  delay(1);
  EXPECT_EQ(4, sum);
  delay(2);
  EXPECT_EQ(4, sum); // it did not repeat either

  scheduler.shutdown();
}
603 
// cancelFunctionAndWait() must not return until the invocation that is
// currently in progress has finished.
TEST(FunctionScheduler, cancelFunctionAndWait) {
  std::atomic<int> sum(0);
  FunctionScheduler scheduler;
  scheduler.addFunction(
      [&sum] {
        // Sleep for five ticks before touching the counter.
        delay(5);
        sum.fetch_add(2);
      },
      testInterval(100),
      "add2");

  scheduler.start();
  delay(1);
  EXPECT_EQ(0, sum); // the callback is still inside its delay

  EXPECT_TRUE(scheduler.cancelFunctionAndWait("add2"));
  EXPECT_EQ(2, sum); // the callback ran to completion before we got here

  // The function is gone, so a plain cancel finds nothing.
  EXPECT_FALSE(scheduler.cancelFunction("add2"));
  scheduler.shutdown();
}
625 
// cancelAllFunctionsAndWait() must not return until the invocation that is
// currently in progress has finished.
TEST(FunctionScheduler, cancelAllFunctionsAndWait) {
  std::atomic<int> sum(0);
  FunctionScheduler scheduler;

  scheduler.addFunction(
      [&sum] {
        // Sleep for five ticks before touching the counter.
        delay(5);
        sum.fetch_add(2);
      },
      testInterval(100),
      "add2");

  scheduler.start();
  delay(1);
  EXPECT_EQ(0, sum); // the callback is still inside its delay

  scheduler.cancelAllFunctionsAndWait();
  EXPECT_EQ(2, sum); // the callback ran to completion before we got here

  // The function is gone, so a plain cancel finds nothing.
  EXPECT_FALSE(scheduler.cancelFunction("add2"));
  scheduler.shutdown();
}
648 
// Runs cancelFunctionAndWait() on a worker thread while the function is in
// the middle of a long invocation, and requires the call to return within
// 15 ticks.
TEST(FunctionScheduler, CancelAndWaitOnRunningFunc) {
  folly::Baton<> done;
  std::thread worker([&done]() {
    FunctionScheduler scheduler;
    scheduler.addFunction([] { delay(10); }, testInterval(2), "func");
    scheduler.start();
    delay(1);
    EXPECT_TRUE(scheduler.cancelFunctionAndWait("func"));
    done.post();
  });

  // NOTE: if this wait times out, ASSERT_TRUE returns before join() and the
  // still-joinable std::thread is destroyed, which terminates the process.
  ASSERT_TRUE(done.try_wait_for(testInterval(15)));
  worker.join();
}
663 
// Runs cancelAllFunctionsAndWait() on a worker thread while the function is
// in the middle of a long invocation, and requires the call to return within
// 15 ticks.
TEST(FunctionScheduler, CancelAllAndWaitWithRunningFunc) {
  folly::Baton<> done;
  std::thread worker([&done]() {
    FunctionScheduler scheduler;
    scheduler.addFunction([] { delay(10); }, testInterval(2), "func");
    scheduler.start();
    delay(1);
    scheduler.cancelAllFunctionsAndWait();
    done.post();
  });

  // NOTE: if this wait times out, ASSERT_TRUE returns before join() and the
  // still-joinable std::thread is destroyed, which terminates the process.
  ASSERT_TRUE(done.try_wait_for(testInterval(15)));
  worker.join();
}
678 
// cancelAllFunctionsAndWait() with one function in the middle of a long
// invocation and a second one still waiting out its start delay: only the
// first may ever have executed.
TEST(FunctionScheduler, CancelAllAndWaitWithOneRunningAndOneWaiting) {
  folly::Baton<> done;
  std::thread worker([&done]() {
    std::atomic<int> executions(0);
    FunctionScheduler scheduler;
    // Both functions share the same body: count the call, then sleep for
    // ten ticks.
    auto slowTask = [&executions] {
      executions++;
      delay(10);
    };
    scheduler.addFunction(slowTask, testInterval(2), "func0");
    // func1 additionally gets a start delay of five ticks.
    scheduler.addFunction(slowTask, testInterval(2), "func1", testInterval(5));
    scheduler.start();
    delay(1);
    scheduler.cancelAllFunctionsAndWait();
    EXPECT_EQ(executions, 1);
    done.post();
  });

  // NOTE: if this wait times out, ASSERT_TRUE returns before join() and the
  // still-joinable std::thread is destroyed, which terminates the process.
  ASSERT_TRUE(done.try_wait_for(testInterval(15)));
  worker.join();
}
709 
// Two threads call cancelFunctionAndWait() for the same function one tick
// apart: the first call is expected to succeed, the second to report false.
TEST(FunctionScheduler, ConcurrentCancelFunctionAndWait) {
  FunctionScheduler scheduler;
  scheduler.addFunction([] { delay(10); }, testInterval(2), "func");

  scheduler.start();
  delay(1);
  std::thread firstCanceller(
      [&scheduler] { EXPECT_TRUE(scheduler.cancelFunctionAndWait("func")); });
  delay(1);
  std::thread secondCanceller(
      [&scheduler] { EXPECT_FALSE(scheduler.cancelFunctionAndWait("func")); });
  firstCanceller.join();
  secondCanceller.join();
}
722