/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/test/DeterministicSchedule.h>

#include <algorithm>
#include <cassert>
#include <list>
#include <mutex>
#include <random>
#include <unordered_map>
#include <utility>

#include <folly/Random.h>
#include <folly/SingletonThreadLocal.h>

namespace folly {
namespace test {

using Sem = DeterministicSchedule::Sem;

AuxChk DeterministicSchedule::aux_chk;

// access is protected by futexLock
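// maps each futex to its queue of waiting threads; each entry pairs the
// waiter's waitMask with a pointer to a flag that is set when it is woken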
static std::unordered_map<
    const detail::Futex<DeterministicAtomic>*,
    std::list<std::pair<uint32_t, bool*>>>
    futexQueues;

static std::mutex futexLock;

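// pairwise merge: after sync() this vector clock is at least as recent as
// src in every component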
void ThreadTimestamps::sync(const ThreadTimestamps& src) {
  if (src.timestamps_.size() > timestamps_.size()) {
    timestamps_.resize(src.timestamps_.size());
  }
  for (size_t i = 0; i < src.timestamps_.size(); i++) {
    timestamps_[i].sync(src.timestamps_[i]);
  }
}

DSchedTimestamp ThreadTimestamps::advance(DSchedThreadId tid) {
  assert(timestamps_.size() > tid.val);
  return timestamps_[tid.val].advance();
}

void ThreadTimestamps::setIfNotPresent(DSchedThreadId tid, DSchedTimestamp ts) {
  assert(ts.initialized());
  if (tid.val >= timestamps_.size()) {
    timestamps_.resize(tid.val + 1);
  }
  if (!timestamps_[tid.val].initialized()) {
    timestamps_[tid.val].sync(ts);
  }
}

void ThreadTimestamps::clear() {
  timestamps_.clear();
}

bool ThreadTimestamps::atLeastAsRecentAs(
    DSchedThreadId tid, DSchedTimestamp ts) const {
  // It is not meaningful to learn whether any instance is at least
  // as recent as timestamp 0.
  assert(ts.initialized());
  if (tid.val >= timestamps_.size()) {
    return false;
  }
  return timestamps_[tid.val].atLeastAsRecentAs(ts);
}

bool ThreadTimestamps::atLeastAsRecentAsAny(const ThreadTimestamps& src) const {
  size_t min = timestamps_.size() < src.timestamps_.size()
      ? timestamps_.size()
      : src.timestamps_.size();
  for (size_t i = 0; i < min; i++) {
    if (src.timestamps_[i].initialized() &&
        timestamps_[i].atLeastAsRecentAs(src.timestamps_[i])) {
      return true;
    }
  }
  return false;
}

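// acquire() pulls the order stored at this sync var into the calling
// thread's clock; release() publishes the calling thread's clock into the
// sync var; acq_rel() does both, modeling a read-modify-write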
void ThreadSyncVar::acquire() {
  ThreadInfo& threadInfo = DeterministicSchedule::getCurrentThreadInfo();
  DSchedThreadId tid = DeterministicSchedule::getThreadId();
  threadInfo.acqRelOrder_.advance(tid);
  threadInfo.acqRelOrder_.sync(order_);
}

void ThreadSyncVar::release() {
  ThreadInfo& threadInfo = DeterministicSchedule::getCurrentThreadInfo();
  DSchedThreadId tid = DeterministicSchedule::getThreadId();
  threadInfo.acqRelOrder_.advance(tid);
  order_.sync(threadInfo.acqRelOrder_);
}

void ThreadSyncVar::acq_rel() {
  ThreadInfo& threadInfo = DeterministicSchedule::getCurrentThreadInfo();
  DSchedThreadId tid = DeterministicSchedule::getThreadId();
  threadInfo.acqRelOrder_.advance(tid);
  threadInfo.acqRelOrder_.sync(order_);
  order_.sync(threadInfo.acqRelOrder_);
}

namespace {

struct PerThreadState {
  // delete the constructors and assignment operators for sanity
  //
  // but... we can't delete the move constructor and assignment operators
  // because those are required before C++17 in the implementation of
  // SingletonThreadLocal
  PerThreadState(const PerThreadState&) = delete;
  PerThreadState& operator=(const PerThreadState&) = delete;
  PerThreadState(PerThreadState&&) = default;
  PerThreadState& operator=(PerThreadState&&) = default;
  PerThreadState() = default;

  Sem* sem{nullptr};
  DeterministicSchedule* sched{nullptr};
  bool exiting{false};
  DSchedThreadId threadId{};
  AuxAct aux_act{};
};
using TLState = SingletonThreadLocal<PerThreadState>;

} // namespace

DeterministicSchedule::DeterministicSchedule(
    std::function<size_t(size_t)> scheduler)
    : scheduler_(std::move(scheduler)), nextThreadId_(0), step_(0) {
  auto& tls = TLState::get();
  assert(tls.sem == nullptr);
  assert(tls.sched == nullptr);
  assert(tls.aux_act == nullptr);

  tls.exiting = false;
  tls.sem = new Sem(true);
  sems_.push_back(tls.sem);

  tls.threadId = nextThreadId_++;
  threadInfoMap_.emplace_back(tls.threadId);
  tls.sched = this;
}

DeterministicSchedule::~DeterministicSchedule() {
  auto& tls = TLState::get();
  static_cast<void>(tls);
  assert(tls.sched == this);
  assert(sems_.size() == 1);
  assert(sems_[0] == tls.sem);
  delete tls.sem;
  tls = PerThreadState();
}

std::function<size_t(size_t)> DeterministicSchedule::uniform(uint64_t seed) {
  auto rand = std::make_shared<std::ranlux48>(seed);
  return [rand](size_t numActive) {
    auto dist = std::uniform_int_distribution<size_t>(0, numActive - 1);
    return dist(*rand);
  };
}

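// Illustrative usage (a sketch, relying on the helpers declared in
// DeterministicSchedule.h): seeding the scheduler makes an interleaving
// reproducible across runs, e.g.
//
//   DeterministicSchedule sched(DeterministicSchedule::uniform(12345));
//   std::thread t =
//       DeterministicSchedule::thread([] { /* code under test */ });
//   DeterministicSchedule::join(t);

// Scheduler functor that restricts scheduling choices to a rotating subset
// of the runnable threads: at most subsetSize_ threads are eligible at a
// time, and the eligible prefix is reshuffled every stepsBetweenSelect_
// scheduling steps.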
struct UniformSubset {
  UniformSubset(uint64_t seed, size_t subsetSize, size_t stepsBetweenSelect)
      : uniform_(DeterministicSchedule::uniform(seed)),
        subsetSize_(subsetSize),
        stepsBetweenSelect_(stepsBetweenSelect),
        stepsLeft_(0) {}

  size_t operator()(size_t numActive) {
    adjustPermSize(numActive);
    if (stepsLeft_-- == 0) {
      stepsLeft_ = stepsBetweenSelect_ - 1;
      shufflePrefix();
    }
    return perm_[uniform_(std::min(numActive, subsetSize_))];
  }

 private:
  std::function<size_t(size_t)> uniform_;
  const size_t subsetSize_;
  const size_t stepsBetweenSelect_;

  size_t stepsLeft_;
  // only the first subsetSize_ is properly randomized
  std::vector<size_t> perm_;

  void adjustPermSize(size_t numActive) {
    if (perm_.size() > numActive) {
      perm_.erase(
          std::remove_if(
              perm_.begin(),
              perm_.end(),
              [=](size_t x) { return x >= numActive; }),
          perm_.end());
    } else {
      while (perm_.size() < numActive) {
        perm_.push_back(perm_.size());
      }
    }
    assert(perm_.size() == numActive);
  }

  void shufflePrefix() {
    for (size_t i = 0; i < std::min(perm_.size() - 1, subsetSize_); ++i) {
      size_t j = uniform_(perm_.size() - i) + i;
      std::swap(perm_[i], perm_[j]);
    }
  }
};

bool DeterministicSchedule::isCurrentThreadExiting() {
  auto& tls = TLState::get();
  return tls.exiting;
}

bool DeterministicSchedule::isActive() {
  auto& tls = TLState::get();
  return tls.sched != nullptr;
}

DSchedThreadId DeterministicSchedule::getThreadId() {
  auto& tls = TLState::get();
  assert(tls.sched != nullptr);
  return tls.threadId;
}

DeterministicSchedule* DeterministicSchedule::getCurrentSchedule() {
  auto& tls = TLState::get();
  return tls.sched;
}

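// Returns a scheduler that chooses only among a subset of n runnable
// threads at any given time, drawing a fresh subset every m steps; see
// UniformSubset above.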
std::function<size_t(size_t)> DeterministicSchedule::uniformSubset(
    uint64_t seed, size_t n, size_t m) {
  auto gen = std::make_shared<UniformSubset>(seed, n, m);
  return [=](size_t numActive) { return (*gen)(numActive); };
}

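// Each managed thread parks on its own semaphore in beforeSharedAccess()
// and runs only once the scheduler posts it; afterSharedAccess() hands
// control to the next thread chosen by scheduler_.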
void DeterministicSchedule::beforeSharedAccess() {
  auto& tls = TLState::get();
  if (tls.sem) {
    tls.sem->wait();
  }
}

void DeterministicSchedule::afterSharedAccess() {
  auto& tls = TLState::get();
  auto sched = tls.sched;
  if (!sched) {
    return;
  }
  sched->sems_[sched->scheduler_(sched->sems_.size())]->post();
}

void DeterministicSchedule::afterSharedAccess(bool success) {
  auto& tls = TLState::get();
  auto sched = tls.sched;
  if (!sched) {
    return;
  }
  sched->callAux(success);
  sched->sems_[sched->scheduler_(sched->sems_.size())]->post();
}

size_t DeterministicSchedule::getRandNumber(size_t n) {
  auto& tls = TLState::get();
  if (tls.sched) {
    return tls.sched->scheduler_(n);
  }
  return Random::rand32() % n;
}

int DeterministicSchedule::getcpu(
    unsigned* cpu, unsigned* node, void* /* unused */) {
  auto& tls = TLState::get();
  if (cpu) {
    *cpu = tls.threadId.val;
  }
  if (node) {
    *node = tls.threadId.val;
  }
  return 0;
}

void DeterministicSchedule::setAuxAct(AuxAct& aux) {
  auto& tls = TLState::get();
  tls.aux_act = aux;
}

void DeterministicSchedule::setAuxChk(AuxChk& aux) {
  aux_chk = aux;
}

void DeterministicSchedule::clearAuxChk() {
  aux_chk = nullptr;
}

void DeterministicSchedule::reschedule(Sem* sem) {
  auto& tls = TLState::get();
  auto sched = tls.sched;
  if (sched) {
    sched->sems_.push_back(sem);
  }
}

Sem* DeterministicSchedule::descheduleCurrentThread() {
  auto& tls = TLState::get();
  auto sched = tls.sched;
  if (sched) {
    sched->sems_.erase(
        std::find(sched->sems_.begin(), sched->sems_.end(), tls.sem));
  }
  return tls.sem;
}

Sem* DeterministicSchedule::beforeThreadCreate() {
  Sem* s = new Sem(false);
  beforeSharedAccess();
  sems_.push_back(s);
  afterSharedAccess();
  return s;
}

void DeterministicSchedule::afterThreadCreate(Sem* sem) {
  auto& tls = TLState::get();
  assert(tls.sem == nullptr);
  assert(tls.sched == nullptr);
  tls.exiting = false;
  tls.sem = sem;
  tls.sched = this;
  bool started = false;
  while (!started) {
    beforeSharedAccess();
    if (active_.count(std::this_thread::get_id()) == 1) {
      started = true;
      tls.threadId = nextThreadId_++;
      assert(tls.threadId.val == threadInfoMap_.size());
      threadInfoMap_.emplace_back(tls.threadId);
    }
    afterSharedAccess();
  }
  atomic_thread_fence(std::memory_order_seq_cst);
}

void DeterministicSchedule::beforeThreadExit() {
  auto& tls = TLState::get();
  assert(tls.sched == this);

  atomic_thread_fence(std::memory_order_seq_cst);
  beforeSharedAccess();
  auto parent = joins_.find(std::this_thread::get_id());
  if (parent != joins_.end()) {
    reschedule(parent->second);
    joins_.erase(parent);
  }
  descheduleCurrentThread();
  active_.erase(std::this_thread::get_id());

  FOLLY_TEST_DSCHED_VLOG("exiting");
  exitingSems_[std::this_thread::get_id()] = tls.sem;
  afterSharedAccess();
  // Wait for the parent thread to allow us to run thread-local destructors.
  tls.sem->wait();
  delete tls.sem;
  tls = PerThreadState();
}

void DeterministicSchedule::waitForBeforeThreadExit(std::thread& child) {
  auto& tls = TLState::get();
  assert(tls.sched == this);
  beforeSharedAccess();
  assert(tls.sched->joins_.count(child.get_id()) == 0);
  if (tls.sched->active_.count(child.get_id())) {
    Sem* sem = descheduleCurrentThread();
    tls.sched->joins_.insert({child.get_id(), sem});
    afterSharedAccess();
    // Wait to be scheduled by exiting child thread
    beforeSharedAccess();
    assert(!tls.sched->active_.count(child.get_id()));
  }
  afterSharedAccess();
}

void DeterministicSchedule::joinAll(std::vector<std::thread>& children) {
  auto& tls = TLState::get();
  auto sched = tls.sched;
  if (sched) {
    // Wait until all children are about to exit
    for (auto& child : children) {
      sched->waitForBeforeThreadExit(child);
    }
  }
  atomic_thread_fence(std::memory_order_seq_cst);
  /* Let each child thread proceed one at a time to protect
   * shared access during thread local destructors. */
  for (auto& child : children) {
    if (sched) {
      beforeSharedAccess();
      sched->exitingSems_[child.get_id()]->post();
    }
    child.join();
    if (sched) {
      afterSharedAccess();
    }
  }
}

void DeterministicSchedule::join(std::thread& child) {
  auto& tls = TLState::get();
  auto sched = tls.sched;
  if (sched) {
    sched->waitForBeforeThreadExit(child);
  }
  atomic_thread_fence(std::memory_order_seq_cst);
  FOLLY_TEST_DSCHED_VLOG("joined " << std::hex << child.get_id());
  if (sched) {
    beforeSharedAccess();
    sched->exitingSems_[child.get_id()]->post();
  }
  child.join();
  if (sched) {
    afterSharedAccess();
  }
}

void DeterministicSchedule::callAux(bool success) {
  auto& tls = TLState::get();
  ++step_;
  if (tls.aux_act) {
    tls.aux_act(success);
    tls.aux_act = nullptr;
  }
  if (aux_chk) {
    aux_chk(step_);
  }
}

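// Records the happens-before edges induced by semaphore operations:
// post() releases into the per-semaphore sync var; a successful
// try_wait() both acquires and releases (acq_rel), while a failed one
// still acquires the order it observed.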
static std::unordered_map<Sem*, std::unique_ptr<ThreadSyncVar>> semSyncVar;

void DeterministicSchedule::post(Sem* sem) {
  beforeSharedAccess();
  if (semSyncVar.count(sem) == 0) {
    semSyncVar[sem] = std::make_unique<ThreadSyncVar>();
  }
  semSyncVar[sem]->release();
  sem->post();
  FOLLY_TEST_DSCHED_VLOG("sem->post() [sem=" << sem << "]");
  afterSharedAccess();
}

bool DeterministicSchedule::tryWait(Sem* sem) {
  beforeSharedAccess();
  if (semSyncVar.count(sem) == 0) {
    semSyncVar[sem] = std::make_unique<ThreadSyncVar>();
  }

  bool acquired = sem->try_wait();
  const char* acquired_s = acquired ? "true" : "false";
  FOLLY_TEST_DSCHED_VLOG(
      "sem->try_wait() [sem=" << sem << "] -> " << acquired_s);
  if (acquired) {
    semSyncVar[sem]->acq_rel();
  } else {
    semSyncVar[sem]->acquire();
  }

  afterSharedAccess();
  return acquired;
}

void DeterministicSchedule::wait(Sem* sem) {
  while (!tryWait(sem)) {
    // we're not busy waiting because this is a deterministic schedule
  }
}

ThreadInfo& DeterministicSchedule::getCurrentThreadInfo() {
  auto& tls = TLState::get();
  auto sched = tls.sched;
  assert(sched);
  assert(tls.threadId.val < sched->threadInfoMap_.size());
  return sched->threadInfoMap_[tls.threadId.val];
}

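// Models fences with vector clocks: acquire fences absorb acqFenceOrder_,
// release fences publish into relFenceOrder_, and seq_cst fences also
// serialize through the schedule-wide seqCstFenceOrder_. Relaxed fences
// are not supported here.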
void DeterministicSchedule::atomic_thread_fence(std::memory_order mo) {
  auto& tls = TLState::get();
  if (!tls.sched) {
    std::atomic_thread_fence(mo);
    return;
  }
  beforeSharedAccess();
  ThreadInfo& threadInfo = getCurrentThreadInfo();
  switch (mo) {
    case std::memory_order_relaxed:
      assert(false);
      break;
    case std::memory_order_consume:
    case std::memory_order_acquire:
      threadInfo.acqRelOrder_.sync(threadInfo.acqFenceOrder_);
      break;
    case std::memory_order_release:
      threadInfo.relFenceOrder_.sync(threadInfo.acqRelOrder_);
      break;
    case std::memory_order_acq_rel:
      threadInfo.acqRelOrder_.sync(threadInfo.acqFenceOrder_);
      threadInfo.relFenceOrder_.sync(threadInfo.acqRelOrder_);
      break;
    case std::memory_order_seq_cst:
      threadInfo.acqRelOrder_.sync(threadInfo.acqFenceOrder_);
      threadInfo.acqRelOrder_.sync(tls.sched->seqCstFenceOrder_);
      tls.sched->seqCstFenceOrder_ = threadInfo.acqRelOrder_;
      threadInfo.relFenceOrder_.sync(threadInfo.acqRelOrder_);
      break;
  }
  FOLLY_TEST_DSCHED_VLOG("fence: " << folly::detail::memory_order_to_str(mo));
  afterSharedAccess();
}

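// Deterministic replacements for the futex operations used by
// DeterministicAtomic-based primitives; both route through the shared
// futexQueues declared above.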
detail::FutexResult futexWaitImpl(
    const detail::Futex<DeterministicAtomic>* futex,
    uint32_t expected,
    std::chrono::system_clock::time_point const* absSystemTimeout,
    std::chrono::steady_clock::time_point const* absSteadyTimeout,
    uint32_t waitMask) {
  return deterministicFutexWaitImpl<DeterministicAtomic>(
      futex,
      futexLock,
      futexQueues,
      expected,
      absSystemTimeout,
      absSteadyTimeout,
      waitMask);
}

int futexWakeImpl(
    const detail::Futex<DeterministicAtomic>* futex,
    int count,
    uint32_t wakeMask) {
  return deterministicFutexWakeImpl<DeterministicAtomic>(
      futex, futexLock, futexQueues, count, wakeMask);
}

} // namespace test
} // namespace folly

namespace folly {

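// Under DeterministicSchedule, pretend the machine has 16 uniform cpus and
// (via getcpu above) report the thread id as the cpu, so that
// AccessSpreader striping is deterministic across runs.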
template <>
CacheLocality const& CacheLocality::system<test::DeterministicAtomic>() {
  static CacheLocality cache(CacheLocality::uniform(16));
  return cache;
}

template <>
Getcpu::Func AccessSpreader<test::DeterministicAtomic>::pickGetcpuFunc() {
  return &test::DeterministicSchedule::getcpu;
}
} // namespace folly