1 use std::error::Error as StdError;
2 use std::fmt;
3 use std::future::Future;
4 use std::io;
5 use std::marker::PhantomData;
6 use std::net::{IpAddr, SocketAddr};
7 use std::pin::Pin;
8 use std::sync::Arc;
9 use std::task::{self, Poll};
10 use std::time::Duration;
11
12 use futures_util::future::Either;
13 use http::uri::{Scheme, Uri};
14 use pin_project::pin_project;
15 use tokio::net::TcpStream;
16 use tokio::time::Delay;
17
18 use super::dns::{self, resolve, GaiResolver, Resolve};
19 use super::{Connected, Connection};
20 //#[cfg(feature = "runtime")] use super::dns::TokioThreadpoolGaiResolver;
21
/// A connector for the `http` scheme.
///
/// Performs DNS resolution in a thread pool, and then connects over TCP.
///
/// # Note
///
/// Sets the [`HttpInfo`](HttpInfo) value on responses, which includes
/// transport information such as the remote socket address used.
#[derive(Clone)]
pub struct HttpConnector<R = GaiResolver> {
    // Connection settings. Held in an `Arc` so cloning the connector is cheap;
    // the setters go through `Arc::make_mut`, so a clone that is modified gets
    // its own copy of the settings.
    config: Arc<Config>,
    // Resolver used for hosts that are not already IP address literals.
    resolver: R,
}
35
/// Extra information about the transport when an HttpConnector is used.
///
/// # Example
///
/// ```
/// # async fn doc() -> hyper::Result<()> {
/// use hyper::Uri;
/// use hyper::client::{Client, connect::HttpInfo};
///
/// let client = Client::new();
/// let uri = Uri::from_static("http://example.com");
///
/// let res = client.get(uri).await?;
/// res
///     .extensions()
///     .get::<HttpInfo>()
///     .map(|info| {
///         println!("remote addr = {}", info.remote_addr());
///     });
/// # Ok(())
/// # }
/// ```
///
/// # Note
///
/// If a different connector is used besides [`HttpConnector`](HttpConnector),
/// this value will not exist in the extensions. Consult that specific
/// connector to see what "extra" information it might provide to responses.
#[derive(Clone, Debug)]
pub struct HttpInfo {
    // Peer address of the connected `TcpStream`, captured in
    // `Connection::connected` when `peer_addr()` succeeds.
    remote_addr: SocketAddr,
}
68
// Settings shared by an `HttpConnector` and its clones. `Clone` is needed
// because the setters use `Arc::make_mut` on the containing `Arc`.
#[derive(Clone)]
struct Config {
    // Total connect timeout; divided by the number of candidate addresses
    // when a `ConnectingTcpRemote` is built.
    connect_timeout: Option<Duration>,
    // When true, only destinations with the `http` scheme are accepted.
    enforce_http: bool,
    // Delay before the fallback address family is tried in parallel;
    // `None` disables the fallback.
    happy_eyeballs_timeout: Option<Duration>,
    // Passed to `set_keepalive` on the connected socket when `Some`.
    keep_alive_timeout: Option<Duration>,
    // Local IP to bind the socket to (with port 0) before connecting.
    local_address: Option<IpAddr>,
    // Passed to `set_nodelay` on every connected socket.
    nodelay: bool,
    // When true, `set_reuse_address(true)` is applied before bind/connect.
    reuse_address: bool,
    // Passed to `set_send_buffer_size` on the connected socket when `Some`.
    send_buffer_size: Option<usize>,
    // Passed to `set_recv_buffer_size` on the connected socket when `Some`.
    recv_buffer_size: Option<usize>,
}
81
82 // ===== impl HttpConnector =====
83
84 impl HttpConnector {
85 /// Construct a new HttpConnector.
new() -> HttpConnector86 pub fn new() -> HttpConnector {
87 HttpConnector::new_with_resolver(GaiResolver::new())
88 }
89 }
90
91 /*
92 #[cfg(feature = "runtime")]
93 impl HttpConnector<TokioThreadpoolGaiResolver> {
94 /// Construct a new HttpConnector using the `TokioThreadpoolGaiResolver`.
95 ///
96 /// This resolver **requires** the threadpool runtime to be used.
97 pub fn new_with_tokio_threadpool_resolver() -> Self {
98 HttpConnector::new_with_resolver(TokioThreadpoolGaiResolver::new())
99 }
100 }
101 */
102
103 impl<R> HttpConnector<R> {
104 /// Construct a new HttpConnector.
105 ///
106 /// Takes a `Resolve` to handle DNS lookups.
new_with_resolver(resolver: R) -> HttpConnector<R>107 pub fn new_with_resolver(resolver: R) -> HttpConnector<R> {
108 HttpConnector {
109 config: Arc::new(Config {
110 connect_timeout: None,
111 enforce_http: true,
112 happy_eyeballs_timeout: Some(Duration::from_millis(300)),
113 keep_alive_timeout: None,
114 local_address: None,
115 nodelay: false,
116 reuse_address: false,
117 send_buffer_size: None,
118 recv_buffer_size: None,
119 }),
120 resolver,
121 }
122 }
123
124 /// Option to enforce all `Uri`s have the `http` scheme.
125 ///
126 /// Enabled by default.
127 #[inline]
enforce_http(&mut self, is_enforced: bool)128 pub fn enforce_http(&mut self, is_enforced: bool) {
129 self.config_mut().enforce_http = is_enforced;
130 }
131
132 /// Set that all sockets have `SO_KEEPALIVE` set with the supplied duration.
133 ///
134 /// If `None`, the option will not be set.
135 ///
136 /// Default is `None`.
137 #[inline]
set_keepalive(&mut self, dur: Option<Duration>)138 pub fn set_keepalive(&mut self, dur: Option<Duration>) {
139 self.config_mut().keep_alive_timeout = dur;
140 }
141
142 /// Set that all sockets have `SO_NODELAY` set to the supplied value `nodelay`.
143 ///
144 /// Default is `false`.
145 #[inline]
set_nodelay(&mut self, nodelay: bool)146 pub fn set_nodelay(&mut self, nodelay: bool) {
147 self.config_mut().nodelay = nodelay;
148 }
149
150 /// Sets the value of the SO_SNDBUF option on the socket.
151 #[inline]
set_send_buffer_size(&mut self, size: Option<usize>)152 pub fn set_send_buffer_size(&mut self, size: Option<usize>) {
153 self.config_mut().send_buffer_size = size;
154 }
155
156 /// Sets the value of the SO_RCVBUF option on the socket.
157 #[inline]
set_recv_buffer_size(&mut self, size: Option<usize>)158 pub fn set_recv_buffer_size(&mut self, size: Option<usize>) {
159 self.config_mut().recv_buffer_size = size;
160 }
161
162 /// Set that all sockets are bound to the configured address before connection.
163 ///
164 /// If `None`, the sockets will not be bound.
165 ///
166 /// Default is `None`.
167 #[inline]
set_local_address(&mut self, addr: Option<IpAddr>)168 pub fn set_local_address(&mut self, addr: Option<IpAddr>) {
169 self.config_mut().local_address = addr;
170 }
171
172 /// Set the connect timeout.
173 ///
174 /// If a domain resolves to multiple IP addresses, the timeout will be
175 /// evenly divided across them.
176 ///
177 /// Default is `None`.
178 #[inline]
set_connect_timeout(&mut self, dur: Option<Duration>)179 pub fn set_connect_timeout(&mut self, dur: Option<Duration>) {
180 self.config_mut().connect_timeout = dur;
181 }
182
183 /// Set timeout for [RFC 6555 (Happy Eyeballs)][RFC 6555] algorithm.
184 ///
185 /// If hostname resolves to both IPv4 and IPv6 addresses and connection
186 /// cannot be established using preferred address family before timeout
187 /// elapses, then connector will in parallel attempt connection using other
188 /// address family.
189 ///
190 /// If `None`, parallel connection attempts are disabled.
191 ///
192 /// Default is 300 milliseconds.
193 ///
194 /// [RFC 6555]: https://tools.ietf.org/html/rfc6555
195 #[inline]
set_happy_eyeballs_timeout(&mut self, dur: Option<Duration>)196 pub fn set_happy_eyeballs_timeout(&mut self, dur: Option<Duration>) {
197 self.config_mut().happy_eyeballs_timeout = dur;
198 }
199
200 /// Set that all socket have `SO_REUSEADDR` set to the supplied value `reuse_address`.
201 ///
202 /// Default is `false`.
203 #[inline]
set_reuse_address(&mut self, reuse_address: bool) -> &mut Self204 pub fn set_reuse_address(&mut self, reuse_address: bool) -> &mut Self {
205 self.config_mut().reuse_address = reuse_address;
206 self
207 }
208
209 // private
210
config_mut(&mut self) -> &mut Config211 fn config_mut(&mut self) -> &mut Config {
212 // If the are HttpConnector clones, this will clone the inner
213 // config. So mutating the config won't ever affect previous
214 // clones.
215 Arc::make_mut(&mut self.config)
216 }
217 }
218
// Messages carried by the `ConnectError`s that `call_async` returns when it
// rejects a destination `Uri` before any DNS or TCP work is attempted.

