// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.
use crate::thread_parker::{ThreadParker, ThreadParkerT, UnparkHandleT};
use crate::util::UncheckedOptionExt;
use crate::word_lock::WordLock;
use core::{
    cell::{Cell, UnsafeCell},
    ptr,
    sync::atomic::{AtomicPtr, AtomicUsize, Ordering},
};
use instant::Instant;
use smallvec::SmallVec;
use std::time::Duration;

static NUM_THREADS: AtomicUsize = AtomicUsize::new(0);

/// Holds the pointer to the currently active `HashTable`.
///
/// # Safety
///
/// Except for the initial value of null, it must always point to a valid `HashTable` instance.
/// Any `HashTable` this global static has ever pointed to must never be freed.
static HASHTABLE: AtomicPtr<HashTable> = AtomicPtr::new(ptr::null_mut());

// Even with 3x more buckets than threads, the memory overhead is still only
// a few hundred bytes per thread.
const LOAD_FACTOR: usize = 3;

struct HashTable {
    // Hash buckets for the table
    entries: Box<[Bucket]>,

    // Number of bits used for the hash function
    hash_bits: u32,

    // Previous table. This is only kept to keep leak detectors happy.
    _prev: *const HashTable,
}

impl HashTable {
    #[inline]
    fn new(num_threads: usize, prev: *const HashTable) -> Box<HashTable> {
        let new_size = (num_threads * LOAD_FACTOR).next_power_of_two();
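        // `new_size` is a power of two, so the subtraction below yields
        // log2(new_size): the number of bits needed to index the buckets.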
        let hash_bits = 0usize.leading_zeros() - new_size.leading_zeros() - 1;

        let now = Instant::now();
        let mut entries = Vec::with_capacity(new_size);
        for i in 0..new_size {
            // We must ensure the seed is not zero
            entries.push(Bucket::new(now, i as u32 + 1));
        }

        Box::new(HashTable {
            entries: entries.into_boxed_slice(),
            hash_bits,
            _prev: prev,
        })
    }
}

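// Align each bucket to a cache line (64 bytes) so that buckets touched by
// different threads are unlikely to share a cache line (false sharing).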
#[repr(align(64))]
struct Bucket {
    // Lock protecting the queue
    mutex: WordLock,

    // Linked list of threads waiting on this bucket
    queue_head: Cell<*const ThreadData>,
    queue_tail: Cell<*const ThreadData>,

    // Next time at which be_fair should be set
    fair_timeout: UnsafeCell<FairTimeout>,
}

impl Bucket {
    #[inline]
    pub fn new(timeout: Instant, seed: u32) -> Self {
        Self {
            mutex: WordLock::new(),
            queue_head: Cell::new(ptr::null()),
            queue_tail: Cell::new(ptr::null()),
            fair_timeout: UnsafeCell::new(FairTimeout::new(timeout, seed)),
        }
    }
}

struct FairTimeout {
    // Next time at which be_fair should be set
    timeout: Instant,

    // the PRNG state for calculating the next timeout
    seed: u32,
}

impl FairTimeout {
    #[inline]
    fn new(timeout: Instant, seed: u32) -> FairTimeout {
        FairTimeout { timeout, seed }
    }

    // Determine whether we should force a fair unlock, and update the timeout
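    // Once it fires, the next timeout is drawn uniformly from [0, 1) ms, so a
    // fair unlock is forced roughly once every 0.5 ms on average per bucket.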
    #[inline]
    fn should_timeout(&mut self) -> bool {
        let now = Instant::now();
        if now > self.timeout {
            // Time between 0 and 1ms.
            let nanos = self.gen_u32() % 1_000_000;
            self.timeout = now + Duration::new(0, nanos);
            true
        } else {
            false
        }
    }

    // Pseudorandom number generator from the "Xorshift RNGs" paper by George Marsaglia.
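    // The state must never be zero (zero is a fixed point of xorshift), which
    // is why buckets are seeded with `i + 1`. For any nonzero seed the
    // generator cycles through all 2^32 - 1 nonzero values.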
    fn gen_u32(&mut self) -> u32 {
        self.seed ^= self.seed << 13;
        self.seed ^= self.seed >> 17;
        self.seed ^= self.seed << 5;
        self.seed
    }
}

struct ThreadData {
    parker: ThreadParker,

    // Key that this thread is sleeping on. This may change if the thread is
    // requeued to a different key.
    key: AtomicUsize,

    // Linked list of parked threads in a bucket
    next_in_queue: Cell<*const ThreadData>,

    // UnparkToken passed to this thread when it is unparked
    unpark_token: Cell<UnparkToken>,

    // ParkToken value set by the thread when it was parked
    park_token: Cell<ParkToken>,

    // Is the thread parked with a timeout?
    parked_with_timeout: Cell<bool>,

    // Extra data for deadlock detection
    #[cfg(feature = "deadlock_detection")]
    deadlock_data: deadlock::DeadlockData,
}

impl ThreadData {
    fn new() -> ThreadData {
        // Keep track of the total number of live ThreadData objects and resize
        // the hash table accordingly.
        let num_threads = NUM_THREADS.fetch_add(1, Ordering::Relaxed) + 1;
        grow_hashtable(num_threads);

        ThreadData {
            parker: ThreadParker::new(),
            key: AtomicUsize::new(0),
            next_in_queue: Cell::new(ptr::null()),
            unpark_token: Cell::new(DEFAULT_UNPARK_TOKEN),
            park_token: Cell::new(DEFAULT_PARK_TOKEN),
            parked_with_timeout: Cell::new(false),
            #[cfg(feature = "deadlock_detection")]
            deadlock_data: deadlock::DeadlockData::new(),
        }
    }
}

// Invokes the given closure with a reference to the current thread `ThreadData`.
#[inline(always)]
fn with_thread_data<T>(f: impl FnOnce(&ThreadData) -> T) -> T {
    // Unlike word_lock::ThreadData, parking_lot::ThreadData is always expensive
    // to construct. Try to use a thread-local version if possible. Otherwise just
    // create a ThreadData on the stack
    let mut thread_data_storage = None;
    thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
    let thread_data_ptr = THREAD_DATA
        .try_with(|x| x as *const ThreadData)
        .unwrap_or_else(|_| thread_data_storage.get_or_insert_with(ThreadData::new));

    f(unsafe { &*thread_data_ptr })
}

impl Drop for ThreadData {
    fn drop(&mut self) {
        NUM_THREADS.fetch_sub(1, Ordering::Relaxed);
    }
}

/// Returns a reference to the latest hash table, creating one if it doesn't exist yet.
/// The reference is valid forever. However, the `HashTable` it references might become stale
/// at any point, meaning it still exists but is no longer the instance in active use.
#[inline]
fn get_hashtable() -> &'static HashTable {
    let table = HASHTABLE.load(Ordering::Acquire);

    // If there is no table, create one
    if table.is_null() {
        create_hashtable()
    } else {
        // SAFETY: when not null, `HASHTABLE` always points to a `HashTable` that is never freed.
        unsafe { &*table }
    }
}

/// Returns a reference to the latest hash table, creating one if it doesn't exist yet.
/// The reference is valid forever. However, the `HashTable` it references might become stale
/// at any point, meaning it still exists but is no longer the instance in active use.
#[cold]
fn create_hashtable() -> &'static HashTable {
    let new_table = Box::into_raw(HashTable::new(LOAD_FACTOR, ptr::null()));

    // If this fails then it means some other thread created the hash table first.
    let table = match HASHTABLE.compare_exchange(
        ptr::null_mut(),
        new_table,
        Ordering::AcqRel,
        Ordering::Acquire,
    ) {
        Ok(_) => new_table,
        Err(old_table) => {
            // Free the table we created
            // SAFETY: `new_table` is created from `Box::into_raw` above and only freed here.
            unsafe {
                Box::from_raw(new_table);
            }
            old_table
        }
    };
    // SAFETY: The `HashTable` behind `table` is never freed. It is either the table pointer we
    // created here, or it is one loaded from `HASHTABLE`.
    unsafe { &*table }
}

// Grow the hash table so that it is big enough for the given number of threads.
// This isn't performance-critical since it is only done when a ThreadData is
// created, which only happens once per thread.
fn grow_hashtable(num_threads: usize) {
    // Lock all buckets in the existing table and get a reference to it
    let old_table = loop {
        let table = get_hashtable();

        // Check if we need to resize the existing table
        if table.entries.len() >= LOAD_FACTOR * num_threads {
            return;
        }

        // Lock all buckets in the old table
        for bucket in &table.entries[..] {
            bucket.mutex.lock();
        }

        // Now check if our table is still the latest one. Another thread could
        // have grown the hash table between us reading HASHTABLE and locking
        // the buckets.
        if HASHTABLE.load(Ordering::Relaxed) == table as *const _ as *mut _ {
            break table;
        }

        // Unlock buckets and try again
        for bucket in &table.entries[..] {
            // SAFETY: We hold the lock here, as required
            unsafe { bucket.mutex.unlock() };
        }
    };

    // Create the new table
    let mut new_table = HashTable::new(num_threads, old_table);

    // Move the entries from the old table to the new one
    for bucket in &old_table.entries[..] {
        // SAFETY: The park, unpark* and check_wait_graph_fast functions create only correct linked
        // lists. All `ThreadData` instances in these lists will remain valid as long as they are
        // present in the lists, meaning as long as their threads are parked.
        unsafe { rehash_bucket_into(bucket, &mut new_table) };
    }

    // Publish the new table. No races are possible at this point because
    // any other thread trying to grow the hash table is blocked on the bucket
    // locks in the old table.
    HASHTABLE.store(Box::into_raw(new_table), Ordering::Release);

    // Unlock all buckets in the old table
    for bucket in &old_table.entries[..] {
        // SAFETY: We hold the lock here, as required
        unsafe { bucket.mutex.unlock() };
    }
}

/// Iterate through all `ThreadData` objects in the bucket and insert them into the given table
/// in the bucket their key corresponds to for this table.
///
/// # Safety
///
/// The given `bucket` must have a correctly constructed linked list under `queue_head`, containing
/// `ThreadData` instances that must stay valid at least as long as the given `table` is in use.
///
/// The given `table` must only contain buckets with correctly constructed linked lists.
unsafe fn rehash_bucket_into(bucket: &'static Bucket, table: &mut HashTable) {
    let mut current: *const ThreadData = bucket.queue_head.get();
    while !current.is_null() {
        let next = (*current).next_in_queue.get();
        let hash = hash((*current).key.load(Ordering::Relaxed), table.hash_bits);
        if table.entries[hash].queue_tail.get().is_null() {
            table.entries[hash].queue_head.set(current);
        } else {
            (*table.entries[hash].queue_tail.get())
                .next_in_queue
                .set(current);
        }
        table.entries[hash].queue_tail.set(current);
        (*current).next_in_queue.set(ptr::null());
        current = next;
    }
}

// Hash function for addresses
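//
// This is a Fibonacci (multiplicative) hash: the multipliers are integer
// approximations of 2^32/phi and 2^64/phi (phi being the golden ratio), and
// the shift keeps only the top `bits` bits of the product.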
#[cfg(target_pointer_width = "32")]
#[inline]
fn hash(key: usize, bits: u32) -> usize {
    key.wrapping_mul(0x9E3779B9) >> (32 - bits)
}
#[cfg(target_pointer_width = "64")]
#[inline]
fn hash(key: usize, bits: u32) -> usize {
    key.wrapping_mul(0x9E3779B97F4A7C15) >> (64 - bits)
}

/// Locks the bucket for the given key and returns a reference to it.
/// The returned bucket must be unlocked again in order to not cause deadlocks.
#[inline]
fn lock_bucket(key: usize) -> &'static Bucket {
    loop {
        let hashtable = get_hashtable();

        let hash = hash(key, hashtable.hash_bits);
        let bucket = &hashtable.entries[hash];

        // Lock the bucket
        bucket.mutex.lock();

        // If no other thread has rehashed the table before we grabbed the lock
        // then we are good to go! The lock we grabbed prevents any rehashes.
        if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ {
            return bucket;
        }

        // Unlock the bucket and try again
        // SAFETY: We hold the lock here, as required
        unsafe { bucket.mutex.unlock() };
    }
}

/// Locks the bucket for the given key and returns a reference to it. But checks that the key
/// hasn't been changed in the meantime due to a requeue.
/// The returned bucket must be unlocked again in order to not cause deadlocks.
#[inline]
fn lock_bucket_checked(key: &AtomicUsize) -> (usize, &'static Bucket) {
    loop {
        let hashtable = get_hashtable();
        let current_key = key.load(Ordering::Relaxed);

        let hash = hash(current_key, hashtable.hash_bits);
        let bucket = &hashtable.entries[hash];

        // Lock the bucket
        bucket.mutex.lock();

        // Check that both the hash table and key are correct while the bucket
        // is locked. Note that the key can't change once we locked the proper
        // bucket for it, so we just keep trying until we have the correct key.
        if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _
            && key.load(Ordering::Relaxed) == current_key
        {
            return (current_key, bucket);
        }

        // Unlock the bucket and try again
        // SAFETY: We hold the lock here, as required
        unsafe { bucket.mutex.unlock() };
    }
}

/// Locks the two buckets for the given pair of keys and returns references to them.
/// The returned buckets must be unlocked again in order to not cause deadlocks.
///
/// If both keys hash to the same value, both returned references will be to the same bucket. Be
/// careful to only unlock it once in this case, always use `unlock_bucket_pair`.
#[inline]
fn lock_bucket_pair(key1: usize, key2: usize) -> (&'static Bucket, &'static Bucket) {
    loop {
        let hashtable = get_hashtable();

        let hash1 = hash(key1, hashtable.hash_bits);
        let hash2 = hash(key2, hashtable.hash_bits);

        // Get the bucket at the lowest hash/index first
        let bucket1 = if hash1 <= hash2 {
            &hashtable.entries[hash1]
        } else {
            &hashtable.entries[hash2]
        };

        // Lock the first bucket
        bucket1.mutex.lock();

        // If no other thread has rehashed the table before we grabbed the lock
        // then we are good to go! The lock we grabbed prevents any rehashes.
        if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ {
            // Now lock the second bucket and return the two buckets
            if hash1 == hash2 {
                return (bucket1, bucket1);
            } else if hash1 < hash2 {
                let bucket2 = &hashtable.entries[hash2];
                bucket2.mutex.lock();
                return (bucket1, bucket2);
            } else {
                let bucket2 = &hashtable.entries[hash1];
                bucket2.mutex.lock();
                return (bucket2, bucket1);
            }
        }

        // Unlock the bucket and try again
        // SAFETY: We hold the lock here, as required
        unsafe { bucket1.mutex.unlock() };
    }
}

/// Unlock a pair of buckets
///
/// # Safety
///
/// Both buckets must be locked
#[inline]
unsafe fn unlock_bucket_pair(bucket1: &Bucket, bucket2: &Bucket) {
    bucket1.mutex.unlock();
    if !ptr::eq(bucket1, bucket2) {
        bucket2.mutex.unlock();
    }
}

/// Result of a park operation.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum ParkResult {
    /// We were unparked by another thread with the given token.
    Unparked(UnparkToken),

    /// The validation callback returned false.
    Invalid,

    /// The timeout expired.
    TimedOut,
}

impl ParkResult {
    /// Returns true if we were unparked by another thread.
    #[inline]
    pub fn is_unparked(self) -> bool {
        if let ParkResult::Unparked(_) = self {
            true
        } else {
            false
        }
    }
}

/// Result of an unpark operation.
#[derive(Copy, Clone, Default, Eq, PartialEq, Debug)]
pub struct UnparkResult {
    /// The number of threads that were unparked.
    pub unparked_threads: usize,

    /// The number of threads that were requeued.
    pub requeued_threads: usize,

    /// Whether there are any threads remaining in the queue. This only returns
    /// true if a thread was unparked.
    pub have_more_threads: bool,

    /// This is set to true on average once every 0.5ms for any given key. It
    /// should be used to switch to a fair unlocking mechanism for a particular
    /// unlock.
    pub be_fair: bool,

    /// Private field so new fields can be added without breakage.
    _sealed: (),
}

/// Operation that `unpark_requeue` should perform.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum RequeueOp {
    /// Abort the operation without doing anything.
    Abort,

    /// Unpark one thread and requeue the rest onto the target queue.
    UnparkOneRequeueRest,

    /// Requeue all threads onto the target queue.
    RequeueAll,

    /// Unpark one thread and leave the rest parked. No requeuing is done.
    UnparkOne,

    /// Requeue one thread and leave the rest parked on the original queue.
    RequeueOne,
}

/// Operation that `unpark_filter` should perform for each thread.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum FilterOp {
    /// Unpark the thread and continue scanning the list of parked threads.
    Unpark,

    /// Don't unpark the thread and continue scanning the list of parked threads.
    Skip,

    /// Don't unpark the thread and stop scanning the list of parked threads.
    Stop,
}

/// A value which is passed from an unparker to a parked thread.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct UnparkToken(pub usize);

/// A value associated with a parked thread which can be used by `unpark_filter`.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct ParkToken(pub usize);

/// A default unpark token to use.
pub const DEFAULT_UNPARK_TOKEN: UnparkToken = UnparkToken(0);

/// A default park token to use.
pub const DEFAULT_PARK_TOKEN: ParkToken = ParkToken(0);

/// Parks the current thread in the queue associated with the given key.
///
/// The `validate` function is called while the queue is locked and can abort
/// the operation by returning false. If `validate` returns true then the
/// current thread is appended to the queue and the queue is unlocked.
///
/// The `before_sleep` function is called after the queue is unlocked but before
/// the thread is put to sleep. The thread will then sleep until it is unparked
/// or the given timeout is reached.
///
/// The `timed_out` function is also called while the queue is locked, but only
/// if the timeout was reached. It is passed the key of the queue it was in when
/// it timed out, which may be different from the original key if
/// `unpark_requeue` was called. It is also passed a bool which indicates
/// whether it was the last thread in the queue.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
///
/// The `validate` and `timed_out` functions are called while the queue is
/// locked and must not panic or call into any function in `parking_lot`.
///
/// The `before_sleep` function is called outside the queue lock and is allowed
/// to call `unpark_one`, `unpark_all`, `unpark_requeue` or `unpark_filter`, but
/// it is not allowed to call `park` or panic.
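///
/// # Example
///
/// A minimal, illustrative sketch (not taken from the crate's documentation)
/// of a one-shot flag built on `park`/`unpark_one`; the `FLAG` static and the
/// helper names are assumptions made up for this example.
///
/// ```ignore
/// use std::sync::atomic::{AtomicBool, Ordering};
/// use parking_lot_core::{park, unpark_one, DEFAULT_PARK_TOKEN, DEFAULT_UNPARK_TOKEN};
///
/// static FLAG: AtomicBool = AtomicBool::new(false);
///
/// fn wait() {
///     // Park on the address of FLAG until `set` wakes us up.
///     while !FLAG.load(Ordering::Acquire) {
///         unsafe {
///             park(
///                 &FLAG as *const _ as usize,
///                 || !FLAG.load(Ordering::Relaxed), // validate: only sleep if still unset
///                 || {},                            // before_sleep: nothing to do
///                 |_key, _was_last| {},             // timed_out: unused, no timeout given
///                 DEFAULT_PARK_TOKEN,
///                 None,
///             );
///         }
///     }
/// }
///
/// fn set() {
///     FLAG.store(true, Ordering::Release);
///     unsafe {
///         unpark_one(&FLAG as *const _ as usize, |_result| DEFAULT_UNPARK_TOKEN);
///     }
/// }
/// ```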
#[inline]
pub unsafe fn park(
    key: usize,
    validate: impl FnOnce() -> bool,
    before_sleep: impl FnOnce(),
    timed_out: impl FnOnce(usize, bool),
    park_token: ParkToken,
    timeout: Option<Instant>,
) -> ParkResult {
    // Grab our thread data, this also ensures that the hash table exists
    with_thread_data(|thread_data| {
        // Lock the bucket for the given key
        let bucket = lock_bucket(key);

        // If the validation function fails, just return
        if !validate() {
            // SAFETY: We hold the lock here, as required
            bucket.mutex.unlock();
            return ParkResult::Invalid;
        }

        // Append our thread data to the queue and unlock the bucket
        thread_data.parked_with_timeout.set(timeout.is_some());
        thread_data.next_in_queue.set(ptr::null());
        thread_data.key.store(key, Ordering::Relaxed);
        thread_data.park_token.set(park_token);
        thread_data.parker.prepare_park();
        if !bucket.queue_head.get().is_null() {
            (*bucket.queue_tail.get()).next_in_queue.set(thread_data);
        } else {
            bucket.queue_head.set(thread_data);
        }
        bucket.queue_tail.set(thread_data);
        // SAFETY: We hold the lock here, as required
        bucket.mutex.unlock();

        // Invoke the pre-sleep callback
        before_sleep();

        // Park our thread and determine whether we were woken up by an unpark
        // or by our timeout. Note that this isn't precise: we can still be
        // unparked since we are still in the queue.
        let unparked = match timeout {
            Some(timeout) => thread_data.parker.park_until(timeout),
            None => {
                thread_data.parker.park();
                // call deadlock detection on_unpark hook
                deadlock::on_unpark(thread_data);
                true
            }
        };

        // If we were unparked, return now
        if unparked {
            return ParkResult::Unparked(thread_data.unpark_token.get());
        }

        // Lock our bucket again. Note that the hashtable may have been rehashed in
        // the meantime. Our key may also have changed if we were requeued.
        let (key, bucket) = lock_bucket_checked(&thread_data.key);

        // Now we need to check again if we were unparked or timed out. Unlike the
        // last check this is precise because we hold the bucket lock.
        if !thread_data.parker.timed_out() {
            // SAFETY: We hold the lock here, as required
            bucket.mutex.unlock();
            return ParkResult::Unparked(thread_data.unpark_token.get());
        }

        // We timed out, so we now need to remove our thread from the queue
        let mut link = &bucket.queue_head;
        let mut current = bucket.queue_head.get();
        let mut previous = ptr::null();
        let mut was_last_thread = true;
        while !current.is_null() {
            if current == thread_data {
                let next = (*current).next_in_queue.get();
                link.set(next);
                if bucket.queue_tail.get() == current {
                    bucket.queue_tail.set(previous);
                } else {
                    // Scan the rest of the queue to see if there are any other
                    // entries with the given key.
                    let mut scan = next;
                    while !scan.is_null() {
                        if (*scan).key.load(Ordering::Relaxed) == key {
                            was_last_thread = false;
                            break;
                        }
                        scan = (*scan).next_in_queue.get();
                    }
                }

                // Callback to indicate that we timed out, and whether we were the
                // last thread on the queue.
                timed_out(key, was_last_thread);
                break;
            } else {
                if (*current).key.load(Ordering::Relaxed) == key {
                    was_last_thread = false;
                }
                link = &(*current).next_in_queue;
                previous = current;
                current = link.get();
            }
        }

        // There should be no way for our thread to have been removed from the queue
        // if we timed out.
        debug_assert!(!current.is_null());

        // Unlock the bucket, we are done
        // SAFETY: We hold the lock here, as required
        bucket.mutex.unlock();
        ParkResult::TimedOut
    })
}

/// Unparks one thread from the queue associated with the given key.
///
/// The `callback` function is called while the queue is locked and before the
/// target thread is woken up. The `UnparkResult` argument to the function
/// indicates whether a thread was found in the queue and whether this was the
/// last thread in the queue. This value is also returned by `unpark_one`.
///
/// The `callback` function should return an `UnparkToken` value which will be
/// passed to the thread that is unparked. If no thread is unparked then the
/// returned value is ignored.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
///
/// The `callback` function is called while the queue is locked and must not
/// panic or call into any function in `parking_lot`.
#[inline]
pub unsafe fn unpark_one(
    key: usize,
    callback: impl FnOnce(UnparkResult) -> UnparkToken,
) -> UnparkResult {
    // Lock the bucket for the given key
    let bucket = lock_bucket(key);

    // Find a thread with a matching key and remove it from the queue
    let mut link = &bucket.queue_head;
    let mut current = bucket.queue_head.get();
    let mut previous = ptr::null();
    let mut result = UnparkResult::default();
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key {
            // Remove the thread from the queue
            let next = (*current).next_in_queue.get();
            link.set(next);
            if bucket.queue_tail.get() == current {
                bucket.queue_tail.set(previous);
            } else {
                // Scan the rest of the queue to see if there are any other
                // entries with the given key.
                let mut scan = next;
                while !scan.is_null() {
                    if (*scan).key.load(Ordering::Relaxed) == key {
                        result.have_more_threads = true;
                        break;
                    }
                    scan = (*scan).next_in_queue.get();
                }
            }

            // Invoke the callback before waking up the thread
            result.unparked_threads = 1;
            result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
            let token = callback(result);

            // Set the token for the target thread
            (*current).unpark_token.set(token);

            // This is a bit tricky: we first lock the ThreadParker to prevent
            // the thread from exiting and freeing its ThreadData if its wait
            // times out. Then we unlock the queue since we don't want to keep
            // the queue locked while we perform a system call. Finally we wake
            // up the parked thread.
            let handle = (*current).parker.unpark_lock();
            // SAFETY: We hold the lock here, as required
            bucket.mutex.unlock();
            handle.unpark();

            return result;
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // No threads with a matching key were found in the bucket
    callback(result);
    // SAFETY: We hold the lock here, as required
    bucket.mutex.unlock();
    result
}

/// Unparks all threads in the queue associated with the given key.
///
/// The given `UnparkToken` is passed to all unparked threads.
///
/// This function returns the number of threads that were unparked.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
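///
/// # Example
///
/// A minimal, illustrative sketch; `key` is assumed to be the address of a
/// synchronization object the caller controls.
///
/// ```ignore
/// // Wake every thread currently parked on `key`.
/// let woken = unsafe { unpark_all(key, DEFAULT_UNPARK_TOKEN) };
/// ```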
#[inline]
pub unsafe fn unpark_all(key: usize, unpark_token: UnparkToken) -> usize {
    // Lock the bucket for the given key
    let bucket = lock_bucket(key);

    // Remove all threads with the given key in the bucket
    let mut link = &bucket.queue_head;
    let mut current = bucket.queue_head.get();
    let mut previous = ptr::null();
    let mut threads = SmallVec::<[_; 8]>::new();
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key {
            // Remove the thread from the queue
            let next = (*current).next_in_queue.get();
            link.set(next);
            if bucket.queue_tail.get() == current {
                bucket.queue_tail.set(previous);
            }

            // Set the token for the target thread
            (*current).unpark_token.set(unpark_token);

            // Don't wake up threads while holding the queue lock. See comment
            // in unpark_one. For now just record which threads we need to wake
            // up.
            threads.push((*current).parker.unpark_lock());
            current = next;
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // Unlock the bucket
    // SAFETY: We hold the lock here, as required
    bucket.mutex.unlock();

    // Now that we are outside the lock, wake up all the threads that we removed
    // from the queue.
    let num_threads = threads.len();
    for handle in threads.into_iter() {
        handle.unpark();
    }

    num_threads
}

/// Removes all threads from the queue associated with `key_from`, optionally
/// unparks the first one and requeues the rest onto the queue associated with
/// `key_to`.
///
/// The `validate` function is called while both queues are locked. Its return
/// value will determine which operation is performed, or whether the operation
/// should be aborted. See `RequeueOp` for details about the different possible
/// return values.
///
/// The `callback` function is also called while both queues are locked. It is
/// passed the `RequeueOp` returned by `validate` and an `UnparkResult`
/// indicating whether a thread was unparked and whether there are threads still
/// parked in the new queue. This `UnparkResult` value is also returned by
/// `unpark_requeue`.
///
/// The `callback` function should return an `UnparkToken` value which will be
/// passed to the thread that is unparked. If no thread is unparked then the
/// returned value is ignored.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
///
/// The `validate` and `callback` functions are called while the queue is locked
/// and must not panic or call into any function in `parking_lot`.
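///
/// # Example
///
/// A minimal, illustrative sketch that unconditionally moves all waiters from
/// one queue to another; `key_from` and `key_to` are assumed to be addresses
/// the caller controls.
///
/// ```ignore
/// let result = unsafe {
///     unpark_requeue(
///         key_from,
///         key_to,
///         || RequeueOp::RequeueAll,            // never abort, requeue everything
///         |_op, _result| DEFAULT_UNPARK_TOKEN, // no thread is unparked, token unused
///     )
/// };
/// assert_eq!(result.unparked_threads, 0);
/// ```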
#[inline]
pub unsafe fn unpark_requeue(
    key_from: usize,
    key_to: usize,
    validate: impl FnOnce() -> RequeueOp,
    callback: impl FnOnce(RequeueOp, UnparkResult) -> UnparkToken,
) -> UnparkResult {
    // Lock the two buckets for the given key
    let (bucket_from, bucket_to) = lock_bucket_pair(key_from, key_to);

    // If the validation function fails, just return
    let mut result = UnparkResult::default();
    let op = validate();
    if op == RequeueOp::Abort {
        // SAFETY: Both buckets are locked, as required.
        unlock_bucket_pair(bucket_from, bucket_to);
        return result;
    }

    // Remove all threads with the given key in the source bucket
    let mut link = &bucket_from.queue_head;
    let mut current = bucket_from.queue_head.get();
    let mut previous = ptr::null();
    let mut requeue_threads: *const ThreadData = ptr::null();
    let mut requeue_threads_tail: *const ThreadData = ptr::null();
    let mut wakeup_thread = None;
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key_from {
            // Remove the thread from the queue
            let next = (*current).next_in_queue.get();
            link.set(next);
            if bucket_from.queue_tail.get() == current {
                bucket_from.queue_tail.set(previous);
            }

            // Prepare the first thread for wakeup and requeue the rest.
            if (op == RequeueOp::UnparkOneRequeueRest || op == RequeueOp::UnparkOne)
                && wakeup_thread.is_none()
            {
                wakeup_thread = Some(current);
                result.unparked_threads = 1;
            } else {
                if !requeue_threads.is_null() {
                    (*requeue_threads_tail).next_in_queue.set(current);
                } else {
                    requeue_threads = current;
                }
                requeue_threads_tail = current;
                (*current).key.store(key_to, Ordering::Relaxed);
                result.requeued_threads += 1;
            }
            if op == RequeueOp::UnparkOne || op == RequeueOp::RequeueOne {
                // Scan the rest of the queue to see if there are any other
                // entries with the given key.
                let mut scan = next;
                while !scan.is_null() {
                    if (*scan).key.load(Ordering::Relaxed) == key_from {
                        result.have_more_threads = true;
                        break;
                    }
                    scan = (*scan).next_in_queue.get();
                }
                break;
            }
            current = next;
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // Add the requeued threads to the destination bucket
    if !requeue_threads.is_null() {
        (*requeue_threads_tail).next_in_queue.set(ptr::null());
        if !bucket_to.queue_head.get().is_null() {
            (*bucket_to.queue_tail.get())
                .next_in_queue
                .set(requeue_threads);
        } else {
            bucket_to.queue_head.set(requeue_threads);
        }
        bucket_to.queue_tail.set(requeue_threads_tail);
    }

    // Invoke the callback before waking up the thread
    if result.unparked_threads != 0 {
        result.be_fair = (*bucket_from.fair_timeout.get()).should_timeout();
    }
    let token = callback(op, result);

    // See comment in unpark_one for why we mess with the locking
    if let Some(wakeup_thread) = wakeup_thread {
        (*wakeup_thread).unpark_token.set(token);
        let handle = (*wakeup_thread).parker.unpark_lock();
        // SAFETY: Both buckets are locked, as required.
        unlock_bucket_pair(bucket_from, bucket_to);
        handle.unpark();
    } else {
        // SAFETY: Both buckets are locked, as required.
        unlock_bucket_pair(bucket_from, bucket_to);
    }

    result
}

/// Unparks a number of threads from the front of the queue associated with
/// `key` depending on the results of a filter function which inspects the
/// `ParkToken` associated with each thread.
///
/// The `filter` function is called for each thread in the queue or until
/// `FilterOp::Stop` is returned. This function is passed the `ParkToken`
/// associated with a particular thread, which is unparked if `FilterOp::Unpark`
/// is returned.
///
/// The `callback` function is also called while the queue is locked. It is
/// passed an `UnparkResult` indicating the number of threads that were unparked
/// and whether there are still parked threads in the queue. This `UnparkResult`
/// value is also returned by `unpark_filter`.
///
/// The `callback` function should return an `UnparkToken` value which will be
/// passed to all threads that are unparked. If no thread is unparked then the
/// returned value is ignored.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
///
/// The `filter` and `callback` functions are called while the queue is locked
/// and must not panic or call into any function in `parking_lot`.
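///
/// # Example
///
/// A minimal, illustrative sketch that wakes only the threads which parked
/// with a specific token; `key` is assumed to be an address the caller
/// controls.
///
/// ```ignore
/// let result = unsafe {
///     unpark_filter(
///         key,
///         |token| {
///             if token == ParkToken(1) {
///                 FilterOp::Unpark
///             } else {
///                 FilterOp::Skip
///             }
///         },
///         |_result| DEFAULT_UNPARK_TOKEN,
///     )
/// };
/// ```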
#[inline]
pub unsafe fn unpark_filter(
    key: usize,
    mut filter: impl FnMut(ParkToken) -> FilterOp,
    callback: impl FnOnce(UnparkResult) -> UnparkToken,
) -> UnparkResult {
    // Lock the bucket for the given key
    let bucket = lock_bucket(key);

    // Go through the queue looking for threads with a matching key
    let mut link = &bucket.queue_head;
    let mut current = bucket.queue_head.get();
    let mut previous = ptr::null();
    let mut threads = SmallVec::<[_; 8]>::new();
    let mut result = UnparkResult::default();
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key {
            // Call the filter function with the thread's ParkToken
            let next = (*current).next_in_queue.get();
            match filter((*current).park_token.get()) {
                FilterOp::Unpark => {
                    // Remove the thread from the queue
                    link.set(next);
                    if bucket.queue_tail.get() == current {
                        bucket.queue_tail.set(previous);
                    }

                    // Add the thread to our list of threads to unpark
                    threads.push((current, None));

                    current = next;
                }
                FilterOp::Skip => {
                    result.have_more_threads = true;
                    link = &(*current).next_in_queue;
                    previous = current;
                    current = link.get();
                }
                FilterOp::Stop => {
                    result.have_more_threads = true;
                    break;
                }
            }
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // Invoke the callback before waking up the threads
    result.unparked_threads = threads.len();
    if result.unparked_threads != 0 {
        result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
    }
    let token = callback(result);

    // Pass the token to all threads that are going to be unparked and prepare
    // them for unparking.
    for t in threads.iter_mut() {
        (*t.0).unpark_token.set(token);
        t.1 = Some((*t.0).parker.unpark_lock());
    }

    // SAFETY: We hold the lock here, as required
    bucket.mutex.unlock();

    // Now that we are outside the lock, wake up all the threads that we removed
    // from the queue.
    for (_, handle) in threads.into_iter() {
        handle.unchecked_unwrap().unpark();
    }

    result
}

/// \[Experimental\] Deadlock detection
///
/// Enabled via the `deadlock_detection` feature flag.
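///
/// A minimal, illustrative sketch of a watchdog thread that polls for
/// deadlocks (only meaningful when the `deadlock_detection` feature is
/// enabled); the 10-second interval is an arbitrary choice for the example.
///
/// ```ignore
/// use std::thread;
/// use std::time::Duration;
///
/// thread::spawn(|| loop {
///     thread::sleep(Duration::from_secs(10));
///     let deadlocks = parking_lot_core::deadlock::check_deadlock();
///     for (i, threads) in deadlocks.iter().enumerate() {
///         println!("Deadlock #{}", i);
///         for t in threads {
///             println!("Thread Id {:#?}", t.thread_id());
///             println!("{:#?}", t.backtrace());
///         }
///     }
/// });
/// ```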
pub mod deadlock {
    #[cfg(feature = "deadlock_detection")]
    use super::deadlock_impl;

    #[cfg(feature = "deadlock_detection")]
    pub(super) use super::deadlock_impl::DeadlockData;

    /// Acquire a resource identified by key in the deadlock detector
    /// Noop if deadlock_detection feature isn't enabled.
    ///
    /// # Safety
    ///
    /// Call after the resource is acquired
    #[inline]
    pub unsafe fn acquire_resource(_key: usize) {
        #[cfg(feature = "deadlock_detection")]
        deadlock_impl::acquire_resource(_key);
    }

    /// Release a resource identified by key in the deadlock detector.
    /// Noop if deadlock_detection feature isn't enabled.
    ///
    /// # Panics
    ///
    /// Panics if the resource was already released or wasn't acquired in this thread.
    ///
    /// # Safety
    ///
    /// Call before the resource is released
    #[inline]
    pub unsafe fn release_resource(_key: usize) {
        #[cfg(feature = "deadlock_detection")]
        deadlock_impl::release_resource(_key);
    }

    /// Returns all deadlocks detected *since* the last call.
    /// Each cycle consists of a vector of `DeadlockedThread`.
    #[cfg(feature = "deadlock_detection")]
    #[inline]
    pub fn check_deadlock() -> Vec<Vec<deadlock_impl::DeadlockedThread>> {
        deadlock_impl::check_deadlock()
    }

    #[inline]
    pub(super) unsafe fn on_unpark(_td: &super::ThreadData) {
        #[cfg(feature = "deadlock_detection")]
        deadlock_impl::on_unpark(_td);
    }
}

#[cfg(feature = "deadlock_detection")]
mod deadlock_impl {
    use super::{get_hashtable, lock_bucket, with_thread_data, ThreadData, NUM_THREADS};
    use crate::thread_parker::{ThreadParkerT, UnparkHandleT};
    use crate::word_lock::WordLock;
    use backtrace::Backtrace;
    use petgraph;
    use petgraph::graphmap::DiGraphMap;
    use std::cell::{Cell, UnsafeCell};
    use std::collections::HashSet;
    use std::sync::atomic::Ordering;
    use std::sync::mpsc;
    use thread_id;

    /// Representation of a deadlocked thread
    pub struct DeadlockedThread {
        thread_id: usize,
        backtrace: Backtrace,
    }

    impl DeadlockedThread {
        /// The system thread id
        pub fn thread_id(&self) -> usize {
            self.thread_id
        }

        /// The thread backtrace
        pub fn backtrace(&self) -> &Backtrace {
            &self.backtrace
        }
    }

    pub struct DeadlockData {
        // Currently owned resources (keys)
        resources: UnsafeCell<Vec<usize>>,

        // Set when there's a pending callstack request
        deadlocked: Cell<bool>,

        // Sender used to report the backtrace
        backtrace_sender: UnsafeCell<Option<mpsc::Sender<DeadlockedThread>>>,

        // System thread id
        thread_id: usize,
    }

    impl DeadlockData {
        pub fn new() -> Self {
            DeadlockData {
                resources: UnsafeCell::new(Vec::new()),
                deadlocked: Cell::new(false),
                backtrace_sender: UnsafeCell::new(None),
                thread_id: thread_id::get(),
            }
        }
    }

    pub(super) unsafe fn on_unpark(td: &ThreadData) {
        if td.deadlock_data.deadlocked.get() {
            let sender = (*td.deadlock_data.backtrace_sender.get()).take().unwrap();
            sender
                .send(DeadlockedThread {
                    thread_id: td.deadlock_data.thread_id,
                    backtrace: Backtrace::new(),
                })
                .unwrap();
            // make sure to close this sender
            drop(sender);

            // park until the end of time
            td.parker.prepare_park();
            td.parker.park();
            unreachable!("unparked deadlocked thread!");
        }
    }

    pub unsafe fn acquire_resource(key: usize) {
        with_thread_data(|thread_data| {
            (*thread_data.deadlock_data.resources.get()).push(key);
        });
    }

    pub unsafe fn release_resource(key: usize) {
        with_thread_data(|thread_data| {
            let resources = &mut (*thread_data.deadlock_data.resources.get());

            // There is only one situation where we can fail to find the
            // resource: we are currently running TLS destructors and our
            // ThreadData has already been freed. There isn't much we can do
            // about it at this point, so just ignore it.
            if let Some(p) = resources.iter().rposition(|x| *x == key) {
                resources.swap_remove(p);
            }
        });
    }

    pub fn check_deadlock() -> Vec<Vec<DeadlockedThread>> {
        unsafe {
            // fast pass
            if check_wait_graph_fast() {
                // double check
                check_wait_graph_slow()
            } else {
                Vec::new()
            }
        }
    }

    // Simple algorithm that builds a wait graph of the threads and the resources,
    // then checks for the presence of cycles (deadlocks).
    // This variant isn't precise as it doesn't lock the entire table before checking
check_wait_graph_fast() -> bool1227     unsafe fn check_wait_graph_fast() -> bool {
1228         let table = get_hashtable();
1229         let thread_count = NUM_THREADS.load(Ordering::Relaxed);
1230         let mut graph = DiGraphMap::<usize, ()>::with_capacity(thread_count * 2, thread_count * 2);
1231 
1232         for b in &(*table).entries[..] {
1233             b.mutex.lock();
1234             let mut current = b.queue_head.get();
1235             while !current.is_null() {
1236                 if !(*current).parked_with_timeout.get()
1237                     && !(*current).deadlock_data.deadlocked.get()
1238                 {
1239                     // .resources are waiting for their owner
1240                     for &resource in &(*(*current).deadlock_data.resources.get()) {
1241                         graph.add_edge(resource, current as usize, ());
1242                     }
1243                     // owner waits for resource .key
1244                     graph.add_edge(current as usize, (*current).key.load(Ordering::Relaxed), ());
1245                 }
1246                 current = (*current).next_in_queue.get();
1247             }
1248             // SAFETY: We hold the lock here, as required
1249             b.mutex.unlock();
1250         }
1251 
1252         petgraph::algo::is_cyclic_directed(&graph)
1253     }
1254 
1255     #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)]
1256     enum WaitGraphNode {
1257         Thread(*const ThreadData),
1258         Resource(usize),
1259     }
1260 
1261     use self::WaitGraphNode::*;
1262 
1263     // Contrary to the _fast variant this locks the entries table before looking for cycles.
1264     // Returns all detected thread wait cycles.
1265     // Note that once a cycle is reported it's never reported again.
check_wait_graph_slow() -> Vec<Vec<DeadlockedThread>>1266     unsafe fn check_wait_graph_slow() -> Vec<Vec<DeadlockedThread>> {
1267         static DEADLOCK_DETECTION_LOCK: WordLock = WordLock::new();
1268         DEADLOCK_DETECTION_LOCK.lock();
1269 
1270         let mut table = get_hashtable();
1271         loop {
1272             // Lock all buckets in the old table
1273             for b in &table.entries[..] {
1274                 b.mutex.lock();
1275             }
1276 
1277             // Now check if our table is still the latest one. Another thread could
1278             // have grown the hash table between us getting and locking the hash table.
1279             let new_table = get_hashtable();
1280             if new_table as *const _ == table as *const _ {
1281                 break;
1282             }
1283 
1284             // Unlock buckets and try again
1285             for b in &table.entries[..] {
1286                 // SAFETY: We hold the lock here, as required
1287                 b.mutex.unlock();
1288             }
1289 
1290             table = new_table;
1291         }
1292 
1293         let thread_count = NUM_THREADS.load(Ordering::Relaxed);
1294         let mut graph =
1295             DiGraphMap::<WaitGraphNode, ()>::with_capacity(thread_count * 2, thread_count * 2);
1296 
1297         for b in &table.entries[..] {
1298             let mut current = b.queue_head.get();
1299             while !current.is_null() {
1300                 if !(*current).parked_with_timeout.get()
1301                     && !(*current).deadlock_data.deadlocked.get()
1302                 {
1303                     // .resources are waiting for their owner
1304                     for &resource in &(*(*current).deadlock_data.resources.get()) {
1305                         graph.add_edge(Resource(resource), Thread(current), ());
1306                     }
1307                     // owner waits for resource .key
1308                     graph.add_edge(
1309                         Thread(current),
1310                         Resource((*current).key.load(Ordering::Relaxed)),
1311                         (),
1312                     );
1313                 }
1314                 current = (*current).next_in_queue.get();
1315             }
1316         }
1317 
1318         for b in &table.entries[..] {
1319             // SAFETY: We hold the lock here, as required
1320             b.mutex.unlock();
1321         }
1322 
1323         // find cycles
1324         let cycles = graph_cycles(&graph);
1325 
1326         let mut results = Vec::with_capacity(cycles.len());
1327 
1328         for cycle in cycles {
1329             let (sender, receiver) = mpsc::channel();
1330             for td in cycle {
1331                 let bucket = lock_bucket((*td).key.load(Ordering::Relaxed));
1332                 (*td).deadlock_data.deadlocked.set(true);
1333                 *(*td).deadlock_data.backtrace_sender.get() = Some(sender.clone());
1334                 let handle = (*td).parker.unpark_lock();
1335                 // SAFETY: We hold the lock here, as required
1336                 bucket.mutex.unlock();
1337                 // unpark the deadlocked thread!
1338                 // on unpark it'll notice the deadlocked flag and report back
1339                 handle.unpark();
1340             }
1341             // make sure to drop our sender before collecting results
1342             drop(sender);
1343             results.push(receiver.iter().collect());
1344         }
1345 
1346         DEADLOCK_DETECTION_LOCK.unlock();
1347 
1348         results
1349     }
1350 
1351     // normalize a cycle to start with the "smallest" node
normalize_cycle<T: Ord + Copy + Clone>(input: &[T]) -> Vec<T>1352     fn normalize_cycle<T: Ord + Copy + Clone>(input: &[T]) -> Vec<T> {
1353         let min_pos = input
1354             .iter()
1355             .enumerate()
1356             .min_by_key(|&(_, &t)| t)
1357             .map(|(p, _)| p)
1358             .unwrap_or(0);
1359         input
1360             .iter()
1361             .cycle()
1362             .skip(min_pos)
1363             .take(input.len())
1364             .cloned()
1365             .collect()
1366     }
1367 
1368     // returns all thread cycles in the wait graph
graph_cycles(g: &DiGraphMap<WaitGraphNode, ()>) -> Vec<Vec<*const ThreadData>>1369     fn graph_cycles(g: &DiGraphMap<WaitGraphNode, ()>) -> Vec<Vec<*const ThreadData>> {
1370         use petgraph::visit::depth_first_search;
1371         use petgraph::visit::DfsEvent;
1372         use petgraph::visit::NodeIndexable;
1373 
1374         let mut cycles = HashSet::new();
1375         let mut path = Vec::with_capacity(g.node_bound());
1376         // start the search from thread nodes so that the recovered cycles consist of threads
1377         let threads = g
1378             .nodes()
1379             .filter(|n| matches!(n, &Thread(_)));
1380 
1381         depth_first_search(g, threads, |e| match e {
1382             DfsEvent::Discover(Thread(n), _) => path.push(n),
1383             DfsEvent::Finish(Thread(_), _) => {
1384                 path.pop();
1385             }
1386             DfsEvent::BackEdge(_, Thread(n)) => {
1387                 let from = path.iter().rposition(|&i| i == n).unwrap();
1388                 cycles.insert(normalize_cycle(&path[from..]));
1389             }
1390             _ => (),
1391         });
1392 
1393         cycles.iter().cloned().collect()
1394     }
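    // Note on the traversal above: only Thread nodes are ever pushed onto `path`
    // (Discover pushes, Finish pops), so a back edge that leads to a Thread already
    // on the path closes a cycle consisting purely of threads. Resource nodes take
    // part in the DFS but never appear in the reported cycles.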
1395 }
1396 
1397 #[cfg(test)]
1398 mod tests {
1399     use super::{ThreadData, DEFAULT_PARK_TOKEN, DEFAULT_UNPARK_TOKEN};
1400     use std::{
1401         ptr,
1402         sync::{
1403             atomic::{AtomicIsize, AtomicPtr, AtomicUsize, Ordering},
1404             Arc,
1405         },
1406         thread,
1407         time::Duration,
1408     };
1409 
1410     /// Calls a closure for every `ThreadData` currently parked on a given key
1411     fn for_each(key: usize, mut f: impl FnMut(&ThreadData)) {
1412         let bucket = super::lock_bucket(key);
1413 
1414         let mut current: *const ThreadData = bucket.queue_head.get();
1415         while !current.is_null() {
1416             let current_ref = unsafe { &*current };
1417             if current_ref.key.load(Ordering::Relaxed) == key {
1418                 f(current_ref);
1419             }
1420             current = current_ref.next_in_queue.get();
1421         }
1422 
1423         // SAFETY: We hold the lock here, as required
1424         unsafe { bucket.mutex.unlock() };
1425     }
1426 
1427     macro_rules! test {
1428         ( $( $name:ident(
1429             repeats: $repeats:expr,
1430             latches: $latches:expr,
1431             delay: $delay:expr,
1432             threads: $threads:expr,
1433             single_unparks: $single_unparks:expr);
1434         )* ) => {
1435             $(#[test]
1436             fn $name() {
1437                 let delay = Duration::from_micros($delay);
1438                 for _ in 0..$repeats {
1439                     run_parking_test($latches, delay, $threads, $single_unparks);
1440                 }
1441             })*
1442         };
1443     }
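    // For example, the `unpark_one_one` entry below expands to a #[test] function
    // that calls run_parking_test(1, Duration::from_micros(10000), 1, 1) ten times:
    // one latch, one parked thread, and a single unpark_one with a 10ms delay,
    // repeated for 10 iterations.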
1444 
1445     test! {
1446         unpark_all_one_fast(
1447             repeats: 10000, latches: 1, delay: 0, threads: 1, single_unparks: 0
1448         );
1449         unpark_all_hundred_fast(
1450             repeats: 100, latches: 1, delay: 0, threads: 100, single_unparks: 0
1451         );
1452         unpark_one_one_fast(
1453             repeats: 1000, latches: 1, delay: 0, threads: 1, single_unparks: 1
1454         );
1455         unpark_one_hundred_fast(
1456             repeats: 20, latches: 1, delay: 0, threads: 100, single_unparks: 100
1457         );
1458         unpark_one_fifty_then_fifty_all_fast(
1459             repeats: 50, latches: 1, delay: 0, threads: 100, single_unparks: 50
1460         );
1461         unpark_all_one(
1462             repeats: 100, latches: 1, delay: 10000, threads: 1, single_unparks: 0
1463         );
1464         unpark_all_hundred(
1465             repeats: 100, latches: 1, delay: 10000, threads: 100, single_unparks: 0
1466         );
1467         unpark_one_one(
1468             repeats: 10, latches: 1, delay: 10000, threads: 1, single_unparks: 1
1469         );
1470         unpark_one_fifty(
1471             repeats: 1, latches: 1, delay: 10000, threads: 50, single_unparks: 50
1472         );
1473         unpark_one_fifty_then_fifty_all(
1474             repeats: 2, latches: 1, delay: 10000, threads: 100, single_unparks: 50
1475         );
1476         hundred_unpark_all_one_fast(
1477             repeats: 100, latches: 100, delay: 0, threads: 1, single_unparks: 0
1478         );
1479         hundred_unpark_all_one(
1480             repeats: 1, latches: 100, delay: 10000, threads: 1, single_unparks: 0
1481         );
1482     }
1483 
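    /// Runs one parking scenario: for each of `num_latches` latches, spawn
    /// `num_threads` threads that park on the latch's semaphore, wake
    /// `num_single_unparks` of them one at a time (sleeping `delay` between
    /// unparks), and finally let `finish` release whatever is still parked via
    /// unpark_all and join all threads.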
1484     fn run_parking_test(
1485         num_latches: usize,
1486         delay: Duration,
1487         num_threads: usize,
1488         num_single_unparks: usize,
1489     ) {
1490         let mut tests = Vec::with_capacity(num_latches);
1491 
1492         for _ in 0..num_latches {
1493             let test = Arc::new(SingleLatchTest::new(num_threads));
1494             let mut threads = Vec::with_capacity(num_threads);
1495             for _ in 0..num_threads {
1496                 let test = test.clone();
1497                 threads.push(thread::spawn(move || test.run()));
1498             }
1499             tests.push((test, threads));
1500         }
1501 
1502         for unpark_index in 0..num_single_unparks {
1503             thread::sleep(delay);
1504             for (test, _) in &tests {
1505                 test.unpark_one(unpark_index);
1506             }
1507         }
1508 
1509         for (test, threads) in tests {
1510             test.finish(num_single_unparks);
1511             for thread in threads {
1512                 thread.join().expect("test thread panicked");
1513             }
1514         }
1515     }
1516 
1517     struct SingleLatchTest {
1518         semaphore: AtomicIsize,
1519         num_awake: AtomicUsize,
1520         /// Holds the pointer to the last *unprocessed* woken up thread.
1521         last_awoken: AtomicPtr<ThreadData>,
1522         /// Total number of threads participating in this test.
1523         num_threads: usize,
1524     }
1525 
1526     impl SingleLatchTest {
1527         pub fn new(num_threads: usize) -> Self {
1528             Self {
1529                 // This implements a fair (FIFO) semaphore, and it starts out unavailable.
1530                 semaphore: AtomicIsize::new(0),
1531                 num_awake: AtomicUsize::new(0),
1532                 last_awoken: AtomicPtr::new(ptr::null_mut()),
1533                 num_threads,
1534             }
1535         }
1536 
1537         pub fn run(&self) {
1538             // Get one slot from the semaphore
1539             self.down();
1540 
1541             // Report back to the test verification code that this thread woke up
1542             let this_thread_ptr = super::with_thread_data(|t| t as *const _ as *mut _);
1543             self.last_awoken.store(this_thread_ptr, Ordering::SeqCst);
1544             self.num_awake.fetch_add(1, Ordering::SeqCst);
1545         }
1546 
1547         pub fn unpark_one(&self, single_unpark_index: usize) {
1548             // last_awoken should be null at all times except between the self.up() call below
1549             // and the end of this method, where it is reset to null again
1550             assert!(self.last_awoken.load(Ordering::SeqCst).is_null());
1551 
1552             let mut queue: Vec<*mut ThreadData> = Vec::with_capacity(self.num_threads);
1553             for_each(self.semaphore_addr(), |thread_data| {
1554                 queue.push(thread_data as *const _ as *mut _);
1555             });
1556             assert!(queue.len() <= self.num_threads - single_unpark_index);
1557 
1558             let num_awake_before_up = self.num_awake.load(Ordering::SeqCst);
1559 
1560             self.up();
1561 
1562             // Wait for a parked thread to wake up and update num_awake + last_awoken.
1563             while self.num_awake.load(Ordering::SeqCst) != num_awake_before_up + 1 {
1564                 thread::yield_now();
1565             }
1566 
1567             // At this point the other thread should have set last_awoken inside the run() method
1568             let last_awoken = self.last_awoken.load(Ordering::SeqCst);
1569             assert!(!last_awoken.is_null());
1570             if !queue.is_empty() && queue[0] != last_awoken {
1571                 panic!(
1572                     "Woke up wrong thread:\n\tqueue: {:?}\n\tlast awoken: {:?}",
1573                     queue, last_awoken
1574                 );
1575             }
1576             self.last_awoken.store(ptr::null_mut(), Ordering::SeqCst);
1577         }
1578 
1579         pub fn finish(&self, num_single_unparks: usize) {
1580             // The number of threads that were not unparked via unpark_one()
1581             let mut num_threads_left = self.num_threads.checked_sub(num_single_unparks).unwrap();
1582 
1583             // Wake the remaining threads up with unpark_all. This has to be done in a loop,
1584             // because there might still be threads that have not yet parked.
1585             while num_threads_left > 0 {
1586                 let mut num_waiting_on_address = 0;
1587                 for_each(self.semaphore_addr(), |_thread_data| {
1588                     num_waiting_on_address += 1;
1589                 });
1590                 assert!(num_waiting_on_address <= num_threads_left);
1591 
1592                 let num_awake_before_unpark = self.num_awake.load(Ordering::SeqCst);
1593 
1594                 let num_unparked =
1595                     unsafe { super::unpark_all(self.semaphore_addr(), DEFAULT_UNPARK_TOKEN) };
1596                 assert!(num_unparked >= num_waiting_on_address);
1597                 assert!(num_unparked <= num_threads_left);
1598 
1599                 // Wait for all unparked threads to wake up and update num_awake + last_awoken.
1600                 while self.num_awake.load(Ordering::SeqCst)
1601                     != num_awake_before_unpark + num_unparked
1602                 {
1603                     thread::yield_now()
1604                 }
1605 
1606                 num_threads_left = num_threads_left.checked_sub(num_unparked).unwrap();
1607             }
1608             // By now, all threads should have been woken up
1609             assert_eq!(self.num_awake.load(Ordering::SeqCst), self.num_threads);
1610 
1611             // Make sure no thread is parked on our semaphore address
1612             let mut num_waiting_on_address = 0;
1613             for_each(self.semaphore_addr(), |_thread_data| {
1614                 num_waiting_on_address += 1;
1615             });
1616             assert_eq!(num_waiting_on_address, 0);
1617         }
1618 
1619         pub fn down(&self) {
1620             let old_semaphore_value = self.semaphore.fetch_sub(1, Ordering::SeqCst);
1621 
1622             if old_semaphore_value > 0 {
1623                 // We acquired the semaphore. Done.
1624                 return;
1625             }
1626 
1627             // We need to wait.
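            // The validate callback can unconditionally return true: the decision to
            // park was already made by the fetch_sub above, and a concurrent up() that
            // races with us before we are actually parked is handled by the retry loop
            // in up(), not by re-checking state under the bucket lock here.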
1628             let validate = || true;
1629             let before_sleep = || {};
1630             let timed_out = |_, _| {};
1631             unsafe {
1632                 super::park(
1633                     self.semaphore_addr(),
1634                     validate,
1635                     before_sleep,
1636                     timed_out,
1637                     DEFAULT_PARK_TOKEN,
1638                     None,
1639                 );
1640             }
1641         }
1642 
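        // Illustrative counter arithmetic: with the semaphore at 0, three calls to
        // down() leave the value at -3 with three threads parked. Each up() that
        // observes a negative previous value then hands its permit directly to one
        // parked thread (FIFO), instead of leaving it available to a barging thread.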
1643         pub fn up(&self) {
1644             let old_semaphore_value = self.semaphore.fetch_add(1, Ordering::SeqCst);
1645 
1646             // Check if anyone was waiting on the semaphore. If they were, then pass ownership to them.
1647             if old_semaphore_value < 0 {
1648                 // We need to continue until we have actually unparked someone. It might be that
1649                 // the thread we want to pass ownership to has decremented the semaphore counter,
1650                 // but not yet parked.
1651                 loop {
1652                     match unsafe {
1653                         super::unpark_one(self.semaphore_addr(), |_| DEFAULT_UNPARK_TOKEN)
1654                             .unparked_threads
1655                     } {
1656                         1 => break,
1657                         0 => (),
1658                         i => panic!("Should not wake up {} threads", i),
1659                     }
1660                 }
1661             }
1662         }
1663 
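        /// The key this test parks on: the address of the semaphore counter itself.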
1664         fn semaphore_addr(&self) -> usize {
1665             &self.semaphore as *const _ as usize
1666         }
1667     }
1668 }
1669