1 use std::marker::PhantomData;
2 use std::net::SocketAddr;
3 use std::task::{Context, Poll};
4 
5 use actix_service::{Service, ServiceFactory as BaseServiceFactory};
6 use actix_utils::{
7     counter::CounterGuard,
8     future::{ready, Ready},
9 };
10 use futures_core::future::LocalBoxFuture;
11 use log::error;
12 
13 use crate::socket::{FromStream, MioStream};
14 use crate::Token;
15 
16 pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
17     type Factory: BaseServiceFactory<Stream, Config = ()>;
18 
create(&self) -> Self::Factory19     fn create(&self) -> Self::Factory;
20 }
21 
22 pub(crate) trait InternalServiceFactory: Send {
name(&self, token: Token) -> &str23     fn name(&self, token: Token) -> &str;
24 
clone_factory(&self) -> Box<dyn InternalServiceFactory>25     fn clone_factory(&self) -> Box<dyn InternalServiceFactory>;
26 
create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>27     fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>;
28 }
29 
30 pub(crate) type BoxedServerService = Box<
31     dyn Service<
32         (CounterGuard, MioStream),
33         Response = (),
34         Error = (),
35         Future = Ready<Result<(), ()>>,
36     >,
37 >;
38 
/// Wrapper around a service `S`, tagged with the stream type `I` it is meant to be fed.
///
/// `I` exists only at the type level (via [`PhantomData`]); no value of that type is stored.
pub(crate) struct StreamService<S, I> {
    /// The wrapped service.
    service: S,
    /// Zero-sized marker tying the wrapper to `I`.
    _phantom: PhantomData<I>,
}

impl<S, I> StreamService<S, I> {
    /// Wraps `service`, leaving it untouched.
    pub(crate) fn new(service: S) -> Self {
        let _phantom = PhantomData;
        Self { service, _phantom }
    }
}
52 
53 impl<S, I> Service<(CounterGuard, MioStream)> for StreamService<S, I>
54 where
55     S: Service<I>,
56     S::Future: 'static,
57     S::Error: 'static,
58     I: FromStream,
59 {
60     type Response = ();
61     type Error = ();
62     type Future = Ready<Result<(), ()>>;
63 
poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>64     fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
65         self.service.poll_ready(ctx).map_err(|_| ())
66     }
67 
call(&self, (guard, req): (CounterGuard, MioStream)) -> Self::Future68     fn call(&self, (guard, req): (CounterGuard, MioStream)) -> Self::Future {
69         ready(match FromStream::from_mio(req) {
70             Ok(stream) => {
71                 let f = self.service.call(stream);
72                 actix_rt::spawn(async move {
73                     let _ = f.await;
74                     drop(guard);
75                 });
76                 Ok(())
77             }
78             Err(e) => {
79                 error!("Can not convert to an async tcp stream: {}", e);
80                 Err(())
81             }
82         })
83     }
84 }
85 
86 pub(crate) struct StreamNewService<F: ServiceFactory<Io>, Io: FromStream> {
87     name: String,
88     inner: F,
89     token: Token,
90     addr: SocketAddr,
91     _t: PhantomData<Io>,
92 }
93 
94 impl<F, Io> StreamNewService<F, Io>
95 where
96     F: ServiceFactory<Io>,
97     Io: FromStream + Send + 'static,
98 {
create( name: String, token: Token, inner: F, addr: SocketAddr, ) -> Box<dyn InternalServiceFactory>99     pub(crate) fn create(
100         name: String,
101         token: Token,
102         inner: F,
103         addr: SocketAddr,
104     ) -> Box<dyn InternalServiceFactory> {
105         Box::new(Self {
106             name,
107             token,
108             inner,
109             addr,
110             _t: PhantomData,
111         })
112     }
113 }
114 
115 impl<F, Io> InternalServiceFactory for StreamNewService<F, Io>
116 where
117     F: ServiceFactory<Io>,
118     Io: FromStream + Send + 'static,
119 {
name(&self, _: Token) -> &str120     fn name(&self, _: Token) -> &str {
121         &self.name
122     }
123 
clone_factory(&self) -> Box<dyn InternalServiceFactory>124     fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
125         Box::new(Self {
126             name: self.name.clone(),
127             inner: self.inner.clone(),
128             token: self.token,
129             addr: self.addr,
130             _t: PhantomData,
131         })
132     }
133 
create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>134     fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
135         let token = self.token;
136         let fut = self.inner.create().new_service(());
137         Box::pin(async move {
138             match fut.await {
139                 Ok(inner) => {
140                     let service = Box::new(StreamService::new(inner)) as _;
141                     Ok(vec![(token, service)])
142                 }
143                 Err(_) => Err(()),
144             }
145         })
146     }
147 }
148 
149 impl<F, T, I> ServiceFactory<I> for F
150 where
151     F: Fn() -> T + Send + Clone + 'static,
152     T: BaseServiceFactory<I, Config = ()>,
153     I: FromStream,
154 {
155     type Factory = T;
156 
create(&self) -> T157     fn create(&self) -> T {
158         (self)()
159     }
160 }
161