1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <folly/synchronization/LifoSem.h>
18 
19 #include <thread>
20 
21 #include <folly/Benchmark.h>
22 #include <folly/Random.h>
23 #include <folly/portability/Asm.h>
24 #include <folly/portability/GFlags.h>
25 #include <folly/portability/GTest.h>
26 #include <folly/synchronization/NativeSemaphore.h>
27 #include <folly/test/DeterministicSchedule.h>
28 
29 using namespace folly;
30 using namespace folly::test;
31 
32 typedef LifoSemImpl<DeterministicAtomic> DLifoSem;
33 typedef DeterministicSchedule DSched;
34 
35 class LifoSemTest : public testing::Test {
36  private:
37   // pre-init the pool to avoid deadlock when using DeterministicAtomic
38   using Node = detail::LifoSemRawNode<DeterministicAtomic>;
39   Node::Pool& pool_{Node::pool()};
40 };
41 
// Single-threaded smoke test of tryWait(), post() and wait().
TEST(LifoSem, basic) {
  LifoSem sem;

  // Nothing has been posted yet, so the non-blocking acquire is expected to
  // report failure.
  EXPECT_FALSE(sem.tryWait());

  // One post, consumed through the non-blocking path.
  sem.post();
  EXPECT_TRUE(sem.tryWait());

  // One post, consumed through wait().
  sem.post();
  sem.wait();
}
50 
// Ten real threads repeatedly acquire and release a semaphore that is seeded
// with a single post from the main thread. The number of acquisitions that
// had to fall back to wait() is logged at the end.
TEST(LifoSem, multi) {
  LifoSem sem;

  const int opsPerThread = 10000;
  std::thread threads[10];
  std::atomic<int> blocks(0);

  // Body run by every worker thread.
  const auto worker = [&] {
    int waited = 0;
    for (int op = 0; op != opsPerThread; ++op) {
      // Try the non-blocking path first; fall back to wait() and count it.
      if (!sem.tryWait()) {
        sem.wait();
        ++waited;
      }
      sem.post();
    }
    // Publish the per-thread tally once, after the loop.
    blocks.fetch_add(waited);
  };

  for (auto& slot : threads) {
    slot = std::thread(worker);
  }

  // start the flood
  sem.post();

  for (auto& slot : threads) {
    slot.join();
  }

  const size_t threadCount = sizeof(threads) / sizeof(threads[0]);
  LOG(INFO) << opsPerThread * threadCount << " post/wait pairs, " << blocks
            << " blocked";
}
82 
// Two threads hand control back and forth through a pair of semaphores under
// a deterministic schedule. After each handoff both semaphores are expected
// to report a value of zero.
TEST_F(LifoSemTest, pingpong) {
  DSched sched(DSched::uniform(0));

  const int kRounds = 100;
  const int kPasses = 10;

  for (int pass = 0; pass != kPasses; ++pass) {
    DLifoSem a;
    DLifoSem b;

    // Child: receive the ping on `a`, answer on `b`.
    auto child = DSched::thread([&] {
      for (int round = 0; round != kRounds; ++round) {
        a.wait();
        // main thread can't be running here
        EXPECT_EQ(a.valueGuess(), 0);
        EXPECT_EQ(b.valueGuess(), 0);
        b.post();
      }
    });

    // Main: send the ping on `a`, wait for the answer on `b`.
    for (int round = 0; round != kRounds; ++round) {
      a.post();
      b.wait();
      // child thread can't be running here
      EXPECT_EQ(a.valueGuess(), 0);
      EXPECT_EQ(b.valueGuess(), 0);
    }

    DSched::join(child);
  }
}
111 
// Main and child thread share one semaphore: the child does wait-then-post,
// the main thread post-then-wait, all under a deterministic schedule.
TEST_F(LifoSemTest, mutex) {
  DSched sched(DSched::uniform(0));

  const int kRounds = 100;
  const int kPasses = 10;

  for (int pass = 0; pass != kPasses; ++pass) {
    DLifoSem a;

    auto child = DSched::thread([&] {
      for (int round = 0; round != kRounds; ++round) {
        a.wait();
        a.post();
      }
    });

    for (int round = 0; round != kRounds; ++round) {
      a.post();
      a.wait();
    }

    // One extra post before joining the child, taken back by the main thread
    // once the child has finished.
    a.post();
    DSched::join(child);
    a.wait();
  }
}
135 
// Each thread posts `width` units in one call and then takes `width` units
// back one at a time, under a randomly seeded deterministic schedule. The
// seed is logged so a run can be identified.
TEST_F(LifoSemTest, no_blocking) {
  long seed = folly::randomNumberSeed() % 10000;
  LOG(INFO) << "seed=" << seed;
  DSched sched(DSched::uniform(seed));

  const int iters = 100;
  const int numThreads = 2;
  const int width = 10;

  for (int pass = 0; pass != 10; ++pass) {
    DLifoSem a;

    std::vector<std::thread> workers;
    for (int t = 0; t != numThreads; ++t) {
      workers.emplace_back(DSched::thread([&] {
        for (int round = 0; round != iters; ++round) {
          a.post(width);
          for (int taken = 0; taken != width; ++taken) {
            a.wait();
          }
        }
      }));
    }

    for (auto& worker : workers) {
      DSched::join(worker);
    }
  }
}
164 
// One-directional traffic: the main thread only posts and the child thread
// only waits, the same number of times, under a uniformSubset schedule.
TEST_F(LifoSemTest, one_way) {
  long seed = folly::randomNumberSeed() % 10000;
  LOG(INFO) << "seed=" << seed;
  DSched sched(DSched::uniformSubset(seed, 1, 6));

  const int kPosts = 1000;

  for (int pass = 0; pass != 10; ++pass) {
    DLifoSem a;

    // Consumer side.
    auto consumer = DSched::thread([&] {
      for (int received = 0; received != kPosts; ++received) {
        a.wait();
      }
    });

    // Producer side.
    for (int sent = 0; sent != kPosts; ++sent) {
      a.post();
    }

    DSched::join(consumer);
  }
}
186 
// Ordering of wait() relative to shutdown(): the test expects a post made
// after shutdown to still satisfy the next wait(), and only the wait() after
// that to throw ShutdownSemError.
TEST_F(LifoSemTest, shutdown_wait_order) {
  DLifoSem a;

  a.shutdown();
  a.post();

  // Expected to consume the post above without throwing.
  a.wait();

  // Nothing is left to consume.
  EXPECT_THROW(a.wait(), ShutdownSemError);
  EXPECT_TRUE(a.isShutdown());
}
195 
// Twenty threads wait on a semaphore that is never posted. shutdown() is
// expected to make each wait() throw ShutdownSemError; a wait() that returns
// normally is recorded as a failure.
TEST_F(LifoSemTest, shutdown_multi) {
  DSched sched(DSched::uniform(0));

  const int kWaiters = 20;

  for (int pass = 0; pass != 10; ++pass) {
    DLifoSem a;

    std::vector<std::thread> waiters;
    for (int w = 0; w != kWaiters; ++w) {
      waiters.push_back(DSched::thread([&] {
        try {
          a.wait();
          ADD_FAILURE();
        } catch (ShutdownSemError&) {
          // expected
          EXPECT_TRUE(a.isShutdown());
        }
      }));
    }

    a.shutdown();

    for (auto& waiter : waiters) {
      DSched::join(waiter);
    }
  }
}
219 
// Asking tryWait() for more units than were posted is expected to return only
// what was posted.
TEST(LifoSem, multi_try_wait_simple) {
  LifoSem sem;
  sem.post(5);

  // this used to trigger an assert
  auto n = sem.tryWait(10);

  ASSERT_EQ(5, n);
}
226 
// A producer posts NPOSTS single units while a consumer drains the semaphore
// in batches of up to ten via tryWait(10). The total drained must equal the
// total posted.
TEST_F(LifoSemTest, multi_try_wait) {
  long seed = folly::randomNumberSeed() % 10000;
  LOG(INFO) << "seed=" << seed;
  DSched sched(DSched::uniform(seed));
  DLifoSem sem;

  const int NPOSTS = 1000;

  auto producer = [&] {
    for (int sent = 0; sent != NPOSTS; ++sent) {
      sem.post();
    }
  };

  DeterministicAtomic<bool> consumer_stop(false);
  int consumed = 0;

  auto consumer = [&] {
    while (true) {
      // Sample the stop flag before draining, so that a full drain always
      // happens after the flag has been observed as set.
      const bool sawStop = consumer_stop.load();

      // Drain until a batch comes back empty.
      while (true) {
        const int batch = sem.tryWait(10);
        consumed += batch;
        if (batch <= 0) {
          break;
        }
      }

      if (sawStop) {
        break;
      }
    }
  };

  auto producerThread = DSched::thread(producer);
  auto consumerThread = DSched::thread(consumer);

  // Only ask the consumer to stop once every post has been issued.
  DSched::join(producerThread);
  consumer_stop.store(true);
  DSched::join(consumerThread);

  ASSERT_EQ(NPOSTS, consumed);
}
264 
// Twenty threads call try_wait_for() with a 1ms timeout while twenty other
// threads post. `handoffs` goes up on every post and down on every successful
// timed wait; it is shared across passes. On passes after the sixth the
// semaphore is also shut down while the threads are running.
TEST_F(LifoSemTest, timeout) {
  long seed = folly::randomNumberSeed() % 10000;
  LOG(INFO) << "seed=" << seed;
  DSched sched(DSched::uniform(seed));
  DeterministicAtomic<uint32_t> handoffs{0};

  const int kThreadsPerSide = 20;
  const int kOpsPerThread = 10;

  for (int pass = 0; pass < 10; ++pass) {
    DLifoSem a;

    // Timed waiters.
    std::vector<std::thread> waiters;
    for (int w = 0; w != kThreadsPerSide; ++w) {
      waiters.push_back(DSched::thread([&] {
        for (int attempt = 0; attempt != kOpsPerThread; ++attempt) {
          try {
            if (a.try_wait_for(std::chrono::milliseconds(1))) {
              handoffs--;
            }
          } catch (ShutdownSemError&) {
            // expected
            EXPECT_TRUE(a.isShutdown());
          }
        }
      }));
    }

    // Posters.
    std::vector<std::thread> posters;
    for (int p = 0; p != kThreadsPerSide; ++p) {
      posters.push_back(DSched::thread([&] {
        for (int sent = 0; sent != kOpsPerThread; ++sent) {
          a.post();
          handoffs++;
        }
      }));
    }

    if (pass > 5) {
      a.shutdown();
    }

    for (auto& waiter : waiters) {
      DSched::join(waiter);
    }
    for (auto& poster : posters) {
      DSched::join(poster);
    }

    // At least one timeout must occur.
    EXPECT_GT(handoffs.load(), 0);
  }
}
310 
// A LifoSem used purely as a stop signal: one worker polls isShutdown(), a
// second sleeps in try_wait_for() between checks, and a third thread calls
// shutdown(). All three threads must be joinable afterwards.
TEST_F(LifoSemTest, shutdown_try_wait_for) {
  long seed = folly::randomNumberSeed() % 1000000;
  LOG(INFO) << "seed=" << seed;
  DSched sched(DSched::uniform(seed));

  DLifoSem stopped;

  // Worker that spins on the shutdown flag.
  std::thread poller = DSched::thread([&stopped] {
    while (!stopped.isShutdown()) {
      // i.e. poll for messages with timeout
      LOG(INFO) << "thread polled";
    }
  });

  // Worker that does periodic work, using the timed wait as its sleep.
  std::thread sleeper = DSched::thread([&stopped] {
    while (!stopped.isShutdown()) {
      // Do some work every 1 second

      try {
        // this is normally 1 second in prod use case.
        stopped.try_wait_for(std::chrono::milliseconds(1));
      } catch (folly::ShutdownSemError&) {
        LOG(INFO) << "try_wait_for shutdown";
      }
    }
  });

  // Thread that issues the shutdown.
  std::thread stopper = DSched::thread([&stopped] {
    LOG(INFO) << "LifoSem shutdown";
    stopped.shutdown();
    LOG(INFO) << "LifoSem shutdown done";
  });

  DSched::join(stopper);
  DSched::join(poller);
  DSched::join(sleeper);
  LOG(INFO) << "Threads joined";
}
347 
// Round-trip benchmark: the benchmark thread and a helper thread hand off to
// each other through two semaphores, `iters` times.
BENCHMARK(lifo_sem_pingpong, iters) {
  LifoSem a;
  LifoSem b;

  // Helper: wait for the ping on `a`, reply on `b`.
  std::thread responder([&] {
    for (size_t round = 0; round < iters; ++round) {
      a.wait();
      b.post();
    }
  });

  for (size_t round = 0; round < iters; ++round) {
    a.post();
    b.wait();
  }

  responder.join();
}
363 
// One-directional benchmark: the benchmark thread posts `iters` times while a
// helper thread waits `iters` times on the same semaphore.
BENCHMARK(lifo_sem_oneway, iters) {
  LifoSem a;

  std::thread consumer([&] {
    for (size_t received = 0; received < iters; ++received) {
      a.wait();
    }
  });

  for (size_t sent = 0; sent < iters; ++sent) {
    a.post();
  }

  consumer.join();
}
376 
// Cost of post() alone on a single thread.
BENCHMARK(single_thread_lifo_post, iters) {
  LifoSem sem;
  for (size_t i = 0; i < iters; ++i) {
    sem.post();
    // Called after every operation in the LifoSem single-thread benchmarks.
    asm_volatile_memory();
  }
}
384 
// Cost of wait() alone on a single thread. The semaphore is constructed with
// `iters` as its initial value and wait() is then called `iters` times.
BENCHMARK(single_thread_lifo_wait, iters) {
  LifoSem sem(iters);
  for (size_t i = 0; i < iters; ++i) {
    sem.wait();
    asm_volatile_memory();
  }
}
392 
// Cost of a post() immediately followed by a wait() on a single thread.
BENCHMARK(single_thread_lifo_postwait, iters) {
  LifoSem sem;
  for (size_t i = 0; i < iters; ++i) {
    // Release one unit...
    sem.post();
    asm_volatile_memory();
    // ...and take it straight back.
    sem.wait();
    asm_volatile_memory();
  }
}
402 
// Cost of a tryWait() on a semaphore that is never posted; each call is
// expected to report failure.
BENCHMARK(single_thread_lifo_trywait, iters) {
  LifoSem sem;
  for (size_t i = 0; i < iters; ++i) {
    EXPECT_FALSE(sem.tryWait());
    asm_volatile_memory();
  }
}
410 
// NativeSemaphore counterpart of single_thread_lifo_postwait: post() followed
// by wait() on a single thread.
BENCHMARK(single_thread_native_postwait, iters) {
  folly::NativeSemaphore sem;
  for (size_t i = 0; i < iters; ++i) {
    sem.post();
    sem.wait();
  }
}
418 
// NativeSemaphore counterpart of single_thread_lifo_trywait: try_wait() on a
// semaphore that is never posted; each call is expected to report failure.
BENCHMARK(single_thread_native_trywait, iters) {
  folly::NativeSemaphore sem;
  for (size_t i = 0; i < iters; ++i) {
    EXPECT_FALSE(sem.try_wait());
  }
}
425 
contendedUse(uint32_t n,int posters,int waiters)426 static void contendedUse(uint32_t n, int posters, int waiters) {
427   LifoSemImpl<std::atomic> sem;
428 
429   std::vector<std::thread> threads;
430   std::atomic<bool> go(false);
431 
432   BENCHMARK_SUSPEND {
433     for (int t = 0; t < waiters; ++t) {
434       threads.emplace_back([=, &sem] {
435         for (uint32_t i = t; i < n; i += waiters) {
436           sem.wait();
437         }
438       });
439     }
440     for (int t = 0; t < posters; ++t) {
441       threads.emplace_back([=, &sem, &go] {
442         while (!go.load()) {
443           std::this_thread::yield();
444         }
445         for (uint32_t i = t; i < n; i += posters) {
446           sem.post();
447         }
448       });
449     }
450   }
451 
452   go.store(true);
453   for (auto& thr : threads) {
454     thr.join();
455   }
456 }
457 
458 BENCHMARK_DRAW_LINE();
459 BENCHMARK_NAMED_PARAM(contendedUse, 1_to_1, 1, 1)
460 BENCHMARK_NAMED_PARAM(contendedUse, 1_to_4, 1, 4)
461 BENCHMARK_NAMED_PARAM(contendedUse, 1_to_32, 1, 32)
462 BENCHMARK_NAMED_PARAM(contendedUse, 4_to_1, 4, 1)
463 BENCHMARK_NAMED_PARAM(contendedUse, 4_to_24, 4, 24)
464 BENCHMARK_NAMED_PARAM(contendedUse, 8_to_100, 8, 100)
465 BENCHMARK_NAMED_PARAM(contendedUse, 32_to_1, 31, 1)
466 BENCHMARK_NAMED_PARAM(contendedUse, 16_to_16, 16, 16)
467 BENCHMARK_NAMED_PARAM(contendedUse, 32_to_32, 32, 32)
468 BENCHMARK_NAMED_PARAM(contendedUse, 32_to_1000, 32, 1000)
469 
470 // sudo nice -n -20 _build/opt/folly/test/LifoSemTests
471 //     --benchmark --bm_min_iters=10000000 --gtest_filter=-\*
472 // ============================================================================
473 // folly/test/LifoSemTests.cpp                     relative  time/iter  iters/s
474 // ============================================================================
475 // lifo_sem_pingpong                                            1.31us  762.40K
476 // lifo_sem_oneway                                            193.89ns    5.16M
477 // single_thread_lifo_post                                     15.37ns   65.08M
478 // single_thread_lifo_wait                                     13.60ns   73.53M
479 // single_thread_lifo_postwait                                 29.43ns   33.98M
480 // single_thread_lifo_trywait                                 677.69ps    1.48G
481 // single_thread_native_postwait                                25.03ns   39.95M
482 // single_thread_native_trywait                                  7.30ns  136.98M
483 // ----------------------------------------------------------------------------
484 // contendedUse(1_to_1)                                       158.22ns    6.32M
485 // contendedUse(1_to_4)                                       574.73ns    1.74M
486 // contendedUse(1_to_32)                                      592.94ns    1.69M
487 // contendedUse(4_to_1)                                       118.28ns    8.45M
488 // contendedUse(4_to_24)                                      667.62ns    1.50M
489 // contendedUse(8_to_100)                                     701.46ns    1.43M
490 // contendedUse(32_to_1)                                      165.06ns    6.06M
491 // contendedUse(16_to_16)                                     238.57ns    4.19M
492 // contendedUse(32_to_32)                                     219.82ns    4.55M
493 // contendedUse(32_to_1000)                                   777.42ns    1.29M
494 // ============================================================================
495 
main(int argc,char ** argv)496 int main(int argc, char** argv) {
497   testing::InitGoogleTest(&argc, argv);
498   gflags::ParseCommandLineFlags(&argc, &argv, true);
499   int rv = RUN_ALL_TESTS();
500   folly::runBenchmarksOnFlag();
501   return rv;
502 }
503