// `enforce_http` is on and the scheme is anything other than `http`.
static INVALID_NOT_HTTP: &str = "invalid URL, scheme is not http";
// `enforce_http` is off and the `Uri` has no scheme at all.
static INVALID_MISSING_SCHEME: &str = "invalid URL, scheme is missing";
// The `Uri` has no host component.
static INVALID_MISSING_HOST: &str = "invalid URL, host is missing";
222
223 // R: Debug required for now to allow adding it to debug output later...
224 impl<R: fmt::Debug> fmt::Debug for HttpConnector<R> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result225 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
226 f.debug_struct("HttpConnector").finish()
227 }
228 }
229
230 impl<R> tower_service::Service<Uri> for HttpConnector<R>
231 where
232 R: Resolve + Clone + Send + Sync + 'static,
233 R::Future: Send,
234 {
235 type Response = TcpStream;
236 type Error = ConnectError;
237 type Future = HttpConnecting<R>;
238
poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>>239 fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
240 ready!(self.resolver.poll_ready(cx)).map_err(ConnectError::dns)?;
241 Poll::Ready(Ok(()))
242 }
243
call(&mut self, dst: Uri) -> Self::Future244 fn call(&mut self, dst: Uri) -> Self::Future {
245 let mut self_ = self.clone();
246 HttpConnecting {
247 fut: Box::pin(async move { self_.call_async(dst).await }),
248 _marker: PhantomData,
249 }
250 }
251 }
252
253 impl<R> HttpConnector<R>
254 where
255 R: Resolve,
256 {
call_async(&mut self, dst: Uri) -> Result<TcpStream, ConnectError>257 async fn call_async(&mut self, dst: Uri) -> Result<TcpStream, ConnectError> {
258 trace!(
259 "Http::connect; scheme={:?}, host={:?}, port={:?}",
260 dst.scheme(),
261 dst.host(),
262 dst.port(),
263 );
264
265 if self.config.enforce_http {
266 if dst.scheme() != Some(&Scheme::HTTP) {
267 return Err(ConnectError {
268 msg: INVALID_NOT_HTTP.into(),
269 cause: None,
270 });
271 }
272 } else if dst.scheme().is_none() {
273 return Err(ConnectError {
274 msg: INVALID_MISSING_SCHEME.into(),
275 cause: None,
276 });
277 }
278
279 let host = match dst.host() {
280 Some(s) => s,
281 None => {
282 return Err(ConnectError {
283 msg: INVALID_MISSING_HOST.into(),
284 cause: None,
285 })
286 }
287 };
288 let port = match dst.port() {
289 Some(port) => port.as_u16(),
290 None => {
291 if dst.scheme() == Some(&Scheme::HTTPS) {
292 443
293 } else {
294 80
295 }
296 }
297 };
298
299 let config = &self.config;
300
301 // If the host is already an IP addr (v4 or v6),
302 // skip resolving the dns and start connecting right away.
303 let addrs = if let Some(addrs) = dns::IpAddrs::try_parse(host, port) {
304 addrs
305 } else {
306 let addrs = resolve(&mut self.resolver, dns::Name::new(host.into()))
307 .await
308 .map_err(ConnectError::dns)?;
309 let addrs = addrs.map(|addr| SocketAddr::new(addr, port)).collect();
310 dns::IpAddrs::new(addrs)
311 };
312
313 let c = ConnectingTcp::new(
314 config.local_address,
315 addrs,
316 config.connect_timeout,
317 config.happy_eyeballs_timeout,
318 config.reuse_address,
319 );
320
321 let sock = c
322 .connect()
323 .await
324 .map_err(ConnectError::m("tcp connect error"))?;
325
326 if let Some(dur) = config.keep_alive_timeout {
327 sock.set_keepalive(Some(dur))
328 .map_err(ConnectError::m("tcp set_keepalive error"))?;
329 }
330
331 if let Some(size) = config.send_buffer_size {
332 sock.set_send_buffer_size(size)
333 .map_err(ConnectError::m("tcp set_send_buffer_size error"))?;
334 }
335
336 if let Some(size) = config.recv_buffer_size {
337 sock.set_recv_buffer_size(size)
338 .map_err(ConnectError::m("tcp set_recv_buffer_size error"))?;
339 }
340
341 sock.set_nodelay(config.nodelay)
342 .map_err(ConnectError::m("tcp set_nodelay error"))?;
343
344 Ok(sock)
345 }
346 }
347
348 impl Connection for TcpStream {
connected(&self) -> Connected349 fn connected(&self) -> Connected {
350 let connected = Connected::new();
351 if let Ok(remote_addr) = self.peer_addr() {
352 connected.extra(HttpInfo { remote_addr })
353 } else {
354 connected
355 }
356 }
357 }
358
359 impl HttpInfo {
360 /// Get the remote address of the transport used.
remote_addr(&self) -> SocketAddr361 pub fn remote_addr(&self) -> SocketAddr {
362 self.remote_addr
363 }
364 }
365
// Not publicly exported (so missing_docs doesn't trigger).
//
// We return this `Future` instead of the `Pin<Box<dyn Future>>` directly
// so that users don't rely on it fitting in a `Pin<Box<dyn Future>>` slot
// (and thus we can change the type in the future).
#[must_use = "futures do nothing unless polled"]
#[pin_project]
#[allow(missing_debug_implementations)]
pub struct HttpConnecting<R> {
    // The boxed connect future built in `Service::call`; polling this struct
    // simply polls it.
    #[pin]
    fut: BoxConnecting,
    // Keeps the resolver type `R` in the signature without storing a value
    // of that type.
    _marker: PhantomData<R>,
}
379
// Outcome of a connect attempt: the connected stream or a `ConnectError`.
type ConnectResult = Result<TcpStream, ConnectError>;
// Type-erased, heap-allocated, `Send` future producing a `ConnectResult`;
// this is what `HttpConnecting` wraps.
type BoxConnecting = Pin<Box<dyn Future<Output = ConnectResult> + Send>>;
382
383 impl<R: Resolve> Future for HttpConnecting<R> {
384 type Output = ConnectResult;
385
poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output>386 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
387 self.project().fut.poll(cx)
388 }
389 }
390
// Not publicly exported (so missing_docs doesn't trigger).
//
// Error produced by the connector: a short message describing which step
// failed, plus the underlying error when there is one.
pub struct ConnectError {
    msg: Box<str>,
    cause: Option<Box<dyn StdError + Send + Sync>>,
}

