// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use deadlock;
use elision::{have_elision, AtomicElisionExt};
use lock_api::{
    GuardNoSend, RawRwLock as RawRwLockTrait, RawRwLockDowngrade, RawRwLockFair,
    RawRwLockRecursive, RawRwLockRecursiveTimed, RawRwLockTimed, RawRwLockUpgrade,
    RawRwLockUpgradeDowngrade, RawRwLockUpgradeFair, RawRwLockUpgradeTimed,
};
use parking_lot_core::{self, FilterOp, ParkResult, ParkToken, SpinWait, UnparkResult};
use raw_mutex::{TOKEN_HANDOFF, TOKEN_NORMAL};
use std::cell::Cell;
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
use std::time::{Duration, Instant};
use util;

const PARKED_BIT: usize = 0b001;
const UPGRADING_BIT: usize = 0b010;
// A shared guard acquires a single guard resource
const SHARED_GUARD: usize = 0b100;
const GUARD_COUNT_MASK: usize = !(SHARED_GUARD - 1);
// An exclusive lock acquires all of the guard resource (i.e. it is exclusive)
const EXCLUSIVE_GUARD: usize = GUARD_COUNT_MASK;
// An upgradable lock acquires just over half of the guard resource.
// This should be (GUARD_COUNT_MASK + SHARED_GUARD) >> 1, however this might
// overflow, so we shift before adding (which is okay since the least
// significant bit is zero for both GUARD_COUNT_MASK and SHARED_GUARD).
const UPGRADABLE_GUARD: usize = (GUARD_COUNT_MASK >> 1) + (SHARED_GUARD >> 1);

// Token indicating what type of lock queued threads are trying to acquire
const TOKEN_SHARED: ParkToken = ParkToken(SHARED_GUARD);
const TOKEN_EXCLUSIVE: ParkToken = ParkToken(EXCLUSIVE_GUARD);
const TOKEN_UPGRADABLE: ParkToken = ParkToken(UPGRADABLE_GUARD);
const TOKEN_UPGRADING: ParkToken = ParkToken((EXCLUSIVE_GUARD - UPGRADABLE_GUARD) | UPGRADING_BIT);
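
// A rough worked example of the guard arithmetic on a 64-bit target
// (illustrative numbers only): SHARED_GUARD is 0x4, so GUARD_COUNT_MASK and
// EXCLUSIVE_GUARD are both 0xFFFF_FFFF_FFFF_FFFC, and UPGRADABLE_GUARD works
// out to 0x8000_0000_0000_0000. An exclusive lock therefore consumes the
// entire guard field, while a single upgradable lock consumes just over half
// of it, so it can coexist with readers but not with another upgradable or
// exclusive lock; the `checked_add` calls below reject any combination that
// would overflow. The park tokens reuse these values so that an unparking
// thread knows how many guard units each waiter needs (plus UPGRADING_BIT for
// a waiter that is upgrading in place).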

/// Raw reader-writer lock type backed by the parking lot.
pub struct RawRwLock {
    state: AtomicUsize,
}

unsafe impl RawRwLockTrait for RawRwLock {
    const INIT: RawRwLock = RawRwLock {
        state: ATOMIC_USIZE_INIT,
    };

    type GuardMarker = GuardNoSend;

    #[inline]
    fn lock_exclusive(&self) {
        if self
            .state
            .compare_exchange_weak(0, EXCLUSIVE_GUARD, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            let result = self.lock_exclusive_slow(None);
            debug_assert!(result);
        }
        unsafe { deadlock::acquire_resource(self as *const _ as usize) };
    }

    #[inline]
    fn try_lock_exclusive(&self) -> bool {
        if self
            .state
            .compare_exchange(0, EXCLUSIVE_GUARD, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            unsafe { deadlock::acquire_resource(self as *const _ as usize) };
            true
        } else {
            false
        }
    }

    #[inline]
    fn unlock_exclusive(&self) {
        unsafe { deadlock::release_resource(self as *const _ as usize) };
        if self
            .state
            .compare_exchange_weak(EXCLUSIVE_GUARD, 0, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        self.unlock_exclusive_slow(false);
    }

    #[inline]
    fn lock_shared(&self) {
        if !self.try_lock_shared_fast(false) {
            let result = self.lock_shared_slow(false, None);
            debug_assert!(result);
        }
        unsafe { deadlock::acquire_resource(self as *const _ as usize) };
    }

    #[inline]
    fn try_lock_shared(&self) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.try_lock_shared_slow(false)
        };
        if result {
            unsafe { deadlock::acquire_resource(self as *const _ as usize) };
        }
        result
    }

    #[inline]
    fn unlock_shared(&self) {
        unsafe { deadlock::release_resource(self as *const _ as usize) };
        let state = self.state.load(Ordering::Relaxed);
        if state & PARKED_BIT == 0
            || (state & UPGRADING_BIT == 0 && state & GUARD_COUNT_MASK != SHARED_GUARD)
        {
            if have_elision() {
                if self
                    .state
                    .elision_release(state, state - SHARED_GUARD)
                    .is_ok()
                {
                    return;
                }
            } else {
                if self
                    .state
                    .compare_exchange_weak(
                        state,
                        state - SHARED_GUARD,
                        Ordering::Release,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    return;
                }
            }
        }
        self.unlock_shared_slow(false);
    }
}

unsafe impl RawRwLockFair for RawRwLock {
    #[inline]
    fn unlock_shared_fair(&self) {
        unsafe { deadlock::release_resource(self as *const _ as usize) };
        let state = self.state.load(Ordering::Relaxed);
        if state & PARKED_BIT == 0
            || (state & UPGRADING_BIT == 0 && state & GUARD_COUNT_MASK != SHARED_GUARD)
        {
            if have_elision() {
                if self
                    .state
                    .elision_release(state, state - SHARED_GUARD)
                    .is_ok()
                {
                    return;
                }
            } else {
                if self
                    .state
                    .compare_exchange_weak(
                        state,
                        state - SHARED_GUARD,
                        Ordering::Release,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    return;
                }
            }
        }
        self.unlock_shared_slow(true);
    }

    #[inline]
    fn unlock_exclusive_fair(&self) {
        unsafe { deadlock::release_resource(self as *const _ as usize) };
        if self
            .state
            .compare_exchange_weak(EXCLUSIVE_GUARD, 0, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        self.unlock_exclusive_slow(true);
    }
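
    // The bump_* methods below momentarily hand the lock over to any parked
    // threads (a forced fair unlock) and then immediately reacquire it; when
    // no thread is parked they are a no-op apart from the state load.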
    #[inline]
    fn bump_shared(&self) {
        if self.state.load(Ordering::Relaxed) & PARKED_BIT != 0 {
            self.bump_shared_slow();
        }
    }

    #[inline]
    fn bump_exclusive(&self) {
        if self.state.load(Ordering::Relaxed) & PARKED_BIT != 0 {
            self.bump_exclusive_slow();
        }
    }
}

unsafe impl RawRwLockDowngrade for RawRwLock {
    #[inline]
    fn downgrade(&self) {
        let state = self
            .state
            .fetch_sub(EXCLUSIVE_GUARD - SHARED_GUARD, Ordering::Release);

        // Wake up parked shared and upgradable threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_slow();
        }
    }
}

unsafe impl RawRwLockTimed for RawRwLock {
    type Duration = Duration;
    type Instant = Instant;

    #[inline]
    fn try_lock_shared_for(&self, timeout: Self::Duration) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.lock_shared_slow(false, util::to_deadline(timeout))
        };
        if result {
            unsafe { deadlock::acquire_resource(self as *const _ as usize) };
        }
        result
    }

    #[inline]
    fn try_lock_shared_until(&self, timeout: Self::Instant) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.lock_shared_slow(false, Some(timeout))
        };
        if result {
            unsafe { deadlock::acquire_resource(self as *const _ as usize) };
        }
        result
    }

    #[inline]
    fn try_lock_exclusive_for(&self, timeout: Duration) -> bool {
        let result = if self
            .state
            .compare_exchange_weak(0, EXCLUSIVE_GUARD, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            true
        } else {
            self.lock_exclusive_slow(util::to_deadline(timeout))
        };
        if result {
            unsafe { deadlock::acquire_resource(self as *const _ as usize) };
        }
        result
    }

    #[inline]
    fn try_lock_exclusive_until(&self, timeout: Instant) -> bool {
        let result = if self
            .state
            .compare_exchange_weak(0, EXCLUSIVE_GUARD, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            true
        } else {
            self.lock_exclusive_slow(Some(timeout))
        };
        if result {
            unsafe { deadlock::acquire_resource(self as *const _ as usize) };
        }
        result
    }
}

unsafe impl RawRwLockRecursive for RawRwLock {
    #[inline]
    fn lock_shared_recursive(&self) {
        if !self.try_lock_shared_fast(true) {
            let result = self.lock_shared_slow(true, None);
            debug_assert!(result);
        }
        unsafe { deadlock::acquire_resource(self as *const _ as usize) };
    }

    #[inline]
    fn try_lock_shared_recursive(&self) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.try_lock_shared_slow(true)
        };
        if result {
            unsafe { deadlock::acquire_resource(self as *const _ as usize) };
        }
        result
    }
}

unsafe impl RawRwLockRecursiveTimed for RawRwLock {
    #[inline]
    fn try_lock_shared_recursive_for(&self, timeout: Self::Duration) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.lock_shared_slow(true, util::to_deadline(timeout))
        };
        if result {
            unsafe { deadlock::acquire_resource(self as *const _ as usize) };
        }
        result
    }

    #[inline]
    fn try_lock_shared_recursive_until(&self, timeout: Self::Instant) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.lock_shared_slow(true, Some(timeout))
        };
        if result {
            unsafe { deadlock::acquire_resource(self as *const _ as usize) };
        }
        result
    }
}

unsafe impl RawRwLockUpgrade for RawRwLock {
    #[inline]
    fn lock_upgradable(&self) {
        if !self.try_lock_upgradable_fast() {
            let result = self.lock_upgradable_slow(None);
            debug_assert!(result);
        }
        unsafe { deadlock::acquire_resource(self as *const _ as usize) };
    }

    #[inline]
    fn try_lock_upgradable(&self) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.try_lock_upgradable_slow()
        };
        if result {
            unsafe { deadlock::acquire_resource(self as *const _ as usize) };
        }
        result
    }

    #[inline]
    fn unlock_upgradable(&self) {
        unsafe { deadlock::release_resource(self as *const _ as usize) };
        if self
            .state
            .compare_exchange_weak(UPGRADABLE_GUARD, 0, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        self.unlock_upgradable_slow(false);
    }

    #[inline]
    fn upgrade(&self) {
        if self
            .state
            .compare_exchange_weak(
                UPGRADABLE_GUARD,
                EXCLUSIVE_GUARD,
                Ordering::Relaxed,
                Ordering::Relaxed,
            )
            .is_err()
        {
            let result = self.upgrade_slow(None);
            debug_assert!(result);
        }
    }

    fn try_upgrade(&self) -> bool {
        if self
            .state
            .compare_exchange_weak(
                UPGRADABLE_GUARD,
                EXCLUSIVE_GUARD,
                Ordering::Relaxed,
                Ordering::Relaxed,
            )
            .is_ok()
        {
            true
        } else {
            self.try_upgrade_slow()
        }
    }
}

unsafe impl RawRwLockUpgradeFair for RawRwLock {
    #[inline]
    fn unlock_upgradable_fair(&self) {
        unsafe { deadlock::release_resource(self as *const _ as usize) };
        if self
            .state
            .compare_exchange_weak(UPGRADABLE_GUARD, 0, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        self.unlock_upgradable_slow(true);
    }

    #[inline]
    fn bump_upgradable(&self) {
        if self.state.load(Ordering::Relaxed) & PARKED_BIT != 0 {
            self.bump_upgradable_slow();
        }
    }
}

unsafe impl RawRwLockUpgradeDowngrade for RawRwLock {
    #[inline]
    fn downgrade_upgradable(&self) {
        let state = self
            .state
            .fetch_sub(UPGRADABLE_GUARD - SHARED_GUARD, Ordering::Relaxed);

        // Wake up parked shared and upgradable threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_upgradable_slow(state);
        }
    }

    #[inline]
    fn downgrade_to_upgradable(&self) {
        let state = self
            .state
            .fetch_sub(EXCLUSIVE_GUARD - UPGRADABLE_GUARD, Ordering::Release);

        // Wake up parked shared threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_to_upgradable_slow();
        }
    }
}

unsafe impl RawRwLockUpgradeTimed for RawRwLock {
    #[inline]
    fn try_lock_upgradable_until(&self, timeout: Instant) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.lock_upgradable_slow(Some(timeout))
        };
        if result {
            unsafe { deadlock::acquire_resource(self as *const _ as usize) };
        }
        result
    }

    #[inline]
    fn try_lock_upgradable_for(&self, timeout: Duration) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.lock_upgradable_slow(util::to_deadline(timeout))
        };
        if result {
            unsafe { deadlock::acquire_resource(self as *const _ as usize) };
        }
        result
    }

    #[inline]
    fn try_upgrade_until(&self, timeout: Instant) -> bool {
        if self
            .state
            .compare_exchange_weak(
                UPGRADABLE_GUARD,
                EXCLUSIVE_GUARD,
                Ordering::Relaxed,
                Ordering::Relaxed,
            )
            .is_ok()
        {
            true
        } else {
            self.upgrade_slow(Some(timeout))
        }
    }

    #[inline]
    fn try_upgrade_for(&self, timeout: Duration) -> bool {
        if self
            .state
            .compare_exchange_weak(
                UPGRADABLE_GUARD,
                EXCLUSIVE_GUARD,
                Ordering::Relaxed,
                Ordering::Relaxed,
            )
            .is_ok()
        {
            true
        } else {
            self.upgrade_slow(util::to_deadline(timeout))
        }
    }
}

impl RawRwLock {
    #[inline(always)]
    fn try_lock_shared_fast(&self, recursive: bool) -> bool {
        let state = self.state.load(Ordering::Relaxed);

        // We can't allow grabbing a shared lock while there are parked threads
        // since that could lead to writer starvation.
        if !recursive && state & PARKED_BIT != 0 {
            return false;
        }

        // Use hardware lock elision to avoid cache conflicts when multiple
        // readers try to acquire the lock. We only do this if the lock is
        // completely empty since elision handles conflicts poorly.
        if have_elision() && state == 0 {
            self.state.elision_acquire(0, SHARED_GUARD).is_ok()
        } else if let Some(new_state) = state.checked_add(SHARED_GUARD) {
            self.state
                .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
        } else {
            false
        }
    }

    #[inline(always)]
    fn try_lock_upgradable_fast(&self) -> bool {
        let state = self.state.load(Ordering::Relaxed);

        // We can't allow grabbing an upgradable lock while there are parked threads
        // since that could lead to writer starvation.
        if state & PARKED_BIT != 0 {
            return false;
        }

        if let Some(new_state) = state.checked_add(UPGRADABLE_GUARD) {
            self.state
                .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
        } else {
            false
        }
    }

    #[cold]
    #[inline(never)]
    fn lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool {
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // Grab the lock if it isn't locked, even if there are other
            // threads parked.
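            // (The checked_add doubles as the "is it locked" test: since
            // EXCLUSIVE_GUARD spans the whole guard field, the addition only
            // stays in range when the current guard count is zero; the parked
            // and upgrading bits do not affect it.)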
            if let Some(new_state) = state.checked_add(EXCLUSIVE_GUARD) {
                match self.state.compare_exchange_weak(
                    state,
                    new_state,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return true,
                    Err(x) => state = x,
                }
                continue;
            }

            // If there are no parked threads and only one reader or writer, try
            // spinning a few times.
            if (state == EXCLUSIVE_GUARD || state == SHARED_GUARD || state == UPGRADABLE_GUARD)
                && spinwait.spin()
            {
                state = self.state.load(Ordering::Relaxed);
                continue;
            }

            // Park our thread until we are woken up by an unlock
            unsafe {
                let addr = self as *const _ as usize;
                let validate = || {
                    let mut state = self.state.load(Ordering::Relaxed);
                    loop {
                        // If the rwlock is free, abort the park and try to grab
                        // it immediately.
                        if state & GUARD_COUNT_MASK == 0 {
                            return false;
                        }

                        // Nothing to do if the parked bit is already set
                        if state & PARKED_BIT != 0 {
                            return true;
                        }

                        // Set the parked bit
                        match self.state.compare_exchange_weak(
                            state,
                            state | PARKED_BIT,
                            Ordering::Relaxed,
                            Ordering::Relaxed,
                        ) {
                            Ok(_) => return true,
                            Err(x) => state = x,
                        }
                    }
                };
                let before_sleep = || {};
                let timed_out = |_, was_last_thread| {
                    // Clear the parked bit if we were the last parked thread
                    if was_last_thread {
                        self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
                    }
                };
                match parking_lot_core::park(
                    addr,
                    validate,
                    before_sleep,
                    timed_out,
                    TOKEN_EXCLUSIVE,
                    timeout,
                ) {
                    // The thread that unparked us passed the lock on to us
                    // directly without unlocking it.
                    ParkResult::Unparked(TOKEN_HANDOFF) => return true,

                    // We were unparked normally, try acquiring the lock again
                    ParkResult::Unparked(_) => (),

                    // The validation function failed, try locking again
                    ParkResult::Invalid => (),

                    // Timeout expired
                    ParkResult::TimedOut => return false,
                }
            }

            // Loop back and try locking again
            spinwait.reset();
            state = self.state.load(Ordering::Relaxed);
        }
    }

    #[cold]
    #[inline(never)]
    fn unlock_exclusive_slow(&self, force_fair: bool) {
        // Unlock directly if there are no parked threads
        if self
            .state
            .compare_exchange(EXCLUSIVE_GUARD, 0, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            return;
        };

        // There are threads to unpark. We unpark threads up to the guard capacity.
        let guard_count = Cell::new(0usize);
        unsafe {
            let addr = self as *const _ as usize;
            let filter = |ParkToken(token)| -> FilterOp {
                match guard_count.get().checked_add(token) {
                    Some(new_guard_count) => {
                        guard_count.set(new_guard_count);
                        FilterOp::Unpark
                    }
                    None => FilterOp::Stop,
                }
            };
            let callback = |result: UnparkResult| {
                // If we are using a fair unlock then we should keep the
                // rwlock locked and hand it off to the unparked threads.
                if result.unparked_threads != 0 && (force_fair || result.be_fair) {
                    // We need to set the guard count accordingly.
                    let mut new_state = guard_count.get();

                    if result.have_more_threads {
                        new_state |= PARKED_BIT;
                    }

                    self.state.store(new_state, Ordering::Release);
                    TOKEN_HANDOFF
                } else {
                    // Clear the parked bit if there are no more parked threads.
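                    // In the non-fair case the lock is released outright and
                    // the unparked threads must re-contend for it; only the
                    // parked bit survives when further waiters remain.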
                    if result.have_more_threads {
                        self.state.store(PARKED_BIT, Ordering::Release);
                    } else {
                        self.state.store(0, Ordering::Release);
                    }
                    TOKEN_NORMAL
                }
            };
            parking_lot_core::unpark_filter(addr, filter, callback);
        }
    }

    #[cold]
    #[inline(never)]
    fn downgrade_slow(&self) {
        unsafe {
            let addr = self as *const _ as usize;
            let mut guard_count = SHARED_GUARD;
            let filter = |ParkToken(token)| -> FilterOp {
                match guard_count.checked_add(token) {
                    Some(new_guard_count) => {
                        guard_count = new_guard_count;
                        FilterOp::Unpark
                    }
                    None => FilterOp::Stop,
                }
            };
            let callback = |result: UnparkResult| {
                // Clear the parked bit if there are no more parked threads
                if !result.have_more_threads {
                    self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
                }
                TOKEN_NORMAL
            };
            parking_lot_core::unpark_filter(addr, filter, callback);
        }
    }

    #[cold]
    #[inline(never)]
    fn downgrade_to_upgradable_slow(&self) {
        unsafe {
            let addr = self as *const _ as usize;
            let mut guard_count = UPGRADABLE_GUARD;
            let filter = |ParkToken(token)| -> FilterOp {
                match guard_count.checked_add(token) {
                    Some(new_guard_count) => {
                        guard_count = new_guard_count;
                        FilterOp::Unpark
                    }
                    None => FilterOp::Stop,
                }
            };
            let callback = |result: UnparkResult| {
                // Clear the parked bit if there are no more parked threads
                if !result.have_more_threads {
                    self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
                }
                TOKEN_NORMAL
            };
            parking_lot_core::unpark_filter(addr, filter, callback);
        }
    }

    #[cold]
    #[inline(never)]
    fn lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool {
        let mut spinwait = SpinWait::new();
        let mut spinwait_shared = SpinWait::new();
        let mut state = self.state.load(Ordering::Relaxed);
        let mut unparked = false;
        loop {
            // Use hardware lock elision to avoid cache conflicts when multiple
            // readers try to acquire the lock. We only do this if the lock is
            // completely empty since elision handles conflicts poorly.
            if have_elision() && state == 0 {
                match self.state.elision_acquire(0, SHARED_GUARD) {
                    Ok(_) => return true,
                    Err(x) => state = x,
                }
            }

            // Grab the lock if there are no exclusive threads locked or
            // waiting. However if we were unparked then we are allowed to grab
            // the lock even if there are pending exclusive threads.
            if unparked || recursive || state & PARKED_BIT == 0 {
                if let Some(new_state) = state.checked_add(SHARED_GUARD) {
                    if self
                        .state
                        .compare_exchange_weak(
                            state,
                            new_state,
                            Ordering::Acquire,
                            Ordering::Relaxed,
                        )
                        .is_ok()
                    {
                        return true;
                    }

                    // If there is high contention on the reader count then we want
                    // to leave some time between attempts to acquire the lock to
                    // let other threads make progress.
                    spinwait_shared.spin_no_yield();
                    state = self.state.load(Ordering::Relaxed);
                    continue;
                } else {
                    // We were unparked spuriously, reset unparked flag.
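                    // (This branch means the guard count is saturated,
                    // typically because a writer holds EXCLUSIVE_GUARD, so the
                    // shared count cannot be incremented; fall through to the
                    // spin/park path below.)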
                    unparked = false;
                }
            }

            // If there are no parked threads, try spinning a few times
            if state & PARKED_BIT == 0 && spinwait.spin() {
                state = self.state.load(Ordering::Relaxed);
                continue;
            }

            // Park our thread until we are woken up by an unlock
            unsafe {
                let addr = self as *const _ as usize;
                let validate = || {
                    let mut state = self.state.load(Ordering::Relaxed);
                    loop {
                        // Nothing to do if the parked bit is already set
                        if state & PARKED_BIT != 0 {
                            return true;
                        }

                        // If the parked bit is not set then it means we are at
                        // the front of the queue. If there is space for another
                        // lock then we should abort the park and try acquiring
                        // the lock again.
                        if state & GUARD_COUNT_MASK != GUARD_COUNT_MASK {
                            return false;
                        }

                        // Set the parked bit
                        match self.state.compare_exchange_weak(
                            state,
                            state | PARKED_BIT,
                            Ordering::Relaxed,
                            Ordering::Relaxed,
                        ) {
                            Ok(_) => return true,
                            Err(x) => state = x,
                        }
                    }
                };
                let before_sleep = || {};
                let timed_out = |_, was_last_thread| {
                    // Clear the parked bit if we were the last parked thread
                    if was_last_thread {
                        self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
                    }
                };
                match parking_lot_core::park(
                    addr,
                    validate,
                    before_sleep,
                    timed_out,
                    TOKEN_SHARED,
                    timeout,
                ) {
                    // The thread that unparked us passed the lock on to us
                    // directly without unlocking it.
                    ParkResult::Unparked(TOKEN_HANDOFF) => return true,

                    // We were unparked normally, try acquiring the lock again
                    ParkResult::Unparked(_) => (),

                    // The validation function failed, try locking again
                    ParkResult::Invalid => (),

                    // Timeout expired
                    ParkResult::TimedOut => return false,
                }
            }

            // Loop back and try locking again
            spinwait.reset();
            spinwait_shared.reset();
            state = self.state.load(Ordering::Relaxed);
            unparked = true;
        }
    }

    #[cold]
    #[inline(never)]
    fn try_lock_shared_slow(&self, recursive: bool) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            if !recursive && state & PARKED_BIT != 0 {
                return false;
            }
            if have_elision() && state == 0 {
                match self.state.elision_acquire(0, SHARED_GUARD) {
                    Ok(_) => return true,
                    Err(x) => state = x,
                }
            } else {
                match state.checked_add(SHARED_GUARD) {
                    Some(new_state) => match self.state.compare_exchange_weak(
                        state,
                        new_state,
                        Ordering::Acquire,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => return true,
                        Err(x) => state = x,
                    },
                    None => return false,
                }
            }
        }
    }

    #[cold]
    #[inline(never)]
    fn unlock_shared_slow(&self, force_fair: bool) {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // Just release the lock if there are no parked threads or if we are
            // not the last shared thread.
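            // ("Last shared thread" depends on whether an upgrade is pending:
            // with UPGRADING_BIT clear it means the guard count is exactly
            // SHARED_GUARD; with UPGRADING_BIT set an upgradable owner is
            // parked waiting to upgrade, so the count to check against is
            // UPGRADABLE_GUARD + SHARED_GUARD.)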
            if state & PARKED_BIT == 0
                || (state & UPGRADING_BIT == 0 && state & GUARD_COUNT_MASK != SHARED_GUARD)
                || (state & UPGRADING_BIT != 0
                    && state & GUARD_COUNT_MASK != UPGRADABLE_GUARD + SHARED_GUARD)
            {
                match self.state.compare_exchange_weak(
                    state,
                    state - SHARED_GUARD,
                    Ordering::Release,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return,
                    Err(x) => state = x,
                }
                continue;
            }

            break;
        }

        // There are threads to unpark. If there is a thread waiting to be
        // upgraded, we find that thread and let it upgrade, otherwise we
        // unpark threads up to the guard capacity. Note that there is a
        // potential race condition here: another thread might grab a shared
        // lock between now and when we actually release our lock.
        let additional_guards = Cell::new(0usize);
        let has_upgraded = Cell::new(false);
        unsafe {
            let addr = self as *const _ as usize;
            let filter = |ParkToken(token)| -> FilterOp {
                // We need to check UPGRADING_BIT while holding the bucket lock,
                // otherwise we might miss a thread trying to upgrade.
                if self.state.load(Ordering::Relaxed) & UPGRADING_BIT == 0 {
                    match additional_guards.get().checked_add(token) {
                        Some(x) => {
                            additional_guards.set(x);
                            FilterOp::Unpark
                        }
                        None => FilterOp::Stop,
                    }
                } else if has_upgraded.get() {
                    FilterOp::Stop
                } else {
                    if token & UPGRADING_BIT != 0 {
                        additional_guards.set(token & !UPGRADING_BIT);
                        has_upgraded.set(true);
                        FilterOp::Unpark
                    } else {
                        FilterOp::Skip
                    }
                }
            };
            let callback = |result: UnparkResult| {
                let mut state = self.state.load(Ordering::Relaxed);
                loop {
                    // Release our shared lock
                    let mut new_state = state - SHARED_GUARD;

                    // Clear the parked bit if there are no more threads in
                    // the queue.
                    if !result.have_more_threads {
                        new_state &= !PARKED_BIT;
                    }

                    // Clear the upgrading bit if we are upgrading a thread.
                    if has_upgraded.get() {
                        new_state &= !UPGRADING_BIT;
                    }

                    // Consider using fair unlocking. If we are, then we should set
                    // the state to the new value and tell the threads that we are
                    // handing the lock directly.
                    let token = if result.unparked_threads != 0 && (force_fair || result.be_fair) {
                        match new_state.checked_add(additional_guards.get()) {
                            Some(x) => {
                                new_state = x;
                                TOKEN_HANDOFF
                            }
                            None => TOKEN_NORMAL,
                        }
                    } else {
                        TOKEN_NORMAL
                    };

                    match self.state.compare_exchange_weak(
                        state,
                        new_state,
                        Ordering::Release,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => return token,
                        Err(x) => state = x,
                    }
                }
            };
            parking_lot_core::unpark_filter(addr, filter, callback);
        }
    }

    #[cold]
    #[inline(never)]
    fn lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool {
        let mut spinwait = SpinWait::new();
        let mut spinwait_shared = SpinWait::new();
        let mut state = self.state.load(Ordering::Relaxed);
        let mut unparked = false;
        loop {
            // Grab the lock if there are no exclusive or upgradable threads
            // locked or waiting. However if we were unparked then we are
            // allowed to grab the lock even if there are pending exclusive threads.
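            // (The checked_add below enforces the "no exclusive or upgradable
            // owner" part: UPGRADABLE_GUARD is just over half of the guard
            // field, so the addition overflows whenever another upgradable or
            // exclusive guard is already held.)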
            if unparked || state & PARKED_BIT == 0 {
                if let Some(new_state) = state.checked_add(UPGRADABLE_GUARD) {
                    if self
                        .state
                        .compare_exchange_weak(
                            state,
                            new_state,
                            Ordering::Acquire,
                            Ordering::Relaxed,
                        )
                        .is_ok()
                    {
                        return true;
                    }

                    // If there is high contention on the reader count then we want
                    // to leave some time between attempts to acquire the lock to
                    // let other threads make progress.
                    spinwait_shared.spin_no_yield();
                    state = self.state.load(Ordering::Relaxed);
                    continue;
                } else {
                    // We were unparked spuriously, reset unparked flag.
                    unparked = false;
                }
            }

            // If there are no parked threads, try spinning a few times
            if state & PARKED_BIT == 0 && spinwait.spin() {
                state = self.state.load(Ordering::Relaxed);
                continue;
            }

            // Park our thread until we are woken up by an unlock
            unsafe {
                let addr = self as *const _ as usize;
                let validate = || {
                    let mut state = self.state.load(Ordering::Relaxed);
                    loop {
                        // Nothing to do if the parked bit is already set
                        if state & PARKED_BIT != 0 {
                            return true;
                        }

                        // If the parked bit is not set then it means we are at
                        // the front of the queue. If there is space for an
                        // upgradable lock then we should abort the park and try
                        // acquiring the lock again.
                        if state & UPGRADABLE_GUARD != UPGRADABLE_GUARD {
                            return false;
                        }

                        // Set the parked bit
                        match self.state.compare_exchange_weak(
                            state,
                            state | PARKED_BIT,
                            Ordering::Relaxed,
                            Ordering::Relaxed,
                        ) {
                            Ok(_) => return true,
                            Err(x) => state = x,
                        }
                    }
                };
                let before_sleep = || {};
                let timed_out = |_, was_last_thread| {
                    // Clear the parked bit if we were the last parked thread
                    if was_last_thread {
                        self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
                    }
                };
                match parking_lot_core::park(
                    addr,
                    validate,
                    before_sleep,
                    timed_out,
                    TOKEN_UPGRADABLE,
                    timeout,
                ) {
                    // The thread that unparked us passed the lock on to us
                    // directly without unlocking it.
                    ParkResult::Unparked(TOKEN_HANDOFF) => return true,

                    // We were unparked normally, try acquiring the lock again
                    ParkResult::Unparked(_) => (),

                    // The validation function failed, try locking again
                    ParkResult::Invalid => (),

                    // Timeout expired
                    ParkResult::TimedOut => return false,
                }
            }

            // Loop back and try locking again
            spinwait.reset();
            spinwait_shared.reset();
            state = self.state.load(Ordering::Relaxed);
            unparked = true;
        }
    }

    #[cold]
    #[inline(never)]
    fn try_lock_upgradable_slow(&self) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            if state & PARKED_BIT != 0 {
                return false;
            }

            match state.checked_add(UPGRADABLE_GUARD) {
                Some(new_state) => match self.state.compare_exchange_weak(
                    state,
                    new_state,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return true,
                    Err(x) => state = x,
                },
                None => return false,
            }
        }
    }

    #[cold]
    #[inline(never)]
    fn unlock_upgradable_slow(&self, force_fair: bool) {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // Just release the lock if there are no parked threads.
            if state & PARKED_BIT == 0 {
                match self.state.compare_exchange_weak(
                    state,
                    state - UPGRADABLE_GUARD,
                    Ordering::Release,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return,
                    Err(x) => state = x,
                }
                continue;
            }

            break;
        }

        // There are threads to unpark. We unpark threads up to the guard capacity.
        let additional_guards = Cell::new(0usize);
        unsafe {
            let addr = self as *const _ as usize;
            let filter = |ParkToken(token)| -> FilterOp {
                match additional_guards.get().checked_add(token) {
                    Some(x) => {
                        additional_guards.set(x);
                        FilterOp::Unpark
                    }
                    None => FilterOp::Stop,
                }
            };
            let callback = |result: UnparkResult| {
                let mut state = self.state.load(Ordering::Relaxed);
                loop {
                    // Release our upgradable lock
                    let mut new_state = state - UPGRADABLE_GUARD;

                    // Clear the parked bit if there are no more threads in
                    // the queue
                    if !result.have_more_threads {
                        new_state &= !PARKED_BIT;
                    }

                    // Consider using fair unlocking. If we are, then we should set
                    // the state to the new value and tell the threads that we are
                    // handing the lock directly.
                    let token = if result.unparked_threads != 0 && (force_fair || result.be_fair) {
                        match new_state.checked_add(additional_guards.get()) {
                            Some(x) => {
                                new_state = x;
                                TOKEN_HANDOFF
                            }
                            None => TOKEN_NORMAL,
                        }
                    } else {
                        TOKEN_NORMAL
                    };

                    match self.state.compare_exchange_weak(
                        state,
                        new_state,
                        Ordering::Release,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => return token,
                        Err(x) => state = x,
                    }
                }
            };
            parking_lot_core::unpark_filter(addr, filter, callback);
        }
    }

    #[cold]
    #[inline(never)]
    fn downgrade_upgradable_slow(&self, state: usize) {
        unsafe {
            let addr = self as *const _ as usize;
            let mut guard_count = (state & GUARD_COUNT_MASK) - UPGRADABLE_GUARD;
            let filter = |ParkToken(token)| -> FilterOp {
                match guard_count.checked_add(token) {
                    Some(x) => {
                        guard_count = x;
                        FilterOp::Unpark
                    }
                    None => FilterOp::Stop,
                }
            };
            let callback = |result: UnparkResult| {
                // Clear the parked bit if there are no more parked threads
                if !result.have_more_threads {
                    self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
                }
                TOKEN_NORMAL
            };
            parking_lot_core::unpark_filter(addr, filter, callback);
        }
    }

    #[cold]
    #[inline(never)]
    fn try_upgrade_slow(&self) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            match state.checked_add(EXCLUSIVE_GUARD - SHARED_GUARD) {
                Some(new_state) => match self.state.compare_exchange_weak(
                    state,
                    new_state,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return true,
                    Err(x) => state = x,
                },
                None => return false,
            }
        }
    }

    #[cold]
    #[inline(never)]
    fn upgrade_slow(&self, timeout: Option<Instant>) -> bool {
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // Grab the lock if it isn't locked, even if there are other
            // threads parked.
            if let Some(new_state) = state.checked_add(EXCLUSIVE_GUARD - UPGRADABLE_GUARD) {
                match self.state.compare_exchange_weak(
                    state,
                    new_state,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return true,
                    Err(x) => state = x,
                }
                continue;
            }

            // If there are no parked threads and only one other reader, try
            // spinning a few times.
            if state == UPGRADABLE_GUARD | SHARED_GUARD && spinwait.spin() {
                state = self.state.load(Ordering::Relaxed);
                continue;
            }

            // Park our thread until we are woken up by an unlock
            unsafe {
                let addr = self as *const _ as usize;
                let validate = || {
                    let mut state = self.state.load(Ordering::Relaxed);
                    loop {
                        // If the rwlock is free, abort the park and try to grab
                        // it immediately.
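                        // ("Free" is relative to our own upgradable guard: if
                        // the guard count has dropped back to exactly
                        // UPGRADABLE_GUARD, no readers remain and the upgrade
                        // can proceed immediately.)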
                        if state & GUARD_COUNT_MASK == UPGRADABLE_GUARD {
                            return false;
                        }

                        // Set the upgrading and parked bits
                        match self.state.compare_exchange_weak(
                            state,
                            state | (UPGRADING_BIT | PARKED_BIT),
                            Ordering::Relaxed,
                            Ordering::Relaxed,
                        ) {
                            Ok(_) => return true,
                            Err(x) => state = x,
                        }
                    }
                };
                let before_sleep = || {};
                let timed_out = |_, was_last_thread| {
                    // Clear the upgrading bit
                    let mut flags = UPGRADING_BIT;

                    // Clear the parked bit if we were the last parked thread
                    if was_last_thread {
                        flags |= PARKED_BIT;
                    }

                    self.state.fetch_and(!flags, Ordering::Relaxed);
                };
                match parking_lot_core::park(
                    addr,
                    validate,
                    before_sleep,
                    timed_out,
                    TOKEN_UPGRADING,
                    timeout,
                ) {
                    // The thread that unparked us passed the lock on to us
                    // directly without unlocking it.
                    ParkResult::Unparked(TOKEN_HANDOFF) => return true,

                    // We were unparked normally, try acquiring the lock again
                    ParkResult::Unparked(_) => (),

                    // The validation function failed, try locking again
                    ParkResult::Invalid => (),

                    // Timeout expired
                    ParkResult::TimedOut => return false,
                }
            }

            // Loop back and try locking again
            spinwait.reset();
            state = self.state.load(Ordering::Relaxed);
        }
    }

    #[cold]
    #[inline(never)]
    fn bump_shared_slow(&self) {
        unsafe { deadlock::release_resource(self as *const _ as usize) };
        self.unlock_shared_slow(true);
        self.lock_shared();
    }

    #[cold]
    #[inline(never)]
    fn bump_exclusive_slow(&self) {
        unsafe { deadlock::release_resource(self as *const _ as usize) };
        self.unlock_exclusive_slow(true);
        self.lock_exclusive();
    }

    #[cold]
    #[inline(never)]
    fn bump_upgradable_slow(&self) {
        unsafe { deadlock::release_resource(self as *const _ as usize) };
        self.unlock_upgradable_slow(true);
        self.lock_upgradable();
    }
}
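
// A minimal usage sketch (assumptions: the raw lock is consumed through
// `lock_api::RwLock`, as the crate's public RwLock type does; the module and
// test names and the `u32` payload are purely illustrative).
#[cfg(test)]
mod usage_sketch {
    use super::RawRwLock;
    use lock_api::RwLock;

    #[test]
    fn shared_then_exclusive() {
        // Wrap the raw lock in lock_api's typed wrapper.
        let lock: RwLock<RawRwLock, u32> = RwLock::new(5);

        {
            // Several shared guards can coexist; each adds SHARED_GUARD to the
            // state word.
            let r1 = lock.read();
            let r2 = lock.read();
            assert_eq!(*r1 + *r2, 10);

            // An exclusive lock needs the whole guard capacity, so it cannot
            // be taken while readers are active.
            assert!(lock.try_write().is_none());
        }

        // With all readers gone, the exclusive fast path (0 -> EXCLUSIVE_GUARD)
        // succeeds.
        *lock.write() += 1;
        assert_eq!(*lock.read(), 6);
    }
}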