1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <folly/synchronization/test/Semaphore.h>
18
19 #include <array>
20 #include <numeric>
21 #include <thread>
22 #include <vector>
23
24 #include <glog/logging.h>
25
26 #include <folly/Traits.h>
27 #include <folly/portability/GTest.h>
28 #include <folly/portability/SysMman.h>
29 #include <folly/synchronization/Latch.h>
30 #include <folly/synchronization/test/Barrier.h>
31
32 using namespace folly::test;
33
34 namespace {
35
36 template <SemaphoreWakePolicy WakePolicy>
wake_policy(PolicySemaphore<WakePolicy> const &)37 auto wake_policy(PolicySemaphore<WakePolicy> const&) {
38 return WakePolicy;
39 }
40
// Single-threaded smoke test of try_wait(), post() and wait() on a
// default-constructed semaphore.
//
// Expects that a default-constructed Sem cannot be acquired, and that a
// post() allows one subsequent try_wait() or wait() to succeed.
template <typename Sem>
void test_basic() {
  Sem sem;

  // nothing has been posted yet: a non-blocking acquire must fail
  auto const acquired_before_post = sem.try_wait();
  EXPECT_FALSE(acquired_before_post);

  // one post, then one successful non-blocking acquire
  sem.post();
  auto const acquired_after_post = sem.try_wait();
  EXPECT_TRUE(acquired_after_post);

  // one post, then one wait(); the preceding post() is expected to let the
  // wait() return
  sem.post();
  sem.wait();
}
50
51 template <typename Sem>
test_handoff_destruction()52 void test_handoff_destruction() {
53 // regression to check for race:
54 // * poster thread calls Sem::post()
55 // * waiter thread calls Sem::wait() and then dtor+free
56 // strategy: mprotect the sem page after dtor; racing post() will segv
57 // alternate strategy: overwrite the former sem object, but non-portable
58
59 constexpr auto const nthreads = 1ull << 4;
60 constexpr auto const rounds = 1ull << 6;
61
62 std::array<size_t, nthreads> waits{};
63 for (auto r = 0ull; r < rounds; ++r) {
64 std::array<void*, nthreads> sems{};
65 std::array<std::thread, nthreads> threads;
66 folly::Latch ready(nthreads);
67 for (auto thi = 0ull; thi < nthreads; ++thi) {
68 sems[thi] = mmap(
69 nullptr,
70 sizeof(Sem),
71 PROT_READ | PROT_WRITE,
72 MAP_PRIVATE | MAP_ANONYMOUS,
73 -1,
74 0);
75 PCHECK((void*)-1 != sems[thi]);
76 auto sem_ = new (sems[thi]) Sem(0);
77 threads[thi] = std::thread([&, thi, sem_] {
78 auto& sem = *reinterpret_cast<Sem*>(sem_);
79 sem.wait([&] { ready.count_down(); }, [&, thi] { ++waits[thi]; });
80 sem.~Sem();
81 mprotect(sem_, sizeof(Sem), PROT_NONE);
82 });
83 }
84 ready.wait();
85 for (auto sem_ : sems) {
86 auto& sem = *reinterpret_cast<Sem*>(sem_);
87 sem.post();
88 }
89 for (auto thi = 0ull; thi < nthreads; ++thi) {
90 threads[thi].join();
91 munmap(sems[thi], sizeof(Sem));
92 }
93 }
94 auto const allwaits = std::accumulate(waits.begin(), waits.end(), size_t(0));
95 EXPECT_EQ(nthreads * rounds, allwaits);
96 }
97
98 template <typename Sem>
test_wake_policy()99 void test_wake_policy() {
100 constexpr auto const nthreads = 16ull;
101 constexpr auto const rounds = 1ull << 4;
102
103 Sem sem;
104 std::array<std::thread, nthreads> threads;
105 for (auto i = 0ull; i < rounds; ++i) {
106 std::vector<uint64_t> wait_seq;
107 std::vector<uint64_t> wake_seq;
108 folly::Latch ready(nthreads); // first nthreads waits, then nthreads posts
109 for (auto thi = 0ull; thi < nthreads; ++thi) {
110 threads[thi] = std::thread([&, thi] {
111 sem.wait(
112 [&, thi] { wait_seq.push_back(thi), ready.count_down(); },
113 [&, thi] { wake_seq.push_back(thi); });
114 });
115 }
116 ready.wait(); // first nthreads waits, then nthreads posts
117 for (auto thi = 0ull; thi < nthreads; ++thi) {
118 sem.post();
119 }
120 for (auto thi = 0ull; thi < nthreads; ++thi) {
121 threads[thi].join();
122 }
123 EXPECT_EQ(nthreads, wait_seq.size());
124 EXPECT_EQ(nthreads, wake_seq.size());
125 switch (wake_policy(sem)) {
126 case SemaphoreWakePolicy::Fifo:
127 break;
128 case SemaphoreWakePolicy::Lifo:
129 std::reverse(wake_seq.begin(), wake_seq.end());
130 break;
131 }
132 EXPECT_EQ(wait_seq, wake_seq);
133 }
134 }
135
136 template <typename Sem>
test_multi_ping_pong()137 void test_multi_ping_pong() {
138 constexpr auto const nthreads = 4ull;
139 constexpr auto const iters = 1ull << 12;
140
141 Sem sem;
142 std::array<std::thread, nthreads> threads;
143 size_t waits_before = 0;
144 size_t waits_after = 0;
145 size_t posts = 0;
146
147 for (auto& th : threads) {
148 th = std::thread([&] {
149 for (auto i = 0ull; i < iters; ++i) {
150 sem.wait([&] { ++waits_before; }, [&] { ++waits_after; });
151 sem.post([&] { ++posts; });
152 }
153 });
154 }
155
156 sem.post(); // start the flood
157
158 for (auto& thr : threads) {
159 thr.join();
160 }
161
162 sem.wait();
163 EXPECT_FALSE(sem.try_wait());
164 EXPECT_EQ(iters * nthreads, waits_before);
165 EXPECT_EQ(iters * nthreads, waits_after);
166 EXPECT_EQ(iters * nthreads, posts);
167 }
168
169 template <typename Sem>
test_concurrent_split_waiters_posters()170 void test_concurrent_split_waiters_posters() {
171 constexpr auto const nthreads = 4ull;
172 constexpr auto const iters = 1ull << 12;
173
174 Sem sem;
175 Barrier barrier(nthreads * 2);
176 std::array<std::thread, nthreads> posters;
177 std::array<std::thread, nthreads> waiters;
178
179 for (auto& th : posters) {
180 th = std::thread([&] {
181 barrier.wait();
182 for (auto i = 0ull; i < iters; ++i) {
183 if (i % (iters >> 4) == 0) {
184 std::this_thread::yield();
185 }
186 sem.post();
187 }
188 });
189 }
190 for (auto& th : waiters) {
191 th = std::thread([&] {
192 barrier.wait();
193 for (auto i = 0ull; i < iters; ++i) {
194 sem.wait();
195 }
196 });
197 }
198
199 for (auto& th : posters) {
200 th.join();
201 }
202 for (auto& th : waiters) {
203 th.join();
204 }
205
206 EXPECT_FALSE(sem.try_wait());
207 }
208
209 } // namespace
210
// Fixture for the test cases instantiated with Semaphore; it adds no state or
// setup of its own and only provides the test-suite name.
//
// NOTE(review): test_wake_policy is not instantiated for Semaphore,
// presumably because it promises no particular wake order - confirm.
class SemaphoreTest : public testing::Test {};

