1 //! A ticket-based mutex. 2 //! 3 //! Waiting threads take a 'ticket' from the lock in the order they arrive and gain access to the lock when their 4 //! ticket is next in the queue. Best-case latency is slightly worse than a regular spinning mutex, but worse-case 5 //! latency is infinitely better. Waiting threads simply need to wait for all threads that come before them in the 6 //! queue to finish. 7 8 use core::{ 9 cell::UnsafeCell, 10 fmt, 11 ops::{Deref, DerefMut}, 12 sync::atomic::{AtomicUsize, Ordering}, 13 marker::PhantomData, 14 }; 15 use crate::{RelaxStrategy, Spin}; 16 17 /// A spin-based [ticket lock](https://en.wikipedia.org/wiki/Ticket_lock) providing mutually exclusive access to data. 18 /// 19 /// A ticket lock is analagous to a queue management system for lock requests. When a thread tries to take a lock, it 20 /// is assigned a 'ticket'. It then spins until its ticket becomes next in line. When the lock guard is released, the 21 /// next ticket will be processed. 22 /// 23 /// Ticket locks significantly reduce the worse-case performance of locking at the cost of slightly higher average-time 24 /// overhead. 25 /// 26 /// # Example 27 /// 28 /// ``` 29 /// use spin; 30 /// 31 /// let lock = spin::mutex::TicketMutex::<_>::new(0); 32 /// 33 /// // Modify the data 34 /// *lock.lock() = 2; 35 /// 36 /// // Read the data 37 /// let answer = *lock.lock(); 38 /// assert_eq!(answer, 2); 39 /// ``` 40 /// 41 /// # Thread safety example 42 /// 43 /// ``` 44 /// use spin; 45 /// use std::sync::{Arc, Barrier}; 46 /// 47 /// let thread_count = 1000; 48 /// let spin_mutex = Arc::new(spin::mutex::TicketMutex::<_>::new(0)); 49 /// 50 /// // We use a barrier to ensure the readout happens after all writing 51 /// let barrier = Arc::new(Barrier::new(thread_count + 1)); 52 /// 53 /// for _ in (0..thread_count) { 54 /// let my_barrier = barrier.clone(); 55 /// let my_lock = spin_mutex.clone(); 56 /// std::thread::spawn(move || { 57 /// let mut guard = my_lock.lock(); 58 /// *guard += 1; 59 /// 60 /// // Release the lock to prevent a deadlock 61 /// drop(guard); 62 /// my_barrier.wait(); 63 /// }); 64 /// } 65 /// 66 /// barrier.wait(); 67 /// 68 /// let answer = { *spin_mutex.lock() }; 69 /// assert_eq!(answer, thread_count); 70 /// ``` 71 pub struct TicketMutex<T: ?Sized, R = Spin> { 72 phantom: PhantomData<R>, 73 next_ticket: AtomicUsize, 74 next_serving: AtomicUsize, 75 data: UnsafeCell<T>, 76 } 77 78 /// A guard that protects some data. 79 /// 80 /// When the guard is dropped, the next ticket will be processed. 81 pub struct TicketMutexGuard<'a, T: ?Sized + 'a> { 82 next_serving: &'a AtomicUsize, 83 ticket: usize, 84 data: &'a mut T, 85 } 86 87 unsafe impl<T: ?Sized + Send> Sync for TicketMutex<T> {} 88 unsafe impl<T: ?Sized + Send> Send for TicketMutex<T> {} 89 90 impl<T, R> TicketMutex<T, R> { 91 /// Creates a new [`TicketMutex`] wrapping the supplied data. 92 /// 93 /// # Example 94 /// 95 /// ``` 96 /// use spin::mutex::TicketMutex; 97 /// 98 /// static MUTEX: TicketMutex<()> = TicketMutex::<_>::new(()); 99 /// 100 /// fn demo() { 101 /// let lock = MUTEX.lock(); 102 /// // do something with lock 103 /// drop(lock); 104 /// } 105 /// ``` 106 #[inline(always)] new(data: T) -> Self107 pub const fn new(data: T) -> Self { 108 Self { 109 phantom: PhantomData, 110 next_ticket: AtomicUsize::new(0), 111 next_serving: AtomicUsize::new(0), 112 data: UnsafeCell::new(data), 113 } 114 } 115 116 /// Consumes this [`TicketMutex`] and unwraps the underlying data. 117 /// 118 /// # Example 119 /// 120 /// ``` 121 /// let lock = spin::mutex::TicketMutex::<_>::new(42); 122 /// assert_eq!(42, lock.into_inner()); 123 /// ``` 124 #[inline(always)] into_inner(self) -> T125 pub fn into_inner(self) -> T { 126 self.data.into_inner() 127 } 128 /// Returns a mutable pointer to the underying data. 129 /// 130 /// This is mostly meant to be used for applications which require manual unlocking, but where 131 /// storing both the lock and the pointer to the inner data gets inefficient. 132 /// 133 /// # Example 134 /// ``` 135 /// let lock = spin::mutex::SpinMutex::<_>::new(42); 136 /// 137 /// unsafe { 138 /// core::mem::forget(lock.lock()); 139 /// 140 /// assert_eq!(lock.as_mut_ptr().read(), 42); 141 /// lock.as_mut_ptr().write(58); 142 /// 143 /// lock.force_unlock(); 144 /// } 145 /// 146 /// assert_eq!(*lock.lock(), 58); 147 /// 148 /// ``` 149 #[inline(always)] as_mut_ptr(&self) -> *mut T150 pub fn as_mut_ptr(&self) -> *mut T { 151 self.data.get() 152 } 153 } 154 155 impl<T: ?Sized + fmt::Debug, R> fmt::Debug for TicketMutex<T, R> { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result156 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 157 match self.try_lock() { 158 Some(guard) => write!(f, "Mutex {{ data: ") 159 .and_then(|()| (&*guard).fmt(f)) 160 .and_then(|()| write!(f, "}}")), 161 None => write!(f, "Mutex {{ <locked> }}"), 162 } 163 } 164 } 165 166 impl<T: ?Sized, R: RelaxStrategy> TicketMutex<T, R> { 167 /// Locks the [`TicketMutex`] and returns a guard that permits access to the inner data. 168 /// 169 /// The returned data may be dereferenced for data access 170 /// and the lock will be dropped when the guard falls out of scope. 171 /// 172 /// ``` 173 /// let lock = spin::mutex::TicketMutex::<_>::new(0); 174 /// { 175 /// let mut data = lock.lock(); 176 /// // The lock is now locked and the data can be accessed 177 /// *data += 1; 178 /// // The lock is implicitly dropped at the end of the scope 179 /// } 180 /// ``` 181 #[inline(always)] lock(&self) -> TicketMutexGuard<T>182 pub fn lock(&self) -> TicketMutexGuard<T> { 183 let ticket = self.next_ticket.fetch_add(1, Ordering::Relaxed); 184 185 while self.next_serving.load(Ordering::Acquire) != ticket { 186 R::relax(); 187 } 188 189 TicketMutexGuard { 190 next_serving: &self.next_serving, 191 ticket, 192 // Safety 193 // We know that we are the next ticket to be served, 194 // so there's no other thread accessing the data. 195 // 196 // Every other thread has another ticket number so it's 197 // definitely stuck in the spin loop above. 198 data: unsafe { &mut *self.data.get() }, 199 } 200 } 201 } 202 203 impl<T: ?Sized, R> TicketMutex<T, R> { 204 /// Returns `true` if the lock is currently held. 205 /// 206 /// # Safety 207 /// 208 /// This function provides no synchronization guarantees and so its result should be considered 'out of date' 209 /// the instant it is called. Do not use it for synchronization purposes. However, it may be useful as a heuristic. 210 #[inline(always)] is_locked(&self) -> bool211 pub fn is_locked(&self) -> bool { 212 let ticket = self.next_ticket.load(Ordering::Relaxed); 213 self.next_serving.load(Ordering::Relaxed) != ticket 214 } 215 216 /// Force unlock this [`TicketMutex`], by serving the next ticket. 217 /// 218 /// # Safety 219 /// 220 /// This is *extremely* unsafe if the lock is not held by the current 221 /// thread. However, this can be useful in some instances for exposing the 222 /// lock to FFI that doesn't know how to deal with RAII. 223 #[inline(always)] force_unlock(&self)224 pub unsafe fn force_unlock(&self) { 225 self.next_serving.fetch_add(1, Ordering::Release); 226 } 227 228 /// Try to lock this [`TicketMutex`], returning a lock guard if successful. 229 /// 230 /// # Example 231 /// 232 /// ``` 233 /// let lock = spin::mutex::TicketMutex::<_>::new(42); 234 /// 235 /// let maybe_guard = lock.try_lock(); 236 /// assert!(maybe_guard.is_some()); 237 /// 238 /// // `maybe_guard` is still held, so the second call fails 239 /// let maybe_guard2 = lock.try_lock(); 240 /// assert!(maybe_guard2.is_none()); 241 /// ``` 242 #[inline(always)] try_lock(&self) -> Option<TicketMutexGuard<T>>243 pub fn try_lock(&self) -> Option<TicketMutexGuard<T>> { 244 let ticket = self 245 .next_ticket 246 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |ticket| { 247 if self.next_serving.load(Ordering::Acquire) == ticket { 248 Some(ticket + 1) 249 } else { 250 None 251 } 252 }); 253 254 ticket.ok().map(|ticket| TicketMutexGuard { 255 next_serving: &self.next_serving, 256 ticket, 257 // Safety 258 // We have a ticket that is equal to the next_serving ticket, so we know: 259 // - that no other thread can have the same ticket id as this thread 260 // - that we are the next one to be served so we have exclusive access to the data 261 data: unsafe { &mut *self.data.get() }, 262 }) 263 } 264 265 /// Returns a mutable reference to the underlying data. 266 /// 267 /// Since this call borrows the [`TicketMutex`] mutably, and a mutable reference is guaranteed to be exclusive in 268 /// Rust, no actual locking needs to take place -- the mutable borrow statically guarantees no locks exist. As 269 /// such, this is a 'zero-cost' operation. 270 /// 271 /// # Example 272 /// 273 /// ``` 274 /// let mut lock = spin::mutex::TicketMutex::<_>::new(0); 275 /// *lock.get_mut() = 10; 276 /// assert_eq!(*lock.lock(), 10); 277 /// ``` 278 #[inline(always)] get_mut(&mut self) -> &mut T279 pub fn get_mut(&mut self) -> &mut T { 280 // Safety: 281 // We know that there are no other references to `self`, 282 // so it's safe to return a exclusive reference to the data. 283 unsafe { &mut *self.data.get() } 284 } 285 } 286 287 impl<T: ?Sized + Default, R> Default for TicketMutex<T, R> { default() -> Self288 fn default() -> Self { 289 Self::new(Default::default()) 290 } 291 } 292 293 impl<T, R> From<T> for TicketMutex<T, R> { from(data: T) -> Self294 fn from(data: T) -> Self { 295 Self::new(data) 296 } 297 } 298 299 impl<'a, T: ?Sized> TicketMutexGuard<'a, T> { 300 /// Leak the lock guard, yielding a mutable reference to the underlying data. 301 /// 302 /// Note that this function will permanently lock the original [`TicketMutex`]. 303 /// 304 /// ``` 305 /// let mylock = spin::mutex::TicketMutex::<_>::new(0); 306 /// 307 /// let data: &mut i32 = spin::mutex::TicketMutexGuard::leak(mylock.lock()); 308 /// 309 /// *data = 1; 310 /// assert_eq!(*data, 1); 311 /// ``` 312 #[inline(always)] leak(this: Self) -> &'a mut T313 pub fn leak(this: Self) -> &'a mut T { 314 let data = this.data as *mut _; // Keep it in pointer form temporarily to avoid double-aliasing 315 core::mem::forget(this); 316 unsafe { &mut *data } 317 } 318 } 319 320 impl<'a, T: ?Sized + fmt::Debug> fmt::Debug for TicketMutexGuard<'a, T> { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result321 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 322 fmt::Debug::fmt(&**self, f) 323 } 324 } 325 326 impl<'a, T: ?Sized + fmt::Display> fmt::Display for TicketMutexGuard<'a, T> { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result327 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 328 fmt::Display::fmt(&**self, f) 329 } 330 } 331 332 impl<'a, T: ?Sized> Deref for TicketMutexGuard<'a, T> { 333 type Target = T; deref(&self) -> &T334 fn deref(&self) -> &T { 335 self.data 336 } 337 } 338 339 impl<'a, T: ?Sized> DerefMut for TicketMutexGuard<'a, T> { deref_mut(&mut self) -> &mut T340 fn deref_mut(&mut self) -> &mut T { 341 self.data 342 } 343 } 344 345 impl<'a, T: ?Sized> Drop for TicketMutexGuard<'a, T> { drop(&mut self)346 fn drop(&mut self) { 347 let new_ticket = self.ticket + 1; 348 self.next_serving.store(new_ticket, Ordering::Release); 349 } 350 } 351 352 #[cfg(feature = "lock_api")] 353 unsafe impl<R: RelaxStrategy> lock_api_crate::RawMutex for TicketMutex<(), R> { 354 type GuardMarker = lock_api_crate::GuardSend; 355 356 const INIT: Self = Self::new(()); 357 lock(&self)358 fn lock(&self) { 359 // Prevent guard destructor running 360 core::mem::forget(Self::lock(self)); 361 } 362 try_lock(&self) -> bool363 fn try_lock(&self) -> bool { 364 // Prevent guard destructor running 365 Self::try_lock(self).map(core::mem::forget).is_some() 366 } 367 unlock(&self)368 unsafe fn unlock(&self) { 369 self.force_unlock(); 370 } 371 is_locked(&self) -> bool372 fn is_locked(&self) -> bool { 373 Self::is_locked(self) 374 } 375 } 376 377 #[cfg(test)] 378 mod tests { 379 use std::prelude::v1::*; 380 381 use std::sync::atomic::{AtomicUsize, Ordering}; 382 use std::sync::mpsc::channel; 383 use std::sync::Arc; 384 use std::thread; 385 386 type TicketMutex<T> = super::TicketMutex<T>; 387 388 #[derive(Eq, PartialEq, Debug)] 389 struct NonCopy(i32); 390 391 #[test] smoke()392 fn smoke() { 393 let m = TicketMutex::<_>::new(()); 394 drop(m.lock()); 395 drop(m.lock()); 396 } 397 398 #[test] lots_and_lots()399 fn lots_and_lots() { 400 static M: TicketMutex<()> = TicketMutex::<_>::new(()); 401 static mut CNT: u32 = 0; 402 const J: u32 = 1000; 403 const K: u32 = 3; 404 405 fn inc() { 406 for _ in 0..J { 407 unsafe { 408 let _g = M.lock(); 409 CNT += 1; 410 } 411 } 412 } 413 414 let (tx, rx) = channel(); 415 for _ in 0..K { 416 let tx2 = tx.clone(); 417 thread::spawn(move || { 418 inc(); 419 tx2.send(()).unwrap(); 420 }); 421 let tx2 = tx.clone(); 422 thread::spawn(move || { 423 inc(); 424 tx2.send(()).unwrap(); 425 }); 426 } 427 428 drop(tx); 429 for _ in 0..2 * K { 430 rx.recv().unwrap(); 431 } 432 assert_eq!(unsafe { CNT }, J * K * 2); 433 } 434 435 #[test] try_lock()436 fn try_lock() { 437 let mutex = TicketMutex::<_>::new(42); 438 439 // First lock succeeds 440 let a = mutex.try_lock(); 441 assert_eq!(a.as_ref().map(|r| **r), Some(42)); 442 443 // Additional lock failes 444 let b = mutex.try_lock(); 445 assert!(b.is_none()); 446 447 // After dropping lock, it succeeds again 448 ::core::mem::drop(a); 449 let c = mutex.try_lock(); 450 assert_eq!(c.as_ref().map(|r| **r), Some(42)); 451 } 452 453 #[test] test_into_inner()454 fn test_into_inner() { 455 let m = TicketMutex::<_>::new(NonCopy(10)); 456 assert_eq!(m.into_inner(), NonCopy(10)); 457 } 458 459 #[test] test_into_inner_drop()460 fn test_into_inner_drop() { 461 struct Foo(Arc<AtomicUsize>); 462 impl Drop for Foo { 463 fn drop(&mut self) { 464 self.0.fetch_add(1, Ordering::SeqCst); 465 } 466 } 467 let num_drops = Arc::new(AtomicUsize::new(0)); 468 let m = TicketMutex::<_>::new(Foo(num_drops.clone())); 469 assert_eq!(num_drops.load(Ordering::SeqCst), 0); 470 { 471 let _inner = m.into_inner(); 472 assert_eq!(num_drops.load(Ordering::SeqCst), 0); 473 } 474 assert_eq!(num_drops.load(Ordering::SeqCst), 1); 475 } 476 477 #[test] test_mutex_arc_nested()478 fn test_mutex_arc_nested() { 479 // Tests nested mutexes and access 480 // to underlying data. 481 let arc = Arc::new(TicketMutex::<_>::new(1)); 482 let arc2 = Arc::new(TicketMutex::<_>::new(arc)); 483 let (tx, rx) = channel(); 484 let _t = thread::spawn(move || { 485 let lock = arc2.lock(); 486 let lock2 = lock.lock(); 487 assert_eq!(*lock2, 1); 488 tx.send(()).unwrap(); 489 }); 490 rx.recv().unwrap(); 491 } 492 493 #[test] test_mutex_arc_access_in_unwind()494 fn test_mutex_arc_access_in_unwind() { 495 let arc = Arc::new(TicketMutex::<_>::new(1)); 496 let arc2 = arc.clone(); 497 let _ = thread::spawn(move || -> () { 498 struct Unwinder { 499 i: Arc<TicketMutex<i32>>, 500 } 501 impl Drop for Unwinder { 502 fn drop(&mut self) { 503 *self.i.lock() += 1; 504 } 505 } 506 let _u = Unwinder { i: arc2 }; 507 panic!(); 508 }) 509 .join(); 510 let lock = arc.lock(); 511 assert_eq!(*lock, 2); 512 } 513 514 #[test] test_mutex_unsized()515 fn test_mutex_unsized() { 516 let mutex: &TicketMutex<[i32]> = &TicketMutex::<_>::new([1, 2, 3]); 517 { 518 let b = &mut *mutex.lock(); 519 b[0] = 4; 520 b[2] = 5; 521 } 522 let comp: &[i32] = &[4, 2, 5]; 523 assert_eq!(&*mutex.lock(), comp); 524 } 525 526 #[test] is_locked()527 fn is_locked() { 528 let mutex = TicketMutex::<_>::new(()); 529 assert!(!mutex.is_locked()); 530 let lock = mutex.lock(); 531 assert!(mutex.is_locked()); 532 drop(lock); 533 assert!(!mutex.is_locked()); 534 } 535 } 536