1 use std::cell::UnsafeCell;
2 use std::collections::HashMap;
3 use std::fmt;
4 use std::marker::PhantomData;
5 use std::mem;
6 use std::ops::{Deref, DerefMut};
7 use std::panic::{RefUnwindSafe, UnwindSafe};
8 use std::sync::{LockResult, PoisonError, TryLockError, TryLockResult};
9 use std::sync::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
10 use std::thread::{self, ThreadId};
11 
12 use crate::CachePadded;
13 use lazy_static::lazy_static;
14 
/// The number of shards per sharded lock. Must be a power of two.
///
/// Readers select their shard with `index & (shards.len() - 1)`, which only acts as a modulo when
/// the shard count is a power of two.
const NUM_SHARDS: usize = 8;
17 
/// A shard containing a single reader-writer lock.
///
/// A reader locks exactly one shard; a writer locks every shard and parks each resulting guard in
/// that shard's `write_guard` slot.
struct Shard {
    /// The inner reader-writer lock.
    lock: RwLock<()>,

    /// The write-guard keeping this shard locked.
    ///
    /// Write operations will lock each shard and store the guard here. These guards get dropped at
    /// the same time the big guard is dropped.
    ///
    /// The `'static` lifetime is a placeholder: the stored guard really borrows `lock` above and
    /// has its lifetime erased with `mem::transmute` when it is written into this slot.
    write_guard: UnsafeCell<Option<RwLockWriteGuard<'static, ()>>>,
}
29 
30 /// A sharded reader-writer lock.
31 ///
32 /// This lock is equivalent to [`RwLock`], except read operations are faster and write operations
33 /// are slower.
34 ///
35 /// A `ShardedLock` is internally made of a list of *shards*, each being a [`RwLock`] occupying a
36 /// single cache line. Read operations will pick one of the shards depending on the current thread
37 /// and lock it. Write operations need to lock all shards in succession.
38 ///
39 /// By splitting the lock into shards, concurrent read operations will in most cases choose
40 /// different shards and thus update different cache lines, which is good for scalability. However,
41 /// write operations need to do more work and are therefore slower than usual.
42 ///
43 /// The priority policy of the lock is dependent on the underlying operating system's
44 /// implementation, and this type does not guarantee that any particular policy will be used.
45 ///
46 /// # Poisoning
47 ///
48 /// A `ShardedLock`, like [`RwLock`], will become poisoned on a panic. Note that it may only be
49 /// poisoned if a panic occurs while a write operation is in progress. If a panic occurs in any
50 /// read operation, the lock will not be poisoned.
51 ///
52 /// # Examples
53 ///
54 /// ```
55 /// use crossbeam_utils::sync::ShardedLock;
56 ///
57 /// let lock = ShardedLock::new(5);
58 ///
59 /// // Any number of read locks can be held at once.
60 /// {
61 ///     let r1 = lock.read().unwrap();
62 ///     let r2 = lock.read().unwrap();
63 ///     assert_eq!(*r1, 5);
64 ///     assert_eq!(*r2, 5);
65 /// } // Read locks are dropped at this point.
66 ///
67 /// // However, only one write lock may be held.
68 /// {
69 ///     let mut w = lock.write().unwrap();
70 ///     *w += 1;
71 ///     assert_eq!(*w, 6);
72 /// } // Write lock is dropped here.
73 /// ```
74 ///
75 /// [`RwLock`]: std::sync::RwLock
pub struct ShardedLock<T: ?Sized> {
    /// A list of locks protecting the internal data.
    ///
    /// `new` fills this with `NUM_SHARDS` entries, each wrapped in `CachePadded`.
    shards: Box<[CachePadded<Shard>]>,

    /// The internal data.
    ///
    /// In this file it is reached only through the read/write guards, or directly via
    /// `get_mut` / `into_inner`, which require exclusive access to the lock itself.
    value: UnsafeCell<T>,
}
83 
// SAFETY: sending the lock to another thread sends the owned `T` along with it, hence `T: Send`.
unsafe impl<T: ?Sized + Send> Send for ShardedLock<T> {}
// SAFETY: a shared lock gives concurrent readers `&T` (needs `T: Sync`) and lets whichever thread
// holds the write guard obtain `&mut T` (needs `T: Send`).
unsafe impl<T: ?Sized + Send + Sync> Sync for ShardedLock<T> {}

