1 use std::cell::UnsafeCell;
2 use std::collections::HashMap;
3 use std::fmt;
4 use std::marker::PhantomData;
5 use std::mem;
6 use std::ops::{Deref, DerefMut};
7 use std::panic::{RefUnwindSafe, UnwindSafe};
8 use std::sync::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
9 use std::sync::{LockResult, PoisonError, TryLockError, TryLockResult};
10 use std::thread::{self, ThreadId};
11 
12 use CachePadded;
13 
/// The number of shards per sharded lock. Must be a power of two.
///
/// Readers pick their shard with `thread_index & (shards.len() - 1)`. That mask only reaches
/// every shard when the length is a power of two; with any other length some shards would never
/// be selected by readers.
const NUM_SHARDS: usize = 8;
16 
/// A shard containing a single reader-writer lock.
///
/// Field order matters: struct fields are dropped in declaration order, and `write_guard` may
/// still hold a guard pointing into `lock` (for example when a `ShardedLockWriteGuard` was leaked
/// with `mem::forget`). Declaring the guard slot first guarantees such a guard is released before
/// the lock it refers to is destroyed.
struct Shard {
    /// The write-guard keeping this shard locked.
    ///
    /// Write operations will lock each shard and store the guard here. These guards get dropped at
    /// the same time the big guard is dropped.
    ///
    /// The `'static` lifetime is a placeholder: the guard really borrows `lock` below and has its
    /// lifetime erased with `mem::transmute` before being stored.
    write_guard: UnsafeCell<Option<RwLockWriteGuard<'static, ()>>>,

