1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <folly/test/DeterministicSchedule.h>
18
19 #include <algorithm>
20 #include <cassert>
21 #include <list>
22 #include <mutex>
23 #include <random>
24 #include <unordered_map>
25 #include <utility>
26
27 #include <folly/Random.h>
28 #include <folly/SingletonThreadLocal.h>
29
30 namespace folly {
31 namespace test {
32
33 using Sem = DeterministicSchedule::Sem;
34
35 AuxChk DeterministicSchedule::aux_chk;
36
37 // access is protected by futexLock
38 static std::unordered_map<
39 const detail::Futex<DeterministicAtomic>*,
40 std::list<std::pair<uint32_t, bool*>>>
41 futexQueues;
42
43 static std::mutex futexLock;
44
sync(const ThreadTimestamps & src)45 void ThreadTimestamps::sync(const ThreadTimestamps& src) {
46 if (src.timestamps_.size() > timestamps_.size()) {
47 timestamps_.resize(src.timestamps_.size());
48 }
49 for (size_t i = 0; i < src.timestamps_.size(); i++) {
50 timestamps_[i].sync(src.timestamps_[i]);
51 }
52 }
53
advance(DSchedThreadId tid)54 DSchedTimestamp ThreadTimestamps::advance(DSchedThreadId tid) {
55 assert(timestamps_.size() > tid.val);
56 return timestamps_[tid.val].advance();
57 }
58
setIfNotPresent(DSchedThreadId tid,DSchedTimestamp ts)59 void ThreadTimestamps::setIfNotPresent(DSchedThreadId tid, DSchedTimestamp ts) {
60 assert(ts.initialized());
61 if (tid.val >= timestamps_.size()) {
62 timestamps_.resize(tid.val + 1);
63 }
64 if (!timestamps_[tid.val].initialized()) {
65 timestamps_[tid.val].sync(ts);
66 }
67 }
68
clear()69 void ThreadTimestamps::clear() {
70 timestamps_.clear();
71 }
72
atLeastAsRecentAs(DSchedThreadId tid,DSchedTimestamp ts) const73 bool ThreadTimestamps::atLeastAsRecentAs(
74 DSchedThreadId tid, DSchedTimestamp ts) const {
75 // It is not meaningful learn whether any instance is at least
76 // as recent as timestamp 0.
77 assert(ts.initialized());
78 if (tid.val >= timestamps_.size()) {
79 return false;
80 }
81 return timestamps_[tid.val].atLeastAsRecentAs(ts);
82 }
83
atLeastAsRecentAsAny(const ThreadTimestamps & src) const84 bool ThreadTimestamps::atLeastAsRecentAsAny(const ThreadTimestamps& src) const {
85 size_t min = timestamps_.size() < src.timestamps_.size()
86 ? timestamps_.size()
87 : src.timestamps_.size();
88 for (size_t i = 0; i < min; i++) {
89 if (src.timestamps_[i].initialized() &&
90 timestamps_[i].atLeastAsRecentAs(src.timestamps_[i])) {
91 return true;
92 }
93 }
94 return false;
95 }
96
acquire()97 void ThreadSyncVar::acquire() {
98 ThreadInfo& threadInfo = DeterministicSchedule::getCurrentThreadInfo();
99 DSchedThreadId tid = DeterministicSchedule::getThreadId();
100 threadInfo.acqRelOrder_.advance(tid);
101 threadInfo.acqRelOrder_.sync(order_);
102 }
103
release()104 void ThreadSyncVar::release() {
105 ThreadInfo& threadInfo = DeterministicSchedule::getCurrentThreadInfo();
106 DSchedThreadId tid = DeterministicSchedule::getThreadId();
107 threadInfo.acqRelOrder_.advance(tid);
108 order_.sync(threadInfo.acqRelOrder_);
109 }
110
acq_rel()111 void ThreadSyncVar::acq_rel() {
112 ThreadInfo& threadInfo = DeterministicSchedule::getCurrentThreadInfo();
113 DSchedThreadId tid = DeterministicSchedule::getThreadId();
114 threadInfo.acqRelOrder_.advance(tid);
115 threadInfo.acqRelOrder_.sync(order_);
116 order_.sync(threadInfo.acqRelOrder_);
117 }
118
119 namespace {
120
121 struct PerThreadState {
122 // delete the constructors and assignment operators for sanity
123 //
124 // but... we can't delete the move constructor and assignment operators
125 // because those are required before C++17 in the implementation of
126 // SingletonThreadLocal
127 PerThreadState(const PerThreadState&) = delete;
128 PerThreadState& operator=(const PerThreadState&) = delete;
129 PerThreadState(PerThreadState&&) = default;
130 PerThreadState& operator=(PerThreadState&&) = default;
131 PerThreadState() = default;
132
133 Sem* sem{nullptr};
134 DeterministicSchedule* sched{nullptr};
135 bool exiting{false};
136 DSchedThreadId threadId{};
137 AuxAct aux_act{};
138 };
139 using TLState = SingletonThreadLocal<PerThreadState>;
140
141 } // namespace
142
DeterministicSchedule(std::function<size_t (size_t)> scheduler)143 DeterministicSchedule::DeterministicSchedule(
144 std::function<size_t(size_t)> scheduler)
145 : scheduler_(std::move(scheduler)), nextThreadId_(0), step_(0) {
146 auto& tls = TLState::get();
147 assert(tls.sem == nullptr);
148 assert(tls.sched == nullptr);
149 assert(tls.aux_act == nullptr);
150
151 tls.exiting = false;
152 tls.sem = new Sem(true);
153 sems_.push_back(tls.sem);
154
155 tls.threadId = nextThreadId_++;
156 threadInfoMap_.emplace_back(tls.threadId);
157 tls.sched = this;
158 }
159
~DeterministicSchedule()160 DeterministicSchedule::~DeterministicSchedule() {
161 auto& tls = TLState::get();
162 static_cast<void>(tls);
163 assert(tls.sched == this);
164 assert(sems_.size() == 1);
165 assert(sems_[0] == tls.sem);
166 delete tls.sem;
167 tls = PerThreadState();
168 }
169
uniform(uint64_t seed)170 std::function<size_t(size_t)> DeterministicSchedule::uniform(uint64_t seed) {
171 auto rand = std::make_shared<std::ranlux48>(seed);
172 return [rand](size_t numActive) {
173 auto dist = std::uniform_int_distribution<size_t>(0, numActive - 1);
174 return dist(*rand);
175 };
176 }
177
178 struct UniformSubset {
UniformSubsetfolly::test::UniformSubset179 UniformSubset(uint64_t seed, size_t subsetSize, size_t stepsBetweenSelect)
180 : uniform_(DeterministicSchedule::uniform(seed)),
181 subsetSize_(subsetSize),
182 stepsBetweenSelect_(stepsBetweenSelect),
183 stepsLeft_(0) {}
184
operator ()folly::test::UniformSubset185 size_t operator()(size_t numActive) {
186 adjustPermSize(numActive);
187 if (stepsLeft_-- == 0) {
188 stepsLeft_ = stepsBetweenSelect_ - 1;
189 shufflePrefix();
190 }
191 return perm_[uniform_(std::min(numActive, subsetSize_))];
192 }
193
194 private:
195 std::function<size_t(size_t)> uniform_;
196 const size_t subsetSize_;
197 const size_t stepsBetweenSelect_;
198
199 size_t stepsLeft_;
200 // only the first subsetSize_ is properly randomized
201 std::vector<size_t> perm_;
202
adjustPermSizefolly::test::UniformSubset203 void adjustPermSize(size_t numActive) {
204 if (perm_.size() > numActive) {
205 perm_.erase(
206 std::remove_if(
207 perm_.begin(),
208 perm_.end(),
209 [=](size_t x) { return x >= numActive; }),
210 perm_.end());
211 } else {
212 while (perm_.size() < numActive) {
213 perm_.push_back(perm_.size());
214 }
215 }
216 assert(perm_.size() == numActive);
217 }
218
shufflePrefixfolly::test::UniformSubset219 void shufflePrefix() {
220 for (size_t i = 0; i < std::min(perm_.size() - 1, subsetSize_); ++i) {
221 size_t j = uniform_(perm_.size() - i) + i;
222 std::swap(perm_[i], perm_[j]);
223 }
224 }
225 };
226
isCurrentThreadExiting()227 bool DeterministicSchedule::isCurrentThreadExiting() {
228 auto& tls = TLState::get();
229 return tls.exiting;
230 }
231
isActive()232 bool DeterministicSchedule::isActive() {
233 auto& tls = TLState::get();
234 return tls.sched != nullptr;
235 }
236
getThreadId()237 DSchedThreadId DeterministicSchedule::getThreadId() {
238 auto& tls = TLState::get();
239 assert(tls.sched != nullptr);
240 return tls.threadId;
241 }
242
getCurrentSchedule()243 DeterministicSchedule* DeterministicSchedule::getCurrentSchedule() {
244 auto& tls = TLState::get();
245 return tls.sched;
246 }
247
uniformSubset(uint64_t seed,size_t n,size_t m)248 std::function<size_t(size_t)> DeterministicSchedule::uniformSubset(
249 uint64_t seed, size_t n, size_t m) {
250 auto gen = std::make_shared<UniformSubset>(seed, n, m);
251 return [=](size_t numActive) { return (*gen)(numActive); };
252 }
253
beforeSharedAccess()254 void DeterministicSchedule::beforeSharedAccess() {
255 auto& tls = TLState::get();
256 if (tls.sem) {
257 tls.sem->wait();
258 }
259 }
260
afterSharedAccess()261 void DeterministicSchedule::afterSharedAccess() {
262 auto& tls = TLState::get();
263 auto sched = tls.sched;
264 if (!sched) {
265 return;
266 }
267 sched->sems_[sched->scheduler_(sched->sems_.size())]->post();
268 }
269
afterSharedAccess(bool success)270 void DeterministicSchedule::afterSharedAccess(bool success) {
271 auto& tls = TLState::get();
272 auto sched = tls.sched;
273 if (!sched) {
274 return;
275 }
276 sched->callAux(success);
277 sched->sems_[sched->scheduler_(sched->sems_.size())]->post();
278 }
279
getRandNumber(size_t n)280 size_t DeterministicSchedule::getRandNumber(size_t n) {
281 auto& tls = TLState::get();
282 if (tls.sched) {
283 return tls.sched->scheduler_(n);
284 }
285 return Random::rand32() % n;
286 }
287
getcpu(unsigned * cpu,unsigned * node,void *)288 int DeterministicSchedule::getcpu(
289 unsigned* cpu, unsigned* node, void* /* unused */) {
290 auto& tls = TLState::get();
291 if (cpu) {
292 *cpu = tls.threadId.val;
293 }
294 if (node) {
295 *node = tls.threadId.val;
296 }
297 return 0;
298 }
299
setAuxAct(AuxAct & aux)300 void DeterministicSchedule::setAuxAct(AuxAct& aux) {
301 auto& tls = TLState::get();
302 tls.aux_act = aux;
303 }
304
setAuxChk(AuxChk & aux)305 void DeterministicSchedule::setAuxChk(AuxChk& aux) {
306 aux_chk = aux;
307 }
308
clearAuxChk()309 void DeterministicSchedule::clearAuxChk() {
310 aux_chk = nullptr;
311 }
312
reschedule(Sem * sem)313 void DeterministicSchedule::reschedule(Sem* sem) {
314 auto& tls = TLState::get();
315 auto sched = tls.sched;
316 if (sched) {
317 sched->sems_.push_back(sem);
318 }
319 }
320
descheduleCurrentThread()321 Sem* DeterministicSchedule::descheduleCurrentThread() {
322 auto& tls = TLState::get();
323 auto sched = tls.sched;
324 if (sched) {
325 sched->sems_.erase(
326 std::find(sched->sems_.begin(), sched->sems_.end(), tls.sem));
327 }
328 return tls.sem;
329 }
330
beforeThreadCreate()331 Sem* DeterministicSchedule::beforeThreadCreate() {
332 Sem* s = new Sem(false);
333 beforeSharedAccess();
334 sems_.push_back(s);
335 afterSharedAccess();
336 return s;
337 }
338
afterThreadCreate(Sem * sem)339 void DeterministicSchedule::afterThreadCreate(Sem* sem) {
340 auto& tls = TLState::get();
341 assert(tls.sem == nullptr);
342 assert(tls.sched == nullptr);
343 tls.exiting = false;
344 tls.sem = sem;
345 tls.sched = this;
346 bool started = false;
347 while (!started) {
348 beforeSharedAccess();
349 if (active_.count(std::this_thread::get_id()) == 1) {
350 started = true;
351 tls.threadId = nextThreadId_++;
352 assert(tls.threadId.val == threadInfoMap_.size());
353 threadInfoMap_.emplace_back(tls.threadId);
354 }
355 afterSharedAccess();
356 }
357 atomic_thread_fence(std::memory_order_seq_cst);
358 }
359
beforeThreadExit()360 void DeterministicSchedule::beforeThreadExit() {
361 auto& tls = TLState::get();
362 assert(tls.sched == this);
363
364 atomic_thread_fence(std::memory_order_seq_cst);
365 beforeSharedAccess();
366 auto parent = joins_.find(std::this_thread::get_id());
367 if (parent != joins_.end()) {
368 reschedule(parent->second);
369 joins_.erase(parent);
370 }
371 descheduleCurrentThread();
372 active_.erase(std::this_thread::get_id());
373
374 FOLLY_TEST_DSCHED_VLOG("exiting");
375 exitingSems_[std::this_thread::get_id()] = tls.sem;
376 afterSharedAccess();
377 // Wait for the parent thread to allow us to run thread-local destructors.
378 tls.sem->wait();
379 delete tls.sem;
380 tls = PerThreadState();
381 }
382
waitForBeforeThreadExit(std::thread & child)383 void DeterministicSchedule::waitForBeforeThreadExit(std::thread& child) {
384 auto& tls = TLState::get();
385 assert(tls.sched == this);
386 beforeSharedAccess();
387 assert(tls.sched->joins_.count(child.get_id()) == 0);
388 if (tls.sched->active_.count(child.get_id())) {
389 Sem* sem = descheduleCurrentThread();
390 tls.sched->joins_.insert({child.get_id(), sem});
391 afterSharedAccess();
392 // Wait to be scheduled by exiting child thread
393 beforeSharedAccess();
394 assert(!tls.sched->active_.count(child.get_id()));
395 }
396 afterSharedAccess();
397 }
398
joinAll(std::vector<std::thread> & children)399 void DeterministicSchedule::joinAll(std::vector<std::thread>& children) {
400 auto& tls = TLState::get();
401 auto sched = tls.sched;
402 if (sched) {
403 // Wait until all children are about to exit
404 for (auto& child : children) {
405 sched->waitForBeforeThreadExit(child);
406 }
407 }
408 atomic_thread_fence(std::memory_order_seq_cst);
409 /* Let each child thread proceed one at a time to protect
410 * shared access during thread local destructors.*/
411 for (auto& child : children) {
412 if (sched) {
413 beforeSharedAccess();
414 sched->exitingSems_[child.get_id()]->post();
415 }
416 child.join();
417 if (sched) {
418 afterSharedAccess();
419 }
420 }
421 }
422
join(std::thread & child)423 void DeterministicSchedule::join(std::thread& child) {
424 auto& tls = TLState::get();
425 auto sched = tls.sched;
426 if (sched) {
427 sched->waitForBeforeThreadExit(child);
428 }
429 atomic_thread_fence(std::memory_order_seq_cst);
430 FOLLY_TEST_DSCHED_VLOG("joined " << std::hex << child.get_id());
431 if (sched) {
432 beforeSharedAccess();
433 sched->exitingSems_[child.get_id()]->post();
434 }
435 child.join();
436 if (sched) {
437 afterSharedAccess();
438 }
439 }
440
callAux(bool success)441 void DeterministicSchedule::callAux(bool success) {
442 auto& tls = TLState::get();
443 ++step_;
444 if (tls.aux_act) {
445 tls.aux_act(success);
446 tls.aux_act = nullptr;
447 }
448 if (aux_chk) {
449 aux_chk(step_);
450 }
451 }
452
453 static std::unordered_map<Sem*, std::unique_ptr<ThreadSyncVar>> semSyncVar;
454
post(Sem * sem)455 void DeterministicSchedule::post(Sem* sem) {
456 beforeSharedAccess();
457 if (semSyncVar.count(sem) == 0) {
458 semSyncVar[sem] = std::make_unique<ThreadSyncVar>();
459 }
460 semSyncVar[sem]->release();
461 sem->post();
462 FOLLY_TEST_DSCHED_VLOG("sem->post() [sem=" << sem << "]");
463 afterSharedAccess();
464 }
465
tryWait(Sem * sem)466 bool DeterministicSchedule::tryWait(Sem* sem) {
467 beforeSharedAccess();
468 if (semSyncVar.count(sem) == 0) {
469 semSyncVar[sem] = std::make_unique<ThreadSyncVar>();
470 }
471
472 bool acquired = sem->try_wait();
473 bool acquired_s = acquired ? "true" : "false";
474 FOLLY_TEST_DSCHED_VLOG(
475 "sem->try_wait() [sem=" << sem << "] -> " << acquired_s);
476 if (acquired) {
477 semSyncVar[sem]->acq_rel();
478 } else {
479 semSyncVar[sem]->acquire();
480 }
481
482 afterSharedAccess();
483 return acquired;
484 }
485
wait(Sem * sem)486 void DeterministicSchedule::wait(Sem* sem) {
487 while (!tryWait(sem)) {
488 // we're not busy waiting because this is a deterministic schedule
489 }
490 }
491
getCurrentThreadInfo()492 ThreadInfo& DeterministicSchedule::getCurrentThreadInfo() {
493 auto& tls = TLState::get();
494 auto sched = tls.sched;
495 assert(sched);
496 assert(tls.threadId.val < sched->threadInfoMap_.size());
497 return sched->threadInfoMap_[tls.threadId.val];
498 }
499
atomic_thread_fence(std::memory_order mo)500 void DeterministicSchedule::atomic_thread_fence(std::memory_order mo) {
501 auto& tls = TLState::get();
502 if (!tls.sched) {
503 std::atomic_thread_fence(mo);
504 return;
505 }
506 beforeSharedAccess();
507 ThreadInfo& threadInfo = getCurrentThreadInfo();
508 switch (mo) {
509 case std::memory_order_relaxed:
510 assert(false);
511 break;
512 case std::memory_order_consume:
513 case std::memory_order_acquire:
514 threadInfo.acqRelOrder_.sync(threadInfo.acqFenceOrder_);
515 break;
516 case std::memory_order_release:
517 threadInfo.relFenceOrder_.sync(threadInfo.acqRelOrder_);
518 break;
519 case std::memory_order_acq_rel:
520 threadInfo.acqRelOrder_.sync(threadInfo.acqFenceOrder_);
521 threadInfo.relFenceOrder_.sync(threadInfo.acqRelOrder_);
522 break;
523 case std::memory_order_seq_cst:
524 threadInfo.acqRelOrder_.sync(threadInfo.acqFenceOrder_);
525 threadInfo.acqRelOrder_.sync(tls.sched->seqCstFenceOrder_);
526 tls.sched->seqCstFenceOrder_ = threadInfo.acqRelOrder_;
527 threadInfo.relFenceOrder_.sync(threadInfo.acqRelOrder_);
528 break;
529 }
530 FOLLY_TEST_DSCHED_VLOG("fence: " << folly::detail::memory_order_to_str(mo));
531 afterSharedAccess();
532 }
533
// Futex wait for DeterministicAtomic futexes: forwards all arguments to
// deterministicFutexWaitImpl, supplying this file's futexLock and
// futexQueues as the shared wait state.
detail::FutexResult futexWaitImpl(
    const detail::Futex<DeterministicAtomic>* futex,
    uint32_t expected,
    std::chrono::system_clock::time_point const* absSystemTimeout,
    std::chrono::steady_clock::time_point const* absSteadyTimeout,
    uint32_t waitMask) {
  return deterministicFutexWaitImpl<DeterministicAtomic>(
      futex,
      futexLock,
      futexQueues,
      expected,
      absSystemTimeout,
      absSteadyTimeout,
      waitMask);
}
549
futexWakeImpl(const detail::Futex<DeterministicAtomic> * futex,int count,uint32_t wakeMask)550 int futexWakeImpl(
551 const detail::Futex<DeterministicAtomic>* futex,
552 int count,
553 uint32_t wakeMask) {
554 return deterministicFutexWakeImpl<DeterministicAtomic>(
555 futex, futexLock, futexQueues, count, wakeMask);
556 }
557
558 } // namespace test
559 } // namespace folly
560
561 namespace folly {
562
563 template <>
system()564 CacheLocality const& CacheLocality::system<test::DeterministicAtomic>() {
565 static CacheLocality cache(CacheLocality::uniform(16));
566 return cache;
567 }
568
569 template <>
pickGetcpuFunc()570 Getcpu::Func AccessSpreader<test::DeterministicAtomic>::pickGetcpuFunc() {
571 return &test::DeterministicSchedule::getcpu;
572 }
573 } // namespace folly
574