// Runs the single-threaded smoke test against Semaphore.
TEST_F(SemaphoreTest, basic) {
  test_basic<Semaphore>();
}

// Runs the token ping-pong test against Semaphore.
TEST_F(SemaphoreTest, multi_ping_pong) {
  test_multi_ping_pong<Semaphore>();
}

// Runs the separate-posters/separate-waiters stress test against Semaphore.
TEST_F(SemaphoreTest, concurrent_split_waiters_posters) {
  test_concurrent_split_waiters_posters<Semaphore>();
}

// Runs the post()-versus-destruction regression test against Semaphore.
TEST_F(SemaphoreTest, handoff) {
  test_handoff_destruction<Semaphore>();
}
228
// Fixture for the test cases instantiated with FifoSemaphore; it adds no state
// or setup of its own and only provides the test-suite name.
class FifoSemaphoreTest : public testing::Test {};

// Runs the single-threaded smoke test against FifoSemaphore.
TEST_F(FifoSemaphoreTest, basic) {
  test_basic<FifoSemaphore>();
}

// Checks the order in which FifoSemaphore wakes its waiters.
TEST_F(FifoSemaphoreTest, wake_policy) {
  test_wake_policy<FifoSemaphore>();
}

// Runs the token ping-pong test against FifoSemaphore.
TEST_F(FifoSemaphoreTest, multi_ping_pong) {
  test_multi_ping_pong<FifoSemaphore>();
}

// Runs the separate-posters/separate-waiters stress test against
// FifoSemaphore.
TEST_F(FifoSemaphoreTest, concurrent_split_waiters_posters) {
  test_concurrent_split_waiters_posters<FifoSemaphore>();
}

// Runs the post()-versus-destruction regression test against FifoSemaphore.
TEST_F(FifoSemaphoreTest, handoff) {
  test_handoff_destruction<FifoSemaphore>();
}
250
// Fixture for the test cases instantiated with LifoSemaphore; it adds no state
// or setup of its own and only provides the test-suite name.
class LifoSemaphoreTest : public testing::Test {};

// Runs the single-threaded smoke test against LifoSemaphore.
TEST_F(LifoSemaphoreTest, basic) {
  test_basic<LifoSemaphore>();
}

// Checks the order in which LifoSemaphore wakes its waiters.
TEST_F(LifoSemaphoreTest, wake_policy) {
  test_wake_policy<LifoSemaphore>();
}

// Runs the token ping-pong test against LifoSemaphore.
TEST_F(LifoSemaphoreTest, multi_ping_pong) {
  test_multi_ping_pong<LifoSemaphore>();
}

// Runs the separate-posters/separate-waiters stress test against
// LifoSemaphore.
TEST_F(LifoSemaphoreTest, concurrent_split_waiters_posters) {
  test_concurrent_split_waiters_posters<LifoSemaphore>();
}

// Runs the post()-versus-destruction regression test against LifoSemaphore.
TEST_F(LifoSemaphoreTest, handoff) {
  test_handoff_destruction<LifoSemaphore>();
}
272