// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use crate::thread_parker::{ThreadParker, ThreadParkerT, UnparkHandleT};
use crate::util::UncheckedOptionExt;
use crate::word_lock::WordLock;
use core::{
    cell::{Cell, UnsafeCell},
    ptr,
    sync::atomic::{AtomicPtr, AtomicUsize, Ordering},
};
use smallvec::SmallVec;
use std::time::{Duration, Instant};

static NUM_THREADS: AtomicUsize = AtomicUsize::new(0);
static HASHTABLE: AtomicPtr<HashTable> = AtomicPtr::new(ptr::null_mut());

// Even with 3x more buckets than threads, the memory overhead is still only
// a few hundred bytes per thread.
const LOAD_FACTOR: usize = 3;

struct HashTable {
    // Hash buckets for the table
    entries: Box<[Bucket]>,

    // Number of bits used for the hash function
    hash_bits: u32,

    // Previous table. This is only kept to keep leak detectors happy.
    _prev: *const HashTable,
}

impl HashTable {
    #[inline]
    fn new(num_threads: usize, prev: *const HashTable) -> Box<HashTable> {
        let new_size = (num_threads * LOAD_FACTOR).next_power_of_two();
        // hash_bits is log2(new_size): the number of bits of the hash value
        // needed to index new_size buckets.
        let hash_bits = 0usize.leading_zeros() - new_size.leading_zeros() - 1;

        let now = Instant::now();
        let mut entries = Vec::with_capacity(new_size);
        for i in 0..new_size {
            // We must ensure the seed is not zero
            entries.push(Bucket::new(now, i as u32 + 1));
        }

        Box::new(HashTable {
            entries: entries.into_boxed_slice(),
            hash_bits,
            _prev: prev,
        })
    }
}

#[repr(align(64))]
struct Bucket {
    // Lock protecting the queue
    mutex: WordLock,

    // Linked list of threads waiting on this bucket
    queue_head: Cell<*const ThreadData>,
    queue_tail: Cell<*const ThreadData>,

    // Next time at which be_fair should be set
    fair_timeout: UnsafeCell<FairTimeout>,
}

impl Bucket {
    #[inline]
    pub fn new(timeout: Instant, seed: u32) -> Self {
        Self {
            mutex: WordLock::INIT,
            queue_head: Cell::new(ptr::null()),
            queue_tail: Cell::new(ptr::null()),
            fair_timeout: UnsafeCell::new(FairTimeout::new(timeout, seed)),
        }
    }
}

struct FairTimeout {
    // Next time at which be_fair should be set
    timeout: Instant,

    // The PRNG state for calculating the next timeout
    seed: u32,
}

impl FairTimeout {
    #[inline]
    fn new(timeout: Instant, seed: u32) -> FairTimeout {
        FairTimeout { timeout, seed }
    }

    // Determine whether we should force a fair unlock, and update the timeout
    #[inline]
    fn should_timeout(&mut self) -> bool {
        let now = Instant::now();
        if now > self.timeout {
            // Time between 0 and 1ms.
            let nanos = self.gen_u32() % 1_000_000;
            self.timeout = now + Duration::new(0, nanos);
            true
        } else {
            false
        }
    }

    // Pseudorandom number generator from the "Xorshift RNGs" paper by George Marsaglia.
    fn gen_u32(&mut self) -> u32 {
        self.seed ^= self.seed << 13;
        self.seed ^= self.seed >> 17;
        self.seed ^= self.seed << 5;
        self.seed
    }
}
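
// A hedged sanity check of the xorshift generator above: bucket seeds are
// always non-zero (i + 1 in HashTable::new), and each xorshift step is an
// invertible linear map, so a non-zero state can never reach the zero fixed
// point. The iteration count below is an arbitrary illustrative choice.
#[cfg(test)]
mod fair_timeout_tests {
    use super::FairTimeout;
    use std::time::Instant;

    #[test]
    fn xorshift_state_stays_nonzero() {
        let mut ft = FairTimeout::new(Instant::now(), 1);
        for _ in 0..10_000 {
            assert_ne!(ft.gen_u32(), 0);
        }
    }
}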

struct ThreadData {
    parker: ThreadParker,

    // Key that this thread is sleeping on. This may change if the thread is
    // requeued to a different key.
    key: AtomicUsize,

    // Linked list of parked threads in a bucket
    next_in_queue: Cell<*const ThreadData>,

    // UnparkToken passed to this thread when it is unparked
    unpark_token: Cell<UnparkToken>,

    // ParkToken value set by the thread when it was parked
    park_token: Cell<ParkToken>,

    // Is the thread parked with a timeout?
    parked_with_timeout: Cell<bool>,

    // Extra data for deadlock detection
    #[cfg(feature = "deadlock_detection")]
    deadlock_data: deadlock::DeadlockData,
}

impl ThreadData {
    fn new() -> ThreadData {
        // Keep track of the total number of live ThreadData objects and resize
        // the hash table accordingly.
        let num_threads = NUM_THREADS.fetch_add(1, Ordering::Relaxed) + 1;
        unsafe {
            grow_hashtable(num_threads);
        }

        ThreadData {
            parker: ThreadParker::new(),
            key: AtomicUsize::new(0),
            next_in_queue: Cell::new(ptr::null()),
            unpark_token: Cell::new(DEFAULT_UNPARK_TOKEN),
            park_token: Cell::new(DEFAULT_PARK_TOKEN),
            parked_with_timeout: Cell::new(false),
            #[cfg(feature = "deadlock_detection")]
            deadlock_data: deadlock::DeadlockData::new(),
        }
    }
}

// Invokes the given closure with a reference to the current thread's `ThreadData`.
#[inline(always)]
fn with_thread_data<T>(f: impl FnOnce(&ThreadData) -> T) -> T {
    // Unlike word_lock::ThreadData, parking_lot::ThreadData is always expensive
    // to construct. Try to use a thread-local version if possible. Otherwise just
    // create a ThreadData on the stack.
    let mut thread_data_storage = None;
    thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
    let thread_data_ptr = THREAD_DATA
        .try_with(|x| x as *const ThreadData)
        .unwrap_or_else(|_| thread_data_storage.get_or_insert_with(ThreadData::new));

    f(unsafe { &*thread_data_ptr })
}

impl Drop for ThreadData {
    fn drop(&mut self) {
        NUM_THREADS.fetch_sub(1, Ordering::Relaxed);
    }
}

// Get a pointer to the latest hash table, creating one if it doesn't exist yet.
#[inline]
fn get_hashtable() -> *mut HashTable {
    let table = HASHTABLE.load(Ordering::Acquire);

    // If there is no table, create one
    if table.is_null() {
        create_hashtable()
    } else {
        table
    }
}

// Allocate a new hash table and try to install it. If another thread installed
// a table first, free ours and return the existing one.
#[cold]
fn create_hashtable() -> *mut HashTable {
    let new_table = Box::into_raw(HashTable::new(LOAD_FACTOR, ptr::null()));

    // If this fails then it means some other thread created the hash
    // table first.
    match HASHTABLE.compare_exchange(
        ptr::null_mut(),
        new_table,
        Ordering::Release,
        Ordering::Relaxed,
    ) {
        Ok(_) => new_table,
        Err(old_table) => {
            // Free the table we created
            unsafe {
                drop(Box::from_raw(new_table));
            }
            old_table
        }
    }
}

// Grow the hash table so that it is big enough for the given number of threads.
// This isn't performance-critical since it is only done when a ThreadData is
// created, which only happens once per thread.
unsafe fn grow_hashtable(num_threads: usize) {
    // If there is no table, create one
    if HASHTABLE.load(Ordering::Relaxed).is_null() {
        let new_table = Box::into_raw(HashTable::new(num_threads, ptr::null()));

        // If this fails then it means some other thread created the hash
        // table first.
        if HASHTABLE
            .compare_exchange(
                ptr::null_mut(),
                new_table,
                Ordering::Release,
                Ordering::Relaxed,
            )
            .is_ok()
        {
            return;
        }

        // Free the table we created
        drop(Box::from_raw(new_table));
    }

    let mut old_table;
    loop {
        old_table = HASHTABLE.load(Ordering::Acquire);

        // Check if we need to resize the existing table
        if (*old_table).entries.len() >= LOAD_FACTOR * num_threads {
            return;
        }

        // Lock all buckets in the old table
        for b in &(*old_table).entries[..] {
            b.mutex.lock();
        }

        // Now check if our table is still the latest one. Another thread could
        // have grown the hash table between us reading HASHTABLE and locking
        // the buckets.
        if HASHTABLE.load(Ordering::Relaxed) == old_table {
            break;
        }

        // Unlock buckets and try again
        for b in &(*old_table).entries[..] {
            b.mutex.unlock();
        }
    }

    // Create the new table
    let new_table = HashTable::new(num_threads, old_table);

    // Move the entries from the old table to the new one
    for b in &(*old_table).entries[..] {
        let mut current = b.queue_head.get();
        while !current.is_null() {
            let next = (*current).next_in_queue.get();
            let hash = hash((*current).key.load(Ordering::Relaxed), new_table.hash_bits);
            if new_table.entries[hash].queue_tail.get().is_null() {
                new_table.entries[hash].queue_head.set(current);
            } else {
                (*new_table.entries[hash].queue_tail.get())
                    .next_in_queue
                    .set(current);
            }
            new_table.entries[hash].queue_tail.set(current);
            (*current).next_in_queue.set(ptr::null());
            current = next;
        }
    }

    // Publish the new table. No races are possible at this point because
    // any other thread trying to grow the hash table is blocked on the bucket
    // locks in the old table.
    HASHTABLE.store(Box::into_raw(new_table), Ordering::Release);

    // Unlock all buckets in the old table
    for b in &(*old_table).entries[..] {
        b.mutex.unlock();
    }
}

// Hash function for addresses. This is Fibonacci hashing: multiply by the
// golden-ratio constant and keep the top `bits` bits of the product.
#[cfg(target_pointer_width = "32")]
#[inline]
fn hash(key: usize, bits: u32) -> usize {
    key.wrapping_mul(0x9E3779B9) >> (32 - bits)
}
#[cfg(target_pointer_width = "64")]
#[inline]
fn hash(key: usize, bits: u32) -> usize {
    key.wrapping_mul(0x9E3779B97F4A7C15) >> (64 - bits)
}
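
// A hedged sanity check of the hash function's range; the keys and bit count
// below are arbitrary illustrative values.
#[cfg(test)]
mod hash_tests {
    use super::hash;

    #[test]
    fn hash_stays_within_bucket_range() {
        let bits = 4;
        for key in [0usize, 1, 0xdead_beef, usize::MAX] {
            // The top `bits` bits of the product always index one of
            // 2^bits buckets.
            assert!(hash(key, bits) < 1usize << bits);
        }
    }
}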

// Lock the bucket for the given key
#[inline]
unsafe fn lock_bucket<'a>(key: usize) -> &'a Bucket {
    let mut bucket;
    loop {
        let hashtable = get_hashtable();

        let hash = hash(key, (*hashtable).hash_bits);
        bucket = &(*hashtable).entries[hash];

        // Lock the bucket
        bucket.mutex.lock();

        // If no other thread has rehashed the table before we grabbed the lock
        // then we are good to go! The lock we grabbed prevents any rehashes.
        if HASHTABLE.load(Ordering::Relaxed) == hashtable {
            return bucket;
        }

        // Unlock the bucket and try again
        bucket.mutex.unlock();
    }
}

// Lock the bucket for the given key, but check that the key hasn't been changed
// in the meantime due to a requeue.
#[inline]
unsafe fn lock_bucket_checked<'a>(key: &AtomicUsize) -> (usize, &'a Bucket) {
    let mut bucket;
    loop {
        let hashtable = get_hashtable();
        let current_key = key.load(Ordering::Relaxed);

        let hash = hash(current_key, (*hashtable).hash_bits);
        bucket = &(*hashtable).entries[hash];

        // Lock the bucket
        bucket.mutex.lock();

        // Check that both the hash table and key are correct while the bucket
        // is locked. Note that the key can't change once we locked the proper
        // bucket for it, so we just keep trying until we have the correct key.
        if HASHTABLE.load(Ordering::Relaxed) == hashtable
            && key.load(Ordering::Relaxed) == current_key
        {
            return (current_key, bucket);
        }

        // Unlock the bucket and try again
        bucket.mutex.unlock();
    }
}

// Lock the two buckets for the given pair of keys
#[inline]
unsafe fn lock_bucket_pair<'a>(key1: usize, key2: usize) -> (&'a Bucket, &'a Bucket) {
    let mut bucket1;
    loop {
        let hashtable = get_hashtable();

        // Get the lowest bucket first
        let hash1 = hash(key1, (*hashtable).hash_bits);
        let hash2 = hash(key2, (*hashtable).hash_bits);
        if hash1 <= hash2 {
            bucket1 = &(*hashtable).entries[hash1];
        } else {
            bucket1 = &(*hashtable).entries[hash2];
        }

        // Lock the first bucket
        bucket1.mutex.lock();

        // If no other thread has rehashed the table before we grabbed the lock
        // then we are good to go! The lock we grabbed prevents any rehashes.
        if HASHTABLE.load(Ordering::Relaxed) == hashtable {
            // Now lock the second bucket and return the two buckets
            if hash1 == hash2 {
                return (bucket1, bucket1);
            } else if hash1 < hash2 {
                let bucket2 = &(*hashtable).entries[hash2];
                bucket2.mutex.lock();
                return (bucket1, bucket2);
            } else {
                let bucket2 = &(*hashtable).entries[hash1];
                bucket2.mutex.lock();
                return (bucket2, bucket1);
            }
        }

        // Unlock the bucket and try again
        bucket1.mutex.unlock();
    }
}

// Unlock a pair of buckets
#[inline]
unsafe fn unlock_bucket_pair(bucket1: &Bucket, bucket2: &Bucket) {
    bucket1.mutex.unlock();
    if !ptr::eq(bucket1, bucket2) {
        bucket2.mutex.unlock();
    }
}

/// Result of a park operation.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum ParkResult {
    /// We were unparked by another thread with the given token.
    Unparked(UnparkToken),

    /// The validation callback returned false.
    Invalid,

    /// The timeout expired.
    TimedOut,
}

impl ParkResult {
    /// Returns true if we were unparked by another thread.
    #[inline]
    pub fn is_unparked(self) -> bool {
        if let ParkResult::Unparked(_) = self {
            true
        } else {
            false
        }
    }
}

/// Result of an unpark operation.
#[derive(Copy, Clone, Default, Eq, PartialEq, Debug)]
pub struct UnparkResult {
    /// The number of threads that were unparked.
    pub unparked_threads: usize,

    /// The number of threads that were requeued.
    pub requeued_threads: usize,

    /// Whether there are any threads remaining in the queue. This is only
    /// true if a thread was unparked.
    pub have_more_threads: bool,

    /// This is set to true on average once every 0.5ms for any given key. It
    /// should be used to switch to a fair unlocking mechanism for a particular
    /// unlock.
    pub be_fair: bool,

    /// Private field so new fields can be added without breakage.
    _sealed: (),
}

/// Operation that `unpark_requeue` should perform.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum RequeueOp {
    /// Abort the operation without doing anything.
    Abort,

    /// Unpark one thread and requeue the rest onto the target queue.
    UnparkOneRequeueRest,

    /// Requeue all threads onto the target queue.
    RequeueAll,

    /// Unpark one thread and leave the rest parked. No requeuing is done.
    UnparkOne,

    /// Requeue one thread and leave the rest parked on the original queue.
    RequeueOne,
}

/// Operation that `unpark_filter` should perform for each thread.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum FilterOp {
    /// Unpark the thread and continue scanning the list of parked threads.
    Unpark,

    /// Don't unpark the thread and continue scanning the list of parked threads.
    Skip,

    /// Don't unpark the thread and stop scanning the list of parked threads.
    Stop,
}

/// A value which is passed from an unparker to a parked thread.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct UnparkToken(pub usize);

/// A value associated with a parked thread which can be used by `unpark_filter`.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct ParkToken(pub usize);

/// A default unpark token to use.
pub const DEFAULT_UNPARK_TOKEN: UnparkToken = UnparkToken(0);

/// A default park token to use.
pub const DEFAULT_PARK_TOKEN: ParkToken = ParkToken(0);

/// Parks the current thread in the queue associated with the given key.
///
/// The `validate` function is called while the queue is locked and can abort
/// the operation by returning false. If `validate` returns true then the
/// current thread is appended to the queue and the queue is unlocked.
///
/// The `before_sleep` function is called after the queue is unlocked but before
/// the thread is put to sleep. The thread will then sleep until it is unparked
/// or the given timeout is reached.
///
/// The `timed_out` function is also called while the queue is locked, but only
/// if the timeout was reached. It is passed the key of the queue it was in when
/// it timed out, which may be different from the original key if
/// `unpark_requeue` was called. It is also passed a bool which indicates
/// whether it was the last thread in the queue.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
///
/// The `validate` and `timed_out` functions are called while the queue is
/// locked and must not panic or call into any function in `parking_lot`.
///
/// The `before_sleep` function is called outside the queue lock and is allowed
/// to call `unpark_one`, `unpark_all`, `unpark_requeue` or `unpark_filter`, but
/// it is not allowed to call `park` or panic.
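///
/// # Example
///
/// A minimal sketch of a one-shot event built on top of `park` and
/// `unpark_one`. The `Event` type, its `AtomicBool` flag, and the use of the
/// event's address as the queue key are illustrative assumptions, not part
/// of this API:
///
/// ```ignore
/// use std::sync::atomic::{AtomicBool, Ordering};
///
/// struct Event {
///     set: AtomicBool,
/// }
///
/// impl Event {
///     fn wait(&self) {
///         let key = self as *const Event as usize;
///         while !self.set.load(Ordering::Acquire) {
///             unsafe {
///                 park(
///                     key,
///                     // Re-check the flag under the queue lock so we never
///                     // sleep after a concurrent `signal`.
///                     || !self.set.load(Ordering::Acquire),
///                     || {},
///                     |_, _| {},
///                     DEFAULT_PARK_TOKEN,
///                     None,
///                 );
///             }
///         }
///     }
///
///     fn signal(&self) {
///         self.set.store(true, Ordering::Release);
///         let key = self as *const Event as usize;
///         unsafe {
///             unpark_one(key, |_| DEFAULT_UNPARK_TOKEN);
///         }
///     }
/// }
/// ```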
#[inline]
pub unsafe fn park(
    key: usize,
    validate: impl FnOnce() -> bool,
    before_sleep: impl FnOnce(),
    timed_out: impl FnOnce(usize, bool),
    park_token: ParkToken,
    timeout: Option<Instant>,
) -> ParkResult {
    // Grab our thread data; this also ensures that the hash table exists
    with_thread_data(|thread_data| {
        // Lock the bucket for the given key
        let bucket = lock_bucket(key);

        // If the validation function fails, just return
        if !validate() {
            bucket.mutex.unlock();
            return ParkResult::Invalid;
        }

        // Append our thread data to the queue and unlock the bucket
        thread_data.parked_with_timeout.set(timeout.is_some());
        thread_data.next_in_queue.set(ptr::null());
        thread_data.key.store(key, Ordering::Relaxed);
        thread_data.park_token.set(park_token);
        thread_data.parker.prepare_park();
        if !bucket.queue_head.get().is_null() {
            (*bucket.queue_tail.get()).next_in_queue.set(thread_data);
        } else {
            bucket.queue_head.set(thread_data);
        }
        bucket.queue_tail.set(thread_data);
        bucket.mutex.unlock();

        // Invoke the pre-sleep callback
        before_sleep();

        // Park our thread and determine whether we were woken up by an unpark
        // or by our timeout. Note that this isn't precise: we can still be
        // unparked since we are still in the queue.
        let unparked = match timeout {
            Some(timeout) => thread_data.parker.park_until(timeout),
            None => {
                thread_data.parker.park();
                // Call the deadlock detection on_unpark hook
                deadlock::on_unpark(thread_data);
                true
            }
        };

        // If we were unparked, return now
        if unparked {
            return ParkResult::Unparked(thread_data.unpark_token.get());
        }

        // Lock our bucket again. Note that the hashtable may have been rehashed in
        // the meantime. Our key may also have changed if we were requeued.
        let (key, bucket) = lock_bucket_checked(&thread_data.key);

        // Now we need to check again if we were unparked or timed out. Unlike the
        // last check this is precise because we hold the bucket lock.
        if !thread_data.parker.timed_out() {
            bucket.mutex.unlock();
            return ParkResult::Unparked(thread_data.unpark_token.get());
        }

        // We timed out, so we now need to remove our thread from the queue
        let mut link = &bucket.queue_head;
        let mut current = bucket.queue_head.get();
        let mut previous = ptr::null();
        let mut was_last_thread = true;
        while !current.is_null() {
            if current == thread_data {
                let next = (*current).next_in_queue.get();
                link.set(next);
                if bucket.queue_tail.get() == current {
                    bucket.queue_tail.set(previous);
                } else {
                    // Scan the rest of the queue to see if there are any other
                    // entries with the given key.
                    let mut scan = next;
                    while !scan.is_null() {
                        if (*scan).key.load(Ordering::Relaxed) == key {
                            was_last_thread = false;
                            break;
                        }
                        scan = (*scan).next_in_queue.get();
                    }
                }

                // Callback to indicate that we timed out, and whether we were the
                // last thread on the queue.
                timed_out(key, was_last_thread);
                break;
            } else {
                if (*current).key.load(Ordering::Relaxed) == key {
                    was_last_thread = false;
                }
                link = &(*current).next_in_queue;
                previous = current;
                current = link.get();
            }
        }

        // There should be no way for our thread to have been removed from the queue
        // if we timed out.
        debug_assert!(!current.is_null());

        // Unlock the bucket, we are done
        bucket.mutex.unlock();
        ParkResult::TimedOut
    })
}

/// Unparks one thread from the queue associated with the given key.
///
/// The `callback` function is called while the queue is locked and before the
/// target thread is woken up. The `UnparkResult` argument to the function
/// indicates whether a thread was found in the queue and whether this was the
/// last thread in the queue. This value is also returned by `unpark_one`.
///
/// The `callback` function should return an `UnparkToken` value which will be
/// passed to the thread that is unparked. If no thread is unparked then the
/// returned value is ignored.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
///
/// The `callback` function is called while the queue is locked and must not
/// panic or call into any function in `parking_lot`.
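///
/// # Example
///
/// A hedged sketch of how a mutex-style primitive might use the callback to
/// decide between a normal unlock and a fair handoff. `TOKEN_NORMAL` and
/// `TOKEN_HANDOFF` are hypothetical tokens, not part of this API:
///
/// ```ignore
/// const TOKEN_NORMAL: UnparkToken = UnparkToken(0);
/// const TOKEN_HANDOFF: UnparkToken = UnparkToken(1);
///
/// let result = unsafe {
///     unpark_one(key, |result| {
///         if result.unparked_threads != 0 && result.be_fair {
///             // Pass ownership directly to the woken thread.
///             TOKEN_HANDOFF
///         } else {
///             TOKEN_NORMAL
///         }
///     })
/// };
/// ```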
#[inline]
pub unsafe fn unpark_one(
    key: usize,
    callback: impl FnOnce(UnparkResult) -> UnparkToken,
) -> UnparkResult {
    // Lock the bucket for the given key
    let bucket = lock_bucket(key);

    // Find a thread with a matching key and remove it from the queue
    let mut link = &bucket.queue_head;
    let mut current = bucket.queue_head.get();
    let mut previous = ptr::null();
    let mut result = UnparkResult::default();
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key {
            // Remove the thread from the queue
            let next = (*current).next_in_queue.get();
            link.set(next);
            if bucket.queue_tail.get() == current {
                bucket.queue_tail.set(previous);
            } else {
                // Scan the rest of the queue to see if there are any other
                // entries with the given key.
                let mut scan = next;
                while !scan.is_null() {
                    if (*scan).key.load(Ordering::Relaxed) == key {
                        result.have_more_threads = true;
                        break;
                    }
                    scan = (*scan).next_in_queue.get();
                }
            }

            // Invoke the callback before waking up the thread
            result.unparked_threads = 1;
            result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
            let token = callback(result);

            // Set the token for the target thread
            (*current).unpark_token.set(token);

            // This is a bit tricky: we first lock the ThreadParker to prevent
            // the thread from exiting and freeing its ThreadData if its wait
            // times out. Then we unlock the queue since we don't want to keep
            // the queue locked while we perform a system call. Finally we wake
            // up the parked thread.
            let handle = (*current).parker.unpark_lock();
            bucket.mutex.unlock();
            handle.unpark();

            return result;
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // No threads with a matching key were found in the bucket
    callback(result);
    bucket.mutex.unlock();
    result
}

/// Unparks all threads in the queue associated with the given key.
///
/// The given `UnparkToken` is passed to all unparked threads.
///
/// This function returns the number of threads that were unparked.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
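///
/// # Example
///
/// A hedged sketch of a "notify all" broadcast; `key` is assumed to be the
/// address of some synchronization object you control:
///
/// ```ignore
/// let woken = unsafe { unpark_all(key, DEFAULT_UNPARK_TOKEN) };
/// println!("woke {} threads", woken);
/// ```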
#[inline]
pub unsafe fn unpark_all(key: usize, unpark_token: UnparkToken) -> usize {
    // Lock the bucket for the given key
    let bucket = lock_bucket(key);

    // Remove all threads with the given key in the bucket
    let mut link = &bucket.queue_head;
    let mut current = bucket.queue_head.get();
    let mut previous = ptr::null();
    let mut threads = SmallVec::<[_; 8]>::new();
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key {
            // Remove the thread from the queue
            let next = (*current).next_in_queue.get();
            link.set(next);
            if bucket.queue_tail.get() == current {
                bucket.queue_tail.set(previous);
            }

            // Set the token for the target thread
            (*current).unpark_token.set(unpark_token);

            // Don't wake up threads while holding the queue lock. See comment
            // in unpark_one. For now just record which threads we need to wake
            // up.
            threads.push((*current).parker.unpark_lock());
            current = next;
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // Unlock the bucket
    bucket.mutex.unlock();

    // Now that we are outside the lock, wake up all the threads that we removed
    // from the queue.
    let num_threads = threads.len();
    for handle in threads.into_iter() {
        handle.unpark();
    }

    num_threads
}

/// Removes all threads from the queue associated with `key_from`, optionally
/// unparks the first one and requeues the rest onto the queue associated with
/// `key_to`.
///
/// The `validate` function is called while both queues are locked. Its return
/// value will determine which operation is performed, or whether the operation
/// should be aborted. See `RequeueOp` for details about the different possible
/// return values.
///
/// The `callback` function is also called while both queues are locked. It is
/// passed the `RequeueOp` returned by `validate` and an `UnparkResult`
/// indicating whether a thread was unparked and whether there are threads still
/// parked in the new queue. This `UnparkResult` value is also returned by
/// `unpark_requeue`.
///
/// The `callback` function should return an `UnparkToken` value which will be
/// passed to the thread that is unparked. If no thread is unparked then the
/// returned value is ignored.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
///
/// The `validate` and `callback` functions are called while the queue is locked
/// and must not panic or call into any function in `parking_lot`.
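///
/// # Example
///
/// A hedged sketch of the classic condvar `notify_all` pattern: wake one
/// waiter and requeue the rest onto the mutex so they don't all stampede.
/// `condvar_key` and `mutex_key` are assumed to be addresses you control:
///
/// ```ignore
/// let result = unsafe {
///     unpark_requeue(
///         condvar_key,
///         mutex_key,
///         // Unpark one waiter and requeue the rest; a real condvar would
///         // also inspect the mutex state here (elided in this sketch).
///         || RequeueOp::UnparkOneRequeueRest,
///         |_op, _result| DEFAULT_UNPARK_TOKEN,
///     )
/// };
/// ```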
#[inline]
pub unsafe fn unpark_requeue(
    key_from: usize,
    key_to: usize,
    validate: impl FnOnce() -> RequeueOp,
    callback: impl FnOnce(RequeueOp, UnparkResult) -> UnparkToken,
) -> UnparkResult {
    // Lock the two buckets for the given keys
    let (bucket_from, bucket_to) = lock_bucket_pair(key_from, key_to);

    // If the validation function fails, just return
    let mut result = UnparkResult::default();
    let op = validate();
    if op == RequeueOp::Abort {
        unlock_bucket_pair(bucket_from, bucket_to);
        return result;
    }

    // Remove all threads with the given key in the source bucket
    let mut link = &bucket_from.queue_head;
    let mut current = bucket_from.queue_head.get();
    let mut previous = ptr::null();
    let mut requeue_threads: *const ThreadData = ptr::null();
    let mut requeue_threads_tail: *const ThreadData = ptr::null();
    let mut wakeup_thread = None;
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key_from {
            // Remove the thread from the queue
            let next = (*current).next_in_queue.get();
            link.set(next);
            if bucket_from.queue_tail.get() == current {
                bucket_from.queue_tail.set(previous);
            }

            // Prepare the first thread for wakeup and requeue the rest.
            if (op == RequeueOp::UnparkOneRequeueRest || op == RequeueOp::UnparkOne)
                && wakeup_thread.is_none()
            {
                wakeup_thread = Some(current);
                result.unparked_threads = 1;
            } else {
                if !requeue_threads.is_null() {
                    (*requeue_threads_tail).next_in_queue.set(current);
                } else {
                    requeue_threads = current;
                }
                requeue_threads_tail = current;
                (*current).key.store(key_to, Ordering::Relaxed);
                result.requeued_threads += 1;
            }
            if op == RequeueOp::UnparkOne || op == RequeueOp::RequeueOne {
                // Scan the rest of the queue to see if there are any other
                // entries with the given key.
                let mut scan = next;
                while !scan.is_null() {
                    if (*scan).key.load(Ordering::Relaxed) == key_from {
                        result.have_more_threads = true;
                        break;
                    }
                    scan = (*scan).next_in_queue.get();
                }
                break;
            }
            current = next;
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // Add the requeued threads to the destination bucket
    if !requeue_threads.is_null() {
        (*requeue_threads_tail).next_in_queue.set(ptr::null());
        if !bucket_to.queue_head.get().is_null() {
            (*bucket_to.queue_tail.get())
                .next_in_queue
                .set(requeue_threads);
        } else {
            bucket_to.queue_head.set(requeue_threads);
        }
        bucket_to.queue_tail.set(requeue_threads_tail);
    }

    // Invoke the callback before waking up the thread
    if result.unparked_threads != 0 {
        result.be_fair = (*bucket_from.fair_timeout.get()).should_timeout();
    }
    let token = callback(op, result);

    // See comment in unpark_one for why we mess with the locking
    if let Some(wakeup_thread) = wakeup_thread {
        (*wakeup_thread).unpark_token.set(token);
        let handle = (*wakeup_thread).parker.unpark_lock();
        unlock_bucket_pair(bucket_from, bucket_to);
        handle.unpark();
    } else {
        unlock_bucket_pair(bucket_from, bucket_to);
    }

    result
}

/// Unparks a number of threads from the front of the queue associated with
/// `key` depending on the results of a filter function which inspects the
/// `ParkToken` associated with each thread.
///
/// The `filter` function is called for each thread in the queue or until
/// `FilterOp::Stop` is returned. This function is passed the `ParkToken`
/// associated with a particular thread, which is unparked if `FilterOp::Unpark`
/// is returned.
///
/// The `callback` function is also called while the queue is locked. It is
/// passed an `UnparkResult` indicating the number of threads that were unparked
/// and whether there are still parked threads in the queue. This `UnparkResult`
/// value is also returned by `unpark_filter`.
///
/// The `callback` function should return an `UnparkToken` value which will be
/// passed to all threads that are unparked. If no thread is unparked then the
/// returned value is ignored.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
///
/// The `filter` and `callback` functions are called while the queue is locked
/// and must not panic or call into any function in `parking_lot`.
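///
/// # Example
///
/// A hedged sketch of waking only "reader" threads, assuming callers parked
/// with a hypothetical `TOKEN_READER` park token:
///
/// ```ignore
/// const TOKEN_READER: ParkToken = ParkToken(1);
///
/// let result = unsafe {
///     unpark_filter(
///         key,
///         |token| {
///             if token == TOKEN_READER {
///                 FilterOp::Unpark
///             } else {
///                 // Leave writers parked but keep scanning.
///                 FilterOp::Skip
///             }
///         },
///         |_result| DEFAULT_UNPARK_TOKEN,
///     )
/// };
/// ```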
#[inline]
pub unsafe fn unpark_filter(
    key: usize,
    mut filter: impl FnMut(ParkToken) -> FilterOp,
    callback: impl FnOnce(UnparkResult) -> UnparkToken,
) -> UnparkResult {
    // Lock the bucket for the given key
    let bucket = lock_bucket(key);

    // Go through the queue looking for threads with a matching key
    let mut link = &bucket.queue_head;
    let mut current = bucket.queue_head.get();
    let mut previous = ptr::null();
    let mut threads = SmallVec::<[_; 8]>::new();
    let mut result = UnparkResult::default();
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key {
            // Call the filter function with the thread's ParkToken
            let next = (*current).next_in_queue.get();
            match filter((*current).park_token.get()) {
                FilterOp::Unpark => {
                    // Remove the thread from the queue
                    link.set(next);
                    if bucket.queue_tail.get() == current {
                        bucket.queue_tail.set(previous);
                    }

                    // Add the thread to our list of threads to unpark
                    threads.push((current, None));

                    current = next;
                }
                FilterOp::Skip => {
                    result.have_more_threads = true;
                    link = &(*current).next_in_queue;
                    previous = current;
                    current = link.get();
                }
                FilterOp::Stop => {
                    result.have_more_threads = true;
                    break;
                }
            }
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // Invoke the callback before waking up the threads
    result.unparked_threads = threads.len();
    if result.unparked_threads != 0 {
        result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
    }
    let token = callback(result);

    // Pass the token to all threads that are going to be unparked and prepare
    // them for unparking.
    for t in threads.iter_mut() {
        (*t.0).unpark_token.set(token);
        t.1 = Some((*t.0).parker.unpark_lock());
    }

    bucket.mutex.unlock();

    // Now that we are outside the lock, wake up all the threads that we removed
    // from the queue.
    for (_, handle) in threads.into_iter() {
        handle.unchecked_unwrap().unpark();
    }

    result
}

/// \[Experimental\] Deadlock detection
///
/// Enabled via the `deadlock_detection` feature flag.
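///
/// # Example
///
/// A hedged sketch of polling for deadlocks from a background thread
/// (requires the `deadlock_detection` feature); the poll interval is an
/// arbitrary illustrative choice:
///
/// ```ignore
/// use std::thread;
/// use std::time::Duration;
///
/// thread::spawn(|| loop {
///     thread::sleep(Duration::from_secs(10));
///     for (i, cycle) in deadlock::check_deadlock().iter().enumerate() {
///         eprintln!("Deadlock #{}: {} threads", i, cycle.len());
///         for t in cycle {
///             eprintln!("thread {}: {:?}", t.thread_id(), t.backtrace());
///         }
///     }
/// });
/// ```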
pub mod deadlock {
    #[cfg(feature = "deadlock_detection")]
    use super::deadlock_impl;

    #[cfg(feature = "deadlock_detection")]
    pub(super) use super::deadlock_impl::DeadlockData;

    /// Acquire a resource identified by key in the deadlock detector.
    /// No-op if the `deadlock_detection` feature isn't enabled.
    /// Note: Call after the resource is acquired.
    #[inline]
    pub unsafe fn acquire_resource(_key: usize) {
        #[cfg(feature = "deadlock_detection")]
        deadlock_impl::acquire_resource(_key);
    }

    /// Release a resource identified by key in the deadlock detector.
    /// No-op if the `deadlock_detection` feature isn't enabled.
    /// Note: Call before the resource is released.
    /// # Panics
    /// Panics if the resource was already released or wasn't acquired in this thread.
    #[inline]
    pub unsafe fn release_resource(_key: usize) {
        #[cfg(feature = "deadlock_detection")]
        deadlock_impl::release_resource(_key);
    }

    /// Returns all deadlocks detected *since* the last call.
    /// Each cycle consists of a vector of `DeadlockedThread`s.
    #[cfg(feature = "deadlock_detection")]
    #[inline]
    pub fn check_deadlock() -> Vec<Vec<deadlock_impl::DeadlockedThread>> {
        deadlock_impl::check_deadlock()
    }

    #[inline]
    pub(super) unsafe fn on_unpark(_td: &super::ThreadData) {
        #[cfg(feature = "deadlock_detection")]
        deadlock_impl::on_unpark(_td);
    }
}

#[cfg(feature = "deadlock_detection")]
mod deadlock_impl {
    use super::{get_hashtable, lock_bucket, with_thread_data, ThreadData, NUM_THREADS};
    use crate::thread_parker::{ThreadParkerT, UnparkHandleT};
    use crate::word_lock::WordLock;
    use backtrace::Backtrace;
    use petgraph;
    use petgraph::graphmap::DiGraphMap;
    use std::cell::{Cell, UnsafeCell};
    use std::collections::HashSet;
    use std::sync::atomic::Ordering;
    use std::sync::mpsc;
    use thread_id;

    /// Representation of a deadlocked thread
    pub struct DeadlockedThread {
        thread_id: usize,
        backtrace: Backtrace,
    }

    impl DeadlockedThread {
        /// The system thread id
        pub fn thread_id(&self) -> usize {
            self.thread_id
        }

        /// The thread backtrace
        pub fn backtrace(&self) -> &Backtrace {
            &self.backtrace
        }
    }

    pub struct DeadlockData {
        // Currently owned resources (keys)
        resources: UnsafeCell<Vec<usize>>,

        // Set when there's a pending callstack request
        deadlocked: Cell<bool>,

        // Sender used to report the backtrace
        backtrace_sender: UnsafeCell<Option<mpsc::Sender<DeadlockedThread>>>,

        // System thread id
        thread_id: usize,
    }

    impl DeadlockData {
        pub fn new() -> Self {
            DeadlockData {
                resources: UnsafeCell::new(Vec::new()),
                deadlocked: Cell::new(false),
                backtrace_sender: UnsafeCell::new(None),
                thread_id: thread_id::get(),
            }
        }
    }

    pub(super) unsafe fn on_unpark(td: &ThreadData) {
        if td.deadlock_data.deadlocked.get() {
            let sender = (*td.deadlock_data.backtrace_sender.get()).take().unwrap();
            sender
                .send(DeadlockedThread {
                    thread_id: td.deadlock_data.thread_id,
                    backtrace: Backtrace::new(),
                })
                .unwrap();
            // make sure to close this sender
            drop(sender);

            // park until the end of time
            td.parker.prepare_park();
            td.parker.park();
            unreachable!("unparked deadlocked thread!");
        }
    }

    pub unsafe fn acquire_resource(key: usize) {
        with_thread_data(|thread_data| {
            (*thread_data.deadlock_data.resources.get()).push(key);
        });
    }

    pub unsafe fn release_resource(key: usize) {
        with_thread_data(|thread_data| {
            let resources = &mut (*thread_data.deadlock_data.resources.get());
            match resources.iter().rposition(|x| *x == key) {
                Some(p) => resources.swap_remove(p),
                None => panic!("key {} not found in thread resources", key),
            };
        });
    }

    pub fn check_deadlock() -> Vec<Vec<DeadlockedThread>> {
        unsafe {
            // fast path
            if check_wait_graph_fast() {
                // double check
                check_wait_graph_slow()
            } else {
                Vec::new()
            }
        }
    }

    // Simple algorithm that builds a wait graph of the threads and the resources,
    // then checks for the presence of cycles (deadlocks).
    // This variant isn't precise as it doesn't lock the entire table before checking.
    unsafe fn check_wait_graph_fast() -> bool {
        let table = get_hashtable();
        let thread_count = NUM_THREADS.load(Ordering::Relaxed);
        let mut graph = DiGraphMap::<usize, ()>::with_capacity(thread_count * 2, thread_count * 2);

        for b in &(*table).entries[..] {
            b.mutex.lock();
            let mut current = b.queue_head.get();
            while !current.is_null() {
                if !(*current).parked_with_timeout.get()
                    && !(*current).deadlock_data.deadlocked.get()
                {
                    // .resources are waiting for their owner
                    for &resource in &(*(*current).deadlock_data.resources.get()) {
                        graph.add_edge(resource, current as usize, ());
                    }
                    // owner waits for resource .key
                    graph.add_edge(current as usize, (*current).key.load(Ordering::Relaxed), ());
                }
                current = (*current).next_in_queue.get();
            }
            b.mutex.unlock();
        }

        petgraph::algo::is_cyclic_directed(&graph)
    }

    #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)]
    enum WaitGraphNode {
        Thread(*const ThreadData),
        Resource(usize),
    }

    use self::WaitGraphNode::*;

    // Contrary to the _fast variant, this locks the entries table before looking for cycles.
    // Returns all detected thread wait cycles.
    // Note that once a cycle is reported it's never reported again.
    unsafe fn check_wait_graph_slow() -> Vec<Vec<DeadlockedThread>> {
        static DEADLOCK_DETECTION_LOCK: WordLock = WordLock::INIT;
        DEADLOCK_DETECTION_LOCK.lock();

        let mut table = get_hashtable();
        loop {
            // Lock all buckets in the old table
            for b in &(*table).entries[..] {
                b.mutex.lock();
            }

            // Now check if our table is still the latest one. Another thread could
            // have grown the hash table between us getting and locking the hash table.
            let new_table = get_hashtable();
            if new_table == table {
                break;
            }

            // Unlock buckets and try again
            for b in &(*table).entries[..] {
                b.mutex.unlock();
            }

            table = new_table;
        }

        let thread_count = NUM_THREADS.load(Ordering::Relaxed);
        let mut graph =
            DiGraphMap::<WaitGraphNode, ()>::with_capacity(thread_count * 2, thread_count * 2);

        for b in &(*table).entries[..] {
            let mut current = b.queue_head.get();
            while !current.is_null() {
                if !(*current).parked_with_timeout.get()
                    && !(*current).deadlock_data.deadlocked.get()
                {
                    // .resources are waiting for their owner
                    for &resource in &(*(*current).deadlock_data.resources.get()) {
                        graph.add_edge(Resource(resource), Thread(current), ());
                    }
                    // owner waits for resource .key
                    graph.add_edge(
                        Thread(current),
                        Resource((*current).key.load(Ordering::Relaxed)),
                        (),
                    );
                }
                current = (*current).next_in_queue.get();
            }
        }

        for b in &(*table).entries[..] {
            b.mutex.unlock();
        }

        // find cycles
        let cycles = graph_cycles(&graph);

        let mut results = Vec::with_capacity(cycles.len());

        for cycle in cycles {
            let (sender, receiver) = mpsc::channel();
            for td in cycle {
                let bucket = lock_bucket((*td).key.load(Ordering::Relaxed));
                (*td).deadlock_data.deadlocked.set(true);
                *(*td).deadlock_data.backtrace_sender.get() = Some(sender.clone());
                let handle = (*td).parker.unpark_lock();
                bucket.mutex.unlock();
                // unpark the deadlocked thread!
                // on unpark it'll notice the deadlocked flag and report back
                handle.unpark();
            }
            // make sure to drop our sender before collecting results
            drop(sender);
            results.push(receiver.iter().collect());
        }

        DEADLOCK_DETECTION_LOCK.unlock();

        results
    }

    // normalize a cycle to start with the "smallest" node
    fn normalize_cycle<T: Ord + Copy + Clone>(input: &[T]) -> Vec<T> {
        let min_pos = input
            .iter()
            .enumerate()
            .min_by_key(|&(_, &t)| t)
            .map(|(p, _)| p)
            .unwrap_or(0);
        input
            .iter()
            .cycle()
            .skip(min_pos)
            .take(input.len())
            .cloned()
            .collect()
    }
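
    // A hedged sanity check of the rotation above; the input cycle is an
    // arbitrary illustrative value.
    #[cfg(test)]
    mod normalize_cycle_tests {
        use super::normalize_cycle;

        #[test]
        fn rotation_starts_at_smallest_node() {
            // [3, 1, 2] rotated to start at its minimum yields [1, 2, 3].
            assert_eq!(normalize_cycle(&[3, 1, 2]), vec![1, 2, 3]);
        }
    }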

    // returns all thread cycles in the wait graph
    fn graph_cycles(g: &DiGraphMap<WaitGraphNode, ()>) -> Vec<Vec<*const ThreadData>> {
        use petgraph::visit::depth_first_search;
        use petgraph::visit::DfsEvent;
        use petgraph::visit::NodeIndexable;

        let mut cycles = HashSet::new();
        let mut path = Vec::with_capacity(g.node_bound());
        // start from threads to get the correct threads cycle
        let threads = g
            .nodes()
            .filter(|n| if let &Thread(_) = n { true } else { false });

        depth_first_search(g, threads, |e| match e {
            DfsEvent::Discover(Thread(n), _) => path.push(n),
            DfsEvent::Finish(Thread(_), _) => {
                path.pop();
            }
            DfsEvent::BackEdge(_, Thread(n)) => {
                let from = path.iter().rposition(|&i| i == n).unwrap();
                cycles.insert(normalize_cycle(&path[from..]));
            }
            _ => (),
        });

        cycles.iter().cloned().collect()
    }
}