1 // Copyright 2016 Amanieu d'Antras
2 //
3 // Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4 // http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5 // http://opensource.org/licenses/MIT>, at your option. This file may not be
6 // copied, modified, or distributed except according to those terms.
7 
8 use rand::rngs::SmallRng;
9 use rand::{FromEntropy, Rng};
10 use smallvec::SmallVec;
11 use std::cell::{Cell, UnsafeCell};
12 use std::mem;
13 #[cfg(not(has_localkey_try_with))]
14 use std::panic;
15 use std::ptr;
16 use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
17 use std::thread::LocalKey;
18 use std::time::{Duration, Instant};
19 use thread_parker::ThreadParker;
20 use util::UncheckedOptionExt;
21 use word_lock::WordLock;
22 
// Number of live ThreadData objects. Incremented in ThreadData::new and
// decremented when a ThreadData is dropped; used to size the hash table.
static NUM_THREADS: AtomicUsize = ATOMIC_USIZE_INIT;
// Address of the current HashTable, stored as a usize. A value of zero means
// that no table has been created yet.
static HASHTABLE: AtomicUsize = ATOMIC_USIZE_INIT;

// Even with 3x more buckets than threads, the memory overhead per thread is
// still only a few hundred bytes per thread.
const LOAD_FACTOR: usize = 3;
29 
// Table of buckets in which parked threads are queued, indexed by the hash of
// the key they are parked on.
struct HashTable {
    // Hash buckets for the table. The length is always a power of two.
    entries: Box<[Bucket]>,

    // Number of bits used for the hash function, i.e. log2(entries.len())
    hash_bits: u32,

    // Previous table. This is only kept to keep leak detectors happy.
    _prev: *const HashTable,
}
40 
41 impl HashTable {
new(num_threads: usize, prev: *const HashTable) -> Box<HashTable>42     fn new(num_threads: usize, prev: *const HashTable) -> Box<HashTable> {
43         let new_size = (num_threads * LOAD_FACTOR).next_power_of_two();
44         let hash_bits = 0usize.leading_zeros() - new_size.leading_zeros() - 1;
45         let bucket = Bucket {
46             mutex: WordLock::new(),
47             queue_head: Cell::new(ptr::null()),
48             queue_tail: Cell::new(ptr::null()),
49             fair_timeout: UnsafeCell::new(FairTimeout::new()),
50             _padding: unsafe { mem::uninitialized() },
51         };
52         Box::new(HashTable {
53             entries: vec![bucket; new_size].into_boxed_slice(),
54             hash_bits: hash_bits,
55             _prev: prev,
56         })
57     }
58 }
59 
// A single slot of the hash table: a lock plus the queue of threads parked on
// keys that hash to this slot.
struct Bucket {
    // Lock protecting the queue
    mutex: WordLock,

    // Linked list of threads waiting on this bucket. Both pointers are null
    // when the queue is empty; nodes are chained through
    // ThreadData::next_in_queue.
    queue_head: Cell<*const ThreadData>,
    queue_tail: Cell<*const ThreadData>,

    // Next time at which point be_fair should be set
    fair_timeout: UnsafeCell<FairTimeout>,

    // Padding to avoid false sharing between buckets. Ideally we would just
    // align the bucket structure to 64 bytes, but Rust doesn't support that
    // yet.
    _padding: [u8; 64],
}
76 
77 // Implementation of Clone for Bucket, needed to make vec![] work
78 impl Clone for Bucket {
clone(&self) -> Bucket79     fn clone(&self) -> Bucket {
80         Bucket {
81             mutex: WordLock::new(),
82             queue_head: Cell::new(ptr::null()),
83             queue_tail: Cell::new(ptr::null()),
84             fair_timeout: UnsafeCell::new(FairTimeout::new()),
85             _padding: unsafe { mem::uninitialized() },
86         }
87     }
88 }
89 
// Per-bucket state used to decide when an unpark should be reported as "fair"
// (see UnparkResult::be_fair).
struct FairTimeout {
    // Next time at which point be_fair should be set
    timeout: Instant,

    // Random number generator for calculating the next timeout
    rng: SmallRng,
}
97 
98 impl FairTimeout {
new() -> FairTimeout99     fn new() -> FairTimeout {
100         FairTimeout {
101             timeout: Instant::now(),
102             rng: SmallRng::from_entropy(),
103         }
104     }
105 
106     // Determine whether we should force a fair unlock, and update the timeout
should_timeout(&mut self) -> bool107     fn should_timeout(&mut self) -> bool {
108         let now = Instant::now();
109         if now > self.timeout {
110             self.timeout = now + Duration::new(0, self.rng.gen_range(0, 1000000));
111             true
112         } else {
113             false
114         }
115     }
116 }
117 
// Per-thread state used while a thread is parked. A pointer to this structure
// is what gets linked into a bucket's queue.
struct ThreadData {
    // Primitive used to actually put the thread to sleep and wake it up
    parker: ThreadParker,

    // Key that this thread is sleeping on. This may change if the thread is
    // requeued to a different key.
    key: AtomicUsize,

    // Linked list of parked threads in a bucket
    next_in_queue: Cell<*const ThreadData>,

    // UnparkToken passed to this thread when it is unparked
    unpark_token: Cell<UnparkToken>,

    // ParkToken value set by the thread when it was parked
    park_token: Cell<ParkToken>,

    // Is the thread parked with a timeout?
    parked_with_timeout: Cell<bool>,