// Both unwind-safety markers are implemented unconditionally, with no bound on `T`.
impl<T: ?Sized> UnwindSafe for ShardedLock<T> {}
impl<T: ?Sized> RefUnwindSafe for ShardedLock<T> {}
89 
90 impl<T> ShardedLock<T> {
91     /// Creates a new sharded reader-writer lock.
92     ///
93     /// # Examples
94     ///
95     /// ```
96     /// use crossbeam_utils::sync::ShardedLock;
97     ///
98     /// let lock = ShardedLock::new(5);
99     /// ```
new(value: T) -> ShardedLock<T>100     pub fn new(value: T) -> ShardedLock<T> {
101         ShardedLock {
102             shards: (0..NUM_SHARDS)
103                 .map(|_| {
104                     CachePadded::new(Shard {
105                         lock: RwLock::new(()),
106                         write_guard: UnsafeCell::new(None),
107                     })
108                 })
109                 .collect::<Box<[_]>>(),
110             value: UnsafeCell::new(value),
111         }
112     }
113 
114     /// Consumes this lock, returning the underlying data.
115     ///
116     /// # Errors
117     ///
118     /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
119     /// operation panics.
120     ///
121     /// # Examples
122     ///
123     /// ```
124     /// use crossbeam_utils::sync::ShardedLock;
125     ///
126     /// let lock = ShardedLock::new(String::new());
127     /// {
128     ///     let mut s = lock.write().unwrap();
129     ///     *s = "modified".to_owned();
130     /// }
131     /// assert_eq!(lock.into_inner().unwrap(), "modified");
132     /// ```
into_inner(self) -> LockResult<T>133     pub fn into_inner(self) -> LockResult<T> {
134         let is_poisoned = self.is_poisoned();
135         let inner = self.value.into_inner();
136 
137         if is_poisoned {
138             Err(PoisonError::new(inner))
139         } else {
140             Ok(inner)
141         }
142     }
143 }
144 
145 impl<T: ?Sized> ShardedLock<T> {
146     /// Returns `true` if the lock is poisoned.
147     ///
148     /// If another thread can still access the lock, it may become poisoned at any time. A `false`
149     /// result should not be trusted without additional synchronization.
150     ///
151     /// # Examples
152     ///
153     /// ```
154     /// use crossbeam_utils::sync::ShardedLock;
155     /// use std::sync::Arc;
156     /// use std::thread;
157     ///
158     /// let lock = Arc::new(ShardedLock::new(0));
159     /// let c_lock = lock.clone();
160     ///
161     /// let _ = thread::spawn(move || {
162     ///     let _lock = c_lock.write().unwrap();
163     ///     panic!(); // the lock gets poisoned
164     /// }).join();
165     /// assert_eq!(lock.is_poisoned(), true);
166     /// ```
is_poisoned(&self) -> bool167     pub fn is_poisoned(&self) -> bool {
168         self.shards[0].lock.is_poisoned()
169     }
170 
171     /// Returns a mutable reference to the underlying data.
172     ///
173     /// Since this call borrows the lock mutably, no actual locking needs to take place.
174     ///
175     /// # Errors
176     ///
177     /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
178     /// operation panics.
179     ///
180     /// # Examples
181     ///
182     /// ```
183     /// use crossbeam_utils::sync::ShardedLock;
184     ///
185     /// let mut lock = ShardedLock::new(0);
186     /// *lock.get_mut().unwrap() = 10;
187     /// assert_eq!(*lock.read().unwrap(), 10);
188     /// ```
get_mut(&mut self) -> LockResult<&mut T>189     pub fn get_mut(&mut self) -> LockResult<&mut T> {
190         let is_poisoned = self.is_poisoned();
191         let inner = unsafe { &mut *self.value.get() };
192 
193         if is_poisoned {
194             Err(PoisonError::new(inner))
195         } else {
196             Ok(inner)
197         }
198     }
199 
200     /// Attempts to acquire this lock with shared read access.
201     ///
202     /// If the access could not be granted at this time, an error is returned. Otherwise, a guard
203     /// is returned which will release the shared access when it is dropped. This method does not
204     /// provide any guarantees with respect to the ordering of whether contentious readers or
205     /// writers will acquire the lock first.
206     ///
207     /// # Errors
208     ///
209     /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
210     /// operation panics.
211     ///
212     /// # Examples
213     ///
214     /// ```
215     /// use crossbeam_utils::sync::ShardedLock;
216     ///
217     /// let lock = ShardedLock::new(1);
218     ///
219     /// match lock.try_read() {
220     ///     Ok(n) => assert_eq!(*n, 1),
221     ///     Err(_) => unreachable!(),
222     /// };
223     /// ```
try_read(&self) -> TryLockResult<ShardedLockReadGuard<'_, T>>224     pub fn try_read(&self) -> TryLockResult<ShardedLockReadGuard<'_, T>> {
225         // Take the current thread index and map it to a shard index. Thread indices will tend to
226         // distribute shards among threads equally, thus reducing contention due to read-locking.
227         let current_index = current_index().unwrap_or(0);
228         let shard_index = current_index & (self.shards.len() - 1);
229 
230         match self.shards[shard_index].lock.try_read() {
231             Ok(guard) => Ok(ShardedLockReadGuard {
232                 lock: self,
233                 _guard: guard,
234                 _marker: PhantomData,
235             }),
236             Err(TryLockError::Poisoned(err)) => {
237                 let guard = ShardedLockReadGuard {
238                     lock: self,
239                     _guard: err.into_inner(),
240                     _marker: PhantomData,
241                 };
242                 Err(TryLockError::Poisoned(PoisonError::new(guard)))
243             }
244             Err(TryLockError::WouldBlock) => Err(TryLockError::WouldBlock),
245         }
246     }
247 
248     /// Locks with shared read access, blocking the current thread until it can be acquired.
249     ///
250     /// The calling thread will be blocked until there are no more writers which hold the lock.
251     /// There may be other readers currently inside the lock when this method returns. This method
252     /// does not provide any guarantees with respect to the ordering of whether contentious readers
253     /// or writers will acquire the lock first.
254     ///
255     /// Returns a guard which will release the shared access when dropped.
256     ///
257     /// # Errors
258     ///
259     /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
260     /// operation panics.
261     ///
262     /// # Panics
263     ///
264     /// This method might panic when called if the lock is already held by the current thread.
265     ///
266     /// # Examples
267     ///
268     /// ```
269     /// use crossbeam_utils::sync::ShardedLock;
270     /// use std::sync::Arc;
271     /// use std::thread;
272     ///
273     /// let lock = Arc::new(ShardedLock::new(1));
274     /// let c_lock = lock.clone();
275     ///
276     /// let n = lock.read().unwrap();
277     /// assert_eq!(*n, 1);
278     ///
279     /// thread::spawn(move || {
280     ///     let r = c_lock.read();
281     ///     assert!(r.is_ok());
282     /// }).join().unwrap();
283     /// ```
read(&self) -> LockResult<ShardedLockReadGuard<'_, T>>284     pub fn read(&self) -> LockResult<ShardedLockReadGuard<'_, T>> {
285         // Take the current thread index and map it to a shard index. Thread indices will tend to
286         // distribute shards among threads equally, thus reducing contention due to read-locking.
287         let current_index = current_index().unwrap_or(0);
288         let shard_index = current_index & (self.shards.len() - 1);
289 
290         match self.shards[shard_index].lock.read() {
291             Ok(guard) => Ok(ShardedLockReadGuard {
292                 lock: self,
293                 _guard: guard,
294                 _marker: PhantomData,
295             }),
296             Err(err) => Err(PoisonError::new(ShardedLockReadGuard {
297                 lock: self,
298                 _guard: err.into_inner(),
299                 _marker: PhantomData,
300             })),
301         }
302     }
303 
304     /// Attempts to acquire this lock with exclusive write access.
305     ///
306     /// If the access could not be granted at this time, an error is returned. Otherwise, a guard
307     /// is returned which will release the exclusive access when it is dropped. This method does
308     /// not provide any guarantees with respect to the ordering of whether contentious readers or
309     /// writers will acquire the lock first.
310     ///
311     /// # Errors
312     ///
313     /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
314     /// operation panics.
315     ///
316     /// # Examples
317     ///
318     /// ```
319     /// use crossbeam_utils::sync::ShardedLock;
320     ///
321     /// let lock = ShardedLock::new(1);
322     ///
323     /// let n = lock.read().unwrap();
324     /// assert_eq!(*n, 1);
325     ///
326     /// assert!(lock.try_write().is_err());
327     /// ```
try_write(&self) -> TryLockResult<ShardedLockWriteGuard<'_, T>>328     pub fn try_write(&self) -> TryLockResult<ShardedLockWriteGuard<'_, T>> {
329         let mut poisoned = false;
330         let mut blocked = None;
331 
332         // Write-lock each shard in succession.
333         for (i, shard) in self.shards.iter().enumerate() {
334             let guard = match shard.lock.try_write() {
335                 Ok(guard) => guard,
336                 Err(TryLockError::Poisoned(err)) => {
337                     poisoned = true;
338                     err.into_inner()
339                 }
340                 Err(TryLockError::WouldBlock) => {
341                     blocked = Some(i);
342                     break;
343                 }
344             };
345 
346             // Store the guard into the shard.
347             unsafe {
348                 let guard: RwLockWriteGuard<'static, ()> = mem::transmute(guard);
349                 let dest: *mut _ = shard.write_guard.get();
350                 *dest = Some(guard);
351             }
352         }
353 
354         if let Some(i) = blocked {
355             // Unlock the shards in reverse order of locking.
356             for shard in self.shards[0..i].iter().rev() {
357                 unsafe {
358                     let dest: *mut _ = shard.write_guard.get();
359                     let guard = mem::replace(&mut *dest, None);
360                     drop(guard);
361                 }
362             }
363             Err(TryLockError::WouldBlock)
364         } else if poisoned {
365             let guard = ShardedLockWriteGuard {
366                 lock: self,
367                 _marker: PhantomData,
368             };
369             Err(TryLockError::Poisoned(PoisonError::new(guard)))
370         } else {
371             Ok(ShardedLockWriteGuard {
372                 lock: self,
373                 _marker: PhantomData,
374             })
375         }
376     }
377 
378     /// Locks with exclusive write access, blocking the current thread until it can be acquired.
379     ///
    /// The calling thread will be blocked until no other writers or readers hold the lock, so no
    /// other thread is inside the lock when this method returns. This method
382     /// does not provide any guarantees with respect to the ordering of whether contentious readers
383     /// or writers will acquire the lock first.
384     ///
385     /// Returns a guard which will release the exclusive access when dropped.
386     ///
387     /// # Errors
388     ///
389     /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
390     /// operation panics.
391     ///
392     /// # Panics
393     ///
394     /// This method might panic when called if the lock is already held by the current thread.
395     ///
396     /// # Examples
397     ///
398     /// ```
399     /// use crossbeam_utils::sync::ShardedLock;
400     ///
401     /// let lock = ShardedLock::new(1);
402     ///
403     /// let mut n = lock.write().unwrap();
404     /// *n = 2;
405     ///
406     /// assert!(lock.try_read().is_err());
407     /// ```
write(&self) -> LockResult<ShardedLockWriteGuard<'_, T>>408     pub fn write(&self) -> LockResult<ShardedLockWriteGuard<'_, T>> {
409         let mut poisoned = false;
410 
411         // Write-lock each shard in succession.
412         for shard in self.shards.iter() {
413             let guard = match shard.lock.write() {
414                 Ok(guard) => guard,
415                 Err(err) => {
416                     poisoned = true;
417                     err.into_inner()
418                 }
419             };
420 
421             // Store the guard into the shard.
422             unsafe {
423                 let guard: RwLockWriteGuard<'_, ()> = guard;
424                 let guard: RwLockWriteGuard<'static, ()> = mem::transmute(guard);
425                 let dest: *mut _ = shard.write_guard.get();
426                 *dest = Some(guard);
427             }
428         }
429 
430         if poisoned {
431             Err(PoisonError::new(ShardedLockWriteGuard {
432                 lock: self,
433                 _marker: PhantomData,
434             }))
435         } else {
436             Ok(ShardedLockWriteGuard {
437                 lock: self,
438                 _marker: PhantomData,
439             })
440         }
441     }
442 }
443 
444 impl<T: ?Sized + fmt::Debug> fmt::Debug for ShardedLock<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result445     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
446         match self.try_read() {
447             Ok(guard) => f
448                 .debug_struct("ShardedLock")
449                 .field("data", &&*guard)
450                 .finish(),
451             Err(TryLockError::Poisoned(err)) => f
452                 .debug_struct("ShardedLock")
453                 .field("data", &&**err.get_ref())
454                 .finish(),
455             Err(TryLockError::WouldBlock) => {
456                 struct LockedPlaceholder;
457                 impl fmt::Debug for LockedPlaceholder {
458                     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
459                         f.write_str("<locked>")
460                     }
461                 }
462                 f.debug_struct("ShardedLock")
463                     .field("data", &LockedPlaceholder)
464                     .finish()
465             }
466         }
467     }
468 }
469 
470 impl<T: Default> Default for ShardedLock<T> {
default() -> ShardedLock<T>471     fn default() -> ShardedLock<T> {
472         ShardedLock::new(Default::default())
473     }
474 }
475 
476 impl<T> From<T> for ShardedLock<T> {
from(t: T) -> Self477     fn from(t: T) -> Self {
478         ShardedLock::new(t)
479     }
480 }
481 
/// A guard used to release the shared read access of a [`ShardedLock`] when dropped.
pub struct ShardedLockReadGuard<'a, T: ?Sized> {
    /// The lock this guard belongs to; dereferencing the guard reads `lock.value`.
    lock: &'a ShardedLock<T>,
    /// The read guard of the single shard that was locked. It is never read, only held so that
    /// dropping this struct releases the shard.
    _guard: RwLockReadGuard<'a, ()>,
    /// Zero-sized marker typed as a std read guard over `T`; it stores nothing at runtime.
    _marker: PhantomData<RwLockReadGuard<'a, T>>,
}

