1 use crate::sync::batch_semaphore as semaphore;
2 
3 use std::cell::UnsafeCell;
4 use std::error::Error;
5 use std::fmt;
6 use std::ops::{Deref, DerefMut};
7 use std::sync::Arc;
8 
9 /// An asynchronous `Mutex`-like type.
10 ///
11 /// This type acts similarly to an asynchronous [`std::sync::Mutex`], with one
12 /// major difference: [`lock`] does not block and the lock guard can be held
13 /// across await points.
14 ///
15 /// # Which kind of mutex should you use?
16 ///
17 /// Contrary to popular belief, it is ok and often preferred to use the ordinary
18 /// [`Mutex`][std] from the standard library in asynchronous code. This section
19 /// will help you decide on which kind of mutex you should use.
20 ///
21 /// The primary use case of the async mutex is to provide shared mutable access
22 /// to IO resources such as a database connection. If the data stored behind the
23 /// mutex is just data, it is often better to use a blocking mutex such as the
24 /// one in the standard library or [`parking_lot`]. This is because the feature
25 /// that the async mutex offers over the blocking mutex is that it is possible
26 /// to keep the mutex locked across an `.await` point, which is rarely necessary
27 /// for data.
28 ///
29 /// A common pattern is to wrap the `Arc<Mutex<...>>` in a struct that provides
30 /// non-async methods for performing operations on the data within, and only
31 /// lock the mutex inside these methods. The [mini-redis] example provides an
32 /// illustration of this pattern.
33 ///
34 /// Additionally, when you _do_ want shared access to an IO resource, it is
35 /// often better to spawn a task to manage the IO resource, and to use message
36 /// passing to communicate with that task.
37 ///
38 /// [std]: std::sync::Mutex
39 /// [`parking_lot`]: https://docs.rs/parking_lot
40 /// [mini-redis]: https://github.com/tokio-rs/mini-redis/blob/master/src/db.rs
41 ///
42 /// # Examples:
43 ///
44 /// ```rust,no_run
45 /// use tokio::sync::Mutex;
46 /// use std::sync::Arc;
47 ///
48 /// #[tokio::main]
49 /// async fn main() {
50 ///     let data1 = Arc::new(Mutex::new(0));
51 ///     let data2 = Arc::clone(&data1);
52 ///
53 ///     tokio::spawn(async move {
54 ///         let mut lock = data2.lock().await;
55 ///         *lock += 1;
56 ///     });
57 ///
58 ///     let mut lock = data1.lock().await;
59 ///     *lock += 1;
60 /// }
61 /// ```
62 ///
63 ///
64 /// ```rust,no_run
65 /// use tokio::sync::Mutex;
66 /// use std::sync::Arc;
67 ///
68 /// #[tokio::main]
69 /// async fn main() {
70 ///     let count = Arc::new(Mutex::new(0));
71 ///
72 ///     for _ in 0..5 {
73 ///         let my_count = Arc::clone(&count);
74 ///         tokio::spawn(async move {
75 ///             for _ in 0..10 {
76 ///                 let mut lock = my_count.lock().await;
77 ///                 *lock += 1;
78 ///                 println!("{}", lock);
79 ///             }
80 ///         });
81 ///     }
82 ///
83 ///     loop {
84 ///         if *count.lock().await >= 50 {
85 ///             break;
86 ///         }
87 ///     }
88 ///     println!("Count hit 50.");
89 /// }
90 /// ```
91 /// There are a few things of note here to pay attention to in this example.
92 /// 1. The mutex is wrapped in an [`Arc`] to allow it to be shared across
93 ///    threads.
94 /// 2. Each spawned task obtains a lock and releases it on every iteration.
95 /// 3. Mutation of the data protected by the Mutex is done by de-referencing
96 ///    the obtained lock as seen on lines 12 and 19.
97 ///
98 /// Tokio's Mutex works in a simple FIFO (first in, first out) style where all
99 /// calls to [`lock`] complete in the order they were performed. In that way the
100 /// Mutex is "fair" and predictable in how it distributes the locks to inner
101 /// data. This is why the output of the program above is an in-order count to
102 /// 50. Locks are released and reacquired after every iteration, so basically,
103 /// each thread goes to the back of the line after it increments the value once.
104 /// Finally, since there is only a single valid lock at any given time, there is
105 /// no possibility of a race condition when mutating the inner value.
106 ///
107 /// Note that in contrast to [`std::sync::Mutex`], this implementation does not
108 /// poison the mutex when a thread holding the [`MutexGuard`] panics. In such a
109 /// case, the mutex will be unlocked. If the panic is caught, this might leave
110 /// the data protected by the mutex in an inconsistent state.
111 ///
112 /// [`Mutex`]: struct@Mutex
113 /// [`MutexGuard`]: struct@MutexGuard
114 /// [`Arc`]: struct@std::sync::Arc
115 /// [`std::sync::Mutex`]: struct@std::sync::Mutex
116 /// [`Send`]: trait@std::marker::Send
117 /// [`lock`]: method@Mutex::lock
118 pub struct Mutex<T: ?Sized> {
119     s: semaphore::Semaphore,
120     c: UnsafeCell<T>,
121 }
122 
123 /// A handle to a held `Mutex`.
124 ///
125 /// As long as you have this guard, you have exclusive access to the underlying
126 /// `T`. The guard internally borrows the `Mutex`, so the mutex will not be
127 /// dropped while a guard exists.
128 ///
129 /// The lock is automatically released whenever the guard is dropped, at which
130 /// point `lock` will succeed yet again.
131 pub struct MutexGuard<'a, T: ?Sized> {
132     lock: &'a Mutex<T>,
133 }
134 
135 /// An owned handle to a held `Mutex`.
136 ///
137 /// This guard is only available from a `Mutex` that is wrapped in an [`Arc`]. It
138 /// is identical to `MutexGuard`, except that rather than borrowing the `Mutex`,
139 /// it clones the `Arc`, incrementing the reference count. This means that
140 /// unlike `MutexGuard`, it will have the `'static` lifetime.
141 ///
142 /// As long as you have this guard, you have exclusive access to the underlying
143 /// `T`. The guard internally keeps a reference-couned pointer to the original
144 /// `Mutex`, so even if the lock goes away, the guard remains valid.
145 ///
146 /// The lock is automatically released whenever the guard is dropped, at which
147 /// point `lock` will succeed yet again.
148 ///
149 /// [`Arc`]: std::sync::Arc
150 pub struct OwnedMutexGuard<T: ?Sized> {
151     lock: Arc<Mutex<T>>,
152 }
153 
154 // As long as T: Send, it's fine to send and share Mutex<T> between threads.
155 // If T was not Send, sending and sharing a Mutex<T> would be bad, since you can
156 // access T through Mutex<T>.
157 unsafe impl<T> Send for Mutex<T> where T: ?Sized + Send {}
158 unsafe impl<T> Sync for Mutex<T> where T: ?Sized + Send {}
159 unsafe impl<T> Sync for MutexGuard<'_, T> where T: ?Sized + Send + Sync {}
160 unsafe impl<T> Sync for OwnedMutexGuard<T> where T: ?Sized + Send + Sync {}
161 
162 /// Error returned from the [`Mutex::try_lock`] function.
163 ///
164 /// A `try_lock` operation can only fail if the mutex is already locked.
165 ///
166 /// [`Mutex::try_lock`]: Mutex::try_lock
167 #[derive(Debug)]
168 pub struct TryLockError(());
169 
170 impl fmt::Display for TryLockError {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result171     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
172         write!(fmt, "operation would block")
173     }
174 }
175 
176 impl Error for TryLockError {}
177 
178 #[test]
179 #[cfg(not(loom))]
bounds()180 fn bounds() {
181     fn check_send<T: Send>() {}
182     fn check_unpin<T: Unpin>() {}
183     // This has to take a value, since the async fn's return type is unnameable.
184     fn check_send_sync_val<T: Send + Sync>(_t: T) {}
185     fn check_send_sync<T: Send + Sync>() {}
186     fn check_static<T: 'static>() {}
187     fn check_static_val<T: 'static>(_t: T) {}
188 
189     check_send::<MutexGuard<'_, u32>>();
190     check_send::<OwnedMutexGuard<u32>>();
191     check_unpin::<Mutex<u32>>();
192     check_send_sync::<Mutex<u32>>();
193     check_static::<OwnedMutexGuard<u32>>();
194 
195     let mutex = Mutex::new(1);
196     check_send_sync_val(mutex.lock());
197     let arc_mutex = Arc::new(Mutex::new(1));
198     check_send_sync_val(arc_mutex.clone().lock_owned());
199     check_static_val(arc_mutex.lock_owned());
200 }
201 
202 impl<T: ?Sized> Mutex<T> {
203     /// Creates a new lock in an unlocked state ready for use.
204     ///
205     /// # Examples
206     ///
207     /// ```
208     /// use tokio::sync::Mutex;
209     ///
210     /// let lock = Mutex::new(5);
211     /// ```
new(t: T) -> Self where T: Sized,212     pub fn new(t: T) -> Self
213     where
214         T: Sized,
215     {
216         Self {
217             c: UnsafeCell::new(t),
218             s: semaphore::Semaphore::new(1),
219         }
220     }
221 
222     /// Locks this mutex, causing the current task
223     /// to yield until the lock has been acquired.
224     /// When the lock has been acquired, function returns a [`MutexGuard`].
225     ///
226     /// # Examples
227     ///
228     /// ```
229     /// use tokio::sync::Mutex;
230     ///
231     /// #[tokio::main]
232     /// async fn main() {
233     ///     let mutex = Mutex::new(1);
234     ///
235     ///     let mut n = mutex.lock().await;
236     ///     *n = 2;
237     /// }
238     /// ```
lock(&self) -> MutexGuard<'_, T>239     pub async fn lock(&self) -> MutexGuard<'_, T> {
240         self.acquire().await;
241         MutexGuard { lock: self }
242     }
243 
244     /// Locks this mutex, causing the current task to yield until the lock has
245     /// been acquired. When the lock has been acquired, this returns an
246     /// [`OwnedMutexGuard`].
247     ///
248     /// This method is identical to [`Mutex::lock`], except that the returned
249     /// guard references the `Mutex` with an [`Arc`] rather than by borrowing
250     /// it. Therefore, the `Mutex` must be wrapped in an `Arc` to call this
251     /// method, and the guard will live for the `'static` lifetime, as it keeps
252     /// the `Mutex` alive by holding an `Arc`.
253     ///
254     /// # Examples
255     ///
256     /// ```
257     /// use tokio::sync::Mutex;
258     /// use std::sync::Arc;
259     ///
260     /// #[tokio::main]
261     /// async fn main() {
262     ///     let mutex = Arc::new(Mutex::new(1));
263     ///
264     ///     let mut n = mutex.clone().lock_owned().await;
265     ///     *n = 2;
266     /// }
267     /// ```
268     ///
269     /// [`Arc`]: std::sync::Arc
lock_owned(self: Arc<Self>) -> OwnedMutexGuard<T>270     pub async fn lock_owned(self: Arc<Self>) -> OwnedMutexGuard<T> {
271         self.acquire().await;
272         OwnedMutexGuard { lock: self }
273     }
274 
acquire(&self)275     async fn acquire(&self) {
276         self.s.acquire(1).await.unwrap_or_else(|_| {
277             // The semaphore was closed. but, we never explicitly close it, and
278             // we own it exclusively, which means that this can never happen.
279             unreachable!()
280         });
281     }
282 
283     /// Attempts to acquire the lock, and returns [`TryLockError`] if the
284     /// lock is currently held somewhere else.
285     ///
286     /// [`TryLockError`]: TryLockError
287     /// # Examples
288     ///
289     /// ```
290     /// use tokio::sync::Mutex;
291     /// # async fn dox() -> Result<(), tokio::sync::TryLockError> {
292     ///
293     /// let mutex = Mutex::new(1);
294     ///
295     /// let n = mutex.try_lock()?;
296     /// assert_eq!(*n, 1);
297     /// # Ok(())
298     /// # }
299     /// ```
try_lock(&self) -> Result<MutexGuard<'_, T>, TryLockError>300     pub fn try_lock(&self) -> Result<MutexGuard<'_, T>, TryLockError> {
301         match self.s.try_acquire(1) {
302             Ok(_) => Ok(MutexGuard { lock: self }),
303             Err(_) => Err(TryLockError(())),
304         }
305     }
306 
307     /// Attempts to acquire the lock, and returns [`TryLockError`] if the lock
308     /// is currently held somewhere else.
309     ///
310     /// This method is identical to [`Mutex::try_lock`], except that the
311     /// returned  guard references the `Mutex` with an [`Arc`] rather than by
312     /// borrowing it. Therefore, the `Mutex` must be wrapped in an `Arc` to call
313     /// this method, and the guard will live for the `'static` lifetime, as it
314     /// keeps the `Mutex` alive by holding an `Arc`.
315     ///
316     /// [`TryLockError`]: TryLockError
317     /// [`Arc`]: std::sync::Arc
318     /// # Examples
319     ///
320     /// ```
321     /// use tokio::sync::Mutex;
322     /// use std::sync::Arc;
323     /// # async fn dox() -> Result<(), tokio::sync::TryLockError> {
324     ///
325     /// let mutex = Arc::new(Mutex::new(1));
326     ///
327     /// let n = mutex.clone().try_lock_owned()?;
328     /// assert_eq!(*n, 1);
329     /// # Ok(())
330     /// # }
try_lock_owned(self: Arc<Self>) -> Result<OwnedMutexGuard<T>, TryLockError>331     pub fn try_lock_owned(self: Arc<Self>) -> Result<OwnedMutexGuard<T>, TryLockError> {
332         match self.s.try_acquire(1) {
333             Ok(_) => Ok(OwnedMutexGuard { lock: self }),
334             Err(_) => Err(TryLockError(())),
335         }
336     }
337 
338     /// Consumes the mutex, returning the underlying data.
339     /// # Examples
340     ///
341     /// ```
342     /// use tokio::sync::Mutex;
343     ///
344     /// #[tokio::main]
345     /// async fn main() {
346     ///     let mutex = Mutex::new(1);
347     ///
348     ///     let n = mutex.into_inner();
349     ///     assert_eq!(n, 1);
350     /// }
351     /// ```
into_inner(self) -> T where T: Sized,352     pub fn into_inner(self) -> T
353     where
354         T: Sized,
355     {
356         self.c.into_inner()
357     }
358 }
359 
360 impl<T> From<T> for Mutex<T> {
from(s: T) -> Self361     fn from(s: T) -> Self {
362         Self::new(s)
363     }
364 }
365 
366 impl<T> Default for Mutex<T>
367 where
368     T: Default,
369 {
default() -> Self370     fn default() -> Self {
371         Self::new(T::default())
372     }
373 }
374 
375 impl<T> std::fmt::Debug for Mutex<T>
376 where
377     T: std::fmt::Debug,
378 {
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result379     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
380         let mut d = f.debug_struct("Mutex");
381         match self.try_lock() {
382             Ok(inner) => d.field("data", &*inner),
383             Err(_) => d.field("data", &format_args!("<locked>")),
384         };
385         d.finish()
386     }
387 }
388 
389 // === impl MutexGuard ===
390 
391 impl<T: ?Sized> Drop for MutexGuard<'_, T> {
drop(&mut self)392     fn drop(&mut self) {
393         self.lock.s.release(1)
394     }
395 }
396 
397 impl<T: ?Sized> Deref for MutexGuard<'_, T> {
398     type Target = T;
deref(&self) -> &Self::Target399     fn deref(&self) -> &Self::Target {
400         unsafe { &*self.lock.c.get() }
401     }
402 }
403 
404 impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
deref_mut(&mut self) -> &mut Self::Target405     fn deref_mut(&mut self) -> &mut Self::Target {
406         unsafe { &mut *self.lock.c.get() }
407     }
408 }
409 
410 impl<T: ?Sized + fmt::Debug> fmt::Debug for MutexGuard<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result411     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
412         fmt::Debug::fmt(&**self, f)
413     }
414 }
415 
416 impl<T: ?Sized + fmt::Display> fmt::Display for MutexGuard<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result417     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
418         fmt::Display::fmt(&**self, f)
419     }
420 }
421 
422 // === impl OwnedMutexGuard ===
423 
424 impl<T: ?Sized> Drop for OwnedMutexGuard<T> {
drop(&mut self)425     fn drop(&mut self) {
426         self.lock.s.release(1)
427     }
428 }
429 
430 impl<T: ?Sized> Deref for OwnedMutexGuard<T> {
431     type Target = T;
deref(&self) -> &Self::Target432     fn deref(&self) -> &Self::Target {
433         unsafe { &*self.lock.c.get() }
434     }
435 }
436 
437 impl<T: ?Sized> DerefMut for OwnedMutexGuard<T> {
deref_mut(&mut self) -> &mut Self::Target438     fn deref_mut(&mut self) -> &mut Self::Target {
439         unsafe { &mut *self.lock.c.get() }
440     }
441 }
442 
443 impl<T: ?Sized + fmt::Debug> fmt::Debug for OwnedMutexGuard<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result444     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
445         fmt::Debug::fmt(&**self, f)
446     }
447 }
448 
449 impl<T: ?Sized + fmt::Display> fmt::Display for OwnedMutexGuard<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result450     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
451         fmt::Display::fmt(&**self, f)
452     }
453 }
454