    // Extra data for deadlock detection
    // TODO: once supported in stable replace with #[cfg...] & remove dummy struct/impl
    #[allow(dead_code)]
    deadlock_data: deadlock::DeadlockData,
}
142 
143 impl ThreadData {
new() -> ThreadData144     fn new() -> ThreadData {
145         // Keep track of the total number of live ThreadData objects and resize
146         // the hash table accordingly.
147         let num_threads = NUM_THREADS.fetch_add(1, Ordering::Relaxed) + 1;
148         unsafe {
149             grow_hashtable(num_threads);
150         }
151 
152         ThreadData {
153             parker: ThreadParker::new(),
154             key: AtomicUsize::new(0),
155             next_in_queue: Cell::new(ptr::null()),
156             unpark_token: Cell::new(DEFAULT_UNPARK_TOKEN),
157             park_token: Cell::new(DEFAULT_PARK_TOKEN),
158             parked_with_timeout: Cell::new(false),
159             deadlock_data: deadlock::DeadlockData::new(),
160         }
161     }
162 }
163 
164 // Returns a ThreadData structure for the current thread
get_thread_data(local: &mut Option<ThreadData>) -> &ThreadData165 unsafe fn get_thread_data(local: &mut Option<ThreadData>) -> &ThreadData {
166     // Try to read from thread-local storage, but return None if the TLS has
167     // already been destroyed.
168     #[cfg(has_localkey_try_with)]
169     fn try_get_tls(key: &'static LocalKey<ThreadData>) -> Option<*const ThreadData> {
170         key.try_with(|x| x as *const ThreadData).ok()
171     }
172     #[cfg(not(has_localkey_try_with))]
173     fn try_get_tls(key: &'static LocalKey<ThreadData>) -> Option<*const ThreadData> {
174         panic::catch_unwind(|| key.with(|x| x as *const ThreadData)).ok()
175     }
176 
177     // Unlike word_lock::ThreadData, parking_lot::ThreadData is always expensive
178     // to construct. Try to use a thread-local version if possible.
179     thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
180     if let Some(tls) = try_get_tls(&THREAD_DATA) {
181         return &*tls;
182     }
183 
184     // Otherwise just create a ThreadData on the stack
185     *local = Some(ThreadData::new());
186     local.as_ref().unwrap()
187 }
188 
// Keeps NUM_THREADS in sync with the number of live ThreadData objects; this
// undoes the increment performed in ThreadData::new.
impl Drop for ThreadData {
    fn drop(&mut self) {
        NUM_THREADS.fetch_sub(1, Ordering::Relaxed);
    }
}
194 
195 // Get a pointer to the latest hash table, creating one if it doesn't exist yet.
get_hashtable() -> *const HashTable196 unsafe fn get_hashtable() -> *const HashTable {
197     let mut table = HASHTABLE.load(Ordering::Acquire);
198 
199     // If there is no table, create one
200     if table == 0 {
201         let new_table = Box::into_raw(HashTable::new(LOAD_FACTOR, ptr::null()));
202 
203         // If this fails then it means some other thread created the hash
204         // table first.
205         match HASHTABLE.compare_exchange(
206             0,
207             new_table as usize,
208             Ordering::Release,
209             Ordering::Relaxed,
210         ) {
211             Ok(_) => return new_table,
212             Err(x) => table = x,
213         }
214 
215         // Free the table we created
216         Box::from_raw(new_table);
217     }
218 
219     table as *const HashTable
220 }
221 
// Grow the hash table so that it is big enough for the given number of threads.
// This isn't performance-critical since it is only done when a ThreadData is
// created, which only happens once per thread.
//
// Outline:
//   1. If no table exists yet, try to install a fresh one and return.
//   2. Otherwise lock every bucket of the current table, re-checking that the
//      table is still the current one once all locks are held.
//   3. Move every queued ThreadData into a new, larger table.
//   4. Publish the new table and unlock the buckets of the old one.
//
// The old table is not freed here; the new table records a pointer to it in
// its `_prev` field.
unsafe fn grow_hashtable(num_threads: usize) {
    // If there is no table, create one
    if HASHTABLE.load(Ordering::Relaxed) == 0 {
        let new_table = Box::into_raw(HashTable::new(num_threads, ptr::null()));

        // If this fails then it means some other thread created the hash
        // table first.
        if HASHTABLE
            .compare_exchange(0, new_table as usize, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }

        // Free the table we created
        Box::from_raw(new_table);
    }

    let mut old_table;
    loop {
        // Acquire pairs with the Release used when a table is published, so
        // the table contents are visible before we dereference the pointer.
        old_table = HASHTABLE.load(Ordering::Acquire) as *mut HashTable;

        // Check if we need to resize the existing table
        if (*old_table).entries.len() >= LOAD_FACTOR * num_threads {
            return;
        }

        // Lock all buckets in the old table
        for b in &(*old_table).entries[..] {
            b.mutex.lock();
        }

        // Now check if our table is still the latest one. Another thread could
        // have grown the hash table between us reading HASHTABLE and locking
        // the buckets.
        if HASHTABLE.load(Ordering::Relaxed) == old_table as usize {
            break;
        }

        // Unlock buckets and try again
        for b in &(*old_table).entries[..] {
            b.mutex.unlock();
        }
    }

    // Create the new table
    let new_table = HashTable::new(num_threads, old_table);

    // Move the entries from the old table to the new one. Each old queue is
    // walked from head to tail and every node is appended to the tail of its
    // new bucket.
    for b in &(*old_table).entries[..] {
        let mut current = b.queue_head.get();
        while !current.is_null() {
            // Read the successor first: the node's link is overwritten below.
            let next = (*current).next_in_queue.get();
            let hash = hash((*current).key.load(Ordering::Relaxed), new_table.hash_bits);
            if new_table.entries[hash].queue_tail.get().is_null() {
                // New bucket is empty: this node becomes its head.
                new_table.entries[hash].queue_head.set(current);
            } else {
                // Otherwise link it after the current tail.
                (*new_table.entries[hash].queue_tail.get())
                    .next_in_queue
                    .set(current);
            }
            new_table.entries[hash].queue_tail.set(current);
            (*current).next_in_queue.set(ptr::null());
            current = next;
        }
    }

    // Publish the new table. No races are possible at this point because
    // any other thread trying to grow the hash table is blocked on the bucket
    // locks in the old table.
    HASHTABLE.store(Box::into_raw(new_table) as usize, Ordering::Release);

    // Unlock all buckets in the old table
    for b in &(*old_table).entries[..] {
        b.mutex.unlock();
    }
}
302 
// Hash function for addresses.
//
// Multiplies the key by a fixed odd constant (wrapping on overflow) and keeps
// the top `bits` bits of the product, so the result is always below
// `1 << bits`. `bits` must be at least 1 and at most the pointer width.
#[cfg(target_pointer_width = "32")]
fn hash(key: usize, bits: u32) -> usize {
    const MULTIPLIER: usize = 0x9E3779B9;
    let shift = 32 - bits;
    let product = key.wrapping_mul(MULTIPLIER);
    product >> shift
}
#[cfg(target_pointer_width = "64")]
fn hash(key: usize, bits: u32) -> usize {
    const MULTIPLIER: usize = 0x9E3779B97F4A7C15;
    let shift = 64 - bits;
    let product = key.wrapping_mul(MULTIPLIER);
    product >> shift
}
312 
313 // Lock the bucket for the given key
lock_bucket<'a>(key: usize) -> &'a Bucket314 unsafe fn lock_bucket<'a>(key: usize) -> &'a Bucket {
315     let mut bucket;
316     loop {
317         let hashtable = get_hashtable();
318 
319         let hash = hash(key, (*hashtable).hash_bits);
320         bucket = &(*hashtable).entries[hash];
321 
322         // Lock the bucket
323         bucket.mutex.lock();
324 
325         // If no other thread has rehashed the table before we grabbed the lock
326         // then we are good to go! The lock we grabbed prevents any rehashes.
327         if HASHTABLE.load(Ordering::Relaxed) == hashtable as usize {
328             return bucket;
329         }
330 
331         // Unlock the bucket and try again
332         bucket.mutex.unlock();
333     }
334 }
335 
336 // Lock the bucket for the given key, but check that the key hasn't been changed
337 // in the meantime due to a requeue.
lock_bucket_checked<'a>(key: &AtomicUsize) -> (usize, &'a Bucket)338 unsafe fn lock_bucket_checked<'a>(key: &AtomicUsize) -> (usize, &'a Bucket) {
339     let mut bucket;
340     loop {
341         let hashtable = get_hashtable();
342         let current_key = key.load(Ordering::Relaxed);
343 
344         let hash = hash(current_key, (*hashtable).hash_bits);
345         bucket = &(*hashtable).entries[hash];
346 
347         // Lock the bucket
348         bucket.mutex.lock();
349 
350         // Check that both the hash table and key are correct while the bucket
351         // is locked. Note that the key can't change once we locked the proper
352         // bucket for it, so we just keep trying until we have the correct key.
353         if HASHTABLE.load(Ordering::Relaxed) == hashtable as usize
354             && key.load(Ordering::Relaxed) == current_key
355         {
356             return (current_key, bucket);
357         }
358 
359         // Unlock the bucket and try again
360         bucket.mutex.unlock();
361     }
362 }
363 
364 // Lock the two buckets for the given pair of keys
lock_bucket_pair<'a>(key1: usize, key2: usize) -> (&'a Bucket, &'a Bucket)365 unsafe fn lock_bucket_pair<'a>(key1: usize, key2: usize) -> (&'a Bucket, &'a Bucket) {
366     let mut bucket1;
367     loop {
368         let hashtable = get_hashtable();
369 
370         // Get the lowest bucket first
371         let hash1 = hash(key1, (*hashtable).hash_bits);
372         let hash2 = hash(key2, (*hashtable).hash_bits);
373         if hash1 <= hash2 {
374             bucket1 = &(*hashtable).entries[hash1];
375         } else {
376             bucket1 = &(*hashtable).entries[hash2];
377         }
378 
379         // Lock the first bucket
380         bucket1.mutex.lock();
381 
382         // If no other thread has rehashed the table before we grabbed the lock
383         // then we are good to go! The lock we grabbed prevents any rehashes.
384         if HASHTABLE.load(Ordering::Relaxed) == hashtable as usize {
385             // Now lock the second bucket and return the two buckets
386             if hash1 == hash2 {
387                 return (bucket1, bucket1);
388             } else if hash1 < hash2 {
389                 let bucket2 = &(*hashtable).entries[hash2];
390                 bucket2.mutex.lock();
391                 return (bucket1, bucket2);
392             } else {
393                 let bucket2 = &(*hashtable).entries[hash1];
394                 bucket2.mutex.lock();
395                 return (bucket2, bucket1);
396             }
397         }
398 
399         // Unlock the bucket and try again
400         bucket1.mutex.unlock();
401     }
402 }
403 
404 // Unlock a pair of buckets
unlock_bucket_pair(bucket1: &Bucket, bucket2: &Bucket)405 unsafe fn unlock_bucket_pair(bucket1: &Bucket, bucket2: &Bucket) {
406     if bucket1 as *const _ == bucket2 as *const _ {
407         bucket1.mutex.unlock();
408     } else if bucket1 as *const _ < bucket2 as *const _ {
409         bucket2.mutex.unlock();
410         bucket1.mutex.unlock();
411     } else {
412         bucket1.mutex.unlock();
413         bucket2.mutex.unlock();
414     }
415 }
416 
/// Result of a park operation.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum ParkResult {
    /// We were unparked by another thread with the given token.
    Unparked(UnparkToken),

    /// The validation callback returned false, so the thread was never queued.
    Invalid,

    /// The timeout expired before the thread was unparked.
    TimedOut,
}
429 
430 impl ParkResult {
431     /// Returns true if we were unparked by another thread.
is_unparked(self) -> bool432     pub fn is_unparked(self) -> bool {
433         if let ParkResult::Unparked(_) = self {
434             true
435         } else {
436             false
437         }
438     }
439 }
440 
/// Result of an unpark operation.
///
/// The `Default` value has every counter at zero and every flag set to false.
#[derive(Copy, Clone, Default, Eq, PartialEq, Debug)]
pub struct UnparkResult {
    /// The number of threads that were unparked.
    pub unparked_threads: usize,

