1 //! An asynchronous `Mutex`-like type.
2 //!
//! This module provides [`Lock`], a type that acts similarly to an asynchronous `Mutex`, with one
//! major difference: the [`LockGuard`] returned by `poll_lock` is not tied to the lifetime of the
//! `Lock` it came from. This enables you to acquire a lock, pass the guard into a future, and
//! release it at some later point in time.
7 //!
8 //! This allows you to do something along the lines of:
9 //!
10 //! ```rust,no_run
11 //! # #[macro_use]
12 //! # extern crate futures;
13 //! # extern crate tokio;
14 //! # use futures::{future, Poll, Async, Future, Stream};
15 //! use tokio::sync::lock::{Lock, LockGuard};
16 //! struct MyType<S> {
17 //!     lock: Lock<S>,
18 //! }
19 //!
20 //! impl<S> Future for MyType<S>
21 //!   where S: Stream<Item = u32> + Send + 'static
22 //! {
23 //!     type Item = ();
24 //!     type Error = ();
25 //!
26 //!     fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
27 //!         match self.lock.poll_lock() {
28 //!             Async::Ready(mut guard) => {
29 //!                 tokio::spawn(future::poll_fn(move || {
30 //!                     let item = try_ready!(guard.poll().map_err(|_| ()));
31 //!                     println!("item = {:?}", item);
32 //!                     Ok(().into())
33 //!                 }));
34 //!                 Ok(().into())
35 //!             },
36 //!             Async::NotReady => Ok(Async::NotReady)
37 //!         }
38 //!     }
39 //! }
40 //! # fn main() {}
41 //! ```
42 //!
43 //!   [`Lock`]: struct.Lock.html
44 //!   [`LockGuard`]: struct.LockGuard.html
45 
46 use futures::Async;
47 use semaphore;
48 use std::cell::UnsafeCell;
49 use std::fmt;
50 use std::ops::{Deref, DerefMut};
51 use std::sync::Arc;
52 
53 /// An asynchronous mutual exclusion primitive useful for protecting shared data
54 ///
55 /// Each mutex has a type parameter (`T`) which represents the data that it is protecting. The data
56 /// can only be accessed through the RAII guards returned from `poll_lock`, which guarantees that
57 /// the data is only ever accessed when the mutex is locked.
58 #[derive(Debug)]
59 pub struct Lock<T> {
60     inner: Arc<State<T>>,
61     permit: semaphore::Permit,
62 }
63 
64 /// A handle to a held `Lock`.
65 ///
66 /// As long as you have this guard, you have exclusive access to the underlying `T`. The guard
67 /// internally keeps a reference-couned pointer to the original `Lock`, so even if the lock goes
68 /// away, the guard remains valid.
69 ///
70 /// The lock is automatically released whenever the guard is dropped, at which point `poll_lock`
71 /// will succeed yet again.
72 #[derive(Debug)]
73 pub struct LockGuard<T>(Lock<T>);
74 
75 // As long as T: Send, it's fine to send and share Lock<T> between threads.
76 // If T was not Send, sending and sharing a Lock<T> would be bad, since you can access T through
77 // Lock<T>.
78 unsafe impl<T> Send for Lock<T> where T: Send {}
79 unsafe impl<T> Sync for Lock<T> where T: Send {}
80 unsafe impl<T> Sync for LockGuard<T> where T: Send + Sync {}
81 
82 #[derive(Debug)]
83 struct State<T> {
84     c: UnsafeCell<T>,
85     s: semaphore::Semaphore,
86 }
87 
/// Compile-time check: a guard over a `Send` value must itself be `Send`.
#[test]
fn bounds() {
    fn assert_send<G: Send>() {}
    assert_send::<LockGuard<u32>>();
}
93 
94 impl<T> Lock<T> {
95     /// Creates a new lock in an unlocked state ready for use.
new(t: T) -> Self96     pub fn new(t: T) -> Self {
97         Self {
98             inner: Arc::new(State {
99                 c: UnsafeCell::new(t),
100                 s: semaphore::Semaphore::new(1),
101             }),
102             permit: semaphore::Permit::new(),
103         }
104     }
105 
106     /// Try to acquire the lock.
107     ///
108     /// If the lock is already held, the current task is notified when it is released.
poll_lock(&mut self) -> Async<LockGuard<T>>109     pub fn poll_lock(&mut self) -> Async<LockGuard<T>> {
110         if let Async::NotReady = self.permit.poll_acquire(&self.inner.s).unwrap_or_else(|_| {
111             // The semaphore was closed. but, we never explicitly close it, and we have a
112             // handle to it through the Arc, which means that this can never happen.
113             unreachable!()
114         }) {
115             return Async::NotReady;
116         }
117 
118         // We want to move the acquired permit into the guard,
119         // and leave an unacquired one in self.
120         let acquired = Self {
121             inner: self.inner.clone(),
122             permit: ::std::mem::replace(&mut self.permit, semaphore::Permit::new()),
123         };
124         Async::Ready(LockGuard(acquired))
125     }
126 }
127 
128 impl<T> Drop for LockGuard<T> {
drop(&mut self)129     fn drop(&mut self) {
130         if self.0.permit.is_acquired() {
131             self.0.permit.release(&self.0.inner.s);
132         } else if ::std::thread::panicking() {
133             // A guard _should_ always hold its permit, but if the thread is already panicking,
134             // we don't want to generate a panic-while-panicing, since that's just unhelpful!
135         } else {
136             unreachable!("Permit not held when LockGuard was dropped")
137         }
138     }
139 }
140 
141 impl<T> From<T> for Lock<T> {
from(s: T) -> Self142     fn from(s: T) -> Self {
143         Self::new(s)
144     }
145 }
146 
147 impl<T> Clone for Lock<T> {
clone(&self) -> Self148     fn clone(&self) -> Self {
149         Self {
150             inner: self.inner.clone(),
151             permit: semaphore::Permit::new(),
152         }
153     }
154 }
155 
156 impl<T> Default for Lock<T>
157 where
158     T: Default,
159 {
default() -> Self160     fn default() -> Self {
161         Self::new(T::default())
162     }
163 }
164 
165 impl<T> Deref for LockGuard<T> {
166     type Target = T;
deref(&self) -> &Self::Target167     fn deref(&self) -> &Self::Target {
168         assert!(self.0.permit.is_acquired());
169         unsafe { &*self.0.inner.c.get() }
170     }
171 }
172 
173 impl<T> DerefMut for LockGuard<T> {
deref_mut(&mut self) -> &mut Self::Target174     fn deref_mut(&mut self) -> &mut Self::Target {
175         assert!(self.0.permit.is_acquired());
176         unsafe { &mut *self.0.inner.c.get() }
177     }
178 }
179 
180 impl<T: fmt::Display> fmt::Display for LockGuard<T> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result181     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
182         fmt::Display::fmt(&**self, f)
183     }
184 }
185