1 use super::{Compat, Future01CompatExt};
2 use crate::{
3     future::{FutureExt, TryFutureExt, UnitError},
4     task::SpawnExt,
5 };
6 use futures_01::future::{ExecuteError as ExecuteError01, Executor as Executor01};
7 use futures_01::Future as Future01;
8 use futures_task::{FutureObj, Spawn as Spawn03, SpawnError as SpawnError03};
9 
10 /// A future that can run on a futures 0.1
11 /// [`Executor`](futures_01::future::Executor).
12 pub type Executor01Future = Compat<UnitError<FutureObj<'static, ()>>>;
13 
14 /// Extension trait for futures 0.1 [`Executor`](futures_01::future::Executor).
15 pub trait Executor01CompatExt: Executor01<Executor01Future> + Clone + Send + 'static {
16     /// Converts a futures 0.1 [`Executor`](futures_01::future::Executor) into a
17     /// futures 0.3 [`Spawn`](futures_task::Spawn).
18     ///
19     /// ```
20     /// # if cfg!(miri) { return; } // Miri does not support epoll
21     /// use futures::task::SpawnExt;
22     /// use futures::future::{FutureExt, TryFutureExt};
23     /// use futures_util::compat::Executor01CompatExt;
24     /// use tokio::executor::DefaultExecutor;
25     ///
26     /// # let (tx, rx) = futures::channel::oneshot::channel();
27     ///
28     /// let spawner = DefaultExecutor::current().compat();
29     /// let future03 = async move {
30     ///     println!("Running on the pool");
31     ///     spawner.spawn(async {
32     ///         println!("Spawned!");
33     ///         # tx.send(42).unwrap();
34     ///     }).unwrap();
35     /// };
36     ///
37     /// let future01 = future03.unit_error().boxed().compat();
38     ///
39     /// tokio::run(future01);
40     /// # futures::executor::block_on(rx).unwrap();
41     /// ```
compat(self) -> Executor01As03<Self> where Self: Sized42     fn compat(self) -> Executor01As03<Self>
43     where
44         Self: Sized;
45 }
46 
47 impl<Ex> Executor01CompatExt for Ex
48 where
49     Ex: Executor01<Executor01Future> + Clone + Send + 'static,
50 {
compat(self) -> Executor01As03<Self>51     fn compat(self) -> Executor01As03<Self> {
52         Executor01As03 { executor01: self }
53     }
54 }
55 
/// Adapter that exposes a futures 0.1
/// [`Executor`](futures_01::future::Executor) as a futures 0.3
/// [`Spawn`](futures_task::Spawn).
///
/// Values of this type are produced by [`Executor01CompatExt::compat`].
#[derive(Clone, Debug)]
pub struct Executor01As03<Ex> {
    // The wrapped futures 0.1 executor that spawned work is forwarded to.
    executor01: Ex,
}
62 
63 impl<Ex> Spawn03 for Executor01As03<Ex>
64 where
65     Ex: Executor01<Executor01Future> + Clone + Send + 'static,
66 {
spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError03>67     fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError03> {
68         let future = future.unit_error().compat();
69 
70         self.executor01.execute(future).map_err(|_| SpawnError03::shutdown())
71     }
72 }
73 
74 #[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058
75 impl<Sp, Fut> Executor01<Fut> for Compat<Sp>
76 where
77     for<'a> &'a Sp: Spawn03,
78     Fut: Future01<Item = (), Error = ()> + Send + 'static,
79 {
execute(&self, future: Fut) -> Result<(), ExecuteError01<Fut>>80     fn execute(&self, future: Fut) -> Result<(), ExecuteError01<Fut>> {
81         (&self.inner)
82             .spawn(future.compat().map(|_| ()))
83             .expect("unable to spawn future from Compat executor");
84         Ok(())
85     }
86 }
87