// SAFETY: a shared guard gives out `&T` through `Deref`, so sharing it across threads requires
// `T: Sync`.
unsafe impl<T: ?Sized + Sync> Sync for ShardedLockReadGuard<'_, T> {}
490 
491 impl<T: ?Sized> Deref for ShardedLockReadGuard<'_, T> {
492     type Target = T;
493 
deref(&self) -> &T494     fn deref(&self) -> &T {
495         unsafe { &*self.lock.value.get() }
496     }
497 }
498 
499 impl<T: fmt::Debug> fmt::Debug for ShardedLockReadGuard<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result500     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
501         f.debug_struct("ShardedLockReadGuard")
502             .field("lock", &self.lock)
503             .finish()
504     }
505 }
506 
507 impl<T: ?Sized + fmt::Display> fmt::Display for ShardedLockReadGuard<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result508     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
509         (**self).fmt(f)
510     }
511 }
512 
/// A guard used to release the exclusive write access of a [`ShardedLock`] when dropped.
///
/// The guard stores no shard guards itself: they are parked in each shard's `write_guard` slot
/// and taken out again by this type's `Drop` impl.
pub struct ShardedLockWriteGuard<'a, T: ?Sized> {
    /// The lock this guard belongs to; dereferencing the guard accesses `lock.value`.
    lock: &'a ShardedLock<T>,
    /// Zero-sized marker typed as a std write guard over `T`; it stores nothing at runtime.
    _marker: PhantomData<RwLockWriteGuard<'a, T>>,
}

