1 //! Everything you need to create a client connection to a websocket.
2 
3 use crate::header::extensions::Extension;
4 use crate::header::{
5 	Origin, WebSocketExtensions, WebSocketKey, WebSocketProtocol, WebSocketVersion,
6 };
7 use hyper::header::{Authorization, Basic, Header, HeaderFormat, Headers};
8 use hyper::version::HttpVersion;
9 use std::borrow::Cow;
10 use std::convert::Into;
11 pub use url::{ParseError, Url};
12 
13 #[cfg(any(feature = "sync", feature = "async"))]
14 mod common_imports {
15 	pub use crate::header::WebSocketAccept;
16 	pub use crate::result::{WSUrlErrorKind, WebSocketError, WebSocketOtherError, WebSocketResult};
17 	pub use crate::stream::{self, Stream};
18 	pub use hyper::buffer::BufReader;
19 	pub use hyper::header::{Connection, ConnectionOption, Host, Protocol, ProtocolName, Upgrade};
20 	pub use hyper::http::h1::parse_response;
21 	pub use hyper::http::h1::Incoming;
22 	pub use hyper::http::RawStatus;
23 	pub use hyper::method::Method;
24 	pub use hyper::status::StatusCode;
25 	pub use hyper::uri::RequestUri;
26 	pub use std::net::TcpStream;
27 	pub use std::net::ToSocketAddrs;
28 	pub use unicase::UniCase;
29 	pub use url::Position;
30 }
31 #[cfg(any(feature = "sync", feature = "async"))]
32 use self::common_imports::*;
33 
34 #[cfg(feature = "sync")]
35 use super::sync::Client;
36 
37 #[cfg(feature = "sync-ssl")]
38 use crate::stream::sync::NetworkStream;
39 
40 #[cfg(any(feature = "sync-ssl", feature = "async-ssl"))]
41 use native_tls::TlsConnector;
42 #[cfg(feature = "sync-ssl")]
43 use native_tls::TlsStream;
44 
45 #[cfg(feature = "async")]
46 mod async_imports {
47 	pub use super::super::r#async;
48 	pub use crate::codec::ws::{Context, MessageCodec};
49 	pub use crate::ws::util::update_framed_codec;
50 	pub use futures::future;
51 	pub use futures::Stream as FutureStream;
52 	pub use futures::{Future, IntoFuture, Sink};
53 	pub use tokio_codec::FramedParts;
54 	pub use tokio_codec::{Decoder, Framed};
55 	pub use tokio_reactor::Handle;
56 	pub use tokio_tcp::TcpStream as TcpStreamNew;
57 	#[cfg(feature = "async-ssl")]
58 	pub use tokio_tls::TlsConnector as TlsConnectorExt;
59 }
60 #[cfg(feature = "async")]
61 use self::async_imports::*;
62 
63 use crate::result::towse;
64 
65 /// Build clients with a builder-style API
66 /// This makes it easy to create and configure a websocket
67 /// connection:
68 ///
69 /// The easiest way to connect is like this:
70 ///
71 /// ```rust,no_run
72 /// use websocket::ClientBuilder;
73 ///
74 /// let client = ClientBuilder::new("ws://myapp.com")
75 ///     .unwrap()
76 ///     .connect_insecure()
77 ///     .unwrap();
78 /// ```
79 ///
80 /// But there are so many more possibilities:
81 ///
82 /// ```rust,no_run
83 /// use websocket::ClientBuilder;
84 /// use websocket::header::{Headers, Cookie};
85 ///
86 /// let default_protos = vec!["ping", "chat"];
87 /// let mut my_headers = Headers::new();
88 /// my_headers.set(Cookie(vec!["userid=1".to_owned()]));
89 ///
90 /// let mut builder = ClientBuilder::new("ws://myapp.com/room/discussion")
91 ///     .unwrap()
92 ///     .add_protocols(default_protos) // any IntoIterator
93 ///     .add_protocol("video-chat")
94 ///     .custom_headers(&my_headers);
95 ///
96 /// // connect to a chat server with a user
97 /// let client = builder.connect_insecure().unwrap();
98 ///
99 /// // clone the builder and take it with you
100 /// let not_logged_in = builder
101 ///     .clone()
102 ///     .clear_header::<Cookie>()
103 ///     .connect_insecure().unwrap();
104 /// ```
105 ///
106 /// You may have noticed we're not using SSL, have no fear, SSL is included!
107 /// This crate's openssl dependency is optional (and included by default).
108 /// One can use `connect_secure` to connect to an SSL service, or simply `connect`
109 /// to choose either SSL or not based on the protocol (`ws://` or `wss://`).
110 #[derive(Clone, Debug)]
111 pub struct ClientBuilder<'u> {
112 	url: Cow<'u, Url>,
113 	version: HttpVersion,
114 	headers: Headers,
115 	version_set: bool,
116 	key_set: bool,
117 }
118 
119 impl<'u> ClientBuilder<'u> {
120 	/// Create a client builder from an already parsed Url,
121 	/// because there is no need to parse this will never error.
122 	///
123 	/// ```rust
124 	/// # use websocket::ClientBuilder;
125 	/// use websocket::url::Url;
126 	///
127 	/// // the parsing error will be handled outside the constructor
128 	/// let url = Url::parse("ws://bitcoins.pizza").unwrap();
129 	///
130 	/// let builder = ClientBuilder::from_url(&url);
131 	/// ```
132 	/// The path of a URL is optional if no port is given then port
133 	/// 80 will be used in the case of `ws://` and port `443` will be
134 	/// used in the case of `wss://`.
from_url(address: &'u Url) -> Self135 	pub fn from_url(address: &'u Url) -> Self {
136 		ClientBuilder::init(Cow::Borrowed(address))
137 	}
138 
139 	/// Create a client builder from a URL string, this will
140 	/// attempt to parse the URL immediately and return a `ParseError`
141 	/// if the URL is invalid. URLs must be of the form:
142 	/// `[ws or wss]://[domain]:[port]/[path]`
143 	/// The path of a URL is optional if no port is given then port
144 	/// 80 will be used in the case of `ws://` and port `443` will be
145 	/// used in the case of `wss://`.
146 	///
147 	/// ```rust
148 	/// # use websocket::ClientBuilder;
149 	/// let builder = ClientBuilder::new("wss://mycluster.club");
150 	/// ```
151 	#[warn(clippy::new_ret_no_self)]
new(address: &str) -> Result<Self, ParseError>152 	pub fn new(address: &str) -> Result<Self, ParseError> {
153 		let url = Url::parse(address)?;
154 		Ok(ClientBuilder::init(Cow::Owned(url)))
155 	}
156 
init(url: Cow<'u, Url>) -> Self157 	fn init(url: Cow<'u, Url>) -> Self {
158 		ClientBuilder {
159 			url,
160 			version: HttpVersion::Http11,
161 			version_set: false,
162 			key_set: false,
163 			headers: Headers::new(),
164 		}
165 	}
166 
167 	/// Adds a user-defined protocol to the handshake, the server will be
168 	/// given a list of these protocols and will send back the ones it accepts.
169 	///
170 	/// ```rust
171 	/// # use websocket::ClientBuilder;
172 	/// # use websocket::header::WebSocketProtocol;
173 	/// let builder = ClientBuilder::new("wss://my-twitch-clone.rs").unwrap()
174 	///     .add_protocol("my-chat-proto");
175 	///
176 	/// let protos = &builder.get_header::<WebSocketProtocol>().unwrap().0;
177 	/// assert!(protos.contains(&"my-chat-proto".to_string()));
178 	/// ```
add_protocol<P>(mut self, protocol: P) -> Self where P: Into<String>,179 	pub fn add_protocol<P>(mut self, protocol: P) -> Self
180 	where
181 		P: Into<String>,
182 	{
183 		upsert_header!(self.headers; WebSocketProtocol; {
184 			Some(protos) => protos.0.push(protocol.into()),
185 			None => WebSocketProtocol(vec![protocol.into()])
186 		});
187 		self
188 	}
189 
190 	/// Adds a user-defined protocols to the handshake.
191 	/// This can take many kinds of iterators.
192 	///
193 	/// ```rust
194 	/// # use websocket::ClientBuilder;
195 	/// # use websocket::header::WebSocketProtocol;
196 	/// let builder = ClientBuilder::new("wss://my-twitch-clone.rs").unwrap()
197 	///     .add_protocols(vec!["pubsub", "sub.events"]);
198 	///
199 	/// let protos = &builder.get_header::<WebSocketProtocol>().unwrap().0;
200 	/// assert!(protos.contains(&"pubsub".to_string()));
201 	/// assert!(protos.contains(&"sub.events".to_string()));
202 	/// ```
add_protocols<I, S>(mut self, protocols: I) -> Self where I: IntoIterator<Item = S>, S: Into<String>,203 	pub fn add_protocols<I, S>(mut self, protocols: I) -> Self
204 	where
205 		I: IntoIterator<Item = S>,
206 		S: Into<String>,
207 	{
208 		let mut protocols: Vec<String> = protocols.into_iter().map(Into::into).collect();
209 
210 		upsert_header!(self.headers; WebSocketProtocol; {
211 			Some(protos) => protos.0.append(&mut protocols),
212 			None => WebSocketProtocol(protocols)
213 		});
214 		self
215 	}
216 
217 	/// Removes all the currently set protocols.
clear_protocols(mut self) -> Self218 	pub fn clear_protocols(mut self) -> Self {
219 		self.headers.remove::<WebSocketProtocol>();
220 		self
221 	}
222 
223 	/// Adds an extension to the connection.
224 	/// Unlike protocols, extensions can be below the application level
225 	/// (like compression). Currently no extensions are supported
226 	/// out-of-the-box but one can still use them by using their own
227 	/// implementation. Support is coming soon though.
228 	///
229 	/// ```rust
230 	/// # use websocket::ClientBuilder;
231 	/// # use websocket::header::{WebSocketExtensions};
232 	/// # use websocket::header::extensions::Extension;
233 	/// let builder = ClientBuilder::new("wss://skype-for-linux-lol.com").unwrap()
234 	///     .add_extension(Extension {
235 	///         name: "permessage-deflate".to_string(),
236 	///         params: vec![],
237 	///     });
238 	///
239 	/// let exts = &builder.get_header::<WebSocketExtensions>().unwrap().0;
240 	/// assert!(exts.first().unwrap().name == "permessage-deflate");
241 	/// ```
add_extension(mut self, extension: Extension) -> Self242 	pub fn add_extension(mut self, extension: Extension) -> Self {
243 		upsert_header!(self.headers; WebSocketExtensions; {
244 			Some(protos) => protos.0.push(extension),
245 			None => WebSocketExtensions(vec![extension])
246 		});
247 		self
248 	}
249 
250 	/// Adds some extensions to the connection.
251 	/// Currently no extensions are supported out-of-the-box but one can
252 	/// still use them by using their own implementation. Support is coming soon though.
253 	///
254 	/// ```rust
255 	/// # use websocket::ClientBuilder;
256 	/// # use websocket::header::{WebSocketExtensions};
257 	/// # use websocket::header::extensions::Extension;
258 	/// let builder = ClientBuilder::new("wss://moxie-chat.org").unwrap()
259 	///     .add_extensions(vec![
260 	///         Extension {
261 	///             name: "permessage-deflate".to_string(),
262 	///             params: vec![],
263 	///         },
264 	///         Extension {
265 	///             name: "crypt-omemo".to_string(),
266 	///             params: vec![],
267 	///         },
268 	///     ]);
269 	///
270 	/// # let exts = &builder.get_header::<WebSocketExtensions>().unwrap().0;
271 	/// # assert!(exts.first().unwrap().name == "permessage-deflate");
272 	/// # assert!(exts.last().unwrap().name == "crypt-omemo");
273 	/// ```
add_extensions<I>(mut self, extensions: I) -> Self where I: IntoIterator<Item = Extension>,274 	pub fn add_extensions<I>(mut self, extensions: I) -> Self
275 	where
276 		I: IntoIterator<Item = Extension>,
277 	{
278 		let mut extensions: Vec<Extension> = extensions.into_iter().collect();
279 		upsert_header!(self.headers; WebSocketExtensions; {
280 			Some(protos) => protos.0.append(&mut extensions),
281 			None => WebSocketExtensions(extensions)
282 		});
283 		self
284 	}
285 
286 	/// Remove all the extensions added to the builder.
clear_extensions(mut self) -> Self287 	pub fn clear_extensions(mut self) -> Self {
288 		self.headers.remove::<WebSocketExtensions>();
289 		self
290 	}
291 
292 	/// Add a custom `Sec-WebSocket-Key` header.
293 	/// Use this only if you know what you're doing, and this almost
294 	/// never has to be used.
key(mut self, key: [u8; 16]) -> Self295 	pub fn key(mut self, key: [u8; 16]) -> Self {
296 		self.headers.set(WebSocketKey::from_array(key));
297 		self.key_set = true;
298 		self
299 	}
300 
301 	/// Remove the currently set `Sec-WebSocket-Key` header if any.
clear_key(mut self) -> Self302 	pub fn clear_key(mut self) -> Self {
303 		self.headers.remove::<WebSocketKey>();
304 		self.key_set = false;
305 		self
306 	}
307 
308 	/// Set the version of the Websocket connection.
309 	/// Currently this library only supports version 13 (from RFC6455),
310 	/// but one could use this library to create the handshake then use an
311 	/// implementation of another websocket version.
version(mut self, version: WebSocketVersion) -> Self312 	pub fn version(mut self, version: WebSocketVersion) -> Self {
313 		self.headers.set(version);
314 		self.version_set = true;
315 		self
316 	}
317 
318 	/// Unset the websocket version to be the default (WebSocket 13).
clear_version(mut self) -> Self319 	pub fn clear_version(mut self) -> Self {
320 		self.headers.remove::<WebSocketVersion>();
321 		self.version_set = false;
322 		self
323 	}
324 
325 	/// Sets the Origin header of the handshake.
326 	/// Normally in browsers this is used to protect against
327 	/// unauthorized cross-origin use of a WebSocket server, but it is rarely
328 	/// send by non-browser clients. Still, it can be useful.
origin(mut self, origin: String) -> Self329 	pub fn origin(mut self, origin: String) -> Self {
330 		self.headers.set(Origin(origin));
331 		self
332 	}
333 
334 	/// Remove the Origin header from the handshake.
clear_origin(mut self) -> Self335 	pub fn clear_origin(mut self) -> Self {
336 		self.headers.remove::<Origin>();
337 		self
338 	}
339 
340 	/// This is a catch all to add random headers to your handshake,
341 	/// the process here is more manual.
342 	///
343 	/// ```rust
344 	/// # use websocket::ClientBuilder;
345 	/// # use websocket::header::{Headers, Authorization};
346 	/// let mut headers = Headers::new();
347 	/// headers.set(Authorization("let me in".to_owned()));
348 	///
349 	/// let builder = ClientBuilder::new("ws://moz.illest").unwrap()
350 	///     .custom_headers(&headers);
351 	///
352 	/// # let hds = &builder.get_header::<Authorization<String>>().unwrap().0;
353 	/// # assert!(hds == &"let me in".to_string());
354 	/// ```
custom_headers(mut self, custom_headers: &Headers) -> Self355 	pub fn custom_headers(mut self, custom_headers: &Headers) -> Self {
356 		self.headers.extend(custom_headers.iter());
357 		self
358 	}
359 
360 	/// Remove a type of header from the handshake, this is to be used
361 	/// with the catch all `custom_headers`.
clear_header<H>(mut self) -> Self where H: Header + HeaderFormat,362 	pub fn clear_header<H>(mut self) -> Self
363 	where
364 		H: Header + HeaderFormat,
365 	{
366 		self.headers.remove::<H>();
367 		self
368 	}
369 
370 	/// Get a header to inspect it.
get_header<H>(&self) -> Option<&H> where H: Header + HeaderFormat,371 	pub fn get_header<H>(&self) -> Option<&H>
372 	where
373 		H: Header + HeaderFormat,
374 	{
375 		self.headers.get::<H>()
376 	}
377 
378 	/// Connect to a server (finally)!
379 	/// This will use a `Box<NetworkStream>` to represent either an SSL
380 	/// connection or a normal TCP connection, what to use will be decided
381 	/// using the protocol of the URL passed in (e.g. `ws://` or `wss://`)
382 	///
383 	/// If you have non-default SSL circumstances, you can use the `ssl_config`
384 	/// parameter to configure those.
385 	///
386 	/// ```rust,no_run
387 	/// # use websocket::ClientBuilder;
388 	/// # use websocket::Message;
389 	/// let mut client = ClientBuilder::new("wss://supersecret.l33t").unwrap()
390 	///     .connect(None)
391 	///     .unwrap();
392 	///
393 	/// // send messages!
394 	/// let message = Message::text("m337 47 7pm");
395 	/// client.send_message(&message).unwrap();
396 	/// ```
397 	#[cfg(feature = "sync-ssl")]
connect( &mut self, ssl_config: Option<TlsConnector>, ) -> WebSocketResult<Client<Box<dyn NetworkStream + Send>>>398 	pub fn connect(
399 		&mut self,
400 		ssl_config: Option<TlsConnector>,
401 	) -> WebSocketResult<Client<Box<dyn NetworkStream + Send>>> {
402 		let tcp_stream = self.establish_tcp(None)?;
403 
404 		let boxed_stream: Box<dyn NetworkStream + Send> = if self.is_secure_url() {
405 			Box::new(self.wrap_ssl(tcp_stream, ssl_config)?)
406 		} else {
407 			Box::new(tcp_stream)
408 		};
409 
410 		self.connect_on(boxed_stream)
411 	}
412 
413 	/// Create an insecure (plain TCP) connection to the client.
414 	/// In this case no `Box` will be used, you will just get a TcpStream,
415 	/// giving you the ability to split the stream into a reader and writer
416 	/// (since SSL streams cannot be cloned).
417 	///
418 	/// ```rust,no_run
419 	/// # use websocket::ClientBuilder;
420 	/// let mut client = ClientBuilder::new("wss://supersecret.l33t").unwrap()
421 	///     .connect_insecure()
422 	///     .unwrap();
423 	///
424 	/// // split into two (for some reason)!
425 	/// let (receiver, sender) = client.split().unwrap();
426 	/// ```
427 	#[cfg(feature = "sync")]
connect_insecure(&mut self) -> WebSocketResult<Client<TcpStream>>428 	pub fn connect_insecure(&mut self) -> WebSocketResult<Client<TcpStream>> {
429 		let tcp_stream = self.establish_tcp(Some(false))?;
430 
431 		self.connect_on(tcp_stream)
432 	}
433 
434 	/// Create an SSL connection to the sever.
435 	/// This will only use an `TlsStream`, this is useful
436 	/// when you want to be sure to connect over SSL or when you want access
437 	/// to the `TlsStream` functions (without having to go through a `Box`).
438 	#[cfg(feature = "sync-ssl")]
connect_secure( &mut self, ssl_config: Option<TlsConnector>, ) -> WebSocketResult<Client<TlsStream<TcpStream>>>439 	pub fn connect_secure(
440 		&mut self,
441 		ssl_config: Option<TlsConnector>,
442 	) -> WebSocketResult<Client<TlsStream<TcpStream>>> {
443 		let tcp_stream = self.establish_tcp(Some(true))?;
444 
445 		let ssl_stream = self.wrap_ssl(tcp_stream, ssl_config)?;
446 
447 		self.connect_on(ssl_stream)
448 	}
449 
450 	/// Connects to a websocket server on any stream you would like.
451 	/// Possible streams:
452 	///  - Unix Sockets
453 	///  - Logging Middle-ware
454 	///  - SSH
455 	///
456 	/// ```rust
457 	/// # use websocket::ClientBuilder;
458 	/// use websocket::sync::stream::ReadWritePair;
459 	/// use std::io::Cursor;
460 	///
461 	/// let accept = b"HTTP/1.1 101 Switching Protocols\r
462 	/// Upgrade: websocket\r
463 	/// Connection: Upgrade\r
464 	/// Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=\r
465 	/// \r\n";
466 	///
467 	/// let input = Cursor::new(&accept[..]);
468 	/// let output = Cursor::new(Vec::new());
469 	///
470 	/// let client = ClientBuilder::new("wss://test.ws").unwrap()
471 	///     .key(b"the sample nonce".clone())
472 	///     .connect_on(ReadWritePair(input, output))
473 	///     .unwrap();
474 	///
475 	/// let text = (client.into_stream().0).1.into_inner();
476 	/// let text = String::from_utf8(text).unwrap();
477 	/// assert!(text.contains("dGhlIHNhbXBsZSBub25jZQ=="), "{}", text);
478 	/// ```
479 	#[cfg(feature = "sync")]
connect_on<S>(&mut self, mut stream: S) -> WebSocketResult<Client<S>> where S: Stream,480 	pub fn connect_on<S>(&mut self, mut stream: S) -> WebSocketResult<Client<S>>
481 	where
482 		S: Stream,
483 	{
484 		// send request
485 		let resource = self.build_request();
486 		let data = format!("GET {} {}\r\n{}\r\n", resource, self.version, self.headers);
487 		stream.write_all(data.as_bytes())?;
488 
489 		// wait for a response
490 		let mut reader = BufReader::new(stream);
491 		let response = parse_response(&mut reader).map_err(towse)?;
492 
493 		// validate
494 		self.validate(&response)?;
495 
496 		Ok(Client::unchecked(reader, response.headers, true, false))
497 	}
498 
499 	/// Connect to a websocket server asynchronously.
500 	///
501 	/// This will use a `Box<AsyncRead + AsyncWrite + Send>` to represent either
502 	/// an SSL connection or a normal TCP connection, what to use will be decided
503 	/// using the protocol of the URL passed in (e.g. `ws://` or `wss://`)
504 	///
505 	/// If you have non-default SSL circumstances, you can use the `ssl_config`
506 	/// parameter to configure those.
507 	///
508 	///# Example
509 	///
510 	/// ```no_run
511 	/// # extern crate rand;
512 	/// # extern crate tokio;
513 	/// # extern crate futures;
514 	/// # extern crate websocket;
515 	/// use websocket::ClientBuilder;
516 	/// use websocket::futures::{Future, Stream, Sink};
517 	/// use websocket::Message;
518 	/// use tokio::runtime::Builder;
519 	/// # use rand::Rng;
520 	///
521 	/// # fn main() {
522 	/// let mut runtime = Builder::new().build().unwrap();
523 	///
524 	/// // let's randomly do either SSL or plaintext
525 	/// let url = if rand::thread_rng().gen() {
526 	///     "ws://echo.websocket.org"
527 	/// } else {
528 	///     "wss://echo.websocket.org"
529 	/// };
530 	///
531 	/// // send a message and hear it come back
532 	/// let echo_future = ClientBuilder::new(url).unwrap()
533 	///     .async_connect(None)
534 	///     .and_then(|(s, _)| s.send(Message::text("hallo").into()))
535 	///     .and_then(|s| s.into_future().map_err(|e| e.0))
536 	///     .map(|(m, _)| {
537 	///         assert_eq!(m, Some(Message::text("hallo").into()))
538 	///     });
539 	///
540 	/// runtime.block_on(echo_future).unwrap();
541 	/// # }
542 	/// ```
543 	#[cfg(feature = "async-ssl")]
async_connect( self, ssl_config: Option<TlsConnector>, ) -> r#async::ClientNew<Box<dyn stream::r#async::Stream + Send>>544 	pub fn async_connect(
545 		self,
546 		ssl_config: Option<TlsConnector>,
547 	) -> r#async::ClientNew<Box<dyn stream::r#async::Stream + Send>> {
548 		// connect to the tcp stream
549 		let tcp_stream = self.async_tcpstream(None);
550 
551 		let builder = ClientBuilder {
552 			url: Cow::Owned(self.url.into_owned()),
553 			version: self.version,
554 			headers: self.headers,
555 			version_set: self.version_set,
556 			key_set: self.key_set,
557 		};
558 
559 		// check if we should connect over ssl or not
560 		if builder.is_secure_url() {
561 			// configure the tls connection
562 			let (host, connector) = {
563 				match builder.extract_host_ssl_conn(ssl_config) {
564 					Ok((h, conn)) => (h.to_string(), TlsConnectorExt::from(conn)),
565 					Err(e) => return Box::new(future::err(e)),
566 				}
567 			};
568 			// secure connection, wrap with ssl
569 			let future = tcp_stream
570 				.and_then(move |s| connector.connect(&host, s).map_err(towse))
571 				.and_then(move |stream| {
572 					let stream: Box<dyn stream::r#async::Stream + Send> = Box::new(stream);
573 					builder.async_connect_on(stream)
574 				});
575 			Box::new(future)
576 		} else {
577 			// insecure connection, connect normally
578 			let future = tcp_stream.and_then(move |stream| {
579 				let stream: Box<dyn stream::r#async::Stream + Send> = Box::new(stream);
580 				builder.async_connect_on(stream)
581 			});
582 			Box::new(future)
583 		}
584 	}
585 
586 	/// Asynchronously create an SSL connection to a websocket sever.
587 	///
588 	/// This method will only try to connect over SSL and fail otherwise, useful
589 	/// when you want to be sure to connect over SSL or when you want access
590 	/// to the `TlsStream` functions (without having to go through a `Box`).
591 	///
592 	/// If you have non-default SSL circumstances, you can use the `ssl_config`
593 	/// parameter to configure those.
594 	///
595 	///# Example
596 	///
597 	/// ```no_run
598 	/// # extern crate tokio;
599 	/// # extern crate futures;
600 	/// # extern crate websocket;
601 	/// use websocket::ClientBuilder;
602 	/// use websocket::futures::{Future, Stream, Sink};
603 	/// use websocket::Message;
604 	/// # fn main() {
605 	///
606 	/// let mut runtime = tokio::runtime::Builder::new().build().unwrap();
607 	///
608 	/// // send a message and hear it come back
609 	/// let echo_future = ClientBuilder::new("wss://echo.websocket.org").unwrap()
610 	///     .async_connect_secure(None)
611 	///     .and_then(|(s, _)| s.send(Message::text("hallo").into()))
612 	///     .and_then(|s| s.into_future().map_err(|e| e.0))
613 	///     .map(|(m, _)| {
614 	///         assert_eq!(m, Some(Message::text("hallo").into()))
615 	///     });
616 	///
617 	/// runtime.block_on(echo_future).unwrap();
618 	/// # }
619 	/// ```
620 	#[cfg(feature = "async-ssl")]
async_connect_secure( self, ssl_config: Option<TlsConnector>, ) -> r#async::ClientNew<r#async::TlsStream<r#async::TcpStream>>621 	pub fn async_connect_secure(
622 		self,
623 		ssl_config: Option<TlsConnector>,
624 	) -> r#async::ClientNew<r#async::TlsStream<r#async::TcpStream>> {
625 		// connect to the tcp stream
626 		let tcp_stream = self.async_tcpstream(Some(true));
627 
628 		// configure the tls connection
629 		let (host, connector) = {
630 			match self.extract_host_ssl_conn(ssl_config) {
631 				Ok((h, conn)) => (h.to_string(), TlsConnectorExt::from(conn)),
632 				Err(e) => return Box::new(future::err(e)),
633 			}
634 		};
635 
636 		let builder = ClientBuilder {
637 			url: Cow::Owned(self.url.into_owned()),
638 			version: self.version,
639 			headers: self.headers,
640 			version_set: self.version_set,
641 			key_set: self.key_set,
642 		};
643 
644 		// put it all together
645 		let future = tcp_stream
646 			.and_then(move |s| connector.connect(&host, s).map_err(towse))
647 			.and_then(move |stream| builder.async_connect_on(stream));
648 		Box::new(future)
649 	}
650 
651 	// TODO: add conveniences like .response_to_pings, .send_close, etc.
652 	/// Asynchronously create an insecure (plain TCP) connection to the client.
653 	///
654 	/// In this case no `Box` will be used, you will just get a `TcpStream`,
655 	/// giving you less allocations on the heap and direct access to `TcpStream`
656 	/// functions.
657 	///
658 	///# Example
659 	///
660 	/// ```no_run
661 	/// # extern crate tokio;
662 	/// # extern crate futures;
663 	/// # extern crate websocket;
664 	/// use websocket::ClientBuilder;
665 	/// use websocket::futures::{Future, Stream, Sink};
666 	/// use websocket::Message;
667 	/// # fn main() {
668 	///
669 	/// let mut runtime = tokio::runtime::Builder::new().build().unwrap();
670 	///
671 	/// // send a message and hear it come back
672 	/// let echo_future = ClientBuilder::new("ws://echo.websocket.org").unwrap()
673 	///     .async_connect_insecure()
674 	///     .and_then(|(s, _)| s.send(Message::text("hallo").into()))
675 	///     .and_then(|s| s.into_future().map_err(|e| e.0))
676 	///     .map(|(m, _)| {
677 	///         assert_eq!(m, Some(Message::text("hallo").into()))
678 	///     });
679 	///
680 	/// runtime.block_on(echo_future).unwrap();
681 	/// # }
682 	/// ```
683 	#[cfg(feature = "async")]
async_connect_insecure(self) -> r#async::ClientNew<r#async::TcpStream>684 	pub fn async_connect_insecure(self) -> r#async::ClientNew<r#async::TcpStream> {
685 		let tcp_stream = self.async_tcpstream(Some(false));
686 
687 		let builder = ClientBuilder {
688 			url: Cow::Owned(self.url.into_owned()),
689 			version: self.version,
690 			headers: self.headers,
691 			version_set: self.version_set,
692 			key_set: self.key_set,
693 		};
694 
695 		let future = tcp_stream.and_then(move |stream| builder.async_connect_on(stream));
696 		Box::new(future)
697 	}
698 
699 	/// Asynchronously connects to a websocket server on any stream you would like.
700 	/// Possible streams:
701 	///  - Unix Sockets
702 	///  - Bluetooth
703 	///  - Logging Middle-ware
704 	///  - SSH
705 	///
706 	/// The stream must be `AsyncRead + AsyncWrite + Send + 'static`.
707 	///
708 	/// # Example
709 	///
710 	/// ```rust
711 	/// # extern crate tokio;
712 	/// # extern crate websocket;
713 	/// use websocket::header::WebSocketProtocol;
714 	/// use websocket::ClientBuilder;
715 	/// use websocket::sync::stream::ReadWritePair;
716 	/// use websocket::futures::Future;
717 	/// # use std::io::Cursor;
718 	///
719 	/// let mut runtime = tokio::runtime::Builder::new().build().unwrap();
720 	///
721 	/// let accept = b"\
722 	/// HTTP/1.1 101 Switching Protocols\r\n\
723 	/// Upgrade: websocket\r\n\
724 	/// Sec-WebSocket-Protocol: proto-metheus\r\n\
725 	/// Connection: Upgrade\r\n\
726 	/// Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=\r\n\
727 	/// \r\n";
728 	///
729 	/// let input = Cursor::new(&accept[..]);
730 	/// let output = Cursor::new(Vec::new());
731 	///
732 	/// let client = ClientBuilder::new("wss://test.ws").unwrap()
733 	///     .key(b"the sample nonce".clone())
734 	///     .async_connect_on(ReadWritePair(input, output))
735 	///     .map(|(_, headers)| {
736 	///         let proto: &WebSocketProtocol = headers.get().unwrap();
737 	///         assert_eq!(proto.0.first().unwrap(), "proto-metheus")
738 	///     });
739 	///
740 	/// runtime.block_on(client).unwrap();
741 	/// ```
742 	#[cfg(feature = "async")]
async_connect_on<S>(self, stream: S) -> r#async::ClientNew<S> where S: stream::r#async::Stream + Send + 'static,743 	pub fn async_connect_on<S>(self, stream: S) -> r#async::ClientNew<S>
744 	where
745 		S: stream::r#async::Stream + Send + 'static,
746 	{
747 		let mut builder = ClientBuilder {
748 			url: Cow::Owned(self.url.into_owned()),
749 			version: self.version,
750 			headers: self.headers,
751 			version_set: self.version_set,
752 			key_set: self.key_set,
753 		};
754 		let resource = builder.build_request();
755 		let framed = crate::codec::http::HttpClientCodec.framed(stream);
756 		let request = Incoming {
757 			version: builder.version,
758 			headers: builder.headers.clone(),
759 			subject: (Method::Get, RequestUri::AbsolutePath(resource)),
760 		};
761 
762 		let future = framed
763 			// send request
764 			.send(request)
765 			.map_err(::std::convert::Into::into)
766 			// wait for a response
767 			.and_then(|stream| stream.into_future().map_err(|e| towse(e.0)))
768 			// validate
769 			.and_then(move |(message, stream)| {
770 				message
771 					.ok_or(WebSocketError::ProtocolError(
772 						"Connection closed before handshake could complete.",
773 					))
774 					.and_then(|message| builder.validate(&message).map(|()| (message, stream)))
775 			})
776 			// output the final client and metadata
777 			.map(|(message, stream)| {
778 				let codec = MessageCodec::default(Context::Client);
779 				let client = update_framed_codec(stream, codec);
780 				(client, message.headers)
781 			});
782 
783 		Box::new(future)
784 	}
785 
786 	#[cfg(feature = "async")]
async_tcpstream( &self, secure: Option<bool>, ) -> Box<dyn future::Future<Item = TcpStreamNew, Error = WebSocketError> + Send>787 	fn async_tcpstream(
788 		&self,
789 		secure: Option<bool>,
790 	) -> Box<dyn future::Future<Item = TcpStreamNew, Error = WebSocketError> + Send> {
791 		// get the address to connect to, return an error future if ther's a problem
792 		let address = match self
793 			.extract_host_port(secure)
794 			.and_then(|p| Ok(p.to_socket_addrs()?))
795 		{
796 			Ok(mut s) => match s.next() {
797 				Some(a) => a,
798 				None => {
799 					return Box::new(
800 						Err(WebSocketOtherError::WebSocketUrlError(
801 							WSUrlErrorKind::NoHostName,
802 						))
803 						.map_err(towse)
804 						.into_future(),
805 					);
806 				}
807 			},
808 			Err(e) => return Box::new(Err(e).into_future()),
809 		};
810 
811 		// connect a tcp stream
812 		Box::new(TcpStreamNew::connect(&address).map_err(Into::into))
813 	}
814 
815 	#[cfg(any(feature = "sync", feature = "async"))]
build_request(&mut self) -> String816 	fn build_request(&mut self) -> String {
817 		// enter host if available (unix sockets don't have hosts)
818 		if let Some(host) = self.url.host_str() {
819 			self.headers.set(Host {
820 				hostname: host.to_string(),
821 				port: self.url.port(),
822 			});
823 		}
824 
825 		// handle username/password from URL
826 		if !self.url.username().is_empty() {
827 			self.headers.set(Authorization(Basic {
828 				username: self.url.username().to_owned(),
829 				password: match self.url.password() {
830 					Some(password) => Some(password.to_owned()),
831 					None => None,
832 				},
833 			}));
834 		}
835 
836 		self.headers
837 			.set(Connection(vec![ConnectionOption::ConnectionHeader(
838 				UniCase("Upgrade".to_string()),
839 			)]));
840 
841 		self.headers.set(Upgrade(vec![Protocol {
842 			name: ProtocolName::WebSocket,
843 			version: None,
844 		}]));
845 
846 		if !self.version_set {
847 			self.headers.set(WebSocketVersion::WebSocket13);
848 		}
849 
850 		if !self.key_set {
851 			self.headers.set(WebSocketKey::new());
852 		}
853 
854 		// send request
855 		self.url[Position::BeforePath..Position::AfterQuery].to_owned()
856 	}
857 
858 	#[cfg(any(feature = "sync", feature = "async"))]
validate(&self, response: &Incoming<RawStatus>) -> WebSocketResult<()>859 	fn validate(&self, response: &Incoming<RawStatus>) -> WebSocketResult<()> {
860 		let status = StatusCode::from_u16(response.subject.0);
861 
862 		if status != StatusCode::SwitchingProtocols {
863 			return Err(WebSocketOtherError::StatusCodeError(status)).map_err(towse);
864 		}
865 
866 		let key = self
867 			.headers
868 			.get::<WebSocketKey>()
869 			.ok_or(WebSocketOtherError::RequestError(
870 				"Request Sec-WebSocket-Key was invalid",
871 			))?;
872 
873 		if response.headers.get() != Some(&(WebSocketAccept::new(key))) {
874 			return Err(WebSocketOtherError::ResponseError(
875 				"Sec-WebSocket-Accept is invalid",
876 			))
877 			.map_err(towse);
878 		}
879 
880 		if response.headers.get()
881 			!= Some(
882 				&(Upgrade(vec![Protocol {
883 					name: ProtocolName::WebSocket,
884 					version: None,
885 				}])),
886 			) {
887 			return Err(WebSocketOtherError::ResponseError(
888 				"Upgrade field must be WebSocket",
889 			))
890 			.map_err(towse);
891 		}
892 
893 		if self.headers.get()
894 			!= Some(
895 				&(Connection(vec![ConnectionOption::ConnectionHeader(UniCase(
896 					"Upgrade".to_string(),
897 				))])),
898 			) {
899 			return Err(WebSocketOtherError::ResponseError(
900 				"Connection field must be 'Upgrade'",
901 			))
902 			.map_err(towse);
903 		}
904 
905 		Ok(())
906 	}
907 
908 	/// Check whether the given URL uses a secure scheme, e.g. `wss` or `https`.
909 	/// Note that `https` is not intended scheme for web sockets, but
910 	/// it's still reasonable to wrap TLS if it is encountered.
911 	#[cfg(any(feature = "sync-ssl", feature = "async-ssl"))]
is_secure_url(&self) -> bool912 	fn is_secure_url(&self) -> bool {
913 		let scheme = self.url.scheme();
914 		scheme == "wss" || scheme == "https"
915 	}
916 
917 	#[cfg(any(feature = "sync", feature = "async"))]
extract_host_port(&self, secure: Option<bool>) -> WebSocketResult<::url::HostAndPort<&str>>918 	fn extract_host_port(&self, secure: Option<bool>) -> WebSocketResult<::url::HostAndPort<&str>> {
919 		if self.url.host().is_none() {
920 			return Err(WebSocketOtherError::WebSocketUrlError(
921 				WSUrlErrorKind::NoHostName,
922 			))
923 			.map_err(towse);
924 		}
925 
926 		Ok(self.url.with_default_port(|url| {
927 			const SECURE_PORT: u16 = 443;
928 			const INSECURE_PORT: u16 = 80;
929 			const SECURE_WS_SCHEME: &str = "wss";
930 
931 			Ok(match secure {
932 				None if url.scheme() == SECURE_WS_SCHEME => SECURE_PORT,
933 				None => INSECURE_PORT,
934 				Some(true) => SECURE_PORT,
935 				Some(false) => INSECURE_PORT,
936 			})
937 		})?)
938 	}
939 
940 	#[cfg(feature = "sync")]
establish_tcp(&mut self, secure: Option<bool>) -> WebSocketResult<TcpStream>941 	fn establish_tcp(&mut self, secure: Option<bool>) -> WebSocketResult<TcpStream> {
942 		Ok(TcpStream::connect(self.extract_host_port(secure)?)?)
943 	}
944 
945 	#[cfg(any(feature = "sync-ssl", feature = "async-ssl"))]
extract_host_ssl_conn( &self, connector: Option<TlsConnector>, ) -> WebSocketResult<(&str, TlsConnector)>946 	fn extract_host_ssl_conn(
947 		&self,
948 		connector: Option<TlsConnector>,
949 	) -> WebSocketResult<(&str, TlsConnector)> {
950 		let host = match self.url.host_str() {
951 			Some(h) => h,
952 			None => {
953 				return Err(WebSocketOtherError::WebSocketUrlError(
954 					WSUrlErrorKind::NoHostName,
955 				))
956 				.map_err(towse);
957 			}
958 		};
959 		let connector = match connector {
960 			Some(c) => c,
961 			None => TlsConnector::builder().build().map_err(towse)?,
962 		};
963 		Ok((host, connector))
964 	}
965 
966 	#[cfg(feature = "sync-ssl")]
wrap_ssl( &self, tcp_stream: TcpStream, connector: Option<TlsConnector>, ) -> WebSocketResult<TlsStream<TcpStream>>967 	fn wrap_ssl(
968 		&self,
969 		tcp_stream: TcpStream,
970 		connector: Option<TlsConnector>,
971 	) -> WebSocketResult<TlsStream<TcpStream>> {
972 		let (host, connector) = self.extract_host_ssl_conn(connector)?;
973 		let ssl_stream = connector.connect(host, tcp_stream).map_err(towse)?;
974 		Ok(ssl_stream)
975 	}
976 }
977 
978 mod tests {
979 	#[test]
build_client_with_protocols()980 	fn build_client_with_protocols() {
981 		use super::*;
982 		let builder = ClientBuilder::new("ws://127.0.0.1:8080/hello/world")
983 			.unwrap()
984 			.add_protocol("protobeard");
985 
986 		let protos = &builder.headers.get::<WebSocketProtocol>().unwrap().0;
987 		assert!(protos.contains(&"protobeard".to_string()));
988 		assert!(protos.len() == 1);
989 
990 		let builder = ClientBuilder::new("ws://example.org/hello")
991 			.unwrap()
992 			.add_protocol("rust-websocket")
993 			.clear_protocols()
994 			.add_protocols(vec!["electric", "boogaloo"]);
995 
996 		let protos = &builder.headers.get::<WebSocketProtocol>().unwrap().0;
997 
998 		assert!(protos.contains(&"boogaloo".to_string()));
999 		assert!(protos.contains(&"electric".to_string()));
1000 		assert!(!protos.contains(&"rust-websocket".to_string()));
1001 	}
1002 
1003 	#[test]
build_client_with_username_password()1004 	fn build_client_with_username_password() {
1005 		use super::*;
1006 		let mut builder = ClientBuilder::new("ws://john:pswd@127.0.0.1:8080/hello").unwrap();
1007 		let _request = builder.build_request();
1008 		let auth = builder.headers.get::<Authorization<Basic>>().unwrap();
1009 		assert!(auth.username == "john");
1010 		assert_eq!(auth.password, Some("pswd".to_owned()));
1011 	}
1012 }
1013