// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use crate::elision::{have_elision, AtomicElisionExt};
use crate::raw_mutex::{TOKEN_HANDOFF, TOKEN_NORMAL};
use crate::util;
use core::{
    cell::Cell,
    sync::atomic::{AtomicUsize, Ordering},
};
use lock_api::{GuardNoSend, RawRwLock as RawRwLock_, RawRwLockUpgrade};
use parking_lot_core::{
    self, deadlock, FilterOp, ParkResult, ParkToken, SpinWait, UnparkResult, UnparkToken,
};
use std::time::{Duration, Instant};

// This reader-writer lock implementation is based on Boost's upgrade_mutex:
// https://github.com/boostorg/thread/blob/fc08c1fe2840baeeee143440fba31ef9e9a813c8/include/boost/thread/v2/shared_mutex.hpp#L432
//
// This implementation uses 2 wait queues, one at key [addr] and one at key
// [addr + 1]. The primary queue is used for all new waiting threads, and the
// secondary queue is used by the thread which has acquired WRITER_BIT but is
// waiting for the remaining readers to exit the lock.
//
// This implementation is fair between readers and writers since it uses the
// order in which threads first started queuing to alternate between read phases
// and write phases. In particular, it is not vulnerable to write starvation
// since readers will block if there is a pending writer.

// There is at least one thread in the main queue.
const PARKED_BIT: usize = 0b0001;
// There is a parked thread holding WRITER_BIT. WRITER_BIT must be set.
const WRITER_PARKED_BIT: usize = 0b0010;
// A reader is holding an upgradable lock. The reader count must be non-zero and
// WRITER_BIT must not be set.
const UPGRADABLE_BIT: usize = 0b0100;
// If the reader count is zero: a writer is currently holding an exclusive lock.
// Otherwise: a writer is waiting for the remaining readers to exit the lock.
const WRITER_BIT: usize = 0b1000;
// Mask of bits used to count readers.
const READERS_MASK: usize = !0b1111;
// Base unit for counting readers.
const ONE_READER: usize = 0b10000;
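// For example, a state of `(2 * ONE_READER) | UPGRADABLE_BIT | PARKED_BIT`
// describes two readers, one of which holds the upgradable lock, with at
// least one thread parked in the main queue.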

// Token indicating what type of lock a queued thread is trying to acquire
const TOKEN_SHARED: ParkToken = ParkToken(ONE_READER);
const TOKEN_EXCLUSIVE: ParkToken = ParkToken(WRITER_BIT);
const TOKEN_UPGRADABLE: ParkToken = ParkToken(ONE_READER | UPGRADABLE_BIT);

/// Raw reader-writer lock type backed by the parking lot.
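///
/// A minimal usage sketch; in practice this raw lock is not driven directly
/// but through a `lock_api::RwLock` wrapper (such as the crate's `RwLock<T>`
/// alias):
///
/// ```
/// let lock = parking_lot::RwLock::new(0u32);
/// *lock.write() += 1;
/// assert_eq!(*lock.read(), 1);
/// ```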
pub struct RawRwLock {
    state: AtomicUsize,
}

unsafe impl lock_api::RawRwLock for RawRwLock {
    const INIT: RawRwLock = RawRwLock {
        state: AtomicUsize::new(0),
    };

    type GuardMarker = GuardNoSend;

    #[inline]
    fn lock_exclusive(&self) {
        if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            let result = self.lock_exclusive_slow(None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_exclusive(&self) -> bool {
        if self
            .state
            .compare_exchange(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            self.deadlock_acquire();
            true
        } else {
            false
        }
    }

    #[inline]
    fn unlock_exclusive(&self) {
        self.deadlock_release();
        if self
            .state
            .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        self.unlock_exclusive_slow(false);
    }

    #[inline]
    fn lock_shared(&self) {
        if !self.try_lock_shared_fast(false) {
            let result = self.lock_shared_slow(false, None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_shared(&self) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.try_lock_shared_slow(false)
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn unlock_shared(&self) {
        self.deadlock_release();
        let state = if have_elision() {
            self.state.elision_fetch_sub_release(ONE_READER)
        } else {
            self.state.fetch_sub(ONE_READER, Ordering::Release)
        };
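        // Take the slow path only if we were the last reader and a writer is
        // parked on the secondary queue waiting for the readers to drain.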
        if state & (READERS_MASK | WRITER_PARKED_BIT) == (ONE_READER | WRITER_PARKED_BIT) {
            self.unlock_shared_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockFair for RawRwLock {
    #[inline]
    fn unlock_shared_fair(&self) {
        // Shared unlocking is always fair in this implementation.
        self.unlock_shared();
    }

    #[inline]
    fn unlock_exclusive_fair(&self) {
        self.deadlock_release();
        if self
            .state
            .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        self.unlock_exclusive_slow(true);
    }

    #[inline]
    fn bump_shared(&self) {
        if self.state.load(Ordering::Relaxed) & (READERS_MASK | WRITER_BIT)
            == ONE_READER | WRITER_BIT
        {
            self.bump_shared_slow();
        }
    }

    #[inline]
    fn bump_exclusive(&self) {
        if self.state.load(Ordering::Relaxed) & PARKED_BIT != 0 {
            self.bump_exclusive_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockDowngrade for RawRwLock {
    #[inline]
    fn downgrade(&self) {
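        // Clear WRITER_BIT and add one reader in a single atomic step so the
        // lock is never observed in a momentarily unlocked state.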
        let state = self
            .state
            .fetch_add(ONE_READER - WRITER_BIT, Ordering::Release);

        // Wake up parked shared and upgradable threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockTimed for RawRwLock {
    type Duration = Duration;
    type Instant = Instant;

    #[inline]
    fn try_lock_shared_for(&self, timeout: Self::Duration) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.lock_shared_slow(false, util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_shared_until(&self, timeout: Self::Instant) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.lock_shared_slow(false, Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_exclusive_for(&self, timeout: Duration) -> bool {
        let result = if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            true
        } else {
            self.lock_exclusive_slow(util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_exclusive_until(&self, timeout: Instant) -> bool {
        let result = if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            true
        } else {
            self.lock_exclusive_slow(Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

unsafe impl lock_api::RawRwLockRecursive for RawRwLock {
    #[inline]
    fn lock_shared_recursive(&self) {
        if !self.try_lock_shared_fast(true) {
            let result = self.lock_shared_slow(true, None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_shared_recursive(&self) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.try_lock_shared_slow(true)
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

unsafe impl lock_api::RawRwLockRecursiveTimed for RawRwLock {
    #[inline]
    fn try_lock_shared_recursive_for(&self, timeout: Self::Duration) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.lock_shared_slow(true, util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_shared_recursive_until(&self, timeout: Self::Instant) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.lock_shared_slow(true, Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

unsafe impl lock_api::RawRwLockUpgrade for RawRwLock {
    #[inline]
    fn lock_upgradable(&self) {
        if !self.try_lock_upgradable_fast() {
            let result = self.lock_upgradable_slow(None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_upgradable(&self) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.try_lock_upgradable_slow()
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn unlock_upgradable(&self) {
        self.deadlock_release();
        let state = self.state.load(Ordering::Relaxed);
        if state & PARKED_BIT == 0 {
            if self
                .state
                .compare_exchange_weak(
                    state,
                    state - (ONE_READER | UPGRADABLE_BIT),
                    Ordering::Release,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                return;
            }
        }
        self.unlock_upgradable_slow(false);
    }

    #[inline]
    fn upgrade(&self) {
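        // Convert our upgradable-reader entry into WRITER_BIT in one atomic
        // step: this removes ONE_READER | UPGRADABLE_BIT and sets WRITER_BIT.
        // If other readers are still present we must wait for them to exit
        // before the exclusive lock can be used (see upgrade_slow).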
        let state = self.state.fetch_sub(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Relaxed,
        );
        if state & READERS_MASK != ONE_READER {
            let result = self.upgrade_slow(None);
            debug_assert!(result);
        }
    }

    #[inline]
    fn try_upgrade(&self) -> bool {
        if self
            .state
            .compare_exchange_weak(
                ONE_READER | UPGRADABLE_BIT,
                WRITER_BIT,
                Ordering::Relaxed,
                Ordering::Relaxed,
            )
            .is_ok()
        {
            true
        } else {
            self.try_upgrade_slow()
        }
    }
}

unsafe impl lock_api::RawRwLockUpgradeFair for RawRwLock {
    #[inline]
    fn unlock_upgradable_fair(&self) {
        self.deadlock_release();
        let state = self.state.load(Ordering::Relaxed);
        if state & PARKED_BIT == 0 {
            if self
                .state
                .compare_exchange_weak(
                    state,
                    state - (ONE_READER | UPGRADABLE_BIT),
                    Ordering::Release,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                return;
            }
        }
        self.unlock_upgradable_slow(false);
    }

    #[inline]
    fn bump_upgradable(&self) {
        if self.state.load(Ordering::Relaxed) == ONE_READER | UPGRADABLE_BIT | PARKED_BIT {
            self.bump_upgradable_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockUpgradeDowngrade for RawRwLock {
    #[inline]
    fn downgrade_upgradable(&self) {
        let state = self.state.fetch_sub(UPGRADABLE_BIT, Ordering::Relaxed);

        // Wake up parked upgradable threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_slow();
        }
    }

    #[inline]
    fn downgrade_to_upgradable(&self) {
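        // Clear WRITER_BIT and re-register ourselves as an upgradable reader
        // in a single atomic step.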
        let state = self.state.fetch_add(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Release,
        );

        // Wake up parked shared threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_to_upgradable_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockUpgradeTimed for RawRwLock {
    #[inline]
    fn try_lock_upgradable_until(&self, timeout: Instant) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.lock_upgradable_slow(Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_upgradable_for(&self, timeout: Duration) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.lock_upgradable_slow(util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_upgrade_until(&self, timeout: Instant) -> bool {
        let state = self.state.fetch_sub(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Relaxed,
        );
        if state & READERS_MASK == ONE_READER {
            true
        } else {
            self.upgrade_slow(Some(timeout))
        }
    }

    #[inline]
    fn try_upgrade_for(&self, timeout: Duration) -> bool {
        let state = self.state.fetch_sub(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Relaxed,
        );
        if state & READERS_MASK == ONE_READER {
            true
        } else {
            self.upgrade_slow(util::to_deadline(timeout))
        }
    }
}

impl RawRwLock {
    #[inline(always)]
    fn try_lock_shared_fast(&self, recursive: bool) -> bool {
        let state = self.state.load(Ordering::Relaxed);

        // We can't allow grabbing a shared lock if there is a writer, even if
        // the writer is still waiting for the remaining readers to exit.
        if state & WRITER_BIT != 0 {
            // To allow recursive locks, we make an exception and allow readers
            // to skip ahead of a pending writer to avoid deadlocking, at the
            // cost of breaking the fairness guarantees.
            if !recursive || state & READERS_MASK == 0 {
                return false;
            }
        }

        // Use hardware lock elision to avoid cache conflicts when multiple
        // readers try to acquire the lock. We only do this if the lock is
        // completely empty since elision handles conflicts poorly.
        if have_elision() && state == 0 {
            self.state
                .elision_compare_exchange_acquire(0, ONE_READER)
                .is_ok()
        } else if let Some(new_state) = state.checked_add(ONE_READER) {
            self.state
                .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
        } else {
            false
        }
    }

    #[cold]
    fn try_lock_shared_slow(&self, recursive: bool) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // This mirrors the condition in try_lock_shared_fast
            if state & WRITER_BIT != 0 {
                if !recursive || state & READERS_MASK == 0 {
                    return false;
                }
            }
            if have_elision() && state == 0 {
                match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
                    Ok(_) => return true,
                    Err(x) => state = x,
                }
            } else {
                match self.state.compare_exchange_weak(
                    state,
                    state
                        .checked_add(ONE_READER)
                        .expect("RwLock reader count overflow"),
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return true,
                    Err(x) => state = x,
                }
            }
        }
    }

    #[inline(always)]
    fn try_lock_upgradable_fast(&self) -> bool {
        let state = self.state.load(Ordering::Relaxed);

        // We can't grab an upgradable lock if there is already a writer or
        // upgradable reader.
        if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
            return false;
        }

        if let Some(new_state) = state.checked_add(ONE_READER | UPGRADABLE_BIT) {
            self.state
                .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
        } else {
            false
        }
    }

    #[cold]
    fn try_lock_upgradable_slow(&self) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // This mirrors the condition in try_lock_upgradable_fast
            if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                return false;
            }

            match self.state.compare_exchange_weak(
                state,
                state
                    .checked_add(ONE_READER | UPGRADABLE_BIT)
                    .expect("RwLock reader count overflow"),
                Ordering::Acquire,
                Ordering::Relaxed,
            ) {
                Ok(_) => return true,
                Err(x) => state = x,
            }
        }
    }

    #[cold]
    fn lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool {
        let try_lock = |state: &mut usize| {
            loop {
                if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                    return false;
                }

                // Grab WRITER_BIT if it isn't set, even if there are parked threads.
                match self.state.compare_exchange_weak(
                    *state,
                    *state | WRITER_BIT,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return true,
                    Err(x) => *state = x,
                }
            }
        };

        // Step 1: grab exclusive ownership of WRITER_BIT
        let timed_out = !self.lock_common(
            timeout,
            TOKEN_EXCLUSIVE,
            try_lock,
            WRITER_BIT | UPGRADABLE_BIT,
        );
        if timed_out {
            return false;
        }

        // Step 2: wait for all remaining readers to exit the lock.
        self.wait_for_readers(timeout, 0)
    }

    #[cold]
    fn unlock_exclusive_slow(&self, force_fair: bool) {
        // There are threads to unpark. Try to unpark as many as we can.
        let callback = |mut new_state, result: UnparkResult| {
            // If we are using a fair unlock then we should keep the
            // rwlock locked and hand it off to the unparked threads.
            if result.unparked_threads != 0 && (force_fair || result.be_fair) {
                if result.have_more_threads {
                    new_state |= PARKED_BIT;
                }
                self.state.store(new_state, Ordering::Release);
                TOKEN_HANDOFF
            } else {
                // Clear the parked bit if there are no more parked threads.
                if result.have_more_threads {
                    self.state.store(PARKED_BIT, Ordering::Release);
                } else {
                    self.state.store(0, Ordering::Release);
                }
                TOKEN_NORMAL
            }
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(0, callback);
        }
    }

    #[cold]
    fn lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool {
        let try_lock = |state: &mut usize| {
            let mut spinwait_shared = SpinWait::new();
            loop {
                // Use hardware lock elision to avoid cache conflicts when multiple
                // readers try to acquire the lock. We only do this if the lock is
                // completely empty since elision handles conflicts poorly.
                if have_elision() && *state == 0 {
                    match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
                        Ok(_) => return true,
                        Err(x) => *state = x,
                    }
                }

                // This is the same condition as try_lock_shared_fast
                if *state & WRITER_BIT != 0 {
                    if !recursive || *state & READERS_MASK == 0 {
                        return false;
                    }
                }

                if self
                    .state
                    .compare_exchange_weak(
                        *state,
                        state
                            .checked_add(ONE_READER)
                            .expect("RwLock reader count overflow"),
                        Ordering::Acquire,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    return true;
                }

                // If there is high contention on the reader count then we want
                // to leave some time between attempts to acquire the lock to
                // let other threads make progress.
                spinwait_shared.spin_no_yield();
                *state = self.state.load(Ordering::Relaxed);
            }
        };
        self.lock_common(timeout, TOKEN_SHARED, try_lock, WRITER_BIT)
    }

    #[cold]
    fn unlock_shared_slow(&self) {
        // At this point WRITER_PARKED_BIT is set and READERS_MASK is empty. We
        // just need to wake up a potentially sleeping pending writer.
        // Using the 2nd key at addr + 1
        let addr = self as *const _ as usize + 1;
        let callback = |_result: UnparkResult| {
            // Clear the WRITER_PARKED_BIT here since there can only be one
            // parked writer thread.
            self.state.fetch_and(!WRITER_PARKED_BIT, Ordering::Relaxed);
            TOKEN_NORMAL
        };
        // SAFETY:
        //   * `addr` is an address we control.
        //   * `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            parking_lot_core::unpark_one(addr, callback);
        }
    }

    #[cold]
    fn lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool {
        let try_lock = |state: &mut usize| {
            let mut spinwait_shared = SpinWait::new();
            loop {
                if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                    return false;
                }

                if self
                    .state
                    .compare_exchange_weak(
                        *state,
                        state
                            .checked_add(ONE_READER | UPGRADABLE_BIT)
                            .expect("RwLock reader count overflow"),
                        Ordering::Acquire,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    return true;
                }

                // If there is high contention on the reader count then we want
                // to leave some time between attempts to acquire the lock to
                // let other threads make progress.
                spinwait_shared.spin_no_yield();
                *state = self.state.load(Ordering::Relaxed);
            }
        };
        self.lock_common(
            timeout,
            TOKEN_UPGRADABLE,
            try_lock,
            WRITER_BIT | UPGRADABLE_BIT,
        )
    }

    #[cold]
    fn unlock_upgradable_slow(&self, force_fair: bool) {
        // Just release the lock if there are no parked threads.
        let mut state = self.state.load(Ordering::Relaxed);
        while state & PARKED_BIT == 0 {
            match self.state.compare_exchange_weak(
                state,
                state - (ONE_READER | UPGRADABLE_BIT),
                Ordering::Release,
                Ordering::Relaxed,
            ) {
                Ok(_) => return,
                Err(x) => state = x,
            }
        }

        // There are threads to unpark. Try to unpark as many as we can.
        let callback = |new_state, result: UnparkResult| {
            // If we are using a fair unlock then we should keep the
            // rwlock locked and hand it off to the unparked threads.
            let mut state = self.state.load(Ordering::Relaxed);
            if force_fair || result.be_fair {
                // Fall back to normal unpark on overflow. Panicking is
                // not allowed in parking_lot callbacks.
                while let Some(mut new_state) =
                    (state - (ONE_READER | UPGRADABLE_BIT)).checked_add(new_state)
                {
                    if result.have_more_threads {
                        new_state |= PARKED_BIT;
                    } else {
                        new_state &= !PARKED_BIT;
                    }
                    match self.state.compare_exchange_weak(
                        state,
                        new_state,
                        Ordering::Relaxed,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => return TOKEN_HANDOFF,
                        Err(x) => state = x,
                    }
                }
            }

            // Otherwise just release the upgradable lock and update PARKED_BIT.
            loop {
                let mut new_state = state - (ONE_READER | UPGRADABLE_BIT);
                if result.have_more_threads {
                    new_state |= PARKED_BIT;
                } else {
                    new_state &= !PARKED_BIT;
                }
                match self.state.compare_exchange_weak(
                    state,
                    new_state,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return TOKEN_NORMAL,
                    Err(x) => state = x,
                }
            }
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(0, callback);
        }
    }

    #[cold]
    fn try_upgrade_slow(&self) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            if state & READERS_MASK != ONE_READER {
                return false;
            }
            match self.state.compare_exchange_weak(
                state,
                state - (ONE_READER | UPGRADABLE_BIT) + WRITER_BIT,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => return true,
                Err(x) => state = x,
            }
        }
    }

    #[cold]
    fn upgrade_slow(&self, timeout: Option<Instant>) -> bool {
        self.wait_for_readers(timeout, ONE_READER | UPGRADABLE_BIT)
    }

    #[cold]
    fn downgrade_slow(&self) {
        // We only reach this point if PARKED_BIT is set.
        let callback = |_, result: UnparkResult| {
            // Clear the parked bit if there are no more parked threads
            if !result.have_more_threads {
                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
            }
            TOKEN_NORMAL
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(ONE_READER, callback);
        }
    }

    #[cold]
    fn downgrade_to_upgradable_slow(&self) {
        // We only reach this point if PARKED_BIT is set.
        let callback = |_, result: UnparkResult| {
            // Clear the parked bit if there are no more parked threads
            if !result.have_more_threads {
                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
            }
            TOKEN_NORMAL
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
        }
    }

    #[cold]
    fn bump_shared_slow(&self) {
        self.unlock_shared();
        self.lock_shared();
    }

    #[cold]
    fn bump_exclusive_slow(&self) {
        self.deadlock_release();
        self.unlock_exclusive_slow(true);
        self.lock_exclusive();
    }

    #[cold]
    fn bump_upgradable_slow(&self) {
        self.deadlock_release();
        self.unlock_upgradable_slow(true);
        self.lock_upgradable();
    }

    /// Common code for waking up parked threads after releasing WRITER_BIT or
    /// UPGRADABLE_BIT.
    ///
    /// # Safety
    ///
    /// `callback` must uphold the requirements of the `callback` parameter to
    /// `parking_lot_core::unpark_filter`. Meaning no panics or calls into any function in
    /// `parking_lot`.
    #[inline]
    unsafe fn wake_parked_threads(
        &self,
        new_state: usize,
        callback: impl FnOnce(usize, UnparkResult) -> UnparkToken,
    ) {
        // We must wake up at least one upgrader or writer if there is one,
        // otherwise they may end up parked indefinitely since unlock_shared
        // does not call wake_parked_threads.
        let new_state = Cell::new(new_state);
        let addr = self as *const _ as usize;
        let filter = |ParkToken(token)| {
            let s = new_state.get();

            // If we are waking up a writer, don't wake anything else.
            if s & WRITER_BIT != 0 {
                return FilterOp::Stop;
            }

            // Otherwise wake *all* readers and one upgrader/writer.
            if token & (UPGRADABLE_BIT | WRITER_BIT) != 0 && s & UPGRADABLE_BIT != 0 {
                // Skip writers and upgradable readers if we already have
                // a writer/upgradable reader.
                FilterOp::Skip
            } else {
                new_state.set(s + token);
                FilterOp::Unpark
            }
        };
        let callback = |result| callback(new_state.get(), result);
        // SAFETY:
        // * `addr` is an address we control.
        // * `filter` does not panic or call into any function of `parking_lot`.
        // * the caller is responsible for upholding `callback`'s safety requirements.
        parking_lot_core::unpark_filter(addr, filter, callback);
    }

    // Common code for waiting for readers to exit the lock after acquiring
    // WRITER_BIT.
    #[inline]
    fn wait_for_readers(&self, timeout: Option<Instant>, prev_value: usize) -> bool {
        // At this point WRITER_BIT is already set; we just need to wait for the
        // remaining readers to exit the lock.
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Relaxed);
        while state & READERS_MASK != 0 {
            // Spin a few times to wait for readers to exit
            if spinwait.spin() {
                state = self.state.load(Ordering::Relaxed);
                continue;
            }

            // Set the parked bit
            if state & WRITER_PARKED_BIT == 0 {
                if let Err(x) = self.state.compare_exchange_weak(
                    state,
                    state | WRITER_PARKED_BIT,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    state = x;
                    continue;
                }
            }

            // Park our thread until we are woken up by an unlock
            // Using the 2nd key at addr + 1
            let addr = self as *const _ as usize + 1;
            let validate = || {
                let state = self.state.load(Ordering::Relaxed);
                state & READERS_MASK != 0 && state & WRITER_PARKED_BIT != 0
            };
            let before_sleep = || {};
            let timed_out = |_, _| {};
            // SAFETY:
            //   * `addr` is an address we control.
            //   * `validate`/`timed_out` does not panic or call into any function of `parking_lot`.
            //   * `before_sleep` does not call `park`, nor does it panic.
            let park_result = unsafe {
                parking_lot_core::park(
                    addr,
                    validate,
                    before_sleep,
                    timed_out,
                    TOKEN_EXCLUSIVE,
                    timeout,
                )
            };
            match park_result {
                // We still need to re-check the state if we are unparked
                // since a previous writer timing-out could have allowed
                // another reader to sneak in before we parked.
                ParkResult::Unparked(_) | ParkResult::Invalid => {
                    state = self.state.load(Ordering::Relaxed);
                    continue;
                }

                // Timeout expired
                ParkResult::TimedOut => {
                    // We need to release WRITER_BIT and revert to our
                    // previous value. We also wake up any threads that
                    // might be waiting on WRITER_BIT.
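                    // The addend `prev_value.wrapping_sub(WRITER_BIT | WRITER_PARKED_BIT)`
                    // is the two's-complement encoding of the net change, so a
                    // single fetch_add removes both bits and re-adds prev_value.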
                    let state = self.state.fetch_add(
                        prev_value.wrapping_sub(WRITER_BIT | WRITER_PARKED_BIT),
                        Ordering::Relaxed,
                    );
                    if state & PARKED_BIT != 0 {
                        let callback = |_, result: UnparkResult| {
                            // Clear the parked bit if there are no more parked threads
                            if !result.have_more_threads {
                                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
                            }
                            TOKEN_NORMAL
                        };
                        // SAFETY: `callback` does not panic or call any function of `parking_lot`.
                        unsafe {
                            self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
                        }
                    }
                    return false;
                }
            }
        }
        true
    }

    /// Common code for acquiring a lock
    #[inline]
    fn lock_common(
        &self,
        timeout: Option<Instant>,
        token: ParkToken,
        mut try_lock: impl FnMut(&mut usize) -> bool,
        validate_flags: usize,
    ) -> bool {
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // Attempt to grab the lock
            if try_lock(&mut state) {
                return true;
            }

            // If there are no parked threads, try spinning a few times.
            if state & (PARKED_BIT | WRITER_PARKED_BIT) == 0 && spinwait.spin() {
                state = self.state.load(Ordering::Relaxed);
                continue;
            }

            // Set the parked bit
            if state & PARKED_BIT == 0 {
                if let Err(x) = self.state.compare_exchange_weak(
                    state,
                    state | PARKED_BIT,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    state = x;
                    continue;
                }
            }

            // Park our thread until we are woken up by an unlock
            let addr = self as *const _ as usize;
            let validate = || {
                let state = self.state.load(Ordering::Relaxed);
                state & PARKED_BIT != 0 && (state & validate_flags != 0)
            };
            let before_sleep = || {};
            let timed_out = |_, was_last_thread| {
                // Clear the parked bit if we were the last parked thread
                if was_last_thread {
                    self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
                }
            };

            // SAFETY:
            // * `addr` is an address we control.
            // * `validate`/`timed_out` does not panic or call into any function of `parking_lot`.
            // * `before_sleep` does not call `park`, nor does it panic.
            let park_result = unsafe {
                parking_lot_core::park(addr, validate, before_sleep, timed_out, token, timeout)
            };
            match park_result {
                // The thread that unparked us passed the lock on to us
                // directly without unlocking it.
                ParkResult::Unparked(TOKEN_HANDOFF) => return true,

                // We were unparked normally, try acquiring the lock again
                ParkResult::Unparked(_) => (),

                // The validation function failed, try locking again
                ParkResult::Invalid => (),

                // Timeout expired
                ParkResult::TimedOut => return false,
            }

            // Loop back and try locking again
            spinwait.reset();
            state = self.state.load(Ordering::Relaxed);
        }
    }

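    // The deadlock detector tracks two resources per lock, matching the two
    // park keys in use: `addr` for the main queue and `addr + 1` for a writer
    // waiting on the remaining readers.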
    #[inline]
    fn deadlock_acquire(&self) {
        unsafe { deadlock::acquire_resource(self as *const _ as usize) };
        unsafe { deadlock::acquire_resource(self as *const _ as usize + 1) };
    }

    #[inline]
    fn deadlock_release(&self) {
        unsafe { deadlock::release_resource(self as *const _ as usize) };
        unsafe { deadlock::release_resource(self as *const _ as usize + 1) };
    }
}