1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <folly/synchronization/test/Semaphore.h>
18 
19 #include <array>
20 #include <numeric>
21 #include <thread>
22 #include <vector>
23 
24 #include <glog/logging.h>
25 
26 #include <folly/Traits.h>
27 #include <folly/portability/GTest.h>
28 #include <folly/portability/SysMman.h>
29 #include <folly/synchronization/Latch.h>
30 #include <folly/synchronization/test/Barrier.h>
31 
32 using namespace folly::test;
33 
34 namespace {
35 
36 template <SemaphoreWakePolicy WakePolicy>
wake_policy(PolicySemaphore<WakePolicy> const &)37 auto wake_policy(PolicySemaphore<WakePolicy> const&) {
38   return WakePolicy;
39 }
40 
//  Single-threaded smoke test of try_wait(), post() and wait().
//  Sem must be default-constructible.
template <typename Sem>
void test_basic() {
  Sem sem;

  //  nothing has been posted yet, so a non-blocking acquire must fail
  EXPECT_FALSE(sem.try_wait());

  //  a single post is consumed by a single non-blocking acquire
  sem.post();
  EXPECT_TRUE(sem.try_wait());

  //  same again through the blocking acquire; the post comes first, so the
  //  wait() is expected to have something to consume
  sem.post();
  sem.wait();
}
50 
51 template <typename Sem>
test_handoff_destruction()52 void test_handoff_destruction() {
53   //  regression to check for race:
54   //  * poster thread calls Sem::post()
55   //  * waiter thread calls Sem::wait() and then dtor+free
56   //  strategy: mprotect the sem page after dtor; racing post() will segv
57   //  alternate strategy: overwrite the former sem object, but non-portable
58 
59   constexpr auto const nthreads = 1ull << 4;
60   constexpr auto const rounds = 1ull << 6;
61 
62   std::array<size_t, nthreads> waits{};
63   for (auto r = 0ull; r < rounds; ++r) {
64     std::array<void*, nthreads> sems{};
65     std::array<std::thread, nthreads> threads;
66     folly::Latch ready(nthreads);
67     for (auto thi = 0ull; thi < nthreads; ++thi) {
68       sems[thi] = mmap(
69           nullptr,
70           sizeof(Sem),
71           PROT_READ | PROT_WRITE,
72           MAP_PRIVATE | MAP_ANONYMOUS,
73           -1,
74           0);
75       PCHECK((void*)-1 != sems[thi]);
76       auto sem_ = new (sems[thi]) Sem(0);
77       threads[thi] = std::thread([&, thi, sem_] {
78         auto& sem = *reinterpret_cast<Sem*>(sem_);
79         sem.wait([&] { ready.count_down(); }, [&, thi] { ++waits[thi]; });
80         sem.~Sem();
81         mprotect(sem_, sizeof(Sem), PROT_NONE);
82       });
83     }
84     ready.wait();
85     for (auto sem_ : sems) {
86       auto& sem = *reinterpret_cast<Sem*>(sem_);
87       sem.post();
88     }
89     for (auto thi = 0ull; thi < nthreads; ++thi) {
90       threads[thi].join();
91       munmap(sems[thi], sizeof(Sem));
92     }
93   }
94   auto const allwaits = std::accumulate(waits.begin(), waits.end(), size_t(0));
95   EXPECT_EQ(nthreads * rounds, allwaits);
96 }
97 
98 template <typename Sem>
test_wake_policy()99 void test_wake_policy() {
100   constexpr auto const nthreads = 16ull;
101   constexpr auto const rounds = 1ull << 4;
102 
103   Sem sem;
104   std::array<std::thread, nthreads> threads;
105   for (auto i = 0ull; i < rounds; ++i) {
106     std::vector<uint64_t> wait_seq;
107     std::vector<uint64_t> wake_seq;
108     folly::Latch ready(nthreads); // first nthreads waits, then nthreads posts
109     for (auto thi = 0ull; thi < nthreads; ++thi) {
110       threads[thi] = std::thread([&, thi] {
111         sem.wait(
112             [&, thi] { wait_seq.push_back(thi), ready.count_down(); },
113             [&, thi] { wake_seq.push_back(thi); });
114       });
115     }
116     ready.wait(); // first nthreads waits, then nthreads posts
117     for (auto thi = 0ull; thi < nthreads; ++thi) {
118       sem.post();
119     }
120     for (auto thi = 0ull; thi < nthreads; ++thi) {
121       threads[thi].join();
122     }
123     EXPECT_EQ(nthreads, wait_seq.size());
124     EXPECT_EQ(nthreads, wake_seq.size());
125     switch (wake_policy(sem)) {
126       case SemaphoreWakePolicy::Fifo:
127         break;
128       case SemaphoreWakePolicy::Lifo:
129         std::reverse(wake_seq.begin(), wake_seq.end());
130         break;
131     }
132     EXPECT_EQ(wait_seq, wake_seq);
133   }
134 }
135 
136 template <typename Sem>
test_multi_ping_pong()137 void test_multi_ping_pong() {
138   constexpr auto const nthreads = 4ull;
139   constexpr auto const iters = 1ull << 12;
140 
141   Sem sem;
142   std::array<std::thread, nthreads> threads;
143   size_t waits_before = 0;
144   size_t waits_after = 0;
145   size_t posts = 0;
146 
147   for (auto& th : threads) {
148     th = std::thread([&] {
149       for (auto i = 0ull; i < iters; ++i) {
150         sem.wait([&] { ++waits_before; }, [&] { ++waits_after; });
151         sem.post([&] { ++posts; });
152       }
153     });
154   }
155 
156   sem.post(); // start the flood
157 
158   for (auto& thr : threads) {
159     thr.join();
160   }
161 
162   sem.wait();
163   EXPECT_FALSE(sem.try_wait());
164   EXPECT_EQ(iters * nthreads, waits_before);
165   EXPECT_EQ(iters * nthreads, waits_after);
166   EXPECT_EQ(iters * nthreads, posts);
167 }
168 
169 template <typename Sem>
test_concurrent_split_waiters_posters()170 void test_concurrent_split_waiters_posters() {
171   constexpr auto const nthreads = 4ull;
172   constexpr auto const iters = 1ull << 12;
173 
174   Sem sem;
175   Barrier barrier(nthreads * 2);
176   std::array<std::thread, nthreads> posters;
177   std::array<std::thread, nthreads> waiters;
178 
179   for (auto& th : posters) {
180     th = std::thread([&] {
181       barrier.wait();
182       for (auto i = 0ull; i < iters; ++i) {
183         if (i % (iters >> 4) == 0) {
184           std::this_thread::yield();
185         }
186         sem.post();
187       }
188     });
189   }
190   for (auto& th : waiters) {
191     th = std::thread([&] {
192       barrier.wait();
193       for (auto i = 0ull; i < iters; ++i) {
194         sem.wait();
195       }
196     });
197   }
198 
199   for (auto& th : posters) {
200     th.join();
201   }
202   for (auto& th : waiters) {
203     th.join();
204   }
205 
206   EXPECT_FALSE(sem.try_wait());
207 }
208 
209 } // namespace
210 
211 class SemaphoreTest : public testing::Test {};
212 
TEST_F(SemaphoreTest,basic)213 TEST_F(SemaphoreTest, basic) {
214   test_basic<Semaphore>();
215 }
216 
TEST_F(SemaphoreTest,multi_ping_pong)217 TEST_F(SemaphoreTest, multi_ping_pong) {
218   test_multi_ping_pong<Semaphore>();
219 }
220 
TEST_F(SemaphoreTest,concurrent_split_waiters_posters)221 TEST_F(SemaphoreTest, concurrent_split_waiters_posters) {
222   test_concurrent_split_waiters_posters<Semaphore>();
223 }
224 
TEST_F(SemaphoreTest,handoff)225 TEST_F(SemaphoreTest, handoff) {
226   test_handoff_destruction<Semaphore>();
227 }
228 
229 class FifoSemaphoreTest : public testing::Test {};
230 
TEST_F(FifoSemaphoreTest,basic)231 TEST_F(FifoSemaphoreTest, basic) {
232   test_basic<FifoSemaphore>();
233 }
234 
TEST_F(FifoSemaphoreTest,wake_policy)235 TEST_F(FifoSemaphoreTest, wake_policy) {
236   test_wake_policy<FifoSemaphore>();
237 }
238 
TEST_F(FifoSemaphoreTest,multi_ping_pong)239 TEST_F(FifoSemaphoreTest, multi_ping_pong) {
240   test_multi_ping_pong<FifoSemaphore>();
241 }
242 
TEST_F(FifoSemaphoreTest,concurrent_split_waiters_posters)243 TEST_F(FifoSemaphoreTest, concurrent_split_waiters_posters) {
244   test_concurrent_split_waiters_posters<FifoSemaphore>();
245 }
246 
TEST_F(FifoSemaphoreTest,handoff)247 TEST_F(FifoSemaphoreTest, handoff) {
248   test_handoff_destruction<FifoSemaphore>();
249 }
250 
251 class LifoSemaphoreTest : public testing::Test {};
252 
TEST_F(LifoSemaphoreTest,basic)253 TEST_F(LifoSemaphoreTest, basic) {
254   test_basic<LifoSemaphore>();
255 }
256 
TEST_F(LifoSemaphoreTest,wake_policy)257 TEST_F(LifoSemaphoreTest, wake_policy) {
258   test_wake_policy<LifoSemaphore>();
259 }
260 
TEST_F(LifoSemaphoreTest,multi_ping_pong)261 TEST_F(LifoSemaphoreTest, multi_ping_pong) {
262   test_multi_ping_pong<LifoSemaphore>();
263 }
264 
TEST_F(LifoSemaphoreTest,concurrent_split_waiters_posters)265 TEST_F(LifoSemaphoreTest, concurrent_split_waiters_posters) {
266   test_concurrent_split_waiters_posters<LifoSemaphore>();
267 }
268 
TEST_F(LifoSemaphoreTest,handoff)269 TEST_F(LifoSemaphoreTest, handoff) {
270   test_handoff_destruction<LifoSemaphore>();
271 }
272