/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <assert.h>
#include <errno.h>

#include <atomic>
#include <functional>
#include <list>
#include <mutex>
#include <queue>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include <glog/logging.h>

#include <folly/ScopeGuard.h>
#include <folly/concurrency/CacheLocality.h>
#include <folly/detail/Futex.h>
#include <folly/lang/CustomizationPoint.h>
#include <folly/synchronization/AtomicNotification.h>
#include <folly/synchronization/detail/AtomicUtils.h>
#include <folly/synchronization/test/Semaphore.h>

namespace folly {
namespace test {

// This is ugly, but better perf for DeterministicAtomic translates
// directly to more states explored and tested
#define FOLLY_TEST_DSCHED_VLOG(...)                             \
  do {                                                          \
    if (false) {                                                \
      VLOG(2) << std::hex << std::this_thread::get_id() << ": " \
              << __VA_ARGS__;                                   \
    }                                                           \
  } while (false)

/* signatures of user-defined auxiliary functions */
using AuxAct = std::function<void(bool)>;
using AuxChk = std::function<void(uint64_t)>;

struct DSchedThreadId {
  unsigned val;
  explicit constexpr DSchedThreadId() : val(0) {}
  explicit constexpr DSchedThreadId(unsigned v) : val(v) {}
  unsigned operator=(unsigned v) { return val = v; }
};

class DSchedTimestamp {
 public:
  constexpr explicit DSchedTimestamp() : val_(0) {}
  DSchedTimestamp advance() { return DSchedTimestamp(++val_); }
  bool atLeastAsRecentAs(const DSchedTimestamp& other) const {
    return val_ >= other.val_;
  }
  void sync(const DSchedTimestamp& other) { val_ = std::max(val_, other.val_); }
  bool initialized() const { return val_ > 0; }
  static constexpr DSchedTimestamp initial() { return DSchedTimestamp(1); }

 protected:
  constexpr explicit DSchedTimestamp(size_t v) : val_(v) {}

 private:
  size_t val_;
};

class ThreadTimestamps {
 public:
  void sync(const ThreadTimestamps& src);
  DSchedTimestamp advance(DSchedThreadId tid);

  void setIfNotPresent(DSchedThreadId tid, DSchedTimestamp ts);
  void clear();
  bool atLeastAsRecentAs(DSchedThreadId tid, DSchedTimestamp ts) const;
  bool atLeastAsRecentAsAny(const ThreadTimestamps& src) const;

 private:
  std::vector<DSchedTimestamp> timestamps_;
};

struct ThreadInfo {
  ThreadInfo() = delete;
  explicit ThreadInfo(DSchedThreadId tid) {
    acqRelOrder_.setIfNotPresent(tid, DSchedTimestamp::initial());
  }
  ThreadTimestamps acqRelOrder_;
  ThreadTimestamps acqFenceOrder_;
  ThreadTimestamps relFenceOrder_;
};

class ThreadSyncVar {
 public:
  ThreadSyncVar() = default;

  void acquire();
  void release();
  void acq_rel();

 private:
  ThreadTimestamps order_;
};

/**
 * DeterministicSchedule coordinates the inter-thread communication of a
 * set of threads under test, so that despite concurrency the execution is
 * the same every time.  It works by stashing a reference to the schedule
 * in a thread-local variable, then blocking all but one thread at a time.
 *
 * In order for DeterministicSchedule to work, it needs to intercept
 * all inter-thread communication.  To do this you should use
 * DeterministicAtomic<T> instead of std::atomic<T>, create threads
 * using DeterministicSchedule::thread() instead of the std::thread
 * constructor, DeterministicSchedule::join(thr) instead of thr.join(),
 * and access semaphores via the helper functions in DeterministicSchedule.
 * Locks are supported via DeterministicMutex (defined below), which maps
 * lock acquisition onto the same strategy as Sem::wait.
 *
 * The actual schedule is defined by a function from n -> [0,n). At
 * each step, the function will be given the number of active threads
 * (n), and it returns the index of the thread that should be run next.
 * Invocations of the scheduler function will be serialized, but will
 * occur from multiple threads.  A good starting schedule is uniform(0).
 */
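// A minimal usage sketch (illustrative only; the particular interleaving
// chosen by the seed is an assumption of the example, not a guarantee of
// any specific order):
//
//   using namespace folly::test;
//
//   DeterministicSchedule sched(DeterministicSchedule::uniform(0));
//   DeterministicAtomic<int> counter{0};
//   std::vector<std::thread> threads;
//   for (int i = 0; i < 4; ++i) {
//     threads.push_back(DeterministicSchedule::thread(
//         [&counter] { counter.fetch_add(1); }));
//   }
//   DeterministicSchedule::joinAll(threads);
//   CHECK_EQ(counter.load_direct(), 4);
//
// Re-running with the same seed replays the same interleaving, so a failure
// found once can be reproduced deterministically.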
class DeterministicSchedule {
 public:
  using Sem = Semaphore;

  /**
   * Arranges for the current thread (and all threads created by
   * DeterministicSchedule::thread on a thread participating in this
   * schedule) to participate in a deterministic schedule.
   */
  explicit DeterministicSchedule(std::function<size_t(size_t)> scheduler);

  DeterministicSchedule(const DeterministicSchedule&) = delete;
  DeterministicSchedule& operator=(const DeterministicSchedule&) = delete;

  /** Completes the schedule. */
  ~DeterministicSchedule();

  /**
   * Returns a scheduling function that randomly chooses one of the
   * runnable threads at each step, with no history.  This implements
   * a schedule that is equivalent to one in which the steps between
   * inter-thread communication are random variables following a Poisson
   * distribution.
   */
  static std::function<size_t(size_t)> uniform(uint64_t seed);

  /**
   * Returns a scheduling function that chooses a subset of the active
   * threads and randomly chooses a member of the subset as the next
   * runnable thread.  The subset is chosen with size n, and the choice
   * is made every m steps.
   */
  static std::function<size_t(size_t)> uniformSubset(
      uint64_t seed, size_t n = 2, size_t m = 64);
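
  // Any function from n -> [0, n) may be supplied as the schedule.  For
  // example, a sketch of a custom scheduler that always runs the last
  // runnable thread, starving the others (hypothetical, for illustration;
  // not part of this API):
  //
  //   DeterministicSchedule sched(
  //       [](size_t numActive) { return numActive - 1; });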

  /** Obtains permission for the current thread to perform inter-thread
   *  communication. */
  static void beforeSharedAccess();

  /** Releases permission for the current thread to perform inter-thread
   *  communication. */
  static void afterSharedAccess();

  /** Calls a user-defined auxiliary function if any, and releases
   *  permission for the current thread to perform inter-thread
   *  communication. The bool parameter indicates the success of the
   *  shared access (if conditional, true otherwise). */
  static void afterSharedAccess(bool success);

  /** Launches a thread that will participate in the same deterministic
   *  schedule as the current thread. */
  template <typename Func, typename... Args>
  static inline std::thread thread(Func&& func, Args&&... args) {
    // TODO: maybe future versions of gcc will allow forwarding to thread
    atomic_thread_fence(std::memory_order_seq_cst);
    auto sched = getCurrentSchedule();
    auto sem = sched ? sched->beforeThreadCreate() : nullptr;
    auto child = std::thread(
        [=](Args... a) {
          if (sched) {
            sched->afterThreadCreate(sem);
            beforeSharedAccess();
            FOLLY_TEST_DSCHED_VLOG("running");
            afterSharedAccess();
          }
          SCOPE_EXIT {
            if (sched) {
              sched->beforeThreadExit();
            }
          };
          func(a...);
        },
        args...);
    if (sched) {
      beforeSharedAccess();
      sched->active_.insert(child.get_id());
      FOLLY_TEST_DSCHED_VLOG("forked " << std::hex << child.get_id());
      afterSharedAccess();
    }
    return child;
  }

  /** Calls child.join() as part of a deterministic schedule. */
  static void join(std::thread& child);

  /** Waits for each thread in children to reach the end of its
   *  thread function, without allowing any of them to fully terminate.
   *  Then allows one child at a time to fully terminate, and joins each
   *  one.  This two-phase approach protects shared accesses that can
   *  take place after beforeThreadExit() has already been invoked,
   *  for example when executing thread-local destructors.
   */
  static void joinAll(std::vector<std::thread>& children);

  /** Calls sem->post() as part of a deterministic schedule. */
  static void post(Sem* sem);

  /** Calls sem->try_wait() as part of a deterministic schedule, returning
   *  true on success and false on transient failure. */
  static bool tryWait(Sem* sem);

  /** Calls sem->wait() as part of a deterministic schedule. */
  static void wait(Sem* sem);
  /** Uses scheduler_ to get a random number in [0, n).  If tls_sched is
   *  not set up, it falls back to std::rand(). */
  static size_t getRandNumber(size_t n);

  /** Deterministic implementation of getcpu. */
  static int getcpu(unsigned* cpu, unsigned* node, void* unused);

  /** Sets up a thread-specific function to be called immediately after
   *  the next shared access by the thread, for managing auxiliary
   *  data.  The function takes a bool parameter that indicates the
   *  success of the shared access (if it is conditional, true
   *  otherwise).  The function is cleared after one use. */
  static void setAuxAct(AuxAct& aux);

  /** Sets up a function to be called after every subsequent shared
   *  access (until clearAuxChk() is called) for checking global
   *  invariants and logging. The function takes a uint64_t parameter
   *  that indicates the number of shared accesses so far. */
  static void setAuxChk(AuxChk& aux);

  /** Clears the function set by setAuxChk */
  static void clearAuxChk();
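
  // A sketch of an invariant checker installed via setAuxChk (illustrative
  // only; `lo`, `hi`, and the invariant itself are hypothetical names):
  //
  //   AuxChk check = [&](uint64_t step) {
  //     CHECK_LE(lo.load_direct(), hi.load_direct()) << "at step " << step;
  //   };
  //   DeterministicSchedule::setAuxChk(check);
  //   ... run the test body ...
  //   DeterministicSchedule::clearAuxChk();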

  /** Removes the current thread's semaphore from sems_ */
  static Sem* descheduleCurrentThread();

  /** Returns true if the current thread has already completed
   *  the thread function, for example if the thread is executing
   *  thread-local destructors. */
  static bool isCurrentThreadExiting();

  /** Adds sem back into sems_ */
  static void reschedule(Sem* sem);

  static bool isActive();

  static DSchedThreadId getThreadId();

  static ThreadInfo& getCurrentThreadInfo();

  static void atomic_thread_fence(std::memory_order mo);
 private:
  static DeterministicSchedule* getCurrentSchedule();

  static AuxChk aux_chk;

  std::function<size_t(size_t)> scheduler_;
  std::vector<Sem*> sems_;
  std::unordered_set<std::thread::id> active_;
  std::unordered_map<std::thread::id, Sem*> joins_;
  std::unordered_map<std::thread::id, Sem*> exitingSems_;

  std::vector<ThreadInfo> threadInfoMap_;
  ThreadTimestamps seqCstFenceOrder_;

  unsigned nextThreadId_;
  /* step_ keeps count of shared accesses that correspond to user
   * synchronization steps (atomic accesses for now).
   * The reason for keeping track of this here and not just with
   * auxiliary data is to provide users with warning signs (e.g.,
   * skipped steps) if they inadvertently forget to set up aux
   * functions for some shared accesses. */
  uint64_t step_;

  Sem* beforeThreadCreate();
  void afterThreadCreate(Sem*);
  void beforeThreadExit();
  void waitForBeforeThreadExit(std::thread& child);
  /** Calls user-defined auxiliary function (if any) */
  void callAux(bool);
};

/**
 * DeterministicAtomic<T> is a drop-in replacement for std::atomic<T> that
 * cooperates with DeterministicSchedule.
 */
template <
    typename T,
    typename Schedule = DeterministicSchedule,
    template <typename> class Atom = std::atomic>
struct DeterministicAtomicImpl {
  DeterministicAtomicImpl() = default;
  ~DeterministicAtomicImpl() = default;
  DeterministicAtomicImpl(DeterministicAtomicImpl<T> const&) = delete;
  DeterministicAtomicImpl<T>& operator=(DeterministicAtomicImpl<T> const&) =
      delete;

  constexpr /* implicit */ DeterministicAtomicImpl(T v) noexcept : data_(v) {}

  bool is_lock_free() const noexcept { return data_.is_lock_free(); }

  bool compare_exchange_strong(
      T& v0, T v1, std::memory_order mo = std::memory_order_seq_cst) noexcept {
    return compare_exchange_strong(
        v0, v1, mo, ::folly::detail::default_failure_memory_order(mo));
  }
  bool compare_exchange_strong(
      T& v0,
      T v1,
      std::memory_order success,
      std::memory_order failure) noexcept {
    Schedule::beforeSharedAccess();
    auto orig = v0;
    bool rv = data_.compare_exchange_strong(v0, v1, success, failure);
    FOLLY_TEST_DSCHED_VLOG(
        this << ".compare_exchange_strong(" << std::hex << orig << ", "
             << std::hex << v1 << ") -> " << rv << "," << std::hex << v0);
    Schedule::afterSharedAccess(rv);
    return rv;
  }

  bool compare_exchange_weak(
      T& v0, T v1, std::memory_order mo = std::memory_order_seq_cst) noexcept {
    return compare_exchange_weak(
        v0, v1, mo, ::folly::detail::default_failure_memory_order(mo));
  }
  bool compare_exchange_weak(
      T& v0,
      T v1,
      std::memory_order success,
      std::memory_order failure) noexcept {
    Schedule::beforeSharedAccess();
    auto orig = v0;
    bool rv = data_.compare_exchange_weak(v0, v1, success, failure);
    FOLLY_TEST_DSCHED_VLOG(
        this << ".compare_exchange_weak(" << std::hex << orig << ", "
             << std::hex << v1 << ") -> " << rv << "," << std::hex << v0);
    Schedule::afterSharedAccess(rv);
    return rv;
  }

  T exchange(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
    Schedule::beforeSharedAccess();
    T rv = data_.exchange(v, mo);
    FOLLY_TEST_DSCHED_VLOG(
        this << ".exchange(" << std::hex << v << ") -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  /* implicit */ operator T() const noexcept {
    Schedule::beforeSharedAccess();
    T rv = data_.operator T();
    FOLLY_TEST_DSCHED_VLOG(this << "() -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T load(std::memory_order mo = std::memory_order_seq_cst) const noexcept {
    Schedule::beforeSharedAccess();
    T rv = data_.load(mo);
    FOLLY_TEST_DSCHED_VLOG(this << ".load() -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T operator=(T v) noexcept {
    Schedule::beforeSharedAccess();
    T rv = (data_ = v);
    FOLLY_TEST_DSCHED_VLOG(this << " = " << std::hex << v);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  void store(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
    Schedule::beforeSharedAccess();
    data_.store(v, mo);
    FOLLY_TEST_DSCHED_VLOG(this << ".store(" << std::hex << v << ")");
    Schedule::afterSharedAccess(true);
  }

  T operator++() noexcept {
    Schedule::beforeSharedAccess();
    T rv = ++data_;
    FOLLY_TEST_DSCHED_VLOG(this << " pre++ -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T operator++(int /* postDummy */) noexcept {
    Schedule::beforeSharedAccess();
    T rv = data_++;
    FOLLY_TEST_DSCHED_VLOG(this << " post++ -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T operator--() noexcept {
    Schedule::beforeSharedAccess();
    T rv = --data_;
    FOLLY_TEST_DSCHED_VLOG(this << " pre-- -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T operator--(int /* postDummy */) noexcept {
    Schedule::beforeSharedAccess();
    T rv = data_--;
    FOLLY_TEST_DSCHED_VLOG(this << " post-- -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T operator+=(T v) noexcept {
    Schedule::beforeSharedAccess();
    T rv = (data_ += v);
    FOLLY_TEST_DSCHED_VLOG(
        this << " += " << std::hex << v << " -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T fetch_add(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
    Schedule::beforeSharedAccess();
    T rv = data_.fetch_add(v, mo);
    FOLLY_TEST_DSCHED_VLOG(
        this << ".fetch_add(" << std::hex << v << ") -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T operator-=(T v) noexcept {
    Schedule::beforeSharedAccess();
    T rv = (data_ -= v);
    FOLLY_TEST_DSCHED_VLOG(
        this << " -= " << std::hex << v << " -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T fetch_sub(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
    Schedule::beforeSharedAccess();
    T rv = data_.fetch_sub(v, mo);
    FOLLY_TEST_DSCHED_VLOG(
        this << ".fetch_sub(" << std::hex << v << ") -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T operator&=(T v) noexcept {
    Schedule::beforeSharedAccess();
    T rv = (data_ &= v);
    FOLLY_TEST_DSCHED_VLOG(
        this << " &= " << std::hex << v << " -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T fetch_and(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
    Schedule::beforeSharedAccess();
    T rv = data_.fetch_and(v, mo);
    FOLLY_TEST_DSCHED_VLOG(
        this << ".fetch_and(" << std::hex << v << ") -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T operator|=(T v) noexcept {
    Schedule::beforeSharedAccess();
    T rv = (data_ |= v);
    FOLLY_TEST_DSCHED_VLOG(
        this << " |= " << std::hex << v << " -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T fetch_or(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
    Schedule::beforeSharedAccess();
    T rv = data_.fetch_or(v, mo);
    FOLLY_TEST_DSCHED_VLOG(
        this << ".fetch_or(" << std::hex << v << ") -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T operator^=(T v) noexcept {
    Schedule::beforeSharedAccess();
    T rv = (data_ ^= v);
    FOLLY_TEST_DSCHED_VLOG(
        this << " ^= " << std::hex << v << " -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T fetch_xor(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
    Schedule::beforeSharedAccess();
    T rv = data_.fetch_xor(v, mo);
    FOLLY_TEST_DSCHED_VLOG(
        this << ".fetch_xor(" << std::hex << v << ") -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  /** Read the value of the atomic variable without context switching */
  T load_direct() const noexcept {
    return data_.load(std::memory_order_relaxed);
  }

 private:
  Atom<T> data_;
};

template <typename T>
using DeterministicAtomic = DeterministicAtomicImpl<T, DeterministicSchedule>;

/* Futex extensions for DeterministicSchedule based Futexes */
int futexWakeImpl(
    const detail::Futex<test::DeterministicAtomic>* futex,
    int count,
    uint32_t wakeMask);
detail::FutexResult futexWaitImpl(
    const detail::Futex<test::DeterministicAtomic>* futex,
    uint32_t expected,
    std::chrono::system_clock::time_point const* absSystemTime,
    std::chrono::steady_clock::time_point const* absSteadyTime,
    uint32_t waitMask);

/* Generic futex extensions to allow sharing between DeterministicAtomic and
 * BufferedDeterministicAtomic. */
template <template <typename> class Atom>
int deterministicFutexWakeImpl(
    const detail::Futex<Atom>* futex,
    std::mutex& futexLock,
    std::unordered_map<
        const detail::Futex<Atom>*,
        std::list<std::pair<uint32_t, bool*>>>& futexQueues,
    int count,
    uint32_t wakeMask) {
  using namespace test;
  using namespace std::chrono;

  int rv = 0;
  DeterministicSchedule::beforeSharedAccess();
  futexLock.lock();
  if (futexQueues.count(futex) > 0) {
    auto& queue = futexQueues[futex];
    auto iter = queue.begin();
    while (iter != queue.end() && rv < count) {
      auto cur = iter++;
      if ((cur->first & wakeMask) != 0) {
        *(cur->second) = true;
        rv++;
        queue.erase(cur);
      }
    }
    if (queue.empty()) {
      futexQueues.erase(futex);
    }
  }
  futexLock.unlock();
  FOLLY_TEST_DSCHED_VLOG(
      "futexWake(" << futex << ", " << count << ", " << std::hex << wakeMask
                   << ") -> " << rv);
  DeterministicSchedule::afterSharedAccess();
  return rv;
}

template <template <typename> class Atom>
detail::FutexResult deterministicFutexWaitImpl(
    const detail::Futex<Atom>* futex,
    std::mutex& futexLock,
    std::unordered_map<
        const detail::Futex<Atom>*,
        std::list<std::pair<uint32_t, bool*>>>& futexQueues,
    uint32_t expected,
    std::chrono::system_clock::time_point const* absSystemTimeout,
    std::chrono::steady_clock::time_point const* absSteadyTimeout,
    uint32_t waitMask) {
  using namespace test;
  using namespace std::chrono;
  using namespace folly::detail;

  bool hasTimeout = absSystemTimeout != nullptr || absSteadyTimeout != nullptr;
  bool awoken = false;
  FutexResult result = FutexResult::AWOKEN;

  DeterministicSchedule::beforeSharedAccess();
  FOLLY_TEST_DSCHED_VLOG(
      "futexWait(" << futex << ", " << std::hex << expected << ", .., "
                   << std::hex << waitMask << ") beginning..");
  futexLock.lock();
  // load_direct avoids deadlock on inner call to beforeSharedAccess
  if (futex->load_direct() == expected) {
    auto& queue = futexQueues[futex];
    queue.emplace_back(waitMask, &awoken);
    auto ours = queue.end();
    ours--;
    while (!awoken) {
      futexLock.unlock();
      DeterministicSchedule::afterSharedAccess();
      DeterministicSchedule::beforeSharedAccess();
      futexLock.lock();

      // Simulate spurious wake-ups and timeouts, each time with
      // a 10% probability, if we haven't been woken up already
      if (!awoken && hasTimeout &&
          DeterministicSchedule::getRandNumber(100) < 10) {
        assert(futexQueues.count(futex) != 0 && &futexQueues[futex] == &queue);
        queue.erase(ours);
        if (queue.empty()) {
          futexQueues.erase(futex);
        }
        // Simulate ETIMEDOUT 90% of the time, and other failures
        // the remaining time
        result = DeterministicSchedule::getRandNumber(100) >= 10
            ? FutexResult::TIMEDOUT
            : FutexResult::INTERRUPTED;
        break;
      }
    }
  } else {
    result = FutexResult::VALUE_CHANGED;
  }
  futexLock.unlock();

  char const* resultStr = "?";
  switch (result) {
    case FutexResult::AWOKEN:
      resultStr = "AWOKEN";
      break;
    case FutexResult::TIMEDOUT:
      resultStr = "TIMEDOUT";
      break;
    case FutexResult::INTERRUPTED:
      resultStr = "INTERRUPTED";
      break;
    case FutexResult::VALUE_CHANGED:
      resultStr = "VALUE_CHANGED";
      break;
  }
  FOLLY_TEST_DSCHED_VLOG(
      "futexWait(" << futex << ", " << std::hex << expected << ", .., "
                   << std::hex << waitMask << ") -> " << resultStr);
  DeterministicSchedule::afterSharedAccess();
  return result;
}
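
// A sketch of how a translation unit might instantiate the generic
// implementation above to define the futexWakeImpl declared earlier for
// DeterministicAtomic (illustrative; the static-state names here are
// assumptions, not the actual .cpp):
//
//   static std::mutex descFutexLock;
//   static std::unordered_map<
//       const detail::Futex<DeterministicAtomic>*,
//       std::list<std::pair<uint32_t, bool*>>>
//       descFutexQueues;
//
//   int futexWakeImpl(
//       const detail::Futex<DeterministicAtomic>* futex,
//       int count,
//       uint32_t wakeMask) {
//     return deterministicFutexWakeImpl<DeterministicAtomic>(
//         futex, descFutexLock, descFutexQueues, count, wakeMask);
//   }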

/**
 * Implementations of the atomic_wait API for DeterministicAtomic; these are
 * no-ops here.  For a correct implementation this should not make a
 * difference, because threads are required to have atomic operations around
 * waits and wakes.
 */
template <typename Integer>
void tag_invoke(
    cpo_t<atomic_wait>, const DeterministicAtomic<Integer>*, Integer) {}
template <typename Integer, typename Clock, typename Duration>
std::cv_status tag_invoke(
    cpo_t<atomic_wait_until>,
    const DeterministicAtomic<Integer>*,
    Integer,
    const std::chrono::time_point<Clock, Duration>&) {
  return std::cv_status::no_timeout;
}
template <typename Integer>
void tag_invoke(cpo_t<atomic_notify_one>, const DeterministicAtomic<Integer>*) {
}
template <typename Integer>
void tag_invoke(cpo_t<atomic_notify_all>, const DeterministicAtomic<Integer>*) {
}

/**
 * DeterministicMutex is a drop-in replacement for std::mutex that
 * cooperates with DeterministicSchedule.
 */
struct DeterministicMutex {
  using Sem = DeterministicSchedule::Sem;

  std::mutex m;
  std::queue<Sem*> waiters_;
  ThreadSyncVar syncVar_;

  DeterministicMutex() = default;
  ~DeterministicMutex() = default;
  DeterministicMutex(DeterministicMutex const&) = delete;
  DeterministicMutex& operator=(DeterministicMutex const&) = delete;

  void lock() {
    FOLLY_TEST_DSCHED_VLOG(this << ".lock()");
    DeterministicSchedule::beforeSharedAccess();
    while (!m.try_lock()) {
      Sem* sem = DeterministicSchedule::descheduleCurrentThread();
      if (sem) {
        waiters_.push(sem);
      }
      DeterministicSchedule::afterSharedAccess();
      // Wait to be scheduled by unlock
      DeterministicSchedule::beforeSharedAccess();
    }
    if (DeterministicSchedule::isActive()) {
      syncVar_.acquire();
    }
    DeterministicSchedule::afterSharedAccess();
  }

  bool try_lock() {
    DeterministicSchedule::beforeSharedAccess();
    bool rv = m.try_lock();
    if (rv && DeterministicSchedule::isActive()) {
      syncVar_.acquire();
    }
    FOLLY_TEST_DSCHED_VLOG(this << ".try_lock() -> " << rv);
    DeterministicSchedule::afterSharedAccess();
    return rv;
  }

  void unlock() {
    FOLLY_TEST_DSCHED_VLOG(this << ".unlock()");
    DeterministicSchedule::beforeSharedAccess();
    m.unlock();
    if (DeterministicSchedule::isActive()) {
      syncVar_.release();
    }
    if (!waiters_.empty()) {
      Sem* sem = waiters_.front();
      DeterministicSchedule::reschedule(sem);
      waiters_.pop();
    }
    DeterministicSchedule::afterSharedAccess();
  }
};
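
// DeterministicMutex satisfies the Lockable requirements (lock, try_lock,
// unlock), so standard lock guards work with it.  A minimal sketch
// (illustrative; `mu` and `shared` are hypothetical names):
//
//   DeterministicMutex mu;
//   int shared = 0;  // plain data is fine once it is guarded by mu
//   auto t = DeterministicSchedule::thread([&] {
//     std::lock_guard<DeterministicMutex> g(mu);
//     ++shared;
//   });
//   DeterministicSchedule::join(t);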
} // namespace test

template <>
Getcpu::Func AccessSpreader<test::DeterministicAtomic>::pickGetcpuFunc();

} // namespace folly