1 //! Connectors used by the `Client`. 2 //! 3 //! This module contains: 4 //! 5 //! - A default [`HttpConnector`][] that does DNS resolution and establishes 6 //! connections over TCP. 7 //! - Types to build custom connectors. 8 //! 9 //! # Connectors 10 //! 11 //! A "connector" is a [`Service`][] that takes a [`Uri`][] destination, and 12 //! its `Response` is some type implementing [`AsyncRead`][], [`AsyncWrite`][], 13 //! and [`Connection`][]. 14 //! 15 //! ## Custom Connectors 16 //! 17 //! A simple connector that ignores the `Uri` destination and always returns 18 //! a TCP connection to the same address could be written like this: 19 //! 20 //! ```rust,ignore 21 //! let connector = tower::service_fn(|_dst| async { 22 //! tokio::net::TcpStream::connect("127.0.0.1:1337") 23 //! }) 24 //! ``` 25 //! 26 //! Or, fully written out: 27 //! 28 //! ``` 29 //! # #[cfg(feature = "runtime")] 30 //! # mod rt { 31 //! use std::{future::Future, net::SocketAddr, pin::Pin, task::{self, Poll}}; 32 //! use hyper::{service::Service, Uri}; 33 //! use tokio::net::TcpStream; 34 //! 35 //! #[derive(Clone)] 36 //! struct LocalConnector; 37 //! 38 //! impl Service<Uri> for LocalConnector { 39 //! type Response = TcpStream; 40 //! type Error = std::io::Error; 41 //! // We can't "name" an `async` generated future. 42 //! type Future = Pin<Box< 43 //! dyn Future<Output = Result<Self::Response, Self::Error>> + Send 44 //! >>; 45 //! 46 //! fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> { 47 //! // This connector is always ready, but others might not be. 48 //! Poll::Ready(Ok(())) 49 //! } 50 //! 51 //! fn call(&mut self, _: Uri) -> Self::Future { 52 //! Box::pin(TcpStream::connect(SocketAddr::from(([127, 0, 0, 1], 1337)))) 53 //! } 54 //! } 55 //! # } 56 //! ``` 57 //! 58 //! It's worth noting that for `TcpStream`s, the [`HttpConnector`][] is a 59 //! better starting place to extend from. 60 //! 61 //! Using either of the above connector examples, it can be used with the 62 //! `Client` like this: 63 //! 64 //! ``` 65 //! # #[cfg(feature = "runtime")] 66 //! # fn rt () { 67 //! # let connector = hyper::client::HttpConnector::new(); 68 //! // let connector = ... 69 //! 70 //! let client = hyper::Client::builder() 71 //! .build::<_, hyper::Body>(connector); 72 //! # } 73 //! ``` 74 //! 75 //! 76 //! [`HttpConnector`]: HttpConnector 77 //! [`Service`]: crate::service::Service 78 //! [`Uri`]: ::http::Uri 79 //! [`AsyncRead`]: tokio::io::AsyncRead 80 //! [`AsyncWrite`]: tokio::io::AsyncWrite 81 //! [`Connection`]: Connection 82 use std::fmt; 83 84 use ::http::Extensions; 85 86 cfg_feature! { 87 #![feature = "tcp"] 88 89 pub use self::http::{HttpConnector, HttpInfo}; 90 91 pub mod dns; 92 mod http; 93 } 94 95 cfg_feature! { 96 #![any(feature = "http1", feature = "http2")] 97 98 pub use self::sealed::Connect; 99 } 100 101 /// Describes a type returned by a connector. 102 pub trait Connection { 103 /// Return metadata describing the connection. connected(&self) -> Connected104 fn connected(&self) -> Connected; 105 } 106 107 /// Extra information about the connected transport. 108 /// 109 /// This can be used to inform recipients about things like if ALPN 110 /// was used, or if connected to an HTTP proxy. 111 #[derive(Debug)] 112 pub struct Connected { 113 pub(super) alpn: Alpn, 114 pub(super) is_proxied: bool, 115 pub(super) extra: Option<Extra>, 116 } 117 118 pub(super) struct Extra(Box<dyn ExtraInner>); 119 120 #[derive(Clone, Copy, Debug, PartialEq)] 121 pub(super) enum Alpn { 122 H2, 123 None, 124 } 125 126 impl Connected { 127 /// Create new `Connected` type with empty metadata. new() -> Connected128 pub fn new() -> Connected { 129 Connected { 130 alpn: Alpn::None, 131 is_proxied: false, 132 extra: None, 133 } 134 } 135 136 /// Set whether the connected transport is to an HTTP proxy. 137 /// 138 /// This setting will affect if HTTP/1 requests written on the transport 139 /// will have the request-target in absolute-form or origin-form: 140 /// 141 /// - When `proxy(false)`: 142 /// 143 /// ```http 144 /// GET /guide HTTP/1.1 145 /// ``` 146 /// 147 /// - When `proxy(true)`: 148 /// 149 /// ```http 150 /// GET http://hyper.rs/guide HTTP/1.1 151 /// ``` 152 /// 153 /// Default is `false`. proxy(mut self, is_proxied: bool) -> Connected154 pub fn proxy(mut self, is_proxied: bool) -> Connected { 155 self.is_proxied = is_proxied; 156 self 157 } 158 159 /// Determines if the connected transport is to an HTTP proxy. is_proxied(&self) -> bool160 pub fn is_proxied(&self) -> bool { 161 self.is_proxied 162 } 163 164 /// Set extra connection information to be set in the extensions of every `Response`. extra<T: Clone + Send + Sync + 'static>(mut self, extra: T) -> Connected165 pub fn extra<T: Clone + Send + Sync + 'static>(mut self, extra: T) -> Connected { 166 if let Some(prev) = self.extra { 167 self.extra = Some(Extra(Box::new(ExtraChain(prev.0, extra)))); 168 } else { 169 self.extra = Some(Extra(Box::new(ExtraEnvelope(extra)))); 170 } 171 self 172 } 173 174 /// Copies the extra connection information into an `Extensions` map. get_extras(&self, extensions: &mut Extensions)175 pub fn get_extras(&self, extensions: &mut Extensions) { 176 if let Some(extra) = &self.extra { 177 extra.set(extensions); 178 } 179 } 180 181 /// Set that the connected transport negotiated HTTP/2 as its next protocol. negotiated_h2(mut self) -> Connected182 pub fn negotiated_h2(mut self) -> Connected { 183 self.alpn = Alpn::H2; 184 self 185 } 186 187 /// Determines if the connected transport negotiated HTTP/2 as its next protocol. is_negotiated_h2(&self) -> bool188 pub fn is_negotiated_h2(&self) -> bool { 189 self.alpn == Alpn::H2 190 } 191 192 // Don't public expose that `Connected` is `Clone`, unsure if we want to 193 // keep that contract... 194 #[cfg(feature = "http2")] clone(&self) -> Connected195 pub(super) fn clone(&self) -> Connected { 196 Connected { 197 alpn: self.alpn.clone(), 198 is_proxied: self.is_proxied, 199 extra: self.extra.clone(), 200 } 201 } 202 } 203 204 // ===== impl Extra ===== 205 206 impl Extra { set(&self, res: &mut Extensions)207 pub(super) fn set(&self, res: &mut Extensions) { 208 self.0.set(res); 209 } 210 } 211 212 impl Clone for Extra { clone(&self) -> Extra213 fn clone(&self) -> Extra { 214 Extra(self.0.clone_box()) 215 } 216 } 217 218 impl fmt::Debug for Extra { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result219 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 220 f.debug_struct("Extra").finish() 221 } 222 } 223 224 trait ExtraInner: Send + Sync { clone_box(&self) -> Box<dyn ExtraInner>225 fn clone_box(&self) -> Box<dyn ExtraInner>; set(&self, res: &mut Extensions)226 fn set(&self, res: &mut Extensions); 227 } 228 229 // This indirection allows the `Connected` to have a type-erased "extra" value, 230 // while that type still knows its inner extra type. This allows the correct 231 // TypeId to be used when inserting into `res.extensions_mut()`. 232 #[derive(Clone)] 233 struct ExtraEnvelope<T>(T); 234 235 impl<T> ExtraInner for ExtraEnvelope<T> 236 where 237 T: Clone + Send + Sync + 'static, 238 { clone_box(&self) -> Box<dyn ExtraInner>239 fn clone_box(&self) -> Box<dyn ExtraInner> { 240 Box::new(self.clone()) 241 } 242 set(&self, res: &mut Extensions)243 fn set(&self, res: &mut Extensions) { 244 res.insert(self.0.clone()); 245 } 246 } 247 248 struct ExtraChain<T>(Box<dyn ExtraInner>, T); 249 250 impl<T: Clone> Clone for ExtraChain<T> { clone(&self) -> Self251 fn clone(&self) -> Self { 252 ExtraChain(self.0.clone_box(), self.1.clone()) 253 } 254 } 255 256 impl<T> ExtraInner for ExtraChain<T> 257 where 258 T: Clone + Send + Sync + 'static, 259 { clone_box(&self) -> Box<dyn ExtraInner>260 fn clone_box(&self) -> Box<dyn ExtraInner> { 261 Box::new(self.clone()) 262 } 263 set(&self, res: &mut Extensions)264 fn set(&self, res: &mut Extensions) { 265 self.0.set(res); 266 res.insert(self.1.clone()); 267 } 268 } 269 270 #[cfg(any(feature = "http1", feature = "http2"))] 271 pub(super) mod sealed { 272 use std::error::Error as StdError; 273 274 use ::http::Uri; 275 use tokio::io::{AsyncRead, AsyncWrite}; 276 277 use super::Connection; 278 use crate::common::{Future, Unpin}; 279 280 /// Connect to a destination, returning an IO transport. 281 /// 282 /// A connector receives a [`Uri`](::http::Uri) and returns a `Future` of the 283 /// ready connection. 284 /// 285 /// # Trait Alias 286 /// 287 /// This is really just an *alias* for the `tower::Service` trait, with 288 /// additional bounds set for convenience *inside* hyper. You don't actually 289 /// implement this trait, but `tower::Service<Uri>` instead. 290 // The `Sized` bound is to prevent creating `dyn Connect`, since they cannot 291 // fit the `Connect` bounds because of the blanket impl for `Service`. 292 pub trait Connect: Sealed + Sized { 293 #[doc(hidden)] 294 type _Svc: ConnectSvc; 295 #[doc(hidden)] connect(self, internal_only: Internal, dst: Uri) -> <Self::_Svc as ConnectSvc>::Future296 fn connect(self, internal_only: Internal, dst: Uri) -> <Self::_Svc as ConnectSvc>::Future; 297 } 298 299 pub trait ConnectSvc { 300 type Connection: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static; 301 type Error: Into<Box<dyn StdError + Send + Sync>>; 302 type Future: Future<Output = Result<Self::Connection, Self::Error>> + Unpin + Send + 'static; 303 connect(self, internal_only: Internal, dst: Uri) -> Self::Future304 fn connect(self, internal_only: Internal, dst: Uri) -> Self::Future; 305 } 306 307 impl<S, T> Connect for S 308 where 309 S: tower_service::Service<Uri, Response = T> + Send + 'static, 310 S::Error: Into<Box<dyn StdError + Send + Sync>>, 311 S::Future: Unpin + Send, 312 T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, 313 { 314 type _Svc = S; 315 connect(self, _: Internal, dst: Uri) -> crate::service::Oneshot<S, Uri>316 fn connect(self, _: Internal, dst: Uri) -> crate::service::Oneshot<S, Uri> { 317 crate::service::oneshot(self, dst) 318 } 319 } 320 321 impl<S, T> ConnectSvc for S 322 where 323 S: tower_service::Service<Uri, Response = T> + Send + 'static, 324 S::Error: Into<Box<dyn StdError + Send + Sync>>, 325 S::Future: Unpin + Send, 326 T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, 327 { 328 type Connection = T; 329 type Error = S::Error; 330 type Future = crate::service::Oneshot<S, Uri>; 331 connect(self, _: Internal, dst: Uri) -> Self::Future332 fn connect(self, _: Internal, dst: Uri) -> Self::Future { 333 crate::service::oneshot(self, dst) 334 } 335 } 336 337 impl<S, T> Sealed for S 338 where 339 S: tower_service::Service<Uri, Response = T> + Send, 340 S::Error: Into<Box<dyn StdError + Send + Sync>>, 341 S::Future: Unpin + Send, 342 T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, 343 { 344 } 345 346 pub trait Sealed {} 347 #[allow(missing_debug_implementations)] 348 pub struct Internal; 349 } 350 351 #[cfg(test)] 352 mod tests { 353 use super::Connected; 354 355 #[derive(Clone, Debug, PartialEq)] 356 struct Ex1(usize); 357 358 #[derive(Clone, Debug, PartialEq)] 359 struct Ex2(&'static str); 360 361 #[derive(Clone, Debug, PartialEq)] 362 struct Ex3(&'static str); 363 364 #[test] test_connected_extra()365 fn test_connected_extra() { 366 let c1 = Connected::new().extra(Ex1(41)); 367 368 let mut ex = ::http::Extensions::new(); 369 370 assert_eq!(ex.get::<Ex1>(), None); 371 372 c1.extra.as_ref().expect("c1 extra").set(&mut ex); 373 374 assert_eq!(ex.get::<Ex1>(), Some(&Ex1(41))); 375 } 376 377 #[test] test_connected_extra_chain()378 fn test_connected_extra_chain() { 379 // If a user composes connectors and at each stage, there's "extra" 380 // info to attach, it shouldn't override the previous extras. 381 382 let c1 = Connected::new() 383 .extra(Ex1(45)) 384 .extra(Ex2("zoom")) 385 .extra(Ex3("pew pew")); 386 387 let mut ex1 = ::http::Extensions::new(); 388 389 assert_eq!(ex1.get::<Ex1>(), None); 390 assert_eq!(ex1.get::<Ex2>(), None); 391 assert_eq!(ex1.get::<Ex3>(), None); 392 393 c1.extra.as_ref().expect("c1 extra").set(&mut ex1); 394 395 assert_eq!(ex1.get::<Ex1>(), Some(&Ex1(45))); 396 assert_eq!(ex1.get::<Ex2>(), Some(&Ex2("zoom"))); 397 assert_eq!(ex1.get::<Ex3>(), Some(&Ex3("pew pew"))); 398 399 // Just like extensions, inserting the same type overrides previous type. 400 let c2 = Connected::new() 401 .extra(Ex1(33)) 402 .extra(Ex2("hiccup")) 403 .extra(Ex1(99)); 404 405 let mut ex2 = ::http::Extensions::new(); 406 407 c2.extra.as_ref().expect("c2 extra").set(&mut ex2); 408 409 assert_eq!(ex2.get::<Ex1>(), Some(&Ex1(99))); 410 assert_eq!(ex2.get::<Ex2>(), Some(&Ex2("hiccup"))); 411 } 412 } 413