1 // Copyright 2016 Amanieu d'Antras 2 // 3 // Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or 4 // http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or 5 // http://opensource.org/licenses/MIT>, at your option. This file may not be 6 // copied, modified, or distributed except according to those terms. 7 8 use crate::elision::{have_elision, AtomicElisionExt}; 9 use crate::raw_mutex::{TOKEN_HANDOFF, TOKEN_NORMAL}; 10 use crate::util; 11 use core::{ 12 cell::Cell, 13 sync::atomic::{AtomicUsize, Ordering}, 14 }; 15 use lock_api::{GuardNoSend, RawRwLock as RawRwLock_, RawRwLockUpgrade}; 16 use parking_lot_core::{ 17 self, deadlock, FilterOp, ParkResult, ParkToken, SpinWait, UnparkResult, UnparkToken, 18 }; 19 use std::time::{Duration, Instant}; 20 21 // This reader-writer lock implementation is based on Boost's upgrade_mutex: 22 // https://github.com/boostorg/thread/blob/fc08c1fe2840baeeee143440fba31ef9e9a813c8/include/boost/thread/v2/shared_mutex.hpp#L432 23 // 24 // This implementation uses 2 wait queues, one at key [addr] and one at key 25 // [addr + 1]. The primary queue is used for all new waiting threads, and the 26 // secondary queue is used by the thread which has acquired WRITER_BIT but is 27 // waiting for the remaining readers to exit the lock. 28 // 29 // This implementation is fair between readers and writers since it uses the 30 // order in which threads first started queuing to alternate between read phases 31 // and write phases. In particular is it not vulnerable to write starvation 32 // since readers will block if there is a pending writer. 33 34 // There is at least one thread in the main queue. 35 const PARKED_BIT: usize = 0b0001; 36 // There is a parked thread holding WRITER_BIT. WRITER_BIT must be set. 37 const WRITER_PARKED_BIT: usize = 0b0010; 38 // A reader is holding an upgradable lock. The reader count must be non-zero and 39 // WRITER_BIT must not be set. 40 const UPGRADABLE_BIT: usize = 0b0100; 41 // If the reader count is zero: a writer is currently holding an exclusive lock. 42 // Otherwise: a writer is waiting for the remaining readers to exit the lock. 43 const WRITER_BIT: usize = 0b1000; 44 // Mask of bits used to count readers. 45 const READERS_MASK: usize = !0b1111; 46 // Base unit for counting readers. 47 const ONE_READER: usize = 0b10000; 48 49 // Token indicating what type of lock a queued thread is trying to acquire 50 const TOKEN_SHARED: ParkToken = ParkToken(ONE_READER); 51 const TOKEN_EXCLUSIVE: ParkToken = ParkToken(WRITER_BIT); 52 const TOKEN_UPGRADABLE: ParkToken = ParkToken(ONE_READER | UPGRADABLE_BIT); 53 54 /// Raw reader-writer lock type backed by the parking lot. 55 pub struct RawRwLock { 56 state: AtomicUsize, 57 } 58 59 unsafe impl lock_api::RawRwLock for RawRwLock { 60 const INIT: RawRwLock = RawRwLock { 61 state: AtomicUsize::new(0), 62 }; 63 64 type GuardMarker = GuardNoSend; 65 66 #[inline] lock_exclusive(&self)67 fn lock_exclusive(&self) { 68 if self 69 .state 70 .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed) 71 .is_err() 72 { 73 let result = self.lock_exclusive_slow(None); 74 debug_assert!(result); 75 } 76 self.deadlock_acquire(); 77 } 78 79 #[inline] try_lock_exclusive(&self) -> bool80 fn try_lock_exclusive(&self) -> bool { 81 if self 82 .state 83 .compare_exchange(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed) 84 .is_ok() 85 { 86 self.deadlock_acquire(); 87 true 88 } else { 89 false 90 } 91 } 92 93 #[inline] unlock_exclusive(&self)94 fn unlock_exclusive(&self) { 95 self.deadlock_release(); 96 if self 97 .state 98 .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed) 99 .is_ok() 100 { 101 return; 102 } 103 self.unlock_exclusive_slow(false); 104 } 105 106 #[inline] lock_shared(&self)107 fn lock_shared(&self) { 108 if !self.try_lock_shared_fast(false) { 109 let result = self.lock_shared_slow(false, None); 110 debug_assert!(result); 111 } 112 self.deadlock_acquire(); 113 } 114 115 #[inline] try_lock_shared(&self) -> bool116 fn try_lock_shared(&self) -> bool { 117 let result = if self.try_lock_shared_fast(false) { 118 true 119 } else { 120 self.try_lock_shared_slow(false) 121 }; 122 if result { 123 self.deadlock_acquire(); 124 } 125 result 126 } 127 128 #[inline] unlock_shared(&self)129 fn unlock_shared(&self) { 130 self.deadlock_release(); 131 let state = if have_elision() { 132 self.state.elision_fetch_sub_release(ONE_READER) 133 } else { 134 self.state.fetch_sub(ONE_READER, Ordering::Release) 135 }; 136 if state & (READERS_MASK | WRITER_PARKED_BIT) == (ONE_READER | WRITER_PARKED_BIT) { 137 self.unlock_shared_slow(); 138 } 139 } 140 } 141 142 unsafe impl lock_api::RawRwLockFair for RawRwLock { 143 #[inline] unlock_shared_fair(&self)144 fn unlock_shared_fair(&self) { 145 // Shared unlocking is always fair in this implementation. 146 self.unlock_shared(); 147 } 148 149 #[inline] unlock_exclusive_fair(&self)150 fn unlock_exclusive_fair(&self) { 151 self.deadlock_release(); 152 if self 153 .state 154 .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed) 155 .is_ok() 156 { 157 return; 158 } 159 self.unlock_exclusive_slow(true); 160 } 161 162 #[inline] bump_shared(&self)163 fn bump_shared(&self) { 164 if self.state.load(Ordering::Relaxed) & (READERS_MASK | WRITER_BIT) 165 == ONE_READER | WRITER_BIT 166 { 167 self.bump_shared_slow(); 168 } 169 } 170 171 #[inline] bump_exclusive(&self)172 fn bump_exclusive(&self) { 173 if self.state.load(Ordering::Relaxed) & PARKED_BIT != 0 { 174 self.bump_exclusive_slow(); 175 } 176 } 177 } 178 179 unsafe impl lock_api::RawRwLockDowngrade for RawRwLock { 180 #[inline] downgrade(&self)181 fn downgrade(&self) { 182 let state = self 183 .state 184 .fetch_add(ONE_READER - WRITER_BIT, Ordering::Release); 185 186 // Wake up parked shared and upgradable threads if there are any 187 if state & PARKED_BIT != 0 { 188 self.downgrade_slow(); 189 } 190 } 191 } 192 193 unsafe impl lock_api::RawRwLockTimed for RawRwLock { 194 type Duration = Duration; 195 type Instant = Instant; 196 197 #[inline] try_lock_shared_for(&self, timeout: Self::Duration) -> bool198 fn try_lock_shared_for(&self, timeout: Self::Duration) -> bool { 199 let result = if self.try_lock_shared_fast(false) { 200 true 201 } else { 202 self.lock_shared_slow(false, util::to_deadline(timeout)) 203 }; 204 if result { 205 self.deadlock_acquire(); 206 } 207 result 208 } 209 210 #[inline] try_lock_shared_until(&self, timeout: Self::Instant) -> bool211 fn try_lock_shared_until(&self, timeout: Self::Instant) -> bool { 212 let result = if self.try_lock_shared_fast(false) { 213 true 214 } else { 215 self.lock_shared_slow(false, Some(timeout)) 216 }; 217 if result { 218 self.deadlock_acquire(); 219 } 220 result 221 } 222 223 #[inline] try_lock_exclusive_for(&self, timeout: Duration) -> bool224 fn try_lock_exclusive_for(&self, timeout: Duration) -> bool { 225 let result = if self 226 .state 227 .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed) 228 .is_ok() 229 { 230 true 231 } else { 232 self.lock_exclusive_slow(util::to_deadline(timeout)) 233 }; 234 if result { 235 self.deadlock_acquire(); 236 } 237 result 238 } 239 240 #[inline] try_lock_exclusive_until(&self, timeout: Instant) -> bool241 fn try_lock_exclusive_until(&self, timeout: Instant) -> bool { 242 let result = if self 243 .state 244 .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed) 245 .is_ok() 246 { 247 true 248 } else { 249 self.lock_exclusive_slow(Some(timeout)) 250 }; 251 if result { 252 self.deadlock_acquire(); 253 } 254 result 255 } 256 } 257 258 unsafe impl lock_api::RawRwLockRecursive for RawRwLock { 259 #[inline] lock_shared_recursive(&self)260 fn lock_shared_recursive(&self) { 261 if !self.try_lock_shared_fast(true) { 262 let result = self.lock_shared_slow(true, None); 263 debug_assert!(result); 264 } 265 self.deadlock_acquire(); 266 } 267 268 #[inline] try_lock_shared_recursive(&self) -> bool269 fn try_lock_shared_recursive(&self) -> bool { 270 let result = if self.try_lock_shared_fast(true) { 271 true 272 } else { 273 self.try_lock_shared_slow(true) 274 }; 275 if result { 276 self.deadlock_acquire(); 277 } 278 result 279 } 280 } 281 282 unsafe impl lock_api::RawRwLockRecursiveTimed for RawRwLock { 283 #[inline] try_lock_shared_recursive_for(&self, timeout: Self::Duration) -> bool284 fn try_lock_shared_recursive_for(&self, timeout: Self::Duration) -> bool { 285 let result = if self.try_lock_shared_fast(true) { 286 true 287 } else { 288 self.lock_shared_slow(true, util::to_deadline(timeout)) 289 }; 290 if result { 291 self.deadlock_acquire(); 292 } 293 result 294 } 295 296 #[inline] try_lock_shared_recursive_until(&self, timeout: Self::Instant) -> bool297 fn try_lock_shared_recursive_until(&self, timeout: Self::Instant) -> bool { 298 let result = if self.try_lock_shared_fast(true) { 299 true 300 } else { 301 self.lock_shared_slow(true, Some(timeout)) 302 }; 303 if result { 304 self.deadlock_acquire(); 305 } 306 result 307 } 308 } 309 310 unsafe impl lock_api::RawRwLockUpgrade for RawRwLock { 311 #[inline] lock_upgradable(&self)312 fn lock_upgradable(&self) { 313 if !self.try_lock_upgradable_fast() { 314 let result = self.lock_upgradable_slow(None); 315 debug_assert!(result); 316 } 317 self.deadlock_acquire(); 318 } 319 320 #[inline] try_lock_upgradable(&self) -> bool321 fn try_lock_upgradable(&self) -> bool { 322 let result = if self.try_lock_upgradable_fast() { 323 true 324 } else { 325 self.try_lock_upgradable_slow() 326 }; 327 if result { 328 self.deadlock_acquire(); 329 } 330 result 331 } 332 333 #[inline] unlock_upgradable(&self)334 fn unlock_upgradable(&self) { 335 self.deadlock_release(); 336 let state = self.state.load(Ordering::Relaxed); 337 if state & PARKED_BIT == 0 { 338 if self 339 .state 340 .compare_exchange_weak( 341 state, 342 state - (ONE_READER | UPGRADABLE_BIT), 343 Ordering::Release, 344 Ordering::Relaxed, 345 ) 346 .is_ok() 347 { 348 return; 349 } 350 } 351 self.unlock_upgradable_slow(false); 352 } 353 354 #[inline] upgrade(&self)355 fn upgrade(&self) { 356 let state = self.state.fetch_sub( 357 (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT, 358 Ordering::Relaxed, 359 ); 360 if state & READERS_MASK != ONE_READER { 361 let result = self.upgrade_slow(None); 362 debug_assert!(result); 363 } 364 } 365 366 #[inline] try_upgrade(&self) -> bool367 fn try_upgrade(&self) -> bool { 368 if self 369 .state 370 .compare_exchange_weak( 371 ONE_READER | UPGRADABLE_BIT, 372 WRITER_BIT, 373 Ordering::Relaxed, 374 Ordering::Relaxed, 375 ) 376 .is_ok() 377 { 378 true 379 } else { 380 self.try_upgrade_slow() 381 } 382 } 383 } 384 385 unsafe impl lock_api::RawRwLockUpgradeFair for RawRwLock { 386 #[inline] unlock_upgradable_fair(&self)387 fn unlock_upgradable_fair(&self) { 388 self.deadlock_release(); 389 let state = self.state.load(Ordering::Relaxed); 390 if state & PARKED_BIT == 0 { 391 if self 392 .state 393 .compare_exchange_weak( 394 state, 395 state - (ONE_READER | UPGRADABLE_BIT), 396 Ordering::Release, 397 Ordering::Relaxed, 398 ) 399 .is_ok() 400 { 401 return; 402 } 403 } 404 self.unlock_upgradable_slow(false); 405 } 406 407 #[inline] bump_upgradable(&self)408 fn bump_upgradable(&self) { 409 if self.state.load(Ordering::Relaxed) == ONE_READER | UPGRADABLE_BIT | PARKED_BIT { 410 self.bump_upgradable_slow(); 411 } 412 } 413 } 414 415 unsafe impl lock_api::RawRwLockUpgradeDowngrade for RawRwLock { 416 #[inline] downgrade_upgradable(&self)417 fn downgrade_upgradable(&self) { 418 let state = self.state.fetch_sub(UPGRADABLE_BIT, Ordering::Relaxed); 419 420 // Wake up parked upgradable threads if there are any 421 if state & PARKED_BIT != 0 { 422 self.downgrade_slow(); 423 } 424 } 425 426 #[inline] downgrade_to_upgradable(&self)427 fn downgrade_to_upgradable(&self) { 428 let state = self.state.fetch_add( 429 (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT, 430 Ordering::Release, 431 ); 432 433 // Wake up parked shared threads if there are any 434 if state & PARKED_BIT != 0 { 435 self.downgrade_to_upgradable_slow(); 436 } 437 } 438 } 439 440 unsafe impl lock_api::RawRwLockUpgradeTimed for RawRwLock { 441 #[inline] try_lock_upgradable_until(&self, timeout: Instant) -> bool442 fn try_lock_upgradable_until(&self, timeout: Instant) -> bool { 443 let result = if self.try_lock_upgradable_fast() { 444 true 445 } else { 446 self.lock_upgradable_slow(Some(timeout)) 447 }; 448 if result { 449 self.deadlock_acquire(); 450 } 451 result 452 } 453 454 #[inline] try_lock_upgradable_for(&self, timeout: Duration) -> bool455 fn try_lock_upgradable_for(&self, timeout: Duration) -> bool { 456 let result = if self.try_lock_upgradable_fast() { 457 true 458 } else { 459 self.lock_upgradable_slow(util::to_deadline(timeout)) 460 }; 461 if result { 462 self.deadlock_acquire(); 463 } 464 result 465 } 466 467 #[inline] try_upgrade_until(&self, timeout: Instant) -> bool468 fn try_upgrade_until(&self, timeout: Instant) -> bool { 469 let state = self.state.fetch_sub( 470 (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT, 471 Ordering::Relaxed, 472 ); 473 if state & READERS_MASK == ONE_READER { 474 true 475 } else { 476 self.upgrade_slow(Some(timeout)) 477 } 478 } 479 480 #[inline] try_upgrade_for(&self, timeout: Duration) -> bool481 fn try_upgrade_for(&self, timeout: Duration) -> bool { 482 let state = self.state.fetch_sub( 483 (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT, 484 Ordering::Relaxed, 485 ); 486 if state & READERS_MASK == ONE_READER { 487 true 488 } else { 489 self.upgrade_slow(util::to_deadline(timeout)) 490 } 491 } 492 } 493 494 impl RawRwLock { 495 #[inline(always)] try_lock_shared_fast(&self, recursive: bool) -> bool496 fn try_lock_shared_fast(&self, recursive: bool) -> bool { 497 let state = self.state.load(Ordering::Relaxed); 498 499 // We can't allow grabbing a shared lock if there is a writer, even if 500 // the writer is still waiting for the remaining readers to exit. 501 if state & WRITER_BIT != 0 { 502 // To allow recursive locks, we make an exception and allow readers 503 // to skip ahead of a pending writer to avoid deadlocking, at the 504 // cost of breaking the fairness guarantees. 505 if !recursive || state & READERS_MASK == 0 { 506 return false; 507 } 508 } 509 510 // Use hardware lock elision to avoid cache conflicts when multiple 511 // readers try to acquire the lock. We only do this if the lock is 512 // completely empty since elision handles conflicts poorly. 513 if have_elision() && state == 0 { 514 self.state 515 .elision_compare_exchange_acquire(0, ONE_READER) 516 .is_ok() 517 } else if let Some(new_state) = state.checked_add(ONE_READER) { 518 self.state 519 .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed) 520 .is_ok() 521 } else { 522 false 523 } 524 } 525 526 #[cold] try_lock_shared_slow(&self, recursive: bool) -> bool527 fn try_lock_shared_slow(&self, recursive: bool) -> bool { 528 let mut state = self.state.load(Ordering::Relaxed); 529 loop { 530 // This mirrors the condition in try_lock_shared_fast 531 if state & WRITER_BIT != 0 { 532 if !recursive || state & READERS_MASK == 0 { 533 return false; 534 } 535 } 536 if have_elision() && state == 0 { 537 match self.state.elision_compare_exchange_acquire(0, ONE_READER) { 538 Ok(_) => return true, 539 Err(x) => state = x, 540 } 541 } else { 542 match self.state.compare_exchange_weak( 543 state, 544 state 545 .checked_add(ONE_READER) 546 .expect("RwLock reader count overflow"), 547 Ordering::Acquire, 548 Ordering::Relaxed, 549 ) { 550 Ok(_) => return true, 551 Err(x) => state = x, 552 } 553 } 554 } 555 } 556 557 #[inline(always)] try_lock_upgradable_fast(&self) -> bool558 fn try_lock_upgradable_fast(&self) -> bool { 559 let state = self.state.load(Ordering::Relaxed); 560 561 // We can't grab an upgradable lock if there is already a writer or 562 // upgradable reader. 563 if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 { 564 return false; 565 } 566 567 if let Some(new_state) = state.checked_add(ONE_READER | UPGRADABLE_BIT) { 568 self.state 569 .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed) 570 .is_ok() 571 } else { 572 false 573 } 574 } 575 576 #[cold] try_lock_upgradable_slow(&self) -> bool577 fn try_lock_upgradable_slow(&self) -> bool { 578 let mut state = self.state.load(Ordering::Relaxed); 579 loop { 580 // This mirrors the condition in try_lock_upgradable_fast 581 if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 { 582 return false; 583 } 584 585 match self.state.compare_exchange_weak( 586 state, 587 state 588 .checked_add(ONE_READER | UPGRADABLE_BIT) 589 .expect("RwLock reader count overflow"), 590 Ordering::Acquire, 591 Ordering::Relaxed, 592 ) { 593 Ok(_) => return true, 594 Err(x) => state = x, 595 } 596 } 597 } 598 599 #[cold] lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool600 fn lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool { 601 let try_lock = |state: &mut usize| { 602 loop { 603 if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 { 604 return false; 605 } 606 607 // Grab WRITER_BIT if it isn't set, even if there are parked threads. 608 match self.state.compare_exchange_weak( 609 *state, 610 *state | WRITER_BIT, 611 Ordering::Acquire, 612 Ordering::Relaxed, 613 ) { 614 Ok(_) => return true, 615 Err(x) => *state = x, 616 } 617 } 618 }; 619 620 // Step 1: grab exclusive ownership of WRITER_BIT 621 let timed_out = !self.lock_common( 622 timeout, 623 TOKEN_EXCLUSIVE, 624 try_lock, 625 WRITER_BIT | UPGRADABLE_BIT, 626 ); 627 if timed_out { 628 return false; 629 } 630 631 // Step 2: wait for all remaining readers to exit the lock. 632 self.wait_for_readers(timeout, 0) 633 } 634 635 #[cold] unlock_exclusive_slow(&self, force_fair: bool)636 fn unlock_exclusive_slow(&self, force_fair: bool) { 637 // There are threads to unpark. Try to unpark as many as we can. 638 let callback = |mut new_state, result: UnparkResult| { 639 // If we are using a fair unlock then we should keep the 640 // rwlock locked and hand it off to the unparked threads. 641 if result.unparked_threads != 0 && (force_fair || result.be_fair) { 642 if result.have_more_threads { 643 new_state |= PARKED_BIT; 644 } 645 self.state.store(new_state, Ordering::Release); 646 TOKEN_HANDOFF 647 } else { 648 // Clear the parked bit if there are no more parked threads. 649 if result.have_more_threads { 650 self.state.store(PARKED_BIT, Ordering::Release); 651 } else { 652 self.state.store(0, Ordering::Release); 653 } 654 TOKEN_NORMAL 655 } 656 }; 657 // SAFETY: `callback` does not panic or call into any function of `parking_lot`. 658 unsafe { 659 self.wake_parked_threads(0, callback); 660 } 661 } 662 663 #[cold] lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool664 fn lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool { 665 let try_lock = |state: &mut usize| { 666 let mut spinwait_shared = SpinWait::new(); 667 loop { 668 // Use hardware lock elision to avoid cache conflicts when multiple 669 // readers try to acquire the lock. We only do this if the lock is 670 // completely empty since elision handles conflicts poorly. 671 if have_elision() && *state == 0 { 672 match self.state.elision_compare_exchange_acquire(0, ONE_READER) { 673 Ok(_) => return true, 674 Err(x) => *state = x, 675 } 676 } 677 678 // This is the same condition as try_lock_shared_fast 679 if *state & WRITER_BIT != 0 { 680 if !recursive || *state & READERS_MASK == 0 { 681 return false; 682 } 683 } 684 685 if self 686 .state 687 .compare_exchange_weak( 688 *state, 689 state 690 .checked_add(ONE_READER) 691 .expect("RwLock reader count overflow"), 692 Ordering::Acquire, 693 Ordering::Relaxed, 694 ) 695 .is_ok() 696 { 697 return true; 698 } 699 700 // If there is high contention on the reader count then we want 701 // to leave some time between attempts to acquire the lock to 702 // let other threads make progress. 703 spinwait_shared.spin_no_yield(); 704 *state = self.state.load(Ordering::Relaxed); 705 } 706 }; 707 self.lock_common(timeout, TOKEN_SHARED, try_lock, WRITER_BIT) 708 } 709 710 #[cold] unlock_shared_slow(&self)711 fn unlock_shared_slow(&self) { 712 // At this point WRITER_PARKED_BIT is set and READER_MASK is empty. We 713 // just need to wake up a potentially sleeping pending writer. 714 // Using the 2nd key at addr + 1 715 let addr = self as *const _ as usize + 1; 716 let callback = |_result: UnparkResult| { 717 // Clear the WRITER_PARKED_BIT here since there can only be one 718 // parked writer thread. 719 self.state.fetch_and(!WRITER_PARKED_BIT, Ordering::Relaxed); 720 TOKEN_NORMAL 721 }; 722 // SAFETY: 723 // * `addr` is an address we control. 724 // * `callback` does not panic or call into any function of `parking_lot`. 725 unsafe { 726 parking_lot_core::unpark_one(addr, callback); 727 } 728 } 729 730 #[cold] lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool731 fn lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool { 732 let try_lock = |state: &mut usize| { 733 let mut spinwait_shared = SpinWait::new(); 734 loop { 735 if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 { 736 return false; 737 } 738 739 if self 740 .state 741 .compare_exchange_weak( 742 *state, 743 state 744 .checked_add(ONE_READER | UPGRADABLE_BIT) 745 .expect("RwLock reader count overflow"), 746 Ordering::Acquire, 747 Ordering::Relaxed, 748 ) 749 .is_ok() 750 { 751 return true; 752 } 753 754 // If there is high contention on the reader count then we want 755 // to leave some time between attempts to acquire the lock to 756 // let other threads make progress. 757 spinwait_shared.spin_no_yield(); 758 *state = self.state.load(Ordering::Relaxed); 759 } 760 }; 761 self.lock_common( 762 timeout, 763 TOKEN_UPGRADABLE, 764 try_lock, 765 WRITER_BIT | UPGRADABLE_BIT, 766 ) 767 } 768 769 #[cold] unlock_upgradable_slow(&self, force_fair: bool)770 fn unlock_upgradable_slow(&self, force_fair: bool) { 771 // Just release the lock if there are no parked threads. 772 let mut state = self.state.load(Ordering::Relaxed); 773 while state & PARKED_BIT == 0 { 774 match self.state.compare_exchange_weak( 775 state, 776 state - (ONE_READER | UPGRADABLE_BIT), 777 Ordering::Release, 778 Ordering::Relaxed, 779 ) { 780 Ok(_) => return, 781 Err(x) => state = x, 782 } 783 } 784 785 // There are threads to unpark. Try to unpark as many as we can. 786 let callback = |new_state, result: UnparkResult| { 787 // If we are using a fair unlock then we should keep the 788 // rwlock locked and hand it off to the unparked threads. 789 let mut state = self.state.load(Ordering::Relaxed); 790 if force_fair || result.be_fair { 791 // Fall back to normal unpark on overflow. Panicking is 792 // not allowed in parking_lot callbacks. 793 while let Some(mut new_state) = 794 (state - (ONE_READER | UPGRADABLE_BIT)).checked_add(new_state) 795 { 796 if result.have_more_threads { 797 new_state |= PARKED_BIT; 798 } else { 799 new_state &= !PARKED_BIT; 800 } 801 match self.state.compare_exchange_weak( 802 state, 803 new_state, 804 Ordering::Relaxed, 805 Ordering::Relaxed, 806 ) { 807 Ok(_) => return TOKEN_HANDOFF, 808 Err(x) => state = x, 809 } 810 } 811 } 812 813 // Otherwise just release the upgradable lock and update PARKED_BIT. 814 loop { 815 let mut new_state = state - (ONE_READER | UPGRADABLE_BIT); 816 if result.have_more_threads { 817 new_state |= PARKED_BIT; 818 } else { 819 new_state &= !PARKED_BIT; 820 } 821 match self.state.compare_exchange_weak( 822 state, 823 new_state, 824 Ordering::Relaxed, 825 Ordering::Relaxed, 826 ) { 827 Ok(_) => return TOKEN_NORMAL, 828 Err(x) => state = x, 829 } 830 } 831 }; 832 // SAFETY: `callback` does not panic or call into any function of `parking_lot`. 833 unsafe { 834 self.wake_parked_threads(0, callback); 835 } 836 } 837 838 #[cold] try_upgrade_slow(&self) -> bool839 fn try_upgrade_slow(&self) -> bool { 840 let mut state = self.state.load(Ordering::Relaxed); 841 loop { 842 if state & READERS_MASK != ONE_READER { 843 return false; 844 } 845 match self.state.compare_exchange_weak( 846 state, 847 state - (ONE_READER | UPGRADABLE_BIT) + WRITER_BIT, 848 Ordering::Relaxed, 849 Ordering::Relaxed, 850 ) { 851 Ok(_) => return true, 852 Err(x) => state = x, 853 } 854 } 855 } 856 857 #[cold] upgrade_slow(&self, timeout: Option<Instant>) -> bool858 fn upgrade_slow(&self, timeout: Option<Instant>) -> bool { 859 self.wait_for_readers(timeout, ONE_READER | UPGRADABLE_BIT) 860 } 861 862 #[cold] downgrade_slow(&self)863 fn downgrade_slow(&self) { 864 // We only reach this point if PARKED_BIT is set. 865 let callback = |_, result: UnparkResult| { 866 // Clear the parked bit if there no more parked threads 867 if !result.have_more_threads { 868 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed); 869 } 870 TOKEN_NORMAL 871 }; 872 // SAFETY: `callback` does not panic or call into any function of `parking_lot`. 873 unsafe { 874 self.wake_parked_threads(ONE_READER, callback); 875 } 876 } 877 878 #[cold] downgrade_to_upgradable_slow(&self)879 fn downgrade_to_upgradable_slow(&self) { 880 // We only reach this point if PARKED_BIT is set. 881 let callback = |_, result: UnparkResult| { 882 // Clear the parked bit if there no more parked threads 883 if !result.have_more_threads { 884 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed); 885 } 886 TOKEN_NORMAL 887 }; 888 // SAFETY: `callback` does not panic or call into any function of `parking_lot`. 889 unsafe { 890 self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback); 891 } 892 } 893 894 #[cold] bump_shared_slow(&self)895 fn bump_shared_slow(&self) { 896 self.unlock_shared(); 897 self.lock_shared(); 898 } 899 900 #[cold] bump_exclusive_slow(&self)901 fn bump_exclusive_slow(&self) { 902 self.deadlock_release(); 903 self.unlock_exclusive_slow(true); 904 self.lock_exclusive(); 905 } 906 907 #[cold] bump_upgradable_slow(&self)908 fn bump_upgradable_slow(&self) { 909 self.deadlock_release(); 910 self.unlock_upgradable_slow(true); 911 self.lock_upgradable(); 912 } 913 914 /// Common code for waking up parked threads after releasing WRITER_BIT or 915 /// UPGRADABLE_BIT. 916 /// 917 /// # Safety 918 /// 919 /// `callback` must uphold the requirements of the `callback` parameter to 920 /// `parking_lot_core::unpark_filter`. Meaning no panics or calls into any function in 921 /// `parking_lot`. 922 #[inline] wake_parked_threads( &self, new_state: usize, callback: impl FnOnce(usize, UnparkResult) -> UnparkToken, )923 unsafe fn wake_parked_threads( 924 &self, 925 new_state: usize, 926 callback: impl FnOnce(usize, UnparkResult) -> UnparkToken, 927 ) { 928 // We must wake up at least one upgrader or writer if there is one, 929 // otherwise they may end up parked indefinitely since unlock_shared 930 // does not call wake_parked_threads. 931 let new_state = Cell::new(new_state); 932 let addr = self as *const _ as usize; 933 let filter = |ParkToken(token)| { 934 let s = new_state.get(); 935 936 // If we are waking up a writer, don't wake anything else. 937 if s & WRITER_BIT != 0 { 938 return FilterOp::Stop; 939 } 940 941 // Otherwise wake *all* readers and one upgrader/writer. 942 if token & (UPGRADABLE_BIT | WRITER_BIT) != 0 && s & UPGRADABLE_BIT != 0 { 943 // Skip writers and upgradable readers if we already have 944 // a writer/upgradable reader. 945 FilterOp::Skip 946 } else { 947 new_state.set(s + token); 948 FilterOp::Unpark 949 } 950 }; 951 let callback = |result| callback(new_state.get(), result); 952 // SAFETY: 953 // * `addr` is an address we control. 954 // * `filter` does not panic or call into any function of `parking_lot`. 955 // * `callback` safety responsibility is on caller 956 parking_lot_core::unpark_filter(addr, filter, callback); 957 } 958 959 // Common code for waiting for readers to exit the lock after acquiring 960 // WRITER_BIT. 961 #[inline] wait_for_readers(&self, timeout: Option<Instant>, prev_value: usize) -> bool962 fn wait_for_readers(&self, timeout: Option<Instant>, prev_value: usize) -> bool { 963 // At this point WRITER_BIT is already set, we just need to wait for the 964 // remaining readers to exit the lock. 965 let mut spinwait = SpinWait::new(); 966 let mut state = self.state.load(Ordering::Relaxed); 967 while state & READERS_MASK != 0 { 968 // Spin a few times to wait for readers to exit 969 if spinwait.spin() { 970 state = self.state.load(Ordering::Relaxed); 971 continue; 972 } 973 974 // Set the parked bit 975 if state & WRITER_PARKED_BIT == 0 { 976 if let Err(x) = self.state.compare_exchange_weak( 977 state, 978 state | WRITER_PARKED_BIT, 979 Ordering::Relaxed, 980 Ordering::Relaxed, 981 ) { 982 state = x; 983 continue; 984 } 985 } 986 987 // Park our thread until we are woken up by an unlock 988 // Using the 2nd key at addr + 1 989 let addr = self as *const _ as usize + 1; 990 let validate = || { 991 let state = self.state.load(Ordering::Relaxed); 992 state & READERS_MASK != 0 && state & WRITER_PARKED_BIT != 0 993 }; 994 let before_sleep = || {}; 995 let timed_out = |_, _| {}; 996 // SAFETY: 997 // * `addr` is an address we control. 998 // * `validate`/`timed_out` does not panic or call into any function of `parking_lot`. 999 // * `before_sleep` does not call `park`, nor does it panic. 1000 let park_result = unsafe { 1001 parking_lot_core::park( 1002 addr, 1003 validate, 1004 before_sleep, 1005 timed_out, 1006 TOKEN_EXCLUSIVE, 1007 timeout, 1008 ) 1009 }; 1010 match park_result { 1011 // We still need to re-check the state if we are unparked 1012 // since a previous writer timing-out could have allowed 1013 // another reader to sneak in before we parked. 1014 ParkResult::Unparked(_) | ParkResult::Invalid => { 1015 state = self.state.load(Ordering::Relaxed); 1016 continue; 1017 } 1018 1019 // Timeout expired 1020 ParkResult::TimedOut => { 1021 // We need to release WRITER_BIT and revert back to 1022 // our previous value. We also wake up any threads that 1023 // might be waiting on WRITER_BIT. 1024 let state = self.state.fetch_add( 1025 prev_value.wrapping_sub(WRITER_BIT | WRITER_PARKED_BIT), 1026 Ordering::Relaxed, 1027 ); 1028 if state & PARKED_BIT != 0 { 1029 let callback = |_, result: UnparkResult| { 1030 // Clear the parked bit if there no more parked threads 1031 if !result.have_more_threads { 1032 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed); 1033 } 1034 TOKEN_NORMAL 1035 }; 1036 // SAFETY: `callback` does not panic or call any function of `parking_lot`. 1037 unsafe { 1038 self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback); 1039 } 1040 } 1041 return false; 1042 } 1043 } 1044 } 1045 true 1046 } 1047 1048 /// Common code for acquiring a lock 1049 #[inline] lock_common( &self, timeout: Option<Instant>, token: ParkToken, mut try_lock: impl FnMut(&mut usize) -> bool, validate_flags: usize, ) -> bool1050 fn lock_common( 1051 &self, 1052 timeout: Option<Instant>, 1053 token: ParkToken, 1054 mut try_lock: impl FnMut(&mut usize) -> bool, 1055 validate_flags: usize, 1056 ) -> bool { 1057 let mut spinwait = SpinWait::new(); 1058 let mut state = self.state.load(Ordering::Relaxed); 1059 loop { 1060 // Attempt to grab the lock 1061 if try_lock(&mut state) { 1062 return true; 1063 } 1064 1065 // If there are no parked threads, try spinning a few times. 1066 if state & (PARKED_BIT | WRITER_PARKED_BIT) == 0 && spinwait.spin() { 1067 state = self.state.load(Ordering::Relaxed); 1068 continue; 1069 } 1070 1071 // Set the parked bit 1072 if state & PARKED_BIT == 0 { 1073 if let Err(x) = self.state.compare_exchange_weak( 1074 state, 1075 state | PARKED_BIT, 1076 Ordering::Relaxed, 1077 Ordering::Relaxed, 1078 ) { 1079 state = x; 1080 continue; 1081 } 1082 } 1083 1084 // Park our thread until we are woken up by an unlock 1085 let addr = self as *const _ as usize; 1086 let validate = || { 1087 let state = self.state.load(Ordering::Relaxed); 1088 state & PARKED_BIT != 0 && (state & validate_flags != 0) 1089 }; 1090 let before_sleep = || {}; 1091 let timed_out = |_, was_last_thread| { 1092 // Clear the parked bit if we were the last parked thread 1093 if was_last_thread { 1094 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed); 1095 } 1096 }; 1097 1098 // SAFETY: 1099 // * `addr` is an address we control. 1100 // * `validate`/`timed_out` does not panic or call into any function of `parking_lot`. 1101 // * `before_sleep` does not call `park`, nor does it panic. 1102 let park_result = unsafe { 1103 parking_lot_core::park(addr, validate, before_sleep, timed_out, token, timeout) 1104 }; 1105 match park_result { 1106 // The thread that unparked us passed the lock on to us 1107 // directly without unlocking it. 1108 ParkResult::Unparked(TOKEN_HANDOFF) => return true, 1109 1110 // We were unparked normally, try acquiring the lock again 1111 ParkResult::Unparked(_) => (), 1112 1113 // The validation function failed, try locking again 1114 ParkResult::Invalid => (), 1115 1116 // Timeout expired 1117 ParkResult::TimedOut => return false, 1118 } 1119 1120 // Loop back and try locking again 1121 spinwait.reset(); 1122 state = self.state.load(Ordering::Relaxed); 1123 } 1124 } 1125 1126 #[inline] deadlock_acquire(&self)1127 fn deadlock_acquire(&self) { 1128 unsafe { deadlock::acquire_resource(self as *const _ as usize) }; 1129 unsafe { deadlock::acquire_resource(self as *const _ as usize + 1) }; 1130 } 1131 1132 #[inline] deadlock_release(&self)1133 fn deadlock_release(&self) { 1134 unsafe { deadlock::release_resource(self as *const _ as usize) }; 1135 unsafe { deadlock::release_resource(self as *const _ as usize + 1) }; 1136 } 1137 } 1138