1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #pragma once
7 
8 #include <atomic>
9 #include <condition_variable>
10 #include <mutex>
11 
12 #include <folly/hash/Hash.h>
13 #include <folly/Indestructible.h>
14 #include <folly/Unit.h>
15 
16 namespace folly {
17 
18 namespace parking_lot_detail {
19 
20 struct WaitNodeBase {
21   const uint64_t key_;
22   const uint64_t lotid_;
23   WaitNodeBase* next_{nullptr};
24   WaitNodeBase* prev_{nullptr};
25 
26   // tricky: hold both bucket and node mutex to write, either to read
27   bool signaled_;
28   std::mutex mutex_;
29   std::condition_variable cond_;
30 
WaitNodeBaseWaitNodeBase31   WaitNodeBase(uint64_t key, uint64_t lotid)
32       : key_(key), lotid_(lotid), signaled_(false) {}
33 
34   template <typename Clock, typename Duration>
waitWaitNodeBase35   std::cv_status wait(std::chrono::time_point<Clock, Duration> deadline) {
36     std::cv_status status = std::cv_status::no_timeout;
37     std::unique_lock<std::mutex> nodeLock(mutex_);
38     while (!signaled_ && status != std::cv_status::timeout) {
39       if (deadline != std::chrono::time_point<Clock, Duration>::max()) {
40         status = cond_.wait_until(nodeLock, deadline);
41       } else {
42         cond_.wait(nodeLock);
43       }
44     }
45     return status;
46   }
47 
wakeWaitNodeBase48   void wake() {
49     std::lock_guard<std::mutex> nodeLock(mutex_);
50     signaled_ = true;
51     cond_.notify_one();
52   }
53 
signaledWaitNodeBase54   bool signaled() {
55     return signaled_;
56   }
57 };
58 
59 extern std::atomic<uint64_t> idallocator;
60 
61 // Our emulated futex uses 4096 lists of wait nodes.  There are two levels
62 // of locking: a per-list mutex that controls access to the list and a
63 // per-node mutex, condvar, and bool that are used for the actual wakeups.
64 // The per-node mutex allows us to do precise wakeups without thundering
65 // herds.
66 struct Bucket {
67   std::mutex mutex_;
68   WaitNodeBase* head_;
69   WaitNodeBase* tail_;
70   std::atomic<uint64_t> count_;
71 
72   static Bucket& bucketFor(uint64_t key);
73 
push_backBucket74   void push_back(WaitNodeBase* node) {
75     if (tail_) {
76       assert(head_);
77       node->prev_ = tail_;
78       tail_->next_ = node;
79       tail_ = node;
80     } else {
81       tail_ = node;
82       head_ = node;
83     }
84   }
85 
eraseBucket86   void erase(WaitNodeBase* node) {
87     assert(count_.load(std::memory_order_relaxed) >= 1);
88     if (head_ == node && tail_ == node) {
89       assert(node->prev_ == nullptr);
90       assert(node->next_ == nullptr);
91       head_ = nullptr;
92       tail_ = nullptr;
93     } else if (head_ == node) {
94       assert(node->prev_ == nullptr);
95       assert(node->next_);
96       head_ = node->next_;
97       head_->prev_ = nullptr;
98     } else if (tail_ == node) {
99       assert(node->next_ == nullptr);
100       assert(node->prev_);
101       tail_ = node->prev_;
102       tail_->next_ = nullptr;
103     } else {
104       assert(node->next_);
105       assert(node->prev_);
106       node->next_->prev_ = node->prev_;
107       node->prev_->next_ = node->next_;
108     }
109     count_.fetch_sub(1, std::memory_order_relaxed);
110   }
111 };
112 
113 } // namespace parking_lot_detail
114 
115 enum class UnparkControl {
116   RetainContinue,
117   RemoveContinue,
118   RetainBreak,
119   RemoveBreak,
120 };
121 
122 enum class ParkResult {
123   Skip,
124   Unpark,
125   Timeout,
126 };
127 
128 /*
129  * ParkingLot provides an interface that is similar to Linux's futex
130  * system call, but with additional functionality.  It is implemented
131  * in a portable way on top of std::mutex and std::condition_variable.
132  *
133  * Additional reading:
134  * https://webkit.org/blog/6161/locking-in-webkit/
135  * https://github.com/WebKit/webkit/blob/master/Source/WTF/wtf/ParkingLot.h
136  * https://locklessinc.com/articles/futex_cheat_sheet/
137  *
138  * The main difference from futex is that park/unpark take lambdas,
139  * such that nearly anything can be done while holding the bucket
140  * lock.  Unpark() lambda can also be used to wake up any number of
141  * waiters.
142  *
143  * ParkingLot is templated on the data type, however, all ParkingLot
144  * implementations are backed by a single static array of buckets to
145  * avoid large memory overhead.  Lambdas will only ever be called on
146  * the specific ParkingLot's nodes.
147  */
148 template <typename Data = Unit>
149 class ParkingLot {
150   const uint64_t lotid_;
151   ParkingLot(const ParkingLot&) = delete;
152 
153   struct WaitNode : public parking_lot_detail::WaitNodeBase {
154     const Data data_;
155 
156     template <typename D>
WaitNodeWaitNode157     WaitNode(uint64_t key, uint64_t lotid, D&& data)
158         : WaitNodeBase(key, lotid), data_(std::forward<D>(data)) {}
159   };
160 
161  public:
ParkingLot()162   ParkingLot() : lotid_(parking_lot_detail::idallocator++) {}
163 
164   /* Park API
165    *
166    * Key is almost always the address of a variable.
167    *
168    * ToPark runs while holding the bucket lock: usually this
169    * is a check to see if we can sleep, by checking waiter bits.
170    *
171    * PreWait is usually used to implement condition variable like
172    * things, such that you can unlock the condition variable's lock at
173    * the appropriate time.
174    */
175   template <typename Key, typename D, typename ToPark, typename PreWait>
park(const Key key,D && data,ToPark && toPark,PreWait && preWait)176   ParkResult park(const Key key, D&& data, ToPark&& toPark, PreWait&& preWait) {
177     return park_until(
178         key,
179         std::forward<D>(data),
180         std::forward<ToPark>(toPark),
181         std::forward<PreWait>(preWait),
182         std::chrono::steady_clock::time_point::max());
183   }
184 
185   template <
186       typename Key,
187       typename D,
188       typename ToPark,
189       typename PreWait,
190       typename Clock,
191       typename Duration>
192   ParkResult park_until(
193       const Key key,
194       D&& data,
195       ToPark&& toPark,
196       PreWait&& preWait,
197       std::chrono::time_point<Clock, Duration> deadline);
198 
199   template <
200       typename Key,
201       typename D,
202       typename ToPark,
203       typename PreWait,
204       typename Rep,
205       typename Period>
park_for(const Key key,D && data,ToPark && toPark,PreWait && preWait,std::chrono::duration<Rep,Period> & timeout)206   ParkResult park_for(
207       const Key key,
208       D&& data,
209       ToPark&& toPark,
210       PreWait&& preWait,
211       std::chrono::duration<Rep, Period>& timeout) {
212     return park_until(
213         key,
214         std::forward<D>(data),
215         std::forward<ToPark>(toPark),
216         std::forward<PreWait>(preWait),
217         timeout + std::chrono::steady_clock::now());
218   }
219 
220   /*
221    * Unpark API
222    *
223    * Key is the same uniqueaddress used in park(), and is used as a
224    * hash key for lookup of waiters.
225    *
226    * Unparker is a function that is given the Data parameter, and
227    * returns an UnparkControl.  The Remove* results will remove and
228    * wake the waiter, the Ignore/Stop results will not, while stopping
229    * or continuing iteration of the waiter list.
230    */
231   template <typename Key, typename Unparker>
232   void unpark(const Key key, Unparker&& func);
233 };
234 
235 template <typename Data>
236 template <
237     typename Key,
238     typename D,
239     typename ToPark,
240     typename PreWait,
241     typename Clock,
242     typename Duration>
park_until(const Key bits,D && data,ToPark && toPark,PreWait && preWait,std::chrono::time_point<Clock,Duration> deadline)243 ParkResult ParkingLot<Data>::park_until(
244     const Key bits,
245     D&& data,
246     ToPark&& toPark,
247     PreWait&& preWait,
248     std::chrono::time_point<Clock, Duration> deadline) {
249   auto key = hash::twang_mix64(uint64_t(bits));
250   auto& bucket = parking_lot_detail::Bucket::bucketFor(key);
251   WaitNode node(key, lotid_, std::forward<D>(data));
252 
253   {
254     // A: Must be seq_cst.  Matches B.
255     bucket.count_.fetch_add(1, std::memory_order_seq_cst);
256 
257     std::unique_lock<std::mutex> bucketLock(bucket.mutex_);
258 
259     if (!std::forward<ToPark>(toPark)()) {
260       bucketLock.unlock();
261       bucket.count_.fetch_sub(1, std::memory_order_relaxed);
262       return ParkResult::Skip;
263     }
264 
265     bucket.push_back(&node);
266   } // bucketLock scope
267 
268   std::forward<PreWait>(preWait)();
269 
270   auto status = node.wait(deadline);
271 
272   if (status == std::cv_status::timeout) {
273     // it's not really a timeout until we unlink the unsignaled node
274     std::lock_guard<std::mutex> bucketLock(bucket.mutex_);
275     if (!node.signaled()) {
276       bucket.erase(&node);
277       return ParkResult::Timeout;
278     }
279   }
280 
281   return ParkResult::Unpark;
282 }
283 
284 template <typename Data>
285 template <typename Key, typename Func>
unpark(const Key bits,Func && func)286 void ParkingLot<Data>::unpark(const Key bits, Func&& func) {
287   auto key = hash::twang_mix64(uint64_t(bits));
288   auto& bucket = parking_lot_detail::Bucket::bucketFor(key);
289   // B: Must be seq_cst.  Matches A.  If true, A *must* see in seq_cst
290   // order any atomic updates in toPark() (and matching updates that
291   // happen before unpark is called)
292   if (bucket.count_.load(std::memory_order_seq_cst) == 0) {
293     return;
294   }
295 
296   std::lock_guard<std::mutex> bucketLock(bucket.mutex_);
297 
298   for (auto iter = bucket.head_; iter != nullptr;) {
299     auto node = static_cast<WaitNode*>(iter);
300     iter = iter->next_;
301     if (node->key_ == key && node->lotid_ == lotid_) {
302       auto result = std::forward<Func>(func)(node->data_);
303       if (result == UnparkControl::RemoveBreak ||
304           result == UnparkControl::RemoveContinue) {
305         // we unlink, but waiter destroys the node
306         bucket.erase(node);
307 
308         node->wake();
309       }
310       if (result == UnparkControl::RemoveBreak ||
311           result == UnparkControl::RetainBreak) {
312         return;
313       }
314     }
315   }
316 }
317 
318 } // namespace folly
319