1 //! An asynchronous `Mutex`-like type.
2 //!
3 //! This module provides [`Mutex`], a type that acts similarly to an asynchronous `Mutex`, with one
4 //! major difference: the [`MutexGuard`] returned by `lock` is not tied to the lifetime of the
5 //! `Mutex`. This enables you to acquire a lock, and then pass that guard into a future, and then
6 //! release it at some later point in time.
7 //!
8 //! This allows you to do something along the lines of:
9 //!
10 //! ```rust,no_run
11 //! use tokio::sync::Mutex;
12 //! use std::sync::Arc;
13 //!
14 //! #[tokio::main]
15 //! async fn main() {
16 //!     let data1 = Arc::new(Mutex::new(0));
17 //!     let data2 = Arc::clone(&data1);
18 //!
19 //!     tokio::spawn(async move {
20 //!         let mut lock = data2.lock().await;
21 //!         *lock += 1;
22 //!     });
23 //!
24 //!     let mut lock = data1.lock().await;
25 //!     *lock += 1;
26 //! }
27 //! ```
28 //!
29 //! Another example
30 //! ```rust,no_run
31 //! #![warn(rust_2018_idioms)]
32 //!
33 //! use tokio::sync::Mutex;
34 //! use std::sync::Arc;
35 //!
36 //!
37 //! #[tokio::main]
38 //! async fn main() {
39 //!    let count = Arc::new(Mutex::new(0));
40 //!
41 //!    for _ in 0..5 {
42 //!        let my_count = Arc::clone(&count);
43 //!        tokio::spawn(async move {
44 //!            for _ in 0..10 {
45 //!                let mut lock = my_count.lock().await;
46 //!                *lock += 1;
47 //!                println!("{}", lock);
48 //!            }
49 //!        });
50 //!    }
51 //!
52 //!    loop {
53 //!        if *count.lock().await >= 50 {
54 //!            break;
55 //!        }
56 //!    }
57 //!   println!("Count hit 50.");
58 //! }
59 //! ```
60 //! There are a few things of note here to pay attention to in this example.
61 //! 1. The mutex is wrapped in an [`std::sync::Arc`] to allow it to be shared across threads.
62 //! 2. Each spawned task obtains a lock and releases it on every iteration.
63 //! 3. Mutation of the data the Mutex is protecting is done by de-referencing the the obtained lock
64 //!    as seen on lines 23 and 30.
65 //!
66 //! Tokio's Mutex works in a simple FIFO (first in, first out) style where as requests for a lock are
67 //! made Tokio will queue them up and provide a lock when it is that requester's turn. In that way
68 //! the Mutex is "fair" and predictable in how it distributes the locks to inner data. This is why
69 //! the output of this program is an in-order count to 50. Locks are released and reacquired
70 //! after every iteration, so basically, each thread goes to the back of the line after it increments
71 //! the value once. Also, since there is only a single valid lock at any given time there is no
72 //! possibility of a race condition when mutating the inner value.
73 //!
74 //! Note that in contrast to `std::sync::Mutex`, this implementation does not
75 //! poison the mutex when a thread holding the `MutexGuard` panics. In such a
76 //! case, the mutex will be unlocked. If the panic is caught, this might leave
77 //! the data protected by the mutex in an inconsistent state.
78 //!
79 //! [`Mutex`]: struct@Mutex
80 //! [`MutexGuard`]: struct@MutexGuard
81 use crate::coop::CoopFutureExt;
82 use crate::sync::batch_semaphore as semaphore;
83 
84 use std::cell::UnsafeCell;
85 use std::error::Error;
86 use std::fmt;
87 use std::ops::{Deref, DerefMut};
88 
89 /// An asynchronous mutual exclusion primitive useful for protecting shared data
90 ///
91 /// Each mutex has a type parameter (`T`) which represents the data that it is protecting. The data
92 /// can only be accessed through the RAII guards returned from `lock`, which
93 /// guarantees that the data is only ever accessed when the mutex is locked.
94 #[derive(Debug)]
95 pub struct Mutex<T> {
96     c: UnsafeCell<T>,
97     s: semaphore::Semaphore,
98 }
99 
100 /// A handle to a held `Mutex`.
101 ///
102 /// As long as you have this guard, you have exclusive access to the underlying `T`. The guard
103 /// internally keeps a reference-couned pointer to the original `Mutex`, so even if the lock goes
104 /// away, the guard remains valid.
105 ///
106 /// The lock is automatically released whenever the guard is dropped, at which point `lock`
107 /// will succeed yet again.
108 pub struct MutexGuard<'a, T> {
109     lock: &'a Mutex<T>,
110 }
111 
112 // As long as T: Send, it's fine to send and share Mutex<T> between threads.
113 // If T was not Send, sending and sharing a Mutex<T> would be bad, since you can access T through
114 // Mutex<T>.
115 unsafe impl<T> Send for Mutex<T> where T: Send {}
116 unsafe impl<T> Sync for Mutex<T> where T: Send {}
117 unsafe impl<'a, T> Sync for MutexGuard<'a, T> where T: Send + Sync {}
118 
119 /// Error returned from the [`Mutex::try_lock`] function.
120 ///
121 /// A `try_lock` operation can only fail if the mutex is already locked.
122 ///
123 /// [`Mutex::try_lock`]: Mutex::try_lock
124 #[derive(Debug)]
125 pub struct TryLockError(());
126 
127 impl fmt::Display for TryLockError {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result128     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
129         write!(fmt, "{}", "operation would block")
130     }
131 }
132 
133 impl Error for TryLockError {}
134 
135 #[test]
136 #[cfg(not(loom))]
bounds()137 fn bounds() {
138     fn check_send<T: Send>() {}
139     fn check_unpin<T: Unpin>() {}
140     // This has to take a value, since the async fn's return type is unnameable.
141     fn check_send_sync_val<T: Send + Sync>(_t: T) {}
142     fn check_send_sync<T: Send + Sync>() {}
143     check_send::<MutexGuard<'_, u32>>();
144     check_unpin::<Mutex<u32>>();
145     check_send_sync::<Mutex<u32>>();
146 
147     let mutex = Mutex::new(1);
148     check_send_sync_val(mutex.lock());
149 }
150 
151 impl<T> Mutex<T> {
152     /// Creates a new lock in an unlocked state ready for use.
new(t: T) -> Self153     pub fn new(t: T) -> Self {
154         Self {
155             c: UnsafeCell::new(t),
156             s: semaphore::Semaphore::new(1),
157         }
158     }
159 
160     /// A future that resolves on acquiring the lock and returns the `MutexGuard`.
lock(&self) -> MutexGuard<'_, T>161     pub async fn lock(&self) -> MutexGuard<'_, T> {
162         self.s.acquire(1).cooperate().await.unwrap_or_else(|_| {
163             // The semaphore was closed. but, we never explicitly close it, and we have a
164             // handle to it through the Arc, which means that this can never happen.
165             unreachable!()
166         });
167         MutexGuard { lock: self }
168     }
169 
170     /// Tries to acquire the lock
try_lock(&self) -> Result<MutexGuard<'_, T>, TryLockError>171     pub fn try_lock(&self) -> Result<MutexGuard<'_, T>, TryLockError> {
172         match self.s.try_acquire(1) {
173             Ok(_) => Ok(MutexGuard { lock: self }),
174             Err(_) => Err(TryLockError(())),
175         }
176     }
177 
178     /// Consumes the mutex, returning the underlying data.
into_inner(self) -> T179     pub fn into_inner(self) -> T {
180         self.c.into_inner()
181     }
182 }
183 
184 impl<'a, T> Drop for MutexGuard<'a, T> {
drop(&mut self)185     fn drop(&mut self) {
186         self.lock.s.release(1)
187     }
188 }
189 
190 impl<T> From<T> for Mutex<T> {
from(s: T) -> Self191     fn from(s: T) -> Self {
192         Self::new(s)
193     }
194 }
195 
196 impl<T> Default for Mutex<T>
197 where
198     T: Default,
199 {
default() -> Self200     fn default() -> Self {
201         Self::new(T::default())
202     }
203 }
204 
205 impl<'a, T> Deref for MutexGuard<'a, T> {
206     type Target = T;
deref(&self) -> &Self::Target207     fn deref(&self) -> &Self::Target {
208         unsafe { &*self.lock.c.get() }
209     }
210 }
211 
212 impl<'a, T> DerefMut for MutexGuard<'a, T> {
deref_mut(&mut self) -> &mut Self::Target213     fn deref_mut(&mut self) -> &mut Self::Target {
214         unsafe { &mut *self.lock.c.get() }
215     }
216 }
217 
218 impl<'a, T: fmt::Debug> fmt::Debug for MutexGuard<'a, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result219     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
220         fmt::Debug::fmt(&**self, f)
221     }
222 }
223 
224 impl<'a, T: fmt::Display> fmt::Display for MutexGuard<'a, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result225     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
226         fmt::Display::fmt(&**self, f)
227     }
228 }
229