// SAFETY: a shared `&ShardedLockWriteGuard` only gives out `&T` through `Deref`, so sharing it
// across threads requires `T: Sync`.
unsafe impl<T: ?Sized + Sync> Sync for ShardedLockWriteGuard<'_, T> {}
520 
521 impl<T: ?Sized> Drop for ShardedLockWriteGuard<'_, T> {
drop(&mut self)522     fn drop(&mut self) {
523         // Unlock the shards in reverse order of locking.
524         for shard in self.lock.shards.iter().rev() {
525             unsafe {
526                 let dest: *mut _ = shard.write_guard.get();
527                 let guard = mem::replace(&mut *dest, None);
528                 drop(guard);
529             }
530         }
531     }
532 }
533 
534 impl<T: fmt::Debug> fmt::Debug for ShardedLockWriteGuard<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result535     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
536         f.debug_struct("ShardedLockWriteGuard")
537             .field("lock", &self.lock)
538             .finish()
539     }
540 }
541 
542 impl<T: ?Sized + fmt::Display> fmt::Display for ShardedLockWriteGuard<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result543     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
544         (**self).fmt(f)
545     }
546 }
547 
548 impl<T: ?Sized> Deref for ShardedLockWriteGuard<'_, T> {
549     type Target = T;
550 
deref(&self) -> &T551     fn deref(&self) -> &T {
552         unsafe { &*self.lock.value.get() }
553     }
554 }
555 
556 impl<T: ?Sized> DerefMut for ShardedLockWriteGuard<'_, T> {
deref_mut(&mut self) -> &mut T557     fn deref_mut(&mut self) -> &mut T {
558         unsafe { &mut *self.lock.value.get() }
559     }
560 }
561 
562 /// Returns a `usize` that identifies the current thread.
563 ///
564 /// Each thread is associated with an 'index'. While there are no particular guarantees, indices
565 /// usually tend to be consecutive numbers between 0 and the number of running threads.
566 ///
567 /// Since this function accesses TLS, `None` might be returned if the current thread's TLS is
568 /// tearing down.
569 #[inline]
current_index() -> Option<usize>570 fn current_index() -> Option<usize> {
571     REGISTRATION.try_with(|reg| reg.index).ok()
572 }
573 
/// The global registry keeping track of registered threads and indices.
struct ThreadIndices {
    /// Mapping from `ThreadId` to thread index.
    ///
    /// NOTE(review): within this file the map is only inserted into and removed from, never
    /// queried — confirm whether anything else reads it.
    mapping: HashMap<ThreadId, usize>,

