1 use std::fmt; 2 use std::future::Future; 3 use std::pin::Pin; 4 use std::sync::Arc; 5 6 #[cfg(feature = "server")] 7 use crate::body::{Body, HttpBody}; 8 #[cfg(all(feature = "http2", feature = "server"))] 9 use crate::proto::h2::server::H2Stream; 10 use crate::rt::Executor; 11 #[cfg(feature = "server")] 12 use crate::server::conn::spawn_all::{NewSvcTask, Watcher}; 13 #[cfg(feature = "server")] 14 use crate::service::HttpService; 15 16 #[cfg(feature = "server")] 17 pub trait ConnStreamExec<F, B: HttpBody>: Clone { execute_h2stream(&mut self, fut: H2Stream<F, B>)18 fn execute_h2stream(&mut self, fut: H2Stream<F, B>); 19 } 20 21 #[cfg(feature = "server")] 22 pub trait NewSvcExec<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>>: Clone { execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>)23 fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>); 24 } 25 26 pub(crate) type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>; 27 28 // Either the user provides an executor for background tasks, or we use 29 // `tokio::spawn`. 30 #[derive(Clone)] 31 pub enum Exec { 32 Default, 33 Executor(Arc<dyn Executor<BoxSendFuture> + Send + Sync>), 34 } 35 36 // ===== impl Exec ===== 37 38 impl Exec { execute<F>(&self, fut: F) where F: Future<Output = ()> + Send + 'static,39 pub(crate) fn execute<F>(&self, fut: F) 40 where 41 F: Future<Output = ()> + Send + 'static, 42 { 43 match *self { 44 Exec::Default => { 45 #[cfg(feature = "tcp")] 46 { 47 tokio::task::spawn(fut); 48 } 49 #[cfg(not(feature = "tcp"))] 50 { 51 // If no runtime, we need an executor! 52 panic!("executor must be set") 53 } 54 } 55 Exec::Executor(ref e) => { 56 e.execute(Box::pin(fut)); 57 } 58 } 59 } 60 } 61 62 impl fmt::Debug for Exec { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result63 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 64 f.debug_struct("Exec").finish() 65 } 66 } 67 68 #[cfg(feature = "server")] 69 impl<F, B> ConnStreamExec<F, B> for Exec 70 where 71 H2Stream<F, B>: Future<Output = ()> + Send + 'static, 72 B: HttpBody, 73 { execute_h2stream(&mut self, fut: H2Stream<F, B>)74 fn execute_h2stream(&mut self, fut: H2Stream<F, B>) { 75 self.execute(fut) 76 } 77 } 78 79 #[cfg(feature = "server")] 80 impl<I, N, S, E, W> NewSvcExec<I, N, S, E, W> for Exec 81 where 82 NewSvcTask<I, N, S, E, W>: Future<Output = ()> + Send + 'static, 83 S: HttpService<Body>, 84 W: Watcher<I, S, E>, 85 { execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>)86 fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>) { 87 self.execute(fut) 88 } 89 } 90 91 // ==== impl Executor ===== 92 93 #[cfg(feature = "server")] 94 impl<E, F, B> ConnStreamExec<F, B> for E 95 where 96 E: Executor<H2Stream<F, B>> + Clone, 97 H2Stream<F, B>: Future<Output = ()>, 98 B: HttpBody, 99 { execute_h2stream(&mut self, fut: H2Stream<F, B>)100 fn execute_h2stream(&mut self, fut: H2Stream<F, B>) { 101 self.execute(fut) 102 } 103 } 104 105 #[cfg(feature = "server")] 106 impl<I, N, S, E, W> NewSvcExec<I, N, S, E, W> for E 107 where 108 E: Executor<NewSvcTask<I, N, S, E, W>> + Clone, 109 NewSvcTask<I, N, S, E, W>: Future<Output = ()>, 110 S: HttpService<Body>, 111 W: Watcher<I, S, E>, 112 { execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>)113 fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>) { 114 self.execute(fut) 115 } 116 } 117 118 // If http2 is not enable, we just have a stub here, so that the trait bounds 119 // that *would* have been needed are still checked. Why? 120 // 121 // Because enabling `http2` shouldn't suddenly add new trait bounds that cause 122 // a compilation error. 123 #[cfg(not(feature = "http2"))] 124 #[allow(missing_debug_implementations)] 125 pub struct H2Stream<F, B>(std::marker::PhantomData<(F, B)>); 126 127 #[cfg(not(feature = "http2"))] 128 impl<F, B, E> Future for H2Stream<F, B> 129 where 130 F: Future<Output = Result<http::Response<B>, E>>, 131 B: crate::body::HttpBody, 132 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, 133 E: Into<Box<dyn std::error::Error + Send + Sync>>, 134 { 135 type Output = (); 136 poll( self: Pin<&mut Self>, _cx: &mut std::task::Context<'_>, ) -> std::task::Poll<Self::Output>137 fn poll( 138 self: Pin<&mut Self>, 139 _cx: &mut std::task::Context<'_>, 140 ) -> std::task::Poll<Self::Output> { 141 unreachable!() 142 } 143 } 144