1 //! Client implementation of the HTTP/2.0 protocol.
2 //!
3 //! # Getting started
4 //!
5 //! Running an HTTP/2.0 client requires the caller to establish the underlying
6 //! connection as well as get the connection to a state that is ready to begin
7 //! the HTTP/2.0 handshake. See [here](../index.html#handshake) for more
8 //! details.
9 //!
10 //! This could be as basic as using Tokio's [`TcpStream`] to connect to a remote
11 //! host, but usually it means using either ALPN or HTTP/1.1 protocol upgrades.
12 //!
13 //! Once a connection is obtained, it is passed to [`handshake`], which will
14 //! begin the [HTTP/2.0 handshake]. This returns a future that completes once
15 //! the handshake process is performed and HTTP/2.0 streams may be initialized.
16 //!
17 //! [`handshake`] uses default configuration values. There are a number of
18 //! settings that can be changed by using [`Builder`] instead.
19 //!
20 //! Once the handshake future completes, the caller is provided with a
21 //! [`Connection`] instance and a [`SendRequest`] instance. The [`Connection`]
22 //! instance is used to drive the connection (see [Managing the connection]).
23 //! The [`SendRequest`] instance is used to initialize new streams (see [Making
24 //! requests]).
25 //!
26 //! # Making requests
27 //!
28 //! Requests are made using the [`SendRequest`] handle provided by the handshake
29 //! future. Once a request is submitted, an HTTP/2.0 stream is initialized and
30 //! the request is sent to the server.
31 //!
//! A request body and request trailers are sent using [`SendStream`] and the
33 //! server's response is returned once the [`ResponseFuture`] future completes.
34 //! Both the [`SendStream`] and [`ResponseFuture`] instances are returned by
35 //! [`SendRequest::send_request`] and are tied to the HTTP/2.0 stream
36 //! initialized by the sent request.
37 //!
38 //! The [`SendRequest::poll_ready`] function returns `Ready` when a new HTTP/2.0
39 //! stream can be created, i.e. as long as the current number of active streams
40 //! is below [`MAX_CONCURRENT_STREAMS`]. If a new stream cannot be created, the
41 //! caller will be notified once an existing stream closes, freeing capacity for
42 //! the caller. The caller should use [`SendRequest::poll_ready`] to check for
43 //! capacity before sending a request to the server.
44 //!
45 //! [`SendRequest`] enforces the [`MAX_CONCURRENT_STREAMS`] setting. The user
46 //! must not send a request if `poll_ready` does not return `Ready`. Attempting
47 //! to do so will result in an [`Error`] being returned.
48 //!
49 //! # Managing the connection
50 //!
51 //! The [`Connection`] instance is used to manage connection state. The caller
52 //! is required to call [`Connection::poll`] in order to advance state.
53 //! [`SendRequest::send_request`] and other functions have no effect unless
54 //! [`Connection::poll`] is called.
55 //!
56 //! The [`Connection`] instance should only be dropped once [`Connection::poll`]
57 //! returns `Ready`. At this point, the underlying socket has been closed and no
58 //! further work needs to be done.
59 //!
60 //! The easiest way to ensure that the [`Connection`] instance gets polled is to
61 //! submit the [`Connection`] instance to an [executor]. The executor will then
62 //! manage polling the connection until the connection is complete.
63 //! Alternatively, the caller can call `poll` manually.
64 //!
65 //! # Example
66 //!
67 //! ```rust
68 //! extern crate futures;
69 //! extern crate h2;
70 //! extern crate http;
71 //! extern crate tokio_core;
72 //!
73 //! use h2::client;
74 //!
75 //! use futures::*;
76 //! # use futures::future::ok;
77 //! use http::*;
78 //!
79 //! use tokio_core::net::TcpStream;
80 //! use tokio_core::reactor;
81 //!
82 //! pub fn main() {
83 //! let mut core = reactor::Core::new().unwrap();
84 //! let handle = core.handle();
85 //!
86 //! let addr = "127.0.0.1:5928".parse().unwrap();
87 //!
88 //! core.run({
89 //! # let _ =
90 //! // Establish TCP connection to the server.
91 //! TcpStream::connect(&addr, &handle)
92 //! .map_err(|_| {
93 //! panic!("failed to establish TCP connection")
94 //! })
95 //! .and_then(|tcp| client::handshake(tcp))
96 //! .and_then(|(h2, connection)| {
97 //! let connection = connection
98 //! .map_err(|_| panic!("HTTP/2.0 connection failed"));
99 //!
100 //! // Spawn a new task to drive the connection state
101 //! handle.spawn(connection);
102 //!
103 //! // Wait until the `SendRequest` handle has available
104 //! // capacity.
105 //! h2.ready()
106 //! })
107 //! .and_then(|mut h2| {
108 //! // Prepare the HTTP request to send to the server.
109 //! let request = Request::builder()
110 //! .method(Method::GET)
111 //! .uri("https://www.example.com/")
112 //! .body(())
113 //! .unwrap();
114 //!
115 //! // Send the request. The second tuple item allows the caller
116 //! // to stream a request body.
117 //! let (response, _) = h2.send_request(request, true).unwrap();
118 //!
119 //! response.and_then(|response| {
120 //! let (head, mut body) = response.into_parts();
121 //!
122 //! println!("Received response: {:?}", head);
123 //!
124 //! // The `release_capacity` handle allows the caller to manage
125 //! // flow control.
126 //! //
127 //! // Whenever data is received, the caller is responsible for
128 //! // releasing capacity back to the server once it has freed
129 //! // the data from memory.
130 //! let mut release_capacity = body.release_capacity().clone();
131 //!
132 //! body.for_each(move |chunk| {
133 //! println!("RX: {:?}", chunk);
134 //!
135 //! // Let the server send more data.
136 //! let _ = release_capacity.release_capacity(chunk.len());
137 //!
138 //! Ok(())
139 //! })
140 //! })
141 //! })
142 //! # ;
143 //! # ok::<_, ()>(())
144 //! }).ok().expect("failed to perform HTTP/2.0 request");
145 //! }
146 //! ```
147 //!
148 //! [`TcpStream`]: https://docs.rs/tokio-core/0.1/tokio_core/net/struct.TcpStream.html
149 //! [`handshake`]: fn.handshake.html
150 //! [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
151 //! [`SendRequest`]: struct.SendRequest.html
152 //! [`SendStream`]: ../struct.SendStream.html
153 //! [Making requests]: #making-requests
154 //! [Managing the connection]: #managing-the-connection
155 //! [`Connection`]: struct.Connection.html
156 //! [`Connection::poll`]: struct.Connection.html#method.poll
157 //! [`SendRequest::send_request`]: struct.SendRequest.html#method.send_request
158 //! [`MAX_CONCURRENT_STREAMS`]: http://httpwg.org/specs/rfc7540.html#SettingValues
159 //! [`SendRequest`]: struct.SendRequest.html
160 //! [`ResponseFuture`]: struct.ResponseFuture.html
161 //! [`SendRequest::poll_ready`]: struct.SendRequest.html#method.poll_ready
162 //! [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
163 //! [`Builder`]: struct.Builder.html
164 //! [`Error`]: ../struct.Error.html
165
166 use {SendStream, RecvStream, ReleaseCapacity};
167 use codec::{Codec, RecvError, SendError, UserError};
168 use frame::{Headers, Pseudo, Reason, Settings, StreamId};
169 use proto;
170
171 use bytes::{Bytes, IntoBuf};
172 use futures::{Async, Future, Poll};
173 use http::{uri, Request, Response, Method, Version};
174 use tokio_io::{AsyncRead, AsyncWrite};
175 use tokio_io::io::WriteAll;
176
177 use std::fmt;
178 use std::marker::PhantomData;
179 use std::time::Duration;
180 use std::usize;
181
182 /// Performs the HTTP/2.0 connection handshake.
183 ///
184 /// This type implements `Future`, yielding a `(SendRequest, Connection)`
185 /// instance once the handshake has completed.
186 ///
/// The handshake is completed once both the connection preface and the initial
/// settings frame are sent by the client.
189 ///
190 /// The handshake future does not wait for the initial settings frame from the
191 /// server.
192 ///
193 /// See [module] level documentation for more details.
194 ///
195 /// [module]: index.html
196 #[must_use = "futures do nothing unless polled"]
197 pub struct Handshake<T, B: IntoBuf = Bytes> {
198 builder: Builder,
199 inner: WriteAll<T, &'static [u8]>,
200 _marker: PhantomData<B>,
201 }
202
203 /// Initializes new HTTP/2.0 streams on a connection by sending a request.
204 ///
205 /// This type does no work itself. Instead, it is a handle to the inner
206 /// connection state held by [`Connection`]. If the associated connection
207 /// instance is dropped, all `SendRequest` functions will return [`Error`].
208 ///
209 /// [`SendRequest`] instances are able to move to and operate on separate tasks
210 /// / threads than their associated [`Connection`] instance. Internally, there
211 /// is a buffer used to stage requests before they get written to the
212 /// connection. There is no guarantee that requests get written to the
213 /// connection in FIFO order as HTTP/2.0 prioritization logic can play a role.
214 ///
215 /// [`SendRequest`] implements [`Clone`], enabling the creation of many
216 /// instances that are backed by a single connection.
217 ///
218 /// See [module] level documentation for more details.
219 ///
220 /// [module]: index.html
221 /// [`Connection`]: struct.Connection.html
222 /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html
223 /// [`Error`]: ../struct.Error.html
224 pub struct SendRequest<B: IntoBuf> {
225 inner: proto::Streams<B::Buf, Peer>,
226 pending: Option<proto::OpaqueStreamRef>,
227 }
228
229 /// Returns a `SendRequest` instance once it is ready to send at least one
230 /// request.
231 #[derive(Debug)]
232 pub struct ReadySendRequest<B: IntoBuf> {
233 inner: Option<SendRequest<B>>,
234 }
235
236 /// Manages all state associated with an HTTP/2.0 client connection.
237 ///
238 /// A `Connection` is backed by an I/O resource (usually a TCP socket) and
239 /// implements the HTTP/2.0 client logic for that connection. It is responsible
240 /// for driving the internal state forward, performing the work requested of the
241 /// associated handles ([`SendRequest`], [`ResponseFuture`], [`SendStream`],
242 /// [`RecvStream`]).
243 ///
244 /// `Connection` values are created by calling [`handshake`]. Once a
245 /// `Connection` value is obtained, the caller must repeatedly call [`poll`]
246 /// until `Ready` is returned. The easiest way to do this is to submit the
247 /// `Connection` instance to an [executor].
248 ///
249 /// [module]: index.html
250 /// [`handshake`]: fn.handshake.html
251 /// [`SendRequest`]: struct.SendRequest.html
252 /// [`ResponseFuture`]: struct.ResponseFuture.html
253 /// [`SendStream`]: ../struct.SendStream.html
254 /// [`RecvStream`]: ../struct.RecvStream.html
255 /// [`poll`]: #method.poll
256 /// [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
257 ///
258 /// # Examples
259 ///
260 /// ```
261 /// # extern crate bytes;
262 /// # extern crate futures;
263 /// # extern crate h2;
264 /// # extern crate tokio_io;
265 /// # use futures::{Future, Stream};
266 /// # use futures::future::Executor;
267 /// # use tokio_io::*;
268 /// # use h2::client;
269 /// # use h2::client::*;
270 /// #
271 /// # fn doc<T, E>(my_io: T, my_executor: E)
272 /// # where T: AsyncRead + AsyncWrite + 'static,
273 /// # E: Executor<Box<Future<Item = (), Error = ()>>>,
274 /// # {
275 /// client::handshake(my_io)
276 /// .and_then(|(send_request, connection)| {
277 /// // Submit the connection handle to an executor.
278 /// my_executor.execute(
279 /// # Box::new(
280 /// connection.map_err(|_| panic!("connection failed"))
281 /// # )
282 /// ).unwrap();
283 ///
284 /// // Now, use `send_request` to initialize HTTP/2.0 streams.
285 /// // ...
286 /// # drop(send_request);
287 /// # Ok(())
288 /// })
289 /// # .wait().unwrap();
290 /// # }
291 /// #
292 /// # pub fn main() {}
293 /// ```
294 #[must_use = "futures do nothing unless polled"]
295 pub struct Connection<T, B: IntoBuf = Bytes> {
296 inner: proto::Connection<T, Peer, B>,
297 }
298
299 /// A future of an HTTP response.
300 #[derive(Debug)]
301 #[must_use = "futures do nothing unless polled"]
302 pub struct ResponseFuture {
303 inner: proto::OpaqueStreamRef,
304 }
305
306 /// Builds client connections with custom configuration values.
307 ///
308 /// Methods can be chained in order to set the configuration values.
309 ///
/// The client is constructed by calling [`handshake`] and passing the I/O
/// handle that will back the HTTP/2.0 connection.
312 ///
313 /// New instances of `Builder` are obtained via [`Builder::new`].
314 ///
315 /// See function level documentation for details on the various client
316 /// configuration settings.
317 ///
318 /// [`Builder::new`]: struct.Builder.html#method.new
319 /// [`handshake`]: struct.Builder.html#method.handshake
320 ///
321 /// # Examples
322 ///
323 /// ```
324 /// # extern crate h2;
325 /// # extern crate tokio_io;
326 /// # use tokio_io::*;
327 /// # use h2::client::*;
328 /// #
329 /// # fn doc<T: AsyncRead + AsyncWrite>(my_io: T)
330 /// # -> Handshake<T>
331 /// # {
332 /// // `client_fut` is a future representing the completion of the HTTP/2.0
333 /// // handshake.
334 /// let client_fut = Builder::new()
335 /// .initial_window_size(1_000_000)
336 /// .max_concurrent_streams(1000)
337 /// .handshake(my_io);
338 /// # client_fut
339 /// # }
340 /// #
341 /// # pub fn main() {}
342 /// ```
343 #[derive(Clone, Debug)]
344 pub struct Builder {
345 /// Time to keep locally reset streams around before reaping.
346 reset_stream_duration: Duration,
347
348 /// Initial maximum number of locally initiated (send) streams.
349 /// After receiving a Settings frame from the remote peer,
350 /// the connection will overwrite this value with the
351 /// MAX_CONCURRENT_STREAMS specified in the frame.
352 initial_max_send_streams: usize,
353
354 /// Initial target window size for new connections.
355 initial_target_connection_window_size: Option<u32>,
356
357 /// Maximum number of locally reset streams to keep at a time.
358 reset_stream_max: usize,
359
360 /// Initial `Settings` frame to send as part of the handshake.
361 settings: Settings,
362
363 /// The stream ID of the first (lowest) stream. Subsequent streams will use
364 /// monotonically increasing stream IDs.
365 stream_id: StreamId,
366 }
367
/// Unit type used as the peer type parameter of `proto::Streams` and
/// `proto::Connection` throughout this module.
#[derive(Debug)]
pub(crate) struct Peer;
370
371 // ===== impl SendRequest =====
372
373 impl<B> SendRequest<B>
374 where
375 B: IntoBuf,
376 B::Buf: 'static,
377 {
378 /// Returns `Ready` when the connection can initialize a new HTTP/2.0
379 /// stream.
380 ///
381 /// This function must return `Ready` before `send_request` is called. When
382 /// `NotReady` is returned, the task will be notified once the readiness
383 /// state changes.
384 ///
385 /// See [module] level docs for more details.
386 ///
387 /// [module]: index.html
poll_ready(&mut self) -> Poll<(), ::Error>388 pub fn poll_ready(&mut self) -> Poll<(), ::Error> {
389 try_ready!(self.inner.poll_pending_open(self.pending.as_ref()));
390 self.pending = None;
391 Ok(().into())
392 }
393
394 /// Consumes `self`, returning a future that returns `self` back once it is
395 /// ready to send a request.
396 ///
397 /// This function should be called before calling `send_request`.
398 ///
/// This is a functional combinator for
/// [`poll_ready`](#method.poll_ready). The returned future will call
/// `SendRequest::poll_ready` until `Ready`, then returns `self` to the caller.
402 ///
403 /// # Examples
404 ///
405 /// ```rust
406 /// # extern crate futures;
407 /// # extern crate h2;
408 /// # extern crate http;
409 /// # use futures::*;
410 /// # use h2::client::*;
411 /// # use http::*;
412 /// # fn doc(send_request: SendRequest<&'static [u8]>)
413 /// # {
414 /// // First, wait until the `send_request` handle is ready to send a new
415 /// // request
416 /// send_request.ready()
417 /// .and_then(|mut send_request| {
418 /// // Use `send_request` here.
419 /// # Ok(())
420 /// })
421 /// # .wait().unwrap();
422 /// # }
423 /// # pub fn main() {}
424 /// ```
425 ///
426 /// See [module] level docs for more details.
427 ///
428 /// [module]: index.html
ready(self) -> ReadySendRequest<B>429 pub fn ready(self) -> ReadySendRequest<B> {
430 ReadySendRequest { inner: Some(self) }
431 }
432
433 /// Sends a HTTP/2.0 request to the server.
434 ///
435 /// `send_request` initializes a new HTTP/2.0 stream on the associated
436 /// connection, then sends the given request using this new stream. Only the
437 /// request head is sent.
438 ///
439 /// On success, a [`ResponseFuture`] instance and [`SendStream`] instance
440 /// are returned. The [`ResponseFuture`] instance is used to get the
441 /// server's response and the [`SendStream`] instance is used to send a
442 /// request body or trailers to the server over the same HTTP/2.0 stream.
443 ///
444 /// To send a request body or trailers, set `end_of_stream` to `false`.
445 /// Then, use the returned [`SendStream`] instance to stream request body
446 /// chunks or send trailers. If `end_of_stream` is **not** set to `false`
447 /// then attempting to call [`SendStream::send_data`] or
448 /// [`SendStream::send_trailers`] will result in an error.
449 ///
450 /// If no request body or trailers are to be sent, set `end_of_stream` to
451 /// `true` and drop the returned [`SendStream`] instance.
452 ///
453 /// # A note on HTTP versions
454 ///
/// The provided `Request` will be encoded differently depending on the
/// value of its version field. If the version is set to 2.0, then the
/// request is encoded as the specification recommends.
458 ///
459 /// If the version is set to a lower value, then the request is encoded to
460 /// preserve the characteristics of HTTP 1.1 and lower. Specifically, host
461 /// headers are permitted and the `:authority` pseudo header is not
462 /// included.
463 ///
464 /// The caller should always set the request's version field to 2.0 unless
465 /// specifically transmitting an HTTP 1.1 request over 2.0.
466 ///
467 /// # Examples
468 ///
469 /// Sending a request with no body
470 ///
471 /// ```rust
472 /// # extern crate futures;
473 /// # extern crate h2;
474 /// # extern crate http;
475 /// # use futures::*;
476 /// # use h2::client::*;
477 /// # use http::*;
478 /// # fn doc(send_request: SendRequest<&'static [u8]>)
479 /// # {
480 /// // First, wait until the `send_request` handle is ready to send a new
481 /// // request
482 /// send_request.ready()
483 /// .and_then(|mut send_request| {
484 /// // Prepare the HTTP request to send to the server.
485 /// let request = Request::get("https://www.example.com/")
486 /// .body(())
487 /// .unwrap();
488 ///
489 /// // Send the request to the server. Since we are not sending a
490 /// // body or trailers, we can drop the `SendStream` instance.
491 /// let (response, _) = send_request
492 /// .send_request(request, true).unwrap();
493 ///
494 /// response
495 /// })
496 /// .and_then(|response| {
497 /// // Process the response
498 /// # Ok(())
499 /// })
500 /// # .wait().unwrap();
501 /// # }
502 /// # pub fn main() {}
503 /// ```
504 ///
505 /// Sending a request with a body and trailers
506 ///
507 /// ```rust
508 /// # extern crate futures;
509 /// # extern crate h2;
510 /// # extern crate http;
511 /// # use futures::*;
512 /// # use h2::client::*;
513 /// # use http::*;
514 /// # fn doc(send_request: SendRequest<&'static [u8]>)
515 /// # {
516 /// // First, wait until the `send_request` handle is ready to send a new
517 /// // request
518 /// send_request.ready()
519 /// .and_then(|mut send_request| {
520 /// // Prepare the HTTP request to send to the server.
521 /// let request = Request::get("https://www.example.com/")
522 /// .body(())
523 /// .unwrap();
524 ///
/// // Send the request to the server. Since we are sending a body and
/// // trailers, we keep the returned `SendStream` instance.
527 /// let (response, mut send_stream) = send_request
528 /// .send_request(request, false).unwrap();
529 ///
530 /// // At this point, one option would be to wait for send capacity.
531 /// // Doing so would allow us to not hold data in memory that
532 /// // cannot be sent. However, this is not a requirement, so this
533 /// // example will skip that step. See `SendStream` documentation
534 /// // for more details.
535 /// send_stream.send_data(b"hello", false).unwrap();
536 /// send_stream.send_data(b"world", false).unwrap();
537 ///
538 /// // Send the trailers.
539 /// let mut trailers = HeaderMap::new();
540 /// trailers.insert(
541 /// header::HeaderName::from_bytes(b"my-trailer").unwrap(),
542 /// header::HeaderValue::from_bytes(b"hello").unwrap());
543 ///
544 /// send_stream.send_trailers(trailers).unwrap();
545 ///
546 /// response
547 /// })
548 /// .and_then(|response| {
549 /// // Process the response
550 /// # Ok(())
551 /// })
552 /// # .wait().unwrap();
553 /// # }
554 /// # pub fn main() {}
555 /// ```
556 ///
557 /// [`ResponseFuture`]: struct.ResponseFuture.html
558 /// [`SendStream`]: ../struct.SendStream.html
559 /// [`SendStream::send_data`]: ../struct.SendStream.html#method.send_data
560 /// [`SendStream::send_trailers`]: ../struct.SendStream.html#method.send_trailers
send_request( &mut self, request: Request<()>, end_of_stream: bool, ) -> Result<(ResponseFuture, SendStream<B>), ::Error>561 pub fn send_request(
562 &mut self,
563 request: Request<()>,
564 end_of_stream: bool,
565 ) -> Result<(ResponseFuture, SendStream<B>), ::Error> {
566 self.inner
567 .send_request(request, end_of_stream, self.pending.as_ref())
568 .map_err(Into::into)
569 .map(|stream| {
570 if stream.is_pending_open() {
571 self.pending = Some(stream.clone_to_opaque());
572 }
573
574 let response = ResponseFuture {
575 inner: stream.clone_to_opaque(),
576 };
577
578 let stream = SendStream::new(stream);
579
580 (response, stream)
581 })
582 }
583 }
584
585 impl<B> fmt::Debug for SendRequest<B>
586 where
587 B: IntoBuf,
588 {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result589 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
590 fmt.debug_struct("SendRequest").finish()
591 }
592 }
593
594 impl<B> Clone for SendRequest<B>
595 where
596 B: IntoBuf,
597 {
clone(&self) -> Self598 fn clone(&self) -> Self {
599 SendRequest {
600 inner: self.inner.clone(),
601 pending: None,
602 }
603 }
604 }
605
#[cfg(feature = "unstable")]
impl<B> SendRequest<B>
where
    B: IntoBuf,
{
    /// Returns the number of active streams.
    ///
    /// An active stream is a stream that has not yet transitioned to a closed
    /// state.
    pub fn num_active_streams(&self) -> usize {
        // Delegates to the stream state behind this handle.
        self.inner.num_active_streams()
    }

    /// Returns the number of streams that are held in memory.
    ///
    /// A wired stream is a stream that is either active or is closed but must
    /// stay in memory for some reason. For example, there are still outstanding
    /// userspace handles pointing to the slot.
    pub fn num_wired_streams(&self) -> usize {
        // Delegates to the stream state behind this handle.
        self.inner.num_wired_streams()
    }
}
628
629 // ===== impl ReadySendRequest =====
630
631 impl<B> Future for ReadySendRequest<B>
632 where B: IntoBuf,
633 B::Buf: 'static,
634 {
635 type Item = SendRequest<B>;
636 type Error = ::Error;
637
poll(&mut self) -> Poll<Self::Item, Self::Error>638 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
639 match self.inner {
640 Some(ref mut send_request) => {
641 let _ = try_ready!(send_request.poll_ready());
642 }
643 None => panic!("called `poll` after future completed"),
644 }
645
646 Ok(self.inner.take().unwrap().into())
647 }
648 }
649
650 // ===== impl Builder =====
651
652 impl Builder {
653 /// Returns a new client builder instance initialized with default
654 /// configuration values.
655 ///
656 /// Configuration methods can be chained on the return value.
657 ///
658 /// # Examples
659 ///
660 /// ```
661 /// # extern crate h2;
662 /// # extern crate tokio_io;
663 /// # use tokio_io::*;
664 /// # use h2::client::*;
665 /// #
666 /// # fn doc<T: AsyncRead + AsyncWrite>(my_io: T)
667 /// # -> Handshake<T>
668 /// # {
669 /// // `client_fut` is a future representing the completion of the HTTP/2.0
670 /// // handshake.
671 /// let client_fut = Builder::new()
672 /// .initial_window_size(1_000_000)
673 /// .max_concurrent_streams(1000)
674 /// .handshake(my_io);
675 /// # client_fut
676 /// # }
677 /// #
678 /// # pub fn main() {}
679 /// ```
new() -> Builder680 pub fn new() -> Builder {
681 Builder {
682 reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
683 reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
684 initial_target_connection_window_size: None,
685 initial_max_send_streams: usize::MAX,
686 settings: Default::default(),
687 stream_id: 1.into(),
688 }
689 }
690
691 /// Indicates the initial window size (in octets) for stream-level
692 /// flow control for received data.
693 ///
694 /// The initial window of a stream is used as part of flow control. For more
695 /// details, see [`ReleaseCapacity`].
696 ///
697 /// The default value is 65,535.
698 ///
699 /// [`ReleaseCapacity`]: ../struct.ReleaseCapacity.html
700 ///
701 /// # Examples
702 ///
703 /// ```
704 /// # extern crate h2;
705 /// # extern crate tokio_io;
706 /// # use tokio_io::*;
707 /// # use h2::client::*;
708 /// #
709 /// # fn doc<T: AsyncRead + AsyncWrite>(my_io: T)
710 /// # -> Handshake<T>
711 /// # {
712 /// // `client_fut` is a future representing the completion of the HTTP/2.0
713 /// // handshake.
714 /// let client_fut = Builder::new()
715 /// .initial_window_size(1_000_000)
716 /// .handshake(my_io);
717 /// # client_fut
718 /// # }
719 /// #
720 /// # pub fn main() {}
721 /// ```
initial_window_size(&mut self, size: u32) -> &mut Self722 pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
723 self.settings.set_initial_window_size(Some(size));
724 self
725 }
726
727 /// Indicates the initial window size (in octets) for connection-level flow control
728 /// for received data.
729 ///
730 /// The initial window of a connection is used as part of flow control. For more details,
731 /// see [`ReleaseCapacity`].
732 ///
733 /// The default value is 65,535.
734 ///
735 /// [`ReleaseCapacity`]: ../struct.ReleaseCapacity.html
736 ///
737 /// # Examples
738 ///
739 /// ```
740 /// # extern crate h2;
741 /// # extern crate tokio_io;
742 /// # use tokio_io::*;
743 /// # use h2::client::*;
744 /// #
745 /// # fn doc<T: AsyncRead + AsyncWrite>(my_io: T)
746 /// # -> Handshake<T>
747 /// # {
748 /// // `client_fut` is a future representing the completion of the HTTP/2.0
749 /// // handshake.
750 /// let client_fut = Builder::new()
751 /// .initial_connection_window_size(1_000_000)
752 /// .handshake(my_io);
753 /// # client_fut
754 /// # }
755 /// #
756 /// # pub fn main() {}
757 /// ```
initial_connection_window_size(&mut self, size: u32) -> &mut Self758 pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
759 self.initial_target_connection_window_size = Some(size);
760 self
761 }
762
763 /// Indicates the size (in octets) of the largest HTTP/2.0 frame payload that the
764 /// configured client is able to accept.
765 ///
766 /// The sender may send data frames that are **smaller** than this value,
767 /// but any data larger than `max` will be broken up into multiple `DATA`
768 /// frames.
769 ///
770 /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
771 ///
772 /// # Examples
773 ///
774 /// ```
775 /// # extern crate h2;
776 /// # extern crate tokio_io;
777 /// # use tokio_io::*;
778 /// # use h2::client::*;
779 /// #
780 /// # fn doc<T: AsyncRead + AsyncWrite>(my_io: T)
781 /// # -> Handshake<T>
782 /// # {
783 /// // `client_fut` is a future representing the completion of the HTTP/2.0
784 /// // handshake.
785 /// let client_fut = Builder::new()
786 /// .max_frame_size(1_000_000)
787 /// .handshake(my_io);
788 /// # client_fut
789 /// # }
790 /// #
791 /// # pub fn main() {}
792 /// ```
793 ///
794 /// # Panics
795 ///
796 /// This function panics if `max` is not within the legal range specified
797 /// above.
max_frame_size(&mut self, max: u32) -> &mut Self798 pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
799 self.settings.set_max_frame_size(Some(max));
800 self
801 }
802
803 /// Sets the max size of received header frames.
804 ///
805 /// This advisory setting informs a peer of the maximum size of header list
806 /// that the sender is prepared to accept, in octets. The value is based on
807 /// the uncompressed size of header fields, including the length of the name
808 /// and value in octets plus an overhead of 32 octets for each header field.
809 ///
810 /// This setting is also used to limit the maximum amount of data that is
811 /// buffered to decode HEADERS frames.
812 ///
813 /// # Examples
814 ///
815 /// ```
816 /// # extern crate h2;
817 /// # extern crate tokio_io;
818 /// # use tokio_io::*;
819 /// # use h2::client::*;
820 /// #
821 /// # fn doc<T: AsyncRead + AsyncWrite>(my_io: T)
822 /// # -> Handshake<T>
823 /// # {
824 /// // `client_fut` is a future representing the completion of the HTTP/2.0
825 /// // handshake.
826 /// let client_fut = Builder::new()
827 /// .max_header_list_size(16 * 1024)
828 /// .handshake(my_io);
829 /// # client_fut
830 /// # }
831 /// #
832 /// # pub fn main() {}
833 /// ```
max_header_list_size(&mut self, max: u32) -> &mut Self834 pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
835 self.settings.set_max_header_list_size(Some(max));
836 self
837 }
838
839 /// Sets the maximum number of concurrent streams.
840 ///
841 /// The maximum concurrent streams setting only controls the maximum number
842 /// of streams that can be initiated by the remote peer. In other words,
843 /// when this setting is set to 100, this does not limit the number of
844 /// concurrent streams that can be created by the caller.
845 ///
846 /// It is recommended that this value be no smaller than 100, so as to not
847 /// unnecessarily limit parallelism. However, any value is legal, including
848 /// 0. If `max` is set to 0, then the remote will not be permitted to
849 /// initiate streams.
850 ///
851 /// Note that streams in the reserved state, i.e., push promises that have
852 /// been reserved but the stream has not started, do not count against this
853 /// setting.
854 ///
855 /// Also note that if the remote *does* exceed the value set here, it is not
856 /// a protocol level error. Instead, the `h2` library will immediately reset
857 /// the stream.
858 ///
859 /// See [Section 5.1.2] in the HTTP/2.0 spec for more details.
860 ///
861 /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
862 ///
863 /// # Examples
864 ///
865 /// ```
866 /// # extern crate h2;
867 /// # extern crate tokio_io;
868 /// # use tokio_io::*;
869 /// # use h2::client::*;
870 /// #
871 /// # fn doc<T: AsyncRead + AsyncWrite>(my_io: T)
872 /// # -> Handshake<T>
873 /// # {
874 /// // `client_fut` is a future representing the completion of the HTTP/2.0
875 /// // handshake.
876 /// let client_fut = Builder::new()
877 /// .max_concurrent_streams(1000)
878 /// .handshake(my_io);
879 /// # client_fut
880 /// # }
881 /// #
882 /// # pub fn main() {}
883 /// ```
max_concurrent_streams(&mut self, max: u32) -> &mut Self884 pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
885 self.settings.set_max_concurrent_streams(Some(max));
886 self
887 }
888
889 /// Sets the initial maximum of locally initiated (send) streams.
890 ///
891 /// The initial settings will be overwritten by the remote peer when
892 /// the Settings frame is received. The new value will be set to the
893 /// `max_concurrent_streams()` from the frame.
894 ///
895 /// This setting prevents the caller from exceeding this number of
896 /// streams that are counted towards the concurrency limit.
897 ///
898 /// Sending streams past the limit returned by the peer will be treated
899 /// as a stream error of type PROTOCOL_ERROR or REFUSED_STREAM.
900 ///
901 /// See [Section 5.1.2] in the HTTP/2.0 spec for more details.
902 ///
903 /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
904 ///
905 /// # Examples
906 ///
907 /// ```
908 /// # extern crate h2;
909 /// # extern crate tokio_io;
910 /// # use tokio_io::*;
911 /// # use h2::client::*;
912 /// #
913 /// # fn doc<T: AsyncRead + AsyncWrite>(my_io: T)
914 /// # -> Handshake<T>
915 /// # {
916 /// // `client_fut` is a future representing the completion of the HTTP/2.0
917 /// // handshake.
918 /// let client_fut = Builder::new()
919 /// .initial_max_send_streams(1000)
920 /// .handshake(my_io);
921 /// # client_fut
922 /// # }
923 /// #
924 /// # pub fn main() {}
925 /// ```
initial_max_send_streams(&mut self, initial: usize) -> &mut Self926 pub fn initial_max_send_streams(&mut self, initial: usize) -> &mut Self {
927 self.initial_max_send_streams = initial;
928 self
929 }
930
931 /// Sets the maximum number of concurrent locally reset streams.
932 ///
933 /// When a stream is explicitly reset by either calling
934 /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
935 /// before completing the stream, the HTTP/2.0 specification requires that
936 /// any further frames received for that stream must be ignored for "some
937 /// time".
938 ///
939 /// In order to satisfy the specification, internal state must be maintained
940 /// to implement the behavior. This state grows linearly with the number of
941 /// streams that are locally reset.
942 ///
943 /// The `max_concurrent_reset_streams` setting configures sets an upper
944 /// bound on the amount of state that is maintained. When this max value is
945 /// reached, the oldest reset stream is purged from memory.
946 ///
947 /// Once the stream has been fully purged from memory, any additional frames
948 /// received for that stream will result in a connection level protocol
949 /// error, forcing the connection to terminate.
950 ///
951 /// The default value is 10.
952 ///
953 /// # Examples
954 ///
955 /// ```
956 /// # extern crate h2;
957 /// # extern crate tokio_io;
958 /// # use tokio_io::*;
959 /// # use h2::client::*;
960 /// #
961 /// # fn doc<T: AsyncRead + AsyncWrite>(my_io: T)
962 /// # -> Handshake<T>
963 /// # {
964 /// // `client_fut` is a future representing the completion of the HTTP/2.0
965 /// // handshake.
966 /// let client_fut = Builder::new()
967 /// .max_concurrent_reset_streams(1000)
968 /// .handshake(my_io);
969 /// # client_fut
970 /// # }
971 /// #
972 /// # pub fn main() {}
973 /// ```
max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self974 pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
975 self.reset_stream_max = max;
976 self
977 }
978
979 /// Sets the maximum number of concurrent locally reset streams.
980 ///
981 /// When a stream is explicitly reset by either calling
982 /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
983 /// before completing the stream, the HTTP/2.0 specification requires that
984 /// any further frames received for that stream must be ignored for "some
985 /// time".
986 ///
987 /// In order to satisfy the specification, internal state must be maintained
988 /// to implement the behavior. This state grows linearly with the number of
989 /// streams that are locally reset.
990 ///
991 /// The `reset_stream_duration` setting configures the max amount of time
992 /// this state will be maintained in memory. Once the duration elapses, the
993 /// stream state is purged from memory.
994 ///
995 /// Once the stream has been fully purged from memory, any additional frames
996 /// received for that stream will result in a connection level protocol
997 /// error, forcing the connection to terminate.
998 ///
999 /// The default value is 30 seconds.
1000 ///
1001 /// # Examples
1002 ///
1003 /// ```
1004 /// # extern crate h2;
1005 /// # extern crate tokio_io;
1006 /// # use tokio_io::*;
1007 /// # use h2::client::*;
1008 /// # use std::time::Duration;
1009 /// #
1010 /// # fn doc<T: AsyncRead + AsyncWrite>(my_io: T)
1011 /// # -> Handshake<T>
1012 /// # {
1013 /// // `client_fut` is a future representing the completion of the HTTP/2.0
1014 /// // handshake.
1015 /// let client_fut = Builder::new()
1016 /// .reset_stream_duration(Duration::from_secs(10))
1017 /// .handshake(my_io);
1018 /// # client_fut
1019 /// # }
1020 /// #
1021 /// # pub fn main() {}
1022 /// ```
reset_stream_duration(&mut self, dur: Duration) -> &mut Self1023 pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
1024 self.reset_stream_duration = dur;
1025 self
1026 }
1027
1028 /// Enables or disables server push promises.
1029 ///
1030 /// This value is included in the initial SETTINGS handshake. When set, the
1031 /// server MUST NOT send a push promise. Setting this value to value to
1032 /// false in the initial SETTINGS handshake guarantees that the remote server
1033 /// will never send a push promise.
1034 ///
1035 /// This setting can be changed during the life of a single HTTP/2.0
1036 /// connection by sending another settings frame updating the value.
1037 ///
1038 /// Default value: `true`.
1039 ///
1040 /// # Examples
1041 ///
1042 /// ```
1043 /// # extern crate h2;
1044 /// # extern crate tokio_io;
1045 /// # use tokio_io::*;
1046 /// # use h2::client::*;
1047 /// # use std::time::Duration;
1048 /// #
1049 /// # fn doc<T: AsyncRead + AsyncWrite>(my_io: T)
1050 /// # -> Handshake<T>
1051 /// # {
1052 /// // `client_fut` is a future representing the completion of the HTTP/2.0
1053 /// // handshake.
1054 /// let client_fut = Builder::new()
1055 /// .enable_push(false)
1056 /// .handshake(my_io);
1057 /// # client_fut
1058 /// # }
1059 /// #
1060 /// # pub fn main() {}
1061 /// ```
enable_push(&mut self, enabled: bool) -> &mut Self1062 pub fn enable_push(&mut self, enabled: bool) -> &mut Self {
1063 self.settings.set_enable_push(enabled);
1064 self
1065 }
1066
/// Sets the first stream ID to something other than 1.
///
/// # Panics
///
/// Panics with "stream id must be odd" if `stream_id` is not a
/// client-initiated stream ID. The builder is left unchanged in that case.
#[cfg(feature = "unstable")]
pub fn initial_stream_id(&mut self, stream_id: u32) -> &mut Self {
    // Validate before storing so that a failed assertion never leaves the
    // builder holding a stream ID a client is not allowed to use.
    let id: StreamId = stream_id.into();
    assert!(
        id.is_client_initiated(),
        "stream id must be odd"
    );
    self.stream_id = id;
    self
}
1077
1078 /// Creates a new configured HTTP/2.0 client backed by `io`.
1079 ///
1080 /// It is expected that `io` already be in an appropriate state to commence
1081 /// the [HTTP/2.0 handshake]. See [Handshake] for more details.
1082 ///
1083 /// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1084 /// tuple once the HTTP/2.0 handshake has been completed.
1085 ///
1086 /// This function also allows the caller to configure the send payload data
1087 /// type. See [Outbound data type] for more details.
1088 ///
1089 /// [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1090 /// [Handshake]: ../index.html#handshake
1091 /// [`Connection`]: struct.Connection.html
1092 /// [`SendRequest`]: struct.SendRequest.html
1093 /// [Outbound data type]: ../index.html#outbound-data-type.
1094 ///
1095 /// # Examples
1096 ///
1097 /// Basic usage:
1098 ///
1099 /// ```
1100 /// # extern crate h2;
1101 /// # extern crate tokio_io;
1102 /// # use tokio_io::*;
1103 /// # use h2::client::*;
1104 /// #
1105 /// # fn doc<T: AsyncRead + AsyncWrite>(my_io: T)
1106 /// # -> Handshake<T>
1107 /// # {
1108 /// // `client_fut` is a future representing the completion of the HTTP/2.0
1109 /// // handshake.
1110 /// let client_fut = Builder::new()
1111 /// .handshake(my_io);
1112 /// # client_fut
1113 /// # }
1114 /// #
1115 /// # pub fn main() {}
1116 /// ```
1117 ///
1118 /// Configures the send-payload data type. In this case, the outbound data
1119 /// type will be `&'static [u8]`.
1120 ///
1121 /// ```
1122 /// # extern crate h2;
1123 /// # extern crate tokio_io;
1124 /// # use tokio_io::*;
1125 /// # use h2::client::*;
1126 /// #
1127 /// # fn doc<T: AsyncRead + AsyncWrite>(my_io: T)
1128 /// # -> Handshake<T, &'static [u8]>
1129 /// # {
1130 /// // `client_fut` is a future representing the completion of the HTTP/2.0
1131 /// // handshake.
1132 /// let client_fut: Handshake<_, &'static [u8]> = Builder::new()
1133 /// .handshake(my_io);
1134 /// # client_fut
1135 /// # }
1136 /// #
1137 /// # pub fn main() {}
1138 /// ```
handshake<T, B>(&self, io: T) -> Handshake<T, B> where T: AsyncRead + AsyncWrite, B: IntoBuf, B::Buf: 'static,1139 pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
1140 where
1141 T: AsyncRead + AsyncWrite,
1142 B: IntoBuf,
1143 B::Buf: 'static,
1144 {
1145 Connection::handshake2(io, self.clone())
1146 }
1147 }
1148
1149 impl Default for Builder {
default() -> Builder1150 fn default() -> Builder {
1151 Builder::new()
1152 }
1153 }
1154
1155 /// Creates a new configured HTTP/2.0 client with default configuration
1156 /// values backed by `io`.
1157 ///
1158 /// It is expected that `io` already be in an appropriate state to commence
1159 /// the [HTTP/2.0 handshake]. See [Handshake] for more details.
1160 ///
1161 /// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1162 /// tuple once the HTTP/2.0 handshake has been completed. The returned
1163 /// [`Connection`] instance will be using default configuration values. Use
1164 /// [`Builder`] to customize the configuration values used by a [`Connection`]
1165 /// instance.
1166 ///
1167 /// [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1168 /// [Handshake]: ../index.html#handshake
1169 /// [`Connection`]: struct.Connection.html
1170 /// [`SendRequest`]: struct.SendRequest.html
1171 ///
1172 /// # Examples
1173 ///
1174 /// ```
1175 /// # extern crate futures;
1176 /// # extern crate h2;
1177 /// # extern crate tokio_io;
1178 /// # use futures::*;
1179 /// # use tokio_io::*;
1180 /// # use h2::client;
1181 /// # use h2::client::*;
1182 /// #
1183 /// # fn doc<T: AsyncRead + AsyncWrite>(my_io: T)
1184 /// # {
1185 /// client::handshake(my_io)
1186 /// .and_then(|(send_request, connection)| {
1187 /// // The HTTP/2.0 handshake has completed, now start polling
1188 /// // `connection` and use `send_request` to send requests to the
1189 /// // server.
1190 /// # Ok(())
1191 /// })
1192 /// # .wait().unwrap();
1193 /// # }
1194 /// #
1195 /// # pub fn main() {}
1196 /// ```
handshake<T>(io: T) -> Handshake<T, Bytes> where T: AsyncRead + AsyncWrite,1197 pub fn handshake<T>(io: T) -> Handshake<T, Bytes>
1198 where T: AsyncRead + AsyncWrite,
1199 {
1200 Builder::new().handshake(io)
1201 }
1202
1203 // ===== impl Connection =====
1204
1205 impl<T, B> Connection<T, B>
1206 where
1207 T: AsyncRead + AsyncWrite,
1208 B: IntoBuf,
1209 {
handshake2(io: T, builder: Builder) -> Handshake<T, B>1210 fn handshake2(io: T, builder: Builder) -> Handshake<T, B> {
1211 use tokio_io::io;
1212
1213 debug!("binding client connection");
1214
1215 let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
1216 let handshake = io::write_all(io, msg);
1217
1218 Handshake {
1219 builder,
1220 inner: handshake,
1221 _marker: PhantomData,
1222 }
1223 }
1224
1225 /// Sets the target window size for the whole connection.
1226 ///
1227 /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
1228 /// frame will be immediately sent to the remote, increasing the connection
1229 /// level window by `size - current_value`.
1230 ///
1231 /// If `size` is less than the current value, nothing will happen
1232 /// immediately. However, as window capacity is released by
1233 /// [`ReleaseCapacity`] instances, no `WINDOW_UPDATE` frames will be sent
1234 /// out until the number of "in flight" bytes drops below `size`.
1235 ///
1236 /// The default value is 65,535.
1237 ///
1238 /// See [`ReleaseCapacity`] documentation for more details.
1239 ///
1240 /// [`ReleaseCapacity`]: ../struct.ReleaseCapacity.html
1241 /// [library level]: ../index.html#flow-control
set_target_window_size(&mut self, size: u32)1242 pub fn set_target_window_size(&mut self, size: u32) {
1243 assert!(size <= proto::MAX_WINDOW_SIZE);
1244 self.inner.set_target_window_size(size);
1245 }
1246 }
1247
1248 impl<T, B> Future for Connection<T, B>
1249 where
1250 T: AsyncRead + AsyncWrite,
1251 B: IntoBuf,
1252 {
1253 type Item = ();
1254 type Error = ::Error;
1255
poll(&mut self) -> Poll<(), ::Error>1256 fn poll(&mut self) -> Poll<(), ::Error> {
1257 self.inner.maybe_close_connection_if_no_streams();
1258 self.inner.poll().map_err(Into::into)
1259 }
1260 }
1261
1262 impl<T, B> fmt::Debug for Connection<T, B>
1263 where
1264 T: AsyncRead + AsyncWrite,
1265 T: fmt::Debug,
1266 B: fmt::Debug + IntoBuf,
1267 B::Buf: fmt::Debug,
1268 {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result1269 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1270 fmt::Debug::fmt(&self.inner, fmt)
1271 }
1272 }
1273
1274 // ===== impl Handshake =====
1275
1276 impl<T, B> Future for Handshake<T, B>
1277 where
1278 T: AsyncRead + AsyncWrite,
1279 B: IntoBuf,
1280 B::Buf: 'static,
1281 {
1282 type Item = (SendRequest<B>, Connection<T, B>);
1283 type Error = ::Error;
1284
poll(&mut self) -> Poll<Self::Item, Self::Error>1285 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
1286 let res = self.inner.poll()
1287 .map_err(::Error::from);
1288
1289 let (io, _) = try_ready!(res);
1290
1291 debug!("client connection bound");
1292
1293 // Create the codec
1294 let mut codec = Codec::new(io);
1295
1296 if let Some(max) = self.builder.settings.max_frame_size() {
1297 codec.set_max_recv_frame_size(max as usize);
1298 }
1299
1300 if let Some(max) = self.builder.settings.max_header_list_size() {
1301 codec.set_max_recv_header_list_size(max as usize);
1302 }
1303
1304 // Send initial settings frame
1305 codec
1306 .buffer(self.builder.settings.clone().into())
1307 .expect("invalid SETTINGS frame");
1308
1309 let inner = proto::Connection::new(codec, proto::Config {
1310 next_stream_id: self.builder.stream_id,
1311 initial_max_send_streams: self.builder.initial_max_send_streams,
1312 reset_stream_duration: self.builder.reset_stream_duration,
1313 reset_stream_max: self.builder.reset_stream_max,
1314 settings: self.builder.settings.clone(),
1315 });
1316 let send_request = SendRequest {
1317 inner: inner.streams().clone(),
1318 pending: None,
1319 };
1320
1321 let mut connection = Connection { inner };
1322 if let Some(sz) = self.builder.initial_target_connection_window_size {
1323 connection.set_target_window_size(sz);
1324 }
1325
1326 Ok(Async::Ready((send_request, connection)))
1327 }
1328 }
1329
1330 impl<T, B> fmt::Debug for Handshake<T, B>
1331 where
1332 T: AsyncRead + AsyncWrite,
1333 T: fmt::Debug,
1334 B: fmt::Debug + IntoBuf,
1335 B::Buf: fmt::Debug + IntoBuf,
1336 {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result1337 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1338 write!(fmt, "client::Handshake")
1339 }
1340 }
1341
1342 // ===== impl ResponseFuture =====
1343
1344 impl Future for ResponseFuture {
1345 type Item = Response<RecvStream>;
1346 type Error = ::Error;
1347
poll(&mut self) -> Poll<Self::Item, Self::Error>1348 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
1349 let (parts, _) = try_ready!(self.inner.poll_response()).into_parts();
1350 let body = RecvStream::new(ReleaseCapacity::new(self.inner.clone()));
1351
1352 Ok(Response::from_parts(parts, body).into())
1353 }
1354 }
1355
1356 impl ResponseFuture {
1357 /// Returns the stream ID of the response stream.
1358 ///
1359 /// # Panics
1360 ///
1361 /// If the lock on the stream store has been poisoned.
stream_id(&self) -> ::StreamId1362 pub fn stream_id(&self) -> ::StreamId {
1363 ::StreamId::from_internal(self.inner.stream_id())
1364 }
1365 }
1366
1367 // ===== impl Peer =====
1368
1369 impl Peer {
convert_send_message( id: StreamId, request: Request<()>, end_of_stream: bool) -> Result<Headers, SendError>1370 pub fn convert_send_message(
1371 id: StreamId,
1372 request: Request<()>,
1373 end_of_stream: bool) -> Result<Headers, SendError>
1374 {
1375 use http::request::Parts;
1376
1377 let (
1378 Parts {
1379 method,
1380 uri,
1381 headers,
1382 version,
1383 ..
1384 },
1385 _,
1386 ) = request.into_parts();
1387
1388 let is_connect = method == Method::CONNECT;
1389
1390 // Build the set pseudo header set. All requests will include `method`
1391 // and `path`.
1392 let mut pseudo = Pseudo::request(method, uri);
1393
1394 if pseudo.scheme.is_none() {
1395 // If the scheme is not set, then there are a two options.
1396 //
1397 // 1) Authority is not set. In this case, a request was issued with
1398 // a relative URI. This is permitted **only** when forwarding
1399 // HTTP 1.x requests. If the HTTP version is set to 2.0, then
1400 // this is an error.
1401 //
1402 // 2) Authority is set, then the HTTP method *must* be CONNECT.
1403 //
1404 // It is not possible to have a scheme but not an authority set (the
1405 // `http` crate does not allow it).
1406 //
1407 if pseudo.authority.is_none() {
1408 if version == Version::HTTP_2 {
1409 return Err(UserError::MissingUriSchemeAndAuthority.into());
1410 } else {
1411 // This is acceptable as per the above comment. However,
1412 // HTTP/2.0 requires that a scheme is set. Since we are
1413 // forwarding an HTTP 1.1 request, the scheme is set to
1414 // "http".
1415 pseudo.set_scheme(uri::Scheme::HTTP);
1416 }
1417 } else if !is_connect {
1418 // TODO: Error
1419 }
1420 }
1421
1422 // Create the HEADERS frame
1423 let mut frame = Headers::new(id, pseudo, headers);
1424
1425 if end_of_stream {
1426 frame.set_end_stream()
1427 }
1428
1429 Ok(frame)
1430 }
1431 }
1432
1433 impl proto::Peer for Peer {
1434 type Poll = Response<()>;
1435
dyn() -> proto::DynPeer1436 fn dyn() -> proto::DynPeer {
1437 proto::DynPeer::Client
1438 }
1439
is_server() -> bool1440 fn is_server() -> bool {
1441 false
1442 }
1443
convert_poll_message(headers: Headers) -> Result<Self::Poll, RecvError>1444 fn convert_poll_message(headers: Headers) -> Result<Self::Poll, RecvError> {
1445 let mut b = Response::builder();
1446
1447 let stream_id = headers.stream_id();
1448 let (pseudo, fields) = headers.into_parts();
1449
1450 b.version(Version::HTTP_2);
1451
1452 if let Some(status) = pseudo.status {
1453 b.status(status);
1454 }
1455
1456 let mut response = match b.body(()) {
1457 Ok(response) => response,
1458 Err(_) => {
1459 // TODO: Should there be more specialized handling for different
1460 // kinds of errors
1461 return Err(RecvError::Stream {
1462 id: stream_id,
1463 reason: Reason::PROTOCOL_ERROR,
1464 });
1465 },
1466 };
1467
1468 *response.headers_mut() = fields;
1469
1470 Ok(response)
1471 }
1472 }
1473