impl ConnectError {
    // Builds an error with both a message and an underlying cause.
    fn new<S, E>(msg: S, cause: E) -> ConnectError
    where
        S: Into<Box<str>>,
        E: Into<Box<dyn StdError + Send + Sync>>,
    {
        let msg = msg.into();
        let cause = Some(cause.into());
        ConnectError { msg, cause }
    }

    // Wraps a resolver failure under the fixed message "dns error".
    fn dns<E>(cause: E) -> ConnectError
    where
        E: Into<Box<dyn StdError + Send + Sync>>,
    {
        Self::new("dns error", cause)
    }

    // Returns a closure suitable for `Result::map_err` that attaches `msg`
    // to whatever error it receives.
    fn m<S, E>(msg: S) -> impl FnOnce(E) -> ConnectError
    where
        S: Into<Box<str>>,
        E: Into<Box<dyn StdError + Send + Sync>>,
    {
        move |cause| Self::new(msg, cause)
    }
}

impl fmt::Debug for ConnectError {
    // With a cause: `ConnectError(<msg>, <cause>)`. Without one: just the
    // message, Debug-formatted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self.cause {
            Some(ref cause) => f
                .debug_tuple("ConnectError")
                .field(&self.msg)
                .field(cause)
                .finish(),
            None => fmt::Debug::fmt(&self.msg, f),
        }
    }
}

