1 extern crate hyper;
2 extern crate websocket;
3
4 use self::hyper::uri::RequestUri::AbsolutePath;
5
6 use self::websocket::WebSocketError;
7 use futures::future::{err, Future};
8
9 use std::rc::Rc;
10
11 use crate::options::StaticFile;
12
13 use self::websocket::server::upgrade::r#async::IntoWs;
14
15 use super::ws_peer::{PeerForWs};
16 use super::{box_up_err, io_other_error, BoxedNewPeerFuture, Peer};
17 use super::{ConstructParams, L2rUser, PeerConstructor, Specifier};
18
19 #[derive(Debug)]
20 pub struct WsServer<T: Specifier>(pub T);
21 impl<T: Specifier> Specifier for WsServer<T> {
construct(&self, cp: ConstructParams) -> PeerConstructor22 fn construct(&self, cp: ConstructParams) -> PeerConstructor {
23 let restrict_uri = Rc::new(cp.program_options.restrict_uri.clone());
24 let serve_static_files = Rc::new(cp.program_options.serve_static_files.clone());
25 let inner = self.0.construct(cp.clone());
26 //let l2r = cp.left_to_right;
27 inner.map(move |p, l2r| {
28 // FIXME: attack of `Vec::clone`s.
29 ws_upgrade_peer(
30 p,
31 restrict_uri.clone(),
32 serve_static_files.clone(),
33 cp.program_options.websocket_reply_protocol.clone(),
34 cp.program_options.custom_reply_headers.clone(),
35 cp.program_options.clone(),
36 l2r,
37 )
38 })
39 }
40 specifier_boilerplate!(noglobalstate has_subspec);
41 self_0_is_subspecifier!(proxy_is_multiconnect);
42 }
// Declares `WsServerClass`, the specifier class for `WsServer`.
// It is registered as an overlay (`overlay = true`) that takes a
// sub-specifier as its argument (`arg_handling = subspec`) and is reachable
// through the `ws-upgrade:` / `upgrade-ws:` / `ws-u:` / `u-ws:` prefixes.
// The `help` text below is shown to users verbatim.
specifier_class!(
    name = WsServerClass,
    target = WsServer,
    prefixes = ["ws-upgrade:", "upgrade-ws:", "ws-u:", "u-ws:"],
    arg_handling = subspec,
    overlay = true,
    MessageOriented,
    MulticonnectnessDependsOnInnerType,
    help = r#"
WebSocket upgrader / raw server. Specify your own protocol instead of usual TCP. [A]

All other WebSocket server modes actually use this overlay under the hood.

Example: serve incoming connection from socat

socat tcp-l:1234,fork,reuseaddr exec:'websocat -t ws-u\:stdio\: mirror\:'
"#
);
61
// Declares `WsTcpServerClass`: the `ws-listen:` / `ws-l:` / `l-ws:` /
// `listen-ws:` prefixes are an alias for `ws-u:tcp-l:`, i.e. the WebSocket
// upgrade overlay stacked on a TCP listener.
specifier_alias!(
    name = WsTcpServerClass,
    prefixes = ["ws-listen:", "ws-l:", "l-ws:", "listen-ws:"],
    alias = "ws-u:tcp-l:",
    help = r#"
WebSocket server. Argument is host and port to listen.

Example: Dump all incoming websocket data to console

websocat ws-l:127.0.0.1:8808 -

Example: the same, but more verbose:

websocat ws-l:tcp-l:127.0.0.1:8808 reuse:-
"#
);
78
// Declares `WsInetdServerClass`: the `inetd-ws:` / `ws-inetd:` prefixes are
// an alias for `ws-u:inetd:`, i.e. the WebSocket upgrade overlay stacked on
// the `inetd:` specifier.
specifier_alias!(
    name = WsInetdServerClass,
    prefixes = ["inetd-ws:", "ws-inetd:"],
    alias = "ws-u:inetd:",
    help = r#"
WebSocket inetd server. [A]

TODO: transfer the example here
"#
);
89
// Declares `WsUnixServerClass`: the `l-ws-unix:` prefix is an alias for
// `ws-u:unix-l:`, i.e. the WebSocket upgrade overlay stacked on a UNIX
// socket listener.
specifier_alias!(
    name = WsUnixServerClass,
    prefixes = ["l-ws-unix:"],
    alias = "ws-u:unix-l:",
    help = r#"
WebSocket UNIX socket-based server. [A]
"#
);
98
// Declares `WsAbstractUnixServerClass`: the `l-ws-abstract:` prefix is an
// alias for `ws-l:abstract-l:`.
// NOTE(review): the sibling aliases in this file expand to `ws-u:<listener>`,
// whereas this one goes through `ws-l:`, which is itself declared as an alias
// for `ws-u:tcp-l:`. Whether this resolves to the intended
// `ws-u:abstract-l:` depends on alias-expansion logic that lives outside
// this file -- confirm before changing.
specifier_alias!(
    name = WsAbstractUnixServerClass,
    prefixes = ["l-ws-abstract:"],
    alias = "ws-l:abstract-l:",
    help = r#"
WebSocket abstract-namespaced UNIX socket server. [A]
"#
);
107
// Submodule providing `http_serve::http_serve`, which `ws_upgrade_peer`
// calls when the WebSocket upgrade attempt on an incoming connection fails.
// The source file location is pinned explicitly via `#[path]`.
#[path = "http_serve.rs"]
pub mod http_serve;
110
ws_upgrade_peer( inner_peer: Peer, restrict_uri: Rc<Option<String>>, serve_static_files: Rc<Vec<StaticFile>>, websocket_protocol: Option<String>, custom_reply_headers: Vec<(String, Vec<u8>)>, opts: Rc<super::Options>, l2r: L2rUser, ) -> BoxedNewPeerFuture111 pub fn ws_upgrade_peer(
112 inner_peer: Peer,
113 restrict_uri: Rc<Option<String>>,
114 serve_static_files: Rc<Vec<StaticFile>>,
115 websocket_protocol: Option<String>,
116 custom_reply_headers: Vec<(String, Vec<u8>)>,
117 opts: Rc<super::Options>,
118 l2r: L2rUser,
119 ) -> BoxedNewPeerFuture {
120 let step1 = PeerForWs(inner_peer);
121 let step2: Box<
122 dyn Future<Item = self::websocket::server::upgrade::r#async::Upgrade<_>, Error = _>,
123 > = step1.into_ws();
124 let step3 = step2
125 .or_else(|(innerpeer, hyper_incoming, _bytesmut, e)| {
126 http_serve::http_serve(innerpeer.0, hyper_incoming, serve_static_files)
127 .then(|_|
128 err(WebSocketError::IoError(io_other_error(e)))
129 )
130 })
131 .and_then(
132 move |mut x| -> Box<dyn Future<Item = Peer, Error = websocket::WebSocketError>> {
133 info!("Incoming connection to websocket: {}", x.request.subject.1);
134
135 use ::websocket::header::WebSocketProtocol;
136
137 let mut protocol_check = true;
138 {
139 let pp : Option<&WebSocketProtocol> = x.request.headers.get();
140 if let Some(rp) = websocket_protocol {
141 // Unconditionally set this protocol
142 x.headers.set_raw("Sec-WebSocket-Protocol",
143 vec![rp.as_bytes().to_vec()],
144 );
145 // Warn if not present in client protocols
146 let mut present = false;
147 if let Some(pp) = pp {
148 if let Some(pp) = pp.iter().next() {
149 if pp == &rp {
150 present = true;
151 }
152 }
153 }
154 if !present {
155 if pp.is_none() {
156 warn!("Client failed to specify Sec-WebSocket-Protocol header. Replying with it anyway, against the RFC.");
157 } else {
158 protocol_check = false;
159 }
160 }
161 } else {
162 // No protocol specified, just choosing the first if any.
163 if let Some(pp) = pp {
164 if pp.len() > 1 {
165 warn!("Multiple `Sec-WebSocket-Protocol`s specified in the request. Choosing the first one. Use --server-protocol to make it explicit.")
166 }
167 if let Some(pp) = pp.iter().next() {
168 x.headers.set_raw(
169 "Sec-WebSocket-Protocol",
170 vec![pp.as_bytes().to_vec()],
171 );
172 }
173 }
174 }
175 }
176
177 for (hn, hv) in custom_reply_headers {
178 x.headers.append_raw(hn, hv);
179 }
180
181 debug!("{:?}", x.request);
182 debug!("{:?}", x.headers);
183
184 if !protocol_check {
185 return Box::new(
186 x.reject()
187 .and_then(|_| {
188 warn!("Requested Sec-WebSocket-Protocol does not match --server-protocol option");
189 ::futures::future::err(crate::util::simple_err(
190 "Requested Sec-WebSocket-Protocol does not match --server-protocol option"
191 .to_string(),
192 ))
193 })
194 .map_err(|e| websocket::WebSocketError::IoError(io_other_error(e))),
195 )
196 as Box<dyn Future<Item = Peer, Error = websocket::WebSocketError>>;
197 }
198
199
200 match l2r {
201 L2rUser::FillIn(ref y) => {
202 let uri = &x.request.subject.1;
203 let mut z = y.borrow_mut();
204 z.uri = Some(format!("{}", uri));
205
206 let h : &websocket::header::Headers = &x.request.headers;
207 for q in opts.headers_to_env.iter() {
208 if let Some(v) = h.get_raw(q) {
209 if v.is_empty() { continue }
210 if v.len() > 1 {
211 warn!("Extra request header for {} ignored", q);
212 }
213 if let Ok(val) = String::from_utf8(v[0].clone()) {
214 z.headers.push((
215 q.clone(),
216 val,
217 ));
218 } else {
219 warn!("Header {} value contains invalid UTF-8", q);
220 }
221 } else {
222 warn!("No request header {}, so no envvar H_{}", q, q);
223 }
224 }
225 },
226 L2rUser::ReadFrom(_) => {},
227 }
228
229
230 if let Some(ref restrict_uri) = *restrict_uri {
231 let check_passed = match x.request.subject.1 {
232 AbsolutePath(ref x) if x == restrict_uri => true,
233 _ => false,
234 };
235 if !check_passed {
236 return Box::new(
237 x.reject()
238 .and_then(|_| {
239 warn!("Incoming request URI doesn't match the --restrict-uri value");
240 ::futures::future::err(crate::util::simple_err(
241 "Request URI doesn't match --restrict-uri parameter"
242 .to_string(),
243 ))
244 })
245 .map_err(|e| websocket::WebSocketError::IoError(io_other_error(e))),
246 )
247 as Box<dyn Future<Item = Peer, Error = websocket::WebSocketError>>;
248 }
249 };
250 Box::new(x.accept().map(move |(y, headers)| {
251 debug!("{:?}", headers);
252 info!("Upgraded");
253 let close_on_shutdown = !opts.websocket_dont_close;
254 super::ws_peer::finish_building_ws_peer(&*opts, y, close_on_shutdown, None)
255 })) as Box<dyn Future<Item = Peer, Error = websocket::WebSocketError>>
256 },
257 );
258 let step4 = step3.map_err(box_up_err);
259 Box::new(step4) as BoxedNewPeerFuture
260 }
261