1 // Copyright 2016 Amanieu d'Antras
2 //
3 // Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4 // http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5 // http://opensource.org/licenses/MIT>, at your option. This file may not be
6 // copied, modified, or distributed except according to those terms.
7 use cfg_if::cfg_if;
8 use crate::thread_parker::{ThreadParker, ThreadParkerT, UnparkHandleT};
9 use crate::util::UncheckedOptionExt;
10 use crate::word_lock::WordLock;
11 use core::{
12     cell::{Cell, UnsafeCell},
13     ptr,
14     sync::atomic::{AtomicPtr, AtomicUsize, Ordering},
15 };
16 use smallvec::SmallVec;
17 use std::time::{Duration, Instant};
18 
19 cfg_if! {
20     if #[cfg(all(
21         target_arch = "wasm32",
22         target_os = "unknown",
23         target_vendor = "unknown"
24     ))] {
25         use core::ops::Add;
26 
27         #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Hash)]
28         struct DummyInstant(Duration);
29 
30         impl DummyInstant {
31             pub fn now() -> DummyInstant {
32                 DummyInstant::zero()
33             }
34 
35             const fn zero() -> DummyInstant {
36                 DummyInstant(Duration::from_secs(0))
37             }
38         }
39 
40         impl Add<Duration> for DummyInstant {
41             type Output = DummyInstant;
42 
43             fn add(self, _rhs: Duration) -> DummyInstant {
44                 DummyInstant::zero()
45             }
46         }
47 
48         // Use dummy implementation for `Instant` on `wasm32`. The reason for this is
49         // that `Instant::now()` will always panic because time is currently not implemented
50         // on wasm32-unknown-unknown.
51         // See https://github.com/rust-lang/rust/blob/master/src/libstd/sys/wasm/time.rs
52         type InstantType = DummyInstant;
53     } else {
54         // Otherwise use `std::time::Instant`
55         type InstantType = Instant;
56     }
57 }
58 
59 static NUM_THREADS: AtomicUsize = AtomicUsize::new(0);
60 
61 /// Holds the pointer to the currently active `HashTable`.
62 ///
63 /// # Safety
64 ///
65 /// Except for the initial value of null, it must always point to a valid `HashTable` instance.
66 /// Any `HashTable` this global static has ever pointed to must never be freed.
67 static HASHTABLE: AtomicPtr<HashTable> = AtomicPtr::new(ptr::null_mut());
68 
69 // Even with 3x more buckets than threads, the memory overhead per thread is
70 // still only a few hundred bytes per thread.
71 const LOAD_FACTOR: usize = 3;
72 
73 struct HashTable {
74     // Hash buckets for the table
75     entries: Box<[Bucket]>,
76 
77     // Number of bits used for the hash function
78     hash_bits: u32,
79 
80     // Previous table. This is only kept to keep leak detectors happy.
81     _prev: *const HashTable,
82 }
83 
84 impl HashTable {
85     #[inline]
new(num_threads: usize, prev: *const HashTable) -> Box<HashTable>86     fn new(num_threads: usize, prev: *const HashTable) -> Box<HashTable> {
87         let new_size = (num_threads * LOAD_FACTOR).next_power_of_two();
88         let hash_bits = 0usize.leading_zeros() - new_size.leading_zeros() - 1;
89 
90         let now = InstantType::now();
91         let mut entries = Vec::with_capacity(new_size);
92         for i in 0..new_size {
93             // We must ensure the seed is not zero
94             entries.push(Bucket::new(now, i as u32 + 1));
95         }
96 
97         Box::new(HashTable {
98             entries: entries.into_boxed_slice(),
99             hash_bits,
100             _prev: prev,
101         })
102     }
103 }
104 
105 #[repr(align(64))]
106 struct Bucket {
107     // Lock protecting the queue
108     mutex: WordLock,
109 
110     // Linked list of threads waiting on this bucket
111     queue_head: Cell<*const ThreadData>,
112     queue_tail: Cell<*const ThreadData>,
113 
114     // Next time at which point be_fair should be set
115     fair_timeout: UnsafeCell<FairTimeout>,
116 }
117 
118 impl Bucket {
119     #[inline]
new(timeout: InstantType, seed: u32) -> Self120     pub fn new(timeout: InstantType, seed: u32) -> Self {
121         Self {
122             mutex: WordLock::new(),
123             queue_head: Cell::new(ptr::null()),
124             queue_tail: Cell::new(ptr::null()),
125             fair_timeout: UnsafeCell::new(FairTimeout::new(timeout, seed)),
126         }
127     }
128 }
129 
130 struct FairTimeout {
131     // Next time at which point be_fair should be set
132     timeout: InstantType,
133 
134     // the PRNG state for calculating the next timeout
135     seed: u32,
136 }
137 
138 impl FairTimeout {
139     #[inline]
new(timeout: InstantType, seed: u32) -> FairTimeout140     fn new(timeout: InstantType, seed: u32) -> FairTimeout {
141         FairTimeout { timeout, seed }
142     }
143 
144     // Determine whether we should force a fair unlock, and update the timeout
145     #[inline]
should_timeout(&mut self) -> bool146     fn should_timeout(&mut self) -> bool {
147         let now = InstantType::now();
148         if now > self.timeout {
149             // Time between 0 and 1ms.
150             let nanos = self.gen_u32() % 1_000_000;
151             self.timeout = now + Duration::new(0, nanos);
152             true
153         } else {
154             false
155         }
156     }
157 
158     // Pseudorandom number generator from the "Xorshift RNGs" paper by George Marsaglia.
gen_u32(&mut self) -> u32159     fn gen_u32(&mut self) -> u32 {
160         self.seed ^= self.seed << 13;
161         self.seed ^= self.seed >> 17;
162         self.seed ^= self.seed << 5;
163         self.seed
164     }
165 }
166 
167 struct ThreadData {
168     parker: ThreadParker,
169 
170     // Key that this thread is sleeping on. This may change if the thread is
171     // requeued to a different key.
172     key: AtomicUsize,
173 
174     // Linked list of parked threads in a bucket
175     next_in_queue: Cell<*const ThreadData>,
176 
177     // UnparkToken passed to this thread when it is unparked
178     unpark_token: Cell<UnparkToken>,
179 
180     // ParkToken value set by the thread when it was parked
181     park_token: Cell<ParkToken>,
182 
183     // Is the thread parked with a timeout?
184     parked_with_timeout: Cell<bool>,
185 
186     // Extra data for deadlock detection
187     #[cfg(feature = "deadlock_detection")]
188     deadlock_data: deadlock::DeadlockData,
189 }
190 
191 impl ThreadData {
new() -> ThreadData192     fn new() -> ThreadData {
193         // Keep track of the total number of live ThreadData objects and resize
194         // the hash table accordingly.
195         let num_threads = NUM_THREADS.fetch_add(1, Ordering::Relaxed) + 1;
196         grow_hashtable(num_threads);
197 
198         ThreadData {
199             parker: ThreadParker::new(),
200             key: AtomicUsize::new(0),
201             next_in_queue: Cell::new(ptr::null()),
202             unpark_token: Cell::new(DEFAULT_UNPARK_TOKEN),
203             park_token: Cell::new(DEFAULT_PARK_TOKEN),
204             parked_with_timeout: Cell::new(false),
205             #[cfg(feature = "deadlock_detection")]
206             deadlock_data: deadlock::DeadlockData::new(),
207         }
208     }
209 }
210 
211 // Invokes the given closure with a reference to the current thread `ThreadData`.
212 #[inline(always)]
with_thread_data<T>(f: impl FnOnce(&ThreadData) -> T) -> T213 fn with_thread_data<T>(f: impl FnOnce(&ThreadData) -> T) -> T {
214     // Unlike word_lock::ThreadData, parking_lot::ThreadData is always expensive
215     // to construct. Try to use a thread-local version if possible. Otherwise just
216     // create a ThreadData on the stack
217     let mut thread_data_storage = None;
218     thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
219     let thread_data_ptr = THREAD_DATA
220         .try_with(|x| x as *const ThreadData)
221         .unwrap_or_else(|_| thread_data_storage.get_or_insert_with(ThreadData::new));
222 
223     f(unsafe { &*thread_data_ptr })
224 }
225 
226 impl Drop for ThreadData {
drop(&mut self)227     fn drop(&mut self) {
228         NUM_THREADS.fetch_sub(1, Ordering::Relaxed);
229     }
230 }
231 
232 /// Returns a reference to the latest hash table, creating one if it doesn't exist yet.
233 /// The reference is valid forever. However, the `HashTable` it references might become stale
234 /// at any point. Meaning it still exists, but it is not the instance in active use.
235 #[inline]
get_hashtable() -> &'static HashTable236 fn get_hashtable() -> &'static HashTable {
237     let table = HASHTABLE.load(Ordering::Acquire);
238 
239     // If there is no table, create one
240     if table.is_null() {
241         create_hashtable()
242     } else {
243         // SAFETY: when not null, `HASHTABLE` always points to a `HashTable` that is never freed.
244         unsafe { &*table }
245     }
246 }
247 
248 /// Returns a reference to the latest hash table, creating one if it doesn't exist yet.
249 /// The reference is valid forever. However, the `HashTable` it references might become stale
250 /// at any point. Meaning it still exists, but it is not the instance in active use.
251 #[cold]
create_hashtable() -> &'static HashTable252 fn create_hashtable() -> &'static HashTable {
253     let new_table = Box::into_raw(HashTable::new(LOAD_FACTOR, ptr::null()));
254 
255     // If this fails then it means some other thread created the hash table first.
256     let table = match HASHTABLE.compare_exchange(
257         ptr::null_mut(),
258         new_table,
259         Ordering::AcqRel,
260         Ordering::Acquire,
261     ) {
262         Ok(_) => new_table,
263         Err(old_table) => {
264             // Free the table we created
265             // SAFETY: `new_table` is created from `Box::into_raw` above and only freed here.
266             unsafe {
267                 Box::from_raw(new_table);
268             }
269             old_table
270         }
271     };
272     // SAFETY: The `HashTable` behind `table` is never freed. It is either the table pointer we
273     // created here, or it is one loaded from `HASHTABLE`.
274     unsafe { &*table }
275 }
276 
277 // Grow the hash table so that it is big enough for the given number of threads.
278 // This isn't performance-critical since it is only done when a ThreadData is
279 // created, which only happens once per thread.
grow_hashtable(num_threads: usize)280 fn grow_hashtable(num_threads: usize) {
281     // Lock all buckets in the existing table and get a reference to it
282     let old_table = loop {
283         let table = get_hashtable();
284 
285         // Check if we need to resize the existing table
286         if table.entries.len() >= LOAD_FACTOR * num_threads {
287             return;
288         }
289 
290         // Lock all buckets in the old table
291         for bucket in &table.entries[..] {
292             bucket.mutex.lock();
293         }
294 
295         // Now check if our table is still the latest one. Another thread could
296         // have grown the hash table between us reading HASHTABLE and locking
297         // the buckets.
298         if HASHTABLE.load(Ordering::Relaxed) == table as *const _ as *mut _ {
299             break table;
300         }
301 
302         // Unlock buckets and try again
303         for bucket in &table.entries[..] {
304             // SAFETY: We hold the lock here, as required
305             unsafe { bucket.mutex.unlock() };
306         }
307     };
308 
309     // Create the new table
310     let mut new_table = HashTable::new(num_threads, old_table);
311 
312     // Move the entries from the old table to the new one
313     for bucket in &old_table.entries[..] {
314         // SAFETY: The park, unpark* and check_wait_graph_fast functions create only correct linked
315         // lists. All `ThreadData` instances in these lists will remain valid as long as they are
316         // present in the lists, meaning as long as their threads are parked.
317         unsafe { rehash_bucket_into(bucket, &mut new_table) };
318     }
319 
320     // Publish the new table. No races are possible at this point because
321     // any other thread trying to grow the hash table is blocked on the bucket
322     // locks in the old table.
323     HASHTABLE.store(Box::into_raw(new_table), Ordering::Release);
324 
325     // Unlock all buckets in the old table
326     for bucket in &old_table.entries[..] {
327         // SAFETY: We hold the lock here, as required
328         unsafe { bucket.mutex.unlock() };
329     }
330 }
331 
332 /// Iterate through all `ThreadData` objects in the bucket and insert them into the given table
333 /// in the bucket their key correspond to for this table.
334 ///
335 /// # Safety
336 ///
337 /// The given `bucket` must have a correctly constructed linked list under `queue_head`, containing
338 /// `ThreadData` instances that must stay valid at least as long as the given `table` is in use.
339 ///
340 /// The given `table` must only contain buckets with correctly constructed linked lists.
rehash_bucket_into(bucket: &'static Bucket, table: &mut HashTable)341 unsafe fn rehash_bucket_into(bucket: &'static Bucket, table: &mut HashTable) {
342     let mut current: *const ThreadData = bucket.queue_head.get();
343     while !current.is_null() {
344         let next = (*current).next_in_queue.get();
345         let hash = hash((*current).key.load(Ordering::Relaxed), table.hash_bits);
346         if table.entries[hash].queue_tail.get().is_null() {
347             table.entries[hash].queue_head.set(current);
348         } else {
349             (*table.entries[hash].queue_tail.get())
350                 .next_in_queue
351                 .set(current);
352         }
353         table.entries[hash].queue_tail.set(current);
354         (*current).next_in_queue.set(ptr::null());
355         current = next;
356     }
357 }
358 
359 // Hash function for addresses
360 #[cfg(target_pointer_width = "32")]
361 #[inline]
hash(key: usize, bits: u32) -> usize362 fn hash(key: usize, bits: u32) -> usize {
363     key.wrapping_mul(0x9E3779B9) >> (32 - bits)
364 }
365 #[cfg(target_pointer_width = "64")]
366 #[inline]
hash(key: usize, bits: u32) -> usize367 fn hash(key: usize, bits: u32) -> usize {
368     key.wrapping_mul(0x9E3779B97F4A7C15) >> (64 - bits)
369 }
370 
371 /// Locks the bucket for the given key and returns a reference to it.
372 /// The returned bucket must be unlocked again in order to not cause deadlocks.
373 #[inline]
lock_bucket(key: usize) -> &'static Bucket374 fn lock_bucket(key: usize) -> &'static Bucket {
375     loop {
376         let hashtable = get_hashtable();
377 
378         let hash = hash(key, hashtable.hash_bits);
379         let bucket = &hashtable.entries[hash];
380 
381         // Lock the bucket
382         bucket.mutex.lock();
383 
384         // If no other thread has rehashed the table before we grabbed the lock
385         // then we are good to go! The lock we grabbed prevents any rehashes.
386         if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ {
387             return bucket;
388         }
389 
390         // Unlock the bucket and try again
391         // SAFETY: We hold the lock here, as required
392         unsafe { bucket.mutex.unlock() };
393     }
394 }
395 
396 /// Locks the bucket for the given key and returns a reference to it. But checks that the key
397 /// hasn't been changed in the meantime due to a requeue.
398 /// The returned bucket must be unlocked again in order to not cause deadlocks.
399 #[inline]
lock_bucket_checked(key: &AtomicUsize) -> (usize, &'static Bucket)400 fn lock_bucket_checked(key: &AtomicUsize) -> (usize, &'static Bucket) {
401     loop {
402         let hashtable = get_hashtable();
403         let current_key = key.load(Ordering::Relaxed);
404 
405         let hash = hash(current_key, hashtable.hash_bits);
406         let bucket = &hashtable.entries[hash];
407 
408         // Lock the bucket
409         bucket.mutex.lock();
410 
411         // Check that both the hash table and key are correct while the bucket
412         // is locked. Note that the key can't change once we locked the proper
413         // bucket for it, so we just keep trying until we have the correct key.
414         if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _
415             && key.load(Ordering::Relaxed) == current_key
416         {
417             return (current_key, bucket);
418         }
419 
420         // Unlock the bucket and try again
421         // SAFETY: We hold the lock here, as required
422         unsafe { bucket.mutex.unlock() };
423     }
424 }
425 
426 /// Locks the two buckets for the given pair of keys and returns references to them.
427 /// The returned buckets must be unlocked again in order to not cause deadlocks.
428 ///
429 /// If both keys hash to the same value, both returned references will be to the same bucket. Be
430 /// careful to only unlock it once in this case, always use `unlock_bucket_pair`.
431 #[inline]
lock_bucket_pair(key1: usize, key2: usize) -> (&'static Bucket, &'static Bucket)432 fn lock_bucket_pair(key1: usize, key2: usize) -> (&'static Bucket, &'static Bucket) {
433     loop {
434         let hashtable = get_hashtable();
435 
436         let hash1 = hash(key1, hashtable.hash_bits);
437         let hash2 = hash(key2, hashtable.hash_bits);
438 
439         // Get the bucket at the lowest hash/index first
440         let bucket1 = if hash1 <= hash2 {
441             &hashtable.entries[hash1]
442         } else {
443             &hashtable.entries[hash2]
444         };
445 
446         // Lock the first bucket
447         bucket1.mutex.lock();
448 
449         // If no other thread has rehashed the table before we grabbed the lock
450         // then we are good to go! The lock we grabbed prevents any rehashes.
451         if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ {
452             // Now lock the second bucket and return the two buckets
453             if hash1 == hash2 {
454                 return (bucket1, bucket1);
455             } else if hash1 < hash2 {
456                 let bucket2 = &hashtable.entries[hash2];
457                 bucket2.mutex.lock();
458                 return (bucket1, bucket2);
459             } else {
460                 let bucket2 = &hashtable.entries[hash1];
461                 bucket2.mutex.lock();
462                 return (bucket2, bucket1);
463             }
464         }
465 
466         // Unlock the bucket and try again
467         // SAFETY: We hold the lock here, as required
468         unsafe { bucket1.mutex.unlock() };
469     }
470 }
471 
472 /// Unlock a pair of buckets
473 ///
474 /// # Safety
475 ///
476 /// Both buckets must be locked
477 #[inline]
unlock_bucket_pair(bucket1: &Bucket, bucket2: &Bucket)478 unsafe fn unlock_bucket_pair(bucket1: &Bucket, bucket2: &Bucket) {
479     bucket1.mutex.unlock();
480     if !ptr::eq(bucket1, bucket2) {
481         bucket2.mutex.unlock();
482     }
483 }
484 
485 /// Result of a park operation.
486 #[derive(Copy, Clone, Eq, PartialEq, Debug)]
487 pub enum ParkResult {
488     /// We were unparked by another thread with the given token.
489     Unparked(UnparkToken),
490 
491     /// The validation callback returned false.
492     Invalid,
493 
494     /// The timeout expired.
495     TimedOut,
496 }
497 
498 impl ParkResult {
499     /// Returns true if we were unparked by another thread.
500     #[inline]
is_unparked(self) -> bool501     pub fn is_unparked(self) -> bool {
502         if let ParkResult::Unparked(_) = self {
503             true
504         } else {
505             false
506         }
507     }
508 }
509 
510 /// Result of an unpark operation.
511 #[derive(Copy, Clone, Default, Eq, PartialEq, Debug)]
512 pub struct UnparkResult {
513     /// The number of threads that were unparked.
514     pub unparked_threads: usize,
515 
516     /// The number of threads that were requeued.
517     pub requeued_threads: usize,
518 
519     /// Whether there are any threads remaining in the queue. This only returns
520     /// true if a thread was unparked.
521     pub have_more_threads: bool,
522 
523     /// This is set to true on average once every 0.5ms for any given key. It
524     /// should be used to switch to a fair unlocking mechanism for a particular
525     /// unlock.
526     pub be_fair: bool,
527 
528     /// Private field so new fields can be added without breakage.
529     _sealed: (),
530 }
531 
532 /// Operation that `unpark_requeue` should perform.
533 #[derive(Copy, Clone, Eq, PartialEq, Debug)]
534 pub enum RequeueOp {
535     /// Abort the operation without doing anything.
536     Abort,
537 
538     /// Unpark one thread and requeue the rest onto the target queue.
539     UnparkOneRequeueRest,
540 
541     /// Requeue all threads onto the target queue.
542     RequeueAll,
543 
544     /// Unpark one thread and leave the rest parked. No requeuing is done.
545     UnparkOne,
546 
547     /// Requeue one thread and leave the rest parked on the original queue.
548     RequeueOne,
549 }
550 
551 /// Operation that `unpark_filter` should perform for each thread.
552 #[derive(Copy, Clone, Eq, PartialEq, Debug)]
553 pub enum FilterOp {
554     /// Unpark the thread and continue scanning the list of parked threads.
555     Unpark,
556 
557     /// Don't unpark the thread and continue scanning the list of parked threads.
558     Skip,
559 
560     /// Don't unpark the thread and stop scanning the list of parked threads.
561     Stop,
562 }
563 
564 /// A value which is passed from an unparker to a parked thread.
565 #[derive(Copy, Clone, Eq, PartialEq, Debug)]
566 pub struct UnparkToken(pub usize);
567 
568 /// A value associated with a parked thread which can be used by `unpark_filter`.
569 #[derive(Copy, Clone, Eq, PartialEq, Debug)]
570 pub struct ParkToken(pub usize);
571 
572 /// A default unpark token to use.
573 pub const DEFAULT_UNPARK_TOKEN: UnparkToken = UnparkToken(0);
574 
575 /// A default park token to use.
576 pub const DEFAULT_PARK_TOKEN: ParkToken = ParkToken(0);
577 
578 /// Parks the current thread in the queue associated with the given key.
579 ///
580 /// The `validate` function is called while the queue is locked and can abort
581 /// the operation by returning false. If `validate` returns true then the
582 /// current thread is appended to the queue and the queue is unlocked.
583 ///
584 /// The `before_sleep` function is called after the queue is unlocked but before
585 /// the thread is put to sleep. The thread will then sleep until it is unparked
586 /// or the given timeout is reached.
587 ///
588 /// The `timed_out` function is also called while the queue is locked, but only
589 /// if the timeout was reached. It is passed the key of the queue it was in when
590 /// it timed out, which may be different from the original key if
591 /// `unpark_requeue` was called. It is also passed a bool which indicates
592 /// whether it was the last thread in the queue.
593 ///
594 /// # Safety
595 ///
596 /// You should only call this function with an address that you control, since
597 /// you could otherwise interfere with the operation of other synchronization
598 /// primitives.
599 ///
600 /// The `validate` and `timed_out` functions are called while the queue is
601 /// locked and must not panic or call into any function in `parking_lot`.
602 ///
603 /// The `before_sleep` function is called outside the queue lock and is allowed
604 /// to call `unpark_one`, `unpark_all`, `unpark_requeue` or `unpark_filter`, but
605 /// it is not allowed to call `park` or panic.
606 #[inline]
park( key: usize, validate: impl FnOnce() -> bool, before_sleep: impl FnOnce(), timed_out: impl FnOnce(usize, bool), park_token: ParkToken, timeout: Option<Instant>, ) -> ParkResult607 pub unsafe fn park(
608     key: usize,
609     validate: impl FnOnce() -> bool,
610     before_sleep: impl FnOnce(),
611     timed_out: impl FnOnce(usize, bool),
612     park_token: ParkToken,
613     timeout: Option<Instant>,
614 ) -> ParkResult {
615     // Grab our thread data, this also ensures that the hash table exists
616     with_thread_data(|thread_data| {
617         // Lock the bucket for the given key
618         let bucket = lock_bucket(key);
619 
620         // If the validation function fails, just return
621         if !validate() {
622             // SAFETY: We hold the lock here, as required
623             bucket.mutex.unlock();
624             return ParkResult::Invalid;
625         }
626 
627         // Append our thread data to the queue and unlock the bucket
628         thread_data.parked_with_timeout.set(timeout.is_some());
629         thread_data.next_in_queue.set(ptr::null());
630         thread_data.key.store(key, Ordering::Relaxed);
631         thread_data.park_token.set(park_token);
632         thread_data.parker.prepare_park();
633         if !bucket.queue_head.get().is_null() {
634             (*bucket.queue_tail.get()).next_in_queue.set(thread_data);
635         } else {
636             bucket.queue_head.set(thread_data);
637         }
638         bucket.queue_tail.set(thread_data);
639         // SAFETY: We hold the lock here, as required
640         bucket.mutex.unlock();
641 
642         // Invoke the pre-sleep callback
643         before_sleep();
644 
645         // Park our thread and determine whether we were woken up by an unpark
646         // or by our timeout. Note that this isn't precise: we can still be
647         // unparked since we are still in the queue.
648         let unparked = match timeout {
649             Some(timeout) => thread_data.parker.park_until(timeout),
650             None => {
651                 thread_data.parker.park();
652                 // call deadlock detection on_unpark hook
653                 deadlock::on_unpark(thread_data);
654                 true
655             }
656         };
657 
658         // If we were unparked, return now
659         if unparked {
660             return ParkResult::Unparked(thread_data.unpark_token.get());
661         }
662 
663         // Lock our bucket again. Note that the hashtable may have been rehashed in
664         // the meantime. Our key may also have changed if we were requeued.
665         let (key, bucket) = lock_bucket_checked(&thread_data.key);
666 
667         // Now we need to check again if we were unparked or timed out. Unlike the
668         // last check this is precise because we hold the bucket lock.
669         if !thread_data.parker.timed_out() {
670             // SAFETY: We hold the lock here, as required
671             bucket.mutex.unlock();
672             return ParkResult::Unparked(thread_data.unpark_token.get());
673         }
674 
675         // We timed out, so we now need to remove our thread from the queue
676         let mut link = &bucket.queue_head;
677         let mut current = bucket.queue_head.get();
678         let mut previous = ptr::null();
679         let mut was_last_thread = true;
680         while !current.is_null() {
681             if current == thread_data {
682                 let next = (*current).next_in_queue.get();
683                 link.set(next);
684                 if bucket.queue_tail.get() == current {
685                     bucket.queue_tail.set(previous);
686                 } else {
687                     // Scan the rest of the queue to see if there are any other
688                     // entries with the given key.
689                     let mut scan = next;
690                     while !scan.is_null() {
691                         if (*scan).key.load(Ordering::Relaxed) == key {
692                             was_last_thread = false;
693                             break;
694                         }
695                         scan = (*scan).next_in_queue.get();
696                     }
697                 }
698 
699                 // Callback to indicate that we timed out, and whether we were the
700                 // last thread on the queue.
701                 timed_out(key, was_last_thread);
702                 break;
703             } else {
704                 if (*current).key.load(Ordering::Relaxed) == key {
705                     was_last_thread = false;
706                 }
707                 link = &(*current).next_in_queue;
708                 previous = current;
709                 current = link.get();
710             }
711         }
712 
713         // There should be no way for our thread to have been removed from the queue
714         // if we timed out.
715         debug_assert!(!current.is_null());
716 
717         // Unlock the bucket, we are done
718         // SAFETY: We hold the lock here, as required
719         bucket.mutex.unlock();
720         ParkResult::TimedOut
721     })
722 }
723 
724 /// Unparks one thread from the queue associated with the given key.
725 ///
726 /// The `callback` function is called while the queue is locked and before the
727 /// target thread is woken up. The `UnparkResult` argument to the function
728 /// indicates whether a thread was found in the queue and whether this was the
729 /// last thread in the queue. This value is also returned by `unpark_one`.
730 ///
731 /// The `callback` function should return an `UnparkToken` value which will be
732 /// passed to the thread that is unparked. If no thread is unparked then the
733 /// returned value is ignored.
734 ///
735 /// # Safety
736 ///
737 /// You should only call this function with an address that you control, since
738 /// you could otherwise interfere with the operation of other synchronization
739 /// primitives.
740 ///
741 /// The `callback` function is called while the queue is locked and must not
742 /// panic or call into any function in `parking_lot`.
743 #[inline]
unpark_one( key: usize, callback: impl FnOnce(UnparkResult) -> UnparkToken, ) -> UnparkResult744 pub unsafe fn unpark_one(
745     key: usize,
746     callback: impl FnOnce(UnparkResult) -> UnparkToken,
747 ) -> UnparkResult {
748     // Lock the bucket for the given key
749     let bucket = lock_bucket(key);
750 
751     // Find a thread with a matching key and remove it from the queue
752     let mut link = &bucket.queue_head;
753     let mut current = bucket.queue_head.get();
754     let mut previous = ptr::null();
755     let mut result = UnparkResult::default();
756     while !current.is_null() {
757         if (*current).key.load(Ordering::Relaxed) == key {
758             // Remove the thread from the queue
759             let next = (*current).next_in_queue.get();
760             link.set(next);
761             if bucket.queue_tail.get() == current {
762                 bucket.queue_tail.set(previous);
763             } else {
764                 // Scan the rest of the queue to see if there are any other
765                 // entries with the given key.
766                 let mut scan = next;
767                 while !scan.is_null() {
768                     if (*scan).key.load(Ordering::Relaxed) == key {
769                         result.have_more_threads = true;
770                         break;
771                     }
772                     scan = (*scan).next_in_queue.get();
773                 }
774             }
775 
776             // Invoke the callback before waking up the thread
777             result.unparked_threads = 1;
778             result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
779             let token = callback(result);
780 
781             // Set the token for the target thread
782             (*current).unpark_token.set(token);
783 
784             // This is a bit tricky: we first lock the ThreadParker to prevent
785             // the thread from exiting and freeing its ThreadData if its wait
786             // times out. Then we unlock the queue since we don't want to keep
787             // the queue locked while we perform a system call. Finally we wake
788             // up the parked thread.
789             let handle = (*current).parker.unpark_lock();
790             // SAFETY: We hold the lock here, as required
791             bucket.mutex.unlock();
792             handle.unpark();
793 
794             return result;
795         } else {
796             link = &(*current).next_in_queue;
797             previous = current;
798             current = link.get();
799         }
800     }
801 
802     // No threads with a matching key were found in the bucket
803     callback(result);
804     // SAFETY: We hold the lock here, as required
805     bucket.mutex.unlock();
806     result
807 }
808 
809 /// Unparks all threads in the queue associated with the given key.
810 ///
811 /// The given `UnparkToken` is passed to all unparked threads.
812 ///
813 /// This function returns the number of threads that were unparked.
814 ///
815 /// # Safety
816 ///
817 /// You should only call this function with an address that you control, since
818 /// you could otherwise interfere with the operation of other synchronization
819 /// primitives.
820 #[inline]
unpark_all(key: usize, unpark_token: UnparkToken) -> usize821 pub unsafe fn unpark_all(key: usize, unpark_token: UnparkToken) -> usize {
822     // Lock the bucket for the given key
823     let bucket = lock_bucket(key);
824 
825     // Remove all threads with the given key in the bucket
826     let mut link = &bucket.queue_head;
827     let mut current = bucket.queue_head.get();
828     let mut previous = ptr::null();
829     let mut threads = SmallVec::<[_; 8]>::new();
830     while !current.is_null() {
831         if (*current).key.load(Ordering::Relaxed) == key {
832             // Remove the thread from the queue
833             let next = (*current).next_in_queue.get();
834             link.set(next);
835             if bucket.queue_tail.get() == current {
836                 bucket.queue_tail.set(previous);
837             }
838 
839             // Set the token for the target thread
840             (*current).unpark_token.set(unpark_token);
841 
842             // Don't wake up threads while holding the queue lock. See comment
843             // in unpark_one. For now just record which threads we need to wake
844             // up.
845             threads.push((*current).parker.unpark_lock());
846             current = next;
847         } else {
848             link = &(*current).next_in_queue;
849             previous = current;
850             current = link.get();
851         }
852     }
853 
854     // Unlock the bucket
855     // SAFETY: We hold the lock here, as required
856     bucket.mutex.unlock();
857 
858     // Now that we are outside the lock, wake up all the threads that we removed
859     // from the queue.
860     let num_threads = threads.len();
861     for handle in threads.into_iter() {
862         handle.unpark();
863     }
864 
865     num_threads
866 }
867 
868 /// Removes all threads from the queue associated with `key_from`, optionally
869 /// unparks the first one and requeues the rest onto the queue associated with
870 /// `key_to`.
871 ///
872 /// The `validate` function is called while both queues are locked. Its return
873 /// value will determine which operation is performed, or whether the operation
874 /// should be aborted. See `RequeueOp` for details about the different possible
875 /// return values.
876 ///
877 /// The `callback` function is also called while both queues are locked. It is
878 /// passed the `RequeueOp` returned by `validate` and an `UnparkResult`
879 /// indicating whether a thread was unparked and whether there are threads still
880 /// parked in the new queue. This `UnparkResult` value is also returned by
881 /// `unpark_requeue`.
882 ///
883 /// The `callback` function should return an `UnparkToken` value which will be
884 /// passed to the thread that is unparked. If no thread is unparked then the
885 /// returned value is ignored.
886 ///
887 /// # Safety
888 ///
889 /// You should only call this function with an address that you control, since
890 /// you could otherwise interfere with the operation of other synchronization
891 /// primitives.
892 ///
893 /// The `validate` and `callback` functions are called while the queue is locked
894 /// and must not panic or call into any function in `parking_lot`.
895 #[inline]
unpark_requeue( key_from: usize, key_to: usize, validate: impl FnOnce() -> RequeueOp, callback: impl FnOnce(RequeueOp, UnparkResult) -> UnparkToken, ) -> UnparkResult896 pub unsafe fn unpark_requeue(
897     key_from: usize,
898     key_to: usize,
899     validate: impl FnOnce() -> RequeueOp,
900     callback: impl FnOnce(RequeueOp, UnparkResult) -> UnparkToken,
901 ) -> UnparkResult {
902     // Lock the two buckets for the given key
903     let (bucket_from, bucket_to) = lock_bucket_pair(key_from, key_to);
904 
905     // If the validation function fails, just return
906     let mut result = UnparkResult::default();
907     let op = validate();
908     if op == RequeueOp::Abort {
909         // SAFETY: Both buckets are locked, as required.
910         unlock_bucket_pair(bucket_from, bucket_to);
911         return result;
912     }
913 
914     // Remove all threads with the given key in the source bucket
915     let mut link = &bucket_from.queue_head;
916     let mut current = bucket_from.queue_head.get();
917     let mut previous = ptr::null();
918     let mut requeue_threads: *const ThreadData = ptr::null();
919     let mut requeue_threads_tail: *const ThreadData = ptr::null();
920     let mut wakeup_thread = None;
921     while !current.is_null() {
922         if (*current).key.load(Ordering::Relaxed) == key_from {
923             // Remove the thread from the queue
924             let next = (*current).next_in_queue.get();
925             link.set(next);
926             if bucket_from.queue_tail.get() == current {
927                 bucket_from.queue_tail.set(previous);
928             }
929 
930             // Prepare the first thread for wakeup and requeue the rest.
931             if (op == RequeueOp::UnparkOneRequeueRest || op == RequeueOp::UnparkOne)
932                 && wakeup_thread.is_none()
933             {
934                 wakeup_thread = Some(current);
935                 result.unparked_threads = 1;
936             } else {
937                 if !requeue_threads.is_null() {
938                     (*requeue_threads_tail).next_in_queue.set(current);
939                 } else {
940                     requeue_threads = current;
941                 }
942                 requeue_threads_tail = current;
943                 (*current).key.store(key_to, Ordering::Relaxed);
944                 result.requeued_threads += 1;
945             }
946             if op == RequeueOp::UnparkOne || op == RequeueOp::RequeueOne {
947                 // Scan the rest of the queue to see if there are any other
948                 // entries with the given key.
949                 let mut scan = next;
950                 while !scan.is_null() {
951                     if (*scan).key.load(Ordering::Relaxed) == key_from {
952                         result.have_more_threads = true;
953                         break;
954                     }
955                     scan = (*scan).next_in_queue.get();
956                 }
957                 break;
958             }
959             current = next;
960         } else {
961             link = &(*current).next_in_queue;
962             previous = current;
963             current = link.get();
964         }
965     }
966 
967     // Add the requeued threads to the destination bucket
968     if !requeue_threads.is_null() {
969         (*requeue_threads_tail).next_in_queue.set(ptr::null());
970         if !bucket_to.queue_head.get().is_null() {
971             (*bucket_to.queue_tail.get())
972                 .next_in_queue
973                 .set(requeue_threads);
974         } else {
975             bucket_to.queue_head.set(requeue_threads);
976         }
977         bucket_to.queue_tail.set(requeue_threads_tail);
978     }
979 
980     // Invoke the callback before waking up the thread
981     if result.unparked_threads != 0 {
982         result.be_fair = (*bucket_from.fair_timeout.get()).should_timeout();
983     }
984     let token = callback(op, result);
985 
986     // See comment in unpark_one for why we mess with the locking
987     if let Some(wakeup_thread) = wakeup_thread {
988         (*wakeup_thread).unpark_token.set(token);
989         let handle = (*wakeup_thread).parker.unpark_lock();
990         // SAFETY: Both buckets are locked, as required.
991         unlock_bucket_pair(bucket_from, bucket_to);
992         handle.unpark();
993     } else {
994         // SAFETY: Both buckets are locked, as required.
995         unlock_bucket_pair(bucket_from, bucket_to);
996     }
997 
998     result
999 }
1000 
1001 /// Unparks a number of threads from the front of the queue associated with
1002 /// `key` depending on the results of a filter function which inspects the
1003 /// `ParkToken` associated with each thread.
1004 ///
1005 /// The `filter` function is called for each thread in the queue or until
1006 /// `FilterOp::Stop` is returned. This function is passed the `ParkToken`
1007 /// associated with a particular thread, which is unparked if `FilterOp::Unpark`
1008 /// is returned.
1009 ///
1010 /// The `callback` function is also called while both queues are locked. It is
1011 /// passed an `UnparkResult` indicating the number of threads that were unparked
1012 /// and whether there are still parked threads in the queue. This `UnparkResult`
1013 /// value is also returned by `unpark_filter`.
1014 ///
1015 /// The `callback` function should return an `UnparkToken` value which will be
1016 /// passed to all threads that are unparked. If no thread is unparked then the
1017 /// returned value is ignored.
1018 ///
1019 /// # Safety
1020 ///
1021 /// You should only call this function with an address that you control, since
1022 /// you could otherwise interfere with the operation of other synchronization
1023 /// primitives.
1024 ///
1025 /// The `filter` and `callback` functions are called while the queue is locked
1026 /// and must not panic or call into any function in `parking_lot`.
1027 #[inline]
unpark_filter( key: usize, mut filter: impl FnMut(ParkToken) -> FilterOp, callback: impl FnOnce(UnparkResult) -> UnparkToken, ) -> UnparkResult1028 pub unsafe fn unpark_filter(
1029     key: usize,
1030     mut filter: impl FnMut(ParkToken) -> FilterOp,
1031     callback: impl FnOnce(UnparkResult) -> UnparkToken,
1032 ) -> UnparkResult {
1033     // Lock the bucket for the given key
1034     let bucket = lock_bucket(key);
1035 
1036     // Go through the queue looking for threads with a matching key
1037     let mut link = &bucket.queue_head;
1038     let mut current = bucket.queue_head.get();
1039     let mut previous = ptr::null();
1040     let mut threads = SmallVec::<[_; 8]>::new();
1041     let mut result = UnparkResult::default();
1042     while !current.is_null() {
1043         if (*current).key.load(Ordering::Relaxed) == key {
1044             // Call the filter function with the thread's ParkToken
1045             let next = (*current).next_in_queue.get();
1046             match filter((*current).park_token.get()) {
1047                 FilterOp::Unpark => {
1048                     // Remove the thread from the queue
1049                     link.set(next);
1050                     if bucket.queue_tail.get() == current {
1051                         bucket.queue_tail.set(previous);
1052                     }
1053 
1054                     // Add the thread to our list of threads to unpark
1055                     threads.push((current, None));
1056 
1057                     current = next;
1058                 }
1059                 FilterOp::Skip => {
1060                     result.have_more_threads = true;
1061                     link = &(*current).next_in_queue;
1062                     previous = current;
1063                     current = link.get();
1064                 }
1065                 FilterOp::Stop => {
1066                     result.have_more_threads = true;
1067                     break;
1068                 }
1069             }
1070         } else {
1071             link = &(*current).next_in_queue;
1072             previous = current;
1073             current = link.get();
1074         }
1075     }
1076 
1077     // Invoke the callback before waking up the threads
1078     result.unparked_threads = threads.len();
1079     if result.unparked_threads != 0 {
1080         result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
1081     }
1082     let token = callback(result);
1083 
1084     // Pass the token to all threads that are going to be unparked and prepare
1085     // them for unparking.
1086     for t in threads.iter_mut() {
1087         (*t.0).unpark_token.set(token);
1088         t.1 = Some((*t.0).parker.unpark_lock());
1089     }
1090 
1091     // SAFETY: We hold the lock here, as required
1092     bucket.mutex.unlock();
1093 
1094     // Now that we are outside the lock, wake up all the threads that we removed
1095     // from the queue.
1096     for (_, handle) in threads.into_iter() {
1097         handle.unchecked_unwrap().unpark();
1098     }
1099 
1100     result
1101 }
1102 
1103 /// \[Experimental\] Deadlock detection
1104 ///
1105 /// Enabled via the `deadlock_detection` feature flag.
1106 pub mod deadlock {
1107     #[cfg(feature = "deadlock_detection")]
1108     use super::deadlock_impl;
1109 
1110     #[cfg(feature = "deadlock_detection")]
1111     pub(super) use super::deadlock_impl::DeadlockData;
1112 
1113     /// Acquire a resource identified by key in the deadlock detector
1114     /// Noop if deadlock_detection feature isn't enabled.
1115     ///
1116     /// # Safety
1117     ///
1118     /// Call after the resource is acquired
1119     #[inline]
acquire_resource(_key: usize)1120     pub unsafe fn acquire_resource(_key: usize) {
1121         #[cfg(feature = "deadlock_detection")]
1122         deadlock_impl::acquire_resource(_key);
1123     }
1124 
1125     /// Release a resource identified by key in the deadlock detector.
1126     /// Noop if deadlock_detection feature isn't enabled.
1127     ///
1128     /// # Panics
1129     ///
1130     /// Panics if the resource was already released or wasn't acquired in this thread.
1131     ///
1132     /// # Safety
1133     ///
1134     /// Call before the resource is released
1135     #[inline]
release_resource(_key: usize)1136     pub unsafe fn release_resource(_key: usize) {
1137         #[cfg(feature = "deadlock_detection")]
1138         deadlock_impl::release_resource(_key);
1139     }
1140 
1141     /// Returns all deadlocks detected *since* the last call.
1142     /// Each cycle consist of a vector of `DeadlockedThread`.
1143     #[cfg(feature = "deadlock_detection")]
1144     #[inline]
check_deadlock() -> Vec<Vec<deadlock_impl::DeadlockedThread>>1145     pub fn check_deadlock() -> Vec<Vec<deadlock_impl::DeadlockedThread>> {
1146         deadlock_impl::check_deadlock()
1147     }
1148 
1149     #[inline]
on_unpark(_td: &super::ThreadData)1150     pub(super) unsafe fn on_unpark(_td: &super::ThreadData) {
1151         #[cfg(feature = "deadlock_detection")]
1152         deadlock_impl::on_unpark(_td);
1153     }
1154 }
1155 
1156 #[cfg(feature = "deadlock_detection")]
1157 mod deadlock_impl {
1158     use super::{get_hashtable, lock_bucket, with_thread_data, ThreadData, NUM_THREADS};
1159     use crate::thread_parker::{ThreadParkerT, UnparkHandleT};
1160     use crate::word_lock::WordLock;
1161     use backtrace::Backtrace;
1162     use petgraph;
1163     use petgraph::graphmap::DiGraphMap;
1164     use std::cell::{Cell, UnsafeCell};
1165     use std::collections::HashSet;
1166     use std::sync::atomic::Ordering;
1167     use std::sync::mpsc;
1168     use thread_id;
1169 
1170     /// Representation of a deadlocked thread
1171     pub struct DeadlockedThread {
1172         thread_id: usize,
1173         backtrace: Backtrace,
1174     }
1175 
1176     impl DeadlockedThread {
1177         /// The system thread id
thread_id(&self) -> usize1178         pub fn thread_id(&self) -> usize {
1179             self.thread_id
1180         }
1181 
1182         /// The thread backtrace
backtrace(&self) -> &Backtrace1183         pub fn backtrace(&self) -> &Backtrace {
1184             &self.backtrace
1185         }
1186     }
1187 
1188     pub struct DeadlockData {
1189         // Currently owned resources (keys)
1190         resources: UnsafeCell<Vec<usize>>,
1191 
1192         // Set when there's a pending callstack request
1193         deadlocked: Cell<bool>,
1194 
1195         // Sender used to report the backtrace
1196         backtrace_sender: UnsafeCell<Option<mpsc::Sender<DeadlockedThread>>>,
1197 
1198         // System thread id
1199         thread_id: usize,
1200     }
1201 
1202     impl DeadlockData {
new() -> Self1203         pub fn new() -> Self {
1204             DeadlockData {
1205                 resources: UnsafeCell::new(Vec::new()),
1206                 deadlocked: Cell::new(false),
1207                 backtrace_sender: UnsafeCell::new(None),
1208                 thread_id: thread_id::get(),
1209             }
1210         }
1211     }
1212 
on_unpark(td: &ThreadData)1213     pub(super) unsafe fn on_unpark(td: &ThreadData) {
1214         if td.deadlock_data.deadlocked.get() {
1215             let sender = (*td.deadlock_data.backtrace_sender.get()).take().unwrap();
1216             sender
1217                 .send(DeadlockedThread {
1218                     thread_id: td.deadlock_data.thread_id,
1219                     backtrace: Backtrace::new(),
1220                 })
1221                 .unwrap();
1222             // make sure to close this sender
1223             drop(sender);
1224 
1225             // park until the end of the time
1226             td.parker.prepare_park();
1227             td.parker.park();
1228             unreachable!("unparked deadlocked thread!");
1229         }
1230     }
1231 
acquire_resource(key: usize)1232     pub unsafe fn acquire_resource(key: usize) {
1233         with_thread_data(|thread_data| {
1234             (*thread_data.deadlock_data.resources.get()).push(key);
1235         });
1236     }
1237 
release_resource(key: usize)1238     pub unsafe fn release_resource(key: usize) {
1239         with_thread_data(|thread_data| {
1240             let resources = &mut (*thread_data.deadlock_data.resources.get());
1241 
1242             // There is only one situation where we can fail to find the
1243             // resource: we are currently running TLS destructors and our
1244             // ThreadData has already been freed. There isn't much we can do
1245             // about it at this point, so just ignore it.
1246             if let Some(p) = resources.iter().rposition(|x| *x == key) {
1247                 resources.swap_remove(p);
1248             }
1249         });
1250     }
1251 
check_deadlock() -> Vec<Vec<DeadlockedThread>>1252     pub fn check_deadlock() -> Vec<Vec<DeadlockedThread>> {
1253         unsafe {
1254             // fast pass
1255             if check_wait_graph_fast() {
1256                 // double check
1257                 check_wait_graph_slow()
1258             } else {
1259                 Vec::new()
1260             }
1261         }
1262     }
1263 
1264     // Simple algorithm that builds a wait graph f the threads and the resources,
1265     // then checks for the presence of cycles (deadlocks).
1266     // This variant isn't precise as it doesn't lock the entire table before checking
check_wait_graph_fast() -> bool1267     unsafe fn check_wait_graph_fast() -> bool {
1268         let table = get_hashtable();
1269         let thread_count = NUM_THREADS.load(Ordering::Relaxed);
1270         let mut graph = DiGraphMap::<usize, ()>::with_capacity(thread_count * 2, thread_count * 2);
1271 
1272         for b in &(*table).entries[..] {
1273             b.mutex.lock();
1274             let mut current = b.queue_head.get();
1275             while !current.is_null() {
1276                 if !(*current).parked_with_timeout.get()
1277                     && !(*current).deadlock_data.deadlocked.get()
1278                 {
1279                     // .resources are waiting for their owner
1280                     for &resource in &(*(*current).deadlock_data.resources.get()) {
1281                         graph.add_edge(resource, current as usize, ());
1282                     }
1283                     // owner waits for resource .key
1284                     graph.add_edge(current as usize, (*current).key.load(Ordering::Relaxed), ());
1285                 }
1286                 current = (*current).next_in_queue.get();
1287             }
1288             // SAFETY: We hold the lock here, as required
1289             b.mutex.unlock();
1290         }
1291 
1292         petgraph::algo::is_cyclic_directed(&graph)
1293     }
1294 
1295     #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)]
1296     enum WaitGraphNode {
1297         Thread(*const ThreadData),
1298         Resource(usize),
1299     }
1300 
1301     use self::WaitGraphNode::*;
1302 
1303     // Contrary to the _fast variant this locks the entries table before looking for cycles.
1304     // Returns all detected thread wait cycles.
1305     // Note that once a cycle is reported it's never reported again.
    unsafe fn check_wait_graph_slow() -> Vec<Vec<DeadlockedThread>> {
        static DEADLOCK_DETECTION_LOCK: WordLock = WordLock::new();
        DEADLOCK_DETECTION_LOCK.lock();

        let mut table = get_hashtable();
        loop {
            // Lock all buckets in the old table
            for b in &table.entries[..] {
                b.mutex.lock();
            }

            // Now check if our table is still the latest one. Another thread could have
            // grown the hash table between us fetching the pointer and locking the buckets.
            let new_table = get_hashtable();
            if new_table as *const _ == table as *const _ {
                break;
            }

            // Unlock buckets and try again
            for b in &table.entries[..] {
                // SAFETY: We hold the lock here, as required
                b.mutex.unlock();
            }

            table = new_table;
        }

        let thread_count = NUM_THREADS.load(Ordering::Relaxed);
        let mut graph =
            DiGraphMap::<WaitGraphNode, ()>::with_capacity(thread_count * 2, thread_count * 2);

        for b in &table.entries[..] {
            let mut current = b.queue_head.get();
            while !current.is_null() {
                if !(*current).parked_with_timeout.get()
                    && !(*current).deadlock_data.deadlocked.get()
                {
                    // threads parked on any of this thread's held resources are waiting for it
                    for &resource in &(*(*current).deadlock_data.resources.get()) {
                        graph.add_edge(Resource(resource), Thread(current), ());
                    }
                    // this thread waits for the resource it is parked on (its `key`)
                    graph.add_edge(
                        Thread(current),
                        Resource((*current).key.load(Ordering::Relaxed)),
                        (),
                    );
                }
                current = (*current).next_in_queue.get();
            }
        }

        for b in &table.entries[..] {
            // SAFETY: We hold the lock here, as required
            b.mutex.unlock();
        }

        // find cycles
        let cycles = graph_cycles(&graph);

        let mut results = Vec::with_capacity(cycles.len());

        for cycle in cycles {
            let (sender, receiver) = mpsc::channel();
            for td in cycle {
                let bucket = lock_bucket((*td).key.load(Ordering::Relaxed));
                (*td).deadlock_data.deadlocked.set(true);
                *(*td).deadlock_data.backtrace_sender.get() = Some(sender.clone());
                let handle = (*td).parker.unpark_lock();
                // SAFETY: We hold the lock here, as required
                bucket.mutex.unlock();
                // unpark the deadlocked thread!
                // on unpark it'll notice the deadlocked flag and report back
                handle.unpark();
            }
            // make sure to drop our sender before collecting results
            drop(sender);
            results.push(receiver.iter().collect());
        }

        DEADLOCK_DETECTION_LOCK.unlock();

        results
    }

    // normalize a cycle to start with the "smallest" node
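    // e.g. [3, 1, 2] is rotated to [1, 2, 3]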
    fn normalize_cycle<T: Ord + Copy + Clone>(input: &[T]) -> Vec<T> {
        let min_pos = input
            .iter()
            .enumerate()
            .min_by_key(|&(_, &t)| t)
            .map(|(p, _)| p)
            .unwrap_or(0);
        input
            .iter()
            .cycle()
            .skip(min_pos)
            .take(input.len())
            .cloned()
            .collect()
    }

    // returns all thread cycles in the wait graph
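    // A depth-first search is run from every thread node; `path` tracks the chain of thread
    // nodes on the current DFS path, and a back edge into a thread already on that path
    // closes a cycle. Each cycle is normalized and inserted into a HashSet so duplicates
    // are discarded.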
    fn graph_cycles(g: &DiGraphMap<WaitGraphNode, ()>) -> Vec<Vec<*const ThreadData>> {
        use petgraph::visit::depth_first_search;
        use petgraph::visit::DfsEvent;
        use petgraph::visit::NodeIndexable;

        let mut cycles = HashSet::new();
        let mut path = Vec::with_capacity(g.node_bound());
        // start the DFS from thread nodes so that the reported cycles consist of threads
        let threads = g
            .nodes()
            .filter(|n| if let &Thread(_) = n { true } else { false });

        depth_first_search(g, threads, |e| match e {
            DfsEvent::Discover(Thread(n), _) => path.push(n),
            DfsEvent::Finish(Thread(_), _) => {
                path.pop();
            }
            DfsEvent::BackEdge(_, Thread(n)) => {
                let from = path.iter().rposition(|&i| i == n).unwrap();
                cycles.insert(normalize_cycle(&path[from..]));
            }
            _ => (),
        });

        cycles.iter().cloned().collect()
    }
}

#[cfg(test)]
mod tests {
    use super::{ThreadData, DEFAULT_PARK_TOKEN, DEFAULT_UNPARK_TOKEN};
    use std::{
        ptr,
        sync::{
            atomic::{AtomicIsize, AtomicPtr, AtomicUsize, Ordering},
            Arc,
        },
        thread,
        time::Duration,
    };

    /// Calls a closure for every `ThreadData` currently parked on a given key
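    ///
    /// The bucket for `key` is locked while its queue is traversed, and only entries whose
    /// key matches are passed to `f`.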
    fn for_each(key: usize, mut f: impl FnMut(&ThreadData)) {
        let bucket = super::lock_bucket(key);

        let mut current: *const ThreadData = bucket.queue_head.get();
        while !current.is_null() {
            let current_ref = unsafe { &*current };
            if current_ref.key.load(Ordering::Relaxed) == key {
                f(current_ref);
            }
            current = current_ref.next_in_queue.get();
        }

        // SAFETY: We hold the lock here, as required
        unsafe { bucket.mutex.unlock() };
    }

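    // Generates one #[test] per entry. Each test runs `run_parking_test` `repeats` times
    // with the given number of latches, delay between single unparks (in microseconds),
    // threads per latch, and number of unpark_one calls performed before the remaining
    // threads are woken with unpark_all.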
    macro_rules! test {
        ( $( $name:ident(
            repeats: $repeats:expr,
            latches: $latches:expr,
            delay: $delay:expr,
            threads: $threads:expr,
            single_unparks: $single_unparks:expr);
        )* ) => {
            $(#[test]
            fn $name() {
                let delay = Duration::from_micros($delay);
                for _ in 0..$repeats {
                    run_parking_test($latches, delay, $threads, $single_unparks);
                }
            })*
        };
    }

    test! {
        unpark_all_one_fast(
            repeats: 10000, latches: 1, delay: 0, threads: 1, single_unparks: 0
        );
        unpark_all_hundred_fast(
            repeats: 100, latches: 1, delay: 0, threads: 100, single_unparks: 0
        );
        unpark_one_one_fast(
            repeats: 1000, latches: 1, delay: 0, threads: 1, single_unparks: 1
        );
        unpark_one_hundred_fast(
            repeats: 20, latches: 1, delay: 0, threads: 100, single_unparks: 100
        );
        unpark_one_fifty_then_fifty_all_fast(
            repeats: 50, latches: 1, delay: 0, threads: 100, single_unparks: 50
        );
        unpark_all_one(
            repeats: 100, latches: 1, delay: 10000, threads: 1, single_unparks: 0
        );
        unpark_all_hundred(
            repeats: 100, latches: 1, delay: 10000, threads: 100, single_unparks: 0
        );
        unpark_one_one(
            repeats: 10, latches: 1, delay: 10000, threads: 1, single_unparks: 1
        );
        unpark_one_fifty(
            repeats: 1, latches: 1, delay: 10000, threads: 50, single_unparks: 50
        );
        unpark_one_fifty_then_fifty_all(
            repeats: 2, latches: 1, delay: 10000, threads: 100, single_unparks: 50
        );
        hundred_unpark_all_one_fast(
            repeats: 100, latches: 100, delay: 0, threads: 1, single_unparks: 0
        );
        hundred_unpark_all_one(
            repeats: 1, latches: 100, delay: 10000, threads: 1, single_unparks: 0
        );
    }

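    // Spawns `num_threads` worker threads per latch, each of which takes one slot from the
    // latch's semaphore (parking if necessary). The main thread then performs
    // `num_single_unparks` rounds of unpark_one across all latches; finally `finish` wakes
    // any remaining threads with unpark_all and the workers are joined.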
    fn run_parking_test(
        num_latches: usize,
        delay: Duration,
        num_threads: usize,
        num_single_unparks: usize,
    ) {
        let mut tests = Vec::with_capacity(num_latches);

        for _ in 0..num_latches {
            let test = Arc::new(SingleLatchTest::new(num_threads));
            let mut threads = Vec::with_capacity(num_threads);
            for _ in 0..num_threads {
                let test = test.clone();
                threads.push(thread::spawn(move || test.run()));
            }
            tests.push((test, threads));
        }

        for unpark_index in 0..num_single_unparks {
            thread::sleep(delay);
            for (test, _) in &tests {
                test.unpark_one(unpark_index);
            }
        }

        for (test, threads) in tests {
            test.finish(num_single_unparks);
            for thread in threads {
                thread.join().expect("Test thread panic");
            }
        }
    }

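    /// Shared state for one latch: a fair (FIFO) semaphore built on `park`/`unpark_one`/
    /// `unpark_all`, plus counters used by the test to verify which threads woke up and in
    /// what order.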
    struct SingleLatchTest {
        semaphore: AtomicIsize,
        num_awake: AtomicUsize,
        /// Holds the pointer to the last *unprocessed* woken up thread.
        last_awoken: AtomicPtr<ThreadData>,
        /// Total number of threads participating in this test.
        num_threads: usize,
    }

    impl SingleLatchTest {
        pub fn new(num_threads: usize) -> Self {
            Self {
                // This implements a fair (FIFO) semaphore, and it starts out unavailable.
                semaphore: AtomicIsize::new(0),
                num_awake: AtomicUsize::new(0),
                last_awoken: AtomicPtr::new(ptr::null_mut()),
                num_threads,
            }
        }

        pub fn run(&self) {
            // Get one slot from the semaphore
            self.down();

            // Report back to the test verification code that this thread woke up
            let this_thread_ptr = super::with_thread_data(|t| t as *const _ as *mut _);
            self.last_awoken.store(this_thread_ptr, Ordering::SeqCst);
            self.num_awake.fetch_add(1, Ordering::SeqCst);
        }

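        /// Performs a single `up()` and verifies that the thread at the head of the park
        /// queue (if any) is the one that woke up.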
        pub fn unpark_one(&self, single_unpark_index: usize) {
            // last_awoken should be null at all times, except between self.up() and the
            // bottom of this method, where it is reset to null again.
            assert!(self.last_awoken.load(Ordering::SeqCst).is_null());

            let mut queue: Vec<*mut ThreadData> = Vec::with_capacity(self.num_threads);
            for_each(self.semaphore_addr(), |thread_data| {
                queue.push(thread_data as *const _ as *mut _);
            });
            assert!(queue.len() <= self.num_threads - single_unpark_index);

            let num_awake_before_up = self.num_awake.load(Ordering::SeqCst);

            self.up();

            // Wait for a parked thread to wake up and update num_awake + last_awoken.
            while self.num_awake.load(Ordering::SeqCst) != num_awake_before_up + 1 {
                thread::yield_now();
            }

            // At this point the other thread should have set last_awoken inside the run() method
            let last_awoken = self.last_awoken.load(Ordering::SeqCst);
            assert!(!last_awoken.is_null());
            if !queue.is_empty() && queue[0] != last_awoken {
                panic!(
                    "Woke up wrong thread:\n\tqueue: {:?}\n\tlast awoken: {:?}",
                    queue, last_awoken
                );
            }
            self.last_awoken.store(ptr::null_mut(), Ordering::SeqCst);
        }

        pub fn finish(&self, num_single_unparks: usize) {
            // The number of threads that were not unparked via unpark_one
            let mut num_threads_left = self.num_threads.checked_sub(num_single_unparks).unwrap();

            // Wake the remaining threads up with unpark_all. This has to be done in a loop,
            // because there might still be threads that have not yet parked.
            while num_threads_left > 0 {
                let mut num_waiting_on_address = 0;
                for_each(self.semaphore_addr(), |_thread_data| {
                    num_waiting_on_address += 1;
                });
                assert!(num_waiting_on_address <= num_threads_left);

                let num_awake_before_unpark = self.num_awake.load(Ordering::SeqCst);

                let num_unparked =
                    unsafe { super::unpark_all(self.semaphore_addr(), DEFAULT_UNPARK_TOKEN) };
                assert!(num_unparked >= num_waiting_on_address);
                assert!(num_unparked <= num_threads_left);

                // Wait for all unparked threads to wake up and update num_awake + last_awoken.
                while self.num_awake.load(Ordering::SeqCst)
                    != num_awake_before_unpark + num_unparked
                {
                    thread::yield_now()
                }

                num_threads_left = num_threads_left.checked_sub(num_unparked).unwrap();
            }
            // By now, all threads should have been woken up
            assert_eq!(self.num_awake.load(Ordering::SeqCst), self.num_threads);

            // Make sure no thread is parked on our semaphore address
            let mut num_waiting_on_address = 0;
            for_each(self.semaphore_addr(), |_thread_data| {
                num_waiting_on_address += 1;
            });
            assert_eq!(num_waiting_on_address, 0);
        }

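        /// Takes one slot from the semaphore: decrements the counter and parks this thread
        /// on the semaphore address if no slot was available.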
        pub fn down(&self) {
            let old_semaphore_value = self.semaphore.fetch_sub(1, Ordering::SeqCst);

            if old_semaphore_value > 0 {
                // We acquired the semaphore. Done.
                return;
            }

            // We need to wait.
            let validate = || true;
            let before_sleep = || {};
            let timed_out = |_, _| {};
            unsafe {
                super::park(
                    self.semaphore_addr(),
                    validate,
                    before_sleep,
                    timed_out,
                    DEFAULT_PARK_TOKEN,
                    None,
                );
            }
        }

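        /// Releases one slot: increments the counter and, if a waiter had already registered
        /// itself, unparks exactly one thread.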
        pub fn up(&self) {
            let old_semaphore_value = self.semaphore.fetch_add(1, Ordering::SeqCst);

            // Check if anyone was waiting on the semaphore. If they were, then pass ownership to them.
            if old_semaphore_value < 0 {
                // We need to continue until we have actually unparked someone. It might be that
                // the thread we want to pass ownership to has decremented the semaphore counter,
                // but not yet parked.
                loop {
                    match unsafe {
                        super::unpark_one(self.semaphore_addr(), |_| DEFAULT_UNPARK_TOKEN)
                            .unparked_threads
                    } {
                        1 => break,
                        0 => (),
                        i => panic!("Should not wake up {} threads", i),
                    }
                }
            }
        }

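        /// The address of the semaphore counter doubles as the park/unpark key for this latch.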
        fn semaphore_addr(&self) -> usize {
            &self.semaphore as *const _ as usize
        }
    }
}