// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use crate::thread_parker::{ThreadParker, ThreadParkerT, UnparkHandleT};
use crate::util::UncheckedOptionExt;
use crate::word_lock::WordLock;
use core::{
    cell::{Cell, UnsafeCell},
    ptr,
    sync::atomic::{AtomicPtr, AtomicUsize, Ordering},
};
use smallvec::SmallVec;
use std::time::{Duration, Instant};

static NUM_THREADS: AtomicUsize = AtomicUsize::new(0);
static HASHTABLE: AtomicPtr<HashTable> = AtomicPtr::new(ptr::null_mut());

// Even with 3x more buckets than threads, the memory overhead per thread is
// still only a few hundred bytes.
const LOAD_FACTOR: usize = 3;

struct HashTable {
    // Hash buckets for the table
    entries: Box<[Bucket]>,

    // Number of bits used for the hash function
    hash_bits: u32,

    // Previous table. This is only kept to keep leak detectors happy.
    _prev: *const HashTable,
}

impl HashTable {
    #[inline]
    fn new(num_threads: usize, prev: *const HashTable) -> Box<HashTable> {
        let new_size = (num_threads * LOAD_FACTOR).next_power_of_two();
        let hash_bits = 0usize.leading_zeros() - new_size.leading_zeros() - 1;

        let now = Instant::now();
        let mut entries = Vec::with_capacity(new_size);
        for i in 0..new_size {
            // We must ensure the seed is not zero
            entries.push(Bucket::new(now, i as u32 + 1));
        }

        Box::new(HashTable {
            entries: entries.into_boxed_slice(),
            hash_bits,
            _prev: prev,
        })
    }
}

#[repr(align(64))]
struct Bucket {
    // Lock protecting the queue
    mutex: WordLock,

    // Linked list of threads waiting on this bucket
    queue_head: Cell<*const ThreadData>,
    queue_tail: Cell<*const ThreadData>,

    // Next time at which be_fair should be set
    fair_timeout: UnsafeCell<FairTimeout>,
}

impl Bucket {
    #[inline]
    pub fn new(timeout: Instant, seed: u32) -> Self {
        Self {
            mutex: WordLock::INIT,
            queue_head: Cell::new(ptr::null()),
            queue_tail: Cell::new(ptr::null()),
            fair_timeout: UnsafeCell::new(FairTimeout::new(timeout, seed)),
        }
    }
}

struct FairTimeout {
    // Next time at which be_fair should be set
    timeout: Instant,

    // the PRNG state for calculating the next timeout
    seed: u32,
}

impl FairTimeout {
    #[inline]
    fn new(timeout: Instant, seed: u32) -> FairTimeout {
        FairTimeout { timeout, seed }
    }

    // Determine whether we should force a fair unlock, and update the timeout
    #[inline]
    fn should_timeout(&mut self) -> bool {
        let now = Instant::now();
        if now > self.timeout {
            // Time between 0 and 1ms.
            let nanos = self.gen_u32() % 1_000_000;
            self.timeout = now + Duration::new(0, nanos);
            true
        } else {
            false
        }
    }

    // Pseudorandom number generator from the "Xorshift RNGs" paper by George Marsaglia.
    fn gen_u32(&mut self) -> u32 {
        self.seed ^= self.seed << 13;
        self.seed ^= self.seed >> 17;
        self.seed ^= self.seed << 5;
        self.seed
    }
}

struct ThreadData {
    parker: ThreadParker,

    // Key that this thread is sleeping on. This may change if the thread is
    // requeued to a different key.
    key: AtomicUsize,

    // Linked list of parked threads in a bucket
    next_in_queue: Cell<*const ThreadData>,

    // UnparkToken passed to this thread when it is unparked
    unpark_token: Cell<UnparkToken>,

    // ParkToken value set by the thread when it was parked
    park_token: Cell<ParkToken>,

    // Is the thread parked with a timeout?
    parked_with_timeout: Cell<bool>,

    // Extra data for deadlock detection
    #[cfg(feature = "deadlock_detection")]
    deadlock_data: deadlock::DeadlockData,
}

impl ThreadData {
    fn new() -> ThreadData {
        // Keep track of the total number of live ThreadData objects and resize
        // the hash table accordingly.
        let num_threads = NUM_THREADS.fetch_add(1, Ordering::Relaxed) + 1;
        unsafe {
            grow_hashtable(num_threads);
        }

        ThreadData {
            parker: ThreadParker::new(),
            key: AtomicUsize::new(0),
            next_in_queue: Cell::new(ptr::null()),
            unpark_token: Cell::new(DEFAULT_UNPARK_TOKEN),
            park_token: Cell::new(DEFAULT_PARK_TOKEN),
            parked_with_timeout: Cell::new(false),
            #[cfg(feature = "deadlock_detection")]
            deadlock_data: deadlock::DeadlockData::new(),
        }
    }
}

// Invokes the given closure with a reference to the current thread's `ThreadData`.
#[inline(always)]
fn with_thread_data<T>(f: impl FnOnce(&ThreadData) -> T) -> T {
    // Unlike word_lock::ThreadData, parking_lot::ThreadData is always expensive
    // to construct. Try to use a thread-local version if possible. Otherwise just
    // create a ThreadData on the stack
    let mut thread_data_storage = None;
    thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
    let thread_data_ptr = THREAD_DATA
        .try_with(|x| x as *const ThreadData)
        .unwrap_or_else(|_| thread_data_storage.get_or_insert_with(ThreadData::new));

    f(unsafe { &*thread_data_ptr })
}

impl Drop for ThreadData {
    fn drop(&mut self) {
        NUM_THREADS.fetch_sub(1, Ordering::Relaxed);
    }
}

// Get a pointer to the latest hash table, creating one if it doesn't exist yet.
#[inline]
fn get_hashtable() -> *mut HashTable {
    let table = HASHTABLE.load(Ordering::Acquire);

    // If there is no table, create one
    if table.is_null() {
        create_hashtable()
    } else {
        table
    }
}

// Create a new hash table and install it, returning the existing table if
// another thread installed one first.
#[cold]
fn create_hashtable() -> *mut HashTable {
    let new_table = Box::into_raw(HashTable::new(LOAD_FACTOR, ptr::null()));

    // If this fails then it means some other thread created the hash
    // table first.
    match HASHTABLE.compare_exchange(
        ptr::null_mut(),
        new_table,
        Ordering::Release,
        Ordering::Relaxed,
    ) {
        Ok(_) => new_table,
        Err(old_table) => {
            // Free the table we created
            unsafe {
                Box::from_raw(new_table);
            }
            old_table
        }
    }
}

// Grow the hash table so that it is big enough for the given number of threads.
// This isn't performance-critical since it is only done when a ThreadData is
// created, which only happens once per thread.
unsafe fn grow_hashtable(num_threads: usize) {
    // If there is no table, create one
    if HASHTABLE.load(Ordering::Relaxed).is_null() {
        let new_table = Box::into_raw(HashTable::new(num_threads, ptr::null()));

        // If this fails then it means some other thread created the hash
        // table first.
        if HASHTABLE
            .compare_exchange(
                ptr::null_mut(),
                new_table,
                Ordering::Release,
                Ordering::Relaxed,
            )
            .is_ok()
        {
            return;
        }

        // Free the table we created
        Box::from_raw(new_table);
    }

    let mut old_table;
    loop {
        old_table = HASHTABLE.load(Ordering::Acquire);

        // Check if we need to resize the existing table
        if (*old_table).entries.len() >= LOAD_FACTOR * num_threads {
            return;
        }

        // Lock all buckets in the old table
        for b in &(*old_table).entries[..] {
            b.mutex.lock();
        }

        // Now check if our table is still the latest one. Another thread could
        // have grown the hash table between us reading HASHTABLE and locking
        // the buckets.
        if HASHTABLE.load(Ordering::Relaxed) == old_table {
            break;
        }

        // Unlock buckets and try again
        for b in &(*old_table).entries[..] {
            b.mutex.unlock();
        }
    }

    // Create the new table
    let new_table = HashTable::new(num_threads, old_table);

    // Move the entries from the old table to the new one
    for b in &(*old_table).entries[..] {
        let mut current = b.queue_head.get();
        while !current.is_null() {
            let next = (*current).next_in_queue.get();
            let hash = hash((*current).key.load(Ordering::Relaxed), new_table.hash_bits);
            if new_table.entries[hash].queue_tail.get().is_null() {
                new_table.entries[hash].queue_head.set(current);
            } else {
                (*new_table.entries[hash].queue_tail.get())
                    .next_in_queue
                    .set(current);
            }
            new_table.entries[hash].queue_tail.set(current);
            (*current).next_in_queue.set(ptr::null());
            current = next;
        }
    }

    // Publish the new table. No races are possible at this point because
    // any other thread trying to grow the hash table is blocked on the bucket
    // locks in the old table.
    HASHTABLE.store(Box::into_raw(new_table), Ordering::Release);

    // Unlock all buckets in the old table
    for b in &(*old_table).entries[..] {
        b.mutex.unlock();
    }
}

// Hash function for addresses
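// Note: the multipliers below are the usual Fibonacci-hashing constants:
// 0x9E3779B9 is roughly 2^32 / phi and 0x9E3779B97F4A7C15 is roughly
// 2^64 / phi (phi = golden ratio), which spreads nearby addresses across
// the buckets.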
#[cfg(target_pointer_width = "32")]
#[inline]
fn hash(key: usize, bits: u32) -> usize {
    key.wrapping_mul(0x9E3779B9) >> (32 - bits)
}
#[cfg(target_pointer_width = "64")]
#[inline]
fn hash(key: usize, bits: u32) -> usize {
    key.wrapping_mul(0x9E3779B97F4A7C15) >> (64 - bits)
}

// Lock the bucket for the given key
#[inline]
unsafe fn lock_bucket<'a>(key: usize) -> &'a Bucket {
    let mut bucket;
    loop {
        let hashtable = get_hashtable();

        let hash = hash(key, (*hashtable).hash_bits);
        bucket = &(*hashtable).entries[hash];

        // Lock the bucket
        bucket.mutex.lock();

        // If no other thread has rehashed the table before we grabbed the lock
        // then we are good to go! The lock we grabbed prevents any rehashes.
        if HASHTABLE.load(Ordering::Relaxed) == hashtable {
            return bucket;
        }

        // Unlock the bucket and try again
        bucket.mutex.unlock();
    }
}

// Lock the bucket for the given key, but check that the key hasn't been changed
// in the meantime due to a requeue.
#[inline]
unsafe fn lock_bucket_checked<'a>(key: &AtomicUsize) -> (usize, &'a Bucket) {
    let mut bucket;
    loop {
        let hashtable = get_hashtable();
        let current_key = key.load(Ordering::Relaxed);

        let hash = hash(current_key, (*hashtable).hash_bits);
        bucket = &(*hashtable).entries[hash];

        // Lock the bucket
        bucket.mutex.lock();

        // Check that both the hash table and key are correct while the bucket
        // is locked. Note that the key can't change once we locked the proper
        // bucket for it, so we just keep trying until we have the correct key.
        if HASHTABLE.load(Ordering::Relaxed) == hashtable
            && key.load(Ordering::Relaxed) == current_key
        {
            return (current_key, bucket);
        }

        // Unlock the bucket and try again
        bucket.mutex.unlock();
    }
}

// Lock the two buckets for the given pair of keys
#[inline]
unsafe fn lock_bucket_pair<'a>(key1: usize, key2: usize) -> (&'a Bucket, &'a Bucket) {
    let mut bucket1;
    loop {
        let hashtable = get_hashtable();

        // Get the lowest bucket first
        let hash1 = hash(key1, (*hashtable).hash_bits);
        let hash2 = hash(key2, (*hashtable).hash_bits);
        if hash1 <= hash2 {
            bucket1 = &(*hashtable).entries[hash1];
        } else {
            bucket1 = &(*hashtable).entries[hash2];
        }

        // Lock the first bucket
        bucket1.mutex.lock();

        // If no other thread has rehashed the table before we grabbed the lock
        // then we are good to go! The lock we grabbed prevents any rehashes.
        if HASHTABLE.load(Ordering::Relaxed) == hashtable {
            // Now lock the second bucket and return the two buckets
            if hash1 == hash2 {
                return (bucket1, bucket1);
            } else if hash1 < hash2 {
                let bucket2 = &(*hashtable).entries[hash2];
                bucket2.mutex.lock();
                return (bucket1, bucket2);
            } else {
                let bucket2 = &(*hashtable).entries[hash1];
                bucket2.mutex.lock();
                return (bucket2, bucket1);
            }
        }

        // Unlock the bucket and try again
        bucket1.mutex.unlock();
    }
}

// Unlock a pair of buckets
#[inline]
unsafe fn unlock_bucket_pair(bucket1: &Bucket, bucket2: &Bucket) {
    bucket1.mutex.unlock();
    if !ptr::eq(bucket1, bucket2) {
        bucket2.mutex.unlock();
    }
}

/// Result of a park operation.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum ParkResult {
    /// We were unparked by another thread with the given token.
    Unparked(UnparkToken),

    /// The validation callback returned false.
    Invalid,

    /// The timeout expired.
    TimedOut,
}

impl ParkResult {
    /// Returns true if we were unparked by another thread.
    #[inline]
    pub fn is_unparked(self) -> bool {
        if let ParkResult::Unparked(_) = self {
            true
        } else {
            false
        }
    }
}

/// Result of an unpark operation.
#[derive(Copy, Clone, Default, Eq, PartialEq, Debug)]
pub struct UnparkResult {
    /// The number of threads that were unparked.
    pub unparked_threads: usize,

    /// The number of threads that were requeued.
    pub requeued_threads: usize,

    /// Whether there are any threads remaining in the queue. This only returns
    /// true if a thread was unparked.
    pub have_more_threads: bool,

    /// This is set to true on average once every 0.5ms for any given key. It
    /// should be used to switch to a fair unlocking mechanism for a particular
    /// unlock.
    pub be_fair: bool,

    /// Private field so new fields can be added without breakage.
    _sealed: (),
}

/// Operation that `unpark_requeue` should perform.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum RequeueOp {
    /// Abort the operation without doing anything.
    Abort,

    /// Unpark one thread and requeue the rest onto the target queue.
    UnparkOneRequeueRest,

    /// Requeue all threads onto the target queue.
    RequeueAll,

    /// Unpark one thread and leave the rest parked. No requeuing is done.
    UnparkOne,

    /// Requeue one thread and leave the rest parked on the original queue.
    RequeueOne,
}

/// Operation that `unpark_filter` should perform for each thread.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum FilterOp {
    /// Unpark the thread and continue scanning the list of parked threads.
    Unpark,

    /// Don't unpark the thread and continue scanning the list of parked threads.
    Skip,

    /// Don't unpark the thread and stop scanning the list of parked threads.
    Stop,
}

/// A value which is passed from an unparker to a parked thread.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct UnparkToken(pub usize);

/// A value associated with a parked thread which can be used by `unpark_filter`.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct ParkToken(pub usize);

/// A default unpark token to use.
pub const DEFAULT_UNPARK_TOKEN: UnparkToken = UnparkToken(0);

/// A default park token to use.
pub const DEFAULT_PARK_TOKEN: ParkToken = ParkToken(0);

/// Parks the current thread in the queue associated with the given key.
///
/// The `validate` function is called while the queue is locked and can abort
/// the operation by returning false. If `validate` returns true then the
/// current thread is appended to the queue and the queue is unlocked.
///
/// The `before_sleep` function is called after the queue is unlocked but before
/// the thread is put to sleep. The thread will then sleep until it is unparked
/// or the given timeout is reached.
///
/// The `timed_out` function is also called while the queue is locked, but only
/// if the timeout was reached. It is passed the key of the queue it was in when
/// it timed out, which may be different from the original key if
/// `unpark_requeue` was called. It is also passed a bool which indicates
/// whether it was the last thread in the queue.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
///
/// The `validate` and `timed_out` functions are called while the queue is
/// locked and must not panic or call into any function in `parking_lot`.
///
/// The `before_sleep` function is called outside the queue lock and is allowed
/// to call `unpark_one`, `unpark_all`, `unpark_requeue` or `unpark_filter`, but
/// it is not allowed to call `park` or panic.
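///
/// # Example
///
/// A minimal sketch (illustrative only) of a "wait until signalled" flag built
/// from `park` and `unpark_one`, assuming `park`, `unpark_one` and the default
/// tokens are in scope:
///
/// ```ignore
/// use core::sync::atomic::{AtomicBool, Ordering};
///
/// static FLAG: AtomicBool = AtomicBool::new(false);
///
/// fn wait() {
///     unsafe {
///         park(
///             &FLAG as *const _ as usize,       // key: the flag's address
///             || !FLAG.load(Ordering::Acquire), // only sleep if still unset
///             || {},                            // nothing to do before sleeping
///             |_key, _was_last| {},             // no timeout, so never called
///             DEFAULT_PARK_TOKEN,
///             None,
///         );
///     }
/// }
///
/// fn signal() {
///     FLAG.store(true, Ordering::Release);
///     unsafe {
///         unpark_one(&FLAG as *const _ as usize, |_| DEFAULT_UNPARK_TOKEN);
///     }
/// }
/// ```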
#[inline]
pub unsafe fn park(
    key: usize,
    validate: impl FnOnce() -> bool,
    before_sleep: impl FnOnce(),
    timed_out: impl FnOnce(usize, bool),
    park_token: ParkToken,
    timeout: Option<Instant>,
) -> ParkResult {
    // Grab our thread data, this also ensures that the hash table exists
    with_thread_data(|thread_data| {
        // Lock the bucket for the given key
        let bucket = lock_bucket(key);

        // If the validation function fails, just return
        if !validate() {
            bucket.mutex.unlock();
            return ParkResult::Invalid;
        }

        // Append our thread data to the queue and unlock the bucket
        thread_data.parked_with_timeout.set(timeout.is_some());
        thread_data.next_in_queue.set(ptr::null());
        thread_data.key.store(key, Ordering::Relaxed);
        thread_data.park_token.set(park_token);
        thread_data.parker.prepare_park();
        if !bucket.queue_head.get().is_null() {
            (*bucket.queue_tail.get()).next_in_queue.set(thread_data);
        } else {
            bucket.queue_head.set(thread_data);
        }
        bucket.queue_tail.set(thread_data);
        bucket.mutex.unlock();

        // Invoke the pre-sleep callback
        before_sleep();

        // Park our thread and determine whether we were woken up by an unpark
        // or by our timeout. Note that this isn't precise: we can still be
        // unparked since we are still in the queue.
        let unparked = match timeout {
            Some(timeout) => thread_data.parker.park_until(timeout),
            None => {
                thread_data.parker.park();
                // call deadlock detection on_unpark hook
                deadlock::on_unpark(thread_data);
                true
            }
        };

        // If we were unparked, return now
        if unparked {
            return ParkResult::Unparked(thread_data.unpark_token.get());
        }

        // Lock our bucket again. Note that the hashtable may have been rehashed in
        // the meantime. Our key may also have changed if we were requeued.
        let (key, bucket) = lock_bucket_checked(&thread_data.key);

        // Now we need to check again if we were unparked or timed out. Unlike the
        // last check this is precise because we hold the bucket lock.
        if !thread_data.parker.timed_out() {
            bucket.mutex.unlock();
            return ParkResult::Unparked(thread_data.unpark_token.get());
        }

        // We timed out, so we now need to remove our thread from the queue
        let mut link = &bucket.queue_head;
        let mut current = bucket.queue_head.get();
        let mut previous = ptr::null();
        let mut was_last_thread = true;
        while !current.is_null() {
            if current == thread_data {
                let next = (*current).next_in_queue.get();
                link.set(next);
                if bucket.queue_tail.get() == current {
                    bucket.queue_tail.set(previous);
                } else {
                    // Scan the rest of the queue to see if there are any other
                    // entries with the given key.
                    let mut scan = next;
                    while !scan.is_null() {
                        if (*scan).key.load(Ordering::Relaxed) == key {
                            was_last_thread = false;
                            break;
                        }
                        scan = (*scan).next_in_queue.get();
                    }
                }

                // Callback to indicate that we timed out, and whether we were the
                // last thread on the queue.
                timed_out(key, was_last_thread);
                break;
            } else {
                if (*current).key.load(Ordering::Relaxed) == key {
                    was_last_thread = false;
                }
                link = &(*current).next_in_queue;
                previous = current;
                current = link.get();
            }
        }

        // There should be no way for our thread to have been removed from the queue
        // if we timed out.
        debug_assert!(!current.is_null());

        // Unlock the bucket, we are done
        bucket.mutex.unlock();
        ParkResult::TimedOut
    })
}

/// Unparks one thread from the queue associated with the given key.
///
/// The `callback` function is called while the queue is locked and before the
/// target thread is woken up. The `UnparkResult` argument to the function
/// indicates whether a thread was found in the queue and whether this was the
/// last thread in the queue. This value is also returned by `unpark_one`.
///
/// The `callback` function should return an `UnparkToken` value which will be
/// passed to the thread that is unparked. If no thread is unparked then the
/// returned value is ignored.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
///
/// The `callback` function is called while the queue is locked and must not
/// panic or call into any function in `parking_lot`.
#[inline]
pub unsafe fn unpark_one(
    key: usize,
    callback: impl FnOnce(UnparkResult) -> UnparkToken,
) -> UnparkResult {
    // Lock the bucket for the given key
    let bucket = lock_bucket(key);

    // Find a thread with a matching key and remove it from the queue
    let mut link = &bucket.queue_head;
    let mut current = bucket.queue_head.get();
    let mut previous = ptr::null();
    let mut result = UnparkResult::default();
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key {
            // Remove the thread from the queue
            let next = (*current).next_in_queue.get();
            link.set(next);
            if bucket.queue_tail.get() == current {
                bucket.queue_tail.set(previous);
            } else {
                // Scan the rest of the queue to see if there are any other
                // entries with the given key.
                let mut scan = next;
                while !scan.is_null() {
                    if (*scan).key.load(Ordering::Relaxed) == key {
                        result.have_more_threads = true;
                        break;
                    }
                    scan = (*scan).next_in_queue.get();
                }
            }

            // Invoke the callback before waking up the thread
            result.unparked_threads = 1;
            result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
            let token = callback(result);

            // Set the token for the target thread
            (*current).unpark_token.set(token);

            // This is a bit tricky: we first lock the ThreadParker to prevent
            // the thread from exiting and freeing its ThreadData if its wait
            // times out. Then we unlock the queue since we don't want to keep
            // the queue locked while we perform a system call. Finally we wake
            // up the parked thread.
            let handle = (*current).parker.unpark_lock();
            bucket.mutex.unlock();
            handle.unpark();

            return result;
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // No threads with a matching key were found in the bucket
    callback(result);
    bucket.mutex.unlock();
    result
}

/// Unparks all threads in the queue associated with the given key.
///
/// The given `UnparkToken` is passed to all unparked threads.
///
/// This function returns the number of threads that were unparked.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
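///
/// # Example
///
/// A sketch of the call shape, assuming `unpark_all` and `DEFAULT_UNPARK_TOKEN`
/// are in scope and `key` is an address you control:
///
/// ```ignore
/// // Wake every thread currently parked on `key` with the default token.
/// let woken = unsafe { unpark_all(key, DEFAULT_UNPARK_TOKEN) };
/// ```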
#[inline]
pub unsafe fn unpark_all(key: usize, unpark_token: UnparkToken) -> usize {
    // Lock the bucket for the given key
    let bucket = lock_bucket(key);

    // Remove all threads with the given key in the bucket
    let mut link = &bucket.queue_head;
    let mut current = bucket.queue_head.get();
    let mut previous = ptr::null();
    let mut threads = SmallVec::<[_; 8]>::new();
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key {
            // Remove the thread from the queue
            let next = (*current).next_in_queue.get();
            link.set(next);
            if bucket.queue_tail.get() == current {
                bucket.queue_tail.set(previous);
            }

            // Set the token for the target thread
            (*current).unpark_token.set(unpark_token);

            // Don't wake up threads while holding the queue lock. See comment
            // in unpark_one. For now just record which threads we need to wake
            // up.
            threads.push((*current).parker.unpark_lock());
            current = next;
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // Unlock the bucket
    bucket.mutex.unlock();

    // Now that we are outside the lock, wake up all the threads that we removed
    // from the queue.
    let num_threads = threads.len();
    for handle in threads.into_iter() {
        handle.unpark();
    }

    num_threads
}

/// Removes all threads from the queue associated with `key_from`, optionally
/// unparks the first one and requeues the rest onto the queue associated with
/// `key_to`.
///
/// The `validate` function is called while both queues are locked. Its return
/// value will determine which operation is performed, or whether the operation
/// should be aborted. See `RequeueOp` for details about the different possible
/// return values.
///
/// The `callback` function is also called while both queues are locked. It is
/// passed the `RequeueOp` returned by `validate` and an `UnparkResult`
/// indicating whether a thread was unparked and whether there are threads still
/// parked in the new queue. This `UnparkResult` value is also returned by
/// `unpark_requeue`.
///
/// The `callback` function should return an `UnparkToken` value which will be
/// passed to the thread that is unparked. If no thread is unparked then the
/// returned value is ignored.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
///
/// The `validate` and `callback` functions are called while the queue is locked
/// and must not panic or call into any function in `parking_lot`.
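///
/// # Example
///
/// A rough sketch of a condition-variable style "notify one": wake one waiter
/// parked on the condvar address and move the remaining waiters onto the
/// mutex's queue. `condvar_addr` and `mutex_addr` are hypothetical park keys,
/// and `unpark_requeue`, `RequeueOp` and `DEFAULT_UNPARK_TOKEN` are assumed to
/// be in scope:
///
/// ```ignore
/// let result = unsafe {
///     unpark_requeue(
///         condvar_addr,
///         mutex_addr,
///         || RequeueOp::UnparkOneRequeueRest,
///         |_op, _result| DEFAULT_UNPARK_TOKEN,
///     )
/// };
/// ```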
#[inline]
pub unsafe fn unpark_requeue(
    key_from: usize,
    key_to: usize,
    validate: impl FnOnce() -> RequeueOp,
    callback: impl FnOnce(RequeueOp, UnparkResult) -> UnparkToken,
) -> UnparkResult {
    // Lock the two buckets for the given key
    let (bucket_from, bucket_to) = lock_bucket_pair(key_from, key_to);

    // If the validation function fails, just return
    let mut result = UnparkResult::default();
    let op = validate();
    if op == RequeueOp::Abort {
        unlock_bucket_pair(bucket_from, bucket_to);
        return result;
    }

    // Remove all threads with the given key in the source bucket
    let mut link = &bucket_from.queue_head;
    let mut current = bucket_from.queue_head.get();
    let mut previous = ptr::null();
    let mut requeue_threads: *const ThreadData = ptr::null();
    let mut requeue_threads_tail: *const ThreadData = ptr::null();
    let mut wakeup_thread = None;
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key_from {
            // Remove the thread from the queue
            let next = (*current).next_in_queue.get();
            link.set(next);
            if bucket_from.queue_tail.get() == current {
                bucket_from.queue_tail.set(previous);
            }

            // Prepare the first thread for wakeup and requeue the rest.
            if (op == RequeueOp::UnparkOneRequeueRest || op == RequeueOp::UnparkOne)
                && wakeup_thread.is_none()
            {
                wakeup_thread = Some(current);
                result.unparked_threads = 1;
            } else {
                if !requeue_threads.is_null() {
                    (*requeue_threads_tail).next_in_queue.set(current);
                } else {
                    requeue_threads = current;
                }
                requeue_threads_tail = current;
                (*current).key.store(key_to, Ordering::Relaxed);
                result.requeued_threads += 1;
            }
            if op == RequeueOp::UnparkOne || op == RequeueOp::RequeueOne {
                // Scan the rest of the queue to see if there are any other
                // entries with the given key.
                let mut scan = next;
                while !scan.is_null() {
                    if (*scan).key.load(Ordering::Relaxed) == key_from {
                        result.have_more_threads = true;
                        break;
                    }
                    scan = (*scan).next_in_queue.get();
                }
                break;
            }
            current = next;
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // Add the requeued threads to the destination bucket
    if !requeue_threads.is_null() {
        (*requeue_threads_tail).next_in_queue.set(ptr::null());
        if !bucket_to.queue_head.get().is_null() {
            (*bucket_to.queue_tail.get())
                .next_in_queue
                .set(requeue_threads);
        } else {
            bucket_to.queue_head.set(requeue_threads);
        }
        bucket_to.queue_tail.set(requeue_threads_tail);
    }

    // Invoke the callback before waking up the thread
    if result.unparked_threads != 0 {
        result.be_fair = (*bucket_from.fair_timeout.get()).should_timeout();
    }
    let token = callback(op, result);

    // See comment in unpark_one for why we mess with the locking
    if let Some(wakeup_thread) = wakeup_thread {
        (*wakeup_thread).unpark_token.set(token);
        let handle = (*wakeup_thread).parker.unpark_lock();
        unlock_bucket_pair(bucket_from, bucket_to);
        handle.unpark();
    } else {
        unlock_bucket_pair(bucket_from, bucket_to);
    }

    result
}

/// Unparks a number of threads from the front of the queue associated with
/// `key` depending on the results of a filter function which inspects the
/// `ParkToken` associated with each thread.
///
/// The `filter` function is called for each thread in the queue or until
/// `FilterOp::Stop` is returned. This function is passed the `ParkToken`
/// associated with a particular thread, which is unparked if `FilterOp::Unpark`
/// is returned.
///
/// The `callback` function is also called while the queue is locked. It is
/// passed an `UnparkResult` indicating the number of threads that were unparked
/// and whether there are still parked threads in the queue. This `UnparkResult`
/// value is also returned by `unpark_filter`.
///
/// The `callback` function should return an `UnparkToken` value which will be
/// passed to all threads that are unparked. If no thread is unparked then the
/// returned value is ignored.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
///
/// The `filter` and `callback` functions are called while the queue is locked
/// and must not panic or call into any function in `parking_lot`.
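///
/// # Example
///
/// A sketch that wakes only the threads which parked with a hypothetical
/// `READER` token and leaves everything else queued, assuming `unpark_filter`,
/// `ParkToken`, `FilterOp` and `DEFAULT_UNPARK_TOKEN` are in scope:
///
/// ```ignore
/// const READER: ParkToken = ParkToken(1);
///
/// let result = unsafe {
///     unpark_filter(
///         key,
///         |token| {
///             if token == READER {
///                 FilterOp::Unpark
///             } else {
///                 FilterOp::Skip
///             }
///         },
///         |_result| DEFAULT_UNPARK_TOKEN,
///     )
/// };
/// ```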
#[inline]
pub unsafe fn unpark_filter(
    key: usize,
    mut filter: impl FnMut(ParkToken) -> FilterOp,
    callback: impl FnOnce(UnparkResult) -> UnparkToken,
) -> UnparkResult {
    // Lock the bucket for the given key
    let bucket = lock_bucket(key);

    // Go through the queue looking for threads with a matching key
    let mut link = &bucket.queue_head;
    let mut current = bucket.queue_head.get();
    let mut previous = ptr::null();
    let mut threads = SmallVec::<[_; 8]>::new();
    let mut result = UnparkResult::default();
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key {
            // Call the filter function with the thread's ParkToken
            let next = (*current).next_in_queue.get();
            match filter((*current).park_token.get()) {
                FilterOp::Unpark => {
                    // Remove the thread from the queue
                    link.set(next);
                    if bucket.queue_tail.get() == current {
                        bucket.queue_tail.set(previous);
                    }

                    // Add the thread to our list of threads to unpark
                    threads.push((current, None));

                    current = next;
                }
                FilterOp::Skip => {
                    result.have_more_threads = true;
                    link = &(*current).next_in_queue;
                    previous = current;
                    current = link.get();
                }
                FilterOp::Stop => {
                    result.have_more_threads = true;
                    break;
                }
            }
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // Invoke the callback before waking up the threads
    result.unparked_threads = threads.len();
    if result.unparked_threads != 0 {
        result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
    }
    let token = callback(result);

    // Pass the token to all threads that are going to be unparked and prepare
    // them for unparking.
    for t in threads.iter_mut() {
        (*t.0).unpark_token.set(token);
        t.1 = Some((*t.0).parker.unpark_lock());
    }

    bucket.mutex.unlock();

    // Now that we are outside the lock, wake up all the threads that we removed
    // from the queue.
    for (_, handle) in threads.into_iter() {
        handle.unchecked_unwrap().unpark();
    }

    result
}

/// \[Experimental\] Deadlock detection
///
/// Enabled via the `deadlock_detection` feature flag.
pub mod deadlock {
    #[cfg(feature = "deadlock_detection")]
    use super::deadlock_impl;

    #[cfg(feature = "deadlock_detection")]
    pub(super) use super::deadlock_impl::DeadlockData;

    /// Acquire a resource identified by key in the deadlock detector.
    /// No-op if the deadlock_detection feature isn't enabled.
    /// Note: Call after the resource is acquired.
    #[inline]
    pub unsafe fn acquire_resource(_key: usize) {
        #[cfg(feature = "deadlock_detection")]
        deadlock_impl::acquire_resource(_key);
    }

    /// Release a resource identified by key in the deadlock detector.
    /// No-op if the deadlock_detection feature isn't enabled.
    /// Note: Call before the resource is released.
    /// # Panics
    /// Panics if the resource was already released or wasn't acquired in this thread.
    #[inline]
    pub unsafe fn release_resource(_key: usize) {
        #[cfg(feature = "deadlock_detection")]
        deadlock_impl::release_resource(_key);
    }

    /// Returns all deadlocks detected *since* the last call.
    /// Each cycle consists of a vector of `DeadlockedThread`.
    #[cfg(feature = "deadlock_detection")]
    #[inline]
    pub fn check_deadlock() -> Vec<Vec<deadlock_impl::DeadlockedThread>> {
        deadlock_impl::check_deadlock()
    }

    #[inline]
    pub(super) unsafe fn on_unpark(_td: &super::ThreadData) {
        #[cfg(feature = "deadlock_detection")]
        deadlock_impl::on_unpark(_td);
    }
}

#[cfg(feature = "deadlock_detection")]
mod deadlock_impl {
    use super::{get_hashtable, lock_bucket, with_thread_data, ThreadData, NUM_THREADS};
    use crate::thread_parker::{ThreadParkerT, UnparkHandleT};
    use crate::word_lock::WordLock;
    use backtrace::Backtrace;
    use petgraph;
    use petgraph::graphmap::DiGraphMap;
    use std::cell::{Cell, UnsafeCell};
    use std::collections::HashSet;
    use std::sync::atomic::Ordering;
    use std::sync::mpsc;
    use thread_id;

    /// Representation of a deadlocked thread
    pub struct DeadlockedThread {
        thread_id: usize,
        backtrace: Backtrace,
    }

    impl DeadlockedThread {
        /// The system thread id
        pub fn thread_id(&self) -> usize {
            self.thread_id
        }

        /// The thread backtrace
        pub fn backtrace(&self) -> &Backtrace {
            &self.backtrace
        }
    }

    pub struct DeadlockData {
        // Currently owned resources (keys)
        resources: UnsafeCell<Vec<usize>>,

        // Set when there's a pending callstack request
        deadlocked: Cell<bool>,

        // Sender used to report the backtrace
        backtrace_sender: UnsafeCell<Option<mpsc::Sender<DeadlockedThread>>>,

        // System thread id
        thread_id: usize,
    }

    impl DeadlockData {
        pub fn new() -> Self {
            DeadlockData {
                resources: UnsafeCell::new(Vec::new()),
                deadlocked: Cell::new(false),
                backtrace_sender: UnsafeCell::new(None),
                thread_id: thread_id::get(),
            }
        }
    }

    pub(super) unsafe fn on_unpark(td: &ThreadData) {
        if td.deadlock_data.deadlocked.get() {
            let sender = (*td.deadlock_data.backtrace_sender.get()).take().unwrap();
            sender
                .send(DeadlockedThread {
                    thread_id: td.deadlock_data.thread_id,
                    backtrace: Backtrace::new(),
                })
                .unwrap();
            // make sure to close this sender
            drop(sender);

            // park until the end of time
            td.parker.prepare_park();
            td.parker.park();
            unreachable!("unparked deadlocked thread!");
        }
    }

    pub unsafe fn acquire_resource(key: usize) {
        with_thread_data(|thread_data| {
            (*thread_data.deadlock_data.resources.get()).push(key);
        });
    }

    pub unsafe fn release_resource(key: usize) {
        with_thread_data(|thread_data| {
            let resources = &mut (*thread_data.deadlock_data.resources.get());
            match resources.iter().rposition(|x| *x == key) {
                Some(p) => resources.swap_remove(p),
                None => panic!("key {} not found in thread resources", key),
            };
        });
    }

    pub fn check_deadlock() -> Vec<Vec<DeadlockedThread>> {
        unsafe {
            // fast pass
            if check_wait_graph_fast() {
                // double check
                check_wait_graph_slow()
            } else {
                Vec::new()
            }
        }
    }

    // Simple algorithm that builds a wait graph of the threads and the resources,
    // then checks for the presence of cycles (deadlocks).
    // This variant isn't precise as it doesn't lock the entire table before checking
    unsafe fn check_wait_graph_fast() -> bool {
        let table = get_hashtable();
        let thread_count = NUM_THREADS.load(Ordering::Relaxed);
        let mut graph = DiGraphMap::<usize, ()>::with_capacity(thread_count * 2, thread_count * 2);

        for b in &(*table).entries[..] {
            b.mutex.lock();
            let mut current = b.queue_head.get();
            while !current.is_null() {
                if !(*current).parked_with_timeout.get()
                    && !(*current).deadlock_data.deadlocked.get()
                {
                    // .resources are waiting for their owner
                    for &resource in &(*(*current).deadlock_data.resources.get()) {
                        graph.add_edge(resource, current as usize, ());
                    }
                    // owner waits for resource .key
                    graph.add_edge(current as usize, (*current).key.load(Ordering::Relaxed), ());
                }
                current = (*current).next_in_queue.get();
            }
            b.mutex.unlock();
        }

        petgraph::algo::is_cyclic_directed(&graph)
    }

    #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)]
    enum WaitGraphNode {
        Thread(*const ThreadData),
        Resource(usize),
    }

    use self::WaitGraphNode::*;

    // Contrary to the _fast variant this locks the entries table before looking for cycles.
    // Returns all detected thread wait cycles.
    // Note that once a cycle is reported it's never reported again.
    unsafe fn check_wait_graph_slow() -> Vec<Vec<DeadlockedThread>> {
        static DEADLOCK_DETECTION_LOCK: WordLock = WordLock::INIT;
        DEADLOCK_DETECTION_LOCK.lock();

        let mut table = get_hashtable();
        loop {
            // Lock all buckets in the old table
            for b in &(*table).entries[..] {
                b.mutex.lock();
            }

            // Now check if our table is still the latest one. Another thread could
            // have grown the hash table between us getting and locking the hash table.
            let new_table = get_hashtable();
            if new_table == table {
                break;
            }

            // Unlock buckets and try again
            for b in &(*table).entries[..] {
                b.mutex.unlock();
            }

            table = new_table;
        }

        let thread_count = NUM_THREADS.load(Ordering::Relaxed);
        let mut graph =
            DiGraphMap::<WaitGraphNode, ()>::with_capacity(thread_count * 2, thread_count * 2);

        for b in &(*table).entries[..] {
            let mut current = b.queue_head.get();
            while !current.is_null() {
                if !(*current).parked_with_timeout.get()
                    && !(*current).deadlock_data.deadlocked.get()
                {
                    // .resources are waiting for their owner
                    for &resource in &(*(*current).deadlock_data.resources.get()) {
                        graph.add_edge(Resource(resource), Thread(current), ());
                    }
                    // owner waits for resource .key
                    graph.add_edge(
                        Thread(current),
                        Resource((*current).key.load(Ordering::Relaxed)),
                        (),
                    );
                }
                current = (*current).next_in_queue.get();
            }
        }

        for b in &(*table).entries[..] {
            b.mutex.unlock();
        }

        // find cycles
        let cycles = graph_cycles(&graph);

        let mut results = Vec::with_capacity(cycles.len());

        for cycle in cycles {
            let (sender, receiver) = mpsc::channel();
            for td in cycle {
                let bucket = lock_bucket((*td).key.load(Ordering::Relaxed));
                (*td).deadlock_data.deadlocked.set(true);
                *(*td).deadlock_data.backtrace_sender.get() = Some(sender.clone());
                let handle = (*td).parker.unpark_lock();
                bucket.mutex.unlock();
                // unpark the deadlocked thread!
                // on unpark it'll notice the deadlocked flag and report back
                handle.unpark();
            }
            // make sure to drop our sender before collecting results
            drop(sender);
            results.push(receiver.iter().collect());
        }

        DEADLOCK_DETECTION_LOCK.unlock();

        results
    }

    // normalize a cycle to start with the "smallest" node
    fn normalize_cycle<T: Ord + Copy + Clone>(input: &[T]) -> Vec<T> {
        let min_pos = input
            .iter()
            .enumerate()
            .min_by_key(|&(_, &t)| t)
            .map(|(p, _)| p)
            .unwrap_or(0);
        input
            .iter()
            .cycle()
            .skip(min_pos)
            .take(input.len())
            .cloned()
            .collect()
    }

    // returns all thread cycles in the wait graph
    fn graph_cycles(g: &DiGraphMap<WaitGraphNode, ()>) -> Vec<Vec<*const ThreadData>> {
        use petgraph::visit::depth_first_search;
        use petgraph::visit::DfsEvent;
        use petgraph::visit::NodeIndexable;

        let mut cycles = HashSet::new();
        let mut path = Vec::with_capacity(g.node_bound());
        // start from threads to get the correct threads cycle
        let threads = g
            .nodes()
            .filter(|n| if let &Thread(_) = n { true } else { false });

        depth_first_search(g, threads, |e| match e {
            DfsEvent::Discover(Thread(n), _) => path.push(n),
            DfsEvent::Finish(Thread(_), _) => {
                path.pop();
            }
            DfsEvent::BackEdge(_, Thread(n)) => {
                let from = path.iter().rposition(|&i| i == n).unwrap();
                cycles.insert(normalize_cycle(&path[from..]));
            }
            _ => (),
        });

        cycles.iter().cloned().collect()
    }
}