1 // Copyright 2016 Amanieu d'Antras 2 // 3 // Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or 4 // http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or 5 // http://opensource.org/licenses/MIT>, at your option. This file may not be 6 // copied, modified, or distributed except according to those terms. 7 8 use crate::elision::{have_elision, AtomicElisionExt}; 9 use crate::raw_mutex::{TOKEN_HANDOFF, TOKEN_NORMAL}; 10 use crate::util; 11 use core::{ 12 cell::Cell, 13 sync::atomic::{AtomicUsize, Ordering}, 14 }; 15 use instant::Instant; 16 use lock_api::{RawRwLock as RawRwLock_, RawRwLockUpgrade}; 17 use parking_lot_core::{ 18 self, deadlock, FilterOp, ParkResult, ParkToken, SpinWait, UnparkResult, UnparkToken, 19 }; 20 use std::time::Duration; 21 22 // This reader-writer lock implementation is based on Boost's upgrade_mutex: 23 // https://github.com/boostorg/thread/blob/fc08c1fe2840baeeee143440fba31ef9e9a813c8/include/boost/thread/v2/shared_mutex.hpp#L432 24 // 25 // This implementation uses 2 wait queues, one at key [addr] and one at key 26 // [addr + 1]. The primary queue is used for all new waiting threads, and the 27 // secondary queue is used by the thread which has acquired WRITER_BIT but is 28 // waiting for the remaining readers to exit the lock. 29 // 30 // This implementation is fair between readers and writers since it uses the 31 // order in which threads first started queuing to alternate between read phases 32 // and write phases. In particular is it not vulnerable to write starvation 33 // since readers will block if there is a pending writer. 34 35 // There is at least one thread in the main queue. 36 const PARKED_BIT: usize = 0b0001; 37 // There is a parked thread holding WRITER_BIT. WRITER_BIT must be set. 38 const WRITER_PARKED_BIT: usize = 0b0010; 39 // A reader is holding an upgradable lock. The reader count must be non-zero and 40 // WRITER_BIT must not be set. 41 const UPGRADABLE_BIT: usize = 0b0100; 42 // If the reader count is zero: a writer is currently holding an exclusive lock. 43 // Otherwise: a writer is waiting for the remaining readers to exit the lock. 44 const WRITER_BIT: usize = 0b1000; 45 // Mask of bits used to count readers. 46 const READERS_MASK: usize = !0b1111; 47 // Base unit for counting readers. 48 const ONE_READER: usize = 0b10000; 49 50 // Token indicating what type of lock a queued thread is trying to acquire 51 const TOKEN_SHARED: ParkToken = ParkToken(ONE_READER); 52 const TOKEN_EXCLUSIVE: ParkToken = ParkToken(WRITER_BIT); 53 const TOKEN_UPGRADABLE: ParkToken = ParkToken(ONE_READER | UPGRADABLE_BIT); 54 55 /// Raw reader-writer lock type backed by the parking lot. 56 pub struct RawRwLock { 57 state: AtomicUsize, 58 } 59 60 unsafe impl lock_api::RawRwLock for RawRwLock { 61 const INIT: RawRwLock = RawRwLock { 62 state: AtomicUsize::new(0), 63 }; 64 65 type GuardMarker = crate::GuardMarker; 66 67 #[inline] lock_exclusive(&self)68 fn lock_exclusive(&self) { 69 if self 70 .state 71 .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed) 72 .is_err() 73 { 74 let result = self.lock_exclusive_slow(None); 75 debug_assert!(result); 76 } 77 self.deadlock_acquire(); 78 } 79 80 #[inline] try_lock_exclusive(&self) -> bool81 fn try_lock_exclusive(&self) -> bool { 82 if self 83 .state 84 .compare_exchange(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed) 85 .is_ok() 86 { 87 self.deadlock_acquire(); 88 true 89 } else { 90 false 91 } 92 } 93 94 #[inline] unlock_exclusive(&self)95 unsafe fn unlock_exclusive(&self) { 96 self.deadlock_release(); 97 if self 98 .state 99 .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed) 100 .is_ok() 101 { 102 return; 103 } 104 self.unlock_exclusive_slow(false); 105 } 106 107 #[inline] lock_shared(&self)108 fn lock_shared(&self) { 109 if !self.try_lock_shared_fast(false) { 110 let result = self.lock_shared_slow(false, None); 111 debug_assert!(result); 112 } 113 self.deadlock_acquire(); 114 } 115 116 #[inline] try_lock_shared(&self) -> bool117 fn try_lock_shared(&self) -> bool { 118 let result = if self.try_lock_shared_fast(false) { 119 true 120 } else { 121 self.try_lock_shared_slow(false) 122 }; 123 if result { 124 self.deadlock_acquire(); 125 } 126 result 127 } 128 129 #[inline] unlock_shared(&self)130 unsafe fn unlock_shared(&self) { 131 self.deadlock_release(); 132 let state = if have_elision() { 133 self.state.elision_fetch_sub_release(ONE_READER) 134 } else { 135 self.state.fetch_sub(ONE_READER, Ordering::Release) 136 }; 137 if state & (READERS_MASK | WRITER_PARKED_BIT) == (ONE_READER | WRITER_PARKED_BIT) { 138 self.unlock_shared_slow(); 139 } 140 } 141 142 #[inline] is_locked(&self) -> bool143 fn is_locked(&self) -> bool { 144 let state = self.state.load(Ordering::Relaxed); 145 state & (WRITER_BIT | READERS_MASK) != 0 146 } 147 } 148 149 unsafe impl lock_api::RawRwLockFair for RawRwLock { 150 #[inline] unlock_shared_fair(&self)151 unsafe fn unlock_shared_fair(&self) { 152 // Shared unlocking is always fair in this implementation. 153 self.unlock_shared(); 154 } 155 156 #[inline] unlock_exclusive_fair(&self)157 unsafe fn unlock_exclusive_fair(&self) { 158 self.deadlock_release(); 159 if self 160 .state 161 .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed) 162 .is_ok() 163 { 164 return; 165 } 166 self.unlock_exclusive_slow(true); 167 } 168 169 #[inline] bump_shared(&self)170 unsafe fn bump_shared(&self) { 171 if self.state.load(Ordering::Relaxed) & (READERS_MASK | WRITER_BIT) 172 == ONE_READER | WRITER_BIT 173 { 174 self.bump_shared_slow(); 175 } 176 } 177 178 #[inline] bump_exclusive(&self)179 unsafe fn bump_exclusive(&self) { 180 if self.state.load(Ordering::Relaxed) & PARKED_BIT != 0 { 181 self.bump_exclusive_slow(); 182 } 183 } 184 } 185 186 unsafe impl lock_api::RawRwLockDowngrade for RawRwLock { 187 #[inline] downgrade(&self)188 unsafe fn downgrade(&self) { 189 let state = self 190 .state 191 .fetch_add(ONE_READER - WRITER_BIT, Ordering::Release); 192 193 // Wake up parked shared and upgradable threads if there are any 194 if state & PARKED_BIT != 0 { 195 self.downgrade_slow(); 196 } 197 } 198 } 199 200 unsafe impl lock_api::RawRwLockTimed for RawRwLock { 201 type Duration = Duration; 202 type Instant = Instant; 203 204 #[inline] try_lock_shared_for(&self, timeout: Self::Duration) -> bool205 fn try_lock_shared_for(&self, timeout: Self::Duration) -> bool { 206 let result = if self.try_lock_shared_fast(false) { 207 true 208 } else { 209 self.lock_shared_slow(false, util::to_deadline(timeout)) 210 }; 211 if result { 212 self.deadlock_acquire(); 213 } 214 result 215 } 216 217 #[inline] try_lock_shared_until(&self, timeout: Self::Instant) -> bool218 fn try_lock_shared_until(&self, timeout: Self::Instant) -> bool { 219 let result = if self.try_lock_shared_fast(false) { 220 true 221 } else { 222 self.lock_shared_slow(false, Some(timeout)) 223 }; 224 if result { 225 self.deadlock_acquire(); 226 } 227 result 228 } 229 230 #[inline] try_lock_exclusive_for(&self, timeout: Duration) -> bool231 fn try_lock_exclusive_for(&self, timeout: Duration) -> bool { 232 let result = if self 233 .state 234 .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed) 235 .is_ok() 236 { 237 true 238 } else { 239 self.lock_exclusive_slow(util::to_deadline(timeout)) 240 }; 241 if result { 242 self.deadlock_acquire(); 243 } 244 result 245 } 246 247 #[inline] try_lock_exclusive_until(&self, timeout: Instant) -> bool248 fn try_lock_exclusive_until(&self, timeout: Instant) -> bool { 249 let result = if self 250 .state 251 .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed) 252 .is_ok() 253 { 254 true 255 } else { 256 self.lock_exclusive_slow(Some(timeout)) 257 }; 258 if result { 259 self.deadlock_acquire(); 260 } 261 result 262 } 263 } 264 265 unsafe impl lock_api::RawRwLockRecursive for RawRwLock { 266 #[inline] lock_shared_recursive(&self)267 fn lock_shared_recursive(&self) { 268 if !self.try_lock_shared_fast(true) { 269 let result = self.lock_shared_slow(true, None); 270 debug_assert!(result); 271 } 272 self.deadlock_acquire(); 273 } 274 275 #[inline] try_lock_shared_recursive(&self) -> bool276 fn try_lock_shared_recursive(&self) -> bool { 277 let result = if self.try_lock_shared_fast(true) { 278 true 279 } else { 280 self.try_lock_shared_slow(true) 281 }; 282 if result { 283 self.deadlock_acquire(); 284 } 285 result 286 } 287 } 288 289 unsafe impl lock_api::RawRwLockRecursiveTimed for RawRwLock { 290 #[inline] try_lock_shared_recursive_for(&self, timeout: Self::Duration) -> bool291 fn try_lock_shared_recursive_for(&self, timeout: Self::Duration) -> bool { 292 let result = if self.try_lock_shared_fast(true) { 293 true 294 } else { 295 self.lock_shared_slow(true, util::to_deadline(timeout)) 296 }; 297 if result { 298 self.deadlock_acquire(); 299 } 300 result 301 } 302 303 #[inline] try_lock_shared_recursive_until(&self, timeout: Self::Instant) -> bool304 fn try_lock_shared_recursive_until(&self, timeout: Self::Instant) -> bool { 305 let result = if self.try_lock_shared_fast(true) { 306 true 307 } else { 308 self.lock_shared_slow(true, Some(timeout)) 309 }; 310 if result { 311 self.deadlock_acquire(); 312 } 313 result 314 } 315 } 316 317 unsafe impl lock_api::RawRwLockUpgrade for RawRwLock { 318 #[inline] lock_upgradable(&self)319 fn lock_upgradable(&self) { 320 if !self.try_lock_upgradable_fast() { 321 let result = self.lock_upgradable_slow(None); 322 debug_assert!(result); 323 } 324 self.deadlock_acquire(); 325 } 326 327 #[inline] try_lock_upgradable(&self) -> bool328 fn try_lock_upgradable(&self) -> bool { 329 let result = if self.try_lock_upgradable_fast() { 330 true 331 } else { 332 self.try_lock_upgradable_slow() 333 }; 334 if result { 335 self.deadlock_acquire(); 336 } 337 result 338 } 339 340 #[inline] unlock_upgradable(&self)341 unsafe fn unlock_upgradable(&self) { 342 self.deadlock_release(); 343 let state = self.state.load(Ordering::Relaxed); 344 if state & PARKED_BIT == 0 { 345 if self 346 .state 347 .compare_exchange_weak( 348 state, 349 state - (ONE_READER | UPGRADABLE_BIT), 350 Ordering::Release, 351 Ordering::Relaxed, 352 ) 353 .is_ok() 354 { 355 return; 356 } 357 } 358 self.unlock_upgradable_slow(false); 359 } 360 361 #[inline] upgrade(&self)362 unsafe fn upgrade(&self) { 363 let state = self.state.fetch_sub( 364 (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT, 365 Ordering::Acquire, 366 ); 367 if state & READERS_MASK != ONE_READER { 368 let result = self.upgrade_slow(None); 369 debug_assert!(result); 370 } 371 } 372 373 #[inline] try_upgrade(&self) -> bool374 unsafe fn try_upgrade(&self) -> bool { 375 if self 376 .state 377 .compare_exchange_weak( 378 ONE_READER | UPGRADABLE_BIT, 379 WRITER_BIT, 380 Ordering::Acquire, 381 Ordering::Relaxed, 382 ) 383 .is_ok() 384 { 385 true 386 } else { 387 self.try_upgrade_slow() 388 } 389 } 390 } 391 392 unsafe impl lock_api::RawRwLockUpgradeFair for RawRwLock { 393 #[inline] unlock_upgradable_fair(&self)394 unsafe fn unlock_upgradable_fair(&self) { 395 self.deadlock_release(); 396 let state = self.state.load(Ordering::Relaxed); 397 if state & PARKED_BIT == 0 { 398 if self 399 .state 400 .compare_exchange_weak( 401 state, 402 state - (ONE_READER | UPGRADABLE_BIT), 403 Ordering::Release, 404 Ordering::Relaxed, 405 ) 406 .is_ok() 407 { 408 return; 409 } 410 } 411 self.unlock_upgradable_slow(false); 412 } 413 414 #[inline] bump_upgradable(&self)415 unsafe fn bump_upgradable(&self) { 416 if self.state.load(Ordering::Relaxed) == ONE_READER | UPGRADABLE_BIT | PARKED_BIT { 417 self.bump_upgradable_slow(); 418 } 419 } 420 } 421 422 unsafe impl lock_api::RawRwLockUpgradeDowngrade for RawRwLock { 423 #[inline] downgrade_upgradable(&self)424 unsafe fn downgrade_upgradable(&self) { 425 let state = self.state.fetch_sub(UPGRADABLE_BIT, Ordering::Relaxed); 426 427 // Wake up parked upgradable threads if there are any 428 if state & PARKED_BIT != 0 { 429 self.downgrade_slow(); 430 } 431 } 432 433 #[inline] downgrade_to_upgradable(&self)434 unsafe fn downgrade_to_upgradable(&self) { 435 let state = self.state.fetch_add( 436 (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT, 437 Ordering::Release, 438 ); 439 440 // Wake up parked shared threads if there are any 441 if state & PARKED_BIT != 0 { 442 self.downgrade_to_upgradable_slow(); 443 } 444 } 445 } 446 447 unsafe impl lock_api::RawRwLockUpgradeTimed for RawRwLock { 448 #[inline] try_lock_upgradable_until(&self, timeout: Instant) -> bool449 fn try_lock_upgradable_until(&self, timeout: Instant) -> bool { 450 let result = if self.try_lock_upgradable_fast() { 451 true 452 } else { 453 self.lock_upgradable_slow(Some(timeout)) 454 }; 455 if result { 456 self.deadlock_acquire(); 457 } 458 result 459 } 460 461 #[inline] try_lock_upgradable_for(&self, timeout: Duration) -> bool462 fn try_lock_upgradable_for(&self, timeout: Duration) -> bool { 463 let result = if self.try_lock_upgradable_fast() { 464 true 465 } else { 466 self.lock_upgradable_slow(util::to_deadline(timeout)) 467 }; 468 if result { 469 self.deadlock_acquire(); 470 } 471 result 472 } 473 474 #[inline] try_upgrade_until(&self, timeout: Instant) -> bool475 unsafe fn try_upgrade_until(&self, timeout: Instant) -> bool { 476 let state = self.state.fetch_sub( 477 (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT, 478 Ordering::Relaxed, 479 ); 480 if state & READERS_MASK == ONE_READER { 481 true 482 } else { 483 self.upgrade_slow(Some(timeout)) 484 } 485 } 486 487 #[inline] try_upgrade_for(&self, timeout: Duration) -> bool488 unsafe fn try_upgrade_for(&self, timeout: Duration) -> bool { 489 let state = self.state.fetch_sub( 490 (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT, 491 Ordering::Relaxed, 492 ); 493 if state & READERS_MASK == ONE_READER { 494 true 495 } else { 496 self.upgrade_slow(util::to_deadline(timeout)) 497 } 498 } 499 } 500 501 impl RawRwLock { 502 #[inline(always)] try_lock_shared_fast(&self, recursive: bool) -> bool503 fn try_lock_shared_fast(&self, recursive: bool) -> bool { 504 let state = self.state.load(Ordering::Relaxed); 505 506 // We can't allow grabbing a shared lock if there is a writer, even if 507 // the writer is still waiting for the remaining readers to exit. 508 if state & WRITER_BIT != 0 { 509 // To allow recursive locks, we make an exception and allow readers 510 // to skip ahead of a pending writer to avoid deadlocking, at the 511 // cost of breaking the fairness guarantees. 512 if !recursive || state & READERS_MASK == 0 { 513 return false; 514 } 515 } 516 517 // Use hardware lock elision to avoid cache conflicts when multiple 518 // readers try to acquire the lock. We only do this if the lock is 519 // completely empty since elision handles conflicts poorly. 520 if have_elision() && state == 0 { 521 self.state 522 .elision_compare_exchange_acquire(0, ONE_READER) 523 .is_ok() 524 } else if let Some(new_state) = state.checked_add(ONE_READER) { 525 self.state 526 .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed) 527 .is_ok() 528 } else { 529 false 530 } 531 } 532 533 #[cold] try_lock_shared_slow(&self, recursive: bool) -> bool534 fn try_lock_shared_slow(&self, recursive: bool) -> bool { 535 let mut state = self.state.load(Ordering::Relaxed); 536 loop { 537 // This mirrors the condition in try_lock_shared_fast 538 if state & WRITER_BIT != 0 { 539 if !recursive || state & READERS_MASK == 0 { 540 return false; 541 } 542 } 543 if have_elision() && state == 0 { 544 match self.state.elision_compare_exchange_acquire(0, ONE_READER) { 545 Ok(_) => return true, 546 Err(x) => state = x, 547 } 548 } else { 549 match self.state.compare_exchange_weak( 550 state, 551 state 552 .checked_add(ONE_READER) 553 .expect("RwLock reader count overflow"), 554 Ordering::Acquire, 555 Ordering::Relaxed, 556 ) { 557 Ok(_) => return true, 558 Err(x) => state = x, 559 } 560 } 561 } 562 } 563 564 #[inline(always)] try_lock_upgradable_fast(&self) -> bool565 fn try_lock_upgradable_fast(&self) -> bool { 566 let state = self.state.load(Ordering::Relaxed); 567 568 // We can't grab an upgradable lock if there is already a writer or 569 // upgradable reader. 570 if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 { 571 return false; 572 } 573 574 if let Some(new_state) = state.checked_add(ONE_READER | UPGRADABLE_BIT) { 575 self.state 576 .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed) 577 .is_ok() 578 } else { 579 false 580 } 581 } 582 583 #[cold] try_lock_upgradable_slow(&self) -> bool584 fn try_lock_upgradable_slow(&self) -> bool { 585 let mut state = self.state.load(Ordering::Relaxed); 586 loop { 587 // This mirrors the condition in try_lock_upgradable_fast 588 if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 { 589 return false; 590 } 591 592 match self.state.compare_exchange_weak( 593 state, 594 state 595 .checked_add(ONE_READER | UPGRADABLE_BIT) 596 .expect("RwLock reader count overflow"), 597 Ordering::Acquire, 598 Ordering::Relaxed, 599 ) { 600 Ok(_) => return true, 601 Err(x) => state = x, 602 } 603 } 604 } 605 606 #[cold] lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool607 fn lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool { 608 let try_lock = |state: &mut usize| { 609 loop { 610 if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 { 611 return false; 612 } 613 614 // Grab WRITER_BIT if it isn't set, even if there are parked threads. 615 match self.state.compare_exchange_weak( 616 *state, 617 *state | WRITER_BIT, 618 Ordering::Acquire, 619 Ordering::Relaxed, 620 ) { 621 Ok(_) => return true, 622 Err(x) => *state = x, 623 } 624 } 625 }; 626 627 // Step 1: grab exclusive ownership of WRITER_BIT 628 let timed_out = !self.lock_common( 629 timeout, 630 TOKEN_EXCLUSIVE, 631 try_lock, 632 WRITER_BIT | UPGRADABLE_BIT, 633 ); 634 if timed_out { 635 return false; 636 } 637 638 // Step 2: wait for all remaining readers to exit the lock. 639 self.wait_for_readers(timeout, 0) 640 } 641 642 #[cold] unlock_exclusive_slow(&self, force_fair: bool)643 fn unlock_exclusive_slow(&self, force_fair: bool) { 644 // There are threads to unpark. Try to unpark as many as we can. 645 let callback = |mut new_state, result: UnparkResult| { 646 // If we are using a fair unlock then we should keep the 647 // rwlock locked and hand it off to the unparked threads. 648 if result.unparked_threads != 0 && (force_fair || result.be_fair) { 649 if result.have_more_threads { 650 new_state |= PARKED_BIT; 651 } 652 self.state.store(new_state, Ordering::Release); 653 TOKEN_HANDOFF 654 } else { 655 // Clear the parked bit if there are no more parked threads. 656 if result.have_more_threads { 657 self.state.store(PARKED_BIT, Ordering::Release); 658 } else { 659 self.state.store(0, Ordering::Release); 660 } 661 TOKEN_NORMAL 662 } 663 }; 664 // SAFETY: `callback` does not panic or call into any function of `parking_lot`. 665 unsafe { 666 self.wake_parked_threads(0, callback); 667 } 668 } 669 670 #[cold] lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool671 fn lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool { 672 let try_lock = |state: &mut usize| { 673 let mut spinwait_shared = SpinWait::new(); 674 loop { 675 // Use hardware lock elision to avoid cache conflicts when multiple 676 // readers try to acquire the lock. We only do this if the lock is 677 // completely empty since elision handles conflicts poorly. 678 if have_elision() && *state == 0 { 679 match self.state.elision_compare_exchange_acquire(0, ONE_READER) { 680 Ok(_) => return true, 681 Err(x) => *state = x, 682 } 683 } 684 685 // This is the same condition as try_lock_shared_fast 686 if *state & WRITER_BIT != 0 { 687 if !recursive || *state & READERS_MASK == 0 { 688 return false; 689 } 690 } 691 692 if self 693 .state 694 .compare_exchange_weak( 695 *state, 696 state 697 .checked_add(ONE_READER) 698 .expect("RwLock reader count overflow"), 699 Ordering::Acquire, 700 Ordering::Relaxed, 701 ) 702 .is_ok() 703 { 704 return true; 705 } 706 707 // If there is high contention on the reader count then we want 708 // to leave some time between attempts to acquire the lock to 709 // let other threads make progress. 710 spinwait_shared.spin_no_yield(); 711 *state = self.state.load(Ordering::Relaxed); 712 } 713 }; 714 self.lock_common(timeout, TOKEN_SHARED, try_lock, WRITER_BIT) 715 } 716 717 #[cold] unlock_shared_slow(&self)718 fn unlock_shared_slow(&self) { 719 // At this point WRITER_PARKED_BIT is set and READER_MASK is empty. We 720 // just need to wake up a potentially sleeping pending writer. 721 // Using the 2nd key at addr + 1 722 let addr = self as *const _ as usize + 1; 723 let callback = |_result: UnparkResult| { 724 // Clear the WRITER_PARKED_BIT here since there can only be one 725 // parked writer thread. 726 self.state.fetch_and(!WRITER_PARKED_BIT, Ordering::Relaxed); 727 TOKEN_NORMAL 728 }; 729 // SAFETY: 730 // * `addr` is an address we control. 731 // * `callback` does not panic or call into any function of `parking_lot`. 732 unsafe { 733 parking_lot_core::unpark_one(addr, callback); 734 } 735 } 736 737 #[cold] lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool738 fn lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool { 739 let try_lock = |state: &mut usize| { 740 let mut spinwait_shared = SpinWait::new(); 741 loop { 742 if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 { 743 return false; 744 } 745 746 if self 747 .state 748 .compare_exchange_weak( 749 *state, 750 state 751 .checked_add(ONE_READER | UPGRADABLE_BIT) 752 .expect("RwLock reader count overflow"), 753 Ordering::Acquire, 754 Ordering::Relaxed, 755 ) 756 .is_ok() 757 { 758 return true; 759 } 760 761 // If there is high contention on the reader count then we want 762 // to leave some time between attempts to acquire the lock to 763 // let other threads make progress. 764 spinwait_shared.spin_no_yield(); 765 *state = self.state.load(Ordering::Relaxed); 766 } 767 }; 768 self.lock_common( 769 timeout, 770 TOKEN_UPGRADABLE, 771 try_lock, 772 WRITER_BIT | UPGRADABLE_BIT, 773 ) 774 } 775 776 #[cold] unlock_upgradable_slow(&self, force_fair: bool)777 fn unlock_upgradable_slow(&self, force_fair: bool) { 778 // Just release the lock if there are no parked threads. 779 let mut state = self.state.load(Ordering::Relaxed); 780 while state & PARKED_BIT == 0 { 781 match self.state.compare_exchange_weak( 782 state, 783 state - (ONE_READER | UPGRADABLE_BIT), 784 Ordering::Release, 785 Ordering::Relaxed, 786 ) { 787 Ok(_) => return, 788 Err(x) => state = x, 789 } 790 } 791 792 // There are threads to unpark. Try to unpark as many as we can. 793 let callback = |new_state, result: UnparkResult| { 794 // If we are using a fair unlock then we should keep the 795 // rwlock locked and hand it off to the unparked threads. 796 let mut state = self.state.load(Ordering::Relaxed); 797 if force_fair || result.be_fair { 798 // Fall back to normal unpark on overflow. Panicking is 799 // not allowed in parking_lot callbacks. 800 while let Some(mut new_state) = 801 (state - (ONE_READER | UPGRADABLE_BIT)).checked_add(new_state) 802 { 803 if result.have_more_threads { 804 new_state |= PARKED_BIT; 805 } else { 806 new_state &= !PARKED_BIT; 807 } 808 match self.state.compare_exchange_weak( 809 state, 810 new_state, 811 Ordering::Relaxed, 812 Ordering::Relaxed, 813 ) { 814 Ok(_) => return TOKEN_HANDOFF, 815 Err(x) => state = x, 816 } 817 } 818 } 819 820 // Otherwise just release the upgradable lock and update PARKED_BIT. 821 loop { 822 let mut new_state = state - (ONE_READER | UPGRADABLE_BIT); 823 if result.have_more_threads { 824 new_state |= PARKED_BIT; 825 } else { 826 new_state &= !PARKED_BIT; 827 } 828 match self.state.compare_exchange_weak( 829 state, 830 new_state, 831 Ordering::Relaxed, 832 Ordering::Relaxed, 833 ) { 834 Ok(_) => return TOKEN_NORMAL, 835 Err(x) => state = x, 836 } 837 } 838 }; 839 // SAFETY: `callback` does not panic or call into any function of `parking_lot`. 840 unsafe { 841 self.wake_parked_threads(0, callback); 842 } 843 } 844 845 #[cold] try_upgrade_slow(&self) -> bool846 fn try_upgrade_slow(&self) -> bool { 847 let mut state = self.state.load(Ordering::Relaxed); 848 loop { 849 if state & READERS_MASK != ONE_READER { 850 return false; 851 } 852 match self.state.compare_exchange_weak( 853 state, 854 state - (ONE_READER | UPGRADABLE_BIT) + WRITER_BIT, 855 Ordering::Relaxed, 856 Ordering::Relaxed, 857 ) { 858 Ok(_) => return true, 859 Err(x) => state = x, 860 } 861 } 862 } 863 864 #[cold] upgrade_slow(&self, timeout: Option<Instant>) -> bool865 fn upgrade_slow(&self, timeout: Option<Instant>) -> bool { 866 self.wait_for_readers(timeout, ONE_READER | UPGRADABLE_BIT) 867 } 868 869 #[cold] downgrade_slow(&self)870 fn downgrade_slow(&self) { 871 // We only reach this point if PARKED_BIT is set. 872 let callback = |_, result: UnparkResult| { 873 // Clear the parked bit if there no more parked threads 874 if !result.have_more_threads { 875 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed); 876 } 877 TOKEN_NORMAL 878 }; 879 // SAFETY: `callback` does not panic or call into any function of `parking_lot`. 880 unsafe { 881 self.wake_parked_threads(ONE_READER, callback); 882 } 883 } 884 885 #[cold] downgrade_to_upgradable_slow(&self)886 fn downgrade_to_upgradable_slow(&self) { 887 // We only reach this point if PARKED_BIT is set. 888 let callback = |_, result: UnparkResult| { 889 // Clear the parked bit if there no more parked threads 890 if !result.have_more_threads { 891 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed); 892 } 893 TOKEN_NORMAL 894 }; 895 // SAFETY: `callback` does not panic or call into any function of `parking_lot`. 896 unsafe { 897 self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback); 898 } 899 } 900 901 #[cold] bump_shared_slow(&self)902 unsafe fn bump_shared_slow(&self) { 903 self.unlock_shared(); 904 self.lock_shared(); 905 } 906 907 #[cold] bump_exclusive_slow(&self)908 fn bump_exclusive_slow(&self) { 909 self.deadlock_release(); 910 self.unlock_exclusive_slow(true); 911 self.lock_exclusive(); 912 } 913 914 #[cold] bump_upgradable_slow(&self)915 fn bump_upgradable_slow(&self) { 916 self.deadlock_release(); 917 self.unlock_upgradable_slow(true); 918 self.lock_upgradable(); 919 } 920 921 /// Common code for waking up parked threads after releasing WRITER_BIT or 922 /// UPGRADABLE_BIT. 923 /// 924 /// # Safety 925 /// 926 /// `callback` must uphold the requirements of the `callback` parameter to 927 /// `parking_lot_core::unpark_filter`. Meaning no panics or calls into any function in 928 /// `parking_lot`. 929 #[inline] wake_parked_threads( &self, new_state: usize, callback: impl FnOnce(usize, UnparkResult) -> UnparkToken, )930 unsafe fn wake_parked_threads( 931 &self, 932 new_state: usize, 933 callback: impl FnOnce(usize, UnparkResult) -> UnparkToken, 934 ) { 935 // We must wake up at least one upgrader or writer if there is one, 936 // otherwise they may end up parked indefinitely since unlock_shared 937 // does not call wake_parked_threads. 938 let new_state = Cell::new(new_state); 939 let addr = self as *const _ as usize; 940 let filter = |ParkToken(token)| { 941 let s = new_state.get(); 942 943 // If we are waking up a writer, don't wake anything else. 944 if s & WRITER_BIT != 0 { 945 return FilterOp::Stop; 946 } 947 948 // Otherwise wake *all* readers and one upgrader/writer. 949 if token & (UPGRADABLE_BIT | WRITER_BIT) != 0 && s & UPGRADABLE_BIT != 0 { 950 // Skip writers and upgradable readers if we already have 951 // a writer/upgradable reader. 952 FilterOp::Skip 953 } else { 954 new_state.set(s + token); 955 FilterOp::Unpark 956 } 957 }; 958 let callback = |result| callback(new_state.get(), result); 959 // SAFETY: 960 // * `addr` is an address we control. 961 // * `filter` does not panic or call into any function of `parking_lot`. 962 // * `callback` safety responsibility is on caller 963 parking_lot_core::unpark_filter(addr, filter, callback); 964 } 965 966 // Common code for waiting for readers to exit the lock after acquiring 967 // WRITER_BIT. 968 #[inline] wait_for_readers(&self, timeout: Option<Instant>, prev_value: usize) -> bool969 fn wait_for_readers(&self, timeout: Option<Instant>, prev_value: usize) -> bool { 970 // At this point WRITER_BIT is already set, we just need to wait for the 971 // remaining readers to exit the lock. 972 let mut spinwait = SpinWait::new(); 973 let mut state = self.state.load(Ordering::Acquire); 974 while state & READERS_MASK != 0 { 975 // Spin a few times to wait for readers to exit 976 if spinwait.spin() { 977 state = self.state.load(Ordering::Acquire); 978 continue; 979 } 980 981 // Set the parked bit 982 if state & WRITER_PARKED_BIT == 0 { 983 if let Err(x) = self.state.compare_exchange_weak( 984 state, 985 state | WRITER_PARKED_BIT, 986 Ordering::Relaxed, 987 Ordering::Relaxed, 988 ) { 989 state = x; 990 continue; 991 } 992 } 993 994 // Park our thread until we are woken up by an unlock 995 // Using the 2nd key at addr + 1 996 let addr = self as *const _ as usize + 1; 997 let validate = || { 998 let state = self.state.load(Ordering::Relaxed); 999 state & READERS_MASK != 0 && state & WRITER_PARKED_BIT != 0 1000 }; 1001 let before_sleep = || {}; 1002 let timed_out = |_, _| {}; 1003 // SAFETY: 1004 // * `addr` is an address we control. 1005 // * `validate`/`timed_out` does not panic or call into any function of `parking_lot`. 1006 // * `before_sleep` does not call `park`, nor does it panic. 1007 let park_result = unsafe { 1008 parking_lot_core::park( 1009 addr, 1010 validate, 1011 before_sleep, 1012 timed_out, 1013 TOKEN_EXCLUSIVE, 1014 timeout, 1015 ) 1016 }; 1017 match park_result { 1018 // We still need to re-check the state if we are unparked 1019 // since a previous writer timing-out could have allowed 1020 // another reader to sneak in before we parked. 1021 ParkResult::Unparked(_) | ParkResult::Invalid => { 1022 state = self.state.load(Ordering::Acquire); 1023 continue; 1024 } 1025 1026 // Timeout expired 1027 ParkResult::TimedOut => { 1028 // We need to release WRITER_BIT and revert back to 1029 // our previous value. We also wake up any threads that 1030 // might be waiting on WRITER_BIT. 1031 let state = self.state.fetch_add( 1032 prev_value.wrapping_sub(WRITER_BIT | WRITER_PARKED_BIT), 1033 Ordering::Relaxed, 1034 ); 1035 if state & PARKED_BIT != 0 { 1036 let callback = |_, result: UnparkResult| { 1037 // Clear the parked bit if there no more parked threads 1038 if !result.have_more_threads { 1039 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed); 1040 } 1041 TOKEN_NORMAL 1042 }; 1043 // SAFETY: `callback` does not panic or call any function of `parking_lot`. 1044 unsafe { 1045 self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback); 1046 } 1047 } 1048 return false; 1049 } 1050 } 1051 } 1052 true 1053 } 1054 1055 /// Common code for acquiring a lock 1056 #[inline] lock_common( &self, timeout: Option<Instant>, token: ParkToken, mut try_lock: impl FnMut(&mut usize) -> bool, validate_flags: usize, ) -> bool1057 fn lock_common( 1058 &self, 1059 timeout: Option<Instant>, 1060 token: ParkToken, 1061 mut try_lock: impl FnMut(&mut usize) -> bool, 1062 validate_flags: usize, 1063 ) -> bool { 1064 let mut spinwait = SpinWait::new(); 1065 let mut state = self.state.load(Ordering::Relaxed); 1066 loop { 1067 // Attempt to grab the lock 1068 if try_lock(&mut state) { 1069 return true; 1070 } 1071 1072 // If there are no parked threads, try spinning a few times. 1073 if state & (PARKED_BIT | WRITER_PARKED_BIT) == 0 && spinwait.spin() { 1074 state = self.state.load(Ordering::Relaxed); 1075 continue; 1076 } 1077 1078 // Set the parked bit 1079 if state & PARKED_BIT == 0 { 1080 if let Err(x) = self.state.compare_exchange_weak( 1081 state, 1082 state | PARKED_BIT, 1083 Ordering::Relaxed, 1084 Ordering::Relaxed, 1085 ) { 1086 state = x; 1087 continue; 1088 } 1089 } 1090 1091 // Park our thread until we are woken up by an unlock 1092 let addr = self as *const _ as usize; 1093 let validate = || { 1094 let state = self.state.load(Ordering::Relaxed); 1095 state & PARKED_BIT != 0 && (state & validate_flags != 0) 1096 }; 1097 let before_sleep = || {}; 1098 let timed_out = |_, was_last_thread| { 1099 // Clear the parked bit if we were the last parked thread 1100 if was_last_thread { 1101 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed); 1102 } 1103 }; 1104 1105 // SAFETY: 1106 // * `addr` is an address we control. 1107 // * `validate`/`timed_out` does not panic or call into any function of `parking_lot`. 1108 // * `before_sleep` does not call `park`, nor does it panic. 1109 let park_result = unsafe { 1110 parking_lot_core::park(addr, validate, before_sleep, timed_out, token, timeout) 1111 }; 1112 match park_result { 1113 // The thread that unparked us passed the lock on to us 1114 // directly without unlocking it. 1115 ParkResult::Unparked(TOKEN_HANDOFF) => return true, 1116 1117 // We were unparked normally, try acquiring the lock again 1118 ParkResult::Unparked(_) => (), 1119 1120 // The validation function failed, try locking again 1121 ParkResult::Invalid => (), 1122 1123 // Timeout expired 1124 ParkResult::TimedOut => return false, 1125 } 1126 1127 // Loop back and try locking again 1128 spinwait.reset(); 1129 state = self.state.load(Ordering::Relaxed); 1130 } 1131 } 1132 1133 #[inline] deadlock_acquire(&self)1134 fn deadlock_acquire(&self) { 1135 unsafe { deadlock::acquire_resource(self as *const _ as usize) }; 1136 unsafe { deadlock::acquire_resource(self as *const _ as usize + 1) }; 1137 } 1138 1139 #[inline] deadlock_release(&self)1140 fn deadlock_release(&self) { 1141 unsafe { deadlock::release_resource(self as *const _ as usize) }; 1142 unsafe { deadlock::release_resource(self as *const _ as usize + 1) }; 1143 } 1144 } 1145