1 // Copyright 2016 Amanieu d'Antras 2 // 3 // Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or 4 // http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or 5 // http://opensource.org/licenses/MIT>, at your option. This file may not be 6 // copied, modified, or distributed except according to those terms. 7 8 use crate::elision::{have_elision, AtomicElisionExt}; 9 use crate::raw_mutex::{TOKEN_HANDOFF, TOKEN_NORMAL}; 10 use crate::util; 11 use core::{ 12 cell::Cell, 13 sync::atomic::{AtomicUsize, Ordering}, 14 }; 15 use lock_api::{ 16 GuardNoSend, RawRwLock as RawRwLockTrait, RawRwLockDowngrade, RawRwLockFair, 17 RawRwLockRecursive, RawRwLockRecursiveTimed, RawRwLockTimed, RawRwLockUpgrade, 18 RawRwLockUpgradeDowngrade, RawRwLockUpgradeFair, RawRwLockUpgradeTimed, 19 }; 20 use parking_lot_core::{ 21 self, deadlock, FilterOp, ParkResult, ParkToken, SpinWait, UnparkResult, UnparkToken, 22 }; 23 use std::time::{Duration, Instant}; 24 25 // This reader-writer lock implementation is based on Boost's upgrade_mutex: 26 // https://github.com/boostorg/thread/blob/fc08c1fe2840baeeee143440fba31ef9e9a813c8/include/boost/thread/v2/shared_mutex.hpp#L432 27 // 28 // This implementation uses 2 wait queues, one at key [addr] and one at key 29 // [addr + 1]. The primary queue is used for all new waiting threads, and the 30 // secondary queue is used by the thread which has acquired WRITER_BIT but is 31 // waiting for the remaining readers to exit the lock. 32 // 33 // This implementation is fair between readers and writers since it uses the 34 // order in which threads first started queuing to alternate between read phases 35 // and write phases. In particular is it not vulnerable to write starvation 36 // since readers will block if there is a pending writer. 37 38 // There is at least one thread in the main queue. 39 const PARKED_BIT: usize = 0b0001; 40 // There is a parked thread holding WRITER_BIT. WRITER_BIT must be set. 41 const WRITER_PARKED_BIT: usize = 0b0010; 42 // A reader is holding an upgradable lock. The reader count must be non-zero and 43 // WRITER_BIT must not be set. 44 const UPGRADABLE_BIT: usize = 0b0100; 45 // If the reader count is zero: a writer is currently holding an exclusive lock. 46 // Otherwise: a writer is waiting for the remaining readers to exit the lock. 47 const WRITER_BIT: usize = 0b1000; 48 // Mask of bits used to count readers. 49 const READERS_MASK: usize = !0b1111; 50 // Base unit for counting readers. 51 const ONE_READER: usize = 0b10000; 52 53 // Token indicating what type of lock a queued thread is trying to acquire 54 const TOKEN_SHARED: ParkToken = ParkToken(ONE_READER); 55 const TOKEN_EXCLUSIVE: ParkToken = ParkToken(WRITER_BIT); 56 const TOKEN_UPGRADABLE: ParkToken = ParkToken(ONE_READER | UPGRADABLE_BIT); 57 58 /// Raw reader-writer lock type backed by the parking lot. 59 pub struct RawRwLock { 60 state: AtomicUsize, 61 } 62 63 unsafe impl RawRwLockTrait for RawRwLock { 64 const INIT: RawRwLock = RawRwLock { state: AtomicUsize::new(0) }; 65 66 type GuardMarker = GuardNoSend; 67 68 #[inline] lock_exclusive(&self)69 fn lock_exclusive(&self) { 70 if self 71 .state 72 .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed) 73 .is_err() 74 { 75 let result = self.lock_exclusive_slow(None); 76 debug_assert!(result); 77 } 78 self.deadlock_acquire(); 79 } 80 81 #[inline] try_lock_exclusive(&self) -> bool82 fn try_lock_exclusive(&self) -> bool { 83 if self.state.compare_exchange(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed).is_ok() 84 { 85 self.deadlock_acquire(); 86 true 87 } else { 88 false 89 } 90 } 91 92 #[inline] unlock_exclusive(&self)93 fn unlock_exclusive(&self) { 94 self.deadlock_release(); 95 if self.state.compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed).is_ok() 96 { 97 return; 98 } 99 self.unlock_exclusive_slow(false); 100 } 101 102 #[inline] lock_shared(&self)103 fn lock_shared(&self) { 104 if !self.try_lock_shared_fast(false) { 105 let result = self.lock_shared_slow(false, None); 106 debug_assert!(result); 107 } 108 self.deadlock_acquire(); 109 } 110 111 #[inline] try_lock_shared(&self) -> bool112 fn try_lock_shared(&self) -> bool { 113 let result = 114 if self.try_lock_shared_fast(false) { true } else { self.try_lock_shared_slow(false) }; 115 if result { 116 self.deadlock_acquire(); 117 } 118 result 119 } 120 121 #[inline] unlock_shared(&self)122 fn unlock_shared(&self) { 123 self.deadlock_release(); 124 let state = if have_elision() { 125 self.state.elision_fetch_sub_release(ONE_READER) 126 } else { 127 self.state.fetch_sub(ONE_READER, Ordering::Release) 128 }; 129 if state & (READERS_MASK | WRITER_PARKED_BIT) == (ONE_READER | WRITER_PARKED_BIT) { 130 self.unlock_shared_slow(); 131 } 132 } 133 } 134 135 unsafe impl RawRwLockFair for RawRwLock { 136 #[inline] unlock_shared_fair(&self)137 fn unlock_shared_fair(&self) { 138 // Shared unlocking is always fair in this implementation. 139 self.unlock_shared(); 140 } 141 142 #[inline] unlock_exclusive_fair(&self)143 fn unlock_exclusive_fair(&self) { 144 self.deadlock_release(); 145 if self.state.compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed).is_ok() 146 { 147 return; 148 } 149 self.unlock_exclusive_slow(true); 150 } 151 152 #[inline] bump_shared(&self)153 fn bump_shared(&self) { 154 if self.state.load(Ordering::Relaxed) & (READERS_MASK | WRITER_BIT) 155 == ONE_READER | WRITER_BIT 156 { 157 self.bump_shared_slow(); 158 } 159 } 160 161 #[inline] bump_exclusive(&self)162 fn bump_exclusive(&self) { 163 if self.state.load(Ordering::Relaxed) & PARKED_BIT != 0 { 164 self.bump_exclusive_slow(); 165 } 166 } 167 } 168 169 unsafe impl RawRwLockDowngrade for RawRwLock { 170 #[inline] downgrade(&self)171 fn downgrade(&self) { 172 let state = self.state.fetch_add(ONE_READER - WRITER_BIT, Ordering::Release); 173 174 // Wake up parked shared and upgradable threads if there are any 175 if state & PARKED_BIT != 0 { 176 self.downgrade_slow(); 177 } 178 } 179 } 180 181 unsafe impl RawRwLockTimed for RawRwLock { 182 type Duration = Duration; 183 type Instant = Instant; 184 185 #[inline] try_lock_shared_for(&self, timeout: Self::Duration) -> bool186 fn try_lock_shared_for(&self, timeout: Self::Duration) -> bool { 187 let result = if self.try_lock_shared_fast(false) { 188 true 189 } else { 190 self.lock_shared_slow(false, util::to_deadline(timeout)) 191 }; 192 if result { 193 self.deadlock_acquire(); 194 } 195 result 196 } 197 198 #[inline] try_lock_shared_until(&self, timeout: Self::Instant) -> bool199 fn try_lock_shared_until(&self, timeout: Self::Instant) -> bool { 200 let result = if self.try_lock_shared_fast(false) { 201 true 202 } else { 203 self.lock_shared_slow(false, Some(timeout)) 204 }; 205 if result { 206 self.deadlock_acquire(); 207 } 208 result 209 } 210 211 #[inline] try_lock_exclusive_for(&self, timeout: Duration) -> bool212 fn try_lock_exclusive_for(&self, timeout: Duration) -> bool { 213 let result = if self 214 .state 215 .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed) 216 .is_ok() 217 { 218 true 219 } else { 220 self.lock_exclusive_slow(util::to_deadline(timeout)) 221 }; 222 if result { 223 self.deadlock_acquire(); 224 } 225 result 226 } 227 228 #[inline] try_lock_exclusive_until(&self, timeout: Instant) -> bool229 fn try_lock_exclusive_until(&self, timeout: Instant) -> bool { 230 let result = if self 231 .state 232 .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed) 233 .is_ok() 234 { 235 true 236 } else { 237 self.lock_exclusive_slow(Some(timeout)) 238 }; 239 if result { 240 self.deadlock_acquire(); 241 } 242 result 243 } 244 } 245 246 unsafe impl RawRwLockRecursive for RawRwLock { 247 #[inline] lock_shared_recursive(&self)248 fn lock_shared_recursive(&self) { 249 if !self.try_lock_shared_fast(true) { 250 let result = self.lock_shared_slow(true, None); 251 debug_assert!(result); 252 } 253 self.deadlock_acquire(); 254 } 255 256 #[inline] try_lock_shared_recursive(&self) -> bool257 fn try_lock_shared_recursive(&self) -> bool { 258 let result = 259 if self.try_lock_shared_fast(true) { true } else { self.try_lock_shared_slow(true) }; 260 if result { 261 self.deadlock_acquire(); 262 } 263 result 264 } 265 } 266 267 unsafe impl RawRwLockRecursiveTimed for RawRwLock { 268 #[inline] try_lock_shared_recursive_for(&self, timeout: Self::Duration) -> bool269 fn try_lock_shared_recursive_for(&self, timeout: Self::Duration) -> bool { 270 let result = if self.try_lock_shared_fast(true) { 271 true 272 } else { 273 self.lock_shared_slow(true, util::to_deadline(timeout)) 274 }; 275 if result { 276 self.deadlock_acquire(); 277 } 278 result 279 } 280 281 #[inline] try_lock_shared_recursive_until(&self, timeout: Self::Instant) -> bool282 fn try_lock_shared_recursive_until(&self, timeout: Self::Instant) -> bool { 283 let result = if self.try_lock_shared_fast(true) { 284 true 285 } else { 286 self.lock_shared_slow(true, Some(timeout)) 287 }; 288 if result { 289 self.deadlock_acquire(); 290 } 291 result 292 } 293 } 294 295 unsafe impl RawRwLockUpgrade for RawRwLock { 296 #[inline] lock_upgradable(&self)297 fn lock_upgradable(&self) { 298 if !self.try_lock_upgradable_fast() { 299 let result = self.lock_upgradable_slow(None); 300 debug_assert!(result); 301 } 302 self.deadlock_acquire(); 303 } 304 305 #[inline] try_lock_upgradable(&self) -> bool306 fn try_lock_upgradable(&self) -> bool { 307 let result = 308 if self.try_lock_upgradable_fast() { true } else { self.try_lock_upgradable_slow() }; 309 if result { 310 self.deadlock_acquire(); 311 } 312 result 313 } 314 315 #[inline] unlock_upgradable(&self)316 fn unlock_upgradable(&self) { 317 self.deadlock_release(); 318 let state = self.state.load(Ordering::Relaxed); 319 if state & PARKED_BIT == 0 { 320 if self 321 .state 322 .compare_exchange_weak( 323 state, 324 state - (ONE_READER | UPGRADABLE_BIT), 325 Ordering::Release, 326 Ordering::Relaxed, 327 ) 328 .is_ok() 329 { 330 return; 331 } 332 } 333 self.unlock_upgradable_slow(false); 334 } 335 336 #[inline] upgrade(&self)337 fn upgrade(&self) { 338 let state = 339 self.state.fetch_sub((ONE_READER | UPGRADABLE_BIT) - WRITER_BIT, Ordering::Relaxed); 340 if state & READERS_MASK != ONE_READER { 341 let result = self.upgrade_slow(None); 342 debug_assert!(result); 343 } 344 } 345 346 #[inline] try_upgrade(&self) -> bool347 fn try_upgrade(&self) -> bool { 348 if self 349 .state 350 .compare_exchange_weak( 351 ONE_READER | UPGRADABLE_BIT, 352 WRITER_BIT, 353 Ordering::Relaxed, 354 Ordering::Relaxed, 355 ) 356 .is_ok() 357 { 358 true 359 } else { 360 self.try_upgrade_slow() 361 } 362 } 363 } 364 365 unsafe impl RawRwLockUpgradeFair for RawRwLock { 366 #[inline] unlock_upgradable_fair(&self)367 fn unlock_upgradable_fair(&self) { 368 self.deadlock_release(); 369 let state = self.state.load(Ordering::Relaxed); 370 if state & PARKED_BIT == 0 { 371 if self 372 .state 373 .compare_exchange_weak( 374 state, 375 state - (ONE_READER | UPGRADABLE_BIT), 376 Ordering::Release, 377 Ordering::Relaxed, 378 ) 379 .is_ok() 380 { 381 return; 382 } 383 } 384 self.unlock_upgradable_slow(false); 385 } 386 387 #[inline] bump_upgradable(&self)388 fn bump_upgradable(&self) { 389 if self.state.load(Ordering::Relaxed) == ONE_READER | UPGRADABLE_BIT | PARKED_BIT { 390 self.bump_upgradable_slow(); 391 } 392 } 393 } 394 395 unsafe impl RawRwLockUpgradeDowngrade for RawRwLock { 396 #[inline] downgrade_upgradable(&self)397 fn downgrade_upgradable(&self) { 398 let state = self.state.fetch_sub(UPGRADABLE_BIT, Ordering::Relaxed); 399 400 // Wake up parked upgradable threads if there are any 401 if state & PARKED_BIT != 0 { 402 self.downgrade_slow(); 403 } 404 } 405 406 #[inline] downgrade_to_upgradable(&self)407 fn downgrade_to_upgradable(&self) { 408 let state = 409 self.state.fetch_add((ONE_READER | UPGRADABLE_BIT) - WRITER_BIT, Ordering::Release); 410 411 // Wake up parked shared threads if there are any 412 if state & PARKED_BIT != 0 { 413 self.downgrade_to_upgradable_slow(); 414 } 415 } 416 } 417 418 unsafe impl RawRwLockUpgradeTimed for RawRwLock { 419 #[inline] try_lock_upgradable_until(&self, timeout: Instant) -> bool420 fn try_lock_upgradable_until(&self, timeout: Instant) -> bool { 421 let result = if self.try_lock_upgradable_fast() { 422 true 423 } else { 424 self.lock_upgradable_slow(Some(timeout)) 425 }; 426 if result { 427 self.deadlock_acquire(); 428 } 429 result 430 } 431 432 #[inline] try_lock_upgradable_for(&self, timeout: Duration) -> bool433 fn try_lock_upgradable_for(&self, timeout: Duration) -> bool { 434 let result = if self.try_lock_upgradable_fast() { 435 true 436 } else { 437 self.lock_upgradable_slow(util::to_deadline(timeout)) 438 }; 439 if result { 440 self.deadlock_acquire(); 441 } 442 result 443 } 444 445 #[inline] try_upgrade_until(&self, timeout: Instant) -> bool446 fn try_upgrade_until(&self, timeout: Instant) -> bool { 447 let state = 448 self.state.fetch_sub((ONE_READER | UPGRADABLE_BIT) - WRITER_BIT, Ordering::Relaxed); 449 if state & READERS_MASK == ONE_READER { true } else { self.upgrade_slow(Some(timeout)) } 450 } 451 452 #[inline] try_upgrade_for(&self, timeout: Duration) -> bool453 fn try_upgrade_for(&self, timeout: Duration) -> bool { 454 let state = 455 self.state.fetch_sub((ONE_READER | UPGRADABLE_BIT) - WRITER_BIT, Ordering::Relaxed); 456 if state & READERS_MASK == ONE_READER { 457 true 458 } else { 459 self.upgrade_slow(util::to_deadline(timeout)) 460 } 461 } 462 } 463 464 impl RawRwLock { 465 #[inline(always)] try_lock_shared_fast(&self, recursive: bool) -> bool466 fn try_lock_shared_fast(&self, recursive: bool) -> bool { 467 let state = self.state.load(Ordering::Relaxed); 468 469 // We can't allow grabbing a shared lock if there is a writer, even if 470 // the writer is still waiting for the remaining readers to exit. 471 if state & WRITER_BIT != 0 { 472 // To allow recursive locks, we make an exception and allow readers 473 // to skip ahead of a pending writer to avoid deadlocking, at the 474 // cost of breaking the fairness guarantees. 475 if !recursive || state & READERS_MASK == 0 { 476 return false; 477 } 478 } 479 480 // Use hardware lock elision to avoid cache conflicts when multiple 481 // readers try to acquire the lock. We only do this if the lock is 482 // completely empty since elision handles conflicts poorly. 483 if have_elision() && state == 0 { 484 self.state.elision_compare_exchange_acquire(0, ONE_READER).is_ok() 485 } else if let Some(new_state) = state.checked_add(ONE_READER) { 486 self.state 487 .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed) 488 .is_ok() 489 } else { 490 false 491 } 492 } 493 494 #[cold] try_lock_shared_slow(&self, recursive: bool) -> bool495 fn try_lock_shared_slow(&self, recursive: bool) -> bool { 496 let mut state = self.state.load(Ordering::Relaxed); 497 loop { 498 // This mirrors the condition in try_lock_shared_fast 499 if state & WRITER_BIT != 0 { 500 if !recursive || state & READERS_MASK == 0 { 501 return false; 502 } 503 } 504 if have_elision() && state == 0 { 505 match self.state.elision_compare_exchange_acquire(0, ONE_READER) { 506 Ok(_) => return true, 507 Err(x) => state = x, 508 } 509 } else { 510 match self.state.compare_exchange_weak( 511 state, 512 state.checked_add(ONE_READER).expect("RwLock reader count overflow"), 513 Ordering::Acquire, 514 Ordering::Relaxed, 515 ) { 516 Ok(_) => return true, 517 Err(x) => state = x, 518 } 519 } 520 } 521 } 522 523 #[inline(always)] try_lock_upgradable_fast(&self) -> bool524 fn try_lock_upgradable_fast(&self) -> bool { 525 let state = self.state.load(Ordering::Relaxed); 526 527 // We can't grab an upgradable lock if there is already a writer or 528 // upgradable reader. 529 if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 { 530 return false; 531 } 532 533 if let Some(new_state) = state.checked_add(ONE_READER | UPGRADABLE_BIT) { 534 self.state 535 .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed) 536 .is_ok() 537 } else { 538 false 539 } 540 } 541 542 #[cold] try_lock_upgradable_slow(&self) -> bool543 fn try_lock_upgradable_slow(&self) -> bool { 544 let mut state = self.state.load(Ordering::Relaxed); 545 loop { 546 // This mirrors the condition in try_lock_upgradable_fast 547 if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 { 548 return false; 549 } 550 551 match self.state.compare_exchange_weak( 552 state, 553 state 554 .checked_add(ONE_READER | UPGRADABLE_BIT) 555 .expect("RwLock reader count overflow"), 556 Ordering::Acquire, 557 Ordering::Relaxed, 558 ) { 559 Ok(_) => return true, 560 Err(x) => state = x, 561 } 562 } 563 } 564 565 #[cold] lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool566 fn lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool { 567 // Step 1: grab exclusive ownership of WRITER_BIT 568 let timed_out = !self.lock_common( 569 timeout, 570 TOKEN_EXCLUSIVE, 571 |state| { 572 loop { 573 if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 { 574 return false; 575 } 576 577 // Grab WRITER_BIT if it isn't set, even if there are parked threads. 578 match self.state.compare_exchange_weak( 579 *state, 580 *state | WRITER_BIT, 581 Ordering::Acquire, 582 Ordering::Relaxed, 583 ) { 584 Ok(_) => return true, 585 Err(x) => *state = x, 586 } 587 } 588 }, 589 |state| state & (WRITER_BIT | UPGRADABLE_BIT) != 0, 590 ); 591 if timed_out { 592 return false; 593 } 594 595 // Step 2: wait for all remaining readers to exit the lock. 596 self.wait_for_readers(timeout, 0) 597 } 598 599 #[cold] unlock_exclusive_slow(&self, force_fair: bool)600 fn unlock_exclusive_slow(&self, force_fair: bool) { 601 // There are threads to unpark. Try to unpark as many as we can. 602 let callback = |mut new_state, result: UnparkResult| { 603 // If we are using a fair unlock then we should keep the 604 // rwlock locked and hand it off to the unparked threads. 605 if result.unparked_threads != 0 && (force_fair || result.be_fair) { 606 if result.have_more_threads { 607 new_state |= PARKED_BIT; 608 } 609 self.state.store(new_state, Ordering::Release); 610 TOKEN_HANDOFF 611 } else { 612 // Clear the parked bit if there are no more parked threads. 613 if result.have_more_threads { 614 self.state.store(PARKED_BIT, Ordering::Release); 615 } else { 616 self.state.store(0, Ordering::Release); 617 } 618 TOKEN_NORMAL 619 } 620 }; 621 self.wake_parked_threads(0, callback); 622 } 623 624 #[cold] lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool625 fn lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool { 626 self.lock_common( 627 timeout, 628 TOKEN_SHARED, 629 |state| { 630 let mut spinwait_shared = SpinWait::new(); 631 loop { 632 // Use hardware lock elision to avoid cache conflicts when multiple 633 // readers try to acquire the lock. We only do this if the lock is 634 // completely empty since elision handles conflicts poorly. 635 if have_elision() && *state == 0 { 636 match self.state.elision_compare_exchange_acquire(0, ONE_READER) { 637 Ok(_) => return true, 638 Err(x) => *state = x, 639 } 640 } 641 642 // This is the same condition as try_lock_shared_fast 643 if *state & WRITER_BIT != 0 { 644 if !recursive || *state & READERS_MASK == 0 { 645 return false; 646 } 647 } 648 649 if self 650 .state 651 .compare_exchange_weak( 652 *state, 653 state.checked_add(ONE_READER).expect("RwLock reader count overflow"), 654 Ordering::Acquire, 655 Ordering::Relaxed, 656 ) 657 .is_ok() 658 { 659 return true; 660 } 661 662 // If there is high contention on the reader count then we want 663 // to leave some time between attempts to acquire the lock to 664 // let other threads make progress. 665 spinwait_shared.spin_no_yield(); 666 *state = self.state.load(Ordering::Relaxed); 667 } 668 }, 669 |state| state & WRITER_BIT != 0, 670 ) 671 } 672 673 #[cold] unlock_shared_slow(&self)674 fn unlock_shared_slow(&self) { 675 // At this point WRITER_PARKED_BIT is set and READER_MASK is empty. We 676 // just need to wake up a potentially sleeping pending writer. 677 unsafe { 678 // Using the 2nd key at addr + 1 679 let addr = self as *const _ as usize + 1; 680 let callback = |result: UnparkResult| { 681 // Clear the WRITER_PARKED_BIT here since there can only be one 682 // parked writer thread. 683 debug_assert!(!result.have_more_threads); 684 self.state.fetch_and(!WRITER_PARKED_BIT, Ordering::Relaxed); 685 TOKEN_NORMAL 686 }; 687 parking_lot_core::unpark_one(addr, callback); 688 } 689 } 690 691 #[cold] lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool692 fn lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool { 693 self.lock_common( 694 timeout, 695 TOKEN_UPGRADABLE, 696 |state| { 697 let mut spinwait_shared = SpinWait::new(); 698 loop { 699 if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 { 700 return false; 701 } 702 703 if self 704 .state 705 .compare_exchange_weak( 706 *state, 707 state 708 .checked_add(ONE_READER | UPGRADABLE_BIT) 709 .expect("RwLock reader count overflow"), 710 Ordering::Acquire, 711 Ordering::Relaxed, 712 ) 713 .is_ok() 714 { 715 return true; 716 } 717 718 // If there is high contention on the reader count then we want 719 // to leave some time between attempts to acquire the lock to 720 // let other threads make progress. 721 spinwait_shared.spin_no_yield(); 722 *state = self.state.load(Ordering::Relaxed); 723 } 724 }, 725 |state| state & (WRITER_BIT | UPGRADABLE_BIT) != 0, 726 ) 727 } 728 729 #[cold] unlock_upgradable_slow(&self, force_fair: bool)730 fn unlock_upgradable_slow(&self, force_fair: bool) { 731 // Just release the lock if there are no parked threads. 732 let mut state = self.state.load(Ordering::Relaxed); 733 while state & PARKED_BIT == 0 { 734 match self.state.compare_exchange_weak( 735 state, 736 state - (ONE_READER | UPGRADABLE_BIT), 737 Ordering::Release, 738 Ordering::Relaxed, 739 ) { 740 Ok(_) => return, 741 Err(x) => state = x, 742 } 743 } 744 745 // There are threads to unpark. Try to unpark as many as we can. 746 let callback = |new_state, result: UnparkResult| { 747 // If we are using a fair unlock then we should keep the 748 // rwlock locked and hand it off to the unparked threads. 749 let mut state = self.state.load(Ordering::Relaxed); 750 if force_fair || result.be_fair { 751 // Fall back to normal unpark on overflow. Panicking is 752 // not allowed in parking_lot callbacks. 753 while let Some(mut new_state) = 754 (state - (ONE_READER | UPGRADABLE_BIT)).checked_add(new_state) 755 { 756 if result.have_more_threads { 757 new_state |= PARKED_BIT; 758 } else { 759 new_state &= !PARKED_BIT; 760 } 761 match self.state.compare_exchange_weak( 762 state, 763 new_state, 764 Ordering::Relaxed, 765 Ordering::Relaxed, 766 ) { 767 Ok(_) => return TOKEN_HANDOFF, 768 Err(x) => state = x, 769 } 770 } 771 } 772 773 // Otherwise just release the upgradable lock and update PARKED_BIT. 774 loop { 775 let mut new_state = state - (ONE_READER | UPGRADABLE_BIT); 776 if result.have_more_threads { 777 new_state |= PARKED_BIT; 778 } else { 779 new_state &= !PARKED_BIT; 780 } 781 match self.state.compare_exchange_weak( 782 state, 783 new_state, 784 Ordering::Relaxed, 785 Ordering::Relaxed, 786 ) { 787 Ok(_) => return TOKEN_NORMAL, 788 Err(x) => state = x, 789 } 790 } 791 }; 792 self.wake_parked_threads(0, callback); 793 } 794 795 #[cold] try_upgrade_slow(&self) -> bool796 fn try_upgrade_slow(&self) -> bool { 797 let mut state = self.state.load(Ordering::Relaxed); 798 loop { 799 if state & READERS_MASK != ONE_READER { 800 return false; 801 } 802 match self.state.compare_exchange_weak( 803 state, 804 state - (ONE_READER | UPGRADABLE_BIT) + WRITER_BIT, 805 Ordering::Relaxed, 806 Ordering::Relaxed, 807 ) { 808 Ok(_) => return true, 809 Err(x) => state = x, 810 } 811 } 812 } 813 814 #[cold] upgrade_slow(&self, timeout: Option<Instant>) -> bool815 fn upgrade_slow(&self, timeout: Option<Instant>) -> bool { 816 self.wait_for_readers(timeout, ONE_READER | UPGRADABLE_BIT) 817 } 818 819 #[cold] downgrade_slow(&self)820 fn downgrade_slow(&self) { 821 // We only reach this point if PARKED_BIT is set. 822 let callback = |_, result: UnparkResult| { 823 // Clear the parked bit if there no more parked threads 824 if !result.have_more_threads { 825 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed); 826 } 827 TOKEN_NORMAL 828 }; 829 self.wake_parked_threads(ONE_READER, callback); 830 } 831 832 #[cold] downgrade_to_upgradable_slow(&self)833 fn downgrade_to_upgradable_slow(&self) { 834 // We only reach this point if PARKED_BIT is set. 835 let callback = |_, result: UnparkResult| { 836 // Clear the parked bit if there no more parked threads 837 if !result.have_more_threads { 838 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed); 839 } 840 TOKEN_NORMAL 841 }; 842 self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback); 843 } 844 845 #[cold] bump_shared_slow(&self)846 fn bump_shared_slow(&self) { 847 self.unlock_shared(); 848 self.lock_shared(); 849 } 850 851 #[cold] bump_exclusive_slow(&self)852 fn bump_exclusive_slow(&self) { 853 self.deadlock_release(); 854 self.unlock_exclusive_slow(true); 855 self.lock_exclusive(); 856 } 857 858 #[cold] bump_upgradable_slow(&self)859 fn bump_upgradable_slow(&self) { 860 self.deadlock_release(); 861 self.unlock_upgradable_slow(true); 862 self.lock_upgradable(); 863 } 864 865 // Common code for waking up parked threads after releasing WRITER_BIT or 866 // UPGRADABLE_BIT. 867 #[inline] wake_parked_threads<C>(&self, new_state: usize, callback: C) where C: FnOnce(usize, UnparkResult) -> UnparkToken,868 fn wake_parked_threads<C>(&self, new_state: usize, callback: C) 869 where 870 C: FnOnce(usize, UnparkResult) -> UnparkToken, 871 { 872 // We must wake up at least one upgrader or writer if there is one, 873 // otherwise they may end up parked indefinitely since unlock_shared 874 // does not call wake_parked_threads. 875 let new_state = Cell::new(new_state); 876 unsafe { 877 let addr = self as *const _ as usize; 878 let filter = |ParkToken(token)| { 879 let s = new_state.get(); 880 881 // If we are waking up a writer, don't wake anything else. 882 if s & WRITER_BIT != 0 { 883 return FilterOp::Stop; 884 } 885 886 // Otherwise wake *all* readers and one upgrader/writer. 887 if token & (UPGRADABLE_BIT | WRITER_BIT) != 0 && s & UPGRADABLE_BIT != 0 { 888 // Skip writers and upgradable readers if we already have 889 // a writer/upgradable reader. 890 FilterOp::Skip 891 } else { 892 new_state.set(s + token); 893 FilterOp::Unpark 894 } 895 }; 896 parking_lot_core::unpark_filter(addr, filter, |result| { 897 callback(new_state.get(), result) 898 }); 899 } 900 } 901 902 // Common code for waiting for readers to exit the lock after acquiring 903 // WRITER_BIT. 904 #[inline] wait_for_readers(&self, timeout: Option<Instant>, prev_value: usize) -> bool905 fn wait_for_readers(&self, timeout: Option<Instant>, prev_value: usize) -> bool { 906 // At this point WRITER_BIT is already set, we just need to wait for the 907 // remaining readers to exit the lock. 908 let mut spinwait = SpinWait::new(); 909 let mut state = self.state.load(Ordering::Relaxed); 910 while state & READERS_MASK != 0 { 911 // Spin a few times to wait for readers to exit 912 if spinwait.spin() { 913 state = self.state.load(Ordering::Relaxed); 914 continue; 915 } 916 917 // Set the parked bit 918 if state & WRITER_PARKED_BIT == 0 { 919 if let Err(x) = self.state.compare_exchange_weak( 920 state, 921 state | WRITER_PARKED_BIT, 922 Ordering::Relaxed, 923 Ordering::Relaxed, 924 ) { 925 state = x; 926 continue; 927 } 928 } 929 930 // Park our thread until we are woken up by an unlock 931 unsafe { 932 // Using the 2nd key at addr + 1 933 let addr = self as *const _ as usize + 1; 934 let validate = || { 935 let state = self.state.load(Ordering::Relaxed); 936 state & READERS_MASK != 0 && state & WRITER_PARKED_BIT != 0 937 }; 938 let before_sleep = || {}; 939 let timed_out = |_, _| {}; 940 match parking_lot_core::park( 941 addr, 942 validate, 943 before_sleep, 944 timed_out, 945 TOKEN_EXCLUSIVE, 946 timeout, 947 ) { 948 // We still need to re-check the state if we are unparked 949 // since a previous writer timing-out could have allowed 950 // another reader to sneak in before we parked. 951 ParkResult::Unparked(_) | ParkResult::Invalid => { 952 state = self.state.load(Ordering::Relaxed); 953 continue; 954 } 955 956 // Timeout expired 957 ParkResult::TimedOut => { 958 // We need to release WRITER_BIT and revert back to 959 // our previous value. We also wake up any threads that 960 // might be waiting on WRITER_BIT. 961 let state = self.state.fetch_add( 962 prev_value.wrapping_sub(WRITER_BIT | WRITER_PARKED_BIT), 963 Ordering::Relaxed, 964 ); 965 if state & PARKED_BIT != 0 { 966 let callback = |_, result: UnparkResult| { 967 // Clear the parked bit if there no more parked threads 968 if !result.have_more_threads { 969 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed); 970 } 971 TOKEN_NORMAL 972 }; 973 self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback); 974 } 975 return false; 976 } 977 } 978 } 979 } 980 true 981 } 982 983 // Common code for acquiring a lock 984 #[inline] lock_common<F, V>( &self, timeout: Option<Instant>, token: ParkToken, mut try_lock: F, validate: V, ) -> bool where F: FnMut(&mut usize) -> bool, V: Fn(usize) -> bool,985 fn lock_common<F, V>( 986 &self, 987 timeout: Option<Instant>, 988 token: ParkToken, 989 mut try_lock: F, 990 validate: V, 991 ) -> bool 992 where 993 F: FnMut(&mut usize) -> bool, 994 V: Fn(usize) -> bool, 995 { 996 let mut spinwait = SpinWait::new(); 997 let mut state = self.state.load(Ordering::Relaxed); 998 loop { 999 // Attempt to grab the lock 1000 if try_lock(&mut state) { 1001 return true; 1002 } 1003 1004 // If there are no parked threads, try spinning a few times. 1005 if state & (PARKED_BIT | WRITER_PARKED_BIT) == 0 && spinwait.spin() { 1006 state = self.state.load(Ordering::Relaxed); 1007 continue; 1008 } 1009 1010 // Set the parked bit 1011 if state & PARKED_BIT == 0 { 1012 if let Err(x) = self.state.compare_exchange_weak( 1013 state, 1014 state | PARKED_BIT, 1015 Ordering::Relaxed, 1016 Ordering::Relaxed, 1017 ) { 1018 state = x; 1019 continue; 1020 } 1021 } 1022 1023 // Park our thread until we are woken up by an unlock 1024 unsafe { 1025 let addr = self as *const _ as usize; 1026 let validate = || { 1027 let state = self.state.load(Ordering::Relaxed); 1028 state & PARKED_BIT != 0 && validate(state) 1029 }; 1030 let before_sleep = || {}; 1031 let timed_out = |_, was_last_thread| { 1032 // Clear the parked bit if we were the last parked thread 1033 if was_last_thread { 1034 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed); 1035 } 1036 }; 1037 match parking_lot_core::park( 1038 addr, 1039 validate, 1040 before_sleep, 1041 timed_out, 1042 token, 1043 timeout, 1044 ) { 1045 // The thread that unparked us passed the lock on to us 1046 // directly without unlocking it. 1047 ParkResult::Unparked(TOKEN_HANDOFF) => return true, 1048 1049 // We were unparked normally, try acquiring the lock again 1050 ParkResult::Unparked(_) => (), 1051 1052 // The validation function failed, try locking again 1053 ParkResult::Invalid => (), 1054 1055 // Timeout expired 1056 ParkResult::TimedOut => return false, 1057 } 1058 } 1059 1060 // Loop back and try locking again 1061 spinwait.reset(); 1062 state = self.state.load(Ordering::Relaxed); 1063 } 1064 } 1065 1066 #[inline] deadlock_acquire(&self)1067 fn deadlock_acquire(&self) { 1068 unsafe { deadlock::acquire_resource(self as *const _ as usize) }; 1069 unsafe { deadlock::acquire_resource(self as *const _ as usize + 1) }; 1070 } 1071 1072 #[inline] deadlock_release(&self)1073 fn deadlock_release(&self) { 1074 unsafe { deadlock::release_resource(self as *const _ as usize) }; 1075 unsafe { deadlock::release_resource(self as *const _ as usize + 1) }; 1076 } 1077 } 1078