// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use rand::rngs::SmallRng;
use rand::{FromEntropy, Rng};
use smallvec::SmallVec;
use std::cell::{Cell, UnsafeCell};
use std::mem;
#[cfg(not(has_localkey_try_with))]
use std::panic;
use std::ptr;
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
use std::thread::LocalKey;
use std::time::{Duration, Instant};
use thread_parker::ThreadParker;
use util::UncheckedOptionExt;
use word_lock::WordLock;

static NUM_THREADS: AtomicUsize = ATOMIC_USIZE_INIT;
static HASHTABLE: AtomicUsize = ATOMIC_USIZE_INIT;

// Even with 3x more buckets than threads, the memory overhead per thread is
// still only a few hundred bytes.
const LOAD_FACTOR: usize = 3;

struct HashTable {
    // Hash buckets for the table
    entries: Box<[Bucket]>,

    // Number of bits used for the hash function
    hash_bits: u32,

    // Previous table. This is only kept to keep leak detectors happy.
    _prev: *const HashTable,
}

impl HashTable {
    fn new(num_threads: usize, prev: *const HashTable) -> Box<HashTable> {
        let new_size = (num_threads * LOAD_FACTOR).next_power_of_two();
        let hash_bits = 0usize.leading_zeros() - new_size.leading_zeros() - 1;
        let bucket = Bucket {
            mutex: WordLock::new(),
            queue_head: Cell::new(ptr::null()),
            queue_tail: Cell::new(ptr::null()),
            fair_timeout: UnsafeCell::new(FairTimeout::new()),
            _padding: unsafe { mem::uninitialized() },
        };
        Box::new(HashTable {
            entries: vec![bucket; new_size].into_boxed_slice(),
            hash_bits: hash_bits,
            _prev: prev,
        })
    }
}

struct Bucket {
    // Lock protecting the queue
    mutex: WordLock,

    // Linked list of threads waiting on this bucket
    queue_head: Cell<*const ThreadData>,
    queue_tail: Cell<*const ThreadData>,

    // The next time at which be_fair should be set
    fair_timeout: UnsafeCell<FairTimeout>,

    // Padding to avoid false sharing between buckets. Ideally we would just
    // align the bucket structure to 64 bytes, but Rust doesn't support that
    // yet.
    _padding: [u8; 64],
}

// Implementation of Clone for Bucket, needed to make vec![] work
impl Clone for Bucket {
    fn clone(&self) -> Bucket {
        Bucket {
            mutex: WordLock::new(),
            queue_head: Cell::new(ptr::null()),
            queue_tail: Cell::new(ptr::null()),
            fair_timeout: UnsafeCell::new(FairTimeout::new()),
            _padding: unsafe { mem::uninitialized() },
        }
    }
}

struct FairTimeout {
    // The next time at which be_fair should be set
    timeout: Instant,

    // Random number generator for calculating the next timeout
    rng: SmallRng,
}

impl FairTimeout {
    fn new() -> FairTimeout {
        FairTimeout {
            timeout: Instant::now(),
            rng: SmallRng::from_entropy(),
        }
    }

    // Determine whether we should force a fair unlock, and update the timeout
    fn should_timeout(&mut self) -> bool {
        let now = Instant::now();
        if now > self.timeout {
            self.timeout = now + Duration::new(0, self.rng.gen_range(0, 1000000));
            true
        } else {
            false
        }
    }
}

struct ThreadData {
    parker: ThreadParker,

    // Key that this thread is sleeping on. This may change if the thread is
    // requeued to a different key.
    key: AtomicUsize,

    // Linked list of parked threads in a bucket
    next_in_queue: Cell<*const ThreadData>,

    // UnparkToken passed to this thread when it is unparked
    unpark_token: Cell<UnparkToken>,

    // ParkToken value set by the thread when it was parked
    park_token: Cell<ParkToken>,

    // Is the thread parked with a timeout?
    parked_with_timeout: Cell<bool>,

    // Extra data for deadlock detection
    // TODO: once supported in stable replace with #[cfg...] & remove dummy struct/impl
    #[allow(dead_code)]
    deadlock_data: deadlock::DeadlockData,
}

impl ThreadData {
    fn new() -> ThreadData {
        // Keep track of the total number of live ThreadData objects and resize
        // the hash table accordingly.
        let num_threads = NUM_THREADS.fetch_add(1, Ordering::Relaxed) + 1;
        unsafe {
            grow_hashtable(num_threads);
        }

        ThreadData {
            parker: ThreadParker::new(),
            key: AtomicUsize::new(0),
            next_in_queue: Cell::new(ptr::null()),
            unpark_token: Cell::new(DEFAULT_UNPARK_TOKEN),
            park_token: Cell::new(DEFAULT_PARK_TOKEN),
            parked_with_timeout: Cell::new(false),
            deadlock_data: deadlock::DeadlockData::new(),
        }
    }
}

// Returns a ThreadData structure for the current thread
unsafe fn get_thread_data(local: &mut Option<ThreadData>) -> &ThreadData {
    // Try to read from thread-local storage, but return None if the TLS has
    // already been destroyed.
    #[cfg(has_localkey_try_with)]
    fn try_get_tls(key: &'static LocalKey<ThreadData>) -> Option<*const ThreadData> {
        key.try_with(|x| x as *const ThreadData).ok()
    }
    #[cfg(not(has_localkey_try_with))]
    fn try_get_tls(key: &'static LocalKey<ThreadData>) -> Option<*const ThreadData> {
        panic::catch_unwind(|| key.with(|x| x as *const ThreadData)).ok()
    }

    // Unlike word_lock::ThreadData, parking_lot::ThreadData is always expensive
    // to construct. Try to use a thread-local version if possible.
    thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
    if let Some(tls) = try_get_tls(&THREAD_DATA) {
        return &*tls;
    }

    // Otherwise just create a ThreadData on the stack
    *local = Some(ThreadData::new());
    local.as_ref().unwrap()
}

impl Drop for ThreadData {
    fn drop(&mut self) {
        NUM_THREADS.fetch_sub(1, Ordering::Relaxed);
    }
}

// Get a pointer to the latest hash table, creating one if it doesn't exist yet.
unsafe fn get_hashtable() -> *const HashTable {
    let mut table = HASHTABLE.load(Ordering::Acquire);

    // If there is no table, create one
    if table == 0 {
        let new_table = Box::into_raw(HashTable::new(LOAD_FACTOR, ptr::null()));

        // If this fails then it means some other thread created the hash
        // table first.
        match HASHTABLE.compare_exchange(
            0,
            new_table as usize,
            Ordering::Release,
            Ordering::Relaxed,
        ) {
            Ok(_) => return new_table,
            Err(x) => table = x,
        }

        // Free the table we created
        Box::from_raw(new_table);
    }

    table as *const HashTable
}

// Grow the hash table so that it is big enough for the given number of threads.
// This isn't performance-critical since it is only done when a ThreadData is
// created, which only happens once per thread.
unsafe fn grow_hashtable(num_threads: usize) {
    // If there is no table, create one
    if HASHTABLE.load(Ordering::Relaxed) == 0 {
        let new_table = Box::into_raw(HashTable::new(num_threads, ptr::null()));

        // If this fails then it means some other thread created the hash
        // table first.
        if HASHTABLE
            .compare_exchange(0, new_table as usize, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }

        // Free the table we created
        Box::from_raw(new_table);
    }

    let mut old_table;
    loop {
        old_table = HASHTABLE.load(Ordering::Acquire) as *mut HashTable;

        // Check if we need to resize the existing table
        if (*old_table).entries.len() >= LOAD_FACTOR * num_threads {
            return;
        }

        // Lock all buckets in the old table
        for b in &(*old_table).entries[..] {
            b.mutex.lock();
        }

        // Now check if our table is still the latest one. Another thread could
        // have grown the hash table between us reading HASHTABLE and locking
        // the buckets.
        if HASHTABLE.load(Ordering::Relaxed) == old_table as usize {
            break;
        }

        // Unlock buckets and try again
        for b in &(*old_table).entries[..] {
            b.mutex.unlock();
        }
    }

    // Create the new table
    let new_table = HashTable::new(num_threads, old_table);

    // Move the entries from the old table to the new one
    for b in &(*old_table).entries[..] {
        let mut current = b.queue_head.get();
        while !current.is_null() {
            let next = (*current).next_in_queue.get();
            let hash = hash((*current).key.load(Ordering::Relaxed), new_table.hash_bits);
            if new_table.entries[hash].queue_tail.get().is_null() {
                new_table.entries[hash].queue_head.set(current);
            } else {
                (*new_table.entries[hash].queue_tail.get())
                    .next_in_queue
                    .set(current);
            }
            new_table.entries[hash].queue_tail.set(current);
            (*current).next_in_queue.set(ptr::null());
            current = next;
        }
    }

    // Publish the new table. No races are possible at this point because
    // any other thread trying to grow the hash table is blocked on the bucket
    // locks in the old table.
    HASHTABLE.store(Box::into_raw(new_table) as usize, Ordering::Release);

    // Unlock all buckets in the old table
    for b in &(*old_table).entries[..] {
        b.mutex.unlock();
    }
}

// Hash function for addresses
#[cfg(target_pointer_width = "32")]
fn hash(key: usize, bits: u32) -> usize {
    key.wrapping_mul(0x9E3779B9) >> (32 - bits)
}
#[cfg(target_pointer_width = "64")]
fn hash(key: usize, bits: u32) -> usize {
    key.wrapping_mul(0x9E3779B97F4A7C15) >> (64 - bits)
}
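
// A small sanity-check sketch added for illustration (not part of the
// original file): the multiplicative hash always yields an in-bounds bucket
// index, since shifting a 64-bit word right by (64 - bits) leaves exactly
// `bits` significant bits.
#[cfg(all(test, target_pointer_width = "64"))]
mod hash_bounds_test {
    use super::hash;

    #[test]
    fn hash_stays_in_bounds() {
        let bits = 4;
        // Use multiples of 8 as keys, mimicking typical object addresses.
        for key in (0..1024usize).map(|i| i * 8) {
            assert!(hash(key, bits) < (1 << bits));
        }
    }
}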

// Lock the bucket for the given key
unsafe fn lock_bucket<'a>(key: usize) -> &'a Bucket {
    let mut bucket;
    loop {
        let hashtable = get_hashtable();

        let hash = hash(key, (*hashtable).hash_bits);
        bucket = &(*hashtable).entries[hash];

        // Lock the bucket
        bucket.mutex.lock();

        // If no other thread has rehashed the table before we grabbed the lock
        // then we are good to go! The lock we grabbed prevents any rehashes.
        if HASHTABLE.load(Ordering::Relaxed) == hashtable as usize {
            return bucket;
        }

        // Unlock the bucket and try again
        bucket.mutex.unlock();
    }
}

// Lock the bucket for the given key, but check that the key hasn't been changed
// in the meantime due to a requeue.
unsafe fn lock_bucket_checked<'a>(key: &AtomicUsize) -> (usize, &'a Bucket) {
    let mut bucket;
    loop {
        let hashtable = get_hashtable();
        let current_key = key.load(Ordering::Relaxed);

        let hash = hash(current_key, (*hashtable).hash_bits);
        bucket = &(*hashtable).entries[hash];

        // Lock the bucket
        bucket.mutex.lock();

        // Check that both the hash table and key are correct while the bucket
        // is locked. Note that the key can't change once we locked the proper
        // bucket for it, so we just keep trying until we have the correct key.
        if HASHTABLE.load(Ordering::Relaxed) == hashtable as usize
            && key.load(Ordering::Relaxed) == current_key
        {
            return (current_key, bucket);
        }

        // Unlock the bucket and try again
        bucket.mutex.unlock();
    }
}

// Lock the two buckets for the given pair of keys
unsafe fn lock_bucket_pair<'a>(key1: usize, key2: usize) -> (&'a Bucket, &'a Bucket) {
    let mut bucket1;
    loop {
        let hashtable = get_hashtable();

        // Get the lowest bucket first
        let hash1 = hash(key1, (*hashtable).hash_bits);
        let hash2 = hash(key2, (*hashtable).hash_bits);
        if hash1 <= hash2 {
            bucket1 = &(*hashtable).entries[hash1];
        } else {
            bucket1 = &(*hashtable).entries[hash2];
        }

        // Lock the first bucket
        bucket1.mutex.lock();

        // If no other thread has rehashed the table before we grabbed the lock
        // then we are good to go! The lock we grabbed prevents any rehashes.
        if HASHTABLE.load(Ordering::Relaxed) == hashtable as usize {
            // Now lock the second bucket and return the two buckets
            if hash1 == hash2 {
                return (bucket1, bucket1);
            } else if hash1 < hash2 {
                let bucket2 = &(*hashtable).entries[hash2];
                bucket2.mutex.lock();
                return (bucket1, bucket2);
            } else {
                let bucket2 = &(*hashtable).entries[hash1];
                bucket2.mutex.lock();
                return (bucket2, bucket1);
            }
        }

        // Unlock the bucket and try again
        bucket1.mutex.unlock();
    }
}

// Unlock a pair of buckets
unsafe fn unlock_bucket_pair(bucket1: &Bucket, bucket2: &Bucket) {
    if bucket1 as *const _ == bucket2 as *const _ {
        bucket1.mutex.unlock();
    } else if bucket1 as *const _ < bucket2 as *const _ {
        bucket2.mutex.unlock();
        bucket1.mutex.unlock();
    } else {
        bucket1.mutex.unlock();
        bucket2.mutex.unlock();
    }
}

/// Result of a park operation.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum ParkResult {
    /// We were unparked by another thread with the given token.
    Unparked(UnparkToken),

    /// The validation callback returned false.
    Invalid,

    /// The timeout expired.
    TimedOut,
}

impl ParkResult {
    /// Returns true if we were unparked by another thread.
    pub fn is_unparked(self) -> bool {
        if let ParkResult::Unparked(_) = self {
            true
        } else {
            false
        }
    }
}

/// Result of an unpark operation.
#[derive(Copy, Clone, Default, Eq, PartialEq, Debug)]
pub struct UnparkResult {
    /// The number of threads that were unparked.
    pub unparked_threads: usize,

    /// The number of threads that were requeued.
    pub requeued_threads: usize,

    /// Whether there are any threads remaining in the queue. This only returns
    /// true if a thread was unparked.
    pub have_more_threads: bool,

    /// This is set to true on average once every 0.5ms for any given key. It
    /// should be used to switch to a fair unlocking mechanism for a particular
    /// unlock.
    pub be_fair: bool,

    /// Private field so new fields can be added without breakage.
    _sealed: (),
}

/// Operation that `unpark_requeue` should perform.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum RequeueOp {
    /// Abort the operation without doing anything.
    Abort,

    /// Unpark one thread and requeue the rest onto the target queue.
    UnparkOneRequeueRest,

    /// Requeue all threads onto the target queue.
    RequeueAll,

    /// Unpark one thread and leave the rest parked. No requeuing is done.
    UnparkOne,

    /// Requeue one thread and leave the rest parked on the original queue.
    RequeueOne,
}

/// Operation that `unpark_filter` should perform for each thread.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum FilterOp {
    /// Unpark the thread and continue scanning the list of parked threads.
    Unpark,

    /// Don't unpark the thread and continue scanning the list of parked threads.
    Skip,

    /// Don't unpark the thread and stop scanning the list of parked threads.
    Stop,
}

/// A value which is passed from an unparker to a parked thread.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct UnparkToken(pub usize);

/// A value associated with a parked thread which can be used by `unpark_filter`.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct ParkToken(pub usize);

/// A default unpark token to use.
pub const DEFAULT_UNPARK_TOKEN: UnparkToken = UnparkToken(0);

/// A default park token to use.
pub const DEFAULT_PARK_TOKEN: ParkToken = ParkToken(0);

/// Parks the current thread in the queue associated with the given key.
///
/// The `validate` function is called while the queue is locked and can abort
/// the operation by returning false. If `validate` returns true then the
/// current thread is appended to the queue and the queue is unlocked.
///
/// The `before_sleep` function is called after the queue is unlocked but before
/// the thread is put to sleep. The thread will then sleep until it is unparked
/// or the given timeout is reached.
///
/// The `timed_out` function is also called while the queue is locked, but only
/// if the timeout was reached. It is passed the key of the queue it was in when
/// it timed out, which may be different from the original key if
/// `unpark_requeue` was called. It is also passed a bool which indicates
/// whether it was the last thread in the queue.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
///
/// The `validate` and `timed_out` functions are called while the queue is
/// locked and must not panic or call into any function in `parking_lot`.
///
/// The `before_sleep` function is called outside the queue lock and is allowed
/// to call `unpark_one`, `unpark_all`, `unpark_requeue` or `unpark_filter`, but
/// it is not allowed to call `park` or panic.
#[inline]
pub unsafe fn park<V, B, T>(
    key: usize,
    validate: V,
    before_sleep: B,
    timed_out: T,
    park_token: ParkToken,
    timeout: Option<Instant>,
) -> ParkResult
where
    V: FnOnce() -> bool,
    B: FnOnce(),
    T: FnOnce(usize, bool),
{
    let mut v = Some(validate);
    let mut b = Some(before_sleep);
    let mut t = Some(timed_out);
    park_internal(
        key,
        &mut || v.take().unchecked_unwrap()(),
        &mut || b.take().unchecked_unwrap()(),
        &mut |key, was_last_thread| t.take().unchecked_unwrap()(key, was_last_thread),
        park_token,
        timeout,
    )
}
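
// Illustrative sketch, not the actual parking_lot lock implementation: how a
// minimal word-sized lock might drive `park` in its slow path. The `STATE`
// static and `LOCKED` constant are assumptions made up for this example; the
// real locks built on this API add spinning, a PARKED bit and fairness on
// top of this skeleton.
#[cfg(test)]
#[allow(dead_code)]
mod park_usage_sketch {
    use super::{park, unpark_one, DEFAULT_PARK_TOKEN, DEFAULT_UNPARK_TOKEN};
    use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};

    const LOCKED: usize = 1;
    static STATE: AtomicUsize = ATOMIC_USIZE_INIT;

    fn lock() {
        while STATE
            .compare_exchange(0, LOCKED, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            unsafe {
                // Sleep on the address of the lock word. The validate closure
                // re-checks the state under the queue lock, which is what
                // prevents a missed wakeup between the failed CAS above and
                // the thread actually going to sleep.
                park(
                    &STATE as *const _ as usize,
                    || STATE.load(Ordering::Relaxed) == LOCKED,
                    || {},
                    |_key, _was_last| {},
                    DEFAULT_PARK_TOKEN,
                    None,
                );
            }
        }
    }

    fn unlock() {
        STATE.store(0, Ordering::Release);
        unsafe {
            // Wake at most one waiter; with no PARKED bit this syscall is
            // attempted on every unlock, which a real lock would avoid.
            unpark_one(&STATE as *const _ as usize, |_result| DEFAULT_UNPARK_TOKEN);
        }
    }
}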

// Non-generic version to reduce monomorphization cost
unsafe fn park_internal(
    key: usize,
    validate: &mut FnMut() -> bool,
    before_sleep: &mut FnMut(),
    timed_out: &mut FnMut(usize, bool),
    park_token: ParkToken,
    timeout: Option<Instant>,
) -> ParkResult {
    // Grab our thread data, this also ensures that the hash table exists
    let mut thread_data = None;
    let thread_data = get_thread_data(&mut thread_data);

    // Lock the bucket for the given key
    let bucket = lock_bucket(key);

    // If the validation function fails, just return
    if !validate() {
        bucket.mutex.unlock();
        return ParkResult::Invalid;
    }

    // Append our thread data to the queue and unlock the bucket
    thread_data.parked_with_timeout.set(timeout.is_some());
    thread_data.next_in_queue.set(ptr::null());
    thread_data.key.store(key, Ordering::Relaxed);
    thread_data.park_token.set(park_token);
    thread_data.parker.prepare_park();
    if !bucket.queue_head.get().is_null() {
        (*bucket.queue_tail.get()).next_in_queue.set(thread_data);
    } else {
        bucket.queue_head.set(thread_data);
    }
    bucket.queue_tail.set(thread_data);
    bucket.mutex.unlock();

    // Invoke the pre-sleep callback
    before_sleep();

    // Park our thread and determine whether we were woken up by an unpark or by
    // our timeout. Note that this isn't precise: we can still be unparked since
    // we are still in the queue.
    let unparked = match timeout {
        Some(timeout) => thread_data.parker.park_until(timeout),
        None => {
            thread_data.parker.park();
            // call deadlock detection on_unpark hook
            deadlock::on_unpark(thread_data);
            true
        }
    };

    // If we were unparked, return now
    if unparked {
        return ParkResult::Unparked(thread_data.unpark_token.get());
    }

    // Lock our bucket again. Note that the hashtable may have been rehashed in
    // the meantime. Our key may also have changed if we were requeued.
    let (key, bucket) = lock_bucket_checked(&thread_data.key);

    // Now we need to check again if we were unparked or timed out. Unlike the
    // last check this is precise because we hold the bucket lock.
    if !thread_data.parker.timed_out() {
        bucket.mutex.unlock();
        return ParkResult::Unparked(thread_data.unpark_token.get());
    }

    // We timed out, so we now need to remove our thread from the queue
    let mut link = &bucket.queue_head;
    let mut current = bucket.queue_head.get();
    let mut previous = ptr::null();
    while !current.is_null() {
        if current == thread_data {
            let next = (*current).next_in_queue.get();
            link.set(next);
            let mut was_last_thread = true;
            if bucket.queue_tail.get() == current {
                bucket.queue_tail.set(previous);
            } else {
                // Scan the rest of the queue to see if there are any other
                // entries with the given key.
                let mut scan = next;
                while !scan.is_null() {
                    if (*scan).key.load(Ordering::Relaxed) == key {
                        was_last_thread = false;
                        break;
                    }
                    scan = (*scan).next_in_queue.get();
                }
            }

            // Callback to indicate that we timed out, and whether we were the
            // last thread on the queue.
            timed_out(key, was_last_thread);
            break;
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // There should be no way for our thread to have been removed from the queue
    // if we timed out.
    debug_assert!(!current.is_null());

    // Unlock the bucket, we are done
    bucket.mutex.unlock();
    ParkResult::TimedOut
}

/// Unparks one thread from the queue associated with the given key.
///
/// The `callback` function is called while the queue is locked and before the
/// target thread is woken up. The `UnparkResult` argument to the function
/// indicates whether a thread was found in the queue and whether this was the
/// last thread in the queue. This value is also returned by `unpark_one`.
///
/// The `callback` function should return an `UnparkToken` value which will be
/// passed to the thread that is unparked. If no thread is unparked then the
/// returned value is ignored.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
///
/// The `callback` function is called while the queue is locked and must not
/// panic or call into any function in `parking_lot`.
#[inline]
pub unsafe fn unpark_one<C>(key: usize, callback: C) -> UnparkResult
where
    C: FnOnce(UnparkResult) -> UnparkToken,
{
    let mut c = Some(callback);
    unpark_one_internal(key, &mut |result| c.take().unchecked_unwrap()(result))
}
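
// Illustrative sketch (an assumption for documentation purposes, not crate
// API): a typical `unpark_one` callback uses `have_more_threads` to decide
// whether a hypothetical PARKED bit (0x2 here) must stay set in the lock
// word, and could also consult `be_fair` to hand the lock off directly.
#[cfg(test)]
#[allow(dead_code)]
unsafe fn unlock_callback_sketch(state: &::std::sync::atomic::AtomicUsize) {
    use std::sync::atomic::Ordering;
    unpark_one(state as *const _ as usize, |result| {
        if result.have_more_threads {
            // Keep the PARKED bit so the next unlocker knows to unpark.
            state.store(0x2, Ordering::Release);
        } else {
            state.store(0, Ordering::Release);
        }
        DEFAULT_UNPARK_TOKEN
    });
}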

// Non-generic version to reduce monomorphization cost
unsafe fn unpark_one_internal(
    key: usize,
    callback: &mut FnMut(UnparkResult) -> UnparkToken,
) -> UnparkResult {
    // Lock the bucket for the given key
    let bucket = lock_bucket(key);

    // Find a thread with a matching key and remove it from the queue
    let mut link = &bucket.queue_head;
    let mut current = bucket.queue_head.get();
    let mut previous = ptr::null();
    let mut result = UnparkResult::default();
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key {
            // Remove the thread from the queue
            let next = (*current).next_in_queue.get();
            link.set(next);
            if bucket.queue_tail.get() == current {
                bucket.queue_tail.set(previous);
            } else {
                // Scan the rest of the queue to see if there are any other
                // entries with the given key.
                let mut scan = next;
                while !scan.is_null() {
                    if (*scan).key.load(Ordering::Relaxed) == key {
                        result.have_more_threads = true;
                        break;
                    }
                    scan = (*scan).next_in_queue.get();
                }
            }

            // Invoke the callback before waking up the thread
            result.unparked_threads = 1;
            result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
            let token = callback(result);

            // Set the token for the target thread
            (*current).unpark_token.set(token);

            // This is a bit tricky: we first lock the ThreadParker to prevent
            // the thread from exiting and freeing its ThreadData if its wait
            // times out. Then we unlock the queue since we don't want to keep
            // the queue locked while we perform a system call. Finally we wake
            // up the parked thread.
            let handle = (*current).parker.unpark_lock();
            bucket.mutex.unlock();
            handle.unpark();

            return result;
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // No threads with a matching key were found in the bucket
    callback(result);
    bucket.mutex.unlock();
    result
}

/// Unparks all threads in the queue associated with the given key.
///
/// The given `UnparkToken` is passed to all unparked threads.
///
/// This function returns the number of threads that were unparked.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
pub unsafe fn unpark_all(key: usize, unpark_token: UnparkToken) -> usize {
    // Lock the bucket for the given key
    let bucket = lock_bucket(key);

    // Remove all threads with the given key in the bucket
    let mut link = &bucket.queue_head;
    let mut current = bucket.queue_head.get();
    let mut previous = ptr::null();
    let mut threads = SmallVec::<[_; 8]>::new();
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key {
            // Remove the thread from the queue
            let next = (*current).next_in_queue.get();
            link.set(next);
            if bucket.queue_tail.get() == current {
                bucket.queue_tail.set(previous);
            }

            // Set the token for the target thread
            (*current).unpark_token.set(unpark_token);

            // Don't wake up threads while holding the queue lock. See comment
            // in unpark_one. For now just record which threads we need to wake
            // up.
            threads.push((*current).parker.unpark_lock());
            current = next;
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // Unlock the bucket
    bucket.mutex.unlock();

    // Now that we are outside the lock, wake up all the threads that we removed
    // from the queue.
    let num_threads = threads.len();
    for handle in threads.into_iter() {
        handle.unpark();
    }

    num_threads
}
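
// Illustrative sketch: `unpark_all` maps directly onto a condition
// variable's notify_all. The key is assumed to be the address of the condvar
// object; this helper is an example, not crate API.
#[cfg(test)]
#[allow(dead_code)]
unsafe fn notify_all_sketch(condvar_addr: usize) -> usize {
    // Wake every waiter registered under this address, handing each the
    // default token.
    unpark_all(condvar_addr, DEFAULT_UNPARK_TOKEN)
}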

/// Removes all threads from the queue associated with `key_from`, optionally
/// unparks the first one and requeues the rest onto the queue associated with
/// `key_to`.
///
/// The `validate` function is called while both queues are locked. Its return
/// value will determine which operation is performed, or whether the operation
/// should be aborted. See `RequeueOp` for details about the different possible
/// return values.
///
/// The `callback` function is also called while both queues are locked. It is
/// passed the `RequeueOp` returned by `validate` and an `UnparkResult`
/// indicating whether a thread was unparked and whether there are threads still
/// parked in the new queue. This `UnparkResult` value is also returned by
/// `unpark_requeue`.
///
/// The `callback` function should return an `UnparkToken` value which will be
/// passed to the thread that is unparked. If no thread is unparked then the
/// returned value is ignored.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
///
/// The `validate` and `callback` functions are called while the queue is locked
/// and must not panic or call into any function in `parking_lot`.
#[inline]
pub unsafe fn unpark_requeue<V, C>(
    key_from: usize,
    key_to: usize,
    validate: V,
    callback: C,
) -> UnparkResult
where
    V: FnOnce() -> RequeueOp,
    C: FnOnce(RequeueOp, UnparkResult) -> UnparkToken,
{
    let mut v = Some(validate);
    let mut c = Some(callback);
    unpark_requeue_internal(
        key_from,
        key_to,
        &mut || v.take().unchecked_unwrap()(),
        &mut |op, r| c.take().unchecked_unwrap()(op, r),
    )
}
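
// Illustrative sketch: requeueing is what lets a condition variable move its
// waiters onto the mutex queue instead of waking them all at once (avoiding
// a thundering herd). `condvar_key` and `mutex_key` are assumptions standing
// in for the addresses of a condvar and the mutex it is paired with.
#[cfg(test)]
#[allow(dead_code)]
unsafe fn notify_all_requeue_sketch(condvar_key: usize, mutex_key: usize) -> UnparkResult {
    unpark_requeue(
        condvar_key,
        mutex_key,
        // Wake one waiter directly and move the rest onto the mutex queue.
        || RequeueOp::UnparkOneRequeueRest,
        |_op, _result| DEFAULT_UNPARK_TOKEN,
    )
}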

// Non-generic version to reduce monomorphization cost
unsafe fn unpark_requeue_internal(
    key_from: usize,
    key_to: usize,
    validate: &mut FnMut() -> RequeueOp,
    callback: &mut FnMut(RequeueOp, UnparkResult) -> UnparkToken,
) -> UnparkResult {
    // Lock the two buckets for the given keys
    let (bucket_from, bucket_to) = lock_bucket_pair(key_from, key_to);

    // If the validation function fails, just return
    let mut result = UnparkResult::default();
    let op = validate();
    if op == RequeueOp::Abort {
        unlock_bucket_pair(bucket_from, bucket_to);
        return result;
    }

    // Remove all threads with the given key in the source bucket
    let mut link = &bucket_from.queue_head;
    let mut current = bucket_from.queue_head.get();
    let mut previous = ptr::null();
    let mut requeue_threads: *const ThreadData = ptr::null();
    let mut requeue_threads_tail: *const ThreadData = ptr::null();
    let mut wakeup_thread = None;
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key_from {
            // Remove the thread from the queue
            let next = (*current).next_in_queue.get();
            link.set(next);
            if bucket_from.queue_tail.get() == current {
                bucket_from.queue_tail.set(previous);
            }

            // Prepare the first thread for wakeup and requeue the rest.
            if (op == RequeueOp::UnparkOneRequeueRest || op == RequeueOp::UnparkOne)
                && wakeup_thread.is_none()
            {
                wakeup_thread = Some(current);
                result.unparked_threads = 1;
            } else {
                if !requeue_threads.is_null() {
                    (*requeue_threads_tail).next_in_queue.set(current);
                } else {
                    requeue_threads = current;
                }
                requeue_threads_tail = current;
                (*current).key.store(key_to, Ordering::Relaxed);
                result.requeued_threads += 1;
            }
            if op == RequeueOp::UnparkOne || op == RequeueOp::RequeueOne {
                // Scan the rest of the queue to see if there are any other
                // entries with the given key.
                let mut scan = next;
                while !scan.is_null() {
                    if (*scan).key.load(Ordering::Relaxed) == key_from {
                        result.have_more_threads = true;
                        break;
                    }
                    scan = (*scan).next_in_queue.get();
                }
                break;
            }
            current = next;
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // Add the requeued threads to the destination bucket
    if !requeue_threads.is_null() {
        (*requeue_threads_tail).next_in_queue.set(ptr::null());
        if !bucket_to.queue_head.get().is_null() {
            (*bucket_to.queue_tail.get())
                .next_in_queue
                .set(requeue_threads);
        } else {
            bucket_to.queue_head.set(requeue_threads);
        }
        bucket_to.queue_tail.set(requeue_threads_tail);
    }

    // Invoke the callback before waking up the thread
    if result.unparked_threads != 0 {
        result.be_fair = (*bucket_from.fair_timeout.get()).should_timeout();
    }
    let token = callback(op, result);

    // See comment in unpark_one for why we mess with the locking
    if let Some(wakeup_thread) = wakeup_thread {
        (*wakeup_thread).unpark_token.set(token);
        let handle = (*wakeup_thread).parker.unpark_lock();
        unlock_bucket_pair(bucket_from, bucket_to);
        handle.unpark();
    } else {
        unlock_bucket_pair(bucket_from, bucket_to);
    }

    result
}

/// Unparks a number of threads from the front of the queue associated with
/// `key` depending on the results of a filter function which inspects the
/// `ParkToken` associated with each thread.
///
/// The `filter` function is called for each thread in the queue or until
/// `FilterOp::Stop` is returned. This function is passed the `ParkToken`
/// associated with a particular thread, which is unparked if `FilterOp::Unpark`
/// is returned.
///
/// The `callback` function is also called while the queue is locked. It is
/// passed an `UnparkResult` indicating the number of threads that were unparked
/// and whether there are still parked threads in the queue. This `UnparkResult`
/// value is also returned by `unpark_filter`.
///
/// The `callback` function should return an `UnparkToken` value which will be
/// passed to all threads that are unparked. If no thread is unparked then the
/// returned value is ignored.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
///
/// The `filter` and `callback` functions are called while the queue is locked
/// and must not panic or call into any function in `parking_lot`.
#[inline]
pub unsafe fn unpark_filter<F, C>(key: usize, mut filter: F, callback: C) -> UnparkResult
where
    F: FnMut(ParkToken) -> FilterOp,
    C: FnOnce(UnparkResult) -> UnparkToken,
{
    let mut c = Some(callback);
    unpark_filter_internal(key, &mut filter, &mut |r| c.take().unchecked_unwrap()(r))
}
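
// Illustrative sketch: a filter that wakes at most `n` waiters regardless of
// their tokens. `remaining` is captured by the FnMut closure; this helper is
// an example, not crate API.
#[cfg(test)]
#[allow(dead_code)]
unsafe fn unpark_up_to_n_sketch(key: usize, n: usize) -> UnparkResult {
    let mut remaining = n;
    unpark_filter(
        key,
        |_token| {
            if remaining > 0 {
                remaining -= 1;
                FilterOp::Unpark
            } else {
                // Leave the rest parked and stop scanning.
                FilterOp::Stop
            }
        },
        |_result| DEFAULT_UNPARK_TOKEN,
    )
}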

// Non-generic version to reduce monomorphization cost
unsafe fn unpark_filter_internal(
    key: usize,
    filter: &mut FnMut(ParkToken) -> FilterOp,
    callback: &mut FnMut(UnparkResult) -> UnparkToken,
) -> UnparkResult {
    // Lock the bucket for the given key
    let bucket = lock_bucket(key);

    // Go through the queue looking for threads with a matching key
    let mut link = &bucket.queue_head;
    let mut current = bucket.queue_head.get();
    let mut previous = ptr::null();
    let mut threads = SmallVec::<[_; 8]>::new();
    let mut result = UnparkResult::default();
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key {
            // Call the filter function with the thread's ParkToken
            let next = (*current).next_in_queue.get();
            match filter((*current).park_token.get()) {
                FilterOp::Unpark => {
                    // Remove the thread from the queue
                    link.set(next);
                    if bucket.queue_tail.get() == current {
                        bucket.queue_tail.set(previous);
                    }

                    // Add the thread to our list of threads to unpark
                    threads.push((current, None));

                    current = next;
                }
                FilterOp::Skip => {
                    result.have_more_threads = true;
                    link = &(*current).next_in_queue;
                    previous = current;
                    current = link.get();
                }
                FilterOp::Stop => {
                    result.have_more_threads = true;
                    break;
                }
            }
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // Invoke the callback before waking up the threads
    result.unparked_threads = threads.len();
    if result.unparked_threads != 0 {
        result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
    }
    let token = callback(result);

    // Pass the token to all threads that are going to be unparked and prepare
    // them for unparking.
    for t in threads.iter_mut() {
        (*t.0).unpark_token.set(token);
        t.1 = Some((*t.0).parker.unpark_lock());
    }

    bucket.mutex.unlock();

    // Now that we are outside the lock, wake up all the threads that we removed
    // from the queue.
    for (_, handle) in threads.into_iter() {
        handle.unchecked_unwrap().unpark();
    }

    result
}

/// \[Experimental\] Deadlock detection
///
/// Enabled via the `deadlock_detection` feature flag.
pub mod deadlock {
    #[cfg(feature = "deadlock_detection")]
    use super::deadlock_impl;

    #[cfg(feature = "deadlock_detection")]
    pub(super) use super::deadlock_impl::DeadlockData;

    #[cfg(not(feature = "deadlock_detection"))]
    pub(super) struct DeadlockData {}

    #[cfg(not(feature = "deadlock_detection"))]
    impl DeadlockData {
        pub(super) fn new() -> Self {
            DeadlockData {}
        }
    }

    /// Acquires a resource identified by key in the deadlock detector.
    /// No-op if the deadlock_detection feature isn't enabled.
    /// Note: Call after the resource is acquired.
    #[inline]
    pub unsafe fn acquire_resource(_key: usize) {
        #[cfg(feature = "deadlock_detection")]
        deadlock_impl::acquire_resource(_key);
    }

    /// Releases a resource identified by key in the deadlock detector.
    /// No-op if the deadlock_detection feature isn't enabled.
    /// Note: Call before the resource is released.
    /// # Panics
    /// Panics if the resource was already released or wasn't acquired in this thread.
    #[inline]
    pub unsafe fn release_resource(_key: usize) {
        #[cfg(feature = "deadlock_detection")]
        deadlock_impl::release_resource(_key);
    }

    /// Returns all deadlocks detected *since* the last call.
    /// Each cycle consists of a vector of `DeadlockedThread`.
    #[cfg(feature = "deadlock_detection")]
    #[inline]
    pub fn check_deadlock() -> Vec<Vec<deadlock_impl::DeadlockedThread>> {
        deadlock_impl::check_deadlock()
    }

    #[inline]
    pub(super) unsafe fn on_unpark(_td: &super::ThreadData) {
        #[cfg(feature = "deadlock_detection")]
        deadlock_impl::on_unpark(_td);
    }
}
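
// Illustrative usage sketch (requires the `deadlock_detection` feature): a
// watchdog thread that periodically polls `deadlock::check_deadlock` and
// prints the thread ids participating in each detected cycle.
#[cfg(all(test, feature = "deadlock_detection"))]
#[allow(dead_code)]
fn deadlock_watchdog_sketch() {
    use std::thread;
    use std::time::Duration;
    thread::spawn(|| loop {
        thread::sleep(Duration::from_secs(1));
        for (i, cycle) in deadlock::check_deadlock().iter().enumerate() {
            println!("deadlock cycle #{}:", i);
            for t in cycle {
                println!("  thread id {}", t.thread_id());
            }
        }
    });
}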

#[cfg(feature = "deadlock_detection")]
mod deadlock_impl {
    use super::{get_hashtable, get_thread_data, lock_bucket, ThreadData, NUM_THREADS};
    use backtrace::Backtrace;
    use petgraph;
    use petgraph::graphmap::DiGraphMap;
    use std::cell::{Cell, UnsafeCell};
    use std::collections::HashSet;
    use std::sync::atomic::Ordering;
    use std::sync::mpsc;
    use thread_id;

    /// Representation of a deadlocked thread
    pub struct DeadlockedThread {
        thread_id: usize,
        backtrace: Backtrace,
    }

    impl DeadlockedThread {
        /// The system thread id
        pub fn thread_id(&self) -> usize {
            self.thread_id
        }

        /// The thread backtrace
        pub fn backtrace(&self) -> &Backtrace {
            &self.backtrace
        }
    }

    pub struct DeadlockData {
        // Currently owned resources (keys)
        resources: UnsafeCell<Vec<usize>>,

        // Set when there's a pending callstack request
        deadlocked: Cell<bool>,

        // Sender used to report the backtrace
        backtrace_sender: UnsafeCell<Option<mpsc::Sender<DeadlockedThread>>>,

        // System thread id
        thread_id: usize,
    }

    impl DeadlockData {
        pub fn new() -> Self {
            DeadlockData {
                resources: UnsafeCell::new(Vec::new()),
                deadlocked: Cell::new(false),
                backtrace_sender: UnsafeCell::new(None),
                thread_id: thread_id::get(),
            }
        }
    }

    pub(super) unsafe fn on_unpark(td: &ThreadData) {
        if td.deadlock_data.deadlocked.get() {
            let sender = (*td.deadlock_data.backtrace_sender.get()).take().unwrap();
            sender
                .send(DeadlockedThread {
                    thread_id: td.deadlock_data.thread_id,
                    backtrace: Backtrace::new(),
                })
                .unwrap();
            // make sure to close this sender
            drop(sender);

            // park until the end of time
            td.parker.prepare_park();
            td.parker.park();
            unreachable!("unparked deadlocked thread!");
        }
    }

    pub unsafe fn acquire_resource(key: usize) {
        let mut thread_data = None;
        let thread_data = get_thread_data(&mut thread_data);
        (*thread_data.deadlock_data.resources.get()).push(key);
    }

    pub unsafe fn release_resource(key: usize) {
        let mut thread_data = None;
        let thread_data = get_thread_data(&mut thread_data);
        let resources = &mut (*thread_data.deadlock_data.resources.get());
        match resources.iter().rposition(|x| *x == key) {
            Some(p) => resources.swap_remove(p),
            None => panic!("key {} not found in thread resources", key),
        };
    }

    pub fn check_deadlock() -> Vec<Vec<DeadlockedThread>> {
        unsafe {
            // fast pass
            if check_wait_graph_fast() {
                // double check
                check_wait_graph_slow()
            } else {
                Vec::new()
            }
        }
    }

    // Simple algorithm that builds a wait graph of the threads and the resources,
    // then checks for the presence of cycles (deadlocks).
    // This variant isn't precise as it doesn't lock the entire table before checking.
    unsafe fn check_wait_graph_fast() -> bool {
        let table = get_hashtable();
        let thread_count = NUM_THREADS.load(Ordering::Relaxed);
        let mut graph = DiGraphMap::<usize, ()>::with_capacity(thread_count * 2, thread_count * 2);

        for b in &(*table).entries[..] {
            b.mutex.lock();
            let mut current = b.queue_head.get();
            while !current.is_null() {
                if !(*current).parked_with_timeout.get()
                    && !(*current).deadlock_data.deadlocked.get()
                {
                    // .resources are waiting for their owner
                    for &resource in &(*(*current).deadlock_data.resources.get()) {
                        graph.add_edge(resource, current as usize, ());
                    }
                    // owner waits for resource .key
                    graph.add_edge(current as usize, (*current).key.load(Ordering::Relaxed), ());
                }
                current = (*current).next_in_queue.get();
            }
            b.mutex.unlock();
        }

        petgraph::algo::is_cyclic_directed(&graph)
    }

    #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)]
    enum WaitGraphNode {
        Thread(*const ThreadData),
        Resource(usize),
    }

    use self::WaitGraphNode::*;

    // Contrary to the _fast variant this locks the entries table before looking for cycles.
    // Returns all detected thread wait cycles.
    // Note that once a cycle is reported it's never reported again.
    unsafe fn check_wait_graph_slow() -> Vec<Vec<DeadlockedThread>> {
        let mut table = get_hashtable();
        loop {
            // Lock all buckets in the old table
            for b in &(*table).entries[..] {
                b.mutex.lock();
            }

            // Now check if our table is still the latest one. Another thread could
            // have grown the hash table between us getting and locking the hash table.
            let new_table = get_hashtable();
            if new_table == table {
                break;
            }

            // Unlock buckets and try again
            for b in &(*table).entries[..] {
                b.mutex.unlock();
            }

            table = new_table;
        }

        let thread_count = NUM_THREADS.load(Ordering::Relaxed);
        let mut graph =
            DiGraphMap::<WaitGraphNode, ()>::with_capacity(thread_count * 2, thread_count * 2);

        for b in &(*table).entries[..] {
            let mut current = b.queue_head.get();
            while !current.is_null() {
                if !(*current).parked_with_timeout.get()
                    && !(*current).deadlock_data.deadlocked.get()
                {
                    // .resources are waiting for their owner
                    for &resource in &(*(*current).deadlock_data.resources.get()) {
                        graph.add_edge(Resource(resource), Thread(current), ());
                    }
                    // owner waits for resource .key
                    graph.add_edge(
                        Thread(current),
                        Resource((*current).key.load(Ordering::Relaxed)),
                        (),
                    );
                }
                current = (*current).next_in_queue.get();
            }
        }

        for b in &(*table).entries[..] {
            b.mutex.unlock();
        }

        // find cycles
        let cycles = graph_cycles(&graph);

        let mut results = Vec::with_capacity(cycles.len());

        for cycle in cycles {
            let (sender, receiver) = mpsc::channel();
            for td in cycle {
                let bucket = lock_bucket((*td).key.load(Ordering::Relaxed));
                (*td).deadlock_data.deadlocked.set(true);
                *(*td).deadlock_data.backtrace_sender.get() = Some(sender.clone());
                let handle = (*td).parker.unpark_lock();
                bucket.mutex.unlock();
                // unpark the deadlocked thread!
                // on unpark it'll notice the deadlocked flag and report back
                handle.unpark();
            }
            // make sure to drop our sender before collecting results
            drop(sender);
            results.push(receiver.iter().collect());
        }

        results
    }

    // normalize a cycle to start with the "smallest" node
    fn normalize_cycle<T: Ord + Copy + Clone>(input: &[T]) -> Vec<T> {
        let min_pos = input
            .iter()
            .enumerate()
            .min_by_key(|&(_, &t)| t)
            .map(|(p, _)| p)
            .unwrap_or(0);
        input
            .iter()
            .cycle()
            .skip(min_pos)
            .take(input.len())
            .cloned()
            .collect()
    }

    // returns all thread cycles in the wait graph
    fn graph_cycles(g: &DiGraphMap<WaitGraphNode, ()>) -> Vec<Vec<*const ThreadData>> {
        use petgraph::visit::depth_first_search;
        use petgraph::visit::DfsEvent;
        use petgraph::visit::NodeIndexable;

        let mut cycles = HashSet::new();
        let mut path = Vec::with_capacity(g.node_bound());
        // start from threads to get the correct threads cycle
        let threads = g
            .nodes()
            .filter(|n| if let &Thread(_) = n { true } else { false });

        depth_first_search(g, threads, |e| match e {
            DfsEvent::Discover(Thread(n), _) => path.push(n),
            DfsEvent::Finish(Thread(_), _) => {
                path.pop();
            }
            DfsEvent::BackEdge(_, Thread(n)) => {
                let from = path.iter().rposition(|&i| i == n).unwrap();
                cycles.insert(normalize_cycle(&path[from..]));
            }
            _ => (),
        });

        cycles.iter().cloned().collect()
    }
}