1 use super::{Enter, Executor, SpawnError};
2 
3 use futures::{future, Future};
4 
5 use std::cell::Cell;
6 
/// Executes futures on the default executor for the current execution context.
///
/// `DefaultExecutor` implements `Executor` and can be used to spawn futures
/// without referencing a specific executor.
///
/// When an executor starts, it sets the `DefaultExecutor` handle to point to an
/// executor (usually itself) that is used to spawn new tasks.
///
/// The current `DefaultExecutor` reference is tracked using a thread-local
/// variable. It is set using `with_default` (for the duration of a closure)
/// or `set_default` (until the returned guard is dropped).
///
/// The handle itself carries no state: the executor is looked up in the
/// thread-local variable every time the handle is used.
#[derive(Debug, Clone)]
pub struct DefaultExecutor {
    // Private unit field so the struct cannot be built with a struct literal
    // outside this module; use `DefaultExecutor::current()` instead.
    _dummy: (),
}
21 
/// Guard returned by `set_default`.
///
/// Ensures that the executor is removed from the thread-local context
/// when leaving the scope, including when the scope is left by a panic.
///
/// The executor is unset as soon as the guard is dropped, so the guard must be
/// bound to a variable that lives as long as the executor should stay
/// installed.
#[derive(Debug)]
#[must_use = "the default executor is unset as soon as the guard is dropped"]
pub struct DefaultGuard {
    // Private unit field: guards can only be created inside this module.
    _p: (),
}
28 
29 impl DefaultExecutor {
30     /// Returns a handle to the default executor for the current context.
31     ///
32     /// Futures may be spawned onto the default executor using this handle.
33     ///
34     /// The returned handle will reference whichever executor is configured as
35     /// the default **at the time `spawn` is called**. This enables
36     /// `DefaultExecutor::current()` to be called before an execution context is
37     /// setup, then passed **into** an execution context before it is used.
38     ///
39     /// This is also true for sending the handle across threads, so calling
40     /// `DefaultExecutor::current()` on thread A and then sending the result to
41     /// thread B will _not_ reference the default executor that was set on thread A.
current() -> DefaultExecutor42     pub fn current() -> DefaultExecutor {
43         DefaultExecutor { _dummy: () }
44     }
45 
46     #[inline]
with_current<F: FnOnce(&mut dyn Executor) -> R, R>(f: F) -> Option<R>47     fn with_current<F: FnOnce(&mut dyn Executor) -> R, R>(f: F) -> Option<R> {
48         EXECUTOR.with(
49             |current_executor| match current_executor.replace(State::Active) {
50                 State::Ready(executor_ptr) => {
51                     let executor = unsafe { &mut *executor_ptr };
52                     let result = f(executor);
53                     current_executor.set(State::Ready(executor_ptr));
54                     Some(result)
55                 }
56                 State::Empty | State::Active => None,
57             },
58         )
59     }
60 }
61 
/// State of the thread-local default-executor slot.
#[derive(Clone, Copy)]
enum State {
    /// Default executor not defined.
    Empty,
    /// Default executor is defined and ready to be used. The pointer is
    /// stored by `with_default` or `set_default`.
    Ready(*mut dyn Executor),
    /// Default executor is currently active, i.e. a call into it is in
    /// progress (used to detect recursive calls).
    Active,
}
71 
thread_local! {
    /// Thread-local tracking the current executor.
    ///
    /// Starts out as `State::Empty` on every thread. A `Cell` is sufficient
    /// because `State` is `Copy` and the slot is only touched from its own
    /// thread.
    static EXECUTOR: Cell<State> = Cell::new(State::Empty)
}
76 
77 // ===== impl DefaultExecutor =====
78 
79 impl super::Executor for DefaultExecutor {
spawn( &mut self, future: Box<dyn Future<Item = (), Error = ()> + Send>, ) -> Result<(), SpawnError>80     fn spawn(
81         &mut self,
82         future: Box<dyn Future<Item = (), Error = ()> + Send>,
83     ) -> Result<(), SpawnError> {
84         DefaultExecutor::with_current(|executor| executor.spawn(future))
85             .unwrap_or_else(|| Err(SpawnError::shutdown()))
86     }
87 
status(&self) -> Result<(), SpawnError>88     fn status(&self) -> Result<(), SpawnError> {
89         DefaultExecutor::with_current(|executor| executor.status())
90             .unwrap_or_else(|| Err(SpawnError::shutdown()))
91     }
92 }
93 
94 impl<T> super::TypedExecutor<T> for DefaultExecutor
95 where
96     T: Future<Item = (), Error = ()> + Send + 'static,
97 {
spawn(&mut self, future: T) -> Result<(), SpawnError>98     fn spawn(&mut self, future: T) -> Result<(), SpawnError> {
99         super::Executor::spawn(self, Box::new(future))
100     }
101 
status(&self) -> Result<(), SpawnError>102     fn status(&self) -> Result<(), SpawnError> {
103         super::Executor::status(self)
104     }
105 }
106 
107 impl<T> future::Executor<T> for DefaultExecutor
108 where
109     T: Future<Item = (), Error = ()> + Send + 'static,
110 {
execute(&self, future: T) -> Result<(), future::ExecuteError<T>>111     fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> {
112         if let Err(e) = super::Executor::status(self) {
113             let kind = if e.is_at_capacity() {
114                 future::ExecuteErrorKind::NoCapacity
115             } else {
116                 future::ExecuteErrorKind::Shutdown
117             };
118 
119             return Err(future::ExecuteError::new(kind, future));
120         }
121 
122         let _ = DefaultExecutor::with_current(|executor| executor.spawn(Box::new(future)));
123         Ok(())
124     }
125 }
126 
127 // ===== global spawn fns =====
128 
129 /// Submits a future for execution on the default executor -- usually a
130 /// threadpool.
131 ///
132 /// Futures are lazy constructs. When they are defined, no work happens. In
133 /// order for the logic defined by the future to be run, the future must be
134 /// spawned on an executor. This function is the easiest way to do so.
135 ///
136 /// This function must be called from an execution context, i.e. from a future
137 /// that has been already spawned onto an executor.
138 ///
139 /// Once spawned, the future will execute. The details of how that happens is
140 /// left up to the executor instance. If the executor is a thread pool, the
141 /// future will be pushed onto a queue that a worker thread polls from. If the
142 /// executor is a "current thread" executor, the future might be polled
143 /// immediately from within the call to `spawn` or it might be pushed onto an
144 /// internal queue.
145 ///
146 /// # Panics
147 ///
148 /// This function will panic if the default executor is not set or if spawning
149 /// onto the default executor returns an error. To avoid the panic, use the
150 /// `DefaultExecutor` handle directly.
151 ///
152 /// # Examples
153 ///
154 /// ```rust
155 /// # extern crate futures;
156 /// # extern crate tokio_executor;
157 /// # use tokio_executor::spawn;
158 /// # pub fn dox() {
159 /// use futures::future::lazy;
160 ///
161 /// spawn(lazy(|| {
162 ///     println!("running on the default executor");
163 ///     Ok(())
164 /// }));
165 /// # }
166 /// # pub fn main() {}
167 /// ```
spawn<T>(future: T) where T: Future<Item = (), Error = ()> + Send + 'static,168 pub fn spawn<T>(future: T)
169 where
170     T: Future<Item = (), Error = ()> + Send + 'static,
171 {
172     DefaultExecutor::current().spawn(Box::new(future)).unwrap()
173 }
174 
175 /// Set the default executor for the duration of the closure
176 ///
177 /// # Panics
178 ///
179 /// This function panics if there already is a default executor set.
with_default<T, F, R>(executor: &mut T, enter: &mut Enter, f: F) -> R where T: Executor, F: FnOnce(&mut Enter) -> R,180 pub fn with_default<T, F, R>(executor: &mut T, enter: &mut Enter, f: F) -> R
181 where
182     T: Executor,
183     F: FnOnce(&mut Enter) -> R,
184 {
185     unsafe fn hide_lt<'a>(p: *mut (dyn Executor + 'a)) -> *mut (dyn Executor + 'static) {
186         use std::mem;
187         mem::transmute(p)
188     }
189 
190     EXECUTOR.with(|cell| {
191         match cell.get() {
192             State::Ready(_) | State::Active => {
193                 panic!("default executor already set for execution context")
194             }
195             _ => {}
196         }
197 
198         // Ensure that the executor is removed from the thread-local context
199         // when leaving the scope. This handles cases that involve panicking.
200         struct Reset<'a>(&'a Cell<State>);
201 
202         impl<'a> Drop for Reset<'a> {
203             fn drop(&mut self) {
204                 self.0.set(State::Empty);
205             }
206         }
207 
208         let _reset = Reset(cell);
209 
210         // While scary, this is safe. The function takes a
211         // `&mut Executor`, which guarantees that the reference lives for the
212         // duration of `with_default`.
213         //
214         // Because we are always clearing the TLS value at the end of the
215         // function, we can cast the reference to 'static which thread-local
216         // cells require.
217         let executor = unsafe { hide_lt(executor as &mut _ as *mut _) };
218 
219         cell.set(State::Ready(executor));
220 
221         f(enter)
222     })
223 }
224 
225 /// Sets `executor` as the default executor, returning a guard that unsets it when
226 /// dropped.
227 ///
228 /// # Panics
229 ///
230 /// This function panics if there already is a default executor set.
set_default<T>(executor: T) -> DefaultGuard where T: Executor + 'static,231 pub fn set_default<T>(executor: T) -> DefaultGuard
232 where
233     T: Executor + 'static,
234 {
235     EXECUTOR.with(|cell| {
236         match cell.get() {
237             State::Ready(_) | State::Active => {
238                 panic!("default executor already set for execution context")
239             }
240             _ => {}
241         }
242 
243         // Ensure that the executor will outlive the call to set_default, even
244         // if the drop guard is never dropped due to calls to `mem::forget` or
245         // similar.
246         let executor = Box::new(executor);
247 
248         cell.set(State::Ready(Box::into_raw(executor)));
249     });
250 
251     DefaultGuard { _p: () }
252 }
253 
254 impl Drop for DefaultGuard {
drop(&mut self)255     fn drop(&mut self) {
256         let _ = EXECUTOR.try_with(|cell| {
257             if let State::Ready(prev) = cell.replace(State::Empty) {
258                 // drop the previous executor.
259                 unsafe {
260                     let prev = Box::from_raw(prev);
261                     drop(prev);
262                 };
263             }
264         });
265     }
266 }
267 
#[cfg(test)]
mod tests {
    use super::{with_default, DefaultExecutor, Executor};

    /// Compile-time check that the handle is both `Send` and `Sync`.
    #[test]
    fn default_executor_is_send_and_sync() {
        fn require_send_sync<T: Send + Sync>() {}

        require_send_sync::<DefaultExecutor>();
    }

    /// Installing a `DefaultExecutor` handle as the default executor makes a
    /// status query re-enter the lookup while the slot is already in use, so
    /// the inner query must report shutdown.
    #[test]
    fn nested_default_executor_status() {
        let mut enter = super::super::enter().unwrap();
        let mut handle = DefaultExecutor::current();

        let outcome = with_default(&mut handle, &mut enter, |_| {
            let inner = DefaultExecutor::current();
            inner.status()
        });

        let err = outcome.err().unwrap();
        assert!(err.is_shutdown())
    }
}
291