// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use crate::elision::{have_elision, AtomicElisionExt};
use crate::raw_mutex::{TOKEN_HANDOFF, TOKEN_NORMAL};
use crate::util;
use core::{
    cell::Cell,
    sync::atomic::{AtomicUsize, Ordering},
};
use lock_api::{
    GuardNoSend, RawRwLock as RawRwLockTrait, RawRwLockDowngrade, RawRwLockFair,
    RawRwLockRecursive, RawRwLockRecursiveTimed, RawRwLockTimed, RawRwLockUpgrade,
    RawRwLockUpgradeDowngrade, RawRwLockUpgradeFair, RawRwLockUpgradeTimed,
};
use parking_lot_core::{
    self, deadlock, FilterOp, ParkResult, ParkToken, SpinWait, UnparkResult, UnparkToken,
};
use std::time::{Duration, Instant};

// This reader-writer lock implementation is based on Boost's upgrade_mutex:
// https://github.com/boostorg/thread/blob/fc08c1fe2840baeeee143440fba31ef9e9a813c8/include/boost/thread/v2/shared_mutex.hpp#L432
//
// This implementation uses 2 wait queues, one at key [addr] and one at key
// [addr + 1]. The primary queue is used for all new waiting threads, and the
// secondary queue is used by the thread which has acquired WRITER_BIT but is
// waiting for the remaining readers to exit the lock.
//
// This implementation is fair between readers and writers since it uses the
// order in which threads first started queuing to alternate between read phases
// and write phases. In particular it is not vulnerable to write starvation
// since readers will block if there is a pending writer.

// There is at least one thread in the main queue.
const PARKED_BIT: usize = 0b0001;
// There is a parked thread holding WRITER_BIT. WRITER_BIT must be set.
const WRITER_PARKED_BIT: usize = 0b0010;
// A reader is holding an upgradable lock. The reader count must be non-zero and
// WRITER_BIT must not be set.
const UPGRADABLE_BIT: usize = 0b0100;
// If the reader count is zero: a writer is currently holding an exclusive lock.
// Otherwise: a writer is waiting for the remaining readers to exit the lock.
const WRITER_BIT: usize = 0b1000;
// Mask of bits used to count readers.
const READERS_MASK: usize = !0b1111;
// Base unit for counting readers.
const ONE_READER: usize = 0b10000;

// Token indicating what type of lock a queued thread is trying to acquire
const TOKEN_SHARED: ParkToken = ParkToken(ONE_READER);
const TOKEN_EXCLUSIVE: ParkToken = ParkToken(WRITER_BIT);
const TOKEN_UPGRADABLE: ParkToken = ParkToken(ONE_READER | UPGRADABLE_BIT);
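
// The bit layout documented above can be spot-checked mechanically. The
// following test module is an illustrative addition (it is not part of the
// upstream source) and only relies on the constants defined in this file.
#[cfg(test)]
mod state_layout_checks {
    use super::*;

    #[test]
    fn reader_count_does_not_overlap_flag_bits() {
        let flags = PARKED_BIT | WRITER_PARKED_BIT | UPGRADABLE_BIT | WRITER_BIT;
        // All four flag bits live below the reader count field.
        assert_eq!(READERS_MASK & flags, 0);
        // ONE_READER is the lowest bit of the reader count field.
        assert_eq!(ONE_READER, flags + 1);
        assert_eq!(READERS_MASK & ONE_READER, ONE_READER);
    }

    #[test]
    fn state_value_decodes_as_documented() {
        // Two readers, one of them upgradable, plus a parked waiter.
        let state = 2 * ONE_READER + UPGRADABLE_BIT + PARKED_BIT;
        assert_eq!((state & READERS_MASK) / ONE_READER, 2);
        assert_ne!(state & UPGRADABLE_BIT, 0);
        assert_eq!(state & WRITER_BIT, 0);
    }
}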

/// Raw reader-writer lock type backed by the parking lot.
pub struct RawRwLock {
    state: AtomicUsize,
}
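
// As a rough usage sketch (again an illustrative addition, not part of the
// upstream file): this raw lock is normally consumed through the `lock_api`
// wrappers rather than used directly; the crate's public `RwLock<T>` is such
// a wrapper. The hypothetical test below builds one on the spot.
#[cfg(test)]
mod raw_rwlock_usage_sketch {
    #[test]
    fn shared_then_exclusive() {
        let lock = lock_api::RwLock::<super::RawRwLock, i32>::new(0);
        {
            // Any number of shared guards may be held at once.
            let r1 = lock.read();
            let r2 = lock.read();
            assert_eq!(*r1 + *r2, 0);
        }
        // An exclusive guard requires all shared guards to be dropped first.
        *lock.write() += 1;
        assert_eq!(*lock.read(), 1);
    }
}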

unsafe impl RawRwLockTrait for RawRwLock {
    const INIT: RawRwLock = RawRwLock { state: AtomicUsize::new(0) };

    type GuardMarker = GuardNoSend;

    #[inline]
    fn lock_exclusive(&self) {
        if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            let result = self.lock_exclusive_slow(None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_exclusive(&self) -> bool {
        if self.state.compare_exchange(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed).is_ok()
        {
            self.deadlock_acquire();
            true
        } else {
            false
        }
    }

    #[inline]
    fn unlock_exclusive(&self) {
        self.deadlock_release();
        if self.state.compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed).is_ok()
        {
            return;
        }
        self.unlock_exclusive_slow(false);
    }

    #[inline]
    fn lock_shared(&self) {
        if !self.try_lock_shared_fast(false) {
            let result = self.lock_shared_slow(false, None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_shared(&self) -> bool {
        let result =
            if self.try_lock_shared_fast(false) { true } else { self.try_lock_shared_slow(false) };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn unlock_shared(&self) {
        self.deadlock_release();
        let state = if have_elision() {
            self.state.elision_fetch_sub_release(ONE_READER)
        } else {
            self.state.fetch_sub(ONE_READER, Ordering::Release)
        };
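        // If we were the last reader (the old count was exactly one) and a
        // writer is parked on the secondary queue waiting for the readers to
        // drain, it is our job to wake it up.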
        if state & (READERS_MASK | WRITER_PARKED_BIT) == (ONE_READER | WRITER_PARKED_BIT) {
            self.unlock_shared_slow();
        }
    }
}

unsafe impl RawRwLockFair for RawRwLock {
    #[inline]
    fn unlock_shared_fair(&self) {
        // Shared unlocking is always fair in this implementation.
        self.unlock_shared();
    }

    #[inline]
    fn unlock_exclusive_fair(&self) {
        self.deadlock_release();
        if self.state.compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed).is_ok()
        {
            return;
        }
        self.unlock_exclusive_slow(true);
    }

    #[inline]
    fn bump_shared(&self) {
        if self.state.load(Ordering::Relaxed) & (READERS_MASK | WRITER_BIT)
            == ONE_READER | WRITER_BIT
        {
            self.bump_shared_slow();
        }
    }

    #[inline]
    fn bump_exclusive(&self) {
        if self.state.load(Ordering::Relaxed) & PARKED_BIT != 0 {
            self.bump_exclusive_slow();
        }
    }
}

unsafe impl RawRwLockDowngrade for RawRwLock {
    #[inline]
    fn downgrade(&self) {
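        // Atomically trade WRITER_BIT for a single reader reference: adding
        // `ONE_READER - WRITER_BIT` clears WRITER_BIT (which we know is set)
        // and bumps the reader count from zero to one in one step.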
        let state = self.state.fetch_add(ONE_READER - WRITER_BIT, Ordering::Release);

        // Wake up parked shared and upgradable threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_slow();
        }
    }
}

unsafe impl RawRwLockTimed for RawRwLock {
    type Duration = Duration;
    type Instant = Instant;

    #[inline]
    fn try_lock_shared_for(&self, timeout: Self::Duration) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.lock_shared_slow(false, util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_shared_until(&self, timeout: Self::Instant) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.lock_shared_slow(false, Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_exclusive_for(&self, timeout: Duration) -> bool {
        let result = if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            true
        } else {
            self.lock_exclusive_slow(util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_exclusive_until(&self, timeout: Instant) -> bool {
        let result = if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            true
        } else {
            self.lock_exclusive_slow(Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

unsafe impl RawRwLockRecursive for RawRwLock {
    #[inline]
    fn lock_shared_recursive(&self) {
        if !self.try_lock_shared_fast(true) {
            let result = self.lock_shared_slow(true, None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_shared_recursive(&self) -> bool {
        let result =
            if self.try_lock_shared_fast(true) { true } else { self.try_lock_shared_slow(true) };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

unsafe impl RawRwLockRecursiveTimed for RawRwLock {
    #[inline]
    fn try_lock_shared_recursive_for(&self, timeout: Self::Duration) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.lock_shared_slow(true, util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_shared_recursive_until(&self, timeout: Self::Instant) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.lock_shared_slow(true, Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

unsafe impl RawRwLockUpgrade for RawRwLock {
    #[inline]
    fn lock_upgradable(&self) {
        if !self.try_lock_upgradable_fast() {
            let result = self.lock_upgradable_slow(None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_upgradable(&self) -> bool {
        let result =
            if self.try_lock_upgradable_fast() { true } else { self.try_lock_upgradable_slow() };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn unlock_upgradable(&self) {
        self.deadlock_release();
        let state = self.state.load(Ordering::Relaxed);
        if state & PARKED_BIT == 0 {
            if self
                .state
                .compare_exchange_weak(
                    state,
                    state - (ONE_READER | UPGRADABLE_BIT),
                    Ordering::Release,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                return;
            }
        }
        self.unlock_upgradable_slow(false);
    }

    #[inline]
    fn upgrade(&self) {
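        // Trade our upgradable reader reference for WRITER_BIT in one atomic
        // step: subtracting `(ONE_READER | UPGRADABLE_BIT) - WRITER_BIT`
        // removes our reader count and UPGRADABLE_BIT while setting
        // WRITER_BIT. If other readers remain we have to wait for them below.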
        let state =
            self.state.fetch_sub((ONE_READER | UPGRADABLE_BIT) - WRITER_BIT, Ordering::Relaxed);
        if state & READERS_MASK != ONE_READER {
            let result = self.upgrade_slow(None);
            debug_assert!(result);
        }
    }

    #[inline]
    fn try_upgrade(&self) -> bool {
        if self
            .state
            .compare_exchange_weak(
                ONE_READER | UPGRADABLE_BIT,
                WRITER_BIT,
                Ordering::Relaxed,
                Ordering::Relaxed,
            )
            .is_ok()
        {
            true
        } else {
            self.try_upgrade_slow()
        }
    }
}

unsafe impl RawRwLockUpgradeFair for RawRwLock {
    #[inline]
    fn unlock_upgradable_fair(&self) {
        self.deadlock_release();
        let state = self.state.load(Ordering::Relaxed);
        if state & PARKED_BIT == 0 {
            if self
                .state
                .compare_exchange_weak(
                    state,
                    state - (ONE_READER | UPGRADABLE_BIT),
                    Ordering::Release,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                return;
            }
        }
        self.unlock_upgradable_slow(false);
    }

    #[inline]
    fn bump_upgradable(&self) {
        if self.state.load(Ordering::Relaxed) == ONE_READER | UPGRADABLE_BIT | PARKED_BIT {
            self.bump_upgradable_slow();
        }
    }
}

unsafe impl RawRwLockUpgradeDowngrade for RawRwLock {
    #[inline]
    fn downgrade_upgradable(&self) {
        let state = self.state.fetch_sub(UPGRADABLE_BIT, Ordering::Relaxed);

        // Wake up parked upgradable threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_slow();
        }
    }

    #[inline]
    fn downgrade_to_upgradable(&self) {
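        // Swap WRITER_BIT for an upgradable reader reference in one atomic
        // step: adding `(ONE_READER | UPGRADABLE_BIT) - WRITER_BIT` clears
        // WRITER_BIT and records us as a single upgradable reader.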
        let state =
            self.state.fetch_add((ONE_READER | UPGRADABLE_BIT) - WRITER_BIT, Ordering::Release);

        // Wake up parked shared threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_to_upgradable_slow();
        }
    }
}

unsafe impl RawRwLockUpgradeTimed for RawRwLock {
    #[inline]
    fn try_lock_upgradable_until(&self, timeout: Instant) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.lock_upgradable_slow(Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_upgradable_for(&self, timeout: Duration) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.lock_upgradable_slow(util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_upgrade_until(&self, timeout: Instant) -> bool {
        let state =
            self.state.fetch_sub((ONE_READER | UPGRADABLE_BIT) - WRITER_BIT, Ordering::Relaxed);
        if state & READERS_MASK == ONE_READER { true } else { self.upgrade_slow(Some(timeout)) }
    }

    #[inline]
    fn try_upgrade_for(&self, timeout: Duration) -> bool {
        let state =
            self.state.fetch_sub((ONE_READER | UPGRADABLE_BIT) - WRITER_BIT, Ordering::Relaxed);
        if state & READERS_MASK == ONE_READER {
            true
        } else {
            self.upgrade_slow(util::to_deadline(timeout))
        }
    }
}

impl RawRwLock {
    #[inline(always)]
    fn try_lock_shared_fast(&self, recursive: bool) -> bool {
        let state = self.state.load(Ordering::Relaxed);

        // We can't allow grabbing a shared lock if there is a writer, even if
        // the writer is still waiting for the remaining readers to exit.
        if state & WRITER_BIT != 0 {
            // To allow recursive locks, we make an exception and allow readers
            // to skip ahead of a pending writer to avoid deadlocking, at the
            // cost of breaking the fairness guarantees.
            if !recursive || state & READERS_MASK == 0 {
                return false;
            }
        }

        // Use hardware lock elision to avoid cache conflicts when multiple
        // readers try to acquire the lock. We only do this if the lock is
        // completely empty since elision handles conflicts poorly.
        if have_elision() && state == 0 {
            self.state.elision_compare_exchange_acquire(0, ONE_READER).is_ok()
        } else if let Some(new_state) = state.checked_add(ONE_READER) {
            self.state
                .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
        } else {
            false
        }
    }

    #[cold]
    fn try_lock_shared_slow(&self, recursive: bool) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // This mirrors the condition in try_lock_shared_fast
            if state & WRITER_BIT != 0 {
                if !recursive || state & READERS_MASK == 0 {
                    return false;
                }
            }
            if have_elision() && state == 0 {
                match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
                    Ok(_) => return true,
                    Err(x) => state = x,
                }
            } else {
                match self.state.compare_exchange_weak(
                    state,
                    state.checked_add(ONE_READER).expect("RwLock reader count overflow"),
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return true,
                    Err(x) => state = x,
                }
            }
        }
    }

    #[inline(always)]
    fn try_lock_upgradable_fast(&self) -> bool {
        let state = self.state.load(Ordering::Relaxed);

        // We can't grab an upgradable lock if there is already a writer or
        // upgradable reader.
        if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
            return false;
        }

        if let Some(new_state) = state.checked_add(ONE_READER | UPGRADABLE_BIT) {
            self.state
                .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
        } else {
            false
        }
    }

    #[cold]
    fn try_lock_upgradable_slow(&self) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // This mirrors the condition in try_lock_upgradable_fast
            if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                return false;
            }

            match self.state.compare_exchange_weak(
                state,
                state
                    .checked_add(ONE_READER | UPGRADABLE_BIT)
                    .expect("RwLock reader count overflow"),
                Ordering::Acquire,
                Ordering::Relaxed,
            ) {
                Ok(_) => return true,
                Err(x) => state = x,
            }
        }
    }

    #[cold]
    fn lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool {
        // Step 1: grab exclusive ownership of WRITER_BIT
        let timed_out = !self.lock_common(
            timeout,
            TOKEN_EXCLUSIVE,
            |state| {
                loop {
                    if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                        return false;
                    }

                    // Grab WRITER_BIT if it isn't set, even if there are parked threads.
                    match self.state.compare_exchange_weak(
                        *state,
                        *state | WRITER_BIT,
                        Ordering::Acquire,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => return true,
                        Err(x) => *state = x,
                    }
                }
            },
            |state| state & (WRITER_BIT | UPGRADABLE_BIT) != 0,
        );
        if timed_out {
            return false;
        }

        // Step 2: wait for all remaining readers to exit the lock.
        self.wait_for_readers(timeout, 0)
    }

    #[cold]
    fn unlock_exclusive_slow(&self, force_fair: bool) {
        // There are threads to unpark. Try to unpark as many as we can.
        let callback = |mut new_state, result: UnparkResult| {
            // If we are using a fair unlock then we should keep the
            // rwlock locked and hand it off to the unparked threads.
            if result.unparked_threads != 0 && (force_fair || result.be_fair) {
                if result.have_more_threads {
                    new_state |= PARKED_BIT;
                }
                self.state.store(new_state, Ordering::Release);
                TOKEN_HANDOFF
            } else {
                // Clear the parked bit if there are no more parked threads.
                if result.have_more_threads {
                    self.state.store(PARKED_BIT, Ordering::Release);
                } else {
                    self.state.store(0, Ordering::Release);
                }
                TOKEN_NORMAL
            }
        };
        self.wake_parked_threads(0, callback);
    }

    #[cold]
    fn lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool {
        self.lock_common(
            timeout,
            TOKEN_SHARED,
            |state| {
                let mut spinwait_shared = SpinWait::new();
                loop {
                    // Use hardware lock elision to avoid cache conflicts when multiple
                    // readers try to acquire the lock. We only do this if the lock is
                    // completely empty since elision handles conflicts poorly.
                    if have_elision() && *state == 0 {
                        match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
                            Ok(_) => return true,
                            Err(x) => *state = x,
                        }
                    }

                    // This is the same condition as try_lock_shared_fast
                    if *state & WRITER_BIT != 0 {
                        if !recursive || *state & READERS_MASK == 0 {
                            return false;
                        }
                    }

                    if self
                        .state
                        .compare_exchange_weak(
                            *state,
                            state.checked_add(ONE_READER).expect("RwLock reader count overflow"),
                            Ordering::Acquire,
                            Ordering::Relaxed,
                        )
                        .is_ok()
                    {
                        return true;
                    }

                    // If there is high contention on the reader count then we want
                    // to leave some time between attempts to acquire the lock to
                    // let other threads make progress.
                    spinwait_shared.spin_no_yield();
                    *state = self.state.load(Ordering::Relaxed);
                }
            },
            |state| state & WRITER_BIT != 0,
        )
    }

    #[cold]
    fn unlock_shared_slow(&self) {
        // At this point WRITER_PARKED_BIT is set and READERS_MASK is empty. We
        // just need to wake up a potentially sleeping pending writer.
        unsafe {
            // Using the 2nd key at addr + 1
            let addr = self as *const _ as usize + 1;
            let callback = |result: UnparkResult| {
                // Clear the WRITER_PARKED_BIT here since there can only be one
                // parked writer thread.
                debug_assert!(!result.have_more_threads);
                self.state.fetch_and(!WRITER_PARKED_BIT, Ordering::Relaxed);
                TOKEN_NORMAL
            };
            parking_lot_core::unpark_one(addr, callback);
        }
    }

    #[cold]
    fn lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool {
        self.lock_common(
            timeout,
            TOKEN_UPGRADABLE,
            |state| {
                let mut spinwait_shared = SpinWait::new();
                loop {
                    if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                        return false;
                    }

                    if self
                        .state
                        .compare_exchange_weak(
                            *state,
                            state
                                .checked_add(ONE_READER | UPGRADABLE_BIT)
                                .expect("RwLock reader count overflow"),
                            Ordering::Acquire,
                            Ordering::Relaxed,
                        )
                        .is_ok()
                    {
                        return true;
                    }

                    // If there is high contention on the reader count then we want
                    // to leave some time between attempts to acquire the lock to
                    // let other threads make progress.
                    spinwait_shared.spin_no_yield();
                    *state = self.state.load(Ordering::Relaxed);
                }
            },
            |state| state & (WRITER_BIT | UPGRADABLE_BIT) != 0,
        )
    }

    #[cold]
    fn unlock_upgradable_slow(&self, force_fair: bool) {
        // Just release the lock if there are no parked threads.
        let mut state = self.state.load(Ordering::Relaxed);
        while state & PARKED_BIT == 0 {
            match self.state.compare_exchange_weak(
                state,
                state - (ONE_READER | UPGRADABLE_BIT),
                Ordering::Release,
                Ordering::Relaxed,
            ) {
                Ok(_) => return,
                Err(x) => state = x,
            }
        }

        // There are threads to unpark. Try to unpark as many as we can.
        let callback = |new_state, result: UnparkResult| {
            // If we are using a fair unlock then we should keep the
            // rwlock locked and hand it off to the unparked threads.
            let mut state = self.state.load(Ordering::Relaxed);
            if force_fair || result.be_fair {
                // Fall back to normal unpark on overflow. Panicking is
                // not allowed in parking_lot callbacks.
                while let Some(mut new_state) =
                    (state - (ONE_READER | UPGRADABLE_BIT)).checked_add(new_state)
                {
                    if result.have_more_threads {
                        new_state |= PARKED_BIT;
                    } else {
                        new_state &= !PARKED_BIT;
                    }
                    match self.state.compare_exchange_weak(
                        state,
                        new_state,
                        Ordering::Relaxed,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => return TOKEN_HANDOFF,
                        Err(x) => state = x,
                    }
                }
            }

            // Otherwise just release the upgradable lock and update PARKED_BIT.
            loop {
                let mut new_state = state - (ONE_READER | UPGRADABLE_BIT);
                if result.have_more_threads {
                    new_state |= PARKED_BIT;
                } else {
                    new_state &= !PARKED_BIT;
                }
                match self.state.compare_exchange_weak(
                    state,
                    new_state,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return TOKEN_NORMAL,
                    Err(x) => state = x,
                }
            }
        };
        self.wake_parked_threads(0, callback);
    }

    #[cold]
    fn try_upgrade_slow(&self) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            if state & READERS_MASK != ONE_READER {
                return false;
            }
            match self.state.compare_exchange_weak(
                state,
                state - (ONE_READER | UPGRADABLE_BIT) + WRITER_BIT,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => return true,
                Err(x) => state = x,
            }
        }
    }

    #[cold]
    fn upgrade_slow(&self, timeout: Option<Instant>) -> bool {
        self.wait_for_readers(timeout, ONE_READER | UPGRADABLE_BIT)
    }

    #[cold]
    fn downgrade_slow(&self) {
        // We only reach this point if PARKED_BIT is set.
        let callback = |_, result: UnparkResult| {
            // Clear the parked bit if there are no more parked threads
            if !result.have_more_threads {
                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
            }
            TOKEN_NORMAL
        };
        self.wake_parked_threads(ONE_READER, callback);
    }

    #[cold]
    fn downgrade_to_upgradable_slow(&self) {
        // We only reach this point if PARKED_BIT is set.
        let callback = |_, result: UnparkResult| {
            // Clear the parked bit if there are no more parked threads
            if !result.have_more_threads {
                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
            }
            TOKEN_NORMAL
        };
        self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
    }

    #[cold]
    fn bump_shared_slow(&self) {
        self.unlock_shared();
        self.lock_shared();
    }

    #[cold]
    fn bump_exclusive_slow(&self) {
        self.deadlock_release();
        self.unlock_exclusive_slow(true);
        self.lock_exclusive();
    }

    #[cold]
    fn bump_upgradable_slow(&self) {
        self.deadlock_release();
        self.unlock_upgradable_slow(true);
        self.lock_upgradable();
    }

    // Common code for waking up parked threads after releasing WRITER_BIT or
    // UPGRADABLE_BIT.
    #[inline]
    fn wake_parked_threads<C>(&self, new_state: usize, callback: C)
    where
        C: FnOnce(usize, UnparkResult) -> UnparkToken,
    {
        // We must wake up at least one upgrader or writer if there is one,
        // otherwise they may end up parked indefinitely since unlock_shared
        // does not call wake_parked_threads.
        let new_state = Cell::new(new_state);
        unsafe {
            let addr = self as *const _ as usize;
            let filter = |ParkToken(token)| {
                let s = new_state.get();

                // If we are waking up a writer, don't wake anything else.
                if s & WRITER_BIT != 0 {
                    return FilterOp::Stop;
                }

                // Otherwise wake *all* readers and one upgrader/writer.
                if token & (UPGRADABLE_BIT | WRITER_BIT) != 0 && s & UPGRADABLE_BIT != 0 {
                    // Skip writers and upgradable readers if we already have
                    // a writer/upgradable reader.
                    FilterOp::Skip
                } else {
                    new_state.set(s + token);
                    FilterOp::Unpark
                }
            };
            parking_lot_core::unpark_filter(addr, filter, |result| {
                callback(new_state.get(), result)
            });
        }
    }

    // Common code for waiting for readers to exit the lock after acquiring
    // WRITER_BIT.
    #[inline]
    fn wait_for_readers(&self, timeout: Option<Instant>, prev_value: usize) -> bool {
        // At this point WRITER_BIT is already set, we just need to wait for the
        // remaining readers to exit the lock.
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Relaxed);
        while state & READERS_MASK != 0 {
            // Spin a few times to wait for readers to exit
            if spinwait.spin() {
                state = self.state.load(Ordering::Relaxed);
                continue;
            }

            // Set the parked bit
            if state & WRITER_PARKED_BIT == 0 {
                if let Err(x) = self.state.compare_exchange_weak(
                    state,
                    state | WRITER_PARKED_BIT,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    state = x;
                    continue;
                }
            }

            // Park our thread until we are woken up by an unlock
            unsafe {
                // Using the 2nd key at addr + 1
                let addr = self as *const _ as usize + 1;
                let validate = || {
                    let state = self.state.load(Ordering::Relaxed);
                    state & READERS_MASK != 0 && state & WRITER_PARKED_BIT != 0
                };
                let before_sleep = || {};
                let timed_out = |_, _| {};
                match parking_lot_core::park(
                    addr,
                    validate,
                    before_sleep,
                    timed_out,
                    TOKEN_EXCLUSIVE,
                    timeout,
                ) {
                    // We still need to re-check the state if we are unparked
                    // since a previous writer timing-out could have allowed
                    // another reader to sneak in before we parked.
                    ParkResult::Unparked(_) | ParkResult::Invalid => {
                        state = self.state.load(Ordering::Relaxed);
                        continue;
                    }

                    // Timeout expired
                    ParkResult::TimedOut => {
                        // We need to release WRITER_BIT and revert back to
                        // our previous value. We also wake up any threads that
                        // might be waiting on WRITER_BIT.
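                        // The wrapping addition below is equivalent to
                        // subtracting WRITER_BIT | WRITER_PARKED_BIT and
                        // adding back `prev_value` (0 for an exclusive lock
                        // attempt, ONE_READER | UPGRADABLE_BIT for an
                        // upgrade).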
                        let state = self.state.fetch_add(
                            prev_value.wrapping_sub(WRITER_BIT | WRITER_PARKED_BIT),
                            Ordering::Relaxed,
                        );
                        if state & PARKED_BIT != 0 {
                            let callback = |_, result: UnparkResult| {
                                // Clear the parked bit if there are no more parked threads
                                if !result.have_more_threads {
                                    self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
                                }
                                TOKEN_NORMAL
                            };
                            self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
                        }
                        return false;
                    }
                }
            }
        }
        true
    }

    // Common code for acquiring a lock
    #[inline]
    fn lock_common<F, V>(
        &self,
        timeout: Option<Instant>,
        token: ParkToken,
        mut try_lock: F,
        validate: V,
    ) -> bool
    where
        F: FnMut(&mut usize) -> bool,
        V: Fn(usize) -> bool,
    {
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // Attempt to grab the lock
            if try_lock(&mut state) {
                return true;
            }

            // If there are no parked threads, try spinning a few times.
            if state & (PARKED_BIT | WRITER_PARKED_BIT) == 0 && spinwait.spin() {
                state = self.state.load(Ordering::Relaxed);
                continue;
            }

            // Set the parked bit
            if state & PARKED_BIT == 0 {
                if let Err(x) = self.state.compare_exchange_weak(
                    state,
                    state | PARKED_BIT,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    state = x;
                    continue;
                }
            }

            // Park our thread until we are woken up by an unlock
            unsafe {
                let addr = self as *const _ as usize;
                let validate = || {
                    let state = self.state.load(Ordering::Relaxed);
                    state & PARKED_BIT != 0 && validate(state)
                };
                let before_sleep = || {};
                let timed_out = |_, was_last_thread| {
                    // Clear the parked bit if we were the last parked thread
                    if was_last_thread {
                        self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
                    }
                };
                match parking_lot_core::park(
                    addr,
                    validate,
                    before_sleep,
                    timed_out,
                    token,
                    timeout,
                ) {
                    // The thread that unparked us passed the lock on to us
                    // directly without unlocking it.
                    ParkResult::Unparked(TOKEN_HANDOFF) => return true,

                    // We were unparked normally, try acquiring the lock again
                    ParkResult::Unparked(_) => (),

                    // The validation function failed, try locking again
                    ParkResult::Invalid => (),

                    // Timeout expired
                    ParkResult::TimedOut => return false,
                }
            }

            // Loop back and try locking again
            spinwait.reset();
            state = self.state.load(Ordering::Relaxed);
        }
    }

    #[inline]
    fn deadlock_acquire(&self) {
        unsafe { deadlock::acquire_resource(self as *const _ as usize) };
        unsafe { deadlock::acquire_resource(self as *const _ as usize + 1) };
    }

    #[inline]
    fn deadlock_release(&self) {
        unsafe { deadlock::release_resource(self as *const _ as usize) };
        unsafe { deadlock::release_resource(self as *const _ as usize + 1) };
    }
}
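
// A final illustrative addition (not part of the upstream source): the
// upgradable-read support implemented above is exposed through
// `lock_api::RwLock::upgradable_read` and
// `lock_api::RwLockUpgradableReadGuard::upgrade`.
#[cfg(test)]
mod upgradable_usage_sketch {
    #[test]
    fn upgradable_read_then_upgrade() {
        let lock = lock_api::RwLock::<super::RawRwLock, Vec<i32>>::new(Vec::new());
        // An upgradable read coexists with plain readers but excludes writers
        // and other upgradable readers.
        let guard = lock.upgradable_read();
        if guard.is_empty() {
            // Upgrading waits for any remaining readers to drain and then
            // yields an exclusive guard.
            let mut writer = lock_api::RwLockUpgradableReadGuard::upgrade(guard);
            writer.push(1);
        }
        assert_eq!(lock.read().len(), 1);
    }
}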