// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
use std::time::{Duration, Instant};
use std::cell::{Cell, UnsafeCell};
use std::ptr;
use std::mem;
use std::thread::LocalKey;
#[cfg(not(feature = "nightly"))]
use std::panic;
use smallvec::SmallVec;
use rand::{self, Rng, XorShiftRng};
use thread_parker::ThreadParker;
use word_lock::WordLock;
use util::UncheckedOptionExt;

static NUM_THREADS: AtomicUsize = ATOMIC_USIZE_INIT;
static HASHTABLE: AtomicUsize = ATOMIC_USIZE_INIT;

// Even with 3x more buckets than threads, the memory overhead per thread is
// still only a few hundred bytes.
const LOAD_FACTOR: usize = 3;

struct HashTable {
    // Hash buckets for the table
    entries: Box<[Bucket]>,

    // Number of bits used for the hash function
    hash_bits: u32,

    // Previous table. This is only kept to keep leak detectors happy.
    _prev: *const HashTable,
}

impl HashTable {
    fn new(num_threads: usize, prev: *const HashTable) -> Box<HashTable> {
        let new_size = (num_threads * LOAD_FACTOR).next_power_of_two();
        // hash_bits = log2(new_size): new_size is a non-zero power of two, so
        // this computes the position of its single set bit.
        let hash_bits = 0usize.leading_zeros() - new_size.leading_zeros() - 1;
        let bucket = Bucket {
            mutex: WordLock::new(),
            queue_head: Cell::new(ptr::null()),
            queue_tail: Cell::new(ptr::null()),
            fair_timeout: UnsafeCell::new(FairTimeout::new()),
            _padding: unsafe { mem::uninitialized() },
        };
        Box::new(HashTable {
            entries: vec![bucket; new_size].into_boxed_slice(),
            hash_bits: hash_bits,
            _prev: prev,
        })
    }
}

struct Bucket {
    // Lock protecting the queue
    mutex: WordLock,

    // Linked list of threads waiting on this bucket
    queue_head: Cell<*const ThreadData>,
    queue_tail: Cell<*const ThreadData>,

    // Next time at which point be_fair should be set
    fair_timeout: UnsafeCell<FairTimeout>,

    // Padding to avoid false sharing between buckets. Ideally we would just
    // align the bucket structure to 64 bytes, but Rust doesn't support that
    // yet.
    _padding: [u8; 64],
}

// Implementation of Clone for Bucket, needed to make vec![] work
impl Clone for Bucket {
    fn clone(&self) -> Bucket {
        Bucket {
            mutex: WordLock::new(),
            queue_head: Cell::new(ptr::null()),
            queue_tail: Cell::new(ptr::null()),
            fair_timeout: UnsafeCell::new(FairTimeout::new()),
            _padding: unsafe { mem::uninitialized() },
        }
    }
}

struct FairTimeout {
    // Next time at which point be_fair should be set
    timeout: Instant,

    // Random number generator for calculating the next timeout
    rng: XorShiftRng,
}

impl FairTimeout {
    fn new() -> FairTimeout {
        FairTimeout {
            timeout: Instant::now(),
            rng: rand::weak_rng(),
        }
    }

    // Determine whether we should force a fair unlock, and update the timeout
    fn should_timeout(&mut self) -> bool {
        let now = Instant::now();
        if now > self.timeout {
            // The next fair unlock happens after a random interval of up to
            // 1ms, so fair unlocks occur on average once every 0.5ms.
            self.timeout = now + Duration::new(0, self.rng.gen_range(0, 1000000));
            true
        } else {
            false
        }
    }
}

struct ThreadData {
    parker: ThreadParker,

    // Key that this thread is sleeping on. This may change if the thread is
    // requeued to a different key.
    key: AtomicUsize,

    // Linked list of parked threads in a bucket
    next_in_queue: Cell<*const ThreadData>,

    // UnparkToken passed to this thread when it is unparked
    unpark_token: Cell<UnparkToken>,

    // ParkToken value set by the thread when it was parked
    park_token: Cell<ParkToken>,

    // Is the thread parked with a timeout?
    parked_with_timeout: Cell<bool>,

    // Extra data for deadlock detection
    // TODO: once supported in stable replace with #[cfg...] & remove dummy struct/impl
    #[allow(dead_code)]
    deadlock_data: deadlock::DeadlockData,
}

impl ThreadData {
    fn new() -> ThreadData {
        // Keep track of the total number of live ThreadData objects and resize
        // the hash table accordingly.
        let num_threads = NUM_THREADS.fetch_add(1, Ordering::Relaxed) + 1;
        unsafe {
            grow_hashtable(num_threads);
        }

        ThreadData {
            parker: ThreadParker::new(),
            key: AtomicUsize::new(0),
            next_in_queue: Cell::new(ptr::null()),
            unpark_token: Cell::new(DEFAULT_UNPARK_TOKEN),
            park_token: Cell::new(DEFAULT_PARK_TOKEN),
            parked_with_timeout: Cell::new(false),
            deadlock_data: deadlock::DeadlockData::new(),
        }
    }
}

// Returns a ThreadData structure for the current thread
unsafe fn get_thread_data(local: &mut Option<ThreadData>) -> &ThreadData {
    // Try to read from thread-local storage, but return None if the TLS has
    // already been destroyed.
    #[cfg(feature = "nightly")]
    fn try_get_tls(key: &'static LocalKey<ThreadData>) -> Option<*const ThreadData> {
        key.try_with(|x| x as *const ThreadData).ok()
    }
    #[cfg(not(feature = "nightly"))]
    fn try_get_tls(key: &'static LocalKey<ThreadData>) -> Option<*const ThreadData> {
        panic::catch_unwind(|| key.with(|x| x as *const ThreadData)).ok()
    }

    // Unlike word_lock::ThreadData, parking_lot::ThreadData is always expensive
    // to construct. Try to use a thread-local version if possible.
    thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
    if let Some(tls) = try_get_tls(&THREAD_DATA) {
        return &*tls;
    }

    // Otherwise just create a ThreadData on the stack
    *local = Some(ThreadData::new());
    local.as_ref().unwrap()
}

impl Drop for ThreadData {
    fn drop(&mut self) {
        NUM_THREADS.fetch_sub(1, Ordering::Relaxed);
    }
}

// Get a pointer to the latest hash table, creating one if it doesn't exist yet.
unsafe fn get_hashtable() -> *const HashTable {
    let mut table = HASHTABLE.load(Ordering::Acquire);

    // If there is no table, create one
    if table == 0 {
        let new_table = Box::into_raw(HashTable::new(LOAD_FACTOR, ptr::null()));

        // If this fails then it means some other thread created the hash
        // table first.
        match HASHTABLE.compare_exchange(
            0,
            new_table as usize,
            Ordering::Release,
            Ordering::Relaxed,
        ) {
            Ok(_) => return new_table,
            Err(x) => table = x,
        }

        // Free the table we created
        Box::from_raw(new_table);
    }

    table as *const HashTable
}

// Grow the hash table so that it is big enough for the given number of threads.
// This isn't performance-critical since it is only done when a ThreadData is
// created, which only happens once per thread.
unsafe fn grow_hashtable(num_threads: usize) {
    // If there is no table, create one
    if HASHTABLE.load(Ordering::Relaxed) == 0 {
        let new_table = Box::into_raw(HashTable::new(num_threads, ptr::null()));

        // If this fails then it means some other thread created the hash
        // table first.
        if HASHTABLE
            .compare_exchange(0, new_table as usize, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }

        // Free the table we created
        Box::from_raw(new_table);
    }

    let mut old_table;
    loop {
        old_table = HASHTABLE.load(Ordering::Acquire) as *mut HashTable;

        // Check if we need to resize the existing table
        if (*old_table).entries.len() >= LOAD_FACTOR * num_threads {
            return;
        }

        // Lock all buckets in the old table
        for b in &(*old_table).entries[..] {
            b.mutex.lock();
        }

        // Now check if our table is still the latest one. Another thread could
        // have grown the hash table between us reading HASHTABLE and locking
        // the buckets.
        if HASHTABLE.load(Ordering::Relaxed) == old_table as usize {
            break;
        }

        // Unlock buckets and try again
        for b in &(*old_table).entries[..] {
            b.mutex.unlock();
        }
    }

    // Create the new table
    let new_table = HashTable::new(num_threads, old_table);

    // Move the entries from the old table to the new one
    for b in &(*old_table).entries[..] {
        let mut current = b.queue_head.get();
        while !current.is_null() {
            let next = (*current).next_in_queue.get();
            let hash = hash((*current).key.load(Ordering::Relaxed), new_table.hash_bits);
            if new_table.entries[hash].queue_tail.get().is_null() {
                new_table.entries[hash].queue_head.set(current);
            } else {
                (*new_table.entries[hash].queue_tail.get())
                    .next_in_queue
                    .set(current);
            }
            new_table.entries[hash].queue_tail.set(current);
            (*current).next_in_queue.set(ptr::null());
            current = next;
        }
    }

    // Publish the new table. No races are possible at this point because
    // any other thread trying to grow the hash table is blocked on the bucket
    // locks in the old table.
    HASHTABLE.store(Box::into_raw(new_table) as usize, Ordering::Release);

    // Unlock all buckets in the old table
    for b in &(*old_table).entries[..] {
        b.mutex.unlock();
    }
}

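// The multipliers below are the golden ratio constants floor(2^32/phi) and
// floor(2^64/phi) (Fibonacci hashing): multiplying by them scatters
// consecutive addresses across the high bits, from which the shift then
// takes the bucket index.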
// Hash function for addresses
#[cfg(target_pointer_width = "32")]
fn hash(key: usize, bits: u32) -> usize {
    key.wrapping_mul(0x9E3779B9) >> (32 - bits)
}
#[cfg(target_pointer_width = "64")]
fn hash(key: usize, bits: u32) -> usize {
    key.wrapping_mul(0x9E3779B97F4A7C15) >> (64 - bits)
}

// Lock the bucket for the given key
unsafe fn lock_bucket<'a>(key: usize) -> &'a Bucket {
    let mut bucket;
    loop {
        let hashtable = get_hashtable();

        let hash = hash(key, (*hashtable).hash_bits);
        bucket = &(*hashtable).entries[hash];

        // Lock the bucket
        bucket.mutex.lock();

        // If no other thread has rehashed the table before we grabbed the lock
        // then we are good to go! The lock we grabbed prevents any rehashes.
        if HASHTABLE.load(Ordering::Relaxed) == hashtable as usize {
            return bucket;
        }

        // Unlock the bucket and try again
        bucket.mutex.unlock();
    }
}

333
334 // Lock the bucket for the given key, but check that the key hasn't been changed
335 // in the meantime due to a requeue.
lock_bucket_checked<'a>(key: &AtomicUsize) -> (usize, &'a Bucket)336 unsafe fn lock_bucket_checked<'a>(key: &AtomicUsize) -> (usize, &'a Bucket) {
337 let mut bucket;
338 loop {
339 let hashtable = get_hashtable();
340 let current_key = key.load(Ordering::Relaxed);
341
342 let hash = hash(current_key, (*hashtable).hash_bits);
343 bucket = &(*hashtable).entries[hash];
344
345 // Lock the bucket
346 bucket.mutex.lock();
347
348 // Check that both the hash table and key are correct while the bucket
349 // is locked. Note that the key can't change once we locked the proper
350 // bucket for it, so we just keep trying until we have the correct key.
351 if HASHTABLE.load(Ordering::Relaxed) == hashtable as usize
352 && key.load(Ordering::Relaxed) == current_key
353 {
354 return (current_key, bucket);
355 }
356
357 // Unlock the bucket and try again
358 bucket.mutex.unlock();
359 }
360 }
361
// Lock the two buckets for the given pair of keys
unsafe fn lock_bucket_pair<'a>(key1: usize, key2: usize) -> (&'a Bucket, &'a Bucket) {
    let mut bucket1;
    loop {
        let hashtable = get_hashtable();

        // Get the lowest bucket first
        let hash1 = hash(key1, (*hashtable).hash_bits);
        let hash2 = hash(key2, (*hashtable).hash_bits);
        if hash1 <= hash2 {
            bucket1 = &(*hashtable).entries[hash1];
        } else {
            bucket1 = &(*hashtable).entries[hash2];
        }

        // Lock the first bucket
        bucket1.mutex.lock();

        // If no other thread has rehashed the table before we grabbed the lock
        // then we are good to go! The lock we grabbed prevents any rehashes.
        if HASHTABLE.load(Ordering::Relaxed) == hashtable as usize {
            // Now lock the second bucket and return the two buckets
            if hash1 == hash2 {
                return (bucket1, bucket1);
            } else if hash1 < hash2 {
                let bucket2 = &(*hashtable).entries[hash2];
                bucket2.mutex.lock();
                return (bucket1, bucket2);
            } else {
                let bucket2 = &(*hashtable).entries[hash1];
                bucket2.mutex.lock();
                return (bucket2, bucket1);
            }
        }

        // Unlock the bucket and try again
        bucket1.mutex.unlock();
    }
}

// Unlock a pair of buckets
unsafe fn unlock_bucket_pair(bucket1: &Bucket, bucket2: &Bucket) {
    if bucket1 as *const _ == bucket2 as *const _ {
        bucket1.mutex.unlock();
    } else if bucket1 as *const _ < bucket2 as *const _ {
        bucket2.mutex.unlock();
        bucket1.mutex.unlock();
    } else {
        bucket1.mutex.unlock();
        bucket2.mutex.unlock();
    }
}

/// Result of a park operation.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum ParkResult {
    /// We were unparked by another thread with the given token.
    Unparked(UnparkToken),

    /// The validation callback returned false.
    Invalid,

    /// The timeout expired.
    TimedOut,
}

impl ParkResult {
    /// Returns true if we were unparked by another thread.
    pub fn is_unparked(self) -> bool {
        if let ParkResult::Unparked(_) = self {
            true
        } else {
            false
        }
    }
}

438
439 /// Result of an unpark operation.
440 #[derive(Copy, Clone, Eq, PartialEq, Debug)]
441 pub struct UnparkResult {
442 /// The number of threads that were unparked.
443 pub unparked_threads: usize,
444
445 /// Whether there are any threads remaining in the queue. This only returns
446 /// true if a thread was unparked.
447 pub have_more_threads: bool,
448
449 /// This is set to true on average once every 0.5ms for any given key. It
450 /// should be used to switch to a fair unlocking mechanism for a particular
451 /// unlock.
452 pub be_fair: bool,
453 }
454
/// Operation that `unpark_requeue` should perform.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum RequeueOp {
    /// Abort the operation without doing anything.
    Abort,

    /// Unpark one thread and requeue the rest onto the target queue.
    UnparkOneRequeueRest,

    /// Requeue all threads onto the target queue.
    RequeueAll,
}

/// Operation that `unpark_filter` should perform for each thread.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum FilterOp {
    /// Unpark the thread and continue scanning the list of parked threads.
    Unpark,

    /// Don't unpark the thread and continue scanning the list of parked threads.
    Skip,

    /// Don't unpark the thread and stop scanning the list of parked threads.
    Stop,
}

/// A value which is passed from an unparker to a parked thread.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct UnparkToken(pub usize);

/// A value associated with a parked thread which can be used by `unpark_filter`.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct ParkToken(pub usize);

/// A default unpark token to use.
pub const DEFAULT_UNPARK_TOKEN: UnparkToken = UnparkToken(0);

/// A default park token to use.
pub const DEFAULT_PARK_TOKEN: ParkToken = ParkToken(0);

/// Parks the current thread in the queue associated with the given key.
///
/// The `validate` function is called while the queue is locked and can abort
/// the operation by returning false. If `validate` returns true then the
/// current thread is appended to the queue and the queue is unlocked.
///
/// The `before_sleep` function is called after the queue is unlocked but before
/// the thread is put to sleep. The thread will then sleep until it is unparked
/// or the given timeout is reached.
///
/// The `timed_out` function is also called while the queue is locked, but only
/// if the timeout was reached. It is passed the key of the queue it was in when
/// it timed out, which may be different from the original key if
/// `unpark_requeue` was called. It is also passed a bool which indicates
/// whether it was the last thread in the queue.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
///
/// The `validate` and `timed_out` functions are called while the queue is
/// locked and must not panic or call into any function in `parking_lot`.
///
/// The `before_sleep` function is called outside the queue lock and is allowed
/// to call `unpark_one`, `unpark_all`, `unpark_requeue` or `unpark_filter`, but
/// it is not allowed to call `park` or panic.
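///
/// # Example
///
/// A minimal sketch of a one-shot event built on `park`: waiters sleep on the
/// flag's address until a signaler sets the flag and unparks them. The `FLAG`
/// static and `event_wait` name are illustrative only, not part of this API.
///
/// ```ignore
/// use std::sync::atomic::{AtomicBool, Ordering};
///
/// static FLAG: AtomicBool = AtomicBool::new(false);
///
/// unsafe fn event_wait() {
///     park(
///         &FLAG as *const _ as usize,       // key: the flag's address
///         || !FLAG.load(Ordering::Acquire), // recheck the flag under the queue lock
///         || {},                            // nothing to do before sleeping
///         |_key, _was_last| {},             // no timeout was requested
///         DEFAULT_PARK_TOKEN,
///         None,
///     );
/// }
/// ```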
#[inline]
pub unsafe fn park<V, B, T>(
    key: usize,
    validate: V,
    before_sleep: B,
    timed_out: T,
    park_token: ParkToken,
    timeout: Option<Instant>,
) -> ParkResult
where
    V: FnOnce() -> bool,
    B: FnOnce(),
    T: FnOnce(usize, bool),
{
    let mut v = Some(validate);
    let mut b = Some(before_sleep);
    let mut t = Some(timed_out);
    park_internal(
        key,
        &mut || v.take().unchecked_unwrap()(),
        &mut || b.take().unchecked_unwrap()(),
        &mut |key, was_last_thread| t.take().unchecked_unwrap()(key, was_last_thread),
        park_token,
        timeout,
    )
}

// Non-generic version to reduce monomorphization cost
unsafe fn park_internal(
    key: usize,
    validate: &mut FnMut() -> bool,
    before_sleep: &mut FnMut(),
    timed_out: &mut FnMut(usize, bool),
    park_token: ParkToken,
    timeout: Option<Instant>,
) -> ParkResult {
    // Grab our thread data, this also ensures that the hash table exists
    let mut thread_data = None;
    let thread_data = get_thread_data(&mut thread_data);

    // Lock the bucket for the given key
    let bucket = lock_bucket(key);

    // If the validation function fails, just return
    if !validate() {
        bucket.mutex.unlock();
        return ParkResult::Invalid;
    }

    // Append our thread data to the queue and unlock the bucket
    thread_data.parked_with_timeout.set(timeout.is_some());
    thread_data.next_in_queue.set(ptr::null());
    thread_data.key.store(key, Ordering::Relaxed);
    thread_data.park_token.set(park_token);
    thread_data.parker.prepare_park();
    if !bucket.queue_head.get().is_null() {
        (*bucket.queue_tail.get()).next_in_queue.set(thread_data);
    } else {
        bucket.queue_head.set(thread_data);
    }
    bucket.queue_tail.set(thread_data);
    bucket.mutex.unlock();

    // Invoke the pre-sleep callback
    before_sleep();

    // Park our thread and determine whether we were woken up by an unpark or by
    // our timeout. Note that this isn't precise: we can still be unparked since
    // we are still in the queue.
    let unparked = match timeout {
        Some(timeout) => thread_data.parker.park_until(timeout),
        None => {
            thread_data.parker.park();
            // Call the deadlock detection on_unpark hook
            deadlock::on_unpark(thread_data);
            true
        }
    };

    // If we were unparked, return now
    if unparked {
        return ParkResult::Unparked(thread_data.unpark_token.get());
    }

    // Lock our bucket again. Note that the hashtable may have been rehashed in
    // the meantime. Our key may also have changed if we were requeued.
    let (key, bucket) = lock_bucket_checked(&thread_data.key);

    // Now we need to check again if we were unparked or timed out. Unlike the
    // last check this is precise because we hold the bucket lock.
    if !thread_data.parker.timed_out() {
        bucket.mutex.unlock();
        return ParkResult::Unparked(thread_data.unpark_token.get());
    }

    // We timed out, so we now need to remove our thread from the queue
    let mut link = &bucket.queue_head;
    let mut current = bucket.queue_head.get();
    let mut previous = ptr::null();
    while !current.is_null() {
        if current == thread_data {
            let next = (*current).next_in_queue.get();
            link.set(next);
            let mut was_last_thread = true;
            if bucket.queue_tail.get() == current {
                bucket.queue_tail.set(previous);
            } else {
                // Scan the rest of the queue to see if there are any other
                // entries with the given key.
                let mut scan = next;
                while !scan.is_null() {
                    if (*scan).key.load(Ordering::Relaxed) == key {
                        was_last_thread = false;
                        break;
                    }
                    scan = (*scan).next_in_queue.get();
                }
            }

            // Callback to indicate that we timed out, and whether we were the
            // last thread on the queue.
            timed_out(key, was_last_thread);
            break;
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // There should be no way for our thread to have been removed from the queue
    // if we timed out.
    debug_assert!(!current.is_null());

    // Unlock the bucket, we are done
    bucket.mutex.unlock();
    ParkResult::TimedOut
}

/// Unparks one thread from the queue associated with the given key.
///
/// The `callback` function is called while the queue is locked and before the
/// target thread is woken up. The `UnparkResult` argument to the function
/// indicates whether a thread was found in the queue and whether this was the
/// last thread in the queue. This value is also returned by `unpark_one`.
///
/// The `callback` function should return an `UnparkToken` value which will be
/// passed to the thread that is unparked. If no thread is unparked then the
/// returned value is ignored.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
///
/// The `callback` function is called while the queue is locked and must not
/// panic or call into any function in `parking_lot`.
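///
/// # Example
///
/// A minimal sketch of waking a single waiter of the one-shot event shown in
/// the `park` example. The `FLAG` static is illustrative only.
///
/// ```ignore
/// use std::sync::atomic::{AtomicBool, Ordering};
///
/// static FLAG: AtomicBool = AtomicBool::new(false);
///
/// unsafe fn event_signal_one() {
///     FLAG.store(true, Ordering::Release);
///     // The callback could inspect the UnparkResult (e.g. to hand off
///     // ownership directly); here it just returns the default token.
///     unpark_one(&FLAG as *const _ as usize, |_result| DEFAULT_UNPARK_TOKEN);
/// }
/// ```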
#[inline]
pub unsafe fn unpark_one<C>(key: usize, callback: C) -> UnparkResult
where
    C: FnOnce(UnparkResult) -> UnparkToken,
{
    let mut c = Some(callback);
    unpark_one_internal(key, &mut |result| c.take().unchecked_unwrap()(result))
}

// Non-generic version to reduce monomorphization cost
unsafe fn unpark_one_internal(
    key: usize,
    callback: &mut FnMut(UnparkResult) -> UnparkToken,
) -> UnparkResult {
    // Lock the bucket for the given key
    let bucket = lock_bucket(key);

    // Find a thread with a matching key and remove it from the queue
    let mut link = &bucket.queue_head;
    let mut current = bucket.queue_head.get();
    let mut previous = ptr::null();
    let mut result = UnparkResult {
        unparked_threads: 0,
        have_more_threads: false,
        be_fair: false,
    };
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key {
            // Remove the thread from the queue
            let next = (*current).next_in_queue.get();
            link.set(next);
            if bucket.queue_tail.get() == current {
                bucket.queue_tail.set(previous);
            } else {
                // Scan the rest of the queue to see if there are any other
                // entries with the given key.
                let mut scan = next;
                while !scan.is_null() {
                    if (*scan).key.load(Ordering::Relaxed) == key {
                        result.have_more_threads = true;
                        break;
                    }
                    scan = (*scan).next_in_queue.get();
                }
            }

            // Invoke the callback before waking up the thread
            result.unparked_threads = 1;
            result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
            let token = callback(result);

            // Set the token for the target thread
            (*current).unpark_token.set(token);

            // This is a bit tricky: we first lock the ThreadParker to prevent
            // the thread from exiting and freeing its ThreadData if its wait
            // times out. Then we unlock the queue since we don't want to keep
            // the queue locked while we perform a system call. Finally we wake
            // up the parked thread.
            let handle = (*current).parker.unpark_lock();
            bucket.mutex.unlock();
            handle.unpark();

            return result;
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // No threads with a matching key were found in the bucket
    callback(result);
    bucket.mutex.unlock();
    result
}

/// Unparks all threads in the queue associated with the given key.
///
/// The given `UnparkToken` is passed to all unparked threads.
///
/// This function returns the number of threads that were unparked.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
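///
/// # Example
///
/// A minimal sketch of broadcasting the one-shot event from the `park` example
/// to every waiter. The `FLAG` static is illustrative only.
///
/// ```ignore
/// use std::sync::atomic::{AtomicBool, Ordering};
///
/// static FLAG: AtomicBool = AtomicBool::new(false);
///
/// unsafe fn event_signal_all() -> usize {
///     FLAG.store(true, Ordering::Release);
///     unpark_all(&FLAG as *const _ as usize, DEFAULT_UNPARK_TOKEN)
/// }
/// ```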
pub unsafe fn unpark_all(key: usize, unpark_token: UnparkToken) -> usize {
    // Lock the bucket for the given key
    let bucket = lock_bucket(key);

    // Remove all threads with the given key in the bucket
    let mut link = &bucket.queue_head;
    let mut current = bucket.queue_head.get();
    let mut previous = ptr::null();
    let mut threads = SmallVec::<[_; 8]>::new();
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key {
            // Remove the thread from the queue
            let next = (*current).next_in_queue.get();
            link.set(next);
            if bucket.queue_tail.get() == current {
                bucket.queue_tail.set(previous);
            }

            // Set the token for the target thread
            (*current).unpark_token.set(unpark_token);

            // Don't wake up threads while holding the queue lock. See comment
            // in unpark_one. For now just record which threads we need to wake
            // up.
            threads.push((*current).parker.unpark_lock());
            current = next;
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // Unlock the bucket
    bucket.mutex.unlock();

    // Now that we are outside the lock, wake up all the threads that we removed
    // from the queue.
    let num_threads = threads.len();
    for handle in threads.into_iter() {
        handle.unpark();
    }

    num_threads
}

/// Removes all threads from the queue associated with `key_from`, optionally
/// unparks the first one and requeues the rest onto the queue associated with
/// `key_to`.
///
/// The `validate` function is called while both queues are locked and can abort
/// the operation by returning `RequeueOp::Abort`. It can also choose to
/// unpark the first thread in the source queue while moving the rest by
/// returning `RequeueOp::UnparkOneRequeueRest`. Returning
/// `RequeueOp::RequeueAll` will move all threads to the destination queue.
///
/// The `callback` function is also called while both queues are locked. It is
/// passed the `RequeueOp` returned by `validate` and an `UnparkResult`
/// indicating whether a thread was unparked and whether there are threads still
/// parked in the new queue. This `UnparkResult` value is also returned by
/// `unpark_requeue`.
///
/// The `callback` function should return an `UnparkToken` value which will be
/// passed to the thread that is unparked. If no thread is unparked then the
/// returned value is ignored.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
///
/// The `validate` and `callback` functions are called while both queues are
/// locked and must not panic or call into any function in `parking_lot`.
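///
/// # Example
///
/// A minimal sketch of a condvar-style `notify_all`: wake one waiter and
/// requeue the rest onto the mutex queue so they are woken as the mutex is
/// unlocked. The `condvar_addr` and `mutex_addr` parameters are illustrative
/// only.
///
/// ```ignore
/// unsafe fn notify_all(condvar_addr: usize, mutex_addr: usize) {
///     unpark_requeue(
///         condvar_addr,
///         mutex_addr,
///         || RequeueOp::UnparkOneRequeueRest,
///         |_op, _result| DEFAULT_UNPARK_TOKEN,
///     );
/// }
/// ```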
#[inline]
pub unsafe fn unpark_requeue<V, C>(
    key_from: usize,
    key_to: usize,
    validate: V,
    callback: C,
) -> UnparkResult
where
    V: FnOnce() -> RequeueOp,
    C: FnOnce(RequeueOp, UnparkResult) -> UnparkToken,
{
    let mut v = Some(validate);
    let mut c = Some(callback);
    unpark_requeue_internal(
        key_from,
        key_to,
        &mut || v.take().unchecked_unwrap()(),
        &mut |op, r| c.take().unchecked_unwrap()(op, r),
    )
}

// Non-generic version to reduce monomorphization cost
unsafe fn unpark_requeue_internal(
    key_from: usize,
    key_to: usize,
    validate: &mut FnMut() -> RequeueOp,
    callback: &mut FnMut(RequeueOp, UnparkResult) -> UnparkToken,
) -> UnparkResult {
    // Lock the two buckets for the given keys
    let (bucket_from, bucket_to) = lock_bucket_pair(key_from, key_to);

    // If the validation function fails, just return
    let mut result = UnparkResult {
        unparked_threads: 0,
        have_more_threads: false,
        be_fair: false,
    };
    let op = validate();
    if op == RequeueOp::Abort {
        unlock_bucket_pair(bucket_from, bucket_to);
        return result;
    }

    // Remove all threads with the given key in the source bucket
    let mut link = &bucket_from.queue_head;
    let mut current = bucket_from.queue_head.get();
    let mut previous = ptr::null();
    let mut requeue_threads: *const ThreadData = ptr::null();
    let mut requeue_threads_tail: *const ThreadData = ptr::null();
    let mut wakeup_thread = None;
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key_from {
            // Remove the thread from the queue
            let next = (*current).next_in_queue.get();
            link.set(next);
            if bucket_from.queue_tail.get() == current {
                bucket_from.queue_tail.set(previous);
            }

            // Prepare the first thread for wakeup and requeue the rest.
            if op == RequeueOp::UnparkOneRequeueRest && wakeup_thread.is_none() {
                wakeup_thread = Some(current);
                result.unparked_threads = 1;
            } else {
                if !requeue_threads.is_null() {
                    (*requeue_threads_tail).next_in_queue.set(current);
                } else {
                    requeue_threads = current;
                }
                requeue_threads_tail = current;
                (*current).key.store(key_to, Ordering::Relaxed);
                result.have_more_threads = true;
            }
            current = next;
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // Add the requeued threads to the destination bucket
    if !requeue_threads.is_null() {
        (*requeue_threads_tail).next_in_queue.set(ptr::null());
        if !bucket_to.queue_head.get().is_null() {
            (*bucket_to.queue_tail.get())
                .next_in_queue
                .set(requeue_threads);
        } else {
            bucket_to.queue_head.set(requeue_threads);
        }
        bucket_to.queue_tail.set(requeue_threads_tail);
    }

    // Invoke the callback before waking up the thread
    if result.unparked_threads != 0 {
        result.be_fair = (*bucket_from.fair_timeout.get()).should_timeout();
    }
    let token = callback(op, result);

    // See comment in unpark_one for why we mess with the locking
    if let Some(wakeup_thread) = wakeup_thread {
        (*wakeup_thread).unpark_token.set(token);
        let handle = (*wakeup_thread).parker.unpark_lock();
        unlock_bucket_pair(bucket_from, bucket_to);
        handle.unpark();
    } else {
        unlock_bucket_pair(bucket_from, bucket_to);
    }

    result
}

/// Unparks a number of threads from the front of the queue associated with
/// `key` depending on the results of a filter function which inspects the
/// `ParkToken` associated with each thread.
///
/// The `filter` function is called for each thread in the queue or until
/// `FilterOp::Stop` is returned. This function is passed the `ParkToken`
/// associated with a particular thread, which is unparked if `FilterOp::Unpark`
/// is returned.
///
/// The `callback` function is also called while the queue is locked. It is
/// passed an `UnparkResult` indicating the number of threads that were unparked
/// and whether there are still parked threads in the queue. This `UnparkResult`
/// value is also returned by `unpark_filter`.
///
/// The `callback` function should return an `UnparkToken` value which will be
/// passed to all threads that are unparked. If no thread is unparked then the
/// returned value is ignored.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
///
/// The `filter` and `callback` functions are called while the queue is locked
/// and must not panic or call into any function in `parking_lot`.
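///
/// # Example
///
/// A minimal sketch of unparking only threads that parked with a particular
/// token, e.g. the readers of a reader-writer lock. The `READER` token value
/// and `wake_readers` name are illustrative only.
///
/// ```ignore
/// const READER: ParkToken = ParkToken(1);
///
/// unsafe fn wake_readers(key: usize) {
///     unpark_filter(
///         key,
///         |token| {
///             if token == READER {
///                 FilterOp::Unpark
///             } else {
///                 FilterOp::Skip // leave writers parked
///             }
///         },
///         |_result| DEFAULT_UNPARK_TOKEN,
///     );
/// }
/// ```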
#[inline]
pub unsafe fn unpark_filter<F, C>(key: usize, mut filter: F, callback: C) -> UnparkResult
where
    F: FnMut(ParkToken) -> FilterOp,
    C: FnOnce(UnparkResult) -> UnparkToken,
{
    let mut c = Some(callback);
    unpark_filter_internal(key, &mut filter, &mut |r| c.take().unchecked_unwrap()(r))
}

// Non-generic version to reduce monomorphization cost
unsafe fn unpark_filter_internal(
    key: usize,
    filter: &mut FnMut(ParkToken) -> FilterOp,
    callback: &mut FnMut(UnparkResult) -> UnparkToken,
) -> UnparkResult {
    // Lock the bucket for the given key
    let bucket = lock_bucket(key);

    // Go through the queue looking for threads with a matching key
    let mut link = &bucket.queue_head;
    let mut current = bucket.queue_head.get();
    let mut previous = ptr::null();
    let mut threads = SmallVec::<[_; 8]>::new();
    let mut result = UnparkResult {
        unparked_threads: 0,
        have_more_threads: false,
        be_fair: false,
    };
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key {
            // Call the filter function with the thread's ParkToken
            let next = (*current).next_in_queue.get();
            match filter((*current).park_token.get()) {
                FilterOp::Unpark => {
                    // Remove the thread from the queue
                    link.set(next);
                    if bucket.queue_tail.get() == current {
                        bucket.queue_tail.set(previous);
                    }

                    // Add the thread to our list of threads to unpark
                    threads.push((current, None));

                    current = next;
                }
                FilterOp::Skip => {
                    result.have_more_threads = true;
                    link = &(*current).next_in_queue;
                    previous = current;
                    current = link.get();
                }
                FilterOp::Stop => {
                    result.have_more_threads = true;
                    break;
                }
            }
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // Invoke the callback before waking up the threads
    result.unparked_threads = threads.len();
    if result.unparked_threads != 0 {
        result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
    }
    let token = callback(result);

    // Pass the token to all threads that are going to be unparked and prepare
    // them for unparking.
    for t in threads.iter_mut() {
        (*t.0).unpark_token.set(token);
        t.1 = Some((*t.0).parker.unpark_lock());
    }

    bucket.mutex.unlock();

    // Now that we are outside the lock, wake up all the threads that we removed
    // from the queue.
    for (_, handle) in threads.into_iter() {
        handle.unchecked_unwrap().unpark();
    }

    result
}

/// [Experimental] Deadlock detection
///
/// Enabled via the `deadlock_detection` feature flag.
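///
/// # Example
///
/// A minimal sketch of how a lock implementation might hook into the detector;
/// the `addr` parameter and function names are illustrative only.
///
/// ```ignore
/// unsafe fn my_lock(addr: usize) {
///     // ... block until the lock at `addr` is acquired ...
///     deadlock::acquire_resource(addr); // call after acquiring
/// }
///
/// unsafe fn my_unlock(addr: usize) {
///     deadlock::release_resource(addr); // call before releasing
///     // ... release the lock at `addr` ...
/// }
/// ```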
pub mod deadlock {
    #[cfg(feature = "deadlock_detection")]
    use super::deadlock_impl;

    #[cfg(feature = "deadlock_detection")]
    pub(super) use super::deadlock_impl::DeadlockData;

    #[cfg(not(feature = "deadlock_detection"))]
    pub(super) struct DeadlockData {}

    #[cfg(not(feature = "deadlock_detection"))]
    impl DeadlockData {
        pub(super) fn new() -> Self {
            DeadlockData {}
        }
    }

    /// Acquire a resource identified by key in the deadlock detector.
    /// A no-op if the deadlock_detection feature isn't enabled.
    /// Note: Call after the resource is acquired.
    #[inline]
    pub unsafe fn acquire_resource(_key: usize) {
        #[cfg(feature = "deadlock_detection")]
        deadlock_impl::acquire_resource(_key);
    }

    /// Release a resource identified by key in the deadlock detector.
    /// A no-op if the deadlock_detection feature isn't enabled.
    /// Note: Call before the resource is released.
    /// # Panics
    /// Panics if the resource was already released or wasn't acquired in this thread.
    #[inline]
    pub unsafe fn release_resource(_key: usize) {
        #[cfg(feature = "deadlock_detection")]
        deadlock_impl::release_resource(_key);
    }

    /// Returns all deadlocks detected *since* the last call.
    /// Each cycle consists of a vector of `DeadlockedThread`.
    #[cfg(feature = "deadlock_detection")]
    #[inline]
    pub fn check_deadlock() -> Vec<Vec<deadlock_impl::DeadlockedThread>> {
        deadlock_impl::check_deadlock()
    }

    #[inline]
    pub(super) unsafe fn on_unpark(_td: &super::ThreadData) {
        #[cfg(feature = "deadlock_detection")]
        deadlock_impl::on_unpark(_td);
    }
}

#[cfg(feature = "deadlock_detection")]
mod deadlock_impl {
    use super::{get_hashtable, get_thread_data, lock_bucket, ThreadData, NUM_THREADS};
    use std::cell::{Cell, UnsafeCell};
    use std::sync::mpsc;
    use std::sync::atomic::Ordering;
    use std::collections::HashSet;
    use thread_id;
    use backtrace::Backtrace;
    use petgraph;
    use petgraph::graphmap::DiGraphMap;

    /// Representation of a deadlocked thread
    pub struct DeadlockedThread {
        thread_id: usize,
        backtrace: Backtrace,
    }

    impl DeadlockedThread {
        /// The system thread id
        pub fn thread_id(&self) -> usize {
            self.thread_id
        }

        /// The thread backtrace
        pub fn backtrace(&self) -> &Backtrace {
            &self.backtrace
        }
    }

    pub struct DeadlockData {
        // Currently owned resources (keys)
        resources: UnsafeCell<Vec<usize>>,

        // Set when there's a pending callstack request
        deadlocked: Cell<bool>,

        // Sender used to report the backtrace
        backtrace_sender: UnsafeCell<Option<mpsc::Sender<DeadlockedThread>>>,

        // System thread id
        thread_id: usize,
    }

    impl DeadlockData {
        pub fn new() -> Self {
            DeadlockData {
                resources: UnsafeCell::new(Vec::new()),
                deadlocked: Cell::new(false),
                backtrace_sender: UnsafeCell::new(None),
                thread_id: thread_id::get(),
            }
        }
    }

    pub(super) unsafe fn on_unpark(td: &ThreadData) {
        if td.deadlock_data.deadlocked.get() {
            let sender = (*td.deadlock_data.backtrace_sender.get()).take().unwrap();
            sender
                .send(DeadlockedThread {
                    thread_id: td.deadlock_data.thread_id,
                    backtrace: Backtrace::new(),
                })
                .unwrap();
            // make sure to close this sender
            drop(sender);

            // park until the end of time
            td.parker.prepare_park();
            td.parker.park();
            unreachable!("unparked deadlocked thread!");
        }
    }

    pub unsafe fn acquire_resource(key: usize) {
        let mut thread_data = None;
        let thread_data = get_thread_data(&mut thread_data);
        (*thread_data.deadlock_data.resources.get()).push(key);
    }

    pub unsafe fn release_resource(key: usize) {
        let mut thread_data = None;
        let thread_data = get_thread_data(&mut thread_data);
        let resources = &mut (*thread_data.deadlock_data.resources.get());
        match resources.iter().rposition(|x| *x == key) {
            Some(p) => resources.swap_remove(p),
            None => panic!("key {} not found in thread resources", key),
        };
    }

    pub fn check_deadlock() -> Vec<Vec<DeadlockedThread>> {
        unsafe {
            // fast pass
            if check_wait_graph_fast() {
                // double check
                check_wait_graph_slow()
            } else {
                Vec::new()
            }
        }
    }

    // Simple algorithm that builds a wait graph of the threads and the resources,
    // then checks for the presence of cycles (deadlocks).
    // This variant isn't precise as it doesn't lock the entire table before checking
    unsafe fn check_wait_graph_fast() -> bool {
        let table = get_hashtable();
        let thread_count = NUM_THREADS.load(Ordering::Relaxed);
        let mut graph =
            DiGraphMap::<usize, ()>::with_capacity(thread_count * 2, thread_count * 2);

        for b in &(*table).entries[..] {
            b.mutex.lock();
            let mut current = b.queue_head.get();
            while !current.is_null() {
                if !(*current).parked_with_timeout.get()
                    && !(*current).deadlock_data.deadlocked.get()
                {
                    // .resources are waiting for their owner
                    for &resource in &(*(*current).deadlock_data.resources.get()) {
                        graph.add_edge(resource, current as usize, ());
                    }
                    // owner waits for resource .key
                    graph.add_edge(current as usize, (*current).key.load(Ordering::Relaxed), ());
                }
                current = (*current).next_in_queue.get();
            }
            b.mutex.unlock();
        }

        petgraph::algo::is_cyclic_directed(&graph)
    }

    #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)]
    enum WaitGraphNode {
        Thread(*const ThreadData),
        Resource(usize),
    }

    use self::WaitGraphNode::*;

    // Contrary to the _fast variant this locks the entire table before looking for cycles.
    // Returns all detected thread wait cycles.
    // Note that once a cycle is reported it's never reported again.
    unsafe fn check_wait_graph_slow() -> Vec<Vec<DeadlockedThread>> {
        let mut table = get_hashtable();
        loop {
            // Lock all buckets in the old table
            for b in &(*table).entries[..] {
                b.mutex.lock();
            }

            // Now check if our table is still the latest one. Another thread could
            // have grown the hash table between us getting and locking the hash table.
            let new_table = get_hashtable();
            if new_table == table {
                break;
            }

            // Unlock buckets and try again
            for b in &(*table).entries[..] {
                b.mutex.unlock();
            }

            table = new_table;
        }

        let thread_count = NUM_THREADS.load(Ordering::Relaxed);
        let mut graph =
            DiGraphMap::<WaitGraphNode, ()>::with_capacity(thread_count * 2, thread_count * 2);

        for b in &(*table).entries[..] {
            let mut current = b.queue_head.get();
            while !current.is_null() {
                if !(*current).parked_with_timeout.get()
                    && !(*current).deadlock_data.deadlocked.get()
                {
                    // .resources are waiting for their owner
                    for &resource in &(*(*current).deadlock_data.resources.get()) {
                        graph.add_edge(Resource(resource), Thread(current), ());
                    }
                    // owner waits for resource .key
                    graph.add_edge(
                        Thread(current),
                        Resource((*current).key.load(Ordering::Relaxed)),
                        (),
                    );
                }
                current = (*current).next_in_queue.get();
            }
        }

        for b in &(*table).entries[..] {
            b.mutex.unlock();
        }

        // find cycles
        let cycles = graph_cycles(&graph);

        let mut results = Vec::with_capacity(cycles.len());

        for cycle in cycles {
            let (sender, receiver) = mpsc::channel();
            for td in cycle {
                let bucket = lock_bucket((*td).key.load(Ordering::Relaxed));
                (*td).deadlock_data.deadlocked.set(true);
                *(*td).deadlock_data.backtrace_sender.get() = Some(sender.clone());
                let handle = (*td).parker.unpark_lock();
                bucket.mutex.unlock();
                // unpark the deadlocked thread!
                // on unpark it'll notice the deadlocked flag and report back
                handle.unpark();
            }
            // make sure to drop our sender before collecting results
            drop(sender);
            results.push(receiver.iter().collect());
        }

        results
    }

    // normalize a cycle to start with the "smallest" node
    fn normalize_cycle<T: Ord + Copy + Clone>(input: &[T]) -> Vec<T> {
        let min_pos = input
            .iter()
            .enumerate()
            .min_by_key(|&(_, &t)| t)
            .map(|(p, _)| p)
            .unwrap_or(0);
        input
            .iter()
            .cycle()
            .skip(min_pos)
            .take(input.len())
            .cloned()
            .collect()
    }

    // returns all thread cycles in the wait graph
    fn graph_cycles(g: &DiGraphMap<WaitGraphNode, ()>) -> Vec<Vec<*const ThreadData>> {
        use petgraph::visit::NodeIndexable;
        use petgraph::visit::depth_first_search;
        use petgraph::visit::DfsEvent;

        let mut cycles = HashSet::new();
        let mut path = Vec::with_capacity(g.node_bound());
        // start from threads to get the correct threads cycle
        let threads = g.nodes()
            .filter(|n| if let &Thread(_) = n { true } else { false });

        depth_first_search(g, threads, |e| match e {
            DfsEvent::Discover(Thread(n), _) => path.push(n),
            DfsEvent::Finish(Thread(_), _) => {
                path.pop();
            }
            DfsEvent::BackEdge(_, Thread(n)) => {
                let from = path.iter().rposition(|&i| i == n).unwrap();
                cycles.insert(normalize_cycle(&path[from..]));
            }
            _ => (),
        });

        cycles.iter().cloned().collect()
    }
}