1 use std::fmt;
2 use std::future::Future;
3 use std::pin::Pin;
4 use std::sync::Arc;
5 
6 #[cfg(feature = "server")]
7 use crate::body::{Body, HttpBody};
8 #[cfg(all(feature = "http2", feature = "server"))]
9 use crate::proto::h2::server::H2Stream;
10 use crate::rt::Executor;
11 #[cfg(feature = "server")]
12 use crate::server::conn::spawn_all::{NewSvcTask, Watcher};
13 #[cfg(feature = "server")]
14 use crate::service::HttpService;
15 
16 #[cfg(feature = "server")]
17 pub trait ConnStreamExec<F, B: HttpBody>: Clone {
execute_h2stream(&mut self, fut: H2Stream<F, B>)18     fn execute_h2stream(&mut self, fut: H2Stream<F, B>);
19 }
20 
21 #[cfg(feature = "server")]
22 pub trait NewSvcExec<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>>: Clone {
execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>)23     fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>);
24 }
25 
26 pub(crate) type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
27 
28 // Either the user provides an executor for background tasks, or we use
29 // `tokio::spawn`.
30 #[derive(Clone)]
31 pub enum Exec {
32     Default,
33     Executor(Arc<dyn Executor<BoxSendFuture> + Send + Sync>),
34 }
35 
36 // ===== impl Exec =====
37 
38 impl Exec {
execute<F>(&self, fut: F) where F: Future<Output = ()> + Send + 'static,39     pub(crate) fn execute<F>(&self, fut: F)
40     where
41         F: Future<Output = ()> + Send + 'static,
42     {
43         match *self {
44             Exec::Default => {
45                 #[cfg(feature = "tcp")]
46                 {
47                     tokio::task::spawn(fut);
48                 }
49                 #[cfg(not(feature = "tcp"))]
50                 {
51                     // If no runtime, we need an executor!
52                     panic!("executor must be set")
53                 }
54             }
55             Exec::Executor(ref e) => {
56                 e.execute(Box::pin(fut));
57             }
58         }
59     }
60 }
61 
62 impl fmt::Debug for Exec {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result63     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64         f.debug_struct("Exec").finish()
65     }
66 }
67 
68 #[cfg(feature = "server")]
69 impl<F, B> ConnStreamExec<F, B> for Exec
70 where
71     H2Stream<F, B>: Future<Output = ()> + Send + 'static,
72     B: HttpBody,
73 {
execute_h2stream(&mut self, fut: H2Stream<F, B>)74     fn execute_h2stream(&mut self, fut: H2Stream<F, B>) {
75         self.execute(fut)
76     }
77 }
78 
79 #[cfg(feature = "server")]
80 impl<I, N, S, E, W> NewSvcExec<I, N, S, E, W> for Exec
81 where
82     NewSvcTask<I, N, S, E, W>: Future<Output = ()> + Send + 'static,
83     S: HttpService<Body>,
84     W: Watcher<I, S, E>,
85 {
execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>)86     fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>) {
87         self.execute(fut)
88     }
89 }
90 
91 // ==== impl Executor =====
92 
93 #[cfg(feature = "server")]
94 impl<E, F, B> ConnStreamExec<F, B> for E
95 where
96     E: Executor<H2Stream<F, B>> + Clone,
97     H2Stream<F, B>: Future<Output = ()>,
98     B: HttpBody,
99 {
execute_h2stream(&mut self, fut: H2Stream<F, B>)100     fn execute_h2stream(&mut self, fut: H2Stream<F, B>) {
101         self.execute(fut)
102     }
103 }
104 
105 #[cfg(feature = "server")]
106 impl<I, N, S, E, W> NewSvcExec<I, N, S, E, W> for E
107 where
108     E: Executor<NewSvcTask<I, N, S, E, W>> + Clone,
109     NewSvcTask<I, N, S, E, W>: Future<Output = ()>,
110     S: HttpService<Body>,
111     W: Watcher<I, S, E>,
112 {
execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>)113     fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>) {
114         self.execute(fut)
115     }
116 }
117 
118 // If http2 is not enable, we just have a stub here, so that the trait bounds
119 // that *would* have been needed are still checked. Why?
120 //
121 // Because enabling `http2` shouldn't suddenly add new trait bounds that cause
122 // a compilation error.
123 #[cfg(not(feature = "http2"))]
124 #[allow(missing_debug_implementations)]
125 pub struct H2Stream<F, B>(std::marker::PhantomData<(F, B)>);
126 
127 #[cfg(not(feature = "http2"))]
128 impl<F, B, E> Future for H2Stream<F, B>
129 where
130     F: Future<Output = Result<http::Response<B>, E>>,
131     B: crate::body::HttpBody,
132     B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
133     E: Into<Box<dyn std::error::Error + Send + Sync>>,
134 {
135     type Output = ();
136 
poll( self: Pin<&mut Self>, _cx: &mut std::task::Context<'_>, ) -> std::task::Poll<Self::Output>137     fn poll(
138         self: Pin<&mut Self>,
139         _cx: &mut std::task::Context<'_>,
140     ) -> std::task::Poll<Self::Output> {
141         unreachable!()
142     }
143 }
144