1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #pragma once
18
#include <assert.h>
#include <errno.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <functional>
#include <list>
#include <mutex>
#include <queue>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include <glog/logging.h>

#include <folly/ScopeGuard.h>
#include <folly/concurrency/CacheLocality.h>
#include <folly/detail/Futex.h>
#include <folly/lang/CustomizationPoint.h>
#include <folly/synchronization/AtomicNotification.h>
#include <folly/synchronization/detail/AtomicUtils.h>
#include <folly/synchronization/test/Semaphore.h>
40
41 namespace folly {
42 namespace test {
43
// Trace-logging helper used throughout this header.
//
// This is ugly, but better perf for DeterministicAtomic translates
// directly to more states explored and tested. The `if (false)` guard
// means the streamed expression is still compiled (and type-checked) but
// is never evaluated at run time. When the guard is changed to true, the
// message is sent to VLOG(2) prefixed with the calling thread's id in hex.
#define FOLLY_TEST_DSCHED_VLOG(...)                             \
  do {                                                          \
    if (false) {                                                \
      VLOG(2) << std::hex << std::this_thread::get_id() << ": " \
              << __VA_ARGS__;                                   \
    }                                                           \
  } while (false)
53
/* Signatures of user-defined auxiliary functions. */
// Callback taking one bool argument; installed through
// DeterministicSchedule::setAuxAct().
using AuxAct = std::function<void(bool)>;
// Callback taking one uint64_t argument; installed through
// DeterministicSchedule::setAuxChk().
using AuxChk = std::function<void(uint64_t)>;
57
/**
 * Wrapper around the unsigned value used as a thread id by the scheduler
 * (see DeterministicSchedule::getThreadId()). Both constructors are
 * explicit, so an id is never created from a bare integer by accident.
 */
struct DSchedThreadId {
  unsigned val;

  /** Creates an id holding 0. */
  explicit constexpr DSchedThreadId() : val{0u} {}

  /** Creates an id holding `v`. */
  explicit constexpr DSchedThreadId(unsigned v) : val{v} {}

  /** Stores `v` and returns the stored value (by value, not a reference). */
  unsigned operator=(unsigned v) {
    val = v;
    return val;
  }
};
64
/**
 * Logical timestamp backed by a size_t counter. The value 0 means
 * "not initialized"; initial() yields the first initialized value (1).
 */
class DSchedTimestamp {
 public:
  /** Creates an uninitialized timestamp (counter 0). */
  constexpr explicit DSchedTimestamp() : val_{0} {}

  /** Bumps the counter in place and returns a copy of the new timestamp. */
  DSchedTimestamp advance() {
    ++val_;
    return DSchedTimestamp(val_);
  }

  /** True when this timestamp is not older than `other`. */
  bool atLeastAsRecentAs(const DSchedTimestamp& other) const {
    return !(val_ < other.val_);
  }

  /** Raises this timestamp to `other` when `other` is more recent. */
  void sync(const DSchedTimestamp& other) {
    if (val_ < other.val_) {
      val_ = other.val_;
    }
  }

  /** True once the counter is non-zero. */
  bool initialized() const { return val_ != 0; }

  /** The first initialized timestamp (counter 1). */
  static constexpr DSchedTimestamp initial() { return DSchedTimestamp(1); }

 protected:
  /** Creates a timestamp with an explicit counter value. */
  constexpr explicit DSchedTimestamp(size_t v) : val_{v} {}

 private:
  size_t val_;
};
82
/**
 * A collection of DSchedTimestamp values held in a vector. Every member
 * function is only declared here; the definitions live outside this header,
 * so the notes below are limited to what the signatures show.
 * NOTE(review): presumably one slot per DSchedThreadId (a vector clock) --
 * confirm against the implementation.
 */
class ThreadTimestamps {
 public:
  // Updates this object from `src`; `src` is taken by const reference.
  void sync(const ThreadTimestamps& src);
  // Advances the timestamp associated with `tid` and returns a timestamp
  // (presumably the advanced one -- confirm in the definition).
  DSchedTimestamp advance(DSchedThreadId tid);

  // Records `ts` for `tid`; by its name, only when `tid` has no entry yet.
  void setIfNotPresent(DSchedThreadId tid, DSchedTimestamp ts);
  // Resets the stored timestamps (exact post-state not visible here).
  void clear();
  // Const query comparing the entry for `tid` against `ts`.
  bool atLeastAsRecentAs(DSchedThreadId tid, DSchedTimestamp ts) const;
  // Const query comparing this object against the entries of `src`.
  bool atLeastAsRecentAsAny(const ThreadTimestamps& src) const;

 private:
  std::vector<DSchedTimestamp> timestamps_;
};
96
97 struct ThreadInfo {
98 ThreadInfo() = delete;
ThreadInfoThreadInfo99 explicit ThreadInfo(DSchedThreadId tid) {
100 acqRelOrder_.setIfNotPresent(tid, DSchedTimestamp::initial());
101 }
102 ThreadTimestamps acqRelOrder_;
103 ThreadTimestamps acqFenceOrder_;
104 ThreadTimestamps relFenceOrder_;
105 };
106
/**
 * Synchronization variable that owns a ThreadTimestamps object (order_).
 * The three operations are only declared here; DeterministicMutex in this
 * header calls acquire() after taking its lock and release() when unlocking.
 * NOTE(review): presumably each operation exchanges timestamps between
 * order_ and the calling thread's ThreadInfo -- confirm in the definition.
 */
class ThreadSyncVar {
 public:
  ThreadSyncVar() = default;

  // Acquire-side operation on this variable.
  void acquire();
  // Release-side operation on this variable.
  void release();
  // Combined acquire + release operation.
  void acq_rel();

 private:
  ThreadTimestamps order_;
};
118
/**
 * DeterministicSchedule coordinates the inter-thread communication of a
 * set of threads under test, so that despite concurrency the execution is
 * the same every time.  It works by stashing a reference to the schedule
 * in a thread-local variable, then blocking all but one thread at a time.
 *
 * In order for DeterministicSchedule to work, it needs to intercept
 * all inter-thread communication.  To do this you should use
 * DeterministicAtomic<T> instead of std::atomic<T>, create threads
 * using DeterministicSchedule::thread() instead of the std::thread
 * constructor, DeterministicSchedule::join(thr) instead of thr.join(),
 * and access semaphores via the helper functions in DeterministicSchedule.
 * Locks are not yet supported, although they would be easy to add with
 * the same strategy as the mapping of Sem::wait.
 *
 * The actual schedule is defined by a function from n -> [0,n). At
 * each step, the function will be given the number of active threads
 * (n), and it returns the index of the thread that should be run next.
 * Invocations of the scheduler function will be serialized, but will
 * occur from multiple threads.  A good starting schedule is uniform(0).
 */
class DeterministicSchedule {
 public:
  using Sem = Semaphore;

  /**
   * Arranges for the current thread (and all threads created by
   * DeterministicSchedule::thread on a thread participating in this
   * schedule) to participate in a deterministic schedule.
   */
  explicit DeterministicSchedule(std::function<size_t(size_t)> scheduler);

  // A schedule is neither copy-constructible nor copy-assignable.
  DeterministicSchedule(const DeterministicSchedule&) = delete;
  DeterministicSchedule& operator=(const DeterministicSchedule&) = delete;

  /** Completes the schedule. */
  ~DeterministicSchedule();

  /**
   * Returns a scheduling function that randomly chooses one of the
   * runnable threads at each step, with no history.  This implements
   * a schedule that is equivalent to one in which the steps between
   * inter-thread communication are random variables following a poisson
   * distribution.
   */
  static std::function<size_t(size_t)> uniform(uint64_t seed);

  /**
   * Returns a scheduling function that chooses a subset of the active
   * threads and randomly chooses a member of the subset as the next
   * runnable thread.  The subset is chosen with size n, and the choice
   * is made every m steps.
   */
  static std::function<size_t(size_t)> uniformSubset(
      uint64_t seed, size_t n = 2, size_t m = 64);

  /** Obtains permission for the current thread to perform inter-thread
   *  communication. */
  static void beforeSharedAccess();

  /** Releases permission for the current thread to perform inter-thread
   *  communication. */
  static void afterSharedAccess();

  /** Calls a user-defined auxiliary function if any, and releases
   *  permission for the current thread to perform inter-thread
   *  communication. The bool parameter indicates the success of the
   *  shared access (if conditional, true otherwise). */
  static void afterSharedAccess(bool success);

  /** Launches a thread that will participate in the same deterministic
   *  schedule as the current thread.  `func` and `args` are copied into
   *  the new thread (no perfect forwarding, see the TODO below).  When the
   *  calling thread has no current schedule, the scheduler hooks are
   *  skipped and only the fence and the std::thread launch remain. */
  template <typename Func, typename... Args>
  static inline std::thread thread(Func&& func, Args&&... args) {
    // TODO: maybe future versions of gcc will allow forwarding to thread
    // Resolves to the static member atomic_thread_fence declared below.
    atomic_thread_fence(std::memory_order_seq_cst);
    auto sched = getCurrentSchedule();
    // Parent-side hook; its result is handed to the child's
    // afterThreadCreate() call.
    auto sem = sched ? sched->beforeThreadCreate() : nullptr;
    auto child = std::thread(
        // The lambda captures sched, sem and func by copy.
        [=](Args... a) {
          if (sched) {
            sched->afterThreadCreate(sem);
            beforeSharedAccess();
            FOLLY_TEST_DSCHED_VLOG("running");
            afterSharedAccess();
          }
          // Registered before func runs so that beforeThreadExit() is
          // invoked when this scope is left.
          SCOPE_EXIT {
            if (sched) {
              sched->beforeThreadExit();
            }
          };
          func(a...);
        },
        args...);
    if (sched) {
      // Record the child's id in active_ while holding the shared-access
      // permission.
      beforeSharedAccess();
      sched->active_.insert(child.get_id());
      FOLLY_TEST_DSCHED_VLOG("forked " << std::hex << child.get_id());
      afterSharedAccess();
    }
    return child;
  }

  /** Calls child.join() as part of a deterministic schedule. */
  static void join(std::thread& child);

  /** Waits for each thread in children to reach the end of their
   *  thread function without allowing them to fully terminate. Then,
   *  allow one child at a time to fully terminate and join each one.
   *  This functionality is important to protect shared access that can
   *  take place after beforeThreadExit() has already been invoked,
   *  for example when executing thread local destructors.
   */
  static void joinAll(std::vector<std::thread>& children);

  /** Calls sem->post() as part of a deterministic schedule. */
  static void post(Sem* sem);

  /** Calls sem->try_wait() as part of a deterministic schedule, returning
   *  true on success and false on transient failure. */
  static bool tryWait(Sem* sem);

  /** Calls sem->wait() as part of a deterministic schedule. */
  static void wait(Sem* sem);

  /** Uses scheduler_ to get a random number in [0, n). If tls_sched is
   *  not set up it falls back to std::rand(). */
  static size_t getRandNumber(size_t n);

  /** Deterministic implementation of getcpu. */
  static int getcpu(unsigned* cpu, unsigned* node, void* unused);

  /** Sets up a thread-specific function for call immediately after
   *  the next shared access by the thread for managing auxiliary
   *  data. The function takes a bool parameter that indicates the
   *  success of the shared access (if it is conditional, true
   *  otherwise). The function is cleared after one use. */
  static void setAuxAct(AuxAct& aux);

  /** Sets up a function to be called after every subsequent shared
   *  access (until clearAuxChk() is called) for checking global
   *  invariants and logging. The function takes a uint64_t parameter
   *  that indicates the number of shared accesses so far. */
  static void setAuxChk(AuxChk& aux);

  /** Clears the function set by setAuxChk */
  static void clearAuxChk();

  /** Remove the current thread's semaphore from sems_ */
  static Sem* descheduleCurrentThread();

  /** Returns true if the current thread has already completed
   *  the thread function, for example if the thread is executing
   *  thread local destructors. */
  static bool isCurrentThreadExiting();

  /** Add sem back into sems_ */
  static void reschedule(Sem* sem);

  /** Defined out of line. NOTE(review): presumably reports whether the
   *  calling thread currently participates in a schedule -- confirm. */
  static bool isActive();

  /** Defined out of line; returns the DSchedThreadId of the calling
   *  thread (assumption from the name -- confirm). */
  static DSchedThreadId getThreadId();

  /** Defined out of line; returns a mutable reference to a ThreadInfo
   *  (presumably the calling thread's entry in threadInfoMap_). */
  static ThreadInfo& getCurrentThreadInfo();

  /** Fence entry point taking a memory order; thread() above calls it
   *  with std::memory_order_seq_cst before launching the child. */
  static void atomic_thread_fence(std::memory_order mo);

 private:
  /** Returns the schedule of the calling thread, or a null pointer when
   *  there is none (callers in this header test the result for null). */
  static DeterministicSchedule* getCurrentSchedule();

  // Function installed by setAuxChk() (assumption from the matching name).
  static AuxChk aux_chk;

  // NOTE(review): presumably the scheduling function given to the ctor.
  std::function<size_t(size_t)> scheduler_;
  // Semaphores managed by descheduleCurrentThread() / reschedule().
  std::vector<Sem*> sems_;
  // Ids of threads launched through thread(); inserted right after fork.
  std::unordered_set<std::thread::id> active_;
  // Per-thread-id semaphore maps; their exact use is in the implementation.
  std::unordered_map<std::thread::id, Sem*> joins_;
  std::unordered_map<std::thread::id, Sem*> exitingSems_;

  std::vector<ThreadInfo> threadInfoMap_;
  ThreadTimestamps seqCstFenceOrder_;

  unsigned nextThreadId_;
  /* step_ keeps count of shared accesses that correspond to user
   * synchronization steps (atomic accesses for now).
   * The reason for keeping track of this here and not just with
   * auxiliary data is to provide users with warning signs (e.g.,
   * skipped steps) if they inadvertently forget to set up aux
   * functions for some shared accesses. */
  uint64_t step_;

  // Thread lifecycle hooks used by thread(): beforeThreadCreate() runs in
  // the parent, afterThreadCreate() and beforeThreadExit() in the child.
  Sem* beforeThreadCreate();
  void afterThreadCreate(Sem*);
  void beforeThreadExit();
  void waitForBeforeThreadExit(std::thread& child);
  /** Calls user-defined auxiliary function (if any) */
  void callAux(bool);
};
316
317 /**
318 * DeterministicAtomic<T> is a drop-in replacement std::atomic<T> that
319 * cooperates with DeterministicSchedule.
320 */
321 template <
322 typename T,
323 typename Schedule = DeterministicSchedule,
324 template <typename> class Atom = std::atomic>
325 struct DeterministicAtomicImpl {
326 DeterministicAtomicImpl() = default;
327 ~DeterministicAtomicImpl() = default;
328 DeterministicAtomicImpl(DeterministicAtomicImpl<T> const&) = delete;
329 DeterministicAtomicImpl<T>& operator=(DeterministicAtomicImpl<T> const&) =
330 delete;
331
DeterministicAtomicImplDeterministicAtomicImpl332 constexpr /* implicit */ DeterministicAtomicImpl(T v) noexcept : data_(v) {}
333
is_lock_freeDeterministicAtomicImpl334 bool is_lock_free() const noexcept { return data_.is_lock_free(); }
335
336 bool compare_exchange_strong(
337 T& v0, T v1, std::memory_order mo = std::memory_order_seq_cst) noexcept {
338 return compare_exchange_strong(
339 v0, v1, mo, ::folly::detail::default_failure_memory_order(mo));
340 }
compare_exchange_strongDeterministicAtomicImpl341 bool compare_exchange_strong(
342 T& v0,
343 T v1,
344 std::memory_order success,
345 std::memory_order failure) noexcept {
346 Schedule::beforeSharedAccess();
347 auto orig = v0;
348 bool rv = data_.compare_exchange_strong(v0, v1, success, failure);
349 FOLLY_TEST_DSCHED_VLOG(
350 this << ".compare_exchange_strong(" << std::hex << orig << ", "
351 << std::hex << v1 << ") -> " << rv << "," << std::hex << v0);
352 Schedule::afterSharedAccess(rv);
353 return rv;
354 }
355
356 bool compare_exchange_weak(
357 T& v0, T v1, std::memory_order mo = std::memory_order_seq_cst) noexcept {
358 return compare_exchange_weak(
359 v0, v1, mo, ::folly::detail::default_failure_memory_order(mo));
360 }
compare_exchange_weakDeterministicAtomicImpl361 bool compare_exchange_weak(
362 T& v0,
363 T v1,
364 std::memory_order success,
365 std::memory_order failure) noexcept {
366 Schedule::beforeSharedAccess();
367 auto orig = v0;
368 bool rv = data_.compare_exchange_weak(v0, v1, success, failure);
369 FOLLY_TEST_DSCHED_VLOG(
370 this << ".compare_exchange_weak(" << std::hex << orig << ", "
371 << std::hex << v1 << ") -> " << rv << "," << std::hex << v0);
372 Schedule::afterSharedAccess(rv);
373 return rv;
374 }
375
376 T exchange(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
377 Schedule::beforeSharedAccess();
378 T rv = data_.exchange(v, mo);
379 FOLLY_TEST_DSCHED_VLOG(
380 this << ".exchange(" << std::hex << v << ") -> " << std::hex << rv);
381 Schedule::afterSharedAccess(true);
382 return rv;
383 }
384
TDeterministicAtomicImpl385 /* implicit */ operator T() const noexcept {
386 Schedule::beforeSharedAccess();
387 T rv = data_.operator T();
388 FOLLY_TEST_DSCHED_VLOG(this << "() -> " << std::hex << rv);
389 Schedule::afterSharedAccess(true);
390 return rv;
391 }
392
393 T load(std::memory_order mo = std::memory_order_seq_cst) const noexcept {
394 Schedule::beforeSharedAccess();
395 T rv = data_.load(mo);
396 FOLLY_TEST_DSCHED_VLOG(this << ".load() -> " << std::hex << rv);
397 Schedule::afterSharedAccess(true);
398 return rv;
399 }
400
401 T operator=(T v) noexcept {
402 Schedule::beforeSharedAccess();
403 T rv = (data_ = v);
404 FOLLY_TEST_DSCHED_VLOG(this << " = " << std::hex << v);
405 Schedule::afterSharedAccess(true);
406 return rv;
407 }
408
409 void store(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
410 Schedule::beforeSharedAccess();
411 data_.store(v, mo);
412 FOLLY_TEST_DSCHED_VLOG(this << ".store(" << std::hex << v << ")");
413 Schedule::afterSharedAccess(true);
414 }
415
416 T operator++() noexcept {
417 Schedule::beforeSharedAccess();
418 T rv = ++data_;
419 FOLLY_TEST_DSCHED_VLOG(this << " pre++ -> " << std::hex << rv);
420 Schedule::afterSharedAccess(true);
421 return rv;
422 }
423
424 T operator++(int /* postDummy */) noexcept {
425 Schedule::beforeSharedAccess();
426 T rv = data_++;
427 FOLLY_TEST_DSCHED_VLOG(this << " post++ -> " << std::hex << rv);
428 Schedule::afterSharedAccess(true);
429 return rv;
430 }
431
432 T operator--() noexcept {
433 Schedule::beforeSharedAccess();
434 T rv = --data_;
435 FOLLY_TEST_DSCHED_VLOG(this << " pre-- -> " << std::hex << rv);
436 Schedule::afterSharedAccess(true);
437 return rv;
438 }
439
440 T operator--(int /* postDummy */) noexcept {
441 Schedule::beforeSharedAccess();
442 T rv = data_--;
443 FOLLY_TEST_DSCHED_VLOG(this << " post-- -> " << std::hex << rv);
444 Schedule::afterSharedAccess(true);
445 return rv;
446 }
447
448 T operator+=(T v) noexcept {
449 Schedule::beforeSharedAccess();
450 T rv = (data_ += v);
451 FOLLY_TEST_DSCHED_VLOG(
452 this << " += " << std::hex << v << " -> " << std::hex << rv);
453 Schedule::afterSharedAccess(true);
454 return rv;
455 }
456
457 T fetch_add(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
458 Schedule::beforeSharedAccess();
459 T rv = data_.fetch_add(v, mo);
460 FOLLY_TEST_DSCHED_VLOG(
461 this << ".fetch_add(" << std::hex << v << ") -> " << std::hex << rv);
462 Schedule::afterSharedAccess(true);
463 return rv;
464 }
465
466 T operator-=(T v) noexcept {
467 Schedule::beforeSharedAccess();
468 T rv = (data_ -= v);
469 FOLLY_TEST_DSCHED_VLOG(
470 this << " -= " << std::hex << v << " -> " << std::hex << rv);
471 Schedule::afterSharedAccess(true);
472 return rv;
473 }
474
475 T fetch_sub(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
476 Schedule::beforeSharedAccess();
477 T rv = data_.fetch_sub(v, mo);
478 FOLLY_TEST_DSCHED_VLOG(
479 this << ".fetch_sub(" << std::hex << v << ") -> " << std::hex << rv);
480 Schedule::afterSharedAccess(true);
481 return rv;
482 }
483
484 T operator&=(T v) noexcept {
485 Schedule::beforeSharedAccess();
486 T rv = (data_ &= v);
487 FOLLY_TEST_DSCHED_VLOG(
488 this << " &= " << std::hex << v << " -> " << std::hex << rv);
489 Schedule::afterSharedAccess(true);
490 return rv;
491 }
492
493 T fetch_and(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
494 Schedule::beforeSharedAccess();
495 T rv = data_.fetch_and(v, mo);
496 FOLLY_TEST_DSCHED_VLOG(
497 this << ".fetch_and(" << std::hex << v << ") -> " << std::hex << rv);
498 Schedule::afterSharedAccess(true);
499 return rv;
500 }
501
502 T operator|=(T v) noexcept {
503 Schedule::beforeSharedAccess();
504 T rv = (data_ |= v);
505 FOLLY_TEST_DSCHED_VLOG(
506 this << " |= " << std::hex << v << " -> " << std::hex << rv);
507 Schedule::afterSharedAccess(true);
508 return rv;
509 }
510
511 T fetch_or(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
512 Schedule::beforeSharedAccess();
513 T rv = data_.fetch_or(v, mo);
514 FOLLY_TEST_DSCHED_VLOG(
515 this << ".fetch_or(" << std::hex << v << ") -> " << std::hex << rv);
516 Schedule::afterSharedAccess(true);
517 return rv;
518 }
519
520 T operator^=(T v) noexcept {
521 Schedule::beforeSharedAccess();
522 T rv = (data_ ^= v);
523 FOLLY_TEST_DSCHED_VLOG(
524 this << " ^= " << std::hex << v << " -> " << std::hex << rv);
525 Schedule::afterSharedAccess(true);
526 return rv;
527 }
528
529 T fetch_xor(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
530 Schedule::beforeSharedAccess();
531 T rv = data_.fetch_xor(v, mo);
532 FOLLY_TEST_DSCHED_VLOG(
533 this << ".fetch_xor(" << std::hex << v << ") -> " << std::hex << rv);
534 Schedule::afterSharedAccess(true);
535 return rv;
536 }
537
538 /** Read the value of the atomic variable without context switching */
load_directDeterministicAtomicImpl539 T load_direct() const noexcept {
540 return data_.load(std::memory_order_relaxed);
541 }
542
543 private:
544 Atom<T> data_;
545 };
546
// Convenience alias: DeterministicAtomicImpl bound to DeterministicSchedule,
// with the Atom parameter left at its default (std::atomic).
template <typename T>
using DeterministicAtomic = DeterministicAtomicImpl<T, DeterministicSchedule>;
549
/* Futex extensions for DeterministicSchedule based Futexes.
 * Both functions are declared here and defined elsewhere.
 * NOTE(review): presumably they forward to the deterministicFutex*Impl
 * templates in this header -- confirm in the implementation file. */
// Wake operation on `futex`, taking a waiter count and a wake bit mask.
int futexWakeImpl(
    const detail::Futex<test::DeterministicAtomic>* futex,
    int count,
    uint32_t wakeMask);
// Wait operation on `futex`: takes the expected value, two optional
// absolute deadlines (passed by pointer) and a wait bit mask.
detail::FutexResult futexWaitImpl(
    const detail::Futex<test::DeterministicAtomic>* futex,
    uint32_t expected,
    std::chrono::system_clock::time_point const* absSystemTime,
    std::chrono::steady_clock::time_point const* absSteadyTime,
    uint32_t waitMask);
561
562 /* Generic futex extensions to allow sharing between DeterministicAtomic and
563 * BufferedDeterministicAtomic.*/
564 template <template <typename> class Atom>
deterministicFutexWakeImpl(const detail::Futex<Atom> * futex,std::mutex & futexLock,std::unordered_map<const detail::Futex<Atom> *,std::list<std::pair<uint32_t,bool * >>> & futexQueues,int count,uint32_t wakeMask)565 int deterministicFutexWakeImpl(
566 const detail::Futex<Atom>* futex,
567 std::mutex& futexLock,
568 std::unordered_map<
569 const detail::Futex<Atom>*,
570 std::list<std::pair<uint32_t, bool*>>>& futexQueues,
571 int count,
572 uint32_t wakeMask) {
573 using namespace test;
574 using namespace std::chrono;
575
576 int rv = 0;
577 DeterministicSchedule::beforeSharedAccess();
578 futexLock.lock();
579 if (futexQueues.count(futex) > 0) {
580 auto& queue = futexQueues[futex];
581 auto iter = queue.begin();
582 while (iter != queue.end() && rv < count) {
583 auto cur = iter++;
584 if ((cur->first & wakeMask) != 0) {
585 *(cur->second) = true;
586 rv++;
587 queue.erase(cur);
588 }
589 }
590 if (queue.empty()) {
591 futexQueues.erase(futex);
592 }
593 }
594 futexLock.unlock();
595 FOLLY_TEST_DSCHED_VLOG(
596 "futexWake(" << futex << ", " << count << ", " << std::hex << wakeMask
597 << ") -> " << rv);
598 DeterministicSchedule::afterSharedAccess();
599 return rv;
600 }
601
/**
 * Waits on `futex`, as a sequence of scheduled shared accesses.
 *
 * If the futex value (read with load_direct()) differs from `expected`,
 * returns VALUE_CHANGED immediately. Otherwise the caller is appended to
 * the waiter list for `futex` in `futexQueues` and repeatedly yields to
 * the scheduler until its flag is set by a wake operation (AWOKEN) or,
 * when a deadline was supplied, until a simulated timeout/interruption
 * fires (TIMEDOUT or INTERRUPTED).
 *
 * @param futex            the futex to wait on; also the key in futexQueues.
 * @param futexLock        mutex guarding `futexQueues`.
 * @param futexQueues      map from futex to its list of (waitMask, flag*).
 * @param expected         value the futex must hold for the wait to start.
 * @param absSystemTimeout optional deadline; only its null-ness is used.
 * @param absSteadyTimeout optional deadline; only its null-ness is used.
 * @param waitMask         bit mask stored with the queue entry.
 * @return the FutexResult describing why the wait ended.
 */
template <template <typename> class Atom>
detail::FutexResult deterministicFutexWaitImpl(
    const detail::Futex<Atom>* futex,
    std::mutex& futexLock,
    std::unordered_map<
        const detail::Futex<Atom>*,
        std::list<std::pair<uint32_t, bool*>>>& futexQueues,
    uint32_t expected,
    std::chrono::system_clock::time_point const* absSystemTimeout,
    std::chrono::steady_clock::time_point const* absSteadyTimeout,
    uint32_t waitMask) {
  using namespace test;
  using namespace std::chrono;
  using namespace folly::detail;

  // The deadlines themselves are never read; a non-null pointer only
  // enables the simulated timeout path below.
  bool hasTimeout = absSystemTimeout != nullptr || absSteadyTimeout != nullptr;
  // Flag whose address is published in the waiter queue; the wake
  // implementation sets it to true through that pointer.
  bool awoken = false;
  FutexResult result = FutexResult::AWOKEN;

  DeterministicSchedule::beforeSharedAccess();
  FOLLY_TEST_DSCHED_VLOG(
      "futexWait(" << futex << ", " << std::hex << expected << ", .., "
                   << std::hex << waitMask << ") beginning..");
  futexLock.lock();
  // load_direct avoids deadlock on inner call to beforeSharedAccess
  if (futex->load_direct() == expected) {
    auto& queue = futexQueues[futex];
    queue.emplace_back(waitMask, &awoken);
    // Iterator to our own entry, kept so it can be erased on timeout.
    auto ours = queue.end();
    ours--;
    while (!awoken) {
      // Give up both the queue lock and the scheduling permission so that
      // other threads can run, then take them back in the same order as
      // at function entry.
      futexLock.unlock();
      DeterministicSchedule::afterSharedAccess();
      DeterministicSchedule::beforeSharedAccess();
      futexLock.lock();

      // Simulate spurious wake-ups, timeouts each time with
      // a 10% probability if we haven't been woken up already.
      // This path is only taken when a deadline was supplied.
      if (!awoken && hasTimeout &&
          DeterministicSchedule::getRandNumber(100) < 10) {
        assert(futexQueues.count(futex) != 0 && &futexQueues[futex] == &queue);
        // Nobody woke us, so our entry is still queued: remove it (it
        // points at a local that is about to go out of scope).
        queue.erase(ours);
        if (queue.empty()) {
          futexQueues.erase(futex);
        }
        // Simulate ETIMEDOUT 90% of the time and other failures
        // remaining time
        result = DeterministicSchedule::getRandNumber(100) >= 10
            ? FutexResult::TIMEDOUT
            : FutexResult::INTERRUPTED;
        break;
      }
    }
  } else {
    result = FutexResult::VALUE_CHANGED;
  }
  futexLock.unlock();

  // Human-readable result name, used only by the trace log below.
  char const* resultStr = "?";
  switch (result) {
    case FutexResult::AWOKEN:
      resultStr = "AWOKEN";
      break;
    case FutexResult::TIMEDOUT:
      resultStr = "TIMEDOUT";
      break;
    case FutexResult::INTERRUPTED:
      resultStr = "INTERRUPTED";
      break;
    case FutexResult::VALUE_CHANGED:
      resultStr = "VALUE_CHANGED";
      break;
  }
  FOLLY_TEST_DSCHED_VLOG(
      "futexWait(" << futex << ", " << std::hex << expected << ", .., "
                   << std::hex << waitMask << ") -> " << resultStr);
  DeterministicSchedule::afterSharedAccess();
  return result;
}
681
/**
 * Implementations of the atomic_wait API for DeterministicAtomic. These are
 * no-ops here, which for a correct implementation should not make a
 * difference because threads are required to have atomic operations around
 * waits and wakes.
 */
// atomic_wait: returns immediately without inspecting its arguments.
template <typename Integer>
void tag_invoke(
    cpo_t<atomic_wait>, const DeterministicAtomic<Integer>*, Integer) {}
// atomic_wait_until: returns immediately and always reports no_timeout.
template <typename Integer, typename Clock, typename Duration>
std::cv_status tag_invoke(
    cpo_t<atomic_wait_until>,
    const DeterministicAtomic<Integer>*,
    Integer,
    const std::chrono::time_point<Clock, Duration>&) {
  return std::cv_status::no_timeout;
}
// atomic_notify_one: does nothing.
template <typename Integer>
void tag_invoke(cpo_t<atomic_notify_one>, const DeterministicAtomic<Integer>*) {
}
// atomic_notify_all: does nothing.
template <typename Integer>
void tag_invoke(cpo_t<atomic_notify_all>, const DeterministicAtomic<Integer>*) {
}
705
706 /**
707 * DeterministicMutex is a drop-in replacement of std::mutex that
708 * cooperates with DeterministicSchedule.
709 */
710 struct DeterministicMutex {
711 using Sem = DeterministicSchedule::Sem;
712
713 std::mutex m;
714 std::queue<Sem*> waiters_;
715 ThreadSyncVar syncVar_;
716
717 DeterministicMutex() = default;
718 ~DeterministicMutex() = default;
719 DeterministicMutex(DeterministicMutex const&) = delete;
720 DeterministicMutex& operator=(DeterministicMutex const&) = delete;
721
lockDeterministicMutex722 void lock() {
723 FOLLY_TEST_DSCHED_VLOG(this << ".lock()");
724 DeterministicSchedule::beforeSharedAccess();
725 while (!m.try_lock()) {
726 Sem* sem = DeterministicSchedule::descheduleCurrentThread();
727 if (sem) {
728 waiters_.push(sem);
729 }
730 DeterministicSchedule::afterSharedAccess();
731 // Wait to be scheduled by unlock
732 DeterministicSchedule::beforeSharedAccess();
733 }
734 if (DeterministicSchedule::isActive()) {
735 syncVar_.acquire();
736 }
737 DeterministicSchedule::afterSharedAccess();
738 }
739
try_lockDeterministicMutex740 bool try_lock() {
741 DeterministicSchedule::beforeSharedAccess();
742 bool rv = m.try_lock();
743 if (rv && DeterministicSchedule::isActive()) {
744 syncVar_.acquire();
745 }
746 FOLLY_TEST_DSCHED_VLOG(this << ".try_lock() -> " << rv);
747 DeterministicSchedule::afterSharedAccess();
748 return rv;
749 }
750
unlockDeterministicMutex751 void unlock() {
752 FOLLY_TEST_DSCHED_VLOG(this << ".unlock()");
753 DeterministicSchedule::beforeSharedAccess();
754 m.unlock();
755 if (DeterministicSchedule::isActive()) {
756 syncVar_.release();
757 }
758 if (!waiters_.empty()) {
759 Sem* sem = waiters_.front();
760 DeterministicSchedule::reschedule(sem);
761 waiters_.pop();
762 }
763 DeterministicSchedule::afterSharedAccess();
764 }
765 };
766 } // namespace test
767
// Declares the explicit specialization of AccessSpreader::pickGetcpuFunc
// for test::DeterministicAtomic; the definition is provided elsewhere.
// NOTE(review): presumably it selects DeterministicSchedule::getcpu --
// confirm in the implementation file.
template <>
Getcpu::Func AccessSpreader<test::DeterministicAtomic>::pickGetcpuFunc();
770
771 } // namespace folly
772