1 use std::cell::UnsafeCell;
2 use std::collections::HashMap;
3 use std::fmt;
4 use std::marker::PhantomData;
5 use std::mem;
6 use std::ops::{Deref, DerefMut};
7 use std::panic::{RefUnwindSafe, UnwindSafe};
8 use std::sync::{LockResult, PoisonError, TryLockError, TryLockResult};
9 use std::sync::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
10 use std::thread::{self, ThreadId};
11
12 use CachePadded;
13
14 /// The number of shards per sharded lock. Must be a power of two.
15 const NUM_SHARDS: usize = 8;
16
17 /// A shard containing a single reader-writer lock.
18 struct Shard {
19 /// The inner reader-writer lock.
20 lock: RwLock<()>,
21
22 /// The write-guard keeping this shard locked.
23 ///
24 /// Write operations will lock each shard and store the guard here. These guards get dropped at
25 /// the same time the big guard is dropped.
26 write_guard: UnsafeCell<Option<RwLockWriteGuard<'static, ()>>>,
27 }
28
29 /// A sharded reader-writer lock.
30 ///
31 /// This lock is equivalent to [`RwLock`], except read operations are faster and write operations
32 /// are slower.
33 ///
34 /// A `ShardedLock` is internally made of a list of *shards*, each being a [`RwLock`] occupying a
35 /// single cache line. Read operations will pick one of the shards depending on the current thread
36 /// and lock it. Write operations need to lock all shards in succession.
37 ///
38 /// By splitting the lock into shards, concurrent read operations will in most cases choose
39 /// different shards and thus update different cache lines, which is good for scalability. However,
40 /// write operations need to do more work and are therefore slower than usual.
41 ///
42 /// The priority policy of the lock is dependent on the underlying operating system's
43 /// implementation, and this type does not guarantee that any particular policy will be used.
44 ///
45 /// # Poisoning
46 ///
47 /// A `ShardedLock`, like [`RwLock`], will become poisoned on a panic. Note that it may only be
48 /// poisoned if a panic occurs while a write operation is in progress. If a panic occurs in any
49 /// read operation, the lock will not be poisoned.
50 ///
51 /// # Examples
52 ///
53 /// ```
54 /// use crossbeam_utils::sync::ShardedLock;
55 ///
56 /// let lock = ShardedLock::new(5);
57 ///
58 /// // Any number of read locks can be held at once.
59 /// {
60 /// let r1 = lock.read().unwrap();
61 /// let r2 = lock.read().unwrap();
62 /// assert_eq!(*r1, 5);
63 /// assert_eq!(*r2, 5);
64 /// } // Read locks are dropped at this point.
65 ///
66 /// // However, only one write lock may be held.
67 /// {
68 /// let mut w = lock.write().unwrap();
69 /// *w += 1;
70 /// assert_eq!(*w, 6);
71 /// } // Write lock is dropped here.
72 /// ```
73 ///
74 /// [`RwLock`]: https://doc.rust-lang.org/std/sync/struct.RwLock.html
75 pub struct ShardedLock<T: ?Sized> {
76 /// A list of locks protecting the internal data.
77 shards: Box<[CachePadded<Shard>]>,
78
79 /// The internal data.
80 value: UnsafeCell<T>,
81 }
82
83 unsafe impl<T: ?Sized + Send> Send for ShardedLock<T> {}
84 unsafe impl<T: ?Sized + Send + Sync> Sync for ShardedLock<T> {}
85
86 impl<T: ?Sized> UnwindSafe for ShardedLock<T> {}
87 impl<T: ?Sized> RefUnwindSafe for ShardedLock<T> {}
88
89 impl<T> ShardedLock<T> {
90 /// Creates a new sharded reader-writer lock.
91 ///
92 /// # Examples
93 ///
94 /// ```
95 /// use crossbeam_utils::sync::ShardedLock;
96 ///
97 /// let lock = ShardedLock::new(5);
98 /// ```
new(value: T) -> ShardedLock<T>99 pub fn new(value: T) -> ShardedLock<T> {
100 ShardedLock {
101 shards: (0..NUM_SHARDS)
102 .map(|_| {
103 CachePadded::new(Shard {
104 lock: RwLock::new(()),
105 write_guard: UnsafeCell::new(None),
106 })
107 })
108 .collect::<Vec<_>>()
109 .into_boxed_slice(),
110 value: UnsafeCell::new(value),
111 }
112 }
113
114 /// Consumes this lock, returning the underlying data.
115 ///
116 /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
117 /// operation panics.
118 ///
119 /// # Examples
120 ///
121 /// ```
122 /// use crossbeam_utils::sync::ShardedLock;
123 ///
124 /// let lock = ShardedLock::new(String::new());
125 /// {
126 /// let mut s = lock.write().unwrap();
127 /// *s = "modified".to_owned();
128 /// }
129 /// assert_eq!(lock.into_inner().unwrap(), "modified");
130 /// ```
into_inner(self) -> LockResult<T>131 pub fn into_inner(self) -> LockResult<T> {
132 let is_poisoned = self.is_poisoned();
133 let inner = self.value.into_inner();
134
135 if is_poisoned {
136 Err(PoisonError::new(inner))
137 } else {
138 Ok(inner)
139 }
140 }
141 }
142
143 impl<T: ?Sized> ShardedLock<T> {
144 /// Returns `true` if the lock is poisoned.
145 ///
146 /// If another thread can still access the lock, it may become poisoned at any time. A `false`
147 /// result should not be trusted without additional synchronization.
148 ///
149 /// # Examples
150 ///
151 /// ```
152 /// use crossbeam_utils::sync::ShardedLock;
153 /// use std::sync::Arc;
154 /// use std::thread;
155 ///
156 /// let lock = Arc::new(ShardedLock::new(0));
157 /// let c_lock = lock.clone();
158 ///
159 /// let _ = thread::spawn(move || {
160 /// let _lock = c_lock.write().unwrap();
161 /// panic!(); // the lock gets poisoned
162 /// }).join();
163 /// assert_eq!(lock.is_poisoned(), true);
164 /// ```
is_poisoned(&self) -> bool165 pub fn is_poisoned(&self) -> bool {
166 self.shards[0].lock.is_poisoned()
167 }
168
169 /// Returns a mutable reference to the underlying data.
170 ///
171 /// Since this call borrows the lock mutably, no actual locking needs to take place.
172 ///
173 /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
174 /// operation panics.
175 ///
176 /// # Examples
177 ///
178 /// ```
179 /// use crossbeam_utils::sync::ShardedLock;
180 ///
181 /// let mut lock = ShardedLock::new(0);
182 /// *lock.get_mut().unwrap() = 10;
183 /// assert_eq!(*lock.read().unwrap(), 10);
184 /// ```
get_mut(&mut self) -> LockResult<&mut T>185 pub fn get_mut(&mut self) -> LockResult<&mut T> {
186 let is_poisoned = self.is_poisoned();
187 let inner = unsafe { &mut *self.value.get() };
188
189 if is_poisoned {
190 Err(PoisonError::new(inner))
191 } else {
192 Ok(inner)
193 }
194 }
195
196 /// Attempts to acquire this lock with shared read access.
197 ///
198 /// If the access could not be granted at this time, an error is returned. Otherwise, a guard
199 /// is returned which will release the shared access when it is dropped. This method does not
200 /// provide any guarantees with respect to the ordering of whether contentious readers or
201 /// writers will acquire the lock first.
202 ///
203 /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
204 /// operation panics.
205 ///
206 /// # Examples
207 ///
208 /// ```
209 /// use crossbeam_utils::sync::ShardedLock;
210 ///
211 /// let lock = ShardedLock::new(1);
212 ///
213 /// match lock.try_read() {
214 /// Ok(n) => assert_eq!(*n, 1),
215 /// Err(_) => unreachable!(),
216 /// };
217 /// ```
try_read(&self) -> TryLockResult<ShardedLockReadGuard<T>>218 pub fn try_read(&self) -> TryLockResult<ShardedLockReadGuard<T>> {
219 // Take the current thread index and map it to a shard index. Thread indices will tend to
220 // distribute shards among threads equally, thus reducing contention due to read-locking.
221 let current_index = current_index().unwrap_or(0);
222 let shard_index = current_index & (self.shards.len() - 1);
223
224 match self.shards[shard_index].lock.try_read() {
225 Ok(guard) => Ok(ShardedLockReadGuard {
226 lock: self,
227 _guard: guard,
228 _marker: PhantomData,
229 }),
230 Err(TryLockError::Poisoned(err)) => {
231 let guard = ShardedLockReadGuard {
232 lock: self,
233 _guard: err.into_inner(),
234 _marker: PhantomData,
235 };
236 Err(TryLockError::Poisoned(PoisonError::new(guard)))
237 }
238 Err(TryLockError::WouldBlock) => Err(TryLockError::WouldBlock),
239 }
240 }
241
242 /// Locks with shared read access, blocking the current thread until it can be acquired.
243 ///
244 /// The calling thread will be blocked until there are no more writers which hold the lock.
245 /// There may be other readers currently inside the lock when this method returns. This method
246 /// does not provide any guarantees with respect to the ordering of whether contentious readers
247 /// or writers will acquire the lock first.
248 ///
249 /// Returns a guard which will release the shared access when dropped.
250 ///
251 /// # Examples
252 ///
253 /// ```
254 /// use crossbeam_utils::sync::ShardedLock;
255 /// use std::sync::Arc;
256 /// use std::thread;
257 ///
258 /// let lock = Arc::new(ShardedLock::new(1));
259 /// let c_lock = lock.clone();
260 ///
261 /// let n = lock.read().unwrap();
262 /// assert_eq!(*n, 1);
263 ///
264 /// thread::spawn(move || {
265 /// let r = c_lock.read();
266 /// assert!(r.is_ok());
267 /// }).join().unwrap();
268 /// ```
read(&self) -> LockResult<ShardedLockReadGuard<T>>269 pub fn read(&self) -> LockResult<ShardedLockReadGuard<T>> {
270 // Take the current thread index and map it to a shard index. Thread indices will tend to
271 // distribute shards among threads equally, thus reducing contention due to read-locking.
272 let current_index = current_index().unwrap_or(0);
273 let shard_index = current_index & (self.shards.len() - 1);
274
275 match self.shards[shard_index].lock.read() {
276 Ok(guard) => Ok(ShardedLockReadGuard {
277 lock: self,
278 _guard: guard,
279 _marker: PhantomData,
280 }),
281 Err(err) => Err(PoisonError::new(ShardedLockReadGuard {
282 lock: self,
283 _guard: err.into_inner(),
284 _marker: PhantomData,
285 })),
286 }
287 }
288
289 /// Attempts to acquire this lock with exclusive write access.
290 ///
291 /// If the access could not be granted at this time, an error is returned. Otherwise, a guard
292 /// is returned which will release the exclusive access when it is dropped. This method does
293 /// not provide any guarantees with respect to the ordering of whether contentious readers or
294 /// writers will acquire the lock first.
295 ///
296 /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
297 /// operation panics.
298 ///
299 /// # Examples
300 ///
301 /// ```
302 /// use crossbeam_utils::sync::ShardedLock;
303 ///
304 /// let lock = ShardedLock::new(1);
305 ///
306 /// let n = lock.read().unwrap();
307 /// assert_eq!(*n, 1);
308 ///
309 /// assert!(lock.try_write().is_err());
310 /// ```
try_write(&self) -> TryLockResult<ShardedLockWriteGuard<T>>311 pub fn try_write(&self) -> TryLockResult<ShardedLockWriteGuard<T>> {
312 let mut poisoned = false;
313 let mut blocked = None;
314
315 // Write-lock each shard in succession.
316 for (i, shard) in self.shards.iter().enumerate() {
317 let guard = match shard.lock.try_write() {
318 Ok(guard) => guard,
319 Err(TryLockError::Poisoned(err)) => {
320 poisoned = true;
321 err.into_inner()
322 }
323 Err(TryLockError::WouldBlock) => {
324 blocked = Some(i);
325 break;
326 }
327 };
328
329 // Store the guard into the shard.
330 unsafe {
331 let guard: RwLockWriteGuard<'static, ()> = mem::transmute(guard);
332 let dest: *mut _ = shard.write_guard.get();
333 *dest = Some(guard);
334 }
335 }
336
337 if let Some(i) = blocked {
338 // Unlock the shards in reverse order of locking.
339 for shard in self.shards[0..i].iter().rev() {
340 unsafe {
341 let dest: *mut _ = shard.write_guard.get();
342 let guard = mem::replace(&mut *dest, None);
343 drop(guard);
344 }
345 }
346 Err(TryLockError::WouldBlock)
347 } else if poisoned {
348 let guard = ShardedLockWriteGuard {
349 lock: self,
350 _marker: PhantomData,
351 };
352 Err(TryLockError::Poisoned(PoisonError::new(guard)))
353 } else {
354 Ok(ShardedLockWriteGuard {
355 lock: self,
356 _marker: PhantomData,
357 })
358 }
359 }
360
361 /// Locks with exclusive write access, blocking the current thread until it can be acquired.
362 ///
363 /// The calling thread will be blocked until there are no more writers which hold the lock.
364 /// There may be other readers currently inside the lock when this method returns. This method
365 /// does not provide any guarantees with respect to the ordering of whether contentious readers
366 /// or writers will acquire the lock first.
367 ///
368 /// Returns a guard which will release the exclusive access when dropped.
369 ///
370 /// # Examples
371 ///
372 /// ```
373 /// use crossbeam_utils::sync::ShardedLock;
374 ///
375 /// let lock = ShardedLock::new(1);
376 ///
377 /// let mut n = lock.write().unwrap();
378 /// *n = 2;
379 ///
380 /// assert!(lock.try_read().is_err());
381 /// ```
write(&self) -> LockResult<ShardedLockWriteGuard<T>>382 pub fn write(&self) -> LockResult<ShardedLockWriteGuard<T>> {
383 let mut poisoned = false;
384
385 // Write-lock each shard in succession.
386 for shard in self.shards.iter() {
387 let guard = match shard.lock.write() {
388 Ok(guard) => guard,
389 Err(err) => {
390 poisoned = true;
391 err.into_inner()
392 }
393 };
394
395 // Store the guard into the shard.
396 unsafe {
397 let guard: RwLockWriteGuard<'_, ()> = guard;
398 let guard: RwLockWriteGuard<'static, ()> = mem::transmute(guard);
399 let dest: *mut _ = shard.write_guard.get();
400 *dest = Some(guard);
401 }
402 }
403
404 if poisoned {
405 Err(PoisonError::new(ShardedLockWriteGuard {
406 lock: self,
407 _marker: PhantomData,
408 }))
409 } else {
410 Ok(ShardedLockWriteGuard {
411 lock: self,
412 _marker: PhantomData,
413 })
414 }
415 }
416 }
417
418 impl<T: ?Sized + fmt::Debug> fmt::Debug for ShardedLock<T> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result419 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
420 match self.try_read() {
421 Ok(guard) => f
422 .debug_struct("ShardedLock")
423 .field("data", &&*guard)
424 .finish(),
425 Err(TryLockError::Poisoned(err)) => f
426 .debug_struct("ShardedLock")
427 .field("data", &&**err.get_ref())
428 .finish(),
429 Err(TryLockError::WouldBlock) => {
430 struct LockedPlaceholder;
431 impl fmt::Debug for LockedPlaceholder {
432 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
433 f.write_str("<locked>")
434 }
435 }
436 f.debug_struct("ShardedLock")
437 .field("data", &LockedPlaceholder)
438 .finish()
439 }
440 }
441 }
442 }
443
444 impl<T: Default> Default for ShardedLock<T> {
default() -> ShardedLock<T>445 fn default() -> ShardedLock<T> {
446 ShardedLock::new(Default::default())
447 }
448 }
449
450 impl<T> From<T> for ShardedLock<T> {
from(t: T) -> Self451 fn from(t: T) -> Self {
452 ShardedLock::new(t)
453 }
454 }
455
456 /// A guard used to release the shared read access of a [`ShardedLock`] when dropped.
457 ///
458 /// [`ShardedLock`]: struct.ShardedLock.html
459 pub struct ShardedLockReadGuard<'a, T: ?Sized + 'a> {
460 lock: &'a ShardedLock<T>,
461 _guard: RwLockReadGuard<'a, ()>,
462 _marker: PhantomData<RwLockReadGuard<'a, T>>,
463 }
464
465 unsafe impl<'a, T: ?Sized + Sync> Sync for ShardedLockReadGuard<'a, T> {}
466
467 impl<'a, T: ?Sized> Deref for ShardedLockReadGuard<'a, T> {
468 type Target = T;
469
deref(&self) -> &T470 fn deref(&self) -> &T {
471 unsafe { &*self.lock.value.get() }
472 }
473 }
474
475 impl<'a, T: fmt::Debug> fmt::Debug for ShardedLockReadGuard<'a, T> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result476 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
477 f.debug_struct("ShardedLockReadGuard")
478 .field("lock", &self.lock)
479 .finish()
480 }
481 }
482
483 impl<'a, T: ?Sized + fmt::Display> fmt::Display for ShardedLockReadGuard<'a, T> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result484 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
485 (**self).fmt(f)
486 }
487 }
488
489 /// A guard used to release the exclusive write access of a [`ShardedLock`] when dropped.
490 ///
491 /// [`ShardedLock`]: struct.ShardedLock.html
492 pub struct ShardedLockWriteGuard<'a, T: ?Sized + 'a> {
493 lock: &'a ShardedLock<T>,
494 _marker: PhantomData<RwLockWriteGuard<'a, T>>,
495 }
496
497 unsafe impl<'a, T: ?Sized + Sync> Sync for ShardedLockWriteGuard<'a, T> {}
498
499 impl<'a, T: ?Sized> Drop for ShardedLockWriteGuard<'a, T> {
drop(&mut self)500 fn drop(&mut self) {
501 // Unlock the shards in reverse order of locking.
502 for shard in self.lock.shards.iter().rev() {
503 unsafe {
504 let dest: *mut _ = shard.write_guard.get();
505 let guard = mem::replace(&mut *dest, None);
506 drop(guard);
507 }
508 }
509 }
510 }
511
512 impl<'a, T: fmt::Debug> fmt::Debug for ShardedLockWriteGuard<'a, T> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result513 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
514 f.debug_struct("ShardedLockWriteGuard")
515 .field("lock", &self.lock)
516 .finish()
517 }
518 }
519
520 impl<'a, T: ?Sized + fmt::Display> fmt::Display for ShardedLockWriteGuard<'a, T> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result521 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
522 (**self).fmt(f)
523 }
524 }
525
526 impl<'a, T: ?Sized> Deref for ShardedLockWriteGuard<'a, T> {
527 type Target = T;
528
deref(&self) -> &T529 fn deref(&self) -> &T {
530 unsafe { &*self.lock.value.get() }
531 }
532 }
533
534 impl<'a, T: ?Sized> DerefMut for ShardedLockWriteGuard<'a, T> {
deref_mut(&mut self) -> &mut T535 fn deref_mut(&mut self) -> &mut T {
536 unsafe { &mut *self.lock.value.get() }
537 }
538 }
539
540 /// Returns a `usize` that identifies the current thread.
541 ///
542 /// Each thread is associated with an 'index'. While there are no particular guarantees, indices
543 /// usually tend to be consecutive numbers between 0 and the number of running threads.
544 ///
545 /// Since this function accesses TLS, `None` might be returned if the current thread's TLS is
546 /// tearing down.
547 #[inline]
current_index() -> Option<usize>548 fn current_index() -> Option<usize> {
549 REGISTRATION.try_with(|reg| reg.index).ok()
550 }
551
552 /// The global registry keeping track of registered threads and indices.
553 struct ThreadIndices {
554 /// Mapping from `ThreadId` to thread index.
555 mapping: HashMap<ThreadId, usize>,
556
557 /// A list of free indices.
558 free_list: Vec<usize>,
559
560 /// The next index to allocate if the free list is empty.
561 next_index: usize,
562 }
563
564 lazy_static! {
565 static ref THREAD_INDICES: Mutex<ThreadIndices> = Mutex::new(ThreadIndices {
566 mapping: HashMap::new(),
567 free_list: Vec::new(),
568 next_index: 0,
569 });
570 }
571
572 /// A registration of a thread with an index.
573 ///
574 /// When dropped, unregisters the thread and frees the reserved index.
575 struct Registration {
576 index: usize,
577 thread_id: ThreadId,
578 }
579
580 impl Drop for Registration {
drop(&mut self)581 fn drop(&mut self) {
582 let mut indices = THREAD_INDICES.lock().unwrap();
583 indices.mapping.remove(&self.thread_id);
584 indices.free_list.push(self.index);
585 }
586 }
587
588 thread_local! {
589 static REGISTRATION: Registration = {
590 let thread_id = thread::current().id();
591 let mut indices = THREAD_INDICES.lock().unwrap();
592
593 let index = match indices.free_list.pop() {
594 Some(i) => i,
595 None => {
596 let i = indices.next_index;
597 indices.next_index += 1;
598 i
599 }
600 };
601 indices.mapping.insert(thread_id, index);
602
603 Registration {
604 index,
605 thread_id,
606 }
607 };
608 }
609