use std::cell::UnsafeCell;
use std::collections::HashMap;
use std::fmt;
use std::marker::PhantomData;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::sync::{LockResult, PoisonError, TryLockError, TryLockResult};
use std::sync::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::thread::{self, ThreadId};

use CachePadded;

/// The number of shards per sharded lock. Must be a power of two so that a shard can be selected
/// by masking a thread index with `NUM_SHARDS - 1`.
const NUM_SHARDS: usize = 8;

/// A shard containing a single reader-writer lock.
struct Shard {
    /// The inner reader-writer lock.
    lock: RwLock<()>,

    /// The write-guard keeping this shard locked.
    ///
    /// Write operations will lock each shard and store the guard here. These guards get dropped at
    /// the same time the big guard is dropped.
    write_guard: UnsafeCell<Option<RwLockWriteGuard<'static, ()>>>,
}

/// A sharded reader-writer lock.
///
/// This lock is equivalent to [`RwLock`], except read operations are faster and write operations
/// are slower.
///
/// A `ShardedLock` is internally made of a list of *shards*, each being a [`RwLock`] occupying a
/// single cache line. Read operations will pick one of the shards depending on the current thread
/// and lock it. Write operations need to lock all shards in succession.
///
/// By splitting the lock into shards, concurrent read operations will in most cases choose
/// different shards and thus update different cache lines, which is good for scalability. However,
/// write operations need to do more work and are therefore slower than usual.
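///
/// For example, several threads can hold read locks concurrently, each typically locking a
/// different shard (a small sketch using only the API defined in this module):
///
/// ```
/// use crossbeam_utils::sync::ShardedLock;
/// use std::sync::Arc;
/// use std::thread;
///
/// let lock = Arc::new(ShardedLock::new(0));
///
/// // Each reader selects a shard based on its thread index, so these readers will
/// // usually touch different cache lines.
/// let handles: Vec<_> = (0..4)
///     .map(|_| {
///         let lock = lock.clone();
///         thread::spawn(move || assert_eq!(*lock.read().unwrap(), 0))
///     })
///     .collect();
///
/// for handle in handles {
///     handle.join().unwrap();
/// }
/// ```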
///
/// The priority policy of the lock is dependent on the underlying operating system's
/// implementation, and this type does not guarantee that any particular policy will be used.
///
/// # Poisoning
///
/// A `ShardedLock`, like [`RwLock`], will become poisoned on a panic. Note that it may only be
/// poisoned if a panic occurs while a write operation is in progress. If a panic occurs in any
/// read operation, the lock will not be poisoned.
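///
/// A poisoned lock still allows access to the underlying data. As a brief sketch, the guard can
/// be extracted from the `PoisonError` returned by a failed locking operation:
///
/// ```
/// use crossbeam_utils::sync::ShardedLock;
/// use std::sync::Arc;
/// use std::thread;
///
/// let lock = Arc::new(ShardedLock::new(0));
/// let c_lock = lock.clone();
///
/// // Poison the lock by panicking while it is write-locked.
/// let _ = thread::spawn(move || {
///     let _guard = c_lock.write().unwrap();
///     panic!();
/// }).join();
///
/// // The data can still be recovered from the error.
/// let value = match lock.read() {
///     Ok(guard) => *guard,
///     Err(poisoned) => *poisoned.into_inner(),
/// };
/// assert_eq!(value, 0);
/// ```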
///
/// # Examples
///
/// ```
/// use crossbeam_utils::sync::ShardedLock;
///
/// let lock = ShardedLock::new(5);
///
/// // Any number of read locks can be held at once.
/// {
///     let r1 = lock.read().unwrap();
///     let r2 = lock.read().unwrap();
///     assert_eq!(*r1, 5);
///     assert_eq!(*r2, 5);
/// } // Read locks are dropped at this point.
///
/// // However, only one write lock may be held.
/// {
///     let mut w = lock.write().unwrap();
///     *w += 1;
///     assert_eq!(*w, 6);
/// } // Write lock is dropped here.
/// ```
///
/// [`RwLock`]: https://doc.rust-lang.org/std/sync/struct.RwLock.html
pub struct ShardedLock<T: ?Sized> {
    /// A list of locks protecting the internal data.
    shards: Box<[CachePadded<Shard>]>,

    /// The internal data.
    value: UnsafeCell<T>,
}

unsafe impl<T: ?Sized + Send> Send for ShardedLock<T> {}
unsafe impl<T: ?Sized + Send + Sync> Sync for ShardedLock<T> {}

impl<T: ?Sized> UnwindSafe for ShardedLock<T> {}
impl<T: ?Sized> RefUnwindSafe for ShardedLock<T> {}

impl<T> ShardedLock<T> {
    /// Creates a new sharded reader-writer lock.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::ShardedLock;
    ///
    /// let lock = ShardedLock::new(5);
    /// ```
    pub fn new(value: T) -> ShardedLock<T> {
        ShardedLock {
            shards: (0..NUM_SHARDS)
                .map(|_| {
                    CachePadded::new(Shard {
                        lock: RwLock::new(()),
                        write_guard: UnsafeCell::new(None),
                    })
                })
                .collect::<Vec<_>>()
                .into_boxed_slice(),
            value: UnsafeCell::new(value),
        }
    }

    /// Consumes this lock, returning the underlying data.
    ///
    /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
    /// operation panics.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::ShardedLock;
    ///
    /// let lock = ShardedLock::new(String::new());
    /// {
    ///     let mut s = lock.write().unwrap();
    ///     *s = "modified".to_owned();
    /// }
    /// assert_eq!(lock.into_inner().unwrap(), "modified");
    /// ```
    pub fn into_inner(self) -> LockResult<T> {
        let is_poisoned = self.is_poisoned();
        let inner = self.value.into_inner();

        if is_poisoned {
            Err(PoisonError::new(inner))
        } else {
            Ok(inner)
        }
    }
}

impl<T: ?Sized> ShardedLock<T> {
    /// Returns `true` if the lock is poisoned.
    ///
    /// If another thread can still access the lock, it may become poisoned at any time. A `false`
    /// result should not be trusted without additional synchronization.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::ShardedLock;
    /// use std::sync::Arc;
    /// use std::thread;
    ///
    /// let lock = Arc::new(ShardedLock::new(0));
    /// let c_lock = lock.clone();
    ///
    /// let _ = thread::spawn(move || {
    ///     let _lock = c_lock.write().unwrap();
    ///     panic!(); // the lock gets poisoned
    /// }).join();
    /// assert_eq!(lock.is_poisoned(), true);
    /// ```
    pub fn is_poisoned(&self) -> bool {
        self.shards[0].lock.is_poisoned()
    }

    /// Returns a mutable reference to the underlying data.
    ///
    /// Since this call borrows the lock mutably, no actual locking needs to take place.
    ///
    /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
    /// operation panics.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::ShardedLock;
    ///
    /// let mut lock = ShardedLock::new(0);
    /// *lock.get_mut().unwrap() = 10;
    /// assert_eq!(*lock.read().unwrap(), 10);
    /// ```
    pub fn get_mut(&mut self) -> LockResult<&mut T> {
        let is_poisoned = self.is_poisoned();
        let inner = unsafe { &mut *self.value.get() };

        if is_poisoned {
            Err(PoisonError::new(inner))
        } else {
            Ok(inner)
        }
    }

    /// Attempts to acquire this lock with shared read access.
    ///
    /// If the access could not be granted at this time, an error is returned. Otherwise, a guard
    /// is returned which will release the shared access when it is dropped. This method does not
    /// provide any guarantees with respect to the ordering of whether contentious readers or
    /// writers will acquire the lock first.
    ///
    /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
    /// operation panics.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::ShardedLock;
    ///
    /// let lock = ShardedLock::new(1);
    ///
    /// match lock.try_read() {
    ///     Ok(n) => assert_eq!(*n, 1),
    ///     Err(_) => unreachable!(),
    /// };
    /// ```
    pub fn try_read(&self) -> TryLockResult<ShardedLockReadGuard<T>> {
        // Take the current thread index and map it to a shard index. Thread indices will tend to
        // distribute shards among threads equally, thus reducing contention due to read-locking.
        let current_index = current_index().unwrap_or(0);
        let shard_index = current_index & (self.shards.len() - 1);

        match self.shards[shard_index].lock.try_read() {
            Ok(guard) => Ok(ShardedLockReadGuard {
                lock: self,
                _guard: guard,
                _marker: PhantomData,
            }),
            Err(TryLockError::Poisoned(err)) => {
                let guard = ShardedLockReadGuard {
                    lock: self,
                    _guard: err.into_inner(),
                    _marker: PhantomData,
                };
                Err(TryLockError::Poisoned(PoisonError::new(guard)))
            }
            Err(TryLockError::WouldBlock) => Err(TryLockError::WouldBlock),
        }
    }

    /// Locks with shared read access, blocking the current thread until it can be acquired.
    ///
    /// The calling thread will be blocked until there are no more writers which hold the lock.
    /// There may be other readers currently inside the lock when this method returns. This method
    /// does not provide any guarantees with respect to the ordering of whether contentious readers
    /// or writers will acquire the lock first.
    ///
    /// Returns a guard which will release the shared access when dropped.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::ShardedLock;
    /// use std::sync::Arc;
    /// use std::thread;
    ///
    /// let lock = Arc::new(ShardedLock::new(1));
    /// let c_lock = lock.clone();
    ///
    /// let n = lock.read().unwrap();
    /// assert_eq!(*n, 1);
    ///
    /// thread::spawn(move || {
    ///     let r = c_lock.read();
    ///     assert!(r.is_ok());
    /// }).join().unwrap();
    /// ```
    pub fn read(&self) -> LockResult<ShardedLockReadGuard<T>> {
        // Take the current thread index and map it to a shard index. Thread indices will tend to
        // distribute shards among threads equally, thus reducing contention due to read-locking.
        let current_index = current_index().unwrap_or(0);
        let shard_index = current_index & (self.shards.len() - 1);

        match self.shards[shard_index].lock.read() {
            Ok(guard) => Ok(ShardedLockReadGuard {
                lock: self,
                _guard: guard,
                _marker: PhantomData,
            }),
            Err(err) => Err(PoisonError::new(ShardedLockReadGuard {
                lock: self,
                _guard: err.into_inner(),
                _marker: PhantomData,
            })),
        }
    }

    /// Attempts to acquire this lock with exclusive write access.
    ///
    /// If the access could not be granted at this time, an error is returned. Otherwise, a guard
    /// is returned which will release the exclusive access when it is dropped. This method does
    /// not provide any guarantees with respect to the ordering of whether contentious readers or
    /// writers will acquire the lock first.
    ///
    /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
    /// operation panics.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::ShardedLock;
    ///
    /// let lock = ShardedLock::new(1);
    ///
    /// let n = lock.read().unwrap();
    /// assert_eq!(*n, 1);
    ///
    /// assert!(lock.try_write().is_err());
    /// ```
    pub fn try_write(&self) -> TryLockResult<ShardedLockWriteGuard<T>> {
        let mut poisoned = false;
        let mut blocked = None;

        // Write-lock each shard in succession.
        for (i, shard) in self.shards.iter().enumerate() {
            let guard = match shard.lock.try_write() {
                Ok(guard) => guard,
                Err(TryLockError::Poisoned(err)) => {
                    poisoned = true;
                    err.into_inner()
                }
                Err(TryLockError::WouldBlock) => {
                    blocked = Some(i);
                    break;
                }
            };

            // Store the guard into the shard.
            // SAFETY: The guard is transmuted to the `'static` lifetime only so that it can be
            // stored inside the shard it locks. It never actually outlives the lock: it is taken
            // out and dropped again before the shard is unlocked.
            unsafe {
                let guard: RwLockWriteGuard<'static, ()> = mem::transmute(guard);
                let dest: *mut _ = shard.write_guard.get();
                *dest = Some(guard);
            }
        }

        if let Some(i) = blocked {
            // Unlock the shards in reverse order of locking.
            for shard in self.shards[0..i].iter().rev() {
                unsafe {
                    let dest: *mut _ = shard.write_guard.get();
                    let guard = mem::replace(&mut *dest, None);
                    drop(guard);
                }
            }
            Err(TryLockError::WouldBlock)
        } else if poisoned {
            let guard = ShardedLockWriteGuard {
                lock: self,
                _marker: PhantomData,
            };
            Err(TryLockError::Poisoned(PoisonError::new(guard)))
        } else {
            Ok(ShardedLockWriteGuard {
                lock: self,
                _marker: PhantomData,
            })
        }
    }

    /// Locks with exclusive write access, blocking the current thread until it can be acquired.
    ///
    /// The calling thread will be blocked until there are no other writers or readers which
    /// currently hold the lock. When this method returns, the calling thread is the only one with
    /// access to the data. This method does not provide any guarantees with respect to the
    /// ordering of whether contentious readers or writers will acquire the lock first.
    ///
    /// Returns a guard which will release the exclusive access when dropped.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::ShardedLock;
    ///
    /// let lock = ShardedLock::new(1);
    ///
    /// let mut n = lock.write().unwrap();
    /// *n = 2;
    ///
    /// assert!(lock.try_read().is_err());
    /// ```
    pub fn write(&self) -> LockResult<ShardedLockWriteGuard<T>> {
        let mut poisoned = false;

        // Write-lock each shard in succession.
        for shard in self.shards.iter() {
            let guard = match shard.lock.write() {
                Ok(guard) => guard,
                Err(err) => {
                    poisoned = true;
                    err.into_inner()
                }
            };

            // Store the guard into the shard.
            // SAFETY: As in `try_write`, the `'static` lifetime never escapes: the guard is
            // stored in its own shard and dropped before that shard is unlocked.
            unsafe {
                let guard: RwLockWriteGuard<'static, ()> = mem::transmute(guard);
                let dest: *mut _ = shard.write_guard.get();
                *dest = Some(guard);
            }
        }

        if poisoned {
            Err(PoisonError::new(ShardedLockWriteGuard {
                lock: self,
                _marker: PhantomData,
            }))
        } else {
            Ok(ShardedLockWriteGuard {
                lock: self,
                _marker: PhantomData,
            })
        }
    }
}

impl<T: ?Sized + fmt::Debug> fmt::Debug for ShardedLock<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self.try_read() {
            Ok(guard) => f
                .debug_struct("ShardedLock")
                .field("data", &&*guard)
                .finish(),
            Err(TryLockError::Poisoned(err)) => f
                .debug_struct("ShardedLock")
                .field("data", &&**err.get_ref())
                .finish(),
            Err(TryLockError::WouldBlock) => {
                struct LockedPlaceholder;
                impl fmt::Debug for LockedPlaceholder {
                    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                        f.write_str("<locked>")
                    }
                }
                f.debug_struct("ShardedLock")
                    .field("data", &LockedPlaceholder)
                    .finish()
            }
        }
    }
}

impl<T: Default> Default for ShardedLock<T> {
    fn default() -> ShardedLock<T> {
        ShardedLock::new(Default::default())
    }
}

impl<T> From<T> for ShardedLock<T> {
    fn from(t: T) -> Self {
        ShardedLock::new(t)
    }
}

/// A guard used to release the shared read access of a [`ShardedLock`] when dropped.
///
/// [`ShardedLock`]: struct.ShardedLock.html
pub struct ShardedLockReadGuard<'a, T: ?Sized + 'a> {
    lock: &'a ShardedLock<T>,
    _guard: RwLockReadGuard<'a, ()>,
    _marker: PhantomData<RwLockReadGuard<'a, T>>,
}

unsafe impl<'a, T: ?Sized + Sync> Sync for ShardedLockReadGuard<'a, T> {}

impl<'a, T: ?Sized> Deref for ShardedLockReadGuard<'a, T> {
    type Target = T;

    fn deref(&self) -> &T {
        unsafe { &*self.lock.value.get() }
    }
}

impl<'a, T: fmt::Debug> fmt::Debug for ShardedLockReadGuard<'a, T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("ShardedLockReadGuard")
            .field("lock", &self.lock)
            .finish()
    }
}

impl<'a, T: ?Sized + fmt::Display> fmt::Display for ShardedLockReadGuard<'a, T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        (**self).fmt(f)
    }
}

/// A guard used to release the exclusive write access of a [`ShardedLock`] when dropped.
///
/// [`ShardedLock`]: struct.ShardedLock.html
pub struct ShardedLockWriteGuard<'a, T: ?Sized + 'a> {
    lock: &'a ShardedLock<T>,
    _marker: PhantomData<RwLockWriteGuard<'a, T>>,
}

unsafe impl<'a, T: ?Sized + Sync> Sync for ShardedLockWriteGuard<'a, T> {}

impl<'a, T: ?Sized> Drop for ShardedLockWriteGuard<'a, T> {
    fn drop(&mut self) {
        // Unlock the shards in reverse order of locking.
        for shard in self.lock.shards.iter().rev() {
            unsafe {
                let dest: *mut _ = shard.write_guard.get();
                let guard = mem::replace(&mut *dest, None);
                drop(guard);
            }
        }
    }
}

impl<'a, T: fmt::Debug> fmt::Debug for ShardedLockWriteGuard<'a, T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("ShardedLockWriteGuard")
            .field("lock", &self.lock)
            .finish()
    }
}

impl<'a, T: ?Sized + fmt::Display> fmt::Display for ShardedLockWriteGuard<'a, T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        (**self).fmt(f)
    }
}

impl<'a, T: ?Sized> Deref for ShardedLockWriteGuard<'a, T> {
    type Target = T;

    fn deref(&self) -> &T {
        unsafe { &*self.lock.value.get() }
    }
}

impl<'a, T: ?Sized> DerefMut for ShardedLockWriteGuard<'a, T> {
    fn deref_mut(&mut self) -> &mut T {
        unsafe { &mut *self.lock.value.get() }
    }
}

/// Returns a `usize` that identifies the current thread.
///
/// Each thread is associated with an 'index'. While there are no particular guarantees, indices
/// usually tend to be consecutive numbers between 0 and the number of running threads.
///
/// Since this function accesses TLS, `None` might be returned if the current thread's TLS is
/// tearing down.
#[inline]
fn current_index() -> Option<usize> {
    REGISTRATION.try_with(|reg| reg.index).ok()
}

/// The global registry keeping track of registered threads and indices.
struct ThreadIndices {
    /// Mapping from `ThreadId` to thread index.
    mapping: HashMap<ThreadId, usize>,

    /// A list of free indices.
    free_list: Vec<usize>,

    /// The next index to allocate if the free list is empty.
    next_index: usize,
}

lazy_static! {
    static ref THREAD_INDICES: Mutex<ThreadIndices> = Mutex::new(ThreadIndices {
        mapping: HashMap::new(),
        free_list: Vec::new(),
        next_index: 0,
    });
}

/// A registration of a thread with an index.
///
/// When dropped, unregisters the thread and frees the reserved index.
struct Registration {
    index: usize,
    thread_id: ThreadId,
}

impl Drop for Registration {
    fn drop(&mut self) {
        let mut indices = THREAD_INDICES.lock().unwrap();
        indices.mapping.remove(&self.thread_id);
        indices.free_list.push(self.index);
    }
}

thread_local! {
    static REGISTRATION: Registration = {
        let thread_id = thread::current().id();
        let mut indices = THREAD_INDICES.lock().unwrap();

        let index = match indices.free_list.pop() {
            Some(i) => i,
            None => {
                let i = indices.next_index;
                indices.next_index += 1;
                i
            }
        };
        indices.mapping.insert(thread_id, index);

        Registration {
            index,
            thread_id,
        }
    };
}
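
#[cfg(test)]
mod thread_index_sketch {
    // An illustrative sanity check (a sketch added for exposition, not part of the original test
    // suite): it assumes the registry behaves as documented above, i.e. indices are small
    // integers that go back on the free list when a thread unregisters.
    use super::current_index;
    use std::thread;

    #[test]
    fn indices_are_reused() {
        // The main thread keeps its index alive for the whole test.
        let main_index = current_index().unwrap();

        // A short-lived thread receives a different index and frees it on exit.
        let first = thread::spawn(|| current_index().unwrap()).join().unwrap();
        assert_ne!(main_index, first);

        // With no other threads registering in between, the freed index is handed to the next
        // thread. (This is timing-sensitive under a parallel test runner.)
        let second = thread::spawn(|| current_index().unwrap()).join().unwrap();
        assert_eq!(first, second);
    }
}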