    /// The inner reader-writer lock.
    lock: RwLock<()>,
}
28 
29 /// A sharded reader-writer lock.
30 ///
31 /// This lock is equivalent to [`RwLock`], except read operations are faster and write operations
32 /// are slower.
33 ///
34 /// A `ShardedLock` is internally made of a list of *shards*, each being a [`RwLock`] occupying a
35 /// single cache line. Read operations will pick one of the shards depending on the current thread
36 /// and lock it. Write operations need to lock all shards in succession.
37 ///
38 /// By splitting the lock into shards, concurrent read operations will in most cases choose
39 /// different shards and thus update different cache lines, which is good for scalability. However,
40 /// write operations need to do more work and are therefore slower than usual.
41 ///
42 /// The priority policy of the lock is dependent on the underlying operating system's
43 /// implementation, and this type does not guarantee that any particular policy will be used.
44 ///
45 /// # Poisoning
46 ///
47 /// A `ShardedLock`, like [`RwLock`], will become poisoned on a panic. Note that it may only be
48 /// poisoned if a panic occurs while a write operation is in progress. If a panic occurs in any
49 /// read operation, the lock will not be poisoned.
50 ///
51 /// # Examples
52 ///
53 /// ```
54 /// use crossbeam_utils::sync::ShardedLock;
55 ///
56 /// let lock = ShardedLock::new(5);
57 ///
58 /// // Any number of read locks can be held at once.
59 /// {
60 ///     let r1 = lock.read().unwrap();
61 ///     let r2 = lock.read().unwrap();
62 ///     assert_eq!(*r1, 5);
63 ///     assert_eq!(*r2, 5);
64 /// } // Read locks are dropped at this point.
65 ///
66 /// // However, only one write lock may be held.
67 /// {
68 ///     let mut w = lock.write().unwrap();
69 ///     *w += 1;
70 ///     assert_eq!(*w, 6);
71 /// } // Write lock is dropped here.
72 /// ```
73 ///
74 /// [`RwLock`]: https://doc.rust-lang.org/std/sync/struct.RwLock.html
75 pub struct ShardedLock<T: ?Sized> {
76     /// A list of locks protecting the internal data.
77     shards: Box<[CachePadded<Shard>]>,
78 
79     /// The internal data.
80     value: UnsafeCell<T>,
81 }
82 
83 unsafe impl<T: ?Sized + Send> Send for ShardedLock<T> {}
84 unsafe impl<T: ?Sized + Send + Sync> Sync for ShardedLock<T> {}
85 
86 impl<T: ?Sized> UnwindSafe for ShardedLock<T> {}
87 impl<T: ?Sized> RefUnwindSafe for ShardedLock<T> {}
88 
89 impl<T> ShardedLock<T> {
90     /// Creates a new sharded reader-writer lock.
91     ///
92     /// # Examples
93     ///
94     /// ```
95     /// use crossbeam_utils::sync::ShardedLock;
96     ///
97     /// let lock = ShardedLock::new(5);
98     /// ```
new(value: T) -> ShardedLock<T>99     pub fn new(value: T) -> ShardedLock<T> {
100         ShardedLock {
101             shards: (0..NUM_SHARDS)
102                 .map(|_| CachePadded::new(Shard {
103                     lock: RwLock::new(()),
104                     write_guard: UnsafeCell::new(None),
105                 }))
106                 .collect::<Vec<_>>()
107                 .into_boxed_slice(),
108             value: UnsafeCell::new(value),
109         }
110     }
111 
112     /// Consumes this lock, returning the underlying data.
113     ///
114     /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
115     /// operation panics.
116     ///
117     /// # Examples
118     ///
119     /// ```
120     /// use crossbeam_utils::sync::ShardedLock;
121     ///
122     /// let lock = ShardedLock::new(String::new());
123     /// {
124     ///     let mut s = lock.write().unwrap();
125     ///     *s = "modified".to_owned();
126     /// }
127     /// assert_eq!(lock.into_inner().unwrap(), "modified");
128     /// ```
into_inner(self) -> LockResult<T>129     pub fn into_inner(self) -> LockResult<T> {
130         let is_poisoned = self.is_poisoned();
131         let inner = self.value.into_inner();
132 
133         if is_poisoned {
134             Err(PoisonError::new(inner))
135         } else {
136             Ok(inner)
137         }
138     }
139 }
140 
141 impl<T: ?Sized> ShardedLock<T> {
142     /// Returns `true` if the lock is poisoned.
143     ///
144     /// If another thread can still access the lock, it may become poisoned at any time. A `false`
145     /// result should not be trusted without additional synchronization.
146     ///
147     /// # Examples
148     ///
149     /// ```
150     /// use crossbeam_utils::sync::ShardedLock;
151     /// use std::sync::Arc;
152     /// use std::thread;
153     ///
154     /// let lock = Arc::new(ShardedLock::new(0));
155     /// let c_lock = lock.clone();
156     ///
157     /// let _ = thread::spawn(move || {
158     ///     let _lock = c_lock.write().unwrap();
159     ///     panic!(); // the lock gets poisoned
160     /// }).join();
161     /// assert_eq!(lock.is_poisoned(), true);
162     /// ```
is_poisoned(&self) -> bool163     pub fn is_poisoned(&self) -> bool {
164         self.shards[0].lock.is_poisoned()
165     }
166 
167     /// Returns a mutable reference to the underlying data.
168     ///
169     /// Since this call borrows the lock mutably, no actual locking needs to take place.
170     ///
171     /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
172     /// operation panics.
173     ///
174     /// # Examples
175     ///
176     /// ```
177     /// use crossbeam_utils::sync::ShardedLock;
178     ///
179     /// let mut lock = ShardedLock::new(0);
180     /// *lock.get_mut().unwrap() = 10;
181     /// assert_eq!(*lock.read().unwrap(), 10);
182     /// ```
get_mut(&mut self) -> LockResult<&mut T>183     pub fn get_mut(&mut self) -> LockResult<&mut T> {
184         let is_poisoned = self.is_poisoned();
185         let inner = unsafe { &mut *self.value.get() };
186 
187         if is_poisoned {
188             Err(PoisonError::new(inner))
189         } else {
190             Ok(inner)
191         }
192     }
193 
194     /// Attempts to acquire this lock with shared read access.
195     ///
196     /// If the access could not be granted at this time, an error is returned. Otherwise, a guard
197     /// is returned which will release the shared access when it is dropped. This method does not
198     /// provide any guarantees with respect to the ordering of whether contentious readers or
199     /// writers will acquire the lock first.
200     ///
201     /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
202     /// operation panics.
203     ///
204     /// # Examples
205     ///
206     /// ```
207     /// use crossbeam_utils::sync::ShardedLock;
208     ///
209     /// let lock = ShardedLock::new(1);
210     ///
211     /// match lock.try_read() {
212     ///     Ok(n) => assert_eq!(*n, 1),
213     ///     Err(_) => unreachable!(),
214     /// };
215     /// ```
try_read(&self) -> TryLockResult<ShardedLockReadGuard<T>>216     pub fn try_read(&self) -> TryLockResult<ShardedLockReadGuard<T>> {
217         // Take the current thread index and map it to a shard index. Thread indices will tend to
218         // distribute shards among threads equally, thus reducing contention due to read-locking.
219         let current_index = current_index().unwrap_or(0);
220         let shard_index = current_index & (self.shards.len() - 1);
221 
222         match self.shards[shard_index].lock.try_read() {
223             Ok(guard) => Ok(ShardedLockReadGuard {
224                 lock: self,
225                 _guard: guard,
226                 _marker: PhantomData,
227             }),
228             Err(TryLockError::Poisoned(err)) => {
229                 let guard = ShardedLockReadGuard {
230                     lock: self,
231                     _guard: err.into_inner(),
232                     _marker: PhantomData,
233                 };
234                 Err(TryLockError::Poisoned(PoisonError::new(guard)))
235             },
236             Err(TryLockError::WouldBlock) => Err(TryLockError::WouldBlock),
237         }
238     }
239 
240     /// Locks with shared read access, blocking the current thread until it can be acquired.
241     ///
242     /// The calling thread will be blocked until there are no more writers which hold the lock.
243     /// There may be other readers currently inside the lock when this method returns. This method
244     /// does not provide any guarantees with respect to the ordering of whether contentious readers
245     /// or writers will acquire the lock first.
246     ///
247     /// Returns a guard which will release the shared access when dropped.
248     ///
249     /// # Examples
250     ///
251     /// ```
252     /// use crossbeam_utils::sync::ShardedLock;
253     /// use std::sync::Arc;
254     /// use std::thread;
255     ///
256     /// let lock = Arc::new(ShardedLock::new(1));
257     /// let c_lock = lock.clone();
258     ///
259     /// let n = lock.read().unwrap();
260     /// assert_eq!(*n, 1);
261     ///
262     /// thread::spawn(move || {
263     ///     let r = c_lock.read();
264     ///     assert!(r.is_ok());
265     /// }).join().unwrap();
266     /// ```
read(&self) -> LockResult<ShardedLockReadGuard<T>>267     pub fn read(&self) -> LockResult<ShardedLockReadGuard<T>> {
268         // Take the current thread index and map it to a shard index. Thread indices will tend to
269         // distribute shards among threads equally, thus reducing contention due to read-locking.
270         let current_index = current_index().unwrap_or(0);
271         let shard_index = current_index & (self.shards.len() - 1);
272 
273         match self.shards[shard_index].lock.read() {
274             Ok(guard) => Ok(ShardedLockReadGuard {
275                 lock: self,
276                 _guard: guard,
277                 _marker: PhantomData,
278             }),
279             Err(err) => Err(PoisonError::new(ShardedLockReadGuard {
280                 lock: self,
281                 _guard: err.into_inner(),
282                 _marker: PhantomData,
283             })),
284         }
285     }
286 
287     /// Attempts to acquire this lock with exclusive write access.
288     ///
289     /// If the access could not be granted at this time, an error is returned. Otherwise, a guard
290     /// is returned which will release the exclusive access when it is dropped. This method does
291     /// not provide any guarantees with respect to the ordering of whether contentious readers or
292     /// writers will acquire the lock first.
293     ///
294     /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
295     /// operation panics.
296     ///
297     /// # Examples
298     ///
299     /// ```
300     /// use crossbeam_utils::sync::ShardedLock;
301     ///
302     /// let lock = ShardedLock::new(1);
303     ///
304     /// let n = lock.read().unwrap();
305     /// assert_eq!(*n, 1);
306     ///
307     /// assert!(lock.try_write().is_err());
308     /// ```
try_write(&self) -> TryLockResult<ShardedLockWriteGuard<T>>309     pub fn try_write(&self) -> TryLockResult<ShardedLockWriteGuard<T>> {
310         let mut poisoned = false;
311         let mut blocked = None;
312 
313         // Write-lock each shard in succession.
314         for (i, shard) in self.shards.iter().enumerate() {
315             let guard = match shard.lock.try_write() {
316                 Ok(guard) => guard,
317                 Err(TryLockError::Poisoned(err)) => {
318                     poisoned = true;
319                     err.into_inner()
320                 },
321                 Err(TryLockError::WouldBlock) => {
322                     blocked = Some(i);
323                     break;
324                 }
325             };
326 
327             // Store the guard into the shard.
328             unsafe {
329                 let guard: RwLockWriteGuard<'static, ()> = mem::transmute(guard);
330                 let dest: *mut _ = shard.write_guard.get();
331                 *dest = Some(guard);
332             }
333         }
334 
335         if let Some(i) = blocked {
336             // Unlock the shards in reverse order of locking.
337             for shard in self.shards[0..i].iter().rev() {
338                 unsafe {
339                     let dest: *mut _ = shard.write_guard.get();
340                     let guard = mem::replace(&mut *dest, None);
341                     drop(guard);
342                 }
343             }
344             Err(TryLockError::WouldBlock)
345         } else if poisoned {
346             let guard = ShardedLockWriteGuard {
347                 lock: self,
348                 _marker: PhantomData,
349             };
350             Err(TryLockError::Poisoned(PoisonError::new(guard)))
351         } else {
352             Ok(ShardedLockWriteGuard {
353                 lock: self,
354                 _marker: PhantomData,
355             })
356         }
357     }
358 
359     /// Locks with exclusive write access, blocking the current thread until it can be acquired.
360     ///
361     /// The calling thread will be blocked until there are no more writers which hold the lock.
362     /// There may be other readers currently inside the lock when this method returns. This method
363     /// does not provide any guarantees with respect to the ordering of whether contentious readers
364     /// or writers will acquire the lock first.
365     ///
366     /// Returns a guard which will release the exclusive access when dropped.
367     ///
368     /// # Examples
369     ///
370     /// ```
371     /// use crossbeam_utils::sync::ShardedLock;
372     ///
373     /// let lock = ShardedLock::new(1);
374     ///
375     /// let mut n = lock.write().unwrap();
376     /// *n = 2;
377     ///
378     /// assert!(lock.try_read().is_err());
379     /// ```
write(&self) -> LockResult<ShardedLockWriteGuard<T>>380     pub fn write(&self) -> LockResult<ShardedLockWriteGuard<T>> {
381         let mut poisoned = false;
382 
383         // Write-lock each shard in succession.
384         for shard in self.shards.iter() {
385             let guard = match shard.lock.write() {
386                 Ok(guard) => guard,
387                 Err(err) => {
388                     poisoned = true;
389                     err.into_inner()
390                 }
391             };
392 
393             // Store the guard into the shard.
394             unsafe {
395                 let guard: RwLockWriteGuard<'_, ()> = guard;
396                 let guard: RwLockWriteGuard<'static, ()> = mem::transmute(guard);
397                 let dest: *mut _ = shard.write_guard.get();
398                 *dest = Some(guard);
399             }
400         }
401 
402         if poisoned {
403             Err(PoisonError::new(ShardedLockWriteGuard {
404                 lock: self,
405                 _marker: PhantomData,
406             }))
407         } else {
408             Ok(ShardedLockWriteGuard {
409                 lock: self,
410                 _marker: PhantomData,
411             })
412         }
413     }
414 }
415 
416 impl<T: ?Sized + fmt::Debug> fmt::Debug for ShardedLock<T> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result417     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
418         match self.try_read() {
419             Ok(guard) => f.debug_struct("ShardedLock").field("data", &&*guard).finish(),
420             Err(TryLockError::Poisoned(err)) => {
421                 f.debug_struct("ShardedLock").field("data", &&**err.get_ref()).finish()
422             },
423             Err(TryLockError::WouldBlock) => {
424                 struct LockedPlaceholder;
425                 impl fmt::Debug for LockedPlaceholder {
426                     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
427                         f.write_str("<locked>")
428                     }
429                 }
430                 f.debug_struct("ShardedLock").field("data", &LockedPlaceholder).finish()
431             }
432         }
433     }
434 }
435 
436 impl<T: Default> Default for ShardedLock<T> {
default() -> ShardedLock<T>437     fn default() -> ShardedLock<T> {
438         ShardedLock::new(Default::default())
439     }
440 }
441 
442 impl<T> From<T> for ShardedLock<T> {
from(t: T) -> Self443     fn from(t: T) -> Self {
444         ShardedLock::new(t)
445     }
446 }
447 
448 /// A guard used to release the shared read access of a [`ShardedLock`] when dropped.
449 ///
450 /// [`ShardedLock`]: struct.ShardedLock.html
451 pub struct ShardedLockReadGuard<'a, T: ?Sized + 'a> {
452     lock: &'a ShardedLock<T>,
453     _guard: RwLockReadGuard<'a, ()>,
454     _marker: PhantomData<RwLockReadGuard<'a, T>>,
455 }
456 
457 unsafe impl<'a, T: ?Sized + Sync> Sync for ShardedLockReadGuard<'a, T> {}
458 
459 impl<'a, T: ?Sized> Deref for ShardedLockReadGuard<'a, T> {
460     type Target = T;
461 
deref(&self) -> &T462     fn deref(&self) -> &T {
463         unsafe { &*self.lock.value.get() }
464     }
465 }
466 
467 impl<'a, T: fmt::Debug> fmt::Debug for ShardedLockReadGuard<'a, T> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result468     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
469         f.debug_struct("ShardedLockReadGuard")
470             .field("lock", &self.lock)
471             .finish()
472     }
473 }
474 
475 impl<'a, T: ?Sized + fmt::Display> fmt::Display for ShardedLockReadGuard<'a, T> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result476     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
477         (**self).fmt(f)
478     }
479 }
480 
481 /// A guard used to release the exclusive write access of a [`ShardedLock`] when dropped.
482 ///
483 /// [`ShardedLock`]: struct.ShardedLock.html
484 pub struct ShardedLockWriteGuard<'a, T: ?Sized + 'a> {
485     lock: &'a ShardedLock<T>,
486     _marker: PhantomData<RwLockWriteGuard<'a, T>>,
487 }
488 
489 unsafe impl<'a, T: ?Sized + Sync> Sync for ShardedLockWriteGuard<'a, T> {}
490 
491 impl<'a, T: ?Sized> Drop for ShardedLockWriteGuard<'a, T> {
drop(&mut self)492     fn drop(&mut self) {
493         // Unlock the shards in reverse order of locking.
494         for shard in self.lock.shards.iter().rev() {
495             unsafe {
496                 let dest: *mut _ = shard.write_guard.get();
497                 let guard = mem::replace(&mut *dest, None);
498                 drop(guard);
499             }
500         }
501     }
502 }
503 
504 impl<'a, T: fmt::Debug> fmt::Debug for ShardedLockWriteGuard<'a, T> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result505     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
506         f.debug_struct("ShardedLockWriteGuard")
507             .field("lock", &self.lock)
508             .finish()
509     }
510 }
511 
512 impl<'a, T: ?Sized + fmt::Display> fmt::Display for ShardedLockWriteGuard<'a, T> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result513     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
514         (**self).fmt(f)
515     }
516 }
517 
518 impl<'a, T: ?Sized> Deref for ShardedLockWriteGuard<'a, T> {
519     type Target = T;
520 
deref(&self) -> &T521     fn deref(&self) -> &T {
522         unsafe { &*self.lock.value.get() }
523     }
524 }
525 
526 impl<'a, T: ?Sized> DerefMut for ShardedLockWriteGuard<'a, T> {
deref_mut(&mut self) -> &mut T527     fn deref_mut(&mut self) -> &mut T {
528         unsafe { &mut *self.lock.value.get() }
529     }
530 }
531 
532 /// Returns a `usize` that identifies the current thread.
533 ///
534 /// Each thread is associated with an 'index'. While there are no particular guarantees, indices
535 /// usually tend to be consecutive numbers between 0 and the number of running threads.
536 ///
537 /// Since this function accesses TLS, `None` might be returned if the current thread's TLS is
538 /// tearing down.
539 #[inline]
current_index() -> Option<usize>540 fn current_index() -> Option<usize> {
541     REGISTRATION.try_with(|reg| reg.index).ok()
542 }
543 
/// The global registry keeping track of registered threads and indices.
struct ThreadIndices {
    /// Mapping from `ThreadId` to thread index.
    ///
    /// Entries are added when a thread registers and removed when its registration is dropped.
    mapping: HashMap<ThreadId, usize>,

