1 // Copyright 2016 Amanieu d'Antras
2 //
3 // Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4 // http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5 // http://opensource.org/licenses/MIT>, at your option. This file may not be
6 // copied, modified, or distributed except according to those terms.
7 
8 use crate::elision::{have_elision, AtomicElisionExt};
9 use crate::raw_mutex::{TOKEN_HANDOFF, TOKEN_NORMAL};
10 use crate::util;
11 use core::{
12     cell::Cell,
13     sync::atomic::{AtomicUsize, Ordering},
14 };
15 use instant::Instant;
16 use lock_api::{RawRwLock as RawRwLock_, RawRwLockUpgrade};
17 use parking_lot_core::{
18     self, deadlock, FilterOp, ParkResult, ParkToken, SpinWait, UnparkResult, UnparkToken,
19 };
20 use std::time::Duration;
21 
22 // This reader-writer lock implementation is based on Boost's upgrade_mutex:
23 // https://github.com/boostorg/thread/blob/fc08c1fe2840baeeee143440fba31ef9e9a813c8/include/boost/thread/v2/shared_mutex.hpp#L432
24 //
25 // This implementation uses 2 wait queues, one at key [addr] and one at key
26 // [addr + 1]. The primary queue is used for all new waiting threads, and the
27 // secondary queue is used by the thread which has acquired WRITER_BIT but is
28 // waiting for the remaining readers to exit the lock.
29 //
30 // This implementation is fair between readers and writers since it uses the
31 // order in which threads first started queuing to alternate between read phases
32 // and write phases. In particular it is not vulnerable to writer starvation
33 // since readers will block if there is a pending writer.
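// (Concretely, the primary key is the lock's own address and the secondary key is
// that address plus one; see `wait_for_readers`, `unlock_shared_slow` and the
// `deadlock_*` helpers below.)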
34 
35 // There is at least one thread in the main queue.
36 const PARKED_BIT: usize = 0b0001;
37 // There is a parked thread holding WRITER_BIT. WRITER_BIT must be set.
38 const WRITER_PARKED_BIT: usize = 0b0010;
39 // A reader is holding an upgradable lock. The reader count must be non-zero and
40 // WRITER_BIT must not be set.
41 const UPGRADABLE_BIT: usize = 0b0100;
42 // If the reader count is zero: a writer is currently holding an exclusive lock.
43 // Otherwise: a writer is waiting for the remaining readers to exit the lock.
44 const WRITER_BIT: usize = 0b1000;
45 // Mask of bits used to count readers.
46 const READERS_MASK: usize = !0b1111;
47 // Base unit for counting readers.
48 const ONE_READER: usize = 0b10000;
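// For illustration, some example state values under this encoding:
//   0b0000_0000   unlocked
//   0b0011_0000   three shared readers, nothing parked
//   0b0001_0100   one upgradable reader (ONE_READER | UPGRADABLE_BIT)
//   0b0000_1000   a writer holding the lock exclusively
//   0b0010_1010   a writer, parked on the secondary queue, waiting for two readers to exit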
49 
50 // Token indicating what type of lock a queued thread is trying to acquire
51 const TOKEN_SHARED: ParkToken = ParkToken(ONE_READER);
52 const TOKEN_EXCLUSIVE: ParkToken = ParkToken(WRITER_BIT);
53 const TOKEN_UPGRADABLE: ParkToken = ParkToken(ONE_READER | UPGRADABLE_BIT);
54 
55 /// Raw reader-writer lock type backed by the parking lot.
56 pub struct RawRwLock {
57     state: AtomicUsize,
58 }
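// A minimal usage sketch, assuming the crate's public `RwLock<T>` alias
// (`lock_api::RwLock<RawRwLock, T>`), which is how this raw lock is normally consumed:
//
//     let lock = parking_lot::RwLock::new(0u32);
//     *lock.write() += 1;
//     assert_eq!(*lock.read(), 1);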
59 
60 unsafe impl lock_api::RawRwLock for RawRwLock {
61     const INIT: RawRwLock = RawRwLock {
62         state: AtomicUsize::new(0),
63     };
64 
65     type GuardMarker = crate::GuardMarker;
66 
67     #[inline]
68     fn lock_exclusive(&self) {
69         if self
70             .state
71             .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
72             .is_err()
73         {
74             let result = self.lock_exclusive_slow(None);
75             debug_assert!(result);
76         }
77         self.deadlock_acquire();
78     }
79 
80     #[inline]
81     fn try_lock_exclusive(&self) -> bool {
82         if self
83             .state
84             .compare_exchange(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
85             .is_ok()
86         {
87             self.deadlock_acquire();
88             true
89         } else {
90             false
91         }
92     }
93 
94     #[inline]
95     unsafe fn unlock_exclusive(&self) {
96         self.deadlock_release();
97         if self
98             .state
99             .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
100             .is_ok()
101         {
102             return;
103         }
104         self.unlock_exclusive_slow(false);
105     }
106 
107     #[inline]
108     fn lock_shared(&self) {
109         if !self.try_lock_shared_fast(false) {
110             let result = self.lock_shared_slow(false, None);
111             debug_assert!(result);
112         }
113         self.deadlock_acquire();
114     }
115 
116     #[inline]
117     fn try_lock_shared(&self) -> bool {
118         let result = if self.try_lock_shared_fast(false) {
119             true
120         } else {
121             self.try_lock_shared_slow(false)
122         };
123         if result {
124             self.deadlock_acquire();
125         }
126         result
127     }
128 
129     #[inline]
130     unsafe fn unlock_shared(&self) {
131         self.deadlock_release();
132         let state = if have_elision() {
133             self.state.elision_fetch_sub_release(ONE_READER)
134         } else {
135             self.state.fetch_sub(ONE_READER, Ordering::Release)
136         };
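        // The previous state had exactly one reader counted (us) and WRITER_PARKED_BIT
        // set, i.e. a writer is parked on the secondary queue waiting for the last
        // reader to leave; it is our job to wake it up.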
137         if state & (READERS_MASK | WRITER_PARKED_BIT) == (ONE_READER | WRITER_PARKED_BIT) {
138             self.unlock_shared_slow();
139         }
140     }
141 
142     #[inline]
143     fn is_locked(&self) -> bool {
144         let state = self.state.load(Ordering::Relaxed);
145         state & (WRITER_BIT | READERS_MASK) != 0
146     }
147 }
148 
149 unsafe impl lock_api::RawRwLockFair for RawRwLock {
150     #[inline]
151     unsafe fn unlock_shared_fair(&self) {
152         // Shared unlocking is always fair in this implementation.
153         self.unlock_shared();
154     }
155 
156     #[inline]
157     unsafe fn unlock_exclusive_fair(&self) {
158         self.deadlock_release();
159         if self
160             .state
161             .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
162             .is_ok()
163         {
164             return;
165         }
166         self.unlock_exclusive_slow(true);
167     }
168 
169     #[inline]
170     unsafe fn bump_shared(&self) {
171         if self.state.load(Ordering::Relaxed) & (READERS_MASK | WRITER_BIT)
172             == ONE_READER | WRITER_BIT
173         {
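            // We are the only remaining reader and a writer is waiting for us to exit
            // the lock, so briefly release and re-acquire our shared lock to let it
            // make progress.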
174             self.bump_shared_slow();
175         }
176     }
177 
178     #[inline]
179     unsafe fn bump_exclusive(&self) {
180         if self.state.load(Ordering::Relaxed) & PARKED_BIT != 0 {
181             self.bump_exclusive_slow();
182         }
183     }
184 }
185 
186 unsafe impl lock_api::RawRwLockDowngrade for RawRwLock {
187     #[inline]
188     unsafe fn downgrade(&self) {
189         let state = self
190             .state
191             .fetch_add(ONE_READER - WRITER_BIT, Ordering::Release);
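        // `ONE_READER - WRITER_BIT` equals 0b1000; since WRITER_BIT is currently set,
        // adding it carries into the reader count, so this single fetch_add clears
        // WRITER_BIT and registers one shared reader atomically.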
192 
193         // Wake up parked shared and upgradable threads if there are any
194         if state & PARKED_BIT != 0 {
195             self.downgrade_slow();
196         }
197     }
198 }
199 
200 unsafe impl lock_api::RawRwLockTimed for RawRwLock {
201     type Duration = Duration;
202     type Instant = Instant;
203 
204     #[inline]
205     fn try_lock_shared_for(&self, timeout: Self::Duration) -> bool {
206         let result = if self.try_lock_shared_fast(false) {
207             true
208         } else {
209             self.lock_shared_slow(false, util::to_deadline(timeout))
210         };
211         if result {
212             self.deadlock_acquire();
213         }
214         result
215     }
216 
217     #[inline]
218     fn try_lock_shared_until(&self, timeout: Self::Instant) -> bool {
219         let result = if self.try_lock_shared_fast(false) {
220             true
221         } else {
222             self.lock_shared_slow(false, Some(timeout))
223         };
224         if result {
225             self.deadlock_acquire();
226         }
227         result
228     }
229 
230     #[inline]
231     fn try_lock_exclusive_for(&self, timeout: Duration) -> bool {
232         let result = if self
233             .state
234             .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
235             .is_ok()
236         {
237             true
238         } else {
239             self.lock_exclusive_slow(util::to_deadline(timeout))
240         };
241         if result {
242             self.deadlock_acquire();
243         }
244         result
245     }
246 
247     #[inline]
248     fn try_lock_exclusive_until(&self, timeout: Instant) -> bool {
249         let result = if self
250             .state
251             .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
252             .is_ok()
253         {
254             true
255         } else {
256             self.lock_exclusive_slow(Some(timeout))
257         };
258         if result {
259             self.deadlock_acquire();
260         }
261         result
262     }
263 }
264 
265 unsafe impl lock_api::RawRwLockRecursive for RawRwLock {
266     #[inline]
267     fn lock_shared_recursive(&self) {
268         if !self.try_lock_shared_fast(true) {
269             let result = self.lock_shared_slow(true, None);
270             debug_assert!(result);
271         }
272         self.deadlock_acquire();
273     }
274 
275     #[inline]
276     fn try_lock_shared_recursive(&self) -> bool {
277         let result = if self.try_lock_shared_fast(true) {
278             true
279         } else {
280             self.try_lock_shared_slow(true)
281         };
282         if result {
283             self.deadlock_acquire();
284         }
285         result
286     }
287 }
288 
289 unsafe impl lock_api::RawRwLockRecursiveTimed for RawRwLock {
290     #[inline]
291     fn try_lock_shared_recursive_for(&self, timeout: Self::Duration) -> bool {
292         let result = if self.try_lock_shared_fast(true) {
293             true
294         } else {
295             self.lock_shared_slow(true, util::to_deadline(timeout))
296         };
297         if result {
298             self.deadlock_acquire();
299         }
300         result
301     }
302 
303     #[inline]
304     fn try_lock_shared_recursive_until(&self, timeout: Self::Instant) -> bool {
305         let result = if self.try_lock_shared_fast(true) {
306             true
307         } else {
308             self.lock_shared_slow(true, Some(timeout))
309         };
310         if result {
311             self.deadlock_acquire();
312         }
313         result
314     }
315 }
316 
317 unsafe impl lock_api::RawRwLockUpgrade for RawRwLock {
318     #[inline]
319     fn lock_upgradable(&self) {
320         if !self.try_lock_upgradable_fast() {
321             let result = self.lock_upgradable_slow(None);
322             debug_assert!(result);
323         }
324         self.deadlock_acquire();
325     }
326 
327     #[inline]
328     fn try_lock_upgradable(&self) -> bool {
329         let result = if self.try_lock_upgradable_fast() {
330             true
331         } else {
332             self.try_lock_upgradable_slow()
333         };
334         if result {
335             self.deadlock_acquire();
336         }
337         result
338     }
339 
340     #[inline]
341     unsafe fn unlock_upgradable(&self) {
342         self.deadlock_release();
343         let state = self.state.load(Ordering::Relaxed);
344         if state & PARKED_BIT == 0 {
345             if self
346                 .state
347                 .compare_exchange_weak(
348                     state,
349                     state - (ONE_READER | UPGRADABLE_BIT),
350                     Ordering::Release,
351                     Ordering::Relaxed,
352                 )
353                 .is_ok()
354             {
355                 return;
356             }
357         }
358         self.unlock_upgradable_slow(false);
359     }
360 
361     #[inline]
362     unsafe fn upgrade(&self) {
363         let state = self.state.fetch_sub(
364             (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
365             Ordering::Acquire,
366         );
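        // `(ONE_READER | UPGRADABLE_BIT) - WRITER_BIT` is 0b01100, so subtracting it
        // removes our upgradable reader entry and sets WRITER_BIT in one atomic step
        // (e.g. 0b10100 - 0b01100 = 0b01000 when we were the only reader).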
367         if state & READERS_MASK != ONE_READER {
368             let result = self.upgrade_slow(None);
369             debug_assert!(result);
370         }
371     }
372 
373     #[inline]
374     unsafe fn try_upgrade(&self) -> bool {
375         if self
376             .state
377             .compare_exchange_weak(
378                 ONE_READER | UPGRADABLE_BIT,
379                 WRITER_BIT,
380                 Ordering::Acquire,
381                 Ordering::Relaxed,
382             )
383             .is_ok()
384         {
385             true
386         } else {
387             self.try_upgrade_slow()
388         }
389     }
390 }
391 
392 unsafe impl lock_api::RawRwLockUpgradeFair for RawRwLock {
393     #[inline]
394     unsafe fn unlock_upgradable_fair(&self) {
395         self.deadlock_release();
396         let state = self.state.load(Ordering::Relaxed);
397         if state & PARKED_BIT == 0 {
398             if self
399                 .state
400                 .compare_exchange_weak(
401                     state,
402                     state - (ONE_READER | UPGRADABLE_BIT),
403                     Ordering::Release,
404                     Ordering::Relaxed,
405                 )
406                 .is_ok()
407             {
408                 return;
409             }
410         }
411         self.unlock_upgradable_slow(false);
412     }
413 
414     #[inline]
415     unsafe fn bump_upgradable(&self) {
416         if self.state.load(Ordering::Relaxed) == ONE_READER | UPGRADABLE_BIT | PARKED_BIT {
417             self.bump_upgradable_slow();
418         }
419     }
420 }
421 
422 unsafe impl lock_api::RawRwLockUpgradeDowngrade for RawRwLock {
423     #[inline]
424     unsafe fn downgrade_upgradable(&self) {
425         let state = self.state.fetch_sub(UPGRADABLE_BIT, Ordering::Relaxed);
426 
427         // Wake up parked upgradable threads if there are any
428         if state & PARKED_BIT != 0 {
429             self.downgrade_slow();
430         }
431     }
432 
433     #[inline]
434     unsafe fn downgrade_to_upgradable(&self) {
435         let state = self.state.fetch_add(
436             (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
437             Ordering::Release,
438         );
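        // Adding `(ONE_READER | UPGRADABLE_BIT) - WRITER_BIT` (0b01100) to a state whose
        // WRITER_BIT is set clears WRITER_BIT and records us as one upgradable reader in
        // a single atomic step.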
439 
440         // Wake up parked shared threads if there are any
441         if state & PARKED_BIT != 0 {
442             self.downgrade_to_upgradable_slow();
443         }
444     }
445 }
446 
447 unsafe impl lock_api::RawRwLockUpgradeTimed for RawRwLock {
448     #[inline]
449     fn try_lock_upgradable_until(&self, timeout: Instant) -> bool {
450         let result = if self.try_lock_upgradable_fast() {
451             true
452         } else {
453             self.lock_upgradable_slow(Some(timeout))
454         };
455         if result {
456             self.deadlock_acquire();
457         }
458         result
459     }
460 
461     #[inline]
462     fn try_lock_upgradable_for(&self, timeout: Duration) -> bool {
463         let result = if self.try_lock_upgradable_fast() {
464             true
465         } else {
466             self.lock_upgradable_slow(util::to_deadline(timeout))
467         };
468         if result {
469             self.deadlock_acquire();
470         }
471         result
472     }
473 
474     #[inline]
475     unsafe fn try_upgrade_until(&self, timeout: Instant) -> bool {
476         let state = self.state.fetch_sub(
477             (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
478             Ordering::Relaxed,
479         );
480         if state & READERS_MASK == ONE_READER {
481             true
482         } else {
483             self.upgrade_slow(Some(timeout))
484         }
485     }
486 
487     #[inline]
488     unsafe fn try_upgrade_for(&self, timeout: Duration) -> bool {
489         let state = self.state.fetch_sub(
490             (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
491             Ordering::Relaxed,
492         );
493         if state & READERS_MASK == ONE_READER {
494             true
495         } else {
496             self.upgrade_slow(util::to_deadline(timeout))
497         }
498     }
499 }
500 
501 impl RawRwLock {
502     #[inline(always)]
503     fn try_lock_shared_fast(&self, recursive: bool) -> bool {
504         let state = self.state.load(Ordering::Relaxed);
505 
506         // We can't allow grabbing a shared lock if there is a writer, even if
507         // the writer is still waiting for the remaining readers to exit.
508         if state & WRITER_BIT != 0 {
509             // To allow recursive locks, we make an exception and allow readers
510             // to skip ahead of a pending writer to avoid deadlocking, at the
511             // cost of breaking the fairness guarantees.
512             if !recursive || state & READERS_MASK == 0 {
513                 return false;
514             }
515         }
516 
517         // Use hardware lock elision to avoid cache conflicts when multiple
518         // readers try to acquire the lock. We only do this if the lock is
519         // completely empty since elision handles conflicts poorly.
520         if have_elision() && state == 0 {
521             self.state
522                 .elision_compare_exchange_acquire(0, ONE_READER)
523                 .is_ok()
524         } else if let Some(new_state) = state.checked_add(ONE_READER) {
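            // `checked_add` only fails if the reader count would overflow `usize`; the
            // fast path then simply reports failure, and the slow path panics with a
            // clearer "reader count overflow" message.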
525             self.state
526                 .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
527                 .is_ok()
528         } else {
529             false
530         }
531     }
532 
533     #[cold]
534     fn try_lock_shared_slow(&self, recursive: bool) -> bool {
535         let mut state = self.state.load(Ordering::Relaxed);
536         loop {
537             // This mirrors the condition in try_lock_shared_fast
538             if state & WRITER_BIT != 0 {
539                 if !recursive || state & READERS_MASK == 0 {
540                     return false;
541                 }
542             }
543             if have_elision() && state == 0 {
544                 match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
545                     Ok(_) => return true,
546                     Err(x) => state = x,
547                 }
548             } else {
549                 match self.state.compare_exchange_weak(
550                     state,
551                     state
552                         .checked_add(ONE_READER)
553                         .expect("RwLock reader count overflow"),
554                     Ordering::Acquire,
555                     Ordering::Relaxed,
556                 ) {
557                     Ok(_) => return true,
558                     Err(x) => state = x,
559                 }
560             }
561         }
562     }
563 
564     #[inline(always)]
565     fn try_lock_upgradable_fast(&self) -> bool {
566         let state = self.state.load(Ordering::Relaxed);
567 
568         // We can't grab an upgradable lock if there is already a writer or
569         // upgradable reader.
570         if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
571             return false;
572         }
573 
574         if let Some(new_state) = state.checked_add(ONE_READER | UPGRADABLE_BIT) {
575             self.state
576                 .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
577                 .is_ok()
578         } else {
579             false
580         }
581     }
582 
583     #[cold]
584     fn try_lock_upgradable_slow(&self) -> bool {
585         let mut state = self.state.load(Ordering::Relaxed);
586         loop {
587             // This mirrors the condition in try_lock_upgradable_fast
588             if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
589                 return false;
590             }
591 
592             match self.state.compare_exchange_weak(
593                 state,
594                 state
595                     .checked_add(ONE_READER | UPGRADABLE_BIT)
596                     .expect("RwLock reader count overflow"),
597                 Ordering::Acquire,
598                 Ordering::Relaxed,
599             ) {
600                 Ok(_) => return true,
601                 Err(x) => state = x,
602             }
603         }
604     }
605 
606     #[cold]
607     fn lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool {
608         let try_lock = |state: &mut usize| {
609             loop {
610                 if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
611                     return false;
612                 }
613 
614                 // Grab WRITER_BIT if it isn't set, even if there are parked threads.
615                 match self.state.compare_exchange_weak(
616                     *state,
617                     *state | WRITER_BIT,
618                     Ordering::Acquire,
619                     Ordering::Relaxed,
620                 ) {
621                     Ok(_) => return true,
622                     Err(x) => *state = x,
623                 }
624             }
625         };
626 
627         // Step 1: grab exclusive ownership of WRITER_BIT
628         let timed_out = !self.lock_common(
629             timeout,
630             TOKEN_EXCLUSIVE,
631             try_lock,
632             WRITER_BIT | UPGRADABLE_BIT,
633         );
634         if timed_out {
635             return false;
636         }
637 
638         // Step 2: wait for all remaining readers to exit the lock.
639         self.wait_for_readers(timeout, 0)
640     }
641 
642     #[cold]
643     fn unlock_exclusive_slow(&self, force_fair: bool) {
644         // There are threads to unpark. Try to unpark as many as we can.
645         let callback = |mut new_state, result: UnparkResult| {
646             // If we are using a fair unlock then we should keep the
647             // rwlock locked and hand it off to the unparked threads.
648             if result.unparked_threads != 0 && (force_fair || result.be_fair) {
649                 if result.have_more_threads {
650                     new_state |= PARKED_BIT;
651                 }
652                 self.state.store(new_state, Ordering::Release);
653                 TOKEN_HANDOFF
654             } else {
655                 // Clear the parked bit if there are no more parked threads.
656                 if result.have_more_threads {
657                     self.state.store(PARKED_BIT, Ordering::Release);
658                 } else {
659                     self.state.store(0, Ordering::Release);
660                 }
661                 TOKEN_NORMAL
662             }
663         };
664         // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
665         unsafe {
666             self.wake_parked_threads(0, callback);
667         }
668     }
669 
670     #[cold]
671     fn lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool {
672         let try_lock = |state: &mut usize| {
673             let mut spinwait_shared = SpinWait::new();
674             loop {
675                 // Use hardware lock elision to avoid cache conflicts when multiple
676                 // readers try to acquire the lock. We only do this if the lock is
677                 // completely empty since elision handles conflicts poorly.
678                 if have_elision() && *state == 0 {
679                     match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
680                         Ok(_) => return true,
681                         Err(x) => *state = x,
682                     }
683                 }
684 
685                 // This is the same condition as try_lock_shared_fast
686                 if *state & WRITER_BIT != 0 {
687                     if !recursive || *state & READERS_MASK == 0 {
688                         return false;
689                     }
690                 }
691 
692                 if self
693                     .state
694                     .compare_exchange_weak(
695                         *state,
696                         state
697                             .checked_add(ONE_READER)
698                             .expect("RwLock reader count overflow"),
699                         Ordering::Acquire,
700                         Ordering::Relaxed,
701                     )
702                     .is_ok()
703                 {
704                     return true;
705                 }
706 
707                 // If there is high contention on the reader count then we want
708                 // to leave some time between attempts to acquire the lock to
709                 // let other threads make progress.
710                 spinwait_shared.spin_no_yield();
711                 *state = self.state.load(Ordering::Relaxed);
712             }
713         };
714         self.lock_common(timeout, TOKEN_SHARED, try_lock, WRITER_BIT)
715     }
716 
717     #[cold]
718     fn unlock_shared_slow(&self) {
719         // At this point WRITER_PARKED_BIT is set and READERS_MASK is empty. We
720         // just need to wake up a potentially sleeping pending writer.
721         // Using the 2nd key at addr + 1
722         let addr = self as *const _ as usize + 1;
723         let callback = |_result: UnparkResult| {
724             // Clear the WRITER_PARKED_BIT here since there can only be one
725             // parked writer thread.
726             self.state.fetch_and(!WRITER_PARKED_BIT, Ordering::Relaxed);
727             TOKEN_NORMAL
728         };
729         // SAFETY:
730         //   * `addr` is an address we control.
731         //   * `callback` does not panic or call into any function of `parking_lot`.
732         unsafe {
733             parking_lot_core::unpark_one(addr, callback);
734         }
735     }
736 
737     #[cold]
738     fn lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool {
739         let try_lock = |state: &mut usize| {
740             let mut spinwait_shared = SpinWait::new();
741             loop {
742                 if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
743                     return false;
744                 }
745 
746                 if self
747                     .state
748                     .compare_exchange_weak(
749                         *state,
750                         state
751                             .checked_add(ONE_READER | UPGRADABLE_BIT)
752                             .expect("RwLock reader count overflow"),
753                         Ordering::Acquire,
754                         Ordering::Relaxed,
755                     )
756                     .is_ok()
757                 {
758                     return true;
759                 }
760 
761                 // If there is high contention on the reader count then we want
762                 // to leave some time between attempts to acquire the lock to
763                 // let other threads make progress.
764                 spinwait_shared.spin_no_yield();
765                 *state = self.state.load(Ordering::Relaxed);
766             }
767         };
768         self.lock_common(
769             timeout,
770             TOKEN_UPGRADABLE,
771             try_lock,
772             WRITER_BIT | UPGRADABLE_BIT,
773         )
774     }
775 
776     #[cold]
777     fn unlock_upgradable_slow(&self, force_fair: bool) {
778         // Just release the lock if there are no parked threads.
779         let mut state = self.state.load(Ordering::Relaxed);
780         while state & PARKED_BIT == 0 {
781             match self.state.compare_exchange_weak(
782                 state,
783                 state - (ONE_READER | UPGRADABLE_BIT),
784                 Ordering::Release,
785                 Ordering::Relaxed,
786             ) {
787                 Ok(_) => return,
788                 Err(x) => state = x,
789             }
790         }
791 
792         // There are threads to unpark. Try to unpark as many as we can.
793         let callback = |new_state, result: UnparkResult| {
794             // If we are using a fair unlock then we should keep the
795             // rwlock locked and hand it off to the unparked threads.
796             let mut state = self.state.load(Ordering::Relaxed);
797             if force_fair || result.be_fair {
798                 // Fall back to normal unpark on overflow. Panicking is
799                 // not allowed in parking_lot callbacks.
800                 while let Some(mut new_state) =
801                     (state - (ONE_READER | UPGRADABLE_BIT)).checked_add(new_state)
802                 {
803                     if result.have_more_threads {
804                         new_state |= PARKED_BIT;
805                     } else {
806                         new_state &= !PARKED_BIT;
807                     }
808                     match self.state.compare_exchange_weak(
809                         state,
810                         new_state,
811                         Ordering::Relaxed,
812                         Ordering::Relaxed,
813                     ) {
814                         Ok(_) => return TOKEN_HANDOFF,
815                         Err(x) => state = x,
816                     }
817                 }
818             }
819 
820             // Otherwise just release the upgradable lock and update PARKED_BIT.
821             loop {
822                 let mut new_state = state - (ONE_READER | UPGRADABLE_BIT);
823                 if result.have_more_threads {
824                     new_state |= PARKED_BIT;
825                 } else {
826                     new_state &= !PARKED_BIT;
827                 }
828                 match self.state.compare_exchange_weak(
829                     state,
830                     new_state,
831                     Ordering::Relaxed,
832                     Ordering::Relaxed,
833                 ) {
834                     Ok(_) => return TOKEN_NORMAL,
835                     Err(x) => state = x,
836                 }
837             }
838         };
839         // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
840         unsafe {
841             self.wake_parked_threads(0, callback);
842         }
843     }
844 
845     #[cold]
846     fn try_upgrade_slow(&self) -> bool {
847         let mut state = self.state.load(Ordering::Relaxed);
848         loop {
849             if state & READERS_MASK != ONE_READER {
850                 return false;
851             }
852             match self.state.compare_exchange_weak(
853                 state,
854                 state - (ONE_READER | UPGRADABLE_BIT) + WRITER_BIT,
855                 Ordering::Relaxed,
856                 Ordering::Relaxed,
857             ) {
858                 Ok(_) => return true,
859                 Err(x) => state = x,
860             }
861         }
862     }
863 
864     #[cold]
865     fn upgrade_slow(&self, timeout: Option<Instant>) -> bool {
866         self.wait_for_readers(timeout, ONE_READER | UPGRADABLE_BIT)
867     }
868 
869     #[cold]
870     fn downgrade_slow(&self) {
871         // We only reach this point if PARKED_BIT is set.
872         let callback = |_, result: UnparkResult| {
873             // Clear the parked bit if there are no more parked threads
874             if !result.have_more_threads {
875                 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
876             }
877             TOKEN_NORMAL
878         };
879         // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
880         unsafe {
881             self.wake_parked_threads(ONE_READER, callback);
882         }
883     }
884 
885     #[cold]
886     fn downgrade_to_upgradable_slow(&self) {
887         // We only reach this point if PARKED_BIT is set.
888         let callback = |_, result: UnparkResult| {
889             // Clear the parked bit if there are no more parked threads
890             if !result.have_more_threads {
891                 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
892             }
893             TOKEN_NORMAL
894         };
895         // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
896         unsafe {
897             self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
898         }
899     }
900 
901     #[cold]
902     unsafe fn bump_shared_slow(&self) {
903         self.unlock_shared();
904         self.lock_shared();
905     }
906 
907     #[cold]
908     fn bump_exclusive_slow(&self) {
909         self.deadlock_release();
910         self.unlock_exclusive_slow(true);
911         self.lock_exclusive();
912     }
913 
914     #[cold]
915     fn bump_upgradable_slow(&self) {
916         self.deadlock_release();
917         self.unlock_upgradable_slow(true);
918         self.lock_upgradable();
919     }
920 
921     /// Common code for waking up parked threads after releasing WRITER_BIT or
922     /// UPGRADABLE_BIT.
923     ///
924     /// # Safety
925     ///
926     /// `callback` must uphold the requirements of the `callback` parameter to
927     /// `parking_lot_core::unpark_filter`. Meaning no panics or calls into any function in
928     /// `parking_lot`.
929     #[inline]
930     unsafe fn wake_parked_threads(
931         &self,
932         new_state: usize,
933         callback: impl FnOnce(usize, UnparkResult) -> UnparkToken,
934     ) {
935         // We must wake up at least one upgrader or writer if there is one,
936         // otherwise they may end up parked indefinitely since unlock_shared
937         // does not call wake_parked_threads.
938         let new_state = Cell::new(new_state);
939         let addr = self as *const _ as usize;
940         let filter = |ParkToken(token)| {
941             let s = new_state.get();
942 
943             // If we are waking up a writer, don't wake anything else.
944             if s & WRITER_BIT != 0 {
945                 return FilterOp::Stop;
946             }
947 
948             // Otherwise wake *all* readers and one upgrader/writer.
949             if token & (UPGRADABLE_BIT | WRITER_BIT) != 0 && s & UPGRADABLE_BIT != 0 {
950                 // Skip writers and upgradable readers if we already have
951                 // a writer/upgradable reader.
952                 FilterOp::Skip
953             } else {
954                 new_state.set(s + token);
955                 FilterOp::Unpark
956             }
957         };
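        // For example, when called with `new_state == 0` (an exclusive unlock): if a
        // writer is at the head of the queue it is unparked on its own, because the
        // scan stops as soon as WRITER_BIT has been accumulated; shared readers ahead
        // of any writer are all unparked, and extra writers or upgradable readers are
        // skipped once UPGRADABLE_BIT is already accumulated.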
958         let callback = |result| callback(new_state.get(), result);
959         // SAFETY:
960         // * `addr` is an address we control.
961         // * `filter` does not panic or call into any function of `parking_lot`.
962         // * `callback` safety responsibility is on caller
963         parking_lot_core::unpark_filter(addr, filter, callback);
964     }
965 
966     // Common code for waiting for readers to exit the lock after acquiring
967     // WRITER_BIT.
968     #[inline]
969     fn wait_for_readers(&self, timeout: Option<Instant>, prev_value: usize) -> bool {
970         // At this point WRITER_BIT is already set, we just need to wait for the
971         // remaining readers to exit the lock.
972         let mut spinwait = SpinWait::new();
973         let mut state = self.state.load(Ordering::Acquire);
974         while state & READERS_MASK != 0 {
975             // Spin a few times to wait for readers to exit
976             if spinwait.spin() {
977                 state = self.state.load(Ordering::Acquire);
978                 continue;
979             }
980 
981             // Set the parked bit
982             if state & WRITER_PARKED_BIT == 0 {
983                 if let Err(x) = self.state.compare_exchange_weak(
984                     state,
985                     state | WRITER_PARKED_BIT,
986                     Ordering::Relaxed,
987                     Ordering::Relaxed,
988                 ) {
989                     state = x;
990                     continue;
991                 }
992             }
993 
994             // Park our thread until we are woken up by an unlock
995             // Using the 2nd key at addr + 1
996             let addr = self as *const _ as usize + 1;
997             let validate = || {
998                 let state = self.state.load(Ordering::Relaxed);
999                 state & READERS_MASK != 0 && state & WRITER_PARKED_BIT != 0
1000             };
1001             let before_sleep = || {};
1002             let timed_out = |_, _| {};
1003             // SAFETY:
1004             //   * `addr` is an address we control.
1005             //   * `validate`/`timed_out` does not panic or call into any function of `parking_lot`.
1006             //   * `before_sleep` does not call `park`, nor does it panic.
1007             let park_result = unsafe {
1008                 parking_lot_core::park(
1009                     addr,
1010                     validate,
1011                     before_sleep,
1012                     timed_out,
1013                     TOKEN_EXCLUSIVE,
1014                     timeout,
1015                 )
1016             };
1017             match park_result {
1018                 // We still need to re-check the state if we are unparked
1019                 // since a previous writer timing-out could have allowed
1020                 // another reader to sneak in before we parked.
1021                 ParkResult::Unparked(_) | ParkResult::Invalid => {
1022                     state = self.state.load(Ordering::Acquire);
1023                     continue;
1024                 }
1025 
1026                 // Timeout expired
1027                 ParkResult::TimedOut => {
1028                     // We need to release WRITER_BIT and revert back to
1029                     // our previous value. We also wake up any threads that
1030                     // might be waiting on WRITER_BIT.
1031                     let state = self.state.fetch_add(
1032                         prev_value.wrapping_sub(WRITER_BIT | WRITER_PARKED_BIT),
1033                         Ordering::Relaxed,
1034                     );
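                    // (The wrapping addition is equivalent to clearing WRITER_BIT and
                    // WRITER_PARKED_BIT and adding back `prev_value` in one atomic step;
                    // `prev_value` is 0 for a plain exclusive attempt and
                    // ONE_READER | UPGRADABLE_BIT for an upgrade.)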
1035                     if state & PARKED_BIT != 0 {
1036                         let callback = |_, result: UnparkResult| {
1037                             // Clear the parked bit if there are no more parked threads
1038                             if !result.have_more_threads {
1039                                 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
1040                             }
1041                             TOKEN_NORMAL
1042                         };
1043                         // SAFETY: `callback` does not panic or call any function of `parking_lot`.
1044                         unsafe {
1045                             self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
1046                         }
1047                     }
1048                     return false;
1049                 }
1050             }
1051         }
1052         true
1053     }
1054 
1055     /// Common code for acquiring a lock
1056     #[inline]
1057     fn lock_common(
1058         &self,
1059         timeout: Option<Instant>,
1060         token: ParkToken,
1061         mut try_lock: impl FnMut(&mut usize) -> bool,
1062         validate_flags: usize,
1063     ) -> bool {
1064         let mut spinwait = SpinWait::new();
1065         let mut state = self.state.load(Ordering::Relaxed);
1066         loop {
1067             // Attempt to grab the lock
1068             if try_lock(&mut state) {
1069                 return true;
1070             }
1071 
1072             // If there are no parked threads, try spinning a few times.
1073             if state & (PARKED_BIT | WRITER_PARKED_BIT) == 0 && spinwait.spin() {
1074                 state = self.state.load(Ordering::Relaxed);
1075                 continue;
1076             }
1077 
1078             // Set the parked bit
1079             if state & PARKED_BIT == 0 {
1080                 if let Err(x) = self.state.compare_exchange_weak(
1081                     state,
1082                     state | PARKED_BIT,
1083                     Ordering::Relaxed,
1084                     Ordering::Relaxed,
1085                 ) {
1086                     state = x;
1087                     continue;
1088                 }
1089             }
1090 
1091             // Park our thread until we are woken up by an unlock
1092             let addr = self as *const _ as usize;
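            // Only actually go to sleep if the parked bit is still set and the lock is
            // still held in a conflicting mode (WRITER_BIT alone for shared attempts,
            // WRITER_BIT | UPGRADABLE_BIT for exclusive/upgradable ones, as selected by
            // `validate_flags`); otherwise the park is reported as Invalid and we retry.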
1093             let validate = || {
1094                 let state = self.state.load(Ordering::Relaxed);
1095                 state & PARKED_BIT != 0 && (state & validate_flags != 0)
1096             };
1097             let before_sleep = || {};
1098             let timed_out = |_, was_last_thread| {
1099                 // Clear the parked bit if we were the last parked thread
1100                 if was_last_thread {
1101                     self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
1102                 }
1103             };
1104 
1105             // SAFETY:
1106             // * `addr` is an address we control.
1107             // * `validate`/`timed_out` does not panic or call into any function of `parking_lot`.
1108             // * `before_sleep` does not call `park`, nor does it panic.
1109             let park_result = unsafe {
1110                 parking_lot_core::park(addr, validate, before_sleep, timed_out, token, timeout)
1111             };
1112             match park_result {
1113                 // The thread that unparked us passed the lock on to us
1114                 // directly without unlocking it.
1115                 ParkResult::Unparked(TOKEN_HANDOFF) => return true,
1116 
1117                 // We were unparked normally, try acquiring the lock again
1118                 ParkResult::Unparked(_) => (),
1119 
1120                 // The validation function failed, try locking again
1121                 ParkResult::Invalid => (),
1122 
1123                 // Timeout expired
1124                 ParkResult::TimedOut => return false,
1125             }
1126 
1127             // Loop back and try locking again
1128             spinwait.reset();
1129             state = self.state.load(Ordering::Relaxed);
1130         }
1131     }
1132 
1133     #[inline]
1134     fn deadlock_acquire(&self) {
1135         unsafe { deadlock::acquire_resource(self as *const _ as usize) };
1136         unsafe { deadlock::acquire_resource(self as *const _ as usize + 1) };
1137     }
1138 
1139     #[inline]
1140     fn deadlock_release(&self) {
1141         unsafe { deadlock::release_resource(self as *const _ as usize) };
1142         unsafe { deadlock::release_resource(self as *const _ as usize + 1) };
1143     }
1144 }
1145