1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <folly/synchronization/LifoSem.h>
18
19 #include <thread>
20
21 #include <folly/Benchmark.h>
22 #include <folly/Random.h>
23 #include <folly/portability/Asm.h>
24 #include <folly/portability/GFlags.h>
25 #include <folly/portability/GTest.h>
26 #include <folly/synchronization/NativeSemaphore.h>
27 #include <folly/test/DeterministicSchedule.h>
28
29 using namespace folly;
30 using namespace folly::test;
31
32 typedef LifoSemImpl<DeterministicAtomic> DLifoSem;
33 typedef DeterministicSchedule DSched;
34
35 class LifoSemTest : public testing::Test {
36 private:
37 // pre-init the pool to avoid deadlock when using DeterministicAtomic
38 using Node = detail::LifoSemRawNode<DeterministicAtomic>;
39 Node::Pool& pool_{Node::pool()};
40 };
41
// Single-threaded smoke test of tryWait / post / wait on a default LifoSem.
TEST(LifoSem, basic) {
  LifoSem lifo;

  // Nothing has been posted yet, so a non-blocking wait must fail.
  EXPECT_FALSE(lifo.tryWait());

  // One post makes exactly one tryWait succeed.
  lifo.post();
  EXPECT_TRUE(lifo.tryWait());

  // A post followed by a blocking wait must return.
  lifo.post();
  lifo.wait();
}
50
// Ten real threads pass a single token around through one LifoSem. Each
// thread performs opsPerThread acquire/release pairs and counts how often the
// non-blocking acquire failed and it had to fall back to wait().
TEST(LifoSem, multi) {
  LifoSem sem;

  const int opsPerThread = 10000;
  std::thread workers[10];
  std::atomic<int> blocks(0);

  for (auto& worker : workers) {
    worker = std::thread([&] {
      int blockedHere = 0;
      for (int op = 0; op < opsPerThread; ++op) {
        const bool gotToken = sem.tryWait();
        if (!gotToken) {
          sem.wait();
          ++blockedHere;
        }
        sem.post();
      }
      // Publish this thread's tally once, after its loop is finished.
      blocks.fetch_add(blockedHere);
    });
  }

  // start the flood: the workers cannot make progress until this first post
  sem.post();

  for (auto& worker : workers) {
    worker.join();
  }

  const auto workerCount = sizeof(workers) / sizeof(workers[0]);
  LOG(INFO) << opsPerThread * workerCount << " post/wait pairs, "
            << blocks.load() << " blocked";
}
82
// Two threads alternate strictly via a pair of semaphores under a
// deterministic schedule (seed 0). After every hand-off both semaphores are
// expected to report a value of zero.
TEST_F(LifoSemTest, pingpong) {
  DSched sched(DSched::uniform(0));

  const int iters = 100;

  for (int pass = 0; pass < 10; ++pass) {
    DLifoSem ping;
    DLifoSem pong;

    auto child = DSched::thread([&] {
      for (int round = 0; round < iters; ++round) {
        ping.wait();
        // main thread can't be running here
        EXPECT_EQ(ping.valueGuess(), 0);
        EXPECT_EQ(pong.valueGuess(), 0);
        pong.post();
      }
    });

    for (int round = 0; round < iters; ++round) {
      ping.post();
      pong.wait();
      // child thread can't be running here
      EXPECT_EQ(ping.valueGuess(), 0);
      EXPECT_EQ(pong.valueGuess(), 0);
    }

    DSched::join(child);
  }
}
111
// Uses one semaphore like a lock: the main thread and a child thread each
// loop over a post/wait (or wait/post) pair under a deterministic schedule.
TEST_F(LifoSemTest, mutex) {
  DSched sched(DSched::uniform(0));

  const int iters = 100;

  for (int pass = 0; pass < 10; ++pass) {
    DLifoSem lock;

    auto child = DSched::thread([&] {
      for (int round = 0; round < iters; ++round) {
        lock.wait();
        lock.post();
      }
    });

    for (int round = 0; round < iters; ++round) {
      lock.post();
      lock.wait();
    }

    // Leave one token available while the child finishes, then take it back
    // so the semaphore is drained before it goes out of scope.
    lock.post();
    DSched::join(child);
    lock.wait();
  }
}
135
// Each thread posts `width` tokens in one call and then waits `width` times,
// repeated `iters` times, under a randomly seeded deterministic schedule.
// The seed is logged so a failing schedule can be reproduced.
TEST_F(LifoSemTest, no_blocking) {
  long seed = folly::randomNumberSeed() % 10000;
  LOG(INFO) << "seed=" << seed;
  DSched sched(DSched::uniform(seed));

  const int iters = 100;
  const int numThreads = 2;
  const int width = 10;

  for (int pass = 0; pass < 10; ++pass) {
    DLifoSem sem;

    std::vector<std::thread> workers;
    for (int t = 0; t < numThreads; ++t) {
      workers.emplace_back(DSched::thread([&] {
        for (int round = 0; round < iters; ++round) {
          sem.post(width);
          for (int taken = 0; taken < width; ++taken) {
            sem.wait();
          }
        }
      }));
    }

    for (auto& worker : workers) {
      DSched::join(worker);
    }
  }
}
164
// One-directional traffic: the main thread posts `iters` times while a child
// thread waits `iters` times, scheduled with DSched::uniformSubset.
TEST_F(LifoSemTest, one_way) {
  long seed = folly::randomNumberSeed() % 10000;
  LOG(INFO) << "seed=" << seed;
  DSched sched(DSched::uniformSubset(seed, 1, 6));

  const int iters = 1000;

  for (int pass = 0; pass < 10; ++pass) {
    DLifoSem sem;

    auto consumer = DSched::thread([&] {
      for (int taken = 0; taken < iters; ++taken) {
        sem.wait();
      }
    });

    for (int posted = 0; posted < iters; ++posted) {
      sem.post();
    }

    DSched::join(consumer);
  }
}
186
// After shutdown(), a token posted afterwards can still be consumed by
// wait(); only the following wait(), with nothing left, is expected to throw.
TEST_F(LifoSemTest, shutdown_wait_order) {
  DLifoSem sem;
  sem.shutdown();

  sem.post();
  sem.wait(); // consumes the token posted after shutdown

  EXPECT_THROW(sem.wait(), ShutdownSemError);
  EXPECT_TRUE(sem.isShutdown());
}
195
// Twenty threads call wait() on a semaphore that never receives a post; the
// main thread then shuts it down. Every waiter is expected to leave wait()
// through ShutdownSemError rather than by returning normally.
TEST_F(LifoSemTest, shutdown_multi) {
  DSched sched(DSched::uniform(0));

  for (int pass = 0; pass < 10; ++pass) {
    DLifoSem sem;
    std::vector<std::thread> waiters;

    while (waiters.size() < 20) {
      waiters.push_back(DSched::thread([&] {
        try {
          sem.wait();
          // A normal return would mean a token appeared from nowhere.
          ADD_FAILURE();
        } catch (const ShutdownSemError&) {
          // expected
          EXPECT_TRUE(sem.isShutdown());
        }
      }));
    }

    sem.shutdown();

    for (auto& waiter : waiters) {
      DSched::join(waiter);
    }
  }
}
219
// Asking tryWait for more tokens than are available must hand back only the
// available ones (this call used to trigger an assert).
TEST(LifoSem, multi_try_wait_simple) {
  LifoSem sem;
  sem.post(5);

  const auto acquired = sem.tryWait(10);
  ASSERT_EQ(5, acquired);
}
226
// A producer posts NPOSTS single tokens while a consumer drains them in
// batches of up to 10 via tryWait(10). Once the producer has been joined the
// consumer is told to stop; it samples the stop flag *before* each drain, so
// it always performs one full drain after the flag was set. The total drained
// must equal the number of posts.
TEST_F(LifoSemTest, multi_try_wait) {
  long seed = folly::randomNumberSeed() % 10000;
  LOG(INFO) << "seed=" << seed;
  DSched sched(DSched::uniform(seed));
  DLifoSem sem;

  const int NPOSTS = 1000;

  auto producer = [&] {
    for (int posted = 0; posted < NPOSTS; ++posted) {
      sem.post();
    }
  };

  DeterministicAtomic<bool> consumer_stop(false);
  int consumed = 0;

  auto consumer = [&] {
    while (true) {
      // Read the flag first so the drain below also covers tokens posted
      // right up to the moment the flag was observed.
      const bool stopRequested = consumer_stop.load();

      while (true) {
        const int got = sem.tryWait(10);
        consumed += got;
        if (got <= 0) {
          break;
        }
      }

      if (stopRequested) {
        break;
      }
    }
  };

  std::thread producer_thread(DSched::thread(producer));
  std::thread consumer_thread(DSched::thread(consumer));
  DSched::join(producer_thread);
  consumer_stop.store(true);
  DSched::join(consumer_thread);

  ASSERT_EQ(NPOSTS, consumed);
}
264
// Twenty threads each attempt ten 1ms timed waits while twenty other threads
// each post ten times. `handoffs` is incremented per post and decremented per
// successful timed wait, so a positive value means some post was not consumed.
// On passes 6..9 the semaphore is also shut down while the threads run, and
// waiters may then see ShutdownSemError.
TEST_F(LifoSemTest, timeout) {
  long seed = folly::randomNumberSeed() % 10000;
  LOG(INFO) << "seed=" << seed;
  DSched sched(DSched::uniform(seed));
  // NOTE(review): this counter is declared outside the pass loop and is never
  // reset, so from the second pass on the check below sees the running total
  // of all passes so far — confirm that this is intended.
  DeterministicAtomic<uint32_t> handoffs{0};

  for (int pass = 0; pass < 10; ++pass) {
    DLifoSem sem;

    std::vector<std::thread> waiters;
    while (waiters.size() < 20) {
      waiters.push_back(DSched::thread([&] {
        for (int attempt = 0; attempt < 10; attempt++) {
          try {
            if (sem.try_wait_for(std::chrono::milliseconds(1))) {
              handoffs--;
            }
          } catch (const ShutdownSemError&) {
            // expected
            EXPECT_TRUE(sem.isShutdown());
          }
        }
      }));
    }

    std::vector<std::thread> posters;
    while (posters.size() < 20) {
      posters.push_back(DSched::thread([&] {
        for (int posted = 0; posted < 10; posted++) {
          sem.post();
          handoffs++;
        }
      }));
    }

    if (pass > 5) {
      sem.shutdown();
    }

    for (auto& waiter : waiters) {
      DSched::join(waiter);
    }
    for (auto& poster : posters) {
      DSched::join(poster);
    }

    // At least one timeout must occur.
    EXPECT_GT(handoffs.load(), 0u);
  }
}
310
// Two workers loop until the semaphore reports shutdown: one only polls
// isShutdown(), the other sleeps in try_wait_for between checks. A third
// thread performs the shutdown. All three threads must be joinable.
TEST_F(LifoSemTest, shutdown_try_wait_for) {
  long seed = folly::randomNumberSeed() % 1000000;
  LOG(INFO) << "seed=" << seed;
  DSched sched(DSched::uniform(seed));

  DLifoSem stopped;

  std::thread poller = DSched::thread([&stopped] {
    while (!stopped.isShutdown()) {
      // i.e. poll for messages with timeout
      LOG(INFO) << "thread polled";
    }
  });

  std::thread sleeper = DSched::thread([&stopped] {
    // Do some work every 1 second
    while (!stopped.isShutdown()) {
      try {
        // this is normally 1 second in prod use case.
        stopped.try_wait_for(std::chrono::milliseconds(1));
      } catch (folly::ShutdownSemError&) {
        LOG(INFO) << "try_wait_for shutdown";
      }
    }
  });

  std::thread stopper = DSched::thread([&stopped] {
    LOG(INFO) << "LifoSem shutdown";
    stopped.shutdown();
    LOG(INFO) << "LifoSem shutdown done";
  });

  DSched::join(stopper);
  DSched::join(poller);
  DSched::join(sleeper);
  LOG(INFO) << "Threads joined";
}
347
// Round-trip latency: the benchmark thread posts `ping` and waits on `pong`,
// while a helper thread does the reverse, `iters` times each.
BENCHMARK(lifo_sem_pingpong, iters) {
  LifoSem ping;
  LifoSem pong;

  std::thread echo([&] {
    for (size_t round = 0; round < iters; ++round) {
      ping.wait();
      pong.post();
    }
  });

  for (size_t round = 0; round < iters; ++round) {
    ping.post();
    pong.wait();
  }

  echo.join();
}
363
// One-directional throughput: the benchmark thread posts `iters` tokens and a
// helper thread waits for the same number.
BENCHMARK(lifo_sem_oneway, iters) {
  LifoSem sem;

  std::thread consumer([&] {
    for (size_t taken = 0; taken < iters; ++taken) {
      sem.wait();
    }
  });

  for (size_t posted = 0; posted < iters; ++posted) {
    sem.post();
  }

  consumer.join();
}
376
// Cost of post() on a LifoSem with no other thread involved.
BENCHMARK(single_thread_lifo_post, iters) {
  LifoSem sem;
  for (size_t done = 0; done < iters; ++done) {
    sem.post();
    // Barrier call kept after each operation, as in the other
    // single-threaded LifoSem benchmarks.
    asm_volatile_memory();
  }
}
384
// Cost of wait() when a token is always available: the semaphore is
// constructed with `iters` as its initial value, so no wait() has to block.
BENCHMARK(single_thread_lifo_wait, iters) {
  LifoSem sem(iters);
  for (size_t done = 0; done < iters; ++done) {
    sem.wait();
    asm_volatile_memory();
  }
}
392
// Cost of a post() immediately followed by a wait() on the same thread.
BENCHMARK(single_thread_lifo_postwait, iters) {
  LifoSem sem;
  for (size_t done = 0; done < iters; ++done) {
    sem.post();
    asm_volatile_memory();

    sem.wait();
    asm_volatile_memory();
  }
}
402
// Cost of a failing tryWait(): nothing is ever posted, so every call is
// expected to return false.
BENCHMARK(single_thread_lifo_trywait, iters) {
  LifoSem sem;
  for (size_t done = 0; done < iters; ++done) {
    EXPECT_FALSE(sem.tryWait());
    asm_volatile_memory();
  }
}
410
// Baseline for single_thread_lifo_postwait: the same post()/wait() pair on
// folly::NativeSemaphore.
BENCHMARK(single_thread_native_postwait, iters) {
  folly::NativeSemaphore native;
  for (size_t done = 0; done < iters; ++done) {
    native.post();
    native.wait();
  }
}
418
// Baseline for single_thread_lifo_trywait: a failing try_wait() on
// folly::NativeSemaphore, which is never posted.
BENCHMARK(single_thread_native_trywait, iters) {
  folly::NativeSemaphore native;
  for (size_t done = 0; done < iters; ++done) {
    EXPECT_FALSE(native.try_wait());
  }
}
425
contendedUse(uint32_t n,int posters,int waiters)426 static void contendedUse(uint32_t n, int posters, int waiters) {
427 LifoSemImpl<std::atomic> sem;
428
429 std::vector<std::thread> threads;
430 std::atomic<bool> go(false);
431
432 BENCHMARK_SUSPEND {
433 for (int t = 0; t < waiters; ++t) {
434 threads.emplace_back([=, &sem] {
435 for (uint32_t i = t; i < n; i += waiters) {
436 sem.wait();
437 }
438 });
439 }
440 for (int t = 0; t < posters; ++t) {
441 threads.emplace_back([=, &sem, &go] {
442 while (!go.load()) {
443 std::this_thread::yield();
444 }
445 for (uint32_t i = t; i < n; i += posters) {
446 sem.post();
447 }
448 });
449 }
450 }
451
452 go.store(true);
453 for (auto& thr : threads) {
454 thr.join();
455 }
456 }
457
458 BENCHMARK_DRAW_LINE();
459 BENCHMARK_NAMED_PARAM(contendedUse, 1_to_1, 1, 1)
460 BENCHMARK_NAMED_PARAM(contendedUse, 1_to_4, 1, 4)
461 BENCHMARK_NAMED_PARAM(contendedUse, 1_to_32, 1, 32)
462 BENCHMARK_NAMED_PARAM(contendedUse, 4_to_1, 4, 1)
463 BENCHMARK_NAMED_PARAM(contendedUse, 4_to_24, 4, 24)
464 BENCHMARK_NAMED_PARAM(contendedUse, 8_to_100, 8, 100)
465 BENCHMARK_NAMED_PARAM(contendedUse, 32_to_1, 31, 1)
466 BENCHMARK_NAMED_PARAM(contendedUse, 16_to_16, 16, 16)
467 BENCHMARK_NAMED_PARAM(contendedUse, 32_to_32, 32, 32)
468 BENCHMARK_NAMED_PARAM(contendedUse, 32_to_1000, 32, 1000)
469
// sudo nice -n -20 _build/opt/folly/test/LifoSemTests
//     --benchmark --bm_min_iters=10000000 --gtest_filter=-\*
// ============================================================================
// folly/test/LifoSemTests.cpp                     relative  time/iter  iters/s
// ============================================================================
// lifo_sem_pingpong                                            1.31us  762.40K
// lifo_sem_oneway                                            193.89ns    5.16M
// single_thread_lifo_post                                     15.37ns   65.08M
// single_thread_lifo_wait                                     13.60ns   73.53M
// single_thread_lifo_postwait                                 29.43ns   33.98M
// single_thread_lifo_trywait                                 677.69ps    1.48G
// single_thread_native_postwait                               25.03ns   39.95M
// single_thread_native_trywait                                 7.30ns  136.98M
// ----------------------------------------------------------------------------
// contendedUse(1_to_1)                                       158.22ns    6.32M
// contendedUse(1_to_4)                                       574.73ns    1.74M
// contendedUse(1_to_32)                                      592.94ns    1.69M
// contendedUse(4_to_1)                                       118.28ns    8.45M
// contendedUse(4_to_24)                                      667.62ns    1.50M
// contendedUse(8_to_100)                                     701.46ns    1.43M
// contendedUse(32_to_1)                                      165.06ns    6.06M
// contendedUse(16_to_16)                                     238.57ns    4.19M
// contendedUse(32_to_32)                                     219.82ns    4.55M
// contendedUse(32_to_1000)                                   777.42ns    1.29M
// ============================================================================
495
main(int argc,char ** argv)496 int main(int argc, char** argv) {
497 testing::InitGoogleTest(&argc, argv);
498 gflags::ParseCommandLineFlags(&argc, &argv, true);
499 int rv = RUN_ALL_TESTS();
500 folly::runBenchmarksOnFlag();
501 return rv;
502 }
503