    /// The number of threads that were requeued.
    pub requeued_threads: usize,

    /// Whether there are any threads remaining in the queue. This only returns
    /// true if a thread was unparked.
    pub have_more_threads: bool,

    /// This is set to true on average once every 0.5ms for any given key. It
    /// should be used to switch to a fair unlocking mechanism for a particular
    /// unlock.
    pub be_fair: bool,

    /// Private field so new fields can be added without breakage.
    _sealed: (),
}
462 
/// Operation that `unpark_requeue` should perform.
///
/// This is the value returned by the `validate` callback passed to
/// `unpark_requeue`.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum RequeueOp {
    /// Abort the operation without doing anything.
    Abort,

    /// Unpark one thread and requeue the rest onto the target queue.
    UnparkOneRequeueRest,

    /// Requeue all threads onto the target queue.
    RequeueAll,

    /// Unpark one thread and leave the rest parked. No requeuing is done.
    UnparkOne,

    /// Requeue one thread and leave the rest parked on the original queue.
    RequeueOne,
}
481 
/// Operation that `unpark_filter` should perform for each thread.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum FilterOp {
    /// Unpark the thread and continue scanning the list of parked threads.
    Unpark,

    /// Leave the thread parked and continue scanning the list of parked
    /// threads.
    Skip,

    /// Leave the thread parked and stop scanning the list of parked threads.
    Stop,
}
494 
/// A value which is passed from an unparker to a parked thread.
///
/// The parked thread receives it in `ParkResult::Unparked`.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct UnparkToken(pub usize);

/// A value associated with a parked thread which can be used by `unpark_filter`.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct ParkToken(pub usize);

/// A default unpark token to use. Its value is zero.
pub const DEFAULT_UNPARK_TOKEN: UnparkToken = UnparkToken(0);