    /// A list of free indices.
    ///
    /// Indices released by exited threads are pushed here and reused (last released first) before
    /// a new index is allocated.
    free_list: Vec<usize>,

    /// The next index to allocate if the free list is empty.
    next_index: usize,
}
555 
556 lazy_static! {
557     static ref THREAD_INDICES: Mutex<ThreadIndices> = Mutex::new(ThreadIndices {
558         mapping: HashMap::new(),
559         free_list: Vec::new(),
560         next_index: 0,
561     });
562 }
563 
/// A registration of a thread with an index.
///
/// When dropped, unregisters the thread and frees the reserved index.
struct Registration {
    /// The index reserved for this thread; readers use it to choose a shard.
    index: usize,

    /// The id of the registered thread, used as the key in the global registry's mapping.
    thread_id: ThreadId,
}
571 
572 impl Drop for Registration {
drop(&mut self)573     fn drop(&mut self) {
574         let mut indices = THREAD_INDICES.lock().unwrap();
575         indices.mapping.remove(&self.thread_id);
576         indices.free_list.push(self.index);
577     }
578 }
579 
580 thread_local! {
581     static REGISTRATION: Registration = {
582         let thread_id = thread::current().id();
583         let mut indices = THREAD_INDICES.lock().unwrap();
584 
585         let index = match indices.free_list.pop() {
586             Some(i) => i,
587             None => {
588                 let i = indices.next_index;
589                 indices.next_index += 1;
590                 i
591             }
592         };
593         indices.mapping.insert(thread_id, index);
594 
595         Registration {
596             index,
597             thread_id,
598         }
599     };
600 }
601