1 use std::marker::PhantomData; 2 use std::net::SocketAddr; 3 use std::task::{Context, Poll}; 4 5 use actix_service::{Service, ServiceFactory as BaseServiceFactory}; 6 use actix_utils::{ 7 counter::CounterGuard, 8 future::{ready, Ready}, 9 }; 10 use futures_core::future::LocalBoxFuture; 11 use log::error; 12 13 use crate::socket::{FromStream, MioStream}; 14 use crate::Token; 15 16 pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static { 17 type Factory: BaseServiceFactory<Stream, Config = ()>; 18 create(&self) -> Self::Factory19 fn create(&self) -> Self::Factory; 20 } 21 22 pub(crate) trait InternalServiceFactory: Send { name(&self, token: Token) -> &str23 fn name(&self, token: Token) -> &str; 24 clone_factory(&self) -> Box<dyn InternalServiceFactory>25 fn clone_factory(&self) -> Box<dyn InternalServiceFactory>; 26 create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>27 fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>; 28 } 29 30 pub(crate) type BoxedServerService = Box< 31 dyn Service< 32 (CounterGuard, MioStream), 33 Response = (), 34 Error = (), 35 Future = Ready<Result<(), ()>>, 36 >, 37 >; 38 39 pub(crate) struct StreamService<S, I> { 40 service: S, 41 _phantom: PhantomData<I>, 42 } 43 44 impl<S, I> StreamService<S, I> { new(service: S) -> Self45 pub(crate) fn new(service: S) -> Self { 46 StreamService { 47 service, 48 _phantom: PhantomData, 49 } 50 } 51 } 52 53 impl<S, I> Service<(CounterGuard, MioStream)> for StreamService<S, I> 54 where 55 S: Service<I>, 56 S::Future: 'static, 57 S::Error: 'static, 58 I: FromStream, 59 { 60 type Response = (); 61 type Error = (); 62 type Future = Ready<Result<(), ()>>; 63 poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>64 fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 65 self.service.poll_ready(ctx).map_err(|_| ()) 66 } 67 call(&self, (guard, req): (CounterGuard, MioStream)) -> Self::Future68 fn call(&self, (guard, req): (CounterGuard, MioStream)) -> Self::Future { 69 ready(match FromStream::from_mio(req) { 70 Ok(stream) => { 71 let f = self.service.call(stream); 72 actix_rt::spawn(async move { 73 let _ = f.await; 74 drop(guard); 75 }); 76 Ok(()) 77 } 78 Err(e) => { 79 error!("Can not convert to an async tcp stream: {}", e); 80 Err(()) 81 } 82 }) 83 } 84 } 85 86 pub(crate) struct StreamNewService<F: ServiceFactory<Io>, Io: FromStream> { 87 name: String, 88 inner: F, 89 token: Token, 90 addr: SocketAddr, 91 _t: PhantomData<Io>, 92 } 93 94 impl<F, Io> StreamNewService<F, Io> 95 where 96 F: ServiceFactory<Io>, 97 Io: FromStream + Send + 'static, 98 { create( name: String, token: Token, inner: F, addr: SocketAddr, ) -> Box<dyn InternalServiceFactory>99 pub(crate) fn create( 100 name: String, 101 token: Token, 102 inner: F, 103 addr: SocketAddr, 104 ) -> Box<dyn InternalServiceFactory> { 105 Box::new(Self { 106 name, 107 token, 108 inner, 109 addr, 110 _t: PhantomData, 111 }) 112 } 113 } 114 115 impl<F, Io> InternalServiceFactory for StreamNewService<F, Io> 116 where 117 F: ServiceFactory<Io>, 118 Io: FromStream + Send + 'static, 119 { name(&self, _: Token) -> &str120 fn name(&self, _: Token) -> &str { 121 &self.name 122 } 123 clone_factory(&self) -> Box<dyn InternalServiceFactory>124 fn clone_factory(&self) -> Box<dyn InternalServiceFactory> { 125 Box::new(Self { 126 name: self.name.clone(), 127 inner: self.inner.clone(), 128 token: self.token, 129 addr: self.addr, 130 _t: PhantomData, 131 }) 132 } 133 create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>134 fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> { 135 let token = self.token; 136 let fut = self.inner.create().new_service(()); 137 Box::pin(async move { 138 match fut.await { 139 Ok(inner) => { 140 let service = Box::new(StreamService::new(inner)) as _; 141 Ok(vec![(token, service)]) 142 } 143 Err(_) => Err(()), 144 } 145 }) 146 } 147 } 148 149 impl<F, T, I> ServiceFactory<I> for F 150 where 151 F: Fn() -> T + Send + Clone + 'static, 152 T: BaseServiceFactory<I, Config = ()>, 153 I: FromStream, 154 { 155 type Factory = T; 156 create(&self) -> T157 fn create(&self) -> T { 158 (self)() 159 } 160 } 161