/// A default park token to use. Its value is zero.
pub const DEFAULT_PARK_TOKEN: ParkToken = ParkToken(0);
508 
509 /// Parks the current thread in the queue associated with the given key.
510 ///
511 /// The `validate` function is called while the queue is locked and can abort
512 /// the operation by returning false. If `validate` returns true then the
513 /// current thread is appended to the queue and the queue is unlocked.
514 ///
515 /// The `before_sleep` function is called after the queue is unlocked but before
516 /// the thread is put to sleep. The thread will then sleep until it is unparked
517 /// or the given timeout is reached.
518 ///
519 /// The `timed_out` function is also called while the queue is locked, but only
520 /// if the timeout was reached. It is passed the key of the queue it was in when
521 /// it timed out, which may be different from the original key if
522 /// `unpark_requeue` was called. It is also passed a bool which indicates
523 /// whether it was the last thread in the queue.
524 ///
525 /// # Safety
526 ///
527 /// You should only call this function with an address that you control, since
528 /// you could otherwise interfere with the operation of other synchronization
529 /// primitives.
530 ///
531 /// The `validate` and `timed_out` functions are called while the queue is
532 /// locked and must not panic or call into any function in `parking_lot`.
533 ///
534 /// The `before_sleep` function is called outside the queue lock and is allowed
535 /// to call `unpark_one`, `unpark_all`, `unpark_requeue` or `unpark_filter`, but
536 /// it is not allowed to call `park` or panic.
537 #[inline]
park<V, B, T>( key: usize, validate: V, before_sleep: B, timed_out: T, park_token: ParkToken, timeout: Option<Instant>, ) -> ParkResult where V: FnOnce() -> bool, B: FnOnce(), T: FnOnce(usize, bool),538 pub unsafe fn park<V, B, T>(
539     key: usize,
540     validate: V,
541     before_sleep: B,
542     timed_out: T,
543     park_token: ParkToken,
544     timeout: Option<Instant>,
545 ) -> ParkResult
546 where
547     V: FnOnce() -> bool,
548     B: FnOnce(),
549     T: FnOnce(usize, bool),
550 {
551     let mut v = Some(validate);
552     let mut b = Some(before_sleep);
553     let mut t = Some(timed_out);
554     park_internal(
555         key,
556         &mut || v.take().unchecked_unwrap()(),
557         &mut || b.take().unchecked_unwrap()(),
558         &mut |key, was_last_thread| t.take().unchecked_unwrap()(key, was_last_thread),
559         park_token,
560         timeout,
561     )
562 }
563 
564 // Non-generic version to reduce monomorphization cost
park_internal( key: usize, validate: &mut FnMut() -> bool, before_sleep: &mut FnMut(), timed_out: &mut FnMut(usize, bool), park_token: ParkToken, timeout: Option<Instant>, ) -> ParkResult565 unsafe fn park_internal(
566     key: usize,
567     validate: &mut FnMut() -> bool,
568     before_sleep: &mut FnMut(),
569     timed_out: &mut FnMut(usize, bool),
570     park_token: ParkToken,
571     timeout: Option<Instant>,
572 ) -> ParkResult {
573     // Grab our thread data, this also ensures that the hash table exists
574     let mut thread_data = None;
575     let thread_data = get_thread_data(&mut thread_data);
576 
577     // Lock the bucket for the given key
578     let bucket = lock_bucket(key);
579 
580     // If the validation function fails, just return
581     if !validate() {
582         bucket.mutex.unlock();
583         return ParkResult::Invalid;
584     }
585 
586     // Append our thread data to the queue and unlock the bucket
587     thread_data.parked_with_timeout.set(timeout.is_some());
588     thread_data.next_in_queue.set(ptr::null());
589     thread_data.key.store(key, Ordering::Relaxed);
590     thread_data.park_token.set(park_token);
591     thread_data.parker.prepare_park();
592     if !bucket.queue_head.get().is_null() {
593         (*bucket.queue_tail.get()).next_in_queue.set(thread_data);
594     } else {
595         bucket.queue_head.set(thread_data);
596     }
597     bucket.queue_tail.set(thread_data);
598     bucket.mutex.unlock();
599 
600     // Invoke the pre-sleep callback
601     before_sleep();
602 
603     // Park our thread and determine whether we were woken up by an unpark or by
604     // our timeout. Note that this isn't precise: we can still be unparked since
605     // we are still in the queue.
606     let unparked = match timeout {
607         Some(timeout) => thread_data.parker.park_until(timeout),
608         None => {
609             thread_data.parker.park();
610             // call deadlock detection on_unpark hook
611             deadlock::on_unpark(thread_data);
612             true
613         }
614     };
615 
616     // If we were unparked, return now
617     if unparked {
618         return ParkResult::Unparked(thread_data.unpark_token.get());
619     }
620 
621     // Lock our bucket again. Note that the hashtable may have been rehashed in
622     // the meantime. Our key may also have changed if we were requeued.
623     let (key, bucket) = lock_bucket_checked(&thread_data.key);
624 
625     // Now we need to check again if we were unparked or timed out. Unlike the
626     // last check this is precise because we hold the bucket lock.
627     if !thread_data.parker.timed_out() {
628         bucket.mutex.unlock();
629         return ParkResult::Unparked(thread_data.unpark_token.get());
630     }
631 
632     // We timed out, so we now need to remove our thread from the queue
633     let mut link = &bucket.queue_head;
634     let mut current = bucket.queue_head.get();
635     let mut previous = ptr::null();
636     while !current.is_null() {
637         if current == thread_data {
638             let next = (*current).next_in_queue.get();
639             link.set(next);
640             let mut was_last_thread = true;
641             if bucket.queue_tail.get() == current {
642                 bucket.queue_tail.set(previous);
643             } else {
644                 // Scan the rest of the queue to see if there are any other
645                 // entries with the given key.
646                 let mut scan = next;
647                 while !scan.is_null() {
648                     if (*scan).key.load(Ordering::Relaxed) == key {
649                         was_last_thread = false;
650                         break;
651                     }
652                     scan = (*scan).next_in_queue.get();
653                 }
654             }
655 
656             // Callback to indicate that we timed out, and whether we were the
657             // last thread on the queue.
658             timed_out(key, was_last_thread);
659             break;
660         } else {
661             link = &(*current).next_in_queue;
662             previous = current;
663             current = link.get();
664         }
665     }
666 
667     // There should be no way for our thread to have been removed from the queue
668     // if we timed out.
669     debug_assert!(!current.is_null());
670 
671     // Unlock the bucket, we are done
672     bucket.mutex.unlock();
673     ParkResult::TimedOut
674 }
675 
676 /// Unparks one thread from the queue associated with the given key.
677 ///
678 /// The `callback` function is called while the queue is locked and before the
679 /// target thread is woken up. The `UnparkResult` argument to the function
680 /// indicates whether a thread was found in the queue and whether this was the
681 /// last thread in the queue. This value is also returned by `unpark_one`.
682 ///
683 /// The `callback` function should return an `UnparkToken` value which will be
684 /// passed to the thread that is unparked. If no thread is unparked then the
685 /// returned value is ignored.
686 ///
687 /// # Safety
688 ///
689 /// You should only call this function with an address that you control, since
690 /// you could otherwise interfere with the operation of other synchronization
691 /// primitives.
692 ///
693 /// The `callback` function is called while the queue is locked and must not
694 /// panic or call into any function in `parking_lot`.
695 #[inline]
unpark_one<C>(key: usize, callback: C) -> UnparkResult where C: FnOnce(UnparkResult) -> UnparkToken,696 pub unsafe fn unpark_one<C>(key: usize, callback: C) -> UnparkResult
697 where
698     C: FnOnce(UnparkResult) -> UnparkToken,
699 {
700     let mut c = Some(callback);
701     unpark_one_internal(key, &mut |result| c.take().unchecked_unwrap()(result))
702 }
703 
// Non-generic version to reduce monomorphization cost.
//
// Walks the bucket for `key` from head to tail and unparks the first thread
// whose key matches. `callback` is invoked exactly once, with the bucket
// locked, whether or not a matching thread was found; its return value is only
// used when a thread is unparked.
unsafe fn unpark_one_internal(
    key: usize,
    callback: &mut FnMut(UnparkResult) -> UnparkToken,
) -> UnparkResult {
    // Lock the bucket for the given key
    let bucket = lock_bucket(key);

    // Find a thread with a matching key and remove it from the queue.
    // `link` is the cell that points at `current` (either the bucket head or
    // the previous node's next pointer), `previous` is the node before it.
    let mut link = &bucket.queue_head;
    let mut current = bucket.queue_head.get();
    let mut previous = ptr::null();
    let mut result = UnparkResult::default();
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key {
            // Remove the thread from the queue
            let next = (*current).next_in_queue.get();
            link.set(next);
            if bucket.queue_tail.get() == current {
                bucket.queue_tail.set(previous);
            } else {
                // Scan the rest of the queue to see if there are any other
                // entries with the given key. Nodes before `current` need not
                // be checked: `current` is the first match in the queue.
                let mut scan = next;
                while !scan.is_null() {
                    if (*scan).key.load(Ordering::Relaxed) == key {
                        result.have_more_threads = true;
                        break;
                    }
                    scan = (*scan).next_in_queue.get();
                }
            }

            // Invoke the callback before waking up the thread
            result.unparked_threads = 1;
            result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
            let token = callback(result);

            // Set the token for the target thread
            (*current).unpark_token.set(token);

            // This is a bit tricky: we first lock the ThreadParker to prevent
            // the thread from exiting and freeing its ThreadData if its wait
            // times out. Then we unlock the queue since we don't want to keep
            // the queue locked while we perform a system call. Finally we wake
            // up the parked thread.
            let handle = (*current).parker.unpark_lock();
            bucket.mutex.unlock();
            handle.unpark();

            return result;
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // No threads with a matching key were found in the bucket. The callback is
    // still invoked (with the default result) and its token is discarded.
    callback(result);
    bucket.mutex.unlock();
    result
}
767 
/// Unparks all threads in the queue associated with the given key.
///
/// The given `UnparkToken` is passed to all unparked threads.
///
/// This function returns the number of threads that were unparked.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
pub unsafe fn unpark_all(key: usize, unpark_token: UnparkToken) -> usize {
    // Lock the bucket for the given key
    let bucket = lock_bucket(key);

    // Remove all threads with the given key in the bucket. Threads parked on
    // other keys that share this bucket are left in place.
    let mut link = &bucket.queue_head;
    let mut current = bucket.queue_head.get();
    let mut previous = ptr::null();
    let mut threads = SmallVec::<[_; 8]>::new();
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key {
            // Remove the thread from the queue
            let next = (*current).next_in_queue.get();
            link.set(next);
            if bucket.queue_tail.get() == current {
                bucket.queue_tail.set(previous);
            }

            // Set the token for the target thread
            (*current).unpark_token.set(unpark_token);

            // Don't wake up threads while holding the queue lock. See comment
            // in unpark_one. For now just record which threads we need to wake
            // up.
            threads.push((*current).parker.unpark_lock());
            // `link` and `previous` are unchanged: the node was unlinked, so
            // `link` now points at `next`.
            current = next;
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // Unlock the bucket
    bucket.mutex.unlock();

    // Now that we are outside the lock, wake up all the threads that we removed
    // from the queue.
    let num_threads = threads.len();
    for handle in threads.into_iter() {
        handle.unpark();
    }

    num_threads
}
824 
825 /// Removes all threads from the queue associated with `key_from`, optionally
826 /// unparks the first one and requeues the rest onto the queue associated with
827 /// `key_to`.
828 ///
829 /// The `validate` function is called while both queues are locked. Its return
830 /// value will determine which operation is performed, or whether the operation
831 /// should be aborted. See `RequeueOp` for details about the different possible
832 /// return values.
833 ///
834 /// The `callback` function is also called while both queues are locked. It is
835 /// passed the `RequeueOp` returned by `validate` and an `UnparkResult`
836 /// indicating whether a thread was unparked and whether there are threads still
837 /// parked in the new queue. This `UnparkResult` value is also returned by
838 /// `unpark_requeue`.
839 ///
840 /// The `callback` function should return an `UnparkToken` value which will be
841 /// passed to the thread that is unparked. If no thread is unparked then the
842 /// returned value is ignored.
843 ///
844 /// # Safety
845 ///
846 /// You should only call this function with an address that you control, since
847 /// you could otherwise interfere with the operation of other synchronization
848 /// primitives.
849 ///
850 /// The `validate` and `callback` functions are called while the queue is locked
851 /// and must not panic or call into any function in `parking_lot`.
852 #[inline]
unpark_requeue<V, C>( key_from: usize, key_to: usize, validate: V, callback: C, ) -> UnparkResult where V: FnOnce() -> RequeueOp, C: FnOnce(RequeueOp, UnparkResult) -> UnparkToken,853 pub unsafe fn unpark_requeue<V, C>(
854     key_from: usize,
855     key_to: usize,
856     validate: V,
857     callback: C,
858 ) -> UnparkResult
859 where
860     V: FnOnce() -> RequeueOp,
861     C: FnOnce(RequeueOp, UnparkResult) -> UnparkToken,
862 {
863     let mut v = Some(validate);
864     let mut c = Some(callback);
865     unpark_requeue_internal(
866         key_from,
867         key_to,
868         &mut || v.take().unchecked_unwrap()(),
869         &mut |op, r| c.take().unchecked_unwrap()(op, r),
870     )
871 }
872 
// Non-generic version to reduce monomorphization cost
//
// Locks the buckets for `key_from` and `key_to` and calls `validate`. If it
// returns `RequeueOp::Abort` both buckets are unlocked and a default
// `UnparkResult` is returned without calling `callback`. Otherwise threads
// parked on `key_from` are unlinked from the source bucket: at most one is
// selected for wakeup (only for `UnparkOne` and `UnparkOneRequeueRest`) and the
// others are re-keyed to `key_to` and appended to the destination bucket.
// `callback` runs before the buckets are unlocked and its return value is the
// token stored into the woken thread, if any.
unsafe fn unpark_requeue_internal(
    key_from: usize,
    key_to: usize,
    validate: &mut FnMut() -> RequeueOp,
    callback: &mut FnMut(RequeueOp, UnparkResult) -> UnparkToken,
) -> UnparkResult {
    // Lock the two buckets for the given key
    let (bucket_from, bucket_to) = lock_bucket_pair(key_from, key_to);

    // If the validation function fails, just return
    let mut result = UnparkResult::default();
    let op = validate();
    if op == RequeueOp::Abort {
        unlock_bucket_pair(bucket_from, bucket_to);
        return result;
    }

    // Remove all threads with the given key in the source bucket
    //
    // `link` is the cell that points at `current` (the bucket head or the
    // `next_in_queue` of the last thread left in the queue), and `previous` is
    // that last remaining thread (null if there is none).
    let mut link = &bucket_from.queue_head;
    let mut current = bucket_from.queue_head.get();
    let mut previous = ptr::null();
    // Head and tail of the list of removed threads that will be spliced onto
    // the destination bucket. The list is chained through `next_in_queue`.
    let mut requeue_threads: *const ThreadData = ptr::null();
    let mut requeue_threads_tail: *const ThreadData = ptr::null();
    let mut wakeup_thread = None;
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key_from {
            // Remove the thread from the queue
            let next = (*current).next_in_queue.get();
            link.set(next);
            if bucket_from.queue_tail.get() == current {
                bucket_from.queue_tail.set(previous);
            }

            // Prepare the first thread for wakeup and requeue the rest.
            if (op == RequeueOp::UnparkOneRequeueRest || op == RequeueOp::UnparkOne)
                && wakeup_thread.is_none()
            {
                wakeup_thread = Some(current);
                result.unparked_threads = 1;
            } else {
                // Append to the pending requeue list and retarget the thread
                // at the destination key.
                if !requeue_threads.is_null() {
                    (*requeue_threads_tail).next_in_queue.set(current);
                } else {
                    requeue_threads = current;
                }
                requeue_threads_tail = current;
                (*current).key.store(key_to, Ordering::Relaxed);
                result.requeued_threads += 1;
            }
            // The single-thread operations stop after the first match.
            if op == RequeueOp::UnparkOne || op == RequeueOp::RequeueOne {
                // Scan the rest of the queue to see if there are any other
                // entries with the given key.
                let mut scan = next;
                while !scan.is_null() {
                    if (*scan).key.load(Ordering::Relaxed) == key_from {
                        result.have_more_threads = true;
                        break;
                    }
                    scan = (*scan).next_in_queue.get();
                }
                break;
            }
            current = next;
        } else {
            // Different key: keep the thread queued and advance.
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // Add the requeued threads to the destination bucket
    if !requeue_threads.is_null() {
        // Terminate the list: the tail's `next_in_queue` may still hold the
        // link it had in the source queue.
        (*requeue_threads_tail).next_in_queue.set(ptr::null());
        if !bucket_to.queue_head.get().is_null() {
            (*bucket_to.queue_tail.get())
                .next_in_queue
                .set(requeue_threads);
        } else {
            bucket_to.queue_head.set(requeue_threads);
        }
        bucket_to.queue_tail.set(requeue_threads_tail);
    }

    // Invoke the callback before waking up the thread
    //
    // `be_fair` is only computed when a thread is actually being unparked.
    if result.unparked_threads != 0 {
        result.be_fair = (*bucket_from.fair_timeout.get()).should_timeout();
    }
    let token = callback(op, result);

    // See comment in unpark_one for why we mess with the locking
    //
    // The unpark handle is obtained while the buckets are still locked, and
    // the wakeup itself happens only after they have been unlocked.
    if let Some(wakeup_thread) = wakeup_thread {
        (*wakeup_thread).unpark_token.set(token);
        let handle = (*wakeup_thread).parker.unpark_lock();
        unlock_bucket_pair(bucket_from, bucket_to);
        handle.unpark();
    } else {
        unlock_bucket_pair(bucket_from, bucket_to);
    }

    result
}
975 
976 /// Unparks a number of threads from the front of the queue associated with
977 /// `key` depending on the results of a filter function which inspects the
978 /// `ParkToken` associated with each thread.
979 ///
980 /// The `filter` function is called for each thread in the queue or until
981 /// `FilterOp::Stop` is returned. This function is passed the `ParkToken`
982 /// associated with a particular thread, which is unparked if `FilterOp::Unpark`
983 /// is returned.
984 ///
985 /// The `callback` function is also called while both queues are locked. It is
986 /// passed an `UnparkResult` indicating the number of threads that were unparked
987 /// and whether there are still parked threads in the queue. This `UnparkResult`
988 /// value is also returned by `unpark_filter`.
989 ///
990 /// The `callback` function should return an `UnparkToken` value which will be
991 /// passed to all threads that are unparked. If no thread is unparked then the
992 /// returned value is ignored.
993 ///
994 /// # Safety
995 ///
996 /// You should only call this function with an address that you control, since
997 /// you could otherwise interfere with the operation of other synchronization
998 /// primitives.
999 ///
1000 /// The `filter` and `callback` functions are called while the queue is locked
1001 /// and must not panic or call into any function in `parking_lot`.
1002 #[inline]
unpark_filter<F, C>(key: usize, mut filter: F, callback: C) -> UnparkResult where F: FnMut(ParkToken) -> FilterOp, C: FnOnce(UnparkResult) -> UnparkToken,1003 pub unsafe fn unpark_filter<F, C>(key: usize, mut filter: F, callback: C) -> UnparkResult
1004 where
1005     F: FnMut(ParkToken) -> FilterOp,
1006     C: FnOnce(UnparkResult) -> UnparkToken,
1007 {
1008     let mut c = Some(callback);
1009     unpark_filter_internal(key, &mut filter, &mut |r| c.take().unchecked_unwrap()(r))
1010 }
1011 
// Non-generic version to reduce monomorphization cost
//
// Locks the bucket for `key` and offers the `ParkToken` of every thread parked
// on `key` to `filter`. Threads for which it returns `FilterOp::Unpark` are
// unlinked from the queue, `FilterOp::Skip` leaves the thread queued and
// `FilterOp::Stop` ends the walk. `callback` is then called once, with the
// bucket still locked, and its token is stored into every unlinked thread. The
// threads are woken only after the bucket has been unlocked.
unsafe fn unpark_filter_internal(
    key: usize,
    filter: &mut FnMut(ParkToken) -> FilterOp,
    callback: &mut FnMut(UnparkResult) -> UnparkToken,
) -> UnparkResult {
    // Lock the bucket for the given key
    let bucket = lock_bucket(key);

    // Go through the queue looking for threads with a matching key
    //
    // `link` is the cell that points at `current` (the bucket head or the
    // `next_in_queue` of the last thread left in the queue), and `previous` is
    // that last remaining thread (null if there is none).
    let mut link = &bucket.queue_head;
    let mut current = bucket.queue_head.get();
    let mut previous = ptr::null();
    // Pairs of (thread, unpark handle). The handle starts as `None` and is
    // filled in after the callback has produced the token.
    let mut threads = SmallVec::<[_; 8]>::new();
    let mut result = UnparkResult::default();
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key {
            // Call the filter function with the thread's ParkToken
            let next = (*current).next_in_queue.get();
            match filter((*current).park_token.get()) {
                FilterOp::Unpark => {
                    // Remove the thread from the queue
                    link.set(next);
                    if bucket.queue_tail.get() == current {
                        bucket.queue_tail.set(previous);
                    }

                    // Add the thread to our list of threads to unpark
                    threads.push((current, None));

                    current = next;
                }
                FilterOp::Skip => {
                    // The thread stays parked, so report that threads remain.
                    result.have_more_threads = true;
                    link = &(*current).next_in_queue;
                    previous = current;
                    current = link.get();
                }
                FilterOp::Stop => {
                    // The thread that triggered the stop stays parked too.
                    result.have_more_threads = true;
                    break;
                }
            }
        } else {
            // Different key: keep the thread queued and advance.
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // Invoke the callback before waking up the threads
    //
    // `be_fair` is only computed when at least one thread is being unparked.
    result.unparked_threads = threads.len();
    if result.unparked_threads != 0 {
        result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
    }
    let token = callback(result);

    // Pass the token to all threads that are going to be unparked and prepare
    // them for unparking.
    for t in threads.iter_mut() {
        (*t.0).unpark_token.set(token);
        t.1 = Some((*t.0).parker.unpark_lock());
    }

    bucket.mutex.unlock();

    // Now that we are outside the lock, wake up all the threads that we removed
    // from the queue.
    //
    // Every handle was set to `Some` in the loop above, so the unchecked
    // unwrap never sees `None`.
    for (_, handle) in threads.into_iter() {
        handle.unchecked_unwrap().unpark();
    }

    result
}
1086 
/// \[Experimental\] Deadlock detection
///
/// Enabled via the `deadlock_detection` feature flag. With the feature
/// disabled the functions in this module compile to empty bodies and
/// `check_deadlock` is not available.
pub mod deadlock {
    #[cfg(feature = "deadlock_detection")]
    use super::deadlock_impl;

    // Per-thread deadlock bookkeeping: the real type when the feature is
    // enabled, an empty placeholder otherwise.
    #[cfg(feature = "deadlock_detection")]
    pub(super) use super::deadlock_impl::DeadlockData;

    #[cfg(not(feature = "deadlock_detection"))]
    pub(super) struct DeadlockData {}

    #[cfg(not(feature = "deadlock_detection"))]
    impl DeadlockData {
        // Creates the empty placeholder.
        pub(super) fn new() -> Self {
            DeadlockData {}
        }
    }

    /// Acquire a resource identified by key in the deadlock detector.
    ///
    /// Noop if deadlock_detection feature isn't enabled.
    ///
    /// Note: Call after the resource is acquired.
    #[inline]
    pub unsafe fn acquire_resource(_key: usize) {
        #[cfg(feature = "deadlock_detection")]
        deadlock_impl::acquire_resource(_key);
    }

    /// Release a resource identified by key in the deadlock detector.
    ///
    /// Noop if deadlock_detection feature isn't enabled.
    ///
    /// Note: Call before the resource is released.
    ///
    /// # Panics
    ///
    /// Panics if the resource was already released or wasn't acquired in this thread.
    #[inline]
    pub unsafe fn release_resource(_key: usize) {
        #[cfg(feature = "deadlock_detection")]
        deadlock_impl::release_resource(_key);
    }

    /// Returns all deadlocks detected *since* the last call.
    ///
    /// Each cycle consists of a vector of `DeadlockedThread`.
    #[cfg(feature = "deadlock_detection")]
    #[inline]
    pub fn check_deadlock() -> Vec<Vec<deadlock_impl::DeadlockedThread>> {
        deadlock_impl::check_deadlock()
    }

    // Forwards to `deadlock_impl::on_unpark` when the feature is enabled and
    // does nothing otherwise.
    #[inline]
    pub(super) unsafe fn on_unpark(_td: &super::ThreadData) {
        #[cfg(feature = "deadlock_detection")]
        deadlock_impl::on_unpark(_td);
    }
}
1141 
1142 #[cfg(feature = "deadlock_detection")]
1143 mod deadlock_impl {
1144     use super::{get_hashtable, get_thread_data, lock_bucket, ThreadData, NUM_THREADS};
1145     use backtrace::Backtrace;
1146     use petgraph;
1147     use petgraph::graphmap::DiGraphMap;
1148     use std::cell::{Cell, UnsafeCell};
1149     use std::collections::HashSet;
1150     use std::sync::atomic::Ordering;
1151     use std::sync::mpsc;
1152     use thread_id;
1153 
    /// Representation of a deadlocked thread
    ///
    /// Values of this type are produced by the deadlocked thread itself when
    /// it is woken by the detector, and are returned from `check_deadlock`.
    pub struct DeadlockedThread {
        // Copied from the thread's `DeadlockData::thread_id`.
        thread_id: usize,
        // Captured with `Backtrace::new()` on the deadlocked thread.
        backtrace: Backtrace,
    }

    impl DeadlockedThread {
        /// The system thread id
        pub fn thread_id(&self) -> usize {
            self.thread_id
        }

        /// The thread backtrace
        pub fn backtrace(&self) -> &Backtrace {
            &self.backtrace
        }
    }
1171 
    // Per-thread state used by the deadlock detector. It is reached through
    // the `deadlock_data` field of `ThreadData`.
    pub struct DeadlockData {
        // Currently owned resources (keys)
        // Pushed by `acquire_resource` and removed by `release_resource`.
        resources: UnsafeCell<Vec<usize>>,

        // Set when there's a pending callstack request
        // Written by `check_wait_graph_slow` and read by `on_unpark`; threads
        // with this flag set are left out of the wait graphs.
        deadlocked: Cell<bool>,

        // Sender used to report the backtrace
        // Installed by `check_wait_graph_slow` and taken by `on_unpark`.
        backtrace_sender: UnsafeCell<Option<mpsc::Sender<DeadlockedThread>>>,

        // System thread id
        // Obtained from `thread_id::get()` when the value is created.
        thread_id: usize,
    }

    impl DeadlockData {
        // Creates the state for the calling thread: no owned resources, not
        // flagged as deadlocked and no backtrace sender installed.
        pub fn new() -> Self {
            DeadlockData {
                resources: UnsafeCell::new(Vec::new()),
                deadlocked: Cell::new(false),
                backtrace_sender: UnsafeCell::new(None),
                thread_id: thread_id::get(),
            }
        }
    }
1196 
on_unpark(td: &ThreadData)1197     pub(super) unsafe fn on_unpark(td: &ThreadData) {
1198         if td.deadlock_data.deadlocked.get() {
1199             let sender = (*td.deadlock_data.backtrace_sender.get()).take().unwrap();
1200             sender
1201                 .send(DeadlockedThread {
1202                     thread_id: td.deadlock_data.thread_id,
1203                     backtrace: Backtrace::new(),
1204                 })
1205                 .unwrap();
1206             // make sure to close this sender
1207             drop(sender);
1208 
1209             // park until the end of the time
1210             td.parker.prepare_park();
1211             td.parker.park();
1212             unreachable!("unparked deadlocked thread!");
1213         }
1214     }
1215 
acquire_resource(key: usize)1216     pub unsafe fn acquire_resource(key: usize) {
1217         let mut thread_data = None;
1218         let thread_data = get_thread_data(&mut thread_data);
1219         (*thread_data.deadlock_data.resources.get()).push(key);
1220     }
1221 
release_resource(key: usize)1222     pub unsafe fn release_resource(key: usize) {
1223         let mut thread_data = None;
1224         let thread_data = get_thread_data(&mut thread_data);
1225         let resources = &mut (*thread_data.deadlock_data.resources.get());
1226         match resources.iter().rposition(|x| *x == key) {
1227             Some(p) => resources.swap_remove(p),
1228             None => panic!("key {} not found in thread resources", key),
1229         };
1230     }
1231 
check_deadlock() -> Vec<Vec<DeadlockedThread>>1232     pub fn check_deadlock() -> Vec<Vec<DeadlockedThread>> {
1233         unsafe {
1234             // fast pass
1235             if check_wait_graph_fast() {
1236                 // double check
1237                 check_wait_graph_slow()
1238             } else {
1239                 Vec::new()
1240             }
1241         }
1242     }
1243 
1244     // Simple algorithm that builds a wait graph f the threads and the resources,
1245     // then checks for the presence of cycles (deadlocks).
1246     // This variant isn't precise as it doesn't lock the entire table before checking
check_wait_graph_fast() -> bool1247     unsafe fn check_wait_graph_fast() -> bool {
1248         let table = get_hashtable();
1249         let thread_count = NUM_THREADS.load(Ordering::Relaxed);
1250         let mut graph = DiGraphMap::<usize, ()>::with_capacity(thread_count * 2, thread_count * 2);
1251 
1252         for b in &(*table).entries[..] {
1253             b.mutex.lock();
1254             let mut current = b.queue_head.get();
1255             while !current.is_null() {
1256                 if !(*current).parked_with_timeout.get()
1257                     && !(*current).deadlock_data.deadlocked.get()
1258                 {
1259                     // .resources are waiting for their owner
1260                     for &resource in &(*(*current).deadlock_data.resources.get()) {
1261                         graph.add_edge(resource, current as usize, ());
1262                     }
1263                     // owner waits for resource .key
1264                     graph.add_edge(current as usize, (*current).key.load(Ordering::Relaxed), ());
1265                 }
1266                 current = (*current).next_in_queue.get();
1267             }
1268             b.mutex.unlock();
1269         }
1270 
1271         petgraph::algo::is_cyclic_directed(&graph)
1272     }
1273 
    // Node of the wait graph built by `check_wait_graph_slow`. Thread nodes
    // and resource nodes are distinct variants.
    #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)]
    enum WaitGraphNode {
        // A parked thread, identified by the address of its `ThreadData`.
        Thread(*const ThreadData),
        // A resource, identified by its key.
        Resource(usize),
    }
1279 
1280     use self::WaitGraphNode::*;
1281 
    // Contrary to the _fast variant this locks the entries table before looking for cycles.
    // Returns all detected thread wait cycles.
    // Note that once a cycle is reported it's never reported again.
    //
    // Each thread in a detected cycle is flagged as deadlocked, given a sender
    // and unparked. The woken thread reports itself through that sender (see
    // `on_unpark`), and the reports of one cycle form one inner vector.
    unsafe fn check_wait_graph_slow() -> Vec<Vec<DeadlockedThread>> {
        let mut table = get_hashtable();
        loop {
            // Lock all buckets in the old table
            for b in &(*table).entries[..] {
                b.mutex.lock();
            }

            // Now check if our table is still the latest one. Another thread could
            // have grown the hash table between us getting and locking the hash table.
            let new_table = get_hashtable();
            if new_table == table {
                break;
            }

            // Unlock buckets and try again
            for b in &(*table).entries[..] {
                b.mutex.unlock();
            }

            table = new_table;
        }

        let thread_count = NUM_THREADS.load(Ordering::Relaxed);
        let mut graph =
            DiGraphMap::<WaitGraphNode, ()>::with_capacity(thread_count * 2, thread_count * 2);

        // Build the wait graph while every bucket is locked. Threads parked
        // with a timeout and threads already flagged as deadlocked are skipped.
        for b in &(*table).entries[..] {
            let mut current = b.queue_head.get();
            while !current.is_null() {
                if !(*current).parked_with_timeout.get()
                    && !(*current).deadlock_data.deadlocked.get()
                {
                    // .resources are waiting for their owner
                    for &resource in &(*(*current).deadlock_data.resources.get()) {
                        graph.add_edge(Resource(resource), Thread(current), ());
                    }
                    // owner waits for resource .key
                    graph.add_edge(
                        Thread(current),
                        Resource((*current).key.load(Ordering::Relaxed)),
                        (),
                    );
                }
                current = (*current).next_in_queue.get();
            }
        }

        // The graph is complete, so release every bucket again.
        for b in &(*table).entries[..] {
            b.mutex.unlock();
        }

        // find cycles
        let cycles = graph_cycles(&graph);

        let mut results = Vec::with_capacity(cycles.len());

        for cycle in cycles {
            // One channel per cycle: every thread of the cycle gets a clone of
            // the sender.
            let (sender, receiver) = mpsc::channel();
            for td in cycle {
                // NOTE(review): the table was unlocked above, so `td` is used
                // here outside the snapshot lock. This presumably relies on a
                // deadlocked thread staying parked; confirm.
                let bucket = lock_bucket((*td).key.load(Ordering::Relaxed));
                (*td).deadlock_data.deadlocked.set(true);
                *(*td).deadlock_data.backtrace_sender.get() = Some(sender.clone());
                let handle = (*td).parker.unpark_lock();
                bucket.mutex.unlock();
                // unpark the deadlocked thread!
                // on unpark it'll notice the deadlocked flag and report back
                handle.unpark();
            }
            // make sure to drop our sender before collecting results
            // (the receiver's iterator only ends once every sender is gone)
            drop(sender);
            results.push(receiver.iter().collect());
        }

        results
    }
1361 
1362     // normalize a cycle to start with the "smallest" node
normalize_cycle<T: Ord + Copy + Clone>(input: &[T]) -> Vec<T>1363     fn normalize_cycle<T: Ord + Copy + Clone>(input: &[T]) -> Vec<T> {
1364         let min_pos = input
1365             .iter()
1366             .enumerate()
1367             .min_by_key(|&(_, &t)| t)
1368             .map(|(p, _)| p)
1369             .unwrap_or(0);
1370         input
1371             .iter()
1372             .cycle()
1373             .skip(min_pos)
1374             .take(input.len())
1375             .cloned()
1376             .collect()
1377     }
1378 
1379     // returns all thread cycles in the wait graph
graph_cycles(g: &DiGraphMap<WaitGraphNode, ()>) -> Vec<Vec<*const ThreadData>>1380     fn graph_cycles(g: &DiGraphMap<WaitGraphNode, ()>) -> Vec<Vec<*const ThreadData>> {
1381         use petgraph::visit::depth_first_search;
1382         use petgraph::visit::DfsEvent;
1383         use petgraph::visit::NodeIndexable;
1384 
1385         let mut cycles = HashSet::new();
1386         let mut path = Vec::with_capacity(g.node_bound());
1387         // start from threads to get the correct threads cycle
1388         let threads = g
1389             .nodes()
1390             .filter(|n| if let &Thread(_) = n { true } else { false });
1391 
1392         depth_first_search(g, threads, |e| match e {
1393             DfsEvent::Discover(Thread(n), _) => path.push(n),
1394             DfsEvent::Finish(Thread(_), _) => {
1395                 path.pop();
1396             }
1397             DfsEvent::BackEdge(_, Thread(n)) => {
1398                 let from = path.iter().rposition(|&i| i == n).unwrap();
1399                 cycles.insert(normalize_cycle(&path[from..]));
1400             }
1401             _ => (),
1402         });
1403 
1404         cycles.iter().cloned().collect()
1405     }
1406 }
1407