1 // Copyright 2016 Amanieu d'Antras
2 //
3 // Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4 // http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5 // http://opensource.org/licenses/MIT>, at your option. This file may not be
6 // copied, modified, or distributed except according to those terms.
7 
8 use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
9 use std::time::{Duration, Instant};
10 use std::cell::{Cell, UnsafeCell};
11 use std::ptr;
12 use std::mem;
13 use std::thread::LocalKey;
14 #[cfg(not(feature = "nightly"))]
15 use std::panic;
16 use smallvec::SmallVec;
17 use rand::{self, Rng, XorShiftRng};
18 use thread_parker::ThreadParker;
19 use word_lock::WordLock;
20 use util::UncheckedOptionExt;
21 
22 static NUM_THREADS: AtomicUsize = ATOMIC_USIZE_INIT;
23 static HASHTABLE: AtomicUsize = ATOMIC_USIZE_INIT;
24 
25 // Even with 3x more buckets than threads, the memory overhead per thread is
26 // still only a few hundred bytes per thread.
27 const LOAD_FACTOR: usize = 3;
28 
29 struct HashTable {
30     // Hash buckets for the table
31     entries: Box<[Bucket]>,
32 
33     // Number of bits used for the hash function
34     hash_bits: u32,
35 
36     // Previous table. This is only kept to keep leak detectors happy.
37     _prev: *const HashTable,
38 }
39 
40 impl HashTable {
new(num_threads: usize, prev: *const HashTable) -> Box<HashTable>41     fn new(num_threads: usize, prev: *const HashTable) -> Box<HashTable> {
42         let new_size = (num_threads * LOAD_FACTOR).next_power_of_two();
43         let hash_bits = 0usize.leading_zeros() - new_size.leading_zeros() - 1;
44         let bucket = Bucket {
45             mutex: WordLock::new(),
46             queue_head: Cell::new(ptr::null()),
47             queue_tail: Cell::new(ptr::null()),
48             fair_timeout: UnsafeCell::new(FairTimeout::new()),
49             _padding: unsafe { mem::uninitialized() },
50         };
51         Box::new(HashTable {
52             entries: vec![bucket; new_size].into_boxed_slice(),
53             hash_bits: hash_bits,
54             _prev: prev,
55         })
56     }
57 }
58 
59 struct Bucket {
60     // Lock protecting the queue
61     mutex: WordLock,
62 
63     // Linked list of threads waiting on this bucket
64     queue_head: Cell<*const ThreadData>,
65     queue_tail: Cell<*const ThreadData>,
66 
67     // Next time at which point be_fair should be set
68     fair_timeout: UnsafeCell<FairTimeout>,
69 
70     // Padding to avoid false sharing between buckets. Ideally we would just
71     // align the bucket structure to 64 bytes, but Rust doesn't support that
72     // yet.
73     _padding: [u8; 64],
74 }
75 
76 // Implementation of Clone for Bucket, needed to make vec![] work
77 impl Clone for Bucket {
clone(&self) -> Bucket78     fn clone(&self) -> Bucket {
79         Bucket {
80             mutex: WordLock::new(),
81             queue_head: Cell::new(ptr::null()),
82             queue_tail: Cell::new(ptr::null()),
83             fair_timeout: UnsafeCell::new(FairTimeout::new()),
84             _padding: unsafe { mem::uninitialized() },
85         }
86     }
87 }
88 
89 struct FairTimeout {
90     // Next time at which point be_fair should be set
91     timeout: Instant,
92 
93     // Random number generator for calculating the next timeout
94     rng: XorShiftRng,
95 }
96 
97 impl FairTimeout {
new() -> FairTimeout98     fn new() -> FairTimeout {
99         FairTimeout {
100             timeout: Instant::now(),
101             rng: rand::weak_rng(),
102         }
103     }
104 
105     // Determine whether we should force a fair unlock, and update the timeout
should_timeout(&mut self) -> bool106     fn should_timeout(&mut self) -> bool {
107         let now = Instant::now();
108         if now > self.timeout {
109             self.timeout = now + Duration::new(0, self.rng.gen_range(0, 1000000));
110             true
111         } else {
112             false
113         }
114     }
115 }
116 
117 struct ThreadData {
118     parker: ThreadParker,
119 
120     // Key that this thread is sleeping on. This may change if the thread is
121     // requeued to a different key.
122     key: AtomicUsize,
123 
124     // Linked list of parked threads in a bucket
125     next_in_queue: Cell<*const ThreadData>,
126 
127     // UnparkToken passed to this thread when it is unparked
128     unpark_token: Cell<UnparkToken>,
129 
130     // ParkToken value set by the thread when it was parked
131     park_token: Cell<ParkToken>,
132 
133     // Is the thread parked with a timeout?
134     parked_with_timeout: Cell<bool>,
135 
136     // Extra data for deadlock detection
137     // TODO: once supported in stable replace with #[cfg...] & remove dummy struct/impl
138     #[allow(dead_code)] deadlock_data: deadlock::DeadlockData,
139 }
140 
141 impl ThreadData {
new() -> ThreadData142     fn new() -> ThreadData {
143         // Keep track of the total number of live ThreadData objects and resize
144         // the hash table accordingly.
145         let num_threads = NUM_THREADS.fetch_add(1, Ordering::Relaxed) + 1;
146         unsafe {
147             grow_hashtable(num_threads);
148         }
149 
150         ThreadData {
151             parker: ThreadParker::new(),
152             key: AtomicUsize::new(0),
153             next_in_queue: Cell::new(ptr::null()),
154             unpark_token: Cell::new(DEFAULT_UNPARK_TOKEN),
155             park_token: Cell::new(DEFAULT_PARK_TOKEN),
156             parked_with_timeout: Cell::new(false),
157             deadlock_data: deadlock::DeadlockData::new(),
158         }
159     }
160 }
161 
162 // Returns a ThreadData structure for the current thread
get_thread_data(local: &mut Option<ThreadData>) -> &ThreadData163 unsafe fn get_thread_data(local: &mut Option<ThreadData>) -> &ThreadData {
164     // Try to read from thread-local storage, but return None if the TLS has
165     // already been destroyed.
166     #[cfg(feature = "nightly")]
167     fn try_get_tls(key: &'static LocalKey<ThreadData>) -> Option<*const ThreadData> {
168         key.try_with(|x| x as *const ThreadData).ok()
169     }
170     #[cfg(not(feature = "nightly"))]
171     fn try_get_tls(key: &'static LocalKey<ThreadData>) -> Option<*const ThreadData> {
172         panic::catch_unwind(|| key.with(|x| x as *const ThreadData)).ok()
173     }
174 
175     // Unlike word_lock::ThreadData, parking_lot::ThreadData is always expensive
176     // to construct. Try to use a thread-local version if possible.
177     thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
178     if let Some(tls) = try_get_tls(&THREAD_DATA) {
179         return &*tls;
180     }
181 
182     // Otherwise just create a ThreadData on the stack
183     *local = Some(ThreadData::new());
184     local.as_ref().unwrap()
185 }
186 
187 impl Drop for ThreadData {
drop(&mut self)188     fn drop(&mut self) {
189         NUM_THREADS.fetch_sub(1, Ordering::Relaxed);
190     }
191 }
192 
193 // Get a pointer to the latest hash table, creating one if it doesn't exist yet.
get_hashtable() -> *const HashTable194 unsafe fn get_hashtable() -> *const HashTable {
195     let mut table = HASHTABLE.load(Ordering::Acquire);
196 
197     // If there is no table, create one
198     if table == 0 {
199         let new_table = Box::into_raw(HashTable::new(LOAD_FACTOR, ptr::null()));
200 
201         // If this fails then it means some other thread created the hash
202         // table first.
203         match HASHTABLE.compare_exchange(
204             0,
205             new_table as usize,
206             Ordering::Release,
207             Ordering::Relaxed,
208         ) {
209             Ok(_) => return new_table,
210             Err(x) => table = x,
211         }
212 
213         // Free the table we created
214         Box::from_raw(new_table);
215     }
216 
217     table as *const HashTable
218 }
219 
220 // Grow the hash table so that it is big enough for the given number of threads.
221 // This isn't performance-critical since it is only done when a ThreadData is
222 // created, which only happens once per thread.
grow_hashtable(num_threads: usize)223 unsafe fn grow_hashtable(num_threads: usize) {
224     // If there is no table, create one
225     if HASHTABLE.load(Ordering::Relaxed) == 0 {
226         let new_table = Box::into_raw(HashTable::new(num_threads, ptr::null()));
227 
228         // If this fails then it means some other thread created the hash
229         // table first.
230         if HASHTABLE
231             .compare_exchange(0, new_table as usize, Ordering::Release, Ordering::Relaxed)
232             .is_ok()
233         {
234             return;
235         }
236 
237         // Free the table we created
238         Box::from_raw(new_table);
239     }
240 
241     let mut old_table;
242     loop {
243         old_table = HASHTABLE.load(Ordering::Acquire) as *mut HashTable;
244 
245         // Check if we need to resize the existing table
246         if (*old_table).entries.len() >= LOAD_FACTOR * num_threads {
247             return;
248         }
249 
250         // Lock all buckets in the old table
251         for b in &(*old_table).entries[..] {
252             b.mutex.lock();
253         }
254 
255         // Now check if our table is still the latest one. Another thread could
256         // have grown the hash table between us reading HASHTABLE and locking
257         // the buckets.
258         if HASHTABLE.load(Ordering::Relaxed) == old_table as usize {
259             break;
260         }
261 
262         // Unlock buckets and try again
263         for b in &(*old_table).entries[..] {
264             b.mutex.unlock();
265         }
266     }
267 
268     // Create the new table
269     let new_table = HashTable::new(num_threads, old_table);
270 
271     // Move the entries from the old table to the new one
272     for b in &(*old_table).entries[..] {
273         let mut current = b.queue_head.get();
274         while !current.is_null() {
275             let next = (*current).next_in_queue.get();
276             let hash = hash((*current).key.load(Ordering::Relaxed), new_table.hash_bits);
277             if new_table.entries[hash].queue_tail.get().is_null() {
278                 new_table.entries[hash].queue_head.set(current);
279             } else {
280                 (*new_table.entries[hash].queue_tail.get())
281                     .next_in_queue
282                     .set(current);
283             }
284             new_table.entries[hash].queue_tail.set(current);
285             (*current).next_in_queue.set(ptr::null());
286             current = next;
287         }
288     }
289 
290     // Publish the new table. No races are possible at this point because
291     // any other thread trying to grow the hash table is blocked on the bucket
292     // locks in the old table.
293     HASHTABLE.store(Box::into_raw(new_table) as usize, Ordering::Release);
294 
295     // Unlock all buckets in the old table
296     for b in &(*old_table).entries[..] {
297         b.mutex.unlock();
298     }
299 }
300 
301 // Hash function for addresses
302 #[cfg(target_pointer_width = "32")]
hash(key: usize, bits: u32) -> usize303 fn hash(key: usize, bits: u32) -> usize {
304     key.wrapping_mul(0x9E3779B9) >> (32 - bits)
305 }
306 #[cfg(target_pointer_width = "64")]
hash(key: usize, bits: u32) -> usize307 fn hash(key: usize, bits: u32) -> usize {
308     key.wrapping_mul(0x9E3779B97F4A7C15) >> (64 - bits)
309 }
310 
311 // Lock the bucket for the given key
lock_bucket<'a>(key: usize) -> &'a Bucket312 unsafe fn lock_bucket<'a>(key: usize) -> &'a Bucket {
313     let mut bucket;
314     loop {
315         let hashtable = get_hashtable();
316 
317         let hash = hash(key, (*hashtable).hash_bits);
318         bucket = &(*hashtable).entries[hash];
319 
320         // Lock the bucket
321         bucket.mutex.lock();
322 
323         // If no other thread has rehashed the table before we grabbed the lock
324         // then we are good to go! The lock we grabbed prevents any rehashes.
325         if HASHTABLE.load(Ordering::Relaxed) == hashtable as usize {
326             return bucket;
327         }
328 
329         // Unlock the bucket and try again
330         bucket.mutex.unlock();
331     }
332 }
333 
334 // Lock the bucket for the given key, but check that the key hasn't been changed
335 // in the meantime due to a requeue.
lock_bucket_checked<'a>(key: &AtomicUsize) -> (usize, &'a Bucket)336 unsafe fn lock_bucket_checked<'a>(key: &AtomicUsize) -> (usize, &'a Bucket) {
337     let mut bucket;
338     loop {
339         let hashtable = get_hashtable();
340         let current_key = key.load(Ordering::Relaxed);
341 
342         let hash = hash(current_key, (*hashtable).hash_bits);
343         bucket = &(*hashtable).entries[hash];
344 
345         // Lock the bucket
346         bucket.mutex.lock();
347 
348         // Check that both the hash table and key are correct while the bucket
349         // is locked. Note that the key can't change once we locked the proper
350         // bucket for it, so we just keep trying until we have the correct key.
351         if HASHTABLE.load(Ordering::Relaxed) == hashtable as usize
352             && key.load(Ordering::Relaxed) == current_key
353         {
354             return (current_key, bucket);
355         }
356 
357         // Unlock the bucket and try again
358         bucket.mutex.unlock();
359     }
360 }
361 
362 // Lock the two buckets for the given pair of keys
lock_bucket_pair<'a>(key1: usize, key2: usize) -> (&'a Bucket, &'a Bucket)363 unsafe fn lock_bucket_pair<'a>(key1: usize, key2: usize) -> (&'a Bucket, &'a Bucket) {
364     let mut bucket1;
365     loop {
366         let hashtable = get_hashtable();
367 
368         // Get the lowest bucket first
369         let hash1 = hash(key1, (*hashtable).hash_bits);
370         let hash2 = hash(key2, (*hashtable).hash_bits);
371         if hash1 <= hash2 {
372             bucket1 = &(*hashtable).entries[hash1];
373         } else {
374             bucket1 = &(*hashtable).entries[hash2];
375         }
376 
377         // Lock the first bucket
378         bucket1.mutex.lock();
379 
380         // If no other thread has rehashed the table before we grabbed the lock
381         // then we are good to go! The lock we grabbed prevents any rehashes.
382         if HASHTABLE.load(Ordering::Relaxed) == hashtable as usize {
383             // Now lock the second bucket and return the two buckets
384             if hash1 == hash2 {
385                 return (bucket1, bucket1);
386             } else if hash1 < hash2 {
387                 let bucket2 = &(*hashtable).entries[hash2];
388                 bucket2.mutex.lock();
389                 return (bucket1, bucket2);
390             } else {
391                 let bucket2 = &(*hashtable).entries[hash1];
392                 bucket2.mutex.lock();
393                 return (bucket2, bucket1);
394             }
395         }
396 
397         // Unlock the bucket and try again
398         bucket1.mutex.unlock();
399     }
400 }
401 
402 // Unlock a pair of buckets
unlock_bucket_pair(bucket1: &Bucket, bucket2: &Bucket)403 unsafe fn unlock_bucket_pair(bucket1: &Bucket, bucket2: &Bucket) {
404     if bucket1 as *const _ == bucket2 as *const _ {
405         bucket1.mutex.unlock();
406     } else if bucket1 as *const _ < bucket2 as *const _ {
407         bucket2.mutex.unlock();
408         bucket1.mutex.unlock();
409     } else {
410         bucket1.mutex.unlock();
411         bucket2.mutex.unlock();
412     }
413 }
414 
415 /// Result of a park operation.
416 #[derive(Copy, Clone, Eq, PartialEq, Debug)]
417 pub enum ParkResult {
418     /// We were unparked by another thread with the given token.
419     Unparked(UnparkToken),
420 
421     /// The validation callback returned false.
422     Invalid,
423 
424     /// The timeout expired.
425     TimedOut,
426 }
427 
428 impl ParkResult {
429     /// Returns true if we were unparked by another thread.
is_unparked(self) -> bool430     pub fn is_unparked(self) -> bool {
431         if let ParkResult::Unparked(_) = self {
432             true
433         } else {
434             false
435         }
436     }
437 }
438 
439 /// Result of an unpark operation.
440 #[derive(Copy, Clone, Eq, PartialEq, Debug)]
441 pub struct UnparkResult {
442     /// The number of threads that were unparked.
443     pub unparked_threads: usize,
444 
445     /// Whether there are any threads remaining in the queue. This only returns
446     /// true if a thread was unparked.
447     pub have_more_threads: bool,
448 
449     /// This is set to true on average once every 0.5ms for any given key. It
450     /// should be used to switch to a fair unlocking mechanism for a particular
451     /// unlock.
452     pub be_fair: bool,
453 }
454 
455 /// Operation that `unpark_requeue` should perform.
456 #[derive(Copy, Clone, Eq, PartialEq, Debug)]
457 pub enum RequeueOp {
458     /// Abort the operation without doing anything.
459     Abort,
460 
461     /// Unpark one thread and requeue the rest onto the target queue.
462     UnparkOneRequeueRest,
463 
464     /// Requeue all threads onto the target queue.
465     RequeueAll,
466 }
467 
468 /// Operation that `unpark_filter` should perform for each thread.
469 #[derive(Copy, Clone, Eq, PartialEq, Debug)]
470 pub enum FilterOp {
471     /// Unpark the thread and continue scanning the list of parked threads.
472     Unpark,
473 
474     /// Don't unpark the thread and continue scanning the list of parked threads.
475     Skip,
476 
477     /// Don't unpark the thread and stop scanning the list of parked threads.
478     Stop,
479 }
480 
481 /// A value which is passed from an unparker to a parked thread.
482 #[derive(Copy, Clone, Eq, PartialEq, Debug)]
483 pub struct UnparkToken(pub usize);
484 
485 /// A value associated with a parked thread which can be used by `unpark_filter`.
486 #[derive(Copy, Clone, Eq, PartialEq, Debug)]
487 pub struct ParkToken(pub usize);
488 
489 /// A default unpark token to use.
490 pub const DEFAULT_UNPARK_TOKEN: UnparkToken = UnparkToken(0);
491 
492 /// A default park token to use.
493 pub const DEFAULT_PARK_TOKEN: ParkToken = ParkToken(0);
494 
495 /// Parks the current thread in the queue associated with the given key.
496 ///
497 /// The `validate` function is called while the queue is locked and can abort
498 /// the operation by returning false. If `validate` returns true then the
499 /// current thread is appended to the queue and the queue is unlocked.
500 ///
501 /// The `before_sleep` function is called after the queue is unlocked but before
502 /// the thread is put to sleep. The thread will then sleep until it is unparked
503 /// or the given timeout is reached.
504 ///
505 /// The `timed_out` function is also called while the queue is locked, but only
506 /// if the timeout was reached. It is passed the key of the queue it was in when
507 /// it timed out, which may be different from the original key if
508 /// `unpark_requeue` was called. It is also passed a bool which indicates
509 /// whether it was the last thread in the queue.
510 ///
511 /// # Safety
512 ///
513 /// You should only call this function with an address that you control, since
514 /// you could otherwise interfere with the operation of other synchronization
515 /// primitives.
516 ///
517 /// The `validate` and `timed_out` functions are called while the queue is
518 /// locked and must not panic or call into any function in `parking_lot`.
519 ///
520 /// The `before_sleep` function is called outside the queue lock and is allowed
521 /// to call `unpark_one`, `unpark_all`, `unpark_requeue` or `unpark_filter`, but
522 /// it is not allowed to call `park` or panic.
523 #[inline]
park<V, B, T>( key: usize, validate: V, before_sleep: B, timed_out: T, park_token: ParkToken, timeout: Option<Instant>, ) -> ParkResult where V: FnOnce() -> bool, B: FnOnce(), T: FnOnce(usize, bool),524 pub unsafe fn park<V, B, T>(
525     key: usize,
526     validate: V,
527     before_sleep: B,
528     timed_out: T,
529     park_token: ParkToken,
530     timeout: Option<Instant>,
531 ) -> ParkResult
532 where
533     V: FnOnce() -> bool,
534     B: FnOnce(),
535     T: FnOnce(usize, bool),
536 {
537     let mut v = Some(validate);
538     let mut b = Some(before_sleep);
539     let mut t = Some(timed_out);
540     park_internal(
541         key,
542         &mut || v.take().unchecked_unwrap()(),
543         &mut || b.take().unchecked_unwrap()(),
544         &mut |key, was_last_thread| t.take().unchecked_unwrap()(key, was_last_thread),
545         park_token,
546         timeout,
547     )
548 }
549 
550 // Non-generic version to reduce monomorphization cost
park_internal( key: usize, validate: &mut FnMut() -> bool, before_sleep: &mut FnMut(), timed_out: &mut FnMut(usize, bool), park_token: ParkToken, timeout: Option<Instant>, ) -> ParkResult551 unsafe fn park_internal(
552     key: usize,
553     validate: &mut FnMut() -> bool,
554     before_sleep: &mut FnMut(),
555     timed_out: &mut FnMut(usize, bool),
556     park_token: ParkToken,
557     timeout: Option<Instant>,
558 ) -> ParkResult {
559     // Grab our thread data, this also ensures that the hash table exists
560     let mut thread_data = None;
561     let thread_data = get_thread_data(&mut thread_data);
562 
563     // Lock the bucket for the given key
564     let bucket = lock_bucket(key);
565 
566     // If the validation function fails, just return
567     if !validate() {
568         bucket.mutex.unlock();
569         return ParkResult::Invalid;
570     }
571 
572     // Append our thread data to the queue and unlock the bucket
573     thread_data.parked_with_timeout.set(timeout.is_some());
574     thread_data.next_in_queue.set(ptr::null());
575     thread_data.key.store(key, Ordering::Relaxed);
576     thread_data.park_token.set(park_token);
577     thread_data.parker.prepare_park();
578     if !bucket.queue_head.get().is_null() {
579         (*bucket.queue_tail.get()).next_in_queue.set(thread_data);
580     } else {
581         bucket.queue_head.set(thread_data);
582     }
583     bucket.queue_tail.set(thread_data);
584     bucket.mutex.unlock();
585 
586     // Invoke the pre-sleep callback
587     before_sleep();
588 
589     // Park our thread and determine whether we were woken up by an unpark or by
590     // our timeout. Note that this isn't precise: we can still be unparked since
591     // we are still in the queue.
592     let unparked = match timeout {
593         Some(timeout) => thread_data.parker.park_until(timeout),
594         None => {
595             thread_data.parker.park();
596             // call deadlock detection on_unpark hook
597             deadlock::on_unpark(thread_data);
598             true
599         }
600     };
601 
602     // If we were unparked, return now
603     if unparked {
604         return ParkResult::Unparked(thread_data.unpark_token.get());
605     }
606 
607     // Lock our bucket again. Note that the hashtable may have been rehashed in
608     // the meantime. Our key may also have changed if we were requeued.
609     let (key, bucket) = lock_bucket_checked(&thread_data.key);
610 
611     // Now we need to check again if we were unparked or timed out. Unlike the
612     // last check this is precise because we hold the bucket lock.
613     if !thread_data.parker.timed_out() {
614         bucket.mutex.unlock();
615         return ParkResult::Unparked(thread_data.unpark_token.get());
616     }
617 
618     // We timed out, so we now need to remove our thread from the queue
619     let mut link = &bucket.queue_head;
620     let mut current = bucket.queue_head.get();
621     let mut previous = ptr::null();
622     while !current.is_null() {
623         if current == thread_data {
624             let next = (*current).next_in_queue.get();
625             link.set(next);
626             let mut was_last_thread = true;
627             if bucket.queue_tail.get() == current {
628                 bucket.queue_tail.set(previous);
629             } else {
630                 // Scan the rest of the queue to see if there are any other
631                 // entries with the given key.
632                 let mut scan = next;
633                 while !scan.is_null() {
634                     if (*scan).key.load(Ordering::Relaxed) == key {
635                         was_last_thread = false;
636                         break;
637                     }
638                     scan = (*scan).next_in_queue.get();
639                 }
640             }
641 
642             // Callback to indicate that we timed out, and whether we were the
643             // last thread on the queue.
644             timed_out(key, was_last_thread);
645             break;
646         } else {
647             link = &(*current).next_in_queue;
648             previous = current;
649             current = link.get();
650         }
651     }
652 
653     // There should be no way for our thread to have been removed from the queue
654     // if we timed out.
655     debug_assert!(!current.is_null());
656 
657     // Unlock the bucket, we are done
658     bucket.mutex.unlock();
659     ParkResult::TimedOut
660 }
661 
662 /// Unparks one thread from the queue associated with the given key.
663 ///
664 /// The `callback` function is called while the queue is locked and before the
665 /// target thread is woken up. The `UnparkResult` argument to the function
666 /// indicates whether a thread was found in the queue and whether this was the
667 /// last thread in the queue. This value is also returned by `unpark_one`.
668 ///
669 /// The `callback` function should return an `UnparkToken` value which will be
670 /// passed to the thread that is unparked. If no thread is unparked then the
671 /// returned value is ignored.
672 ///
673 /// # Safety
674 ///
675 /// You should only call this function with an address that you control, since
676 /// you could otherwise interfere with the operation of other synchronization
677 /// primitives.
678 ///
679 /// The `callback` function is called while the queue is locked and must not
680 /// panic or call into any function in `parking_lot`.
681 #[inline]
unpark_one<C>(key: usize, callback: C) -> UnparkResult where C: FnOnce(UnparkResult) -> UnparkToken,682 pub unsafe fn unpark_one<C>(key: usize, callback: C) -> UnparkResult
683 where
684     C: FnOnce(UnparkResult) -> UnparkToken,
685 {
686     let mut c = Some(callback);
687     unpark_one_internal(key, &mut |result| c.take().unchecked_unwrap()(result))
688 }
689 
690 // Non-generic version to reduce monomorphization cost
unpark_one_internal( key: usize, callback: &mut FnMut(UnparkResult) -> UnparkToken, ) -> UnparkResult691 unsafe fn unpark_one_internal(
692     key: usize,
693     callback: &mut FnMut(UnparkResult) -> UnparkToken,
694 ) -> UnparkResult {
695     // Lock the bucket for the given key
696     let bucket = lock_bucket(key);
697 
698     // Find a thread with a matching key and remove it from the queue
699     let mut link = &bucket.queue_head;
700     let mut current = bucket.queue_head.get();
701     let mut previous = ptr::null();
702     let mut result = UnparkResult {
703         unparked_threads: 0,
704         have_more_threads: false,
705         be_fair: false,
706     };
707     while !current.is_null() {
708         if (*current).key.load(Ordering::Relaxed) == key {
709             // Remove the thread from the queue
710             let next = (*current).next_in_queue.get();
711             link.set(next);
712             if bucket.queue_tail.get() == current {
713                 bucket.queue_tail.set(previous);
714             } else {
715                 // Scan the rest of the queue to see if there are any other
716                 // entries with the given key.
717                 let mut scan = next;
718                 while !scan.is_null() {
719                     if (*scan).key.load(Ordering::Relaxed) == key {
720                         result.have_more_threads = true;
721                         break;
722                     }
723                     scan = (*scan).next_in_queue.get();
724                 }
725             }
726 
727             // Invoke the callback before waking up the thread
728             result.unparked_threads = 1;
729             result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
730             let token = callback(result);
731 
732             // Set the token for the target thread
733             (*current).unpark_token.set(token);
734 
735             // This is a bit tricky: we first lock the ThreadParker to prevent
736             // the thread from exiting and freeing its ThreadData if its wait
737             // times out. Then we unlock the queue since we don't want to keep
738             // the queue locked while we perform a system call. Finally we wake
739             // up the parked thread.
740             let handle = (*current).parker.unpark_lock();
741             bucket.mutex.unlock();
742             handle.unpark();
743 
744             return result;
745         } else {
746             link = &(*current).next_in_queue;
747             previous = current;
748             current = link.get();
749         }
750     }
751 
752     // No threads with a matching key were found in the bucket
753     callback(result);
754     bucket.mutex.unlock();
755     result
756 }
757 
758 /// Unparks all threads in the queue associated with the given key.
759 ///
760 /// The given `UnparkToken` is passed to all unparked threads.
761 ///
762 /// This function returns the number of threads that were unparked.
763 ///
764 /// # Safety
765 ///
766 /// You should only call this function with an address that you control, since
767 /// you could otherwise interfere with the operation of other synchronization
768 /// primitives.
unpark_all(key: usize, unpark_token: UnparkToken) -> usize769 pub unsafe fn unpark_all(key: usize, unpark_token: UnparkToken) -> usize {
770     // Lock the bucket for the given key
771     let bucket = lock_bucket(key);
772 
773     // Remove all threads with the given key in the bucket
774     let mut link = &bucket.queue_head;
775     let mut current = bucket.queue_head.get();
776     let mut previous = ptr::null();
777     let mut threads = SmallVec::<[_; 8]>::new();
778     while !current.is_null() {
779         if (*current).key.load(Ordering::Relaxed) == key {
780             // Remove the thread from the queue
781             let next = (*current).next_in_queue.get();
782             link.set(next);
783             if bucket.queue_tail.get() == current {
784                 bucket.queue_tail.set(previous);
785             }
786 
787             // Set the token for the target thread
788             (*current).unpark_token.set(unpark_token);
789 
790             // Don't wake up threads while holding the queue lock. See comment
791             // in unpark_one. For now just record which threads we need to wake
792             // up.
793             threads.push((*current).parker.unpark_lock());
794             current = next;
795         } else {
796             link = &(*current).next_in_queue;
797             previous = current;
798             current = link.get();
799         }
800     }
801 
802     // Unlock the bucket
803     bucket.mutex.unlock();
804 
805     // Now that we are outside the lock, wake up all the threads that we removed
806     // from the queue.
807     let num_threads = threads.len();
808     for handle in threads.into_iter() {
809         handle.unpark();
810     }
811 
812     num_threads
813 }
814 
815 /// Removes all threads from the queue associated with `key_from`, optionally
816 /// unparks the first one and requeues the rest onto the queue associated with
817 /// `key_to`.
818 ///
819 /// The `validate` function is called while both queues are locked and can abort
820 /// the operation by returning `RequeueOp::Abort`. It can also choose to
821 /// unpark the first thread in the source queue while moving the rest by
822 /// returning `RequeueOp::UnparkFirstRequeueRest`. Returning
823 /// `RequeueOp::RequeueAll` will move all threads to the destination queue.
824 ///
825 /// The `callback` function is also called while both queues are locked. It is
826 /// passed the `RequeueOp` returned by `validate` and an `UnparkResult`
827 /// indicating whether a thread was unparked and whether there are threads still
828 /// parked in the new queue. This `UnparkResult` value is also returned by
829 /// `unpark_requeue`.
830 ///
831 /// The `callback` function should return an `UnparkToken` value which will be
832 /// passed to the thread that is unparked. If no thread is unparked then the
833 /// returned value is ignored.
834 ///
835 /// # Safety
836 ///
837 /// You should only call this function with an address that you control, since
838 /// you could otherwise interfere with the operation of other synchronization
839 /// primitives.
840 ///
841 /// The `validate` and `callback` functions are called while the queue is locked
842 /// and must not panic or call into any function in `parking_lot`.
843 #[inline]
unpark_requeue<V, C>( key_from: usize, key_to: usize, validate: V, callback: C, ) -> UnparkResult where V: FnOnce() -> RequeueOp, C: FnOnce(RequeueOp, UnparkResult) -> UnparkToken,844 pub unsafe fn unpark_requeue<V, C>(
845     key_from: usize,
846     key_to: usize,
847     validate: V,
848     callback: C,
849 ) -> UnparkResult
850 where
851     V: FnOnce() -> RequeueOp,
852     C: FnOnce(RequeueOp, UnparkResult) -> UnparkToken,
853 {
854     let mut v = Some(validate);
855     let mut c = Some(callback);
856     unpark_requeue_internal(
857         key_from,
858         key_to,
859         &mut || v.take().unchecked_unwrap()(),
860         &mut |op, r| c.take().unchecked_unwrap()(op, r),
861     )
862 }
863 
864 // Non-generic version to reduce monomorphization cost
unpark_requeue_internal( key_from: usize, key_to: usize, validate: &mut FnMut() -> RequeueOp, callback: &mut FnMut(RequeueOp, UnparkResult) -> UnparkToken, ) -> UnparkResult865 unsafe fn unpark_requeue_internal(
866     key_from: usize,
867     key_to: usize,
868     validate: &mut FnMut() -> RequeueOp,
869     callback: &mut FnMut(RequeueOp, UnparkResult) -> UnparkToken,
870 ) -> UnparkResult {
871     // Lock the two buckets for the given key
872     let (bucket_from, bucket_to) = lock_bucket_pair(key_from, key_to);
873 
874     // If the validation function fails, just return
875     let mut result = UnparkResult {
876         unparked_threads: 0,
877         have_more_threads: false,
878         be_fair: false,
879     };
880     let op = validate();
881     if op == RequeueOp::Abort {
882         unlock_bucket_pair(bucket_from, bucket_to);
883         return result;
884     }
885 
886     // Remove all threads with the given key in the source bucket
887     let mut link = &bucket_from.queue_head;
888     let mut current = bucket_from.queue_head.get();
889     let mut previous = ptr::null();
890     let mut requeue_threads: *const ThreadData = ptr::null();
891     let mut requeue_threads_tail: *const ThreadData = ptr::null();
892     let mut wakeup_thread = None;
893     while !current.is_null() {
894         if (*current).key.load(Ordering::Relaxed) == key_from {
895             // Remove the thread from the queue
896             let next = (*current).next_in_queue.get();
897             link.set(next);
898             if bucket_from.queue_tail.get() == current {
899                 bucket_from.queue_tail.set(previous);
900             }
901 
902             // Prepare the first thread for wakeup and requeue the rest.
903             if op == RequeueOp::UnparkOneRequeueRest && wakeup_thread.is_none() {
904                 wakeup_thread = Some(current);
905                 result.unparked_threads = 1;
906             } else {
907                 if !requeue_threads.is_null() {
908                     (*requeue_threads_tail).next_in_queue.set(current);
909                 } else {
910                     requeue_threads = current;
911                 }
912                 requeue_threads_tail = current;
913                 (*current).key.store(key_to, Ordering::Relaxed);
914                 result.have_more_threads = true;
915             }
916             current = next;
917         } else {
918             link = &(*current).next_in_queue;
919             previous = current;
920             current = link.get();
921         }
922     }
923 
924     // Add the requeued threads to the destination bucket
925     if !requeue_threads.is_null() {
926         (*requeue_threads_tail).next_in_queue.set(ptr::null());
927         if !bucket_to.queue_head.get().is_null() {
928             (*bucket_to.queue_tail.get())
929                 .next_in_queue
930                 .set(requeue_threads);
931         } else {
932             bucket_to.queue_head.set(requeue_threads);
933         }
934         bucket_to.queue_tail.set(requeue_threads_tail);
935     }
936 
937     // Invoke the callback before waking up the thread
938     if result.unparked_threads != 0 {
939         result.be_fair = (*bucket_from.fair_timeout.get()).should_timeout();
940     }
941     let token = callback(op, result);
942 
943     // See comment in unpark_one for why we mess with the locking
944     if let Some(wakeup_thread) = wakeup_thread {
945         (*wakeup_thread).unpark_token.set(token);
946         let handle = (*wakeup_thread).parker.unpark_lock();
947         unlock_bucket_pair(bucket_from, bucket_to);
948         handle.unpark();
949     } else {
950         unlock_bucket_pair(bucket_from, bucket_to);
951     }
952 
953     result
954 }
955 
956 /// Unparks a number of threads from the front of the queue associated with
957 /// `key` depending on the results of a filter function which inspects the
958 /// `ParkToken` associated with each thread.
959 ///
960 /// The `filter` function is called for each thread in the queue or until
961 /// `FilterOp::Stop` is returned. This function is passed the `ParkToken`
962 /// associated with a particular thread, which is unparked if `FilterOp::Unpark`
963 /// is returned.
964 ///
965 /// The `callback` function is also called while both queues are locked. It is
966 /// passed an `UnparkResult` indicating the number of threads that were unparked
967 /// and whether there are still parked threads in the queue. This `UnparkResult`
968 /// value is also returned by `unpark_filter`.
969 ///
970 /// The `callback` function should return an `UnparkToken` value which will be
971 /// passed to all threads that are unparked. If no thread is unparked then the
972 /// returned value is ignored.
973 ///
974 /// # Safety
975 ///
976 /// You should only call this function with an address that you control, since
977 /// you could otherwise interfere with the operation of other synchronization
978 /// primitives.
979 ///
980 /// The `filter` and `callback` functions are called while the queue is locked
981 /// and must not panic or call into any function in `parking_lot`.
982 #[inline]
unpark_filter<F, C>(key: usize, mut filter: F, callback: C) -> UnparkResult where F: FnMut(ParkToken) -> FilterOp, C: FnOnce(UnparkResult) -> UnparkToken,983 pub unsafe fn unpark_filter<F, C>(key: usize, mut filter: F, callback: C) -> UnparkResult
984 where
985     F: FnMut(ParkToken) -> FilterOp,
986     C: FnOnce(UnparkResult) -> UnparkToken,
987 {
988     let mut c = Some(callback);
989     unpark_filter_internal(key, &mut filter, &mut |r| c.take().unchecked_unwrap()(r))
990 }
991 
992 // Non-generic version to reduce monomorphization cost
unpark_filter_internal( key: usize, filter: &mut FnMut(ParkToken) -> FilterOp, callback: &mut FnMut(UnparkResult) -> UnparkToken, ) -> UnparkResult993 unsafe fn unpark_filter_internal(
994     key: usize,
995     filter: &mut FnMut(ParkToken) -> FilterOp,
996     callback: &mut FnMut(UnparkResult) -> UnparkToken,
997 ) -> UnparkResult {
998     // Lock the bucket for the given key
999     let bucket = lock_bucket(key);
1000 
1001     // Go through the queue looking for threads with a matching key
1002     let mut link = &bucket.queue_head;
1003     let mut current = bucket.queue_head.get();
1004     let mut previous = ptr::null();
1005     let mut threads = SmallVec::<[_; 8]>::new();
1006     let mut result = UnparkResult {
1007         unparked_threads: 0,
1008         have_more_threads: false,
1009         be_fair: false,
1010     };
1011     while !current.is_null() {
1012         if (*current).key.load(Ordering::Relaxed) == key {
1013             // Call the filter function with the thread's ParkToken
1014             let next = (*current).next_in_queue.get();
1015             match filter((*current).park_token.get()) {
1016                 FilterOp::Unpark => {
1017                     // Remove the thread from the queue
1018                     link.set(next);
1019                     if bucket.queue_tail.get() == current {
1020                         bucket.queue_tail.set(previous);
1021                     }
1022 
1023                     // Add the thread to our list of threads to unpark
1024                     threads.push((current, None));
1025 
1026                     current = next;
1027                 }
1028                 FilterOp::Skip => {
1029                     result.have_more_threads = true;
1030                     link = &(*current).next_in_queue;
1031                     previous = current;
1032                     current = link.get();
1033                 }
1034                 FilterOp::Stop => {
1035                     result.have_more_threads = true;
1036                     break;
1037                 }
1038             }
1039         } else {
1040             link = &(*current).next_in_queue;
1041             previous = current;
1042             current = link.get();
1043         }
1044     }
1045 
1046     // Invoke the callback before waking up the threads
1047     result.unparked_threads = threads.len();
1048     if result.unparked_threads != 0 {
1049         result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
1050     }
1051     let token = callback(result);
1052 
1053     // Pass the token to all threads that are going to be unparked and prepare
1054     // them for unparking.
1055     for t in threads.iter_mut() {
1056         (*t.0).unpark_token.set(token);
1057         t.1 = Some((*t.0).parker.unpark_lock());
1058     }
1059 
1060     bucket.mutex.unlock();
1061 
1062     // Now that we are outside the lock, wake up all the threads that we removed
1063     // from the queue.
1064     for (_, handle) in threads.into_iter() {
1065         handle.unchecked_unwrap().unpark();
1066     }
1067 
1068     result
1069 }
1070 
1071 /// [Experimental] Deadlock detection
1072 ///
1073 /// Enabled via the `deadlock_detection` feature flag.
1074 pub mod deadlock {
1075     #[cfg(feature = "deadlock_detection")]
1076     use super::deadlock_impl;
1077 
1078     #[cfg(feature = "deadlock_detection")]
1079     pub(super) use super::deadlock_impl::DeadlockData;
1080 
1081     #[cfg(not(feature = "deadlock_detection"))]
1082     pub(super) struct DeadlockData {}
1083 
1084     #[cfg(not(feature = "deadlock_detection"))]
1085     impl DeadlockData {
new() -> Self1086         pub(super) fn new() -> Self {
1087             DeadlockData {}
1088         }
1089     }
1090 
1091     /// Acquire a resource identified by key in the deadlock detector
1092     /// Noop if deadlock_detection feature isn't enabled.
1093     /// Note: Call after the resource is acquired
1094     #[inline]
acquire_resource(_key: usize)1095     pub unsafe fn acquire_resource(_key: usize) {
1096         #[cfg(feature = "deadlock_detection")]
1097         deadlock_impl::acquire_resource(_key);
1098     }
1099 
1100     /// Release a resource identified by key in the deadlock detector.
1101     /// Noop if deadlock_detection feature isn't enabled.
1102     /// Note: Call before the resource is released
1103     /// # Panics
1104     /// Panics if the resource was already released or wasn't acquired in this thread.
1105     #[inline]
release_resource(_key: usize)1106     pub unsafe fn release_resource(_key: usize) {
1107         #[cfg(feature = "deadlock_detection")]
1108         deadlock_impl::release_resource(_key);
1109     }
1110 
1111     /// Returns all deadlocks detected *since* the last call.
1112     /// Each cycle consist of a vector of `DeadlockedThread`.
1113     #[cfg(feature = "deadlock_detection")]
1114     #[inline]
check_deadlock() -> Vec<Vec<deadlock_impl::DeadlockedThread>>1115     pub fn check_deadlock() -> Vec<Vec<deadlock_impl::DeadlockedThread>> {
1116         deadlock_impl::check_deadlock()
1117     }
1118 
1119     #[inline]
on_unpark(_td: &super::ThreadData)1120     pub(super) unsafe fn on_unpark(_td: &super::ThreadData) {
1121         #[cfg(feature = "deadlock_detection")]
1122         deadlock_impl::on_unpark(_td);
1123     }
1124 }
1125 
1126 #[cfg(feature = "deadlock_detection")]
1127 mod deadlock_impl {
1128     use super::{get_hashtable, get_thread_data, lock_bucket, ThreadData, NUM_THREADS};
1129     use std::cell::{Cell, UnsafeCell};
1130     use std::sync::mpsc;
1131     use std::sync::atomic::Ordering;
1132     use std::collections::HashSet;
1133     use thread_id;
1134     use backtrace::Backtrace;
1135     use petgraph;
1136     use petgraph::graphmap::DiGraphMap;
1137 
1138     /// Representation of a deadlocked thread
1139     pub struct DeadlockedThread {
1140         thread_id: usize,
1141         backtrace: Backtrace,
1142     }
1143 
1144     impl DeadlockedThread {
1145         /// The system thread id
thread_id(&self) -> usize1146         pub fn thread_id(&self) -> usize {
1147             self.thread_id
1148         }
1149 
1150         /// The thread backtrace
backtrace(&self) -> &Backtrace1151         pub fn backtrace(&self) -> &Backtrace {
1152             &self.backtrace
1153         }
1154     }
1155 
1156     pub struct DeadlockData {
1157         // Currently owned resources (keys)
1158         resources: UnsafeCell<Vec<usize>>,
1159 
1160         // Set when there's a pending callstack request
1161         deadlocked: Cell<bool>,
1162 
1163         // Sender used to report the backtrace
1164         backtrace_sender: UnsafeCell<Option<mpsc::Sender<DeadlockedThread>>>,
1165 
1166         // System thread id
1167         thread_id: usize,
1168     }
1169 
1170     impl DeadlockData {
new() -> Self1171         pub fn new() -> Self {
1172             DeadlockData {
1173                 resources: UnsafeCell::new(Vec::new()),
1174                 deadlocked: Cell::new(false),
1175                 backtrace_sender: UnsafeCell::new(None),
1176                 thread_id: thread_id::get(),
1177             }
1178         }
1179     }
1180 
on_unpark(td: &ThreadData)1181     pub(super) unsafe fn on_unpark(td: &ThreadData) {
1182         if td.deadlock_data.deadlocked.get() {
1183             let sender = (*td.deadlock_data.backtrace_sender.get()).take().unwrap();
1184             sender
1185                 .send(DeadlockedThread {
1186                     thread_id: td.deadlock_data.thread_id,
1187                     backtrace: Backtrace::new(),
1188                 })
1189                 .unwrap();
1190             // make sure to close this sender
1191             drop(sender);
1192 
1193             // park until the end of the time
1194             td.parker.prepare_park();
1195             td.parker.park();
1196             unreachable!("unparked deadlocked thread!");
1197         }
1198     }
1199 
acquire_resource(key: usize)1200     pub unsafe fn acquire_resource(key: usize) {
1201         let mut thread_data = None;
1202         let thread_data = get_thread_data(&mut thread_data);
1203         (*thread_data.deadlock_data.resources.get()).push(key);
1204     }
1205 
release_resource(key: usize)1206     pub unsafe fn release_resource(key: usize) {
1207         let mut thread_data = None;
1208         let thread_data = get_thread_data(&mut thread_data);
1209         let resources = &mut (*thread_data.deadlock_data.resources.get());
1210         match resources.iter().rposition(|x| *x == key) {
1211             Some(p) => resources.swap_remove(p),
1212             None => panic!("key {} not found in thread resources", key),
1213         };
1214     }
1215 
check_deadlock() -> Vec<Vec<DeadlockedThread>>1216     pub fn check_deadlock() -> Vec<Vec<DeadlockedThread>> {
1217         unsafe {
1218             // fast pass
1219             if check_wait_graph_fast() {
1220                 // double check
1221                 check_wait_graph_slow()
1222             } else {
1223                 Vec::new()
1224             }
1225         }
1226     }
1227 
1228     // Simple algorithm that builds a wait graph f the threads and the resources,
1229     // then checks for the presence of cycles (deadlocks).
1230     // This variant isn't precise as it doesn't lock the entire table before checking
check_wait_graph_fast() -> bool1231     unsafe fn check_wait_graph_fast() -> bool {
1232         let table = get_hashtable();
1233         let thread_count = NUM_THREADS.load(Ordering::Relaxed);
1234         let mut graph = DiGraphMap::<usize, ()>::with_capacity(thread_count * 2, thread_count * 2);
1235 
1236         for b in &(*table).entries[..] {
1237             b.mutex.lock();
1238             let mut current = b.queue_head.get();
1239             while !current.is_null() {
1240                 if !(*current).parked_with_timeout.get()
1241                     && !(*current).deadlock_data.deadlocked.get()
1242                 {
1243                     // .resources are waiting for their owner
1244                     for &resource in &(*(*current).deadlock_data.resources.get()) {
1245                         graph.add_edge(resource, current as usize, ());
1246                     }
1247                     // owner waits for resource .key
1248                     graph.add_edge(current as usize, (*current).key.load(Ordering::Relaxed), ());
1249                 }
1250                 current = (*current).next_in_queue.get();
1251             }
1252             b.mutex.unlock();
1253         }
1254 
1255         petgraph::algo::is_cyclic_directed(&graph)
1256     }
1257 
1258     #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)]
1259     enum WaitGraphNode {
1260         Thread(*const ThreadData),
1261         Resource(usize),
1262     }
1263 
1264     use self::WaitGraphNode::*;
1265 
1266     // Contrary to the _fast variant this locks the entrie table before looking for cycles.
1267     // Returns all detected thread wait cycles.
1268     // Note that once a cycle is reported it's never reported again.
check_wait_graph_slow() -> Vec<Vec<DeadlockedThread>>1269     unsafe fn check_wait_graph_slow() -> Vec<Vec<DeadlockedThread>> {
1270         let mut table = get_hashtable();
1271         loop {
1272             // Lock all buckets in the old table
1273             for b in &(*table).entries[..] {
1274                 b.mutex.lock();
1275             }
1276 
1277             // Now check if our table is still the latest one. Another thread could
1278             // have grown the hash table between us getting and locking the hash table.
1279             let new_table = get_hashtable();
1280             if new_table == table {
1281                 break;
1282             }
1283 
1284             // Unlock buckets and try again
1285             for b in &(*table).entries[..] {
1286                 b.mutex.unlock();
1287             }
1288 
1289             table = new_table;
1290         }
1291 
1292         let thread_count = NUM_THREADS.load(Ordering::Relaxed);
1293         let mut graph =
1294             DiGraphMap::<WaitGraphNode, ()>::with_capacity(thread_count * 2, thread_count * 2);
1295 
1296         for b in &(*table).entries[..] {
1297             let mut current = b.queue_head.get();
1298             while !current.is_null() {
1299                 if !(*current).parked_with_timeout.get()
1300                     && !(*current).deadlock_data.deadlocked.get()
1301                 {
1302                     // .resources are waiting for their owner
1303                     for &resource in &(*(*current).deadlock_data.resources.get()) {
1304                         graph.add_edge(Resource(resource), Thread(current), ());
1305                     }
1306                     // owner waits for resource .key
1307                     graph.add_edge(
1308                         Thread(current),
1309                         Resource((*current).key.load(Ordering::Relaxed)),
1310                         (),
1311                     );
1312                 }
1313                 current = (*current).next_in_queue.get();
1314             }
1315         }
1316 
1317         for b in &(*table).entries[..] {
1318             b.mutex.unlock();
1319         }
1320 
1321         // find cycles
1322         let cycles = graph_cycles(&graph);
1323 
1324         let mut results = Vec::with_capacity(cycles.len());
1325 
1326         for cycle in cycles {
1327             let (sender, receiver) = mpsc::channel();
1328             for td in cycle {
1329                 let bucket = lock_bucket((*td).key.load(Ordering::Relaxed));
1330                 (*td).deadlock_data.deadlocked.set(true);
1331                 *(*td).deadlock_data.backtrace_sender.get() = Some(sender.clone());
1332                 let handle = (*td).parker.unpark_lock();
1333                 bucket.mutex.unlock();
1334                 // unpark the deadlocked thread!
1335                 // on unpark it'll notice the deadlocked flag and report back
1336                 handle.unpark();
1337             }
1338             // make sure to drop our sender before collecting results
1339             drop(sender);
1340             results.push(receiver.iter().collect());
1341         }
1342 
1343         results
1344     }
1345 
1346     // normalize a cycle to start with the "smallest" node
normalize_cycle<T: Ord + Copy + Clone>(input: &[T]) -> Vec<T>1347     fn normalize_cycle<T: Ord + Copy + Clone>(input: &[T]) -> Vec<T> {
1348         let min_pos = input
1349             .iter()
1350             .enumerate()
1351             .min_by_key(|&(_, &t)| t)
1352             .map(|(p, _)| p)
1353             .unwrap_or(0);
1354         input
1355             .iter()
1356             .cycle()
1357             .skip(min_pos)
1358             .take(input.len())
1359             .cloned()
1360             .collect()
1361     }
1362 
1363     // returns all thread cycles in the wait graph
graph_cycles(g: &DiGraphMap<WaitGraphNode, ()>) -> Vec<Vec<*const ThreadData>>1364     fn graph_cycles(g: &DiGraphMap<WaitGraphNode, ()>) -> Vec<Vec<*const ThreadData>> {
1365         use petgraph::visit::NodeIndexable;
1366         use petgraph::visit::depth_first_search;
1367         use petgraph::visit::DfsEvent;
1368 
1369         let mut cycles = HashSet::new();
1370         let mut path = Vec::with_capacity(g.node_bound());
1371         // start from threads to get the correct threads cycle
1372         let threads = g.nodes()
1373             .filter(|n| if let &Thread(_) = n { true } else { false });
1374 
1375         depth_first_search(g, threads, |e| match e {
1376             DfsEvent::Discover(Thread(n), _) => path.push(n),
1377             DfsEvent::Finish(Thread(_), _) => {
1378                 path.pop();
1379             }
1380             DfsEvent::BackEdge(_, Thread(n)) => {
1381                 let from = path.iter().rposition(|&i| i == n).unwrap();
1382                 cycles.insert(normalize_cycle(&path[from..]));
1383             }
1384             _ => (),
1385         });
1386 
1387         cycles.iter().cloned().collect()
1388     }
1389 }
1390