impl fmt::Display for ConnectError {
    // Writes the message, followed by ": <cause>" when a cause is present.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.msg)?;

        match &self.cause {
            Some(cause) => write!(f, ": {}", cause),
            None => Ok(()),
        }
    }
}

impl StdError for ConnectError {
    // Exposes the underlying cause, if any, as the error source.
    fn source(&self) -> Option<&(dyn StdError + 'static)> {
        match self.cause {
            Some(ref boxed) => Some(&**boxed as &(dyn StdError + 'static)),
            None => None,
        }
    }
}
455
// State for one TCP connect attempt across a set of candidate addresses,
// optionally with a delayed fallback to a second group of addresses.
struct ConnectingTcp {
    // Local IP to bind to before connecting, if any.
    local_addr: Option<IpAddr>,
    // Addresses that are tried first.
    preferred: ConnectingTcpRemote,
    // Second group of addresses plus the delay before they are tried.
    // `None` when no fallback timeout was given or there are no fallback
    // addresses.
    fallback: Option<ConnectingTcpFallback>,
    // Whether `SO_REUSEADDR` is set on each socket.
    reuse_address: bool,
}
462
463 impl ConnectingTcp {
new( local_addr: Option<IpAddr>, remote_addrs: dns::IpAddrs, connect_timeout: Option<Duration>, fallback_timeout: Option<Duration>, reuse_address: bool, ) -> ConnectingTcp464 fn new(
465 local_addr: Option<IpAddr>,
466 remote_addrs: dns::IpAddrs,
467 connect_timeout: Option<Duration>,
468 fallback_timeout: Option<Duration>,
469 reuse_address: bool,
470 ) -> ConnectingTcp {
471 if let Some(fallback_timeout) = fallback_timeout {
472 let (preferred_addrs, fallback_addrs) = remote_addrs.split_by_preference(local_addr);
473 if fallback_addrs.is_empty() {
474 return ConnectingTcp {
475 local_addr,
476 preferred: ConnectingTcpRemote::new(preferred_addrs, connect_timeout),
477 fallback: None,
478 reuse_address,
479 };
480 }
481
482 ConnectingTcp {
483 local_addr,
484 preferred: ConnectingTcpRemote::new(preferred_addrs, connect_timeout),
485 fallback: Some(ConnectingTcpFallback {
486 delay: tokio::time::delay_for(fallback_timeout),
487 remote: ConnectingTcpRemote::new(fallback_addrs, connect_timeout),
488 }),
489 reuse_address,
490 }
491 } else {
492 ConnectingTcp {
493 local_addr,
494 preferred: ConnectingTcpRemote::new(remote_addrs, connect_timeout),
495 fallback: None,
496 reuse_address,
497 }
498 }
499 }
500 }
501
// The fallback half of a `ConnectingTcp`.
struct ConnectingTcpFallback {
    // Timer created from the fallback timeout; the fallback addresses are
    // only tried once it has fired or the preferred attempt has failed.
    delay: Delay,
    // The fallback group of addresses.
    remote: ConnectingTcpRemote,
}
506
// A group of candidate remote addresses that are tried one after another.
struct ConnectingTcpRemote {
    // Addresses still to be tried; consumed as an iterator by `connect`.
    addrs: dns::IpAddrs,
    // Timeout applied to each individual address: the overall connect
    // timeout divided by the number of addresses in this group.
    connect_timeout: Option<Duration>,
}
511
512 impl ConnectingTcpRemote {
new(addrs: dns::IpAddrs, connect_timeout: Option<Duration>) -> Self513 fn new(addrs: dns::IpAddrs, connect_timeout: Option<Duration>) -> Self {
514 let connect_timeout = connect_timeout.map(|t| t / (addrs.len() as u32));
515
516 Self {
517 addrs,
518 connect_timeout,
519 }
520 }
521 }
522
523 impl ConnectingTcpRemote {
connect( &mut self, local_addr: &Option<IpAddr>, reuse_address: bool, ) -> io::Result<TcpStream>524 async fn connect(
525 &mut self,
526 local_addr: &Option<IpAddr>,
527 reuse_address: bool,
528 ) -> io::Result<TcpStream> {
529 let mut err = None;
530 for addr in &mut self.addrs {
531 debug!("connecting to {}", addr);
532 match connect(&addr, local_addr, reuse_address, self.connect_timeout)?.await {
533 Ok(tcp) => {
534 debug!("connected to {}", addr);
535 return Ok(tcp);
536 }
537 Err(e) => {
538 trace!("connect error for {}: {:?}", addr, e);
539 err = Some(e);
540 }
541 }
542 }
543
544 Err(err.take().expect("missing connect error"))
545 }
546 }
547
connect( addr: &SocketAddr, local_addr: &Option<IpAddr>, reuse_address: bool, connect_timeout: Option<Duration>, ) -> io::Result<impl Future<Output = io::Result<TcpStream>>>548 fn connect(
549 addr: &SocketAddr,
550 local_addr: &Option<IpAddr>,
551 reuse_address: bool,
552 connect_timeout: Option<Duration>,
553 ) -> io::Result<impl Future<Output = io::Result<TcpStream>>> {
554 use socket2::{Domain, Protocol, Socket, Type};
555 let domain = match *addr {
556 SocketAddr::V4(_) => Domain::ipv4(),
557 SocketAddr::V6(_) => Domain::ipv6(),
558 };
559 let socket = Socket::new(domain, Type::stream(), Some(Protocol::tcp()))?;
560
561 if reuse_address {
562 socket.set_reuse_address(true)?;
563 }
564
565 if let Some(ref local_addr) = *local_addr {
566 // Caller has requested this socket be bound before calling connect
567 socket.bind(&SocketAddr::new(local_addr.clone(), 0).into())?;
568 } else if cfg!(windows) {
569 // Windows requires a socket be bound before calling connect
570 let any: SocketAddr = match *addr {
571 SocketAddr::V4(_) => ([0, 0, 0, 0], 0).into(),
572 SocketAddr::V6(_) => ([0, 0, 0, 0, 0, 0, 0, 0], 0).into(),
573 };
574 socket.bind(&any.into())?;
575 }
576
577 let addr = *addr;
578
579 let std_tcp = socket.into_tcp_stream();
580
581 Ok(async move {
582 let connect = TcpStream::connect_std(std_tcp, &addr);
583 match connect_timeout {
584 Some(dur) => match tokio::time::timeout(dur, connect).await {
585 Ok(Ok(s)) => Ok(s),
586 Ok(Err(e)) => Err(e),
587 Err(e) => Err(io::Error::new(io::ErrorKind::TimedOut, e)),
588 },
589 None => connect.await,
590 }
591 })
592 }
593
impl ConnectingTcp {
    // Runs the connect attempt and returns the first stream that connects.
    //
    // Without a fallback, the preferred addresses are simply tried in order.
    // With a fallback, the preferred attempt is raced against the fallback
    // delay; once the delay fires, the preferred and fallback attempts are
    // polled together. If whichever attempt finishes first failed, the other
    // attempt is awaited and its result is returned instead.
    async fn connect(mut self) -> io::Result<TcpStream> {
        // Borrow/copy the shared settings up front so that `preferred` and
        // `fallback` can be used separately below.
        let Self {
            ref local_addr,
            reuse_address,
            ..
        } = self;
        match self.fallback {
            None => self.preferred.connect(local_addr, reuse_address).await,
            Some(mut fallback) => {
                let preferred_fut = self.preferred.connect(local_addr, reuse_address);
                futures_util::pin_mut!(preferred_fut);

                // Created here but not polled until the delay has fired or
                // the preferred attempt has failed.
                let fallback_fut = fallback.remote.connect(local_addr, reuse_address);
                futures_util::pin_mut!(fallback_fut);

                // `result` is the outcome of whichever attempt finished
                // first; `future` is the attempt that is still outstanding.
                let (result, future) =
                    match futures_util::future::select(preferred_fut, fallback.delay).await {
                        // Preferred finished before the delay fired; the
                        // fallback attempt has not been started yet.
                        Either::Left((result, _fallback_delay)) => {
                            (result, Either::Right(fallback_fut))
                        }
                        Either::Right(((), preferred_fut)) => {
                            // Delay is done, start polling both the preferred and the fallback
                            futures_util::future::select(preferred_fut, fallback_fut)
                                .await
                                .factor_first()
                        }
                    };

                if result.is_err() {
                    // Fallback to the remaining future (could be preferred or fallback)
                    // if we get an error
                    future.await
                } else {
                    result
                }
            }
        }
    }
}
634
#[cfg(test)]
mod tests {
    use std::io;

    use ::http::Uri;

    use super::super::sealed::{Connect, ConnectSvc};
    use super::HttpConnector;

    // Drives any `Connect` implementation through the sealed `connect`
    // entry point and returns its connection or error.
    async fn connect<C>(
        connector: C,
        dst: Uri,
    ) -> Result<<C::_Svc as ConnectSvc>::Connection, <C::_Svc as ConnectSvc>::Error>
    where
        C: Connect,
    {
        connector.connect(super::super::sealed::Internal, dst).await
    }

    // A default connector enforces `http`, so an `https` URL is rejected.
    #[tokio::test]
    async fn test_errors_enforce_http() {
        let dst = "https://example.domain/foo/bar?baz".parse().unwrap();
        let connector = HttpConnector::new();

        let err = connect(connector, dst).await.unwrap_err();
        assert_eq!(&*err.msg, super::INVALID_NOT_HTTP);
    }

    // With enforcement disabled, a URL without any scheme is still rejected.
    #[tokio::test]
    async fn test_errors_missing_scheme() {
        let dst = "example.domain".parse().unwrap();
        let mut connector = HttpConnector::new();
        connector.enforce_http(false);

        let err = connect(connector, dst).await.unwrap_err();
        assert_eq!(&*err.msg, super::INVALID_MISSING_SCHEME);
    }

    // Checks which address family `ConnectingTcp` ends up connected over,
    // and roughly how long that takes, for several address lists.
    //
    // Ignored unless the `__internal_happy_eyeballs_tests` feature is
    // enabled, since it depends on the host's network setup and real timing.
    #[test]
    #[cfg_attr(not(feature = "__internal_happy_eyeballs_tests"), ignore)]
    fn client_happy_eyeballs() {
        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, TcpListener};
        use std::time::{Duration, Instant};

        use super::dns;
        use super::ConnectingTcp;

        let _ = pretty_env_logger::try_init();
        // Listen on the same port over both IPv4 and IPv6 loopback.
        let server4 = TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = server4.local_addr().unwrap();
        let _server6 = TcpListener::bind(&format!("[::1]:{}", addr.port())).unwrap();
        let mut rt = tokio::runtime::Builder::new()
            .enable_io()
            .enable_time()
            .basic_scheduler()
            .build()
            .unwrap();

        // Expected durations are derived from measurements on this host.
        let local_timeout = Duration::default();
        let unreachable_v4_timeout = measure_connect(unreachable_ipv4_addr()).1;
        let unreachable_v6_timeout = measure_connect(unreachable_ipv6_addr()).1;
        let fallback_timeout = std::cmp::max(unreachable_v4_timeout, unreachable_v6_timeout)
            + Duration::from_millis(250);

        // Each scenario is (hosts, expected address family, expected
        // duration, whether the scenario needs IPv6 connectivity).
        let scenarios = &[
            // Fast primary, without fallback.
            (&[local_ipv4_addr()][..], 4, local_timeout, false),
            (&[local_ipv6_addr()][..], 6, local_timeout, false),
            // Fast primary, with (unused) fallback.
            (
                &[local_ipv4_addr(), local_ipv6_addr()][..],
                4,
                local_timeout,
                false,
            ),
            (
                &[local_ipv6_addr(), local_ipv4_addr()][..],
                6,
                local_timeout,
                false,
            ),
            // Unreachable + fast primary, without fallback.
            (
                &[unreachable_ipv4_addr(), local_ipv4_addr()][..],
                4,
                unreachable_v4_timeout,
                false,
            ),
            (
                &[unreachable_ipv6_addr(), local_ipv6_addr()][..],
                6,
                unreachable_v6_timeout,
                false,
            ),
            // Unreachable + fast primary, with (unused) fallback.
            (
                &[
                    unreachable_ipv4_addr(),
                    local_ipv4_addr(),
                    local_ipv6_addr(),
                ][..],
                4,
                unreachable_v4_timeout,
                false,
            ),
            (
                &[
                    unreachable_ipv6_addr(),
                    local_ipv6_addr(),
                    local_ipv4_addr(),
                ][..],
                6,
                unreachable_v6_timeout,
                true,
            ),
            // Slow primary, with (used) fallback.
            (
                &[slow_ipv4_addr(), local_ipv4_addr(), local_ipv6_addr()][..],
                6,
                fallback_timeout,
                false,
            ),
            (
                &[slow_ipv6_addr(), local_ipv6_addr(), local_ipv4_addr()][..],
                4,
                fallback_timeout,
                true,
            ),
            // Slow primary, with (used) unreachable + fast fallback.
            (
                &[slow_ipv4_addr(), unreachable_ipv6_addr(), local_ipv6_addr()][..],
                6,
                fallback_timeout + unreachable_v6_timeout,
                false,
            ),
            (
                &[slow_ipv6_addr(), unreachable_ipv4_addr(), local_ipv4_addr()][..],
                4,
                fallback_timeout + unreachable_v4_timeout,
                true,
            ),
        ];

        // Scenarios for IPv6 -> IPv4 fallback require that host can access IPv6 network.
        // Otherwise, connection to "slow" IPv6 address will error-out immediately.
        let ipv6_accessible = measure_connect(slow_ipv6_addr()).0;

        for &(hosts, family, timeout, needs_ipv6_access) in scenarios {
            if needs_ipv6_access && !ipv6_accessible {
                continue;
            }

            let (start, stream) = rt
                .block_on(async move {
                    // Every host is paired with the port the local listeners use.
                    let addrs = hosts
                        .iter()
                        .map(|host| (host.clone(), addr.port()).into())
                        .collect();
                    let connecting_tcp = ConnectingTcp::new(
                        None,
                        dns::IpAddrs::new(addrs),
                        None,
                        Some(fallback_timeout),
                        false,
                    );
                    let start = Instant::now();
                    Ok::<_, io::Error>((start, connecting_tcp.connect().await?))
                })
                .unwrap();
            // Address family of the connection that was actually established.
            let res = if stream.peer_addr().unwrap().is_ipv4() {
                4
            } else {
                6
            };
            let duration = start.elapsed();

            // Allow actual duration to be +/- 150ms off.
            let min_duration = if timeout >= Duration::from_millis(150) {
                timeout - Duration::from_millis(150)
            } else {
                Duration::default()
            };
            let max_duration = timeout + Duration::from_millis(150);

            assert_eq!(res, family);
            assert!(duration >= min_duration);
            assert!(duration <= max_duration);
        }

        // Loopback addresses the two listeners above are bound to.
        fn local_ipv4_addr() -> IpAddr {
            Ipv4Addr::new(127, 0, 0, 1).into()
        }

        fn local_ipv6_addr() -> IpAddr {
            Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1).into()
        }

        // Addresses the test treats as unreachable (nothing listens on them here).
        fn unreachable_ipv4_addr() -> IpAddr {
            Ipv4Addr::new(127, 0, 0, 2).into()
        }

        fn unreachable_ipv6_addr() -> IpAddr {
            Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 2).into()
        }

        fn slow_ipv4_addr() -> IpAddr {
            // RFC 6890 reserved IPv4 address.
            Ipv4Addr::new(198, 18, 0, 25).into()
        }

        fn slow_ipv6_addr() -> IpAddr {
            // RFC 6890 reserved IPv6 address.
            Ipv6Addr::new(2001, 2, 0, 0, 0, 0, 0, 254).into()
        }

        // Attempts a blocking connect to port 80 of `addr` with a one second
        // timeout. Returns whether the address counts as reachable (the
        // connect succeeded or timed out, rather than failing with another
        // error) and how long the attempt took.
        fn measure_connect(addr: IpAddr) -> (bool, Duration) {
            let start = Instant::now();
            let result =
                std::net::TcpStream::connect_timeout(&(addr, 80).into(), Duration::from_secs(1));

            let reachable = result.is_ok() || result.unwrap_err().kind() == io::ErrorKind::TimedOut;
            let duration = start.elapsed();
            (reachable, duration)
        }
    }
}
861