    /// A list of free indices.
    ///
    /// An index is pushed here when a thread's `Registration` is dropped and popped again (most
    /// recently freed first) when a new thread registers.
    free_list: Vec<usize>,

    /// The next index to allocate if the free list is empty.
    next_index: usize,
}
585 
lazy_static! {
    // The registry shared by all threads. It starts out empty and is locked both when a thread's
    // `REGISTRATION` is initialized and when that registration is dropped.
    static ref THREAD_INDICES: Mutex<ThreadIndices> = Mutex::new(ThreadIndices {
        mapping: HashMap::new(),
        free_list: Vec::new(),
        next_index: 0,
    });
}
593 
/// A registration of a thread with an index.
///
/// When dropped, unregisters the thread and frees the reserved index.
struct Registration {
    /// The index reserved for this thread; `current_index` returns it.
    index: usize,
    /// The id of the registered thread, used as the key in `ThreadIndices::mapping`.
    thread_id: ThreadId,
}
601 
602 impl Drop for Registration {
drop(&mut self)603     fn drop(&mut self) {
604         let mut indices = THREAD_INDICES.lock().unwrap();
605         indices.mapping.remove(&self.thread_id);
606         indices.free_list.push(self.index);
607     }
608 }
609 
610 thread_local! {
611     static REGISTRATION: Registration = {
612         let thread_id = thread::current().id();
613         let mut indices = THREAD_INDICES.lock().unwrap();
614 
615         let index = match indices.free_list.pop() {
616             Some(i) => i,
617             None => {
618                 let i = indices.next_index;
619                 indices.next_index += 1;
620                 i
621             }
622         };
623         indices.mapping.insert(thread_id, index);
624 
625         Registration {
626             index,
627             thread_id,
628         }
629     };
630 }
631