1 use std::error::Error as StdError;
2 #[cfg(feature = "runtime")]
3 use std::time::Duration;
4
5 use bytes::Bytes;
6 use futures_channel::{mpsc, oneshot};
7 use futures_util::future::{self, Either, FutureExt as _, TryFutureExt as _};
8 use futures_util::stream::StreamExt as _;
9 use h2::client::{Builder, SendRequest};
10 use http::{Method, StatusCode};
11 use tokio::io::{AsyncRead, AsyncWrite};
12 use tracing::{debug, trace, warn};
13
14 use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
15 use crate::body::HttpBody;
16 use crate::common::{exec::Exec, task, Future, Never, Pin, Poll};
17 use crate::headers;
18 use crate::proto::h2::UpgradedSendStream;
19 use crate::proto::Dispatched;
20 use crate::upgrade::Upgraded;
21 use crate::{Body, Request, Response};
22
23 type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<Body>>;
24
25 ///// An mpsc channel is used to help notify the `Connection` task when *all*
26 ///// other handles to it have been dropped, so that it can shutdown.
27 type ConnDropRef = mpsc::Sender<Never>;
28
29 ///// A oneshot channel watches the `Connection` task, and when it completes,
30 ///// the "dispatch" task will be notified and can shutdown sooner.
31 type ConnEof = oneshot::Receiver<Never>;
32
33 // Our defaults are chosen for the "majority" case, which usually are not
34 // resource constrained, and so the spec default of 64kb can be too limiting
35 // for performance.
36 const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5mb
37 const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2mb
38 const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; // 16kb
39 const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024; // 1mb
40
41 #[derive(Clone, Debug)]
42 pub(crate) struct Config {
43 pub(crate) adaptive_window: bool,
44 pub(crate) initial_conn_window_size: u32,
45 pub(crate) initial_stream_window_size: u32,
46 pub(crate) max_frame_size: u32,
47 #[cfg(feature = "runtime")]
48 pub(crate) keep_alive_interval: Option<Duration>,
49 #[cfg(feature = "runtime")]
50 pub(crate) keep_alive_timeout: Duration,
51 #[cfg(feature = "runtime")]
52 pub(crate) keep_alive_while_idle: bool,
53 pub(crate) max_concurrent_reset_streams: Option<usize>,
54 pub(crate) max_send_buffer_size: usize,
55 }
56
57 impl Default for Config {
default() -> Config58 fn default() -> Config {
59 Config {
60 adaptive_window: false,
61 initial_conn_window_size: DEFAULT_CONN_WINDOW,
62 initial_stream_window_size: DEFAULT_STREAM_WINDOW,
63 max_frame_size: DEFAULT_MAX_FRAME_SIZE,
64 #[cfg(feature = "runtime")]
65 keep_alive_interval: None,
66 #[cfg(feature = "runtime")]
67 keep_alive_timeout: Duration::from_secs(20),
68 #[cfg(feature = "runtime")]
69 keep_alive_while_idle: false,
70 max_concurrent_reset_streams: None,
71 max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
72 }
73 }
74 }
75
new_builder(config: &Config) -> Builder76 fn new_builder(config: &Config) -> Builder {
77 let mut builder = Builder::default();
78 builder
79 .initial_window_size(config.initial_stream_window_size)
80 .initial_connection_window_size(config.initial_conn_window_size)
81 .max_frame_size(config.max_frame_size)
82 .max_send_buffer_size(config.max_send_buffer_size)
83 .enable_push(false);
84 if let Some(max) = config.max_concurrent_reset_streams {
85 builder.max_concurrent_reset_streams(max);
86 }
87 builder
88 }
89
new_ping_config(config: &Config) -> ping::Config90 fn new_ping_config(config: &Config) -> ping::Config {
91 ping::Config {
92 bdp_initial_window: if config.adaptive_window {
93 Some(config.initial_stream_window_size)
94 } else {
95 None
96 },
97 #[cfg(feature = "runtime")]
98 keep_alive_interval: config.keep_alive_interval,
99 #[cfg(feature = "runtime")]
100 keep_alive_timeout: config.keep_alive_timeout,
101 #[cfg(feature = "runtime")]
102 keep_alive_while_idle: config.keep_alive_while_idle,
103 }
104 }
105
handshake<T, B>( io: T, req_rx: ClientRx<B>, config: &Config, exec: Exec, ) -> crate::Result<ClientTask<B>> where T: AsyncRead + AsyncWrite + Send + Unpin + 'static, B: HttpBody, B::Data: Send + 'static,106 pub(crate) async fn handshake<T, B>(
107 io: T,
108 req_rx: ClientRx<B>,
109 config: &Config,
110 exec: Exec,
111 ) -> crate::Result<ClientTask<B>>
112 where
113 T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
114 B: HttpBody,
115 B::Data: Send + 'static,
116 {
117 let (h2_tx, mut conn) = new_builder(config)
118 .handshake::<_, SendBuf<B::Data>>(io)
119 .await
120 .map_err(crate::Error::new_h2)?;
121
122 // An mpsc channel is used entirely to detect when the
123 // 'Client' has been dropped. This is to get around a bug
124 // in h2 where dropping all SendRequests won't notify a
125 // parked Connection.
126 let (conn_drop_ref, rx) = mpsc::channel(1);
127 let (cancel_tx, conn_eof) = oneshot::channel();
128
129 let conn_drop_rx = rx.into_future().map(|(item, _rx)| {
130 if let Some(never) = item {
131 match never {}
132 }
133 });
134
135 let ping_config = new_ping_config(&config);
136
137 let (conn, ping) = if ping_config.is_enabled() {
138 let pp = conn.ping_pong().expect("conn.ping_pong");
139 let (recorder, mut ponger) = ping::channel(pp, ping_config);
140
141 let conn = future::poll_fn(move |cx| {
142 match ponger.poll(cx) {
143 Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
144 conn.set_target_window_size(wnd);
145 conn.set_initial_window_size(wnd)?;
146 }
147 #[cfg(feature = "runtime")]
148 Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
149 debug!("connection keep-alive timed out");
150 return Poll::Ready(Ok(()));
151 }
152 Poll::Pending => {}
153 }
154
155 Pin::new(&mut conn).poll(cx)
156 });
157 (Either::Left(conn), recorder)
158 } else {
159 (Either::Right(conn), ping::disabled())
160 };
161 let conn = conn.map_err(|e| debug!("connection error: {}", e));
162
163 exec.execute(conn_task(conn, conn_drop_rx, cancel_tx));
164
165 Ok(ClientTask {
166 ping,
167 conn_drop_ref,
168 conn_eof,
169 executor: exec,
170 h2_tx,
171 req_rx,
172 })
173 }
174
conn_task<C, D>(conn: C, drop_rx: D, cancel_tx: oneshot::Sender<Never>) where C: Future + Unpin, D: Future<Output = ()> + Unpin,175 async fn conn_task<C, D>(conn: C, drop_rx: D, cancel_tx: oneshot::Sender<Never>)
176 where
177 C: Future + Unpin,
178 D: Future<Output = ()> + Unpin,
179 {
180 match future::select(conn, drop_rx).await {
181 Either::Left(_) => {
182 // ok or err, the `conn` has finished
183 }
184 Either::Right(((), conn)) => {
185 // mpsc has been dropped, hopefully polling
186 // the connection some more should start shutdown
187 // and then close
188 trace!("send_request dropped, starting conn shutdown");
189 drop(cancel_tx);
190 let _ = conn.await;
191 }
192 }
193 }
194
195 pub(crate) struct ClientTask<B>
196 where
197 B: HttpBody,
198 {
199 ping: ping::Recorder,
200 conn_drop_ref: ConnDropRef,
201 conn_eof: ConnEof,
202 executor: Exec,
203 h2_tx: SendRequest<SendBuf<B::Data>>,
204 req_rx: ClientRx<B>,
205 }
206
207 impl<B> Future for ClientTask<B>
208 where
209 B: HttpBody + Send + 'static,
210 B::Data: Send,
211 B::Error: Into<Box<dyn StdError + Send + Sync>>,
212 {
213 type Output = crate::Result<Dispatched>;
214
poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output>215 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
216 loop {
217 match ready!(self.h2_tx.poll_ready(cx)) {
218 Ok(()) => (),
219 Err(err) => {
220 self.ping.ensure_not_timed_out()?;
221 return if err.reason() == Some(::h2::Reason::NO_ERROR) {
222 trace!("connection gracefully shutdown");
223 Poll::Ready(Ok(Dispatched::Shutdown))
224 } else {
225 Poll::Ready(Err(crate::Error::new_h2(err)))
226 };
227 }
228 };
229
230 match self.req_rx.poll_recv(cx) {
231 Poll::Ready(Some((req, cb))) => {
232 // check that future hasn't been canceled already
233 if cb.is_canceled() {
234 trace!("request callback is canceled");
235 continue;
236 }
237 let (head, body) = req.into_parts();
238 let mut req = ::http::Request::from_parts(head, ());
239 super::strip_connection_headers(req.headers_mut(), true);
240 if let Some(len) = body.size_hint().exact() {
241 if len != 0 || headers::method_has_defined_payload_semantics(req.method()) {
242 headers::set_content_length_if_missing(req.headers_mut(), len);
243 }
244 }
245
246 let is_connect = req.method() == Method::CONNECT;
247 let eos = body.is_end_stream();
248 let ping = self.ping.clone();
249
250 if is_connect {
251 if headers::content_length_parse_all(req.headers())
252 .map_or(false, |len| len != 0)
253 {
254 warn!("h2 connect request with non-zero body not supported");
255 cb.send(Err((
256 crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
257 None,
258 )));
259 continue;
260 }
261 }
262
263 let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) {
264 Ok(ok) => ok,
265 Err(err) => {
266 debug!("client send request error: {}", err);
267 cb.send(Err((crate::Error::new_h2(err), None)));
268 continue;
269 }
270 };
271
272 let send_stream = if !is_connect {
273 if !eos {
274 let mut pipe =
275 Box::pin(PipeToSendStream::new(body, body_tx)).map(|res| {
276 if let Err(e) = res {
277 debug!("client request body error: {}", e);
278 }
279 });
280
281 // eagerly see if the body pipe is ready and
282 // can thus skip allocating in the executor
283 match Pin::new(&mut pipe).poll(cx) {
284 Poll::Ready(_) => (),
285 Poll::Pending => {
286 let conn_drop_ref = self.conn_drop_ref.clone();
287 // keep the ping recorder's knowledge of an
288 // "open stream" alive while this body is
289 // still sending...
290 let ping = ping.clone();
291 let pipe = pipe.map(move |x| {
292 drop(conn_drop_ref);
293 drop(ping);
294 x
295 });
296 self.executor.execute(pipe);
297 }
298 }
299 }
300
301 None
302 } else {
303 Some(body_tx)
304 };
305
306 let fut = fut.map(move |result| match result {
307 Ok(res) => {
308 // record that we got the response headers
309 ping.record_non_data();
310
311 let content_length = headers::content_length_parse_all(res.headers());
312 if let (Some(mut send_stream), StatusCode::OK) =
313 (send_stream, res.status())
314 {
315 if content_length.map_or(false, |len| len != 0) {
316 warn!("h2 connect response with non-zero body not supported");
317
318 send_stream.send_reset(h2::Reason::INTERNAL_ERROR);
319 return Err((
320 crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
321 None,
322 ));
323 }
324 let (parts, recv_stream) = res.into_parts();
325 let mut res = Response::from_parts(parts, Body::empty());
326
327 let (pending, on_upgrade) = crate::upgrade::pending();
328 let io = H2Upgraded {
329 ping,
330 send_stream: unsafe { UpgradedSendStream::new(send_stream) },
331 recv_stream,
332 buf: Bytes::new(),
333 };
334 let upgraded = Upgraded::new(io, Bytes::new());
335
336 pending.fulfill(upgraded);
337 res.extensions_mut().insert(on_upgrade);
338
339 Ok(res)
340 } else {
341 let res = res.map(|stream| {
342 let ping = ping.for_stream(&stream);
343 crate::Body::h2(stream, content_length.into(), ping)
344 });
345 Ok(res)
346 }
347 }
348 Err(err) => {
349 ping.ensure_not_timed_out().map_err(|e| (e, None))?;
350
351 debug!("client response error: {}", err);
352 Err((crate::Error::new_h2(err), None))
353 }
354 });
355 self.executor.execute(cb.send_when(fut));
356 continue;
357 }
358
359 Poll::Ready(None) => {
360 trace!("client::dispatch::Sender dropped");
361 return Poll::Ready(Ok(Dispatched::Shutdown));
362 }
363
364 Poll::Pending => match ready!(Pin::new(&mut self.conn_eof).poll(cx)) {
365 Ok(never) => match never {},
366 Err(_conn_is_eof) => {
367 trace!("connection task is closed, closing dispatch task");
368 return Poll::Ready(Ok(Dispatched::Shutdown));
369 }
370 },
371 }
372 }
373 }
374 }
375