1 use super::{Enter, Executor, SpawnError};
2
3 use futures::{future, Future};
4
5 use std::cell::Cell;
6
/// A handle that spawns futures onto whichever executor is the default for
/// the current execution context.
///
/// `DefaultExecutor` implements `Executor`, so code can spawn futures without
/// holding a reference to a concrete executor.
///
/// When an executor starts running, it installs an executor (usually itself)
/// as the default; tasks spawned through this handle are forwarded to it.
///
/// The handle itself carries no state. The current default is tracked in a
/// thread-local variable and is installed with `tokio_executor::with_default`.
#[derive(Clone, Debug)]
pub struct DefaultExecutor {
    // Zero-sized private field: prevents construction outside this module.
    _dummy: (),
}
21
/// Ensures that the executor is removed from the thread-local context
/// when leaving the scope. This handles cases that involve panicking.
///
/// The guard must be bound to a variable for as long as the executor should
/// stay installed: dropping it uninstalls the executor. The `must_use`
/// attribute makes the compiler warn when the guard is discarded right away.
#[must_use = "dropping the guard immediately unsets the default executor"]
#[derive(Debug)]
pub struct DefaultGuard {
    // Zero-sized private field: prevents construction outside this module.
    _p: (),
}
28
29 impl DefaultExecutor {
30 /// Returns a handle to the default executor for the current context.
31 ///
32 /// Futures may be spawned onto the default executor using this handle.
33 ///
34 /// The returned handle will reference whichever executor is configured as
35 /// the default **at the time `spawn` is called**. This enables
36 /// `DefaultExecutor::current()` to be called before an execution context is
37 /// setup, then passed **into** an execution context before it is used.
38 ///
39 /// This is also true for sending the handle across threads, so calling
40 /// `DefaultExecutor::current()` on thread A and then sending the result to
41 /// thread B will _not_ reference the default executor that was set on thread A.
current() -> DefaultExecutor42 pub fn current() -> DefaultExecutor {
43 DefaultExecutor { _dummy: () }
44 }
45
46 #[inline]
with_current<F: FnOnce(&mut dyn Executor) -> R, R>(f: F) -> Option<R>47 fn with_current<F: FnOnce(&mut dyn Executor) -> R, R>(f: F) -> Option<R> {
48 EXECUTOR.with(
49 |current_executor| match current_executor.replace(State::Active) {
50 State::Ready(executor_ptr) => {
51 let executor = unsafe { &mut *executor_ptr };
52 let result = f(executor);
53 current_executor.set(State::Ready(executor_ptr));
54 Some(result)
55 }
56 State::Empty | State::Active => None,
57 },
58 )
59 }
60 }
61
/// Contents of the thread-local slot that tracks the default executor.
#[derive(Clone, Copy)]
enum State {
    /// No default executor is defined for this thread.
    Empty,
    /// A default executor is defined and ready to be used.
    ///
    /// The raw pointer is stored either by `with_default` (a borrowed
    /// executor whose lifetime has been erased) or by `set_default` (a boxed
    /// executor released with `Box::into_raw`).
    Ready(*mut dyn Executor),
    /// The default executor is currently being used by
    /// `DefaultExecutor::with_current`; used to detect recursive calls.
    Active,
}
71
thread_local! {
    /// Thread-local tracking the current executor.
    ///
    /// Starts out as `State::Empty`; `with_default` and `set_default` store a
    /// `State::Ready` pointer in it, and `DefaultExecutor::with_current`
    /// marks it `State::Active` while the executor is in use.
    static EXECUTOR: Cell<State> = Cell::new(State::Empty)
}
76
77 // ===== impl DefaultExecutor =====
78
79 impl super::Executor for DefaultExecutor {
spawn( &mut self, future: Box<dyn Future<Item = (), Error = ()> + Send>, ) -> Result<(), SpawnError>80 fn spawn(
81 &mut self,
82 future: Box<dyn Future<Item = (), Error = ()> + Send>,
83 ) -> Result<(), SpawnError> {
84 DefaultExecutor::with_current(|executor| executor.spawn(future))
85 .unwrap_or_else(|| Err(SpawnError::shutdown()))
86 }
87
status(&self) -> Result<(), SpawnError>88 fn status(&self) -> Result<(), SpawnError> {
89 DefaultExecutor::with_current(|executor| executor.status())
90 .unwrap_or_else(|| Err(SpawnError::shutdown()))
91 }
92 }
93
94 impl<T> super::TypedExecutor<T> for DefaultExecutor
95 where
96 T: Future<Item = (), Error = ()> + Send + 'static,
97 {
spawn(&mut self, future: T) -> Result<(), SpawnError>98 fn spawn(&mut self, future: T) -> Result<(), SpawnError> {
99 super::Executor::spawn(self, Box::new(future))
100 }
101
status(&self) -> Result<(), SpawnError>102 fn status(&self) -> Result<(), SpawnError> {
103 super::Executor::status(self)
104 }
105 }
106
107 impl<T> future::Executor<T> for DefaultExecutor
108 where
109 T: Future<Item = (), Error = ()> + Send + 'static,
110 {
execute(&self, future: T) -> Result<(), future::ExecuteError<T>>111 fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> {
112 if let Err(e) = super::Executor::status(self) {
113 let kind = if e.is_at_capacity() {
114 future::ExecuteErrorKind::NoCapacity
115 } else {
116 future::ExecuteErrorKind::Shutdown
117 };
118
119 return Err(future::ExecuteError::new(kind, future));
120 }
121
122 let _ = DefaultExecutor::with_current(|executor| executor.spawn(Box::new(future)));
123 Ok(())
124 }
125 }
126
127 // ===== global spawn fns =====
128
129 /// Submits a future for execution on the default executor -- usually a
130 /// threadpool.
131 ///
132 /// Futures are lazy constructs. When they are defined, no work happens. In
133 /// order for the logic defined by the future to be run, the future must be
134 /// spawned on an executor. This function is the easiest way to do so.
135 ///
136 /// This function must be called from an execution context, i.e. from a future
137 /// that has been already spawned onto an executor.
138 ///
139 /// Once spawned, the future will execute. The details of how that happens is
140 /// left up to the executor instance. If the executor is a thread pool, the
141 /// future will be pushed onto a queue that a worker thread polls from. If the
142 /// executor is a "current thread" executor, the future might be polled
143 /// immediately from within the call to `spawn` or it might be pushed onto an
144 /// internal queue.
145 ///
146 /// # Panics
147 ///
148 /// This function will panic if the default executor is not set or if spawning
149 /// onto the default executor returns an error. To avoid the panic, use the
150 /// `DefaultExecutor` handle directly.
151 ///
152 /// # Examples
153 ///
154 /// ```rust
155 /// # extern crate futures;
156 /// # extern crate tokio_executor;
157 /// # use tokio_executor::spawn;
158 /// # pub fn dox() {
159 /// use futures::future::lazy;
160 ///
161 /// spawn(lazy(|| {
162 /// println!("running on the default executor");
163 /// Ok(())
164 /// }));
165 /// # }
166 /// # pub fn main() {}
167 /// ```
spawn<T>(future: T) where T: Future<Item = (), Error = ()> + Send + 'static,168 pub fn spawn<T>(future: T)
169 where
170 T: Future<Item = (), Error = ()> + Send + 'static,
171 {
172 DefaultExecutor::current().spawn(Box::new(future)).unwrap()
173 }
174
175 /// Set the default executor for the duration of the closure
176 ///
177 /// # Panics
178 ///
179 /// This function panics if there already is a default executor set.
with_default<T, F, R>(executor: &mut T, enter: &mut Enter, f: F) -> R where T: Executor, F: FnOnce(&mut Enter) -> R,180 pub fn with_default<T, F, R>(executor: &mut T, enter: &mut Enter, f: F) -> R
181 where
182 T: Executor,
183 F: FnOnce(&mut Enter) -> R,
184 {
185 unsafe fn hide_lt<'a>(p: *mut (dyn Executor + 'a)) -> *mut (dyn Executor + 'static) {
186 use std::mem;
187 mem::transmute(p)
188 }
189
190 EXECUTOR.with(|cell| {
191 match cell.get() {
192 State::Ready(_) | State::Active => {
193 panic!("default executor already set for execution context")
194 }
195 _ => {}
196 }
197
198 // Ensure that the executor is removed from the thread-local context
199 // when leaving the scope. This handles cases that involve panicking.
200 struct Reset<'a>(&'a Cell<State>);
201
202 impl<'a> Drop for Reset<'a> {
203 fn drop(&mut self) {
204 self.0.set(State::Empty);
205 }
206 }
207
208 let _reset = Reset(cell);
209
210 // While scary, this is safe. The function takes a
211 // `&mut Executor`, which guarantees that the reference lives for the
212 // duration of `with_default`.
213 //
214 // Because we are always clearing the TLS value at the end of the
215 // function, we can cast the reference to 'static which thread-local
216 // cells require.
217 let executor = unsafe { hide_lt(executor as &mut _ as *mut _) };
218
219 cell.set(State::Ready(executor));
220
221 f(enter)
222 })
223 }
224
225 /// Sets `executor` as the default executor, returning a guard that unsets it when
226 /// dropped.
227 ///
228 /// # Panics
229 ///
230 /// This function panics if there already is a default executor set.
set_default<T>(executor: T) -> DefaultGuard where T: Executor + 'static,231 pub fn set_default<T>(executor: T) -> DefaultGuard
232 where
233 T: Executor + 'static,
234 {
235 EXECUTOR.with(|cell| {
236 match cell.get() {
237 State::Ready(_) | State::Active => {
238 panic!("default executor already set for execution context")
239 }
240 _ => {}
241 }
242
243 // Ensure that the executor will outlive the call to set_default, even
244 // if the drop guard is never dropped due to calls to `mem::forget` or
245 // similar.
246 let executor = Box::new(executor);
247
248 cell.set(State::Ready(Box::into_raw(executor)));
249 });
250
251 DefaultGuard { _p: () }
252 }
253
254 impl Drop for DefaultGuard {
drop(&mut self)255 fn drop(&mut self) {
256 let _ = EXECUTOR.try_with(|cell| {
257 if let State::Ready(prev) = cell.replace(State::Empty) {
258 // drop the previous executor.
259 unsafe {
260 let prev = Box::from_raw(prev);
261 drop(prev);
262 };
263 }
264 });
265 }
266 }
267
#[cfg(test)]
mod tests {
    use super::{with_default, DefaultExecutor, Executor};

    /// Compile-time check that the handle can be shared and sent across
    /// threads.
    #[test]
    fn default_executor_is_send_and_sync() {
        fn assert_send_sync<T: Send + Sync>() {}

        assert_send_sync::<DefaultExecutor>();
    }

    /// Installing a `DefaultExecutor` as the default makes a status query
    /// re-enter `with_current` while the slot is already active; the nested
    /// call must report a shutdown error.
    #[test]
    fn nested_default_executor_status() {
        let mut enter = super::super::enter().unwrap();
        let mut handle = DefaultExecutor::current();

        let status = with_default(&mut handle, &mut enter, |_| {
            DefaultExecutor::current().status()
        });

        let error = status.err().unwrap();
        assert!(error.is_shutdown())
    }
}
291