1 use std::error::Error as StdError;
2 use std::fmt;
3 use std::future::Future;
4 use std::io;
5 use std::marker::PhantomData;
6 use std::net::{IpAddr, SocketAddr};
7 use std::pin::Pin;
8 use std::sync::Arc;
9 use std::task::{self, Poll};
10 use std::time::Duration;
11 
12 use futures_util::future::Either;
13 use http::uri::{Scheme, Uri};
14 use pin_project::pin_project;
15 use tokio::net::TcpStream;
16 use tokio::time::Delay;
17 
18 use super::dns::{self, resolve, GaiResolver, Resolve};
19 use super::{Connected, Connection};
20 //#[cfg(feature = "runtime")] use super::dns::TokioThreadpoolGaiResolver;
21 
22 /// A connector for the `http` scheme.
23 ///
24 /// Performs DNS resolution in a thread pool, and then connects over TCP.
25 ///
26 /// # Note
27 ///
28 /// Sets the [`HttpInfo`](HttpInfo) value on responses, which includes
29 /// transport information such as the remote socket address used.
30 #[derive(Clone)]
31 pub struct HttpConnector<R = GaiResolver> {
32     config: Arc<Config>,
33     resolver: R,
34 }
35 
36 /// Extra information about the transport when an HttpConnector is used.
37 ///
38 /// # Example
39 ///
40 /// ```
41 /// # async fn doc() -> hyper::Result<()> {
42 /// use hyper::Uri;
43 /// use hyper::client::{Client, connect::HttpInfo};
44 ///
45 /// let client = Client::new();
46 /// let uri = Uri::from_static("http://example.com");
47 ///
48 /// let res = client.get(uri).await?;
49 /// res
50 ///     .extensions()
51 ///     .get::<HttpInfo>()
52 ///     .map(|info| {
53 ///         println!("remote addr = {}", info.remote_addr());
54 ///     });
55 /// # Ok(())
56 /// # }
57 /// ```
58 ///
59 /// # Note
60 ///
61 /// If a different connector is used besides [`HttpConnector`](HttpConnector),
62 /// this value will not exist in the extensions. Consult that specific
63 /// connector to see what "extra" information it might provide to responses.
64 #[derive(Clone, Debug)]
65 pub struct HttpInfo {
66     remote_addr: SocketAddr,
67 }
68 
69 #[derive(Clone)]
70 struct Config {
71     connect_timeout: Option<Duration>,
72     enforce_http: bool,
73     happy_eyeballs_timeout: Option<Duration>,
74     keep_alive_timeout: Option<Duration>,
75     local_address: Option<IpAddr>,
76     nodelay: bool,
77     reuse_address: bool,
78     send_buffer_size: Option<usize>,
79     recv_buffer_size: Option<usize>,
80 }
81 
82 // ===== impl HttpConnector =====
83 
84 impl HttpConnector {
85     /// Construct a new HttpConnector.
new() -> HttpConnector86     pub fn new() -> HttpConnector {
87         HttpConnector::new_with_resolver(GaiResolver::new())
88     }
89 }
90 
91 /*
92 #[cfg(feature = "runtime")]
93 impl HttpConnector<TokioThreadpoolGaiResolver> {
94     /// Construct a new HttpConnector using the `TokioThreadpoolGaiResolver`.
95     ///
96     /// This resolver **requires** the threadpool runtime to be used.
97     pub fn new_with_tokio_threadpool_resolver() -> Self {
98         HttpConnector::new_with_resolver(TokioThreadpoolGaiResolver::new())
99     }
100 }
101 */
102 
103 impl<R> HttpConnector<R> {
104     /// Construct a new HttpConnector.
105     ///
106     /// Takes a `Resolve` to handle DNS lookups.
new_with_resolver(resolver: R) -> HttpConnector<R>107     pub fn new_with_resolver(resolver: R) -> HttpConnector<R> {
108         HttpConnector {
109             config: Arc::new(Config {
110                 connect_timeout: None,
111                 enforce_http: true,
112                 happy_eyeballs_timeout: Some(Duration::from_millis(300)),
113                 keep_alive_timeout: None,
114                 local_address: None,
115                 nodelay: false,
116                 reuse_address: false,
117                 send_buffer_size: None,
118                 recv_buffer_size: None,
119             }),
120             resolver,
121         }
122     }
123 
124     /// Option to enforce all `Uri`s have the `http` scheme.
125     ///
126     /// Enabled by default.
127     #[inline]
enforce_http(&mut self, is_enforced: bool)128     pub fn enforce_http(&mut self, is_enforced: bool) {
129         self.config_mut().enforce_http = is_enforced;
130     }
131 
132     /// Set that all sockets have `SO_KEEPALIVE` set with the supplied duration.
133     ///
134     /// If `None`, the option will not be set.
135     ///
136     /// Default is `None`.
137     #[inline]
set_keepalive(&mut self, dur: Option<Duration>)138     pub fn set_keepalive(&mut self, dur: Option<Duration>) {
139         self.config_mut().keep_alive_timeout = dur;
140     }
141 
142     /// Set that all sockets have `SO_NODELAY` set to the supplied value `nodelay`.
143     ///
144     /// Default is `false`.
145     #[inline]
set_nodelay(&mut self, nodelay: bool)146     pub fn set_nodelay(&mut self, nodelay: bool) {
147         self.config_mut().nodelay = nodelay;
148     }
149 
150     /// Sets the value of the SO_SNDBUF option on the socket.
151     #[inline]
set_send_buffer_size(&mut self, size: Option<usize>)152     pub fn set_send_buffer_size(&mut self, size: Option<usize>) {
153         self.config_mut().send_buffer_size = size;
154     }
155 
156     /// Sets the value of the SO_RCVBUF option on the socket.
157     #[inline]
set_recv_buffer_size(&mut self, size: Option<usize>)158     pub fn set_recv_buffer_size(&mut self, size: Option<usize>) {
159         self.config_mut().recv_buffer_size = size;
160     }
161 
162     /// Set that all sockets are bound to the configured address before connection.
163     ///
164     /// If `None`, the sockets will not be bound.
165     ///
166     /// Default is `None`.
167     #[inline]
set_local_address(&mut self, addr: Option<IpAddr>)168     pub fn set_local_address(&mut self, addr: Option<IpAddr>) {
169         self.config_mut().local_address = addr;
170     }
171 
172     /// Set the connect timeout.
173     ///
174     /// If a domain resolves to multiple IP addresses, the timeout will be
175     /// evenly divided across them.
176     ///
177     /// Default is `None`.
178     #[inline]
set_connect_timeout(&mut self, dur: Option<Duration>)179     pub fn set_connect_timeout(&mut self, dur: Option<Duration>) {
180         self.config_mut().connect_timeout = dur;
181     }
182 
183     /// Set timeout for [RFC 6555 (Happy Eyeballs)][RFC 6555] algorithm.
184     ///
185     /// If hostname resolves to both IPv4 and IPv6 addresses and connection
186     /// cannot be established using preferred address family before timeout
187     /// elapses, then connector will in parallel attempt connection using other
188     /// address family.
189     ///
190     /// If `None`, parallel connection attempts are disabled.
191     ///
192     /// Default is 300 milliseconds.
193     ///
194     /// [RFC 6555]: https://tools.ietf.org/html/rfc6555
195     #[inline]
set_happy_eyeballs_timeout(&mut self, dur: Option<Duration>)196     pub fn set_happy_eyeballs_timeout(&mut self, dur: Option<Duration>) {
197         self.config_mut().happy_eyeballs_timeout = dur;
198     }
199 
200     /// Set that all socket have `SO_REUSEADDR` set to the supplied value `reuse_address`.
201     ///
202     /// Default is `false`.
203     #[inline]
set_reuse_address(&mut self, reuse_address: bool) -> &mut Self204     pub fn set_reuse_address(&mut self, reuse_address: bool) -> &mut Self {
205         self.config_mut().reuse_address = reuse_address;
206         self
207     }
208 
209     // private
210 
config_mut(&mut self) -> &mut Config211     fn config_mut(&mut self) -> &mut Config {
212         // If the are HttpConnector clones, this will clone the inner
213         // config. So mutating the config won't ever affect previous
214         // clones.
215         Arc::make_mut(&mut self.config)
216     }
217 }
218 
219 static INVALID_NOT_HTTP: &str = "invalid URL, scheme is not http";
220 static INVALID_MISSING_SCHEME: &str = "invalid URL, scheme is missing";
221 static INVALID_MISSING_HOST: &str = "invalid URL, host is missing";
222 
223 // R: Debug required for now to allow adding it to debug output later...
224 impl<R: fmt::Debug> fmt::Debug for HttpConnector<R> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result225     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
226         f.debug_struct("HttpConnector").finish()
227     }
228 }
229 
230 impl<R> tower_service::Service<Uri> for HttpConnector<R>
231 where
232     R: Resolve + Clone + Send + Sync + 'static,
233     R::Future: Send,
234 {
235     type Response = TcpStream;
236     type Error = ConnectError;
237     type Future = HttpConnecting<R>;
238 
poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>>239     fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
240         ready!(self.resolver.poll_ready(cx)).map_err(ConnectError::dns)?;
241         Poll::Ready(Ok(()))
242     }
243 
call(&mut self, dst: Uri) -> Self::Future244     fn call(&mut self, dst: Uri) -> Self::Future {
245         let mut self_ = self.clone();
246         HttpConnecting {
247             fut: Box::pin(async move { self_.call_async(dst).await }),
248             _marker: PhantomData,
249         }
250     }
251 }
252 
253 impl<R> HttpConnector<R>
254 where
255     R: Resolve,
256 {
call_async(&mut self, dst: Uri) -> Result<TcpStream, ConnectError>257     async fn call_async(&mut self, dst: Uri) -> Result<TcpStream, ConnectError> {
258         trace!(
259             "Http::connect; scheme={:?}, host={:?}, port={:?}",
260             dst.scheme(),
261             dst.host(),
262             dst.port(),
263         );
264 
265         if self.config.enforce_http {
266             if dst.scheme() != Some(&Scheme::HTTP) {
267                 return Err(ConnectError {
268                     msg: INVALID_NOT_HTTP.into(),
269                     cause: None,
270                 });
271             }
272         } else if dst.scheme().is_none() {
273             return Err(ConnectError {
274                 msg: INVALID_MISSING_SCHEME.into(),
275                 cause: None,
276             });
277         }
278 
279         let host = match dst.host() {
280             Some(s) => s,
281             None => {
282                 return Err(ConnectError {
283                     msg: INVALID_MISSING_HOST.into(),
284                     cause: None,
285                 })
286             }
287         };
288         let port = match dst.port() {
289             Some(port) => port.as_u16(),
290             None => {
291                 if dst.scheme() == Some(&Scheme::HTTPS) {
292                     443
293                 } else {
294                     80
295                 }
296             }
297         };
298 
299         let config = &self.config;
300 
301         // If the host is already an IP addr (v4 or v6),
302         // skip resolving the dns and start connecting right away.
303         let addrs = if let Some(addrs) = dns::IpAddrs::try_parse(host, port) {
304             addrs
305         } else {
306             let addrs = resolve(&mut self.resolver, dns::Name::new(host.into()))
307                 .await
308                 .map_err(ConnectError::dns)?;
309             let addrs = addrs.map(|addr| SocketAddr::new(addr, port)).collect();
310             dns::IpAddrs::new(addrs)
311         };
312 
313         let c = ConnectingTcp::new(
314             config.local_address,
315             addrs,
316             config.connect_timeout,
317             config.happy_eyeballs_timeout,
318             config.reuse_address,
319         );
320 
321         let sock = c
322             .connect()
323             .await
324             .map_err(ConnectError::m("tcp connect error"))?;
325 
326         if let Some(dur) = config.keep_alive_timeout {
327             sock.set_keepalive(Some(dur))
328                 .map_err(ConnectError::m("tcp set_keepalive error"))?;
329         }
330 
331         if let Some(size) = config.send_buffer_size {
332             sock.set_send_buffer_size(size)
333                 .map_err(ConnectError::m("tcp set_send_buffer_size error"))?;
334         }
335 
336         if let Some(size) = config.recv_buffer_size {
337             sock.set_recv_buffer_size(size)
338                 .map_err(ConnectError::m("tcp set_recv_buffer_size error"))?;
339         }
340 
341         sock.set_nodelay(config.nodelay)
342             .map_err(ConnectError::m("tcp set_nodelay error"))?;
343 
344         Ok(sock)
345     }
346 }
347 
348 impl Connection for TcpStream {
connected(&self) -> Connected349     fn connected(&self) -> Connected {
350         let connected = Connected::new();
351         if let Ok(remote_addr) = self.peer_addr() {
352             connected.extra(HttpInfo { remote_addr })
353         } else {
354             connected
355         }
356     }
357 }
358 
359 impl HttpInfo {
360     /// Get the remote address of the transport used.
remote_addr(&self) -> SocketAddr361     pub fn remote_addr(&self) -> SocketAddr {
362         self.remote_addr
363     }
364 }
365 
366 // Not publicly exported (so missing_docs doesn't trigger).
367 //
368 // We return this `Future` instead of the `Pin<Box<dyn Future>>` directly
369 // so that users don't rely on it fitting in a `Pin<Box<dyn Future>>` slot
370 // (and thus we can change the type in the future).
371 #[must_use = "futures do nothing unless polled"]
372 #[pin_project]
373 #[allow(missing_debug_implementations)]
374 pub struct HttpConnecting<R> {
375     #[pin]
376     fut: BoxConnecting,
377     _marker: PhantomData<R>,
378 }
379 
380 type ConnectResult = Result<TcpStream, ConnectError>;
381 type BoxConnecting = Pin<Box<dyn Future<Output = ConnectResult> + Send>>;
382 
383 impl<R: Resolve> Future for HttpConnecting<R> {
384     type Output = ConnectResult;
385 
poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output>386     fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
387         self.project().fut.poll(cx)
388     }
389 }
390 
391 // Not publicly exported (so missing_docs doesn't trigger).
392 pub struct ConnectError {
393     msg: Box<str>,
394     cause: Option<Box<dyn StdError + Send + Sync>>,
395 }
396 
397 impl ConnectError {
new<S, E>(msg: S, cause: E) -> ConnectError where S: Into<Box<str>>, E: Into<Box<dyn StdError + Send + Sync>>,398     fn new<S, E>(msg: S, cause: E) -> ConnectError
399     where
400         S: Into<Box<str>>,
401         E: Into<Box<dyn StdError + Send + Sync>>,
402     {
403         ConnectError {
404             msg: msg.into(),
405             cause: Some(cause.into()),
406         }
407     }
408 
dns<E>(cause: E) -> ConnectError where E: Into<Box<dyn StdError + Send + Sync>>,409     fn dns<E>(cause: E) -> ConnectError
410     where
411         E: Into<Box<dyn StdError + Send + Sync>>,
412     {
413         ConnectError::new("dns error", cause)
414     }
415 
m<S, E>(msg: S) -> impl FnOnce(E) -> ConnectError where S: Into<Box<str>>, E: Into<Box<dyn StdError + Send + Sync>>,416     fn m<S, E>(msg: S) -> impl FnOnce(E) -> ConnectError
417     where
418         S: Into<Box<str>>,
419         E: Into<Box<dyn StdError + Send + Sync>>,
420     {
421         move |cause| ConnectError::new(msg, cause)
422     }
423 }
424 
425 impl fmt::Debug for ConnectError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result426     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
427         if let Some(ref cause) = self.cause {
428             f.debug_tuple("ConnectError")
429                 .field(&self.msg)
430                 .field(cause)
431                 .finish()
432         } else {
433             self.msg.fmt(f)
434         }
435     }
436 }
437 
438 impl fmt::Display for ConnectError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result439     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
440         f.write_str(&self.msg)?;
441 
442         if let Some(ref cause) = self.cause {
443             write!(f, ": {}", cause)?;
444         }
445 
446         Ok(())
447     }
448 }
449 
450 impl StdError for ConnectError {
source(&self) -> Option<&(dyn StdError + 'static)>451     fn source(&self) -> Option<&(dyn StdError + 'static)> {
452         self.cause.as_ref().map(|e| &**e as _)
453     }
454 }
455 
456 struct ConnectingTcp {
457     local_addr: Option<IpAddr>,
458     preferred: ConnectingTcpRemote,
459     fallback: Option<ConnectingTcpFallback>,
460     reuse_address: bool,
461 }
462 
463 impl ConnectingTcp {
new( local_addr: Option<IpAddr>, remote_addrs: dns::IpAddrs, connect_timeout: Option<Duration>, fallback_timeout: Option<Duration>, reuse_address: bool, ) -> ConnectingTcp464     fn new(
465         local_addr: Option<IpAddr>,
466         remote_addrs: dns::IpAddrs,
467         connect_timeout: Option<Duration>,
468         fallback_timeout: Option<Duration>,
469         reuse_address: bool,
470     ) -> ConnectingTcp {
471         if let Some(fallback_timeout) = fallback_timeout {
472             let (preferred_addrs, fallback_addrs) = remote_addrs.split_by_preference(local_addr);
473             if fallback_addrs.is_empty() {
474                 return ConnectingTcp {
475                     local_addr,
476                     preferred: ConnectingTcpRemote::new(preferred_addrs, connect_timeout),
477                     fallback: None,
478                     reuse_address,
479                 };
480             }
481 
482             ConnectingTcp {
483                 local_addr,
484                 preferred: ConnectingTcpRemote::new(preferred_addrs, connect_timeout),
485                 fallback: Some(ConnectingTcpFallback {
486                     delay: tokio::time::delay_for(fallback_timeout),
487                     remote: ConnectingTcpRemote::new(fallback_addrs, connect_timeout),
488                 }),
489                 reuse_address,
490             }
491         } else {
492             ConnectingTcp {
493                 local_addr,
494                 preferred: ConnectingTcpRemote::new(remote_addrs, connect_timeout),
495                 fallback: None,
496                 reuse_address,
497             }
498         }
499     }
500 }
501 
502 struct ConnectingTcpFallback {
503     delay: Delay,
504     remote: ConnectingTcpRemote,
505 }
506 
507 struct ConnectingTcpRemote {
508     addrs: dns::IpAddrs,
509     connect_timeout: Option<Duration>,
510 }
511 
512 impl ConnectingTcpRemote {
new(addrs: dns::IpAddrs, connect_timeout: Option<Duration>) -> Self513     fn new(addrs: dns::IpAddrs, connect_timeout: Option<Duration>) -> Self {
514         let connect_timeout = connect_timeout.map(|t| t / (addrs.len() as u32));
515 
516         Self {
517             addrs,
518             connect_timeout,
519         }
520     }
521 }
522 
523 impl ConnectingTcpRemote {
connect( &mut self, local_addr: &Option<IpAddr>, reuse_address: bool, ) -> io::Result<TcpStream>524     async fn connect(
525         &mut self,
526         local_addr: &Option<IpAddr>,
527         reuse_address: bool,
528     ) -> io::Result<TcpStream> {
529         let mut err = None;
530         for addr in &mut self.addrs {
531             debug!("connecting to {}", addr);
532             match connect(&addr, local_addr, reuse_address, self.connect_timeout)?.await {
533                 Ok(tcp) => {
534                     debug!("connected to {}", addr);
535                     return Ok(tcp);
536                 }
537                 Err(e) => {
538                     trace!("connect error for {}: {:?}", addr, e);
539                     err = Some(e);
540                 }
541             }
542         }
543 
544         Err(err.take().expect("missing connect error"))
545     }
546 }
547 
connect( addr: &SocketAddr, local_addr: &Option<IpAddr>, reuse_address: bool, connect_timeout: Option<Duration>, ) -> io::Result<impl Future<Output = io::Result<TcpStream>>>548 fn connect(
549     addr: &SocketAddr,
550     local_addr: &Option<IpAddr>,
551     reuse_address: bool,
552     connect_timeout: Option<Duration>,
553 ) -> io::Result<impl Future<Output = io::Result<TcpStream>>> {
554     use socket2::{Domain, Protocol, Socket, Type};
555     let domain = match *addr {
556         SocketAddr::V4(_) => Domain::ipv4(),
557         SocketAddr::V6(_) => Domain::ipv6(),
558     };
559     let socket = Socket::new(domain, Type::stream(), Some(Protocol::tcp()))?;
560 
561     if reuse_address {
562         socket.set_reuse_address(true)?;
563     }
564 
565     if let Some(ref local_addr) = *local_addr {
566         // Caller has requested this socket be bound before calling connect
567         socket.bind(&SocketAddr::new(local_addr.clone(), 0).into())?;
568     } else if cfg!(windows) {
569         // Windows requires a socket be bound before calling connect
570         let any: SocketAddr = match *addr {
571             SocketAddr::V4(_) => ([0, 0, 0, 0], 0).into(),
572             SocketAddr::V6(_) => ([0, 0, 0, 0, 0, 0, 0, 0], 0).into(),
573         };
574         socket.bind(&any.into())?;
575     }
576 
577     let addr = *addr;
578 
579     let std_tcp = socket.into_tcp_stream();
580 
581     Ok(async move {
582         let connect = TcpStream::connect_std(std_tcp, &addr);
583         match connect_timeout {
584             Some(dur) => match tokio::time::timeout(dur, connect).await {
585                 Ok(Ok(s)) => Ok(s),
586                 Ok(Err(e)) => Err(e),
587                 Err(e) => Err(io::Error::new(io::ErrorKind::TimedOut, e)),
588             },
589             None => connect.await,
590         }
591     })
592 }
593 
594 impl ConnectingTcp {
connect(mut self) -> io::Result<TcpStream>595     async fn connect(mut self) -> io::Result<TcpStream> {
596         let Self {
597             ref local_addr,
598             reuse_address,
599             ..
600         } = self;
601         match self.fallback {
602             None => self.preferred.connect(local_addr, reuse_address).await,
603             Some(mut fallback) => {
604                 let preferred_fut = self.preferred.connect(local_addr, reuse_address);
605                 futures_util::pin_mut!(preferred_fut);
606 
607                 let fallback_fut = fallback.remote.connect(local_addr, reuse_address);
608                 futures_util::pin_mut!(fallback_fut);
609 
610                 let (result, future) =
611                     match futures_util::future::select(preferred_fut, fallback.delay).await {
612                         Either::Left((result, _fallback_delay)) => {
613                             (result, Either::Right(fallback_fut))
614                         }
615                         Either::Right(((), preferred_fut)) => {
616                             // Delay is done, start polling both the preferred and the fallback
617                             futures_util::future::select(preferred_fut, fallback_fut)
618                                 .await
619                                 .factor_first()
620                         }
621                     };
622 
623                 if result.is_err() {
624                     // Fallback to the remaining future (could be preferred or fallback)
625                     // if we get an error
626                     future.await
627                 } else {
628                     result
629                 }
630             }
631         }
632     }
633 }
634 
635 #[cfg(test)]
636 mod tests {
637     use std::io;
638 
639     use ::http::Uri;
640 
641     use super::super::sealed::{Connect, ConnectSvc};
642     use super::HttpConnector;
643 
connect<C>( connector: C, dst: Uri, ) -> Result<<C::_Svc as ConnectSvc>::Connection, <C::_Svc as ConnectSvc>::Error> where C: Connect,644     async fn connect<C>(
645         connector: C,
646         dst: Uri,
647     ) -> Result<<C::_Svc as ConnectSvc>::Connection, <C::_Svc as ConnectSvc>::Error>
648     where
649         C: Connect,
650     {
651         connector.connect(super::super::sealed::Internal, dst).await
652     }
653 
654     #[tokio::test]
test_errors_enforce_http()655     async fn test_errors_enforce_http() {
656         let dst = "https://example.domain/foo/bar?baz".parse().unwrap();
657         let connector = HttpConnector::new();
658 
659         let err = connect(connector, dst).await.unwrap_err();
660         assert_eq!(&*err.msg, super::INVALID_NOT_HTTP);
661     }
662 
663     #[tokio::test]
test_errors_missing_scheme()664     async fn test_errors_missing_scheme() {
665         let dst = "example.domain".parse().unwrap();
666         let mut connector = HttpConnector::new();
667         connector.enforce_http(false);
668 
669         let err = connect(connector, dst).await.unwrap_err();
670         assert_eq!(&*err.msg, super::INVALID_MISSING_SCHEME);
671     }
672 
673     #[test]
674     #[cfg_attr(not(feature = "__internal_happy_eyeballs_tests"), ignore)]
client_happy_eyeballs()675     fn client_happy_eyeballs() {
676         use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, TcpListener};
677         use std::time::{Duration, Instant};
678 
679         use super::dns;
680         use super::ConnectingTcp;
681 
682         let _ = pretty_env_logger::try_init();
683         let server4 = TcpListener::bind("127.0.0.1:0").unwrap();
684         let addr = server4.local_addr().unwrap();
685         let _server6 = TcpListener::bind(&format!("[::1]:{}", addr.port())).unwrap();
686         let mut rt = tokio::runtime::Builder::new()
687             .enable_io()
688             .enable_time()
689             .basic_scheduler()
690             .build()
691             .unwrap();
692 
693         let local_timeout = Duration::default();
694         let unreachable_v4_timeout = measure_connect(unreachable_ipv4_addr()).1;
695         let unreachable_v6_timeout = measure_connect(unreachable_ipv6_addr()).1;
696         let fallback_timeout = std::cmp::max(unreachable_v4_timeout, unreachable_v6_timeout)
697             + Duration::from_millis(250);
698 
699         let scenarios = &[
700             // Fast primary, without fallback.
701             (&[local_ipv4_addr()][..], 4, local_timeout, false),
702             (&[local_ipv6_addr()][..], 6, local_timeout, false),
703             // Fast primary, with (unused) fallback.
704             (
705                 &[local_ipv4_addr(), local_ipv6_addr()][..],
706                 4,
707                 local_timeout,
708                 false,
709             ),
710             (
711                 &[local_ipv6_addr(), local_ipv4_addr()][..],
712                 6,
713                 local_timeout,
714                 false,
715             ),
716             // Unreachable + fast primary, without fallback.
717             (
718                 &[unreachable_ipv4_addr(), local_ipv4_addr()][..],
719                 4,
720                 unreachable_v4_timeout,
721                 false,
722             ),
723             (
724                 &[unreachable_ipv6_addr(), local_ipv6_addr()][..],
725                 6,
726                 unreachable_v6_timeout,
727                 false,
728             ),
729             // Unreachable + fast primary, with (unused) fallback.
730             (
731                 &[
732                     unreachable_ipv4_addr(),
733                     local_ipv4_addr(),
734                     local_ipv6_addr(),
735                 ][..],
736                 4,
737                 unreachable_v4_timeout,
738                 false,
739             ),
740             (
741                 &[
742                     unreachable_ipv6_addr(),
743                     local_ipv6_addr(),
744                     local_ipv4_addr(),
745                 ][..],
746                 6,
747                 unreachable_v6_timeout,
748                 true,
749             ),
750             // Slow primary, with (used) fallback.
751             (
752                 &[slow_ipv4_addr(), local_ipv4_addr(), local_ipv6_addr()][..],
753                 6,
754                 fallback_timeout,
755                 false,
756             ),
757             (
758                 &[slow_ipv6_addr(), local_ipv6_addr(), local_ipv4_addr()][..],
759                 4,
760                 fallback_timeout,
761                 true,
762             ),
763             // Slow primary, with (used) unreachable + fast fallback.
764             (
765                 &[slow_ipv4_addr(), unreachable_ipv6_addr(), local_ipv6_addr()][..],
766                 6,
767                 fallback_timeout + unreachable_v6_timeout,
768                 false,
769             ),
770             (
771                 &[slow_ipv6_addr(), unreachable_ipv4_addr(), local_ipv4_addr()][..],
772                 4,
773                 fallback_timeout + unreachable_v4_timeout,
774                 true,
775             ),
776         ];
777 
778         // Scenarios for IPv6 -> IPv4 fallback require that host can access IPv6 network.
779         // Otherwise, connection to "slow" IPv6 address will error-out immediately.
780         let ipv6_accessible = measure_connect(slow_ipv6_addr()).0;
781 
782         for &(hosts, family, timeout, needs_ipv6_access) in scenarios {
783             if needs_ipv6_access && !ipv6_accessible {
784                 continue;
785             }
786 
787             let (start, stream) = rt
788                 .block_on(async move {
789                     let addrs = hosts
790                         .iter()
791                         .map(|host| (host.clone(), addr.port()).into())
792                         .collect();
793                     let connecting_tcp = ConnectingTcp::new(
794                         None,
795                         dns::IpAddrs::new(addrs),
796                         None,
797                         Some(fallback_timeout),
798                         false,
799                     );
800                     let start = Instant::now();
801                     Ok::<_, io::Error>((start, connecting_tcp.connect().await?))
802                 })
803                 .unwrap();
804             let res = if stream.peer_addr().unwrap().is_ipv4() {
805                 4
806             } else {
807                 6
808             };
809             let duration = start.elapsed();
810 
811             // Allow actual duration to be +/- 150ms off.
812             let min_duration = if timeout >= Duration::from_millis(150) {
813                 timeout - Duration::from_millis(150)
814             } else {
815                 Duration::default()
816             };
817             let max_duration = timeout + Duration::from_millis(150);
818 
819             assert_eq!(res, family);
820             assert!(duration >= min_duration);
821             assert!(duration <= max_duration);
822         }
823 
824         fn local_ipv4_addr() -> IpAddr {
825             Ipv4Addr::new(127, 0, 0, 1).into()
826         }
827 
828         fn local_ipv6_addr() -> IpAddr {
829             Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1).into()
830         }
831 
832         fn unreachable_ipv4_addr() -> IpAddr {
833             Ipv4Addr::new(127, 0, 0, 2).into()
834         }
835 
836         fn unreachable_ipv6_addr() -> IpAddr {
837             Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 2).into()
838         }
839 
840         fn slow_ipv4_addr() -> IpAddr {
841             // RFC 6890 reserved IPv4 address.
842             Ipv4Addr::new(198, 18, 0, 25).into()
843         }
844 
845         fn slow_ipv6_addr() -> IpAddr {
846             // RFC 6890 reserved IPv6 address.
847             Ipv6Addr::new(2001, 2, 0, 0, 0, 0, 0, 254).into()
848         }
849 
850         fn measure_connect(addr: IpAddr) -> (bool, Duration) {
851             let start = Instant::now();
852             let result =
853                 std::net::TcpStream::connect_timeout(&(addr, 80).into(), Duration::from_secs(1));
854 
855             let reachable = result.is_ok() || result.unwrap_err().kind() == io::ErrorKind::TimedOut;
856             let duration = start.elapsed();
857             (reachable, duration)
858         }
859     }
860 }
861