use std::error::Error as StdError;
#[cfg(feature = "runtime")]
use std::time::Duration;

use bytes::Bytes;
use futures_channel::{mpsc, oneshot};
use futures_util::future::{self, Either, FutureExt as _, TryFutureExt as _};
use futures_util::stream::StreamExt as _;
use h2::client::{Builder, SendRequest};
use http::{Method, StatusCode};
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{debug, trace, warn};

use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
use crate::body::HttpBody;
use crate::common::{exec::Exec, task, Future, Never, Pin, Poll};
use crate::headers;
use crate::proto::h2::UpgradedSendStream;
use crate::proto::Dispatched;
use crate::upgrade::Upgraded;
use crate::{Body, Request, Response};

type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<Body>>;

///// An mpsc channel is used to help notify the `Connection` task when *all*
///// other handles to it have been dropped, so that it can shutdown.
type ConnDropRef = mpsc::Sender<Never>;

///// A oneshot channel watches the `Connection` task, and when it completes,
///// the "dispatch" task will be notified and can shutdown sooner.
type ConnEof = oneshot::Receiver<Never>;

// Our defaults are chosen for the "majority" case, which usually are not
// resource constrained, and so the spec default of 64kb can be too limiting
// for performance.
const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5mb
const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2mb
const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; // 16kb
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024; // 1mb

#[derive(Clone, Debug)]
pub(crate) struct Config {
    pub(crate) adaptive_window: bool,
    pub(crate) initial_conn_window_size: u32,
    pub(crate) initial_stream_window_size: u32,
    pub(crate) max_frame_size: u32,
    #[cfg(feature = "runtime")]
    pub(crate) keep_alive_interval: Option<Duration>,
    #[cfg(feature = "runtime")]
    pub(crate) keep_alive_timeout: Duration,
    #[cfg(feature = "runtime")]
    pub(crate) keep_alive_while_idle: bool,
    pub(crate) max_concurrent_reset_streams: Option<usize>,
    pub(crate) max_send_buffer_size: usize,
}

impl Default for Config {
    fn default() -> Config {
        Config {
            adaptive_window: false,
            initial_conn_window_size: DEFAULT_CONN_WINDOW,
            initial_stream_window_size: DEFAULT_STREAM_WINDOW,
            max_frame_size: DEFAULT_MAX_FRAME_SIZE,
            #[cfg(feature = "runtime")]
            keep_alive_interval: None,
            #[cfg(feature = "runtime")]
            keep_alive_timeout: Duration::from_secs(20),
            #[cfg(feature = "runtime")]
            keep_alive_while_idle: false,
            max_concurrent_reset_streams: None,
            max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
        }
    }
}

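// Translate our `Config` into an `h2::client::Builder`. Server push is
// always disabled, since the client does not support pushed responses.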
fn new_builder(config: &Config) -> Builder {
    let mut builder = Builder::default();
    builder
        .initial_window_size(config.initial_stream_window_size)
        .initial_connection_window_size(config.initial_conn_window_size)
        .max_frame_size(config.max_frame_size)
        .max_send_buffer_size(config.max_send_buffer_size)
        .enable_push(false);
    if let Some(max) = config.max_concurrent_reset_streams {
        builder.max_concurrent_reset_streams(max);
    }
    builder
}

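// Derive the ping/pong configuration from our `Config`: BDP-based adaptive
// flow control is only enabled when `adaptive_window` is set, and the
// keep-alive options only exist with the "runtime" feature.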
fn new_ping_config(config: &Config) -> ping::Config {
    ping::Config {
        bdp_initial_window: if config.adaptive_window {
            Some(config.initial_stream_window_size)
        } else {
            None
        },
        #[cfg(feature = "runtime")]
        keep_alive_interval: config.keep_alive_interval,
        #[cfg(feature = "runtime")]
        keep_alive_timeout: config.keep_alive_timeout,
        #[cfg(feature = "runtime")]
        keep_alive_while_idle: config.keep_alive_while_idle,
    }
}

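// Perform the h2 handshake over `io`, spawn the connection-driving task on
// the executor, and return the `ClientTask` future that will dispatch
// requests received on `req_rx` over the new connection.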
pub(crate) async fn handshake<T, B>(
    io: T,
    req_rx: ClientRx<B>,
    config: &Config,
    exec: Exec,
) -> crate::Result<ClientTask<B>>
where
    T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
    B: HttpBody,
    B::Data: Send + 'static,
{
    let (h2_tx, mut conn) = new_builder(config)
        .handshake::<_, SendBuf<B::Data>>(io)
        .await
        .map_err(crate::Error::new_h2)?;

    // An mpsc channel is used entirely to detect when the
    // 'Client' has been dropped. This is to get around a bug
    // in h2 where dropping all SendRequests won't notify a
    // parked Connection.
    let (conn_drop_ref, rx) = mpsc::channel(1);
    let (cancel_tx, conn_eof) = oneshot::channel();

    let conn_drop_rx = rx.into_future().map(|(item, _rx)| {
        if let Some(never) = item {
            match never {}
        }
    });

    let ping_config = new_ping_config(&config);

    let (conn, ping) = if ping_config.is_enabled() {
        let pp = conn.ping_pong().expect("conn.ping_pong");
        let (recorder, mut ponger) = ping::channel(pp, ping_config);

        let conn = future::poll_fn(move |cx| {
            match ponger.poll(cx) {
                Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
                    conn.set_target_window_size(wnd);
                    conn.set_initial_window_size(wnd)?;
                }
                #[cfg(feature = "runtime")]
                Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
                    debug!("connection keep-alive timed out");
                    return Poll::Ready(Ok(()));
                }
                Poll::Pending => {}
            }

            Pin::new(&mut conn).poll(cx)
        });
        (Either::Left(conn), recorder)
    } else {
        (Either::Right(conn), ping::disabled())
    };
    let conn = conn.map_err(|e| debug!("connection error: {}", e));

    exec.execute(conn_task(conn, conn_drop_rx, cancel_tx));

    Ok(ClientTask {
        ping,
        conn_drop_ref,
        conn_eof,
        executor: exec,
        h2_tx,
        req_rx,
    })
}

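// Drive the connection until it finishes, or until every client handle has
// been dropped (signaled by `drop_rx` completing). In the latter case,
// dropping `cancel_tx` wakes the dispatch task, and the connection is then
// polled to completion so it can shut down gracefully.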
async fn conn_task<C, D>(conn: C, drop_rx: D, cancel_tx: oneshot::Sender<Never>)
where
    C: Future + Unpin,
    D: Future<Output = ()> + Unpin,
{
    match future::select(conn, drop_rx).await {
        Either::Left(_) => {
            // ok or err, the `conn` has finished
        }
        Either::Right(((), conn)) => {
            // mpsc has been dropped, hopefully polling
            // the connection some more should start shutdown
            // and then close
            trace!("send_request dropped, starting conn shutdown");
            drop(cancel_tx);
            let _ = conn.await;
        }
    }
}

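// The "dispatch" task future: pulls requests from the client channel and
// sends them over the h2 connection, spawning body-streaming work on the
// executor as needed.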
pub(crate) struct ClientTask<B>
where
    B: HttpBody,
{
    ping: ping::Recorder,
    conn_drop_ref: ConnDropRef,
    conn_eof: ConnEof,
    executor: Exec,
    h2_tx: SendRequest<SendBuf<B::Data>>,
    req_rx: ClientRx<B>,
}

impl<B> Future for ClientTask<B>
where
    B: HttpBody + Send + 'static,
    B::Data: Send,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
    type Output = crate::Result<Dispatched>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        loop {
            match ready!(self.h2_tx.poll_ready(cx)) {
                Ok(()) => (),
                Err(err) => {
                    self.ping.ensure_not_timed_out()?;
                    return if err.reason() == Some(::h2::Reason::NO_ERROR) {
                        trace!("connection gracefully shutdown");
                        Poll::Ready(Ok(Dispatched::Shutdown))
                    } else {
                        Poll::Ready(Err(crate::Error::new_h2(err)))
                    };
                }
            };

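            // The connection can accept another stream, so check whether the
            // client has queued a request for us to dispatch.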
            match self.req_rx.poll_recv(cx) {
                Poll::Ready(Some((req, cb))) => {
                    // check that future hasn't been canceled already
                    if cb.is_canceled() {
                        trace!("request callback is canceled");
                        continue;
                    }
                    let (head, body) = req.into_parts();
                    let mut req = ::http::Request::from_parts(head, ());
                    super::strip_connection_headers(req.headers_mut(), true);
                    if let Some(len) = body.size_hint().exact() {
                        if len != 0 || headers::method_has_defined_payload_semantics(req.method()) {
                            headers::set_content_length_if_missing(req.headers_mut(), len);
                        }
                    }

                    let is_connect = req.method() == Method::CONNECT;
                    let eos = body.is_end_stream();
                    let ping = self.ping.clone();

                    if is_connect {
                        if headers::content_length_parse_all(req.headers())
                            .map_or(false, |len| len != 0)
                        {
                            warn!("h2 connect request with non-zero body not supported");
                            cb.send(Err((
                                crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
                                None,
                            )));
                            continue;
                        }
                    }

                    let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) {
                        Ok(ok) => ok,
                        Err(err) => {
                            debug!("client send request error: {}", err);
                            cb.send(Err((crate::Error::new_h2(err), None)));
                            continue;
                        }
                    };

                    let send_stream = if !is_connect {
                        if !eos {
                            let mut pipe =
                                Box::pin(PipeToSendStream::new(body, body_tx)).map(|res| {
                                    if let Err(e) = res {
                                        debug!("client request body error: {}", e);
                                    }
                                });

                            // eagerly see if the body pipe is ready and
                            // can thus skip allocating in the executor
                            match Pin::new(&mut pipe).poll(cx) {
                                Poll::Ready(_) => (),
                                Poll::Pending => {
                                    let conn_drop_ref = self.conn_drop_ref.clone();
                                    // keep the ping recorder's knowledge of an
                                    // "open stream" alive while this body is
                                    // still sending...
                                    let ping = ping.clone();
                                    let pipe = pipe.map(move |x| {
                                        drop(conn_drop_ref);
                                        drop(ping);
                                        x
                                    });
                                    self.executor.execute(pipe);
                                }
                            }
                        }

                        None
                    } else {
                        Some(body_tx)
                    };

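                    // Map the response future: on success, either set up the
                    // CONNECT tunnel upgrade or wrap the h2 recv stream as the
                    // response body; on error, report it back to the caller.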
                    let fut = fut.map(move |result| match result {
                        Ok(res) => {
                            // record that we got the response headers
                            ping.record_non_data();

                            let content_length = headers::content_length_parse_all(res.headers());
                            if let (Some(mut send_stream), StatusCode::OK) =
                                (send_stream, res.status())
                            {
                                if content_length.map_or(false, |len| len != 0) {
                                    warn!("h2 connect response with non-zero body not supported");

                                    send_stream.send_reset(h2::Reason::INTERNAL_ERROR);
                                    return Err((
                                        crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
                                        None,
                                    ));
                                }
                                let (parts, recv_stream) = res.into_parts();
                                let mut res = Response::from_parts(parts, Body::empty());

                                let (pending, on_upgrade) = crate::upgrade::pending();
                                let io = H2Upgraded {
                                    ping,
                                    send_stream: unsafe { UpgradedSendStream::new(send_stream) },
                                    recv_stream,
                                    buf: Bytes::new(),
                                };
                                let upgraded = Upgraded::new(io, Bytes::new());

                                pending.fulfill(upgraded);
                                res.extensions_mut().insert(on_upgrade);

                                Ok(res)
                            } else {
                                let res = res.map(|stream| {
                                    let ping = ping.for_stream(&stream);
                                    crate::Body::h2(stream, content_length.into(), ping)
                                });
                                Ok(res)
                            }
                        }
                        Err(err) => {
                            ping.ensure_not_timed_out().map_err(|e| (e, None))?;

                            debug!("client response error: {}", err);
                            Err((crate::Error::new_h2(err), None))
                        }
                    });
                    self.executor.execute(cb.send_when(fut));
                    continue;
                }

                Poll::Ready(None) => {
                    trace!("client::dispatch::Sender dropped");
                    return Poll::Ready(Ok(Dispatched::Shutdown));
                }

                Poll::Pending => match ready!(Pin::new(&mut self.conn_eof).poll(cx)) {
                    Ok(never) => match never {},
                    Err(_conn_is_eof) => {
                        trace!("connection task is closed, closing dispatch task");
                        return Poll::Ready(Ok(Dispatched::Shutdown));
                    }
                },
            }
        }
    }
}