1 //! Client implementation of the HTTP/2.0 protocol.
2 //!
3 //! # Getting started
4 //!
5 //! Running an HTTP/2.0 client requires the caller to establish the underlying
6 //! connection as well as get the connection to a state that is ready to begin
7 //! the HTTP/2.0 handshake. See [here](../index.html#handshake) for more
8 //! details.
9 //!
10 //! This could be as basic as using Tokio's [`TcpStream`] to connect to a remote
11 //! host, but usually it means using either ALPN or HTTP/1.1 protocol upgrades.
12 //!
13 //! Once a connection is obtained, it is passed to [`handshake`], which will
14 //! begin the [HTTP/2.0 handshake]. This returns a future that completes once
15 //! the handshake process is performed and HTTP/2.0 streams may be initialized.
16 //!
17 //! [`handshake`] uses default configuration values. There are a number of
18 //! settings that can be changed by using [`Builder`] instead.
19 //!
20 //! Once the handshake future completes, the caller is provided with a
21 //! [`Connection`] instance and a [`SendRequest`] instance. The [`Connection`]
22 //! instance is used to drive the connection (see [Managing the connection]).
23 //! The [`SendRequest`] instance is used to initialize new streams (see [Making
24 //! requests]).
25 //!
26 //! # Making requests
27 //!
28 //! Requests are made using the [`SendRequest`] handle provided by the handshake
29 //! future. Once a request is submitted, an HTTP/2.0 stream is initialized and
30 //! the request is sent to the server.
31 //!
32 //! A request body and request trailers are sent using [`SendRequest`] and the
33 //! server's response is returned once the [`ResponseFuture`] future completes.
34 //! Both the [`SendStream`] and [`ResponseFuture`] instances are returned by
35 //! [`SendRequest::send_request`] and are tied to the HTTP/2.0 stream
36 //! initialized by the sent request.
37 //!
38 //! The [`SendRequest::poll_ready`] function returns `Ready` when a new HTTP/2.0
39 //! stream can be created, i.e. as long as the current number of active streams
40 //! is below [`MAX_CONCURRENT_STREAMS`]. If a new stream cannot be created, the
41 //! caller will be notified once an existing stream closes, freeing capacity for
42 //! the caller.  The caller should use [`SendRequest::poll_ready`] to check for
43 //! capacity before sending a request to the server.
44 //!
45 //! [`SendRequest`] enforces the [`MAX_CONCURRENT_STREAMS`] setting. The user
46 //! must not send a request if `poll_ready` does not return `Ready`. Attempting
47 //! to do so will result in an [`Error`] being returned.
48 //!
49 //! # Managing the connection
50 //!
51 //! The [`Connection`] instance is used to manage connection state. The caller
52 //! is required to call [`Connection::poll`] in order to advance state.
53 //! [`SendRequest::send_request`] and other functions have no effect unless
54 //! [`Connection::poll`] is called.
55 //!
56 //! The [`Connection`] instance should only be dropped once [`Connection::poll`]
57 //! returns `Ready`. At this point, the underlying socket has been closed and no
58 //! further work needs to be done.
59 //!
60 //! The easiest way to ensure that the [`Connection`] instance gets polled is to
61 //! submit the [`Connection`] instance to an [executor]. The executor will then
62 //! manage polling the connection until the connection is complete.
63 //! Alternatively, the caller can call `poll` manually.
64 //!
65 //! # Example
66 //!
67 //! ```rust
68 //! extern crate futures;
69 //! extern crate h2;
70 //! extern crate http;
71 //! extern crate tokio_core;
72 //!
73 //! use h2::client;
74 //!
75 //! use futures::*;
76 //! # use futures::future::ok;
77 //! use http::*;
78 //!
79 //! use tokio_core::net::TcpStream;
80 //! use tokio_core::reactor;
81 //!
82 //! pub fn main() {
83 //!     let mut core = reactor::Core::new().unwrap();
84 //!     let handle = core.handle();
85 //!
86 //!     let addr = "127.0.0.1:5928".parse().unwrap();
87 //!
88 //!     core.run({
89 //! # let _ =
90 //!         // Establish TCP connection to the server.
91 //!         TcpStream::connect(&addr, &handle)
92 //!             .map_err(|_| {
93 //!                 panic!("failed to establish TCP connection")
94 //!             })
95 //!             .and_then(|tcp| client::handshake(tcp))
96 //!             .and_then(|(h2, connection)| {
97 //!                 let connection = connection
98 //!                     .map_err(|_| panic!("HTTP/2.0 connection failed"));
99 //!
100 //!                 // Spawn a new task to drive the connection state
101 //!                 handle.spawn(connection);
102 //!
103 //!                 // Wait until the `SendRequest` handle has available
104 //!                 // capacity.
105 //!                 h2.ready()
106 //!             })
107 //!             .and_then(|mut h2| {
108 //!                 // Prepare the HTTP request to send to the server.
109 //!                 let request = Request::builder()
110 //!                     .method(Method::GET)
111 //!                     .uri("https://www.example.com/")
112 //!                     .body(())
113 //!                     .unwrap();
114 //!
115 //!                 // Send the request. The second tuple item allows the caller
116 //!                 // to stream a request body.
117 //!                 let (response, _) = h2.send_request(request, true).unwrap();
118 //!
119 //!                 response.and_then(|response| {
120 //!                     let (head, mut body) = response.into_parts();
121 //!
122 //!                     println!("Received response: {:?}", head);
123 //!
124 //!                     // The `release_capacity` handle allows the caller to manage
125 //!                     // flow control.
126 //!                     //
127 //!                     // Whenever data is received, the caller is responsible for
128 //!                     // releasing capacity back to the server once it has freed
129 //!                     // the data from memory.
130 //!                     let mut release_capacity = body.release_capacity().clone();
131 //!
132 //!                     body.for_each(move |chunk| {
133 //!                         println!("RX: {:?}", chunk);
134 //!
135 //!                         // Let the server send more data.
136 //!                         let _ = release_capacity.release_capacity(chunk.len());
137 //!
138 //!                         Ok(())
139 //!                     })
140 //!                 })
141 //!             })
142 //! # ;
143 //! # ok::<_, ()>(())
144 //!     }).ok().expect("failed to perform HTTP/2.0 request");
145 //! }
146 //! ```
147 //!
148 //! [`TcpStream`]: https://docs.rs/tokio-core/0.1/tokio_core/net/struct.TcpStream.html
149 //! [`handshake`]: fn.handshake.html
150 //! [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
151 //! [`SendRequest`]: struct.SendRequest.html
152 //! [`SendStream`]: ../struct.SendStream.html
153 //! [Making requests]: #making-requests
154 //! [Managing the connection]: #managing-the-connection
155 //! [`Connection`]: struct.Connection.html
156 //! [`Connection::poll`]: struct.Connection.html#method.poll
157 //! [`SendRequest::send_request`]: struct.SendRequest.html#method.send_request
158 //! [`MAX_CONCURRENT_STREAMS`]: http://httpwg.org/specs/rfc7540.html#SettingValues
159 //! [`SendRequest`]: struct.SendRequest.html
160 //! [`ResponseFuture`]: struct.ResponseFuture.html
161 //! [`SendRequest::poll_ready`]: struct.SendRequest.html#method.poll_ready
162 //! [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
163 //! [`Builder`]: struct.Builder.html
164 //! [`Error`]: ../struct.Error.html
165 
166 use {SendStream, RecvStream, ReleaseCapacity};
167 use codec::{Codec, RecvError, SendError, UserError};
168 use frame::{Headers, Pseudo, Reason, Settings, StreamId};
169 use proto;
170 
171 use bytes::{Bytes, IntoBuf};
172 use futures::{Async, Future, Poll};
173 use http::{uri, Request, Response, Method, Version};
174 use tokio_io::{AsyncRead, AsyncWrite};
175 use tokio_io::io::WriteAll;
176 
177 use std::fmt;
178 use std::marker::PhantomData;
179 use std::time::Duration;
180 use std::usize;
181 
182 /// Performs the HTTP/2.0 connection handshake.
183 ///
184 /// This type implements `Future`, yielding a `(SendRequest, Connection)`
185 /// instance once the handshake has completed.
186 ///
187 /// The handshake is completed once both the connection preface and the initial
188 /// settings frame is sent by the client.
189 ///
190 /// The handshake future does not wait for the initial settings frame from the
191 /// server.
192 ///
193 /// See [module] level documentation for more details.
194 ///
195 /// [module]: index.html
196 #[must_use = "futures do nothing unless polled"]
197 pub struct Handshake<T, B: IntoBuf = Bytes> {
198     builder: Builder,
199     inner: WriteAll<T, &'static [u8]>,
200     _marker: PhantomData<B>,
201 }
202 
203 /// Initializes new HTTP/2.0 streams on a connection by sending a request.
204 ///
205 /// This type does no work itself. Instead, it is a handle to the inner
206 /// connection state held by [`Connection`]. If the associated connection
207 /// instance is dropped, all `SendRequest` functions will return [`Error`].
208 ///
209 /// [`SendRequest`] instances are able to move to and operate on separate tasks
210 /// / threads than their associated [`Connection`] instance. Internally, there
211 /// is a buffer used to stage requests before they get written to the
212 /// connection. There is no guarantee that requests get written to the
213 /// connection in FIFO order as HTTP/2.0 prioritization logic can play a role.
214 ///
215 /// [`SendRequest`] implements [`Clone`], enabling the creation of many
216 /// instances that are backed by a single connection.
217 ///
218 /// See [module] level documentation for more details.
219 ///
220 /// [module]: index.html
221 /// [`Connection`]: struct.Connection.html
222 /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html
223 /// [`Error`]: ../struct.Error.html
224 pub struct SendRequest<B: IntoBuf> {
225     inner: proto::Streams<B::Buf, Peer>,
226     pending: Option<proto::OpaqueStreamRef>,
227 }
228 
229 /// Returns a `SendRequest` instance once it is ready to send at least one
230 /// request.
231 #[derive(Debug)]
232 pub struct ReadySendRequest<B: IntoBuf> {
233     inner: Option<SendRequest<B>>,
234 }
235 
236 /// Manages all state associated with an HTTP/2.0 client connection.
237 ///
238 /// A `Connection` is backed by an I/O resource (usually a TCP socket) and
239 /// implements the HTTP/2.0 client logic for that connection. It is responsible
240 /// for driving the internal state forward, performing the work requested of the
241 /// associated handles ([`SendRequest`], [`ResponseFuture`], [`SendStream`],
242 /// [`RecvStream`]).
243 ///
244 /// `Connection` values are created by calling [`handshake`]. Once a
245 /// `Connection` value is obtained, the caller must repeatedly call [`poll`]
246 /// until `Ready` is returned. The easiest way to do this is to submit the
247 /// `Connection` instance to an [executor].
248 ///
249 /// [module]: index.html
250 /// [`handshake`]: fn.handshake.html
251 /// [`SendRequest`]: struct.SendRequest.html
252 /// [`ResponseFuture`]: struct.ResponseFuture.html
253 /// [`SendStream`]: ../struct.SendStream.html
254 /// [`RecvStream`]: ../struct.RecvStream.html
255 /// [`poll`]: #method.poll
256 /// [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
257 ///
258 /// # Examples
259 ///
260 /// ```
261 /// # extern crate bytes;
262 /// # extern crate futures;
263 /// # extern crate h2;
264 /// # extern crate tokio_io;
265 /// # use futures::{Future, Stream};
266 /// # use futures::future::Executor;
267 /// # use tokio_io::*;
268 /// # use h2::client;
269 /// # use h2::client::*;
270 /// #
271 /// # fn doc<T, E>(my_io: T, my_executor: E)
272 /// # where T: AsyncRead + AsyncWrite + 'static,
273 /// #       E: Executor<Box<Future<Item = (), Error = ()>>>,
274 /// # {
275 /// client::handshake(my_io)
276 ///     .and_then(|(send_request, connection)| {
277 ///         // Submit the connection handle to an executor.
278 ///         my_executor.execute(
279 ///             # Box::new(
280 ///             connection.map_err(|_| panic!("connection failed"))
281 ///             # )
282 ///         ).unwrap();
283 ///
284 ///         // Now, use `send_request` to initialize HTTP/2.0 streams.
285 ///         // ...
286 ///         # drop(send_request);
287 ///         # Ok(())
288 ///     })
289 /// # .wait().unwrap();
290 /// # }
291 /// #
292 /// # pub fn main() {}
293 /// ```
294 #[must_use = "futures do nothing unless polled"]
295 pub struct Connection<T, B: IntoBuf = Bytes> {
296     inner: proto::Connection<T, Peer, B>,
297 }
298 
299 /// A future of an HTTP response.
300 #[derive(Debug)]
301 #[must_use = "futures do nothing unless polled"]
302 pub struct ResponseFuture {
303     inner: proto::OpaqueStreamRef,
304 }
305 
306 /// Builds client connections with custom configuration values.
307 ///
308 /// Methods can be chained in order to set the configuration values.
309 ///
310 /// The client is constructed by calling [`handshake`] and passing the I/O
311 /// handle that will back the HTTP/2.0 server.
312 ///
313 /// New instances of `Builder` are obtained via [`Builder::new`].
314 ///
315 /// See function level documentation for details on the various client
316 /// configuration settings.
317 ///
318 /// [`Builder::new`]: struct.Builder.html#method.new
319 /// [`handshake`]: struct.Builder.html#method.handshake
320 ///
321 /// # Examples
322 ///
323 /// ```
324 /// # extern crate h2;
325 /// # extern crate tokio_io;
326 /// # use tokio_io::*;
327 /// # use h2::client::*;
328 /// #
329 /// # fn doc<T: AsyncRead + AsyncWrite>(my_io: T)
330 /// # -> Handshake<T>
331 /// # {
332 /// // `client_fut` is a future representing the completion of the HTTP/2.0
333 /// // handshake.
334 /// let client_fut = Builder::new()
335 ///     .initial_window_size(1_000_000)
336 ///     .max_concurrent_streams(1000)
337 ///     .handshake(my_io);
338 /// # client_fut
339 /// # }
340 /// #
341 /// # pub fn main() {}
342 /// ```
343 #[derive(Clone, Debug)]
344 pub struct Builder {
345     /// Time to keep locally reset streams around before reaping.
346     reset_stream_duration: Duration,
347 
348     /// Initial maximum number of locally initiated (send) streams.
349     /// After receiving a Settings frame from the remote peer,
350     /// the connection will overwrite this value with the
351     /// MAX_CONCURRENT_STREAMS specified in the frame.
352     initial_max_send_streams: usize,
353 
354     /// Initial target window size for new connections.
355     initial_target_connection_window_size: Option<u32>,
356 
357     /// Maximum number of locally reset streams to keep at a time.
358     reset_stream_max: usize,
359 
360     /// Initial `Settings` frame to send as part of the handshake.
361     settings: Settings,
362 
363     /// The stream ID of the first (lowest) stream. Subsequent streams will use
364     /// monotonically increasing stream IDs.
365     stream_id: StreamId,
366 }
367 
368 #[derive(Debug)]
369 pub(crate) struct Peer;
370 
371 // ===== impl SendRequest =====
372 
373 impl<B> SendRequest<B>
374 where
375     B: IntoBuf,
376     B::Buf: 'static,
377 {
378     /// Returns `Ready` when the connection can initialize a new HTTP/2.0
379     /// stream.
380     ///
381     /// This function must return `Ready` before `send_request` is called. When
382     /// `NotReady` is returned, the task will be notified once the readiness
383     /// state changes.
384     ///
385     /// See [module] level docs for more details.
386     ///
387     /// [module]: index.html
poll_ready(&mut self) -> Poll<(), ::Error>388     pub fn poll_ready(&mut self) -> Poll<(), ::Error> {
389         try_ready!(self.inner.poll_pending_open(self.pending.as_ref()));
390         self.pending = None;
391         Ok(().into())
392     }
393 
394     /// Consumes `self`, returning a future that returns `self` back once it is
395     /// ready to send a request.
396     ///
397     /// This function should be called before calling `send_request`.
398     ///
399     /// This is a functional combinator for [`poll_ready`]. The returned future
400     /// will call `SendStream::poll_ready` until `Ready`, then returns `self` to
401     /// the caller.
402     ///
403     /// # Examples
404     ///
405     /// ```rust
406     /// # extern crate futures;
407     /// # extern crate h2;
408     /// # extern crate http;
409     /// # use futures::*;
410     /// # use h2::client::*;
411     /// # use http::*;
412     /// # fn doc(send_request: SendRequest<&'static [u8]>)
413     /// # {
414     /// // First, wait until the `send_request` handle is ready to send a new
415     /// // request
416     /// send_request.ready()
417     ///     .and_then(|mut send_request| {
418     ///         // Use `send_request` here.
419     ///         # Ok(())
420     ///     })
421     ///     # .wait().unwrap();
422     /// # }
423     /// # pub fn main() {}
424     /// ```
425     ///
426     /// See [module] level docs for more details.
427     ///
428     /// [module]: index.html
ready(self) -> ReadySendRequest<B>429     pub fn ready(self) -> ReadySendRequest<B> {
430         ReadySendRequest { inner: Some(self) }
431     }
432 
433     /// Sends a HTTP/2.0 request to the server.
434     ///
435     /// `send_request` initializes a new HTTP/2.0 stream on the associated
436     /// connection, then sends the given request using this new stream. Only the
437     /// request head is sent.
438     ///
439     /// On success, a [`ResponseFuture`] instance and [`SendStream`] instance
440     /// are returned. The [`ResponseFuture`] instance is used to get the
441     /// server's response and the [`SendStream`] instance is used to send a
442     /// request body or trailers to the server over the same HTTP/2.0 stream.
443     ///
444     /// To send a request body or trailers, set `end_of_stream` to `false`.
445     /// Then, use the returned [`SendStream`] instance to stream request body
446     /// chunks or send trailers. If `end_of_stream` is **not** set to `false`
447     /// then attempting to call [`SendStream::send_data`] or
448     /// [`SendStream::send_trailers`] will result in an error.
449     ///
450     /// If no request body or trailers are to be sent, set `end_of_stream` to
451     /// `true` and drop the returned [`SendStream`] instance.
452     ///
453     /// # A note on HTTP versions
454     ///
455     /// The provided `Request` will be encoded differently depending on the
456     /// value of its version field. If the version is set to 2.0, then the
457     /// request is encoded as per the specification recommends.
458     ///
459     /// If the version is set to a lower value, then the request is encoded to
460     /// preserve the characteristics of HTTP 1.1 and lower. Specifically, host
461     /// headers are permitted and the `:authority` pseudo header is not
462     /// included.
463     ///
464     /// The caller should always set the request's version field to 2.0 unless
465     /// specifically transmitting an HTTP 1.1 request over 2.0.
466     ///
467     /// # Examples
468     ///
469     /// Sending a request with no body
470     ///
471     /// ```rust
472     /// # extern crate futures;
473     /// # extern crate h2;
474     /// # extern crate http;
475     /// # use futures::*;
476     /// # use h2::client::*;
477     /// # use http::*;
478     /// # fn doc(send_request: SendRequest<&'static [u8]>)
479     /// # {
480     /// // First, wait until the `send_request` handle is ready to send a new
481     /// // request
482     /// send_request.ready()
483     ///     .and_then(|mut send_request| {
484     ///         // Prepare the HTTP request to send to the server.
485     ///         let request = Request::get("https://www.example.com/")
486     ///             .body(())
487     ///             .unwrap();
488     ///
489     ///         // Send the request to the server. Since we are not sending a
490     ///         // body or trailers, we can drop the `SendStream` instance.
491     ///         let (response, _) = send_request
492     ///             .send_request(request, true).unwrap();
493     ///
494     ///         response
495     ///     })
496     ///     .and_then(|response| {
497     ///         // Process the response
498     ///         # Ok(())
499     ///     })
500     ///     # .wait().unwrap();
501     /// # }
502     /// # pub fn main() {}
503     /// ```
504     ///
505     /// Sending a request with a body and trailers
506     ///
507     /// ```rust
508     /// # extern crate futures;
509     /// # extern crate h2;
510     /// # extern crate http;
511     /// # use futures::*;
512     /// # use h2::client::*;
513     /// # use http::*;
514     /// # fn doc(send_request: SendRequest<&'static [u8]>)
515     /// # {
516     /// // First, wait until the `send_request` handle is ready to send a new
517     /// // request
518     /// send_request.ready()
519     ///     .and_then(|mut send_request| {
520     ///         // Prepare the HTTP request to send to the server.
521     ///         let request = Request::get("https://www.example.com/")
522     ///             .body(())
523     ///             .unwrap();
524     ///
525     ///         // Send the request to the server. Since we are not sending a
526     ///         // body or trailers, we can drop the `SendStream` instance.
527     ///         let (response, mut send_stream) = send_request
528     ///             .send_request(request, false).unwrap();
529     ///
530     ///         // At this point, one option would be to wait for send capacity.
531     ///         // Doing so would allow us to not hold data in memory that
532     ///         // cannot be sent. However, this is not a requirement, so this
533     ///         // example will skip that step. See `SendStream` documentation
534     ///         // for more details.
535     ///         send_stream.send_data(b"hello", false).unwrap();
536     ///         send_stream.send_data(b"world", false).unwrap();
537     ///
538     ///         // Send the trailers.
539     ///         let mut trailers = HeaderMap::new();
540     ///         trailers.insert(
541     ///             header::HeaderName::from_bytes(b"my-trailer").unwrap(),
542     ///             header::HeaderValue::from_bytes(b"hello").unwrap());
543     ///
544     ///         send_stream.send_trailers(trailers).unwrap();
545     ///
546     ///         response
547     ///     })
548     ///     .and_then(|response| {
549     ///         // Process the response
550     ///         # Ok(())
551     ///     })
552     ///     # .wait().unwrap();
553     /// # }
554     /// # pub fn main() {}
555     /// ```
556     ///
557     /// [`ResponseFuture`]: struct.ResponseFuture.html
558     /// [`SendStream`]: ../struct.SendStream.html
559     /// [`SendStream::send_data`]: ../struct.SendStream.html#method.send_data
560     /// [`SendStream::send_trailers`]: ../struct.SendStream.html#method.send_trailers
send_request( &mut self, request: Request<()>, end_of_stream: bool, ) -> Result<(ResponseFuture, SendStream<B>), ::Error>561     pub fn send_request(
562         &mut self,
563         request: Request<()>,
564         end_of_stream: bool,
565     ) -> Result<(ResponseFuture, SendStream<B>), ::Error> {
566         self.inner
567             .send_request(request, end_of_stream, self.pending.as_ref())
568             .map_err(Into::into)
569             .map(|stream| {
570                 if stream.is_pending_open() {
571                     self.pending = Some(stream.clone_to_opaque());
572                 }
573 
574                 let response = ResponseFuture {
575                     inner: stream.clone_to_opaque(),
576                 };
577 
578                 let stream = SendStream::new(stream);
579 
580                 (response, stream)
581             })
582     }
583 }
584 
585 impl<B> fmt::Debug for SendRequest<B>
586 where
587     B: IntoBuf,
588 {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result589     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
590         fmt.debug_struct("SendRequest").finish()
591     }
592 }
593 
594 impl<B> Clone for SendRequest<B>
595 where
596     B: IntoBuf,
597 {
clone(&self) -> Self598     fn clone(&self) -> Self {
599         SendRequest {
600             inner: self.inner.clone(),
601             pending: None,
602         }
603     }
604 }
605 
606 #[cfg(feature = "unstable")]
607 impl<B> SendRequest<B>
608 where
609     B: IntoBuf,
610 {
611     /// Returns the number of active streams.
612     ///
613     /// An active stream is a stream that has not yet transitioned to a closed
614     /// state.
num_active_streams(&self) -> usize615     pub fn num_active_streams(&self) -> usize {
616         self.inner.num_active_streams()
617     }
618 
619     /// Returns the number of streams that are held in memory.
620     ///
621     /// A wired stream is a stream that is either active or is closed but must
622     /// stay in memory for some reason. For example, there are still outstanding
623     /// userspace handles pointing to the slot.
num_wired_streams(&self) -> usize624     pub fn num_wired_streams(&self) -> usize {
625         self.inner.num_wired_streams()
626     }
627 }
628 
629 // ===== impl ReadySendRequest =====
630 
631 impl<B> Future for ReadySendRequest<B>
632 where B: IntoBuf,
633       B::Buf: 'static,
634 {
635     type Item = SendRequest<B>;
636     type Error = ::Error;
637 
poll(&mut self) -> Poll<Self::Item, Self::Error>638     fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
639         match self.inner {
640             Some(ref mut send_request) => {
641                 let _ = try_ready!(send_request.poll_ready());
642             }
643             None => panic!("called `poll` after future completed"),
644         }
645 
646         Ok(self.inner.take().unwrap().into())
647     }
648 }
649 
650 // ===== impl Builder =====
651 
652 impl Builder {
653     /// Returns a new client builder instance initialized with default
654     /// configuration values.
655     ///
656     /// Configuration methods can be chained on the return value.
657     ///
658     /// # Examples
659     ///
660     /// ```
661     /// # extern crate h2;
662     /// # extern crate tokio_io;
663     /// # use tokio_io::*;
664     /// # use h2::client::*;
665     /// #
666     /// # fn doc<T: AsyncRead + AsyncWrite>(my_io: T)
667     /// # -> Handshake<T>
668     /// # {
669     /// // `client_fut` is a future representing the completion of the HTTP/2.0
670     /// // handshake.
671     /// let client_fut = Builder::new()
672     ///     .initial_window_size(1_000_000)
673     ///     .max_concurrent_streams(1000)
674     ///     .handshake(my_io);
675     /// # client_fut
676     /// # }
677     /// #
678     /// # pub fn main() {}
679     /// ```
new() -> Builder680     pub fn new() -> Builder {
681         Builder {
682             reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
683             reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
684             initial_target_connection_window_size: None,
685             initial_max_send_streams: usize::MAX,
686             settings: Default::default(),
687             stream_id: 1.into(),
688         }
689     }
690 
691     /// Indicates the initial window size (in octets) for stream-level
692     /// flow control for received data.
693     ///
694     /// The initial window of a stream is used as part of flow control. For more
695     /// details, see [`ReleaseCapacity`].
696     ///
697     /// The default value is 65,535.
698     ///
699     /// [`ReleaseCapacity`]: ../struct.ReleaseCapacity.html
700     ///
701     /// # Examples
702     ///
703     /// ```
704     /// # extern crate h2;
705     /// # extern crate tokio_io;
706     /// # use tokio_io::*;
707     /// # use h2::client::*;
708     /// #
709     /// # fn doc<T: AsyncRead + AsyncWrite>(my_io: T)
710     /// # -> Handshake<T>
711     /// # {
712     /// // `client_fut` is a future representing the completion of the HTTP/2.0
713     /// // handshake.
714     /// let client_fut = Builder::new()
715     ///     .initial_window_size(1_000_000)
716     ///     .handshake(my_io);
717     /// # client_fut
718     /// # }
719     /// #
720     /// # pub fn main() {}
721     /// ```
initial_window_size(&mut self, size: u32) -> &mut Self722     pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
723         self.settings.set_initial_window_size(Some(size));
724         self
725     }
726 
727     /// Indicates the initial window size (in octets) for connection-level flow control
728     /// for received data.
729     ///
730     /// The initial window of a connection is used as part of flow control. For more details,
731     /// see [`ReleaseCapacity`].
732     ///
733     /// The default value is 65,535.
734     ///
735     /// [`ReleaseCapacity`]: ../struct.ReleaseCapacity.html
736     ///
737     /// # Examples
738     ///
739     /// ```
740     /// # extern crate h2;
741     /// # extern crate tokio_io;
742     /// # use tokio_io::*;
743     /// # use h2::client::*;
744     /// #
745     /// # fn doc<T: AsyncRead + AsyncWrite>(my_io: T)
746     /// # -> Handshake<T>
747     /// # {
748     /// // `client_fut` is a future representing the completion of the HTTP/2.0
749     /// // handshake.
750     /// let client_fut = Builder::new()
751     ///     .initial_connection_window_size(1_000_000)
752     ///     .handshake(my_io);
753     /// # client_fut
754     /// # }
755     /// #
756     /// # pub fn main() {}
757     /// ```
initial_connection_window_size(&mut self, size: u32) -> &mut Self758     pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
759         self.initial_target_connection_window_size = Some(size);
760         self
761     }
762 
763     /// Indicates the size (in octets) of the largest HTTP/2.0 frame payload that the
764     /// configured client is able to accept.
765     ///
766     /// The sender may send data frames that are **smaller** than this value,
767     /// but any data larger than `max` will be broken up into multiple `DATA`
768     /// frames.
769     ///
770     /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
771     ///
772     /// # Examples
773     ///
774     /// ```
775     /// # extern crate h2;
776     /// # extern crate tokio_io;
777     /// # use tokio_io::*;
778     /// # use h2::client::*;
779     /// #
780     /// # fn doc<T: AsyncRead + AsyncWrite>(my_io: T)
781     /// # -> Handshake<T>
782     /// # {
783     /// // `client_fut` is a future representing the completion of the HTTP/2.0
784     /// // handshake.
785     /// let client_fut = Builder::new()
786     ///     .max_frame_size(1_000_000)
787     ///     .handshake(my_io);
788     /// # client_fut
789     /// # }
790     /// #
791     /// # pub fn main() {}
792     /// ```
793     ///
794     /// # Panics
795     ///
796     /// This function panics if `max` is not within the legal range specified
797     /// above.
max_frame_size(&mut self, max: u32) -> &mut Self798     pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
799         self.settings.set_max_frame_size(Some(max));
800         self
801     }
802 
803     /// Sets the max size of received header frames.
804     ///
805     /// This advisory setting informs a peer of the maximum size of header list
806     /// that the sender is prepared to accept, in octets. The value is based on
807     /// the uncompressed size of header fields, including the length of the name
808     /// and value in octets plus an overhead of 32 octets for each header field.
809     ///
810     /// This setting is also used to limit the maximum amount of data that is
811     /// buffered to decode HEADERS frames.
812     ///
813     /// # Examples
814     ///
815     /// ```
816     /// # extern crate h2;
817     /// # extern crate tokio_io;
818     /// # use tokio_io::*;
819     /// # use h2::client::*;
820     /// #
821     /// # fn doc<T: AsyncRead + AsyncWrite>(my_io: T)
822     /// # -> Handshake<T>
823     /// # {
824     /// // `client_fut` is a future representing the completion of the HTTP/2.0
825     /// // handshake.
826     /// let client_fut = Builder::new()
827     ///     .max_header_list_size(16 * 1024)
828     ///     .handshake(my_io);
829     /// # client_fut
830     /// # }
831     /// #
832     /// # pub fn main() {}
833     /// ```
max_header_list_size(&mut self, max: u32) -> &mut Self834     pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
835         self.settings.set_max_header_list_size(Some(max));
836         self
837     }
838 
839     /// Sets the maximum number of concurrent streams.
840     ///
841     /// The maximum concurrent streams setting only controls the maximum number
842     /// of streams that can be initiated by the remote peer. In other words,
843     /// when this setting is set to 100, this does not limit the number of
844     /// concurrent streams that can be created by the caller.
845     ///
846     /// It is recommended that this value be no smaller than 100, so as to not
847     /// unnecessarily limit parallelism. However, any value is legal, including
848     /// 0. If `max` is set to 0, then the remote will not be permitted to
849     /// initiate streams.
850     ///
851     /// Note that streams in the reserved state, i.e., push promises that have
852     /// been reserved but the stream has not started, do not count against this
853     /// setting.
854     ///
855     /// Also note that if the remote *does* exceed the value set here, it is not
856     /// a protocol level error. Instead, the `h2` library will immediately reset
857     /// the stream.
858     ///
859     /// See [Section 5.1.2] in the HTTP/2.0 spec for more details.
860     ///
861     /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
862     ///
863     /// # Examples
864     ///
865     /// ```
866     /// # extern crate h2;
867     /// # extern crate tokio_io;
868     /// # use tokio_io::*;
869     /// # use h2::client::*;
870     /// #
871     /// # fn doc<T: AsyncRead + AsyncWrite>(my_io: T)
872     /// # -> Handshake<T>
873     /// # {
874     /// // `client_fut` is a future representing the completion of the HTTP/2.0
875     /// // handshake.
876     /// let client_fut = Builder::new()
877     ///     .max_concurrent_streams(1000)
878     ///     .handshake(my_io);
879     /// # client_fut
880     /// # }
881     /// #
882     /// # pub fn main() {}
883     /// ```
max_concurrent_streams(&mut self, max: u32) -> &mut Self884     pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
885         self.settings.set_max_concurrent_streams(Some(max));
886         self
887     }
888 
889     /// Sets the initial maximum of locally initiated (send) streams.
890     ///
891     /// The initial settings will be overwritten by the remote peer when
892     /// the Settings frame is received. The new value will be set to the
893     /// `max_concurrent_streams()` from the frame.
894     ///
895     /// This setting prevents the caller from exceeding this number of
896     /// streams that are counted towards the concurrency limit.
897     ///
898     /// Sending streams past the limit returned by the peer will be treated
899     /// as a stream error of type PROTOCOL_ERROR or REFUSED_STREAM.
900     ///
901     /// See [Section 5.1.2] in the HTTP/2.0 spec for more details.
902     ///
903     /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
904     ///
905     /// # Examples
906     ///
907     /// ```
908     /// # extern crate h2;
909     /// # extern crate tokio_io;
910     /// # use tokio_io::*;
911     /// # use h2::client::*;
912     /// #
913     /// # fn doc<T: AsyncRead + AsyncWrite>(my_io: T)
914     /// # -> Handshake<T>
915     /// # {
916     /// // `client_fut` is a future representing the completion of the HTTP/2.0
917     /// // handshake.
918     /// let client_fut = Builder::new()
919     ///     .initial_max_send_streams(1000)
920     ///     .handshake(my_io);
921     /// # client_fut
922     /// # }
923     /// #
924     /// # pub fn main() {}
925     /// ```
initial_max_send_streams(&mut self, initial: usize) -> &mut Self926     pub fn initial_max_send_streams(&mut self, initial: usize) -> &mut Self {
927         self.initial_max_send_streams = initial;
928         self
929     }
930 
931     /// Sets the maximum number of concurrent locally reset streams.
932     ///
933     /// When a stream is explicitly reset by either calling
934     /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
935     /// before completing the stream, the HTTP/2.0 specification requires that
936     /// any further frames received for that stream must be ignored for "some
937     /// time".
938     ///
939     /// In order to satisfy the specification, internal state must be maintained
940     /// to implement the behavior. This state grows linearly with the number of
941     /// streams that are locally reset.
942     ///
943     /// The `max_concurrent_reset_streams` setting configures sets an upper
944     /// bound on the amount of state that is maintained. When this max value is
945     /// reached, the oldest reset stream is purged from memory.
946     ///
947     /// Once the stream has been fully purged from memory, any additional frames
948     /// received for that stream will result in a connection level protocol
949     /// error, forcing the connection to terminate.
950     ///
951     /// The default value is 10.
952     ///
953     /// # Examples
954     ///
955     /// ```
956     /// # extern crate h2;
957     /// # extern crate tokio_io;
958     /// # use tokio_io::*;
959     /// # use h2::client::*;
960     /// #
961     /// # fn doc<T: AsyncRead + AsyncWrite>(my_io: T)
962     /// # -> Handshake<T>
963     /// # {
964     /// // `client_fut` is a future representing the completion of the HTTP/2.0
965     /// // handshake.
966     /// let client_fut = Builder::new()
967     ///     .max_concurrent_reset_streams(1000)
968     ///     .handshake(my_io);
969     /// # client_fut
970     /// # }
971     /// #
972     /// # pub fn main() {}
973     /// ```
max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self974     pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
975         self.reset_stream_max = max;
976         self
977     }
978 
979     /// Sets the maximum number of concurrent locally reset streams.
980     ///
981     /// When a stream is explicitly reset by either calling
982     /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
983     /// before completing the stream, the HTTP/2.0 specification requires that
984     /// any further frames received for that stream must be ignored for "some
985     /// time".
986     ///
987     /// In order to satisfy the specification, internal state must be maintained
988     /// to implement the behavior. This state grows linearly with the number of
989     /// streams that are locally reset.
990     ///
991     /// The `reset_stream_duration` setting configures the max amount of time
992     /// this state will be maintained in memory. Once the duration elapses, the
993     /// stream state is purged from memory.
994     ///
995     /// Once the stream has been fully purged from memory, any additional frames
996     /// received for that stream will result in a connection level protocol
997     /// error, forcing the connection to terminate.
998     ///
999     /// The default value is 30 seconds.
1000     ///
1001     /// # Examples
1002     ///
1003     /// ```
1004     /// # extern crate h2;
1005     /// # extern crate tokio_io;
1006     /// # use tokio_io::*;
1007     /// # use h2::client::*;
1008     /// # use std::time::Duration;
1009     /// #
1010     /// # fn doc<T: AsyncRead + AsyncWrite>(my_io: T)
1011     /// # -> Handshake<T>
1012     /// # {
1013     /// // `client_fut` is a future representing the completion of the HTTP/2.0
1014     /// // handshake.
1015     /// let client_fut = Builder::new()
1016     ///     .reset_stream_duration(Duration::from_secs(10))
1017     ///     .handshake(my_io);
1018     /// # client_fut
1019     /// # }
1020     /// #
1021     /// # pub fn main() {}
1022     /// ```
reset_stream_duration(&mut self, dur: Duration) -> &mut Self1023     pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
1024         self.reset_stream_duration = dur;
1025         self
1026     }
1027 
1028     /// Enables or disables server push promises.
1029     ///
1030     /// This value is included in the initial SETTINGS handshake. When set, the
1031     /// server MUST NOT send a push promise. Setting this value to value to
1032     /// false in the initial SETTINGS handshake guarantees that the remote server
1033     /// will never send a push promise.
1034     ///
1035     /// This setting can be changed during the life of a single HTTP/2.0
1036     /// connection by sending another settings frame updating the value.
1037     ///
1038     /// Default value: `true`.
1039     ///
1040     /// # Examples
1041     ///
1042     /// ```
1043     /// # extern crate h2;
1044     /// # extern crate tokio_io;
1045     /// # use tokio_io::*;
1046     /// # use h2::client::*;
1047     /// # use std::time::Duration;
1048     /// #
1049     /// # fn doc<T: AsyncRead + AsyncWrite>(my_io: T)
1050     /// # -> Handshake<T>
1051     /// # {
1052     /// // `client_fut` is a future representing the completion of the HTTP/2.0
1053     /// // handshake.
1054     /// let client_fut = Builder::new()
1055     ///     .enable_push(false)
1056     ///     .handshake(my_io);
1057     /// # client_fut
1058     /// # }
1059     /// #
1060     /// # pub fn main() {}
1061     /// ```
enable_push(&mut self, enabled: bool) -> &mut Self1062     pub fn enable_push(&mut self, enabled: bool) -> &mut Self {
1063         self.settings.set_enable_push(enabled);
1064         self
1065     }
1066 
1067     /// Sets the first stream ID to something other than 1.
1068     #[cfg(feature = "unstable")]
initial_stream_id(&mut self, stream_id: u32) -> &mut Self1069     pub fn initial_stream_id(&mut self, stream_id: u32) -> &mut Self {
1070         self.stream_id = stream_id.into();
1071         assert!(
1072             self.stream_id.is_client_initiated(),
1073             "stream id must be odd"
1074         );
1075         self
1076     }
1077 
1078     /// Creates a new configured HTTP/2.0 client backed by `io`.
1079     ///
1080     /// It is expected that `io` already be in an appropriate state to commence
1081     /// the [HTTP/2.0 handshake]. See [Handshake] for more details.
1082     ///
1083     /// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1084     /// tuple once the HTTP/2.0 handshake has been completed.
1085     ///
1086     /// This function also allows the caller to configure the send payload data
1087     /// type. See [Outbound data type] for more details.
1088     ///
1089     /// [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1090     /// [Handshake]: ../index.html#handshake
1091     /// [`Connection`]: struct.Connection.html
1092     /// [`SendRequest`]: struct.SendRequest.html
1093     /// [Outbound data type]: ../index.html#outbound-data-type.
1094     ///
1095     /// # Examples
1096     ///
1097     /// Basic usage:
1098     ///
1099     /// ```
1100     /// # extern crate h2;
1101     /// # extern crate tokio_io;
1102     /// # use tokio_io::*;
1103     /// # use h2::client::*;
1104     /// #
1105     /// # fn doc<T: AsyncRead + AsyncWrite>(my_io: T)
1106     /// # -> Handshake<T>
1107     /// # {
1108     /// // `client_fut` is a future representing the completion of the HTTP/2.0
1109     /// // handshake.
1110     /// let client_fut = Builder::new()
1111     ///     .handshake(my_io);
1112     /// # client_fut
1113     /// # }
1114     /// #
1115     /// # pub fn main() {}
1116     /// ```
1117     ///
1118     /// Configures the send-payload data type. In this case, the outbound data
1119     /// type will be `&'static [u8]`.
1120     ///
1121     /// ```
1122     /// # extern crate h2;
1123     /// # extern crate tokio_io;
1124     /// # use tokio_io::*;
1125     /// # use h2::client::*;
1126     /// #
1127     /// # fn doc<T: AsyncRead + AsyncWrite>(my_io: T)
1128     /// # -> Handshake<T, &'static [u8]>
1129     /// # {
1130     /// // `client_fut` is a future representing the completion of the HTTP/2.0
1131     /// // handshake.
1132     /// let client_fut: Handshake<_, &'static [u8]> = Builder::new()
1133     ///     .handshake(my_io);
1134     /// # client_fut
1135     /// # }
1136     /// #
1137     /// # pub fn main() {}
1138     /// ```
handshake<T, B>(&self, io: T) -> Handshake<T, B> where T: AsyncRead + AsyncWrite, B: IntoBuf, B::Buf: 'static,1139     pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
1140     where
1141         T: AsyncRead + AsyncWrite,
1142         B: IntoBuf,
1143         B::Buf: 'static,
1144     {
1145         Connection::handshake2(io, self.clone())
1146     }
1147 }
1148 
1149 impl Default for Builder {
default() -> Builder1150     fn default() -> Builder {
1151         Builder::new()
1152     }
1153 }
1154 
1155 /// Creates a new configured HTTP/2.0 client with default configuration
1156 /// values backed by `io`.
1157 ///
1158 /// It is expected that `io` already be in an appropriate state to commence
1159 /// the [HTTP/2.0 handshake]. See [Handshake] for more details.
1160 ///
1161 /// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1162 /// tuple once the HTTP/2.0 handshake has been completed. The returned
1163 /// [`Connection`] instance will be using default configuration values. Use
1164 /// [`Builder`] to customize the configuration values used by a [`Connection`]
1165 /// instance.
1166 ///
1167 /// [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1168 /// [Handshake]: ../index.html#handshake
1169 /// [`Connection`]: struct.Connection.html
1170 /// [`SendRequest`]: struct.SendRequest.html
1171 ///
1172 /// # Examples
1173 ///
1174 /// ```
1175 /// # extern crate futures;
1176 /// # extern crate h2;
1177 /// # extern crate tokio_io;
1178 /// # use futures::*;
1179 /// # use tokio_io::*;
1180 /// # use h2::client;
1181 /// # use h2::client::*;
1182 /// #
1183 /// # fn doc<T: AsyncRead + AsyncWrite>(my_io: T)
1184 /// # {
1185 /// client::handshake(my_io)
1186 ///     .and_then(|(send_request, connection)| {
1187 ///         // The HTTP/2.0 handshake has completed, now start polling
1188 ///         // `connection` and use `send_request` to send requests to the
1189 ///         // server.
1190 ///         # Ok(())
1191 ///     })
1192 ///     # .wait().unwrap();
1193 /// # }
1194 /// #
1195 /// # pub fn main() {}
1196 /// ```
handshake<T>(io: T) -> Handshake<T, Bytes> where T: AsyncRead + AsyncWrite,1197 pub fn handshake<T>(io: T) -> Handshake<T, Bytes>
1198 where T: AsyncRead + AsyncWrite,
1199 {
1200     Builder::new().handshake(io)
1201 }
1202 
1203 // ===== impl Connection =====
1204 
1205 impl<T, B> Connection<T, B>
1206 where
1207     T: AsyncRead + AsyncWrite,
1208     B: IntoBuf,
1209 {
handshake2(io: T, builder: Builder) -> Handshake<T, B>1210     fn handshake2(io: T, builder: Builder) -> Handshake<T, B> {
1211         use tokio_io::io;
1212 
1213         debug!("binding client connection");
1214 
1215         let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
1216         let handshake = io::write_all(io, msg);
1217 
1218         Handshake {
1219             builder,
1220             inner: handshake,
1221             _marker: PhantomData,
1222         }
1223     }
1224 
1225     /// Sets the target window size for the whole connection.
1226     ///
1227     /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
1228     /// frame will be immediately sent to the remote, increasing the connection
1229     /// level window by `size - current_value`.
1230     ///
1231     /// If `size` is less than the current value, nothing will happen
1232     /// immediately. However, as window capacity is released by
1233     /// [`ReleaseCapacity`] instances, no `WINDOW_UPDATE` frames will be sent
1234     /// out until the number of "in flight" bytes drops below `size`.
1235     ///
1236     /// The default value is 65,535.
1237     ///
1238     /// See [`ReleaseCapacity`] documentation for more details.
1239     ///
1240     /// [`ReleaseCapacity`]: ../struct.ReleaseCapacity.html
1241     /// [library level]: ../index.html#flow-control
set_target_window_size(&mut self, size: u32)1242     pub fn set_target_window_size(&mut self, size: u32) {
1243         assert!(size <= proto::MAX_WINDOW_SIZE);
1244         self.inner.set_target_window_size(size);
1245     }
1246 }
1247 
1248 impl<T, B> Future for Connection<T, B>
1249 where
1250     T: AsyncRead + AsyncWrite,
1251     B: IntoBuf,
1252 {
1253     type Item = ();
1254     type Error = ::Error;
1255 
poll(&mut self) -> Poll<(), ::Error>1256     fn poll(&mut self) -> Poll<(), ::Error> {
1257         self.inner.maybe_close_connection_if_no_streams();
1258         self.inner.poll().map_err(Into::into)
1259     }
1260 }
1261 
1262 impl<T, B> fmt::Debug for Connection<T, B>
1263 where
1264     T: AsyncRead + AsyncWrite,
1265     T: fmt::Debug,
1266     B: fmt::Debug + IntoBuf,
1267     B::Buf: fmt::Debug,
1268 {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result1269     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1270         fmt::Debug::fmt(&self.inner, fmt)
1271     }
1272 }
1273 
1274 // ===== impl Handshake =====
1275 
1276 impl<T, B> Future for Handshake<T, B>
1277 where
1278     T: AsyncRead + AsyncWrite,
1279     B: IntoBuf,
1280     B::Buf: 'static,
1281 {
1282     type Item = (SendRequest<B>, Connection<T, B>);
1283     type Error = ::Error;
1284 
poll(&mut self) -> Poll<Self::Item, Self::Error>1285     fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
1286         let res = self.inner.poll()
1287             .map_err(::Error::from);
1288 
1289         let (io, _) = try_ready!(res);
1290 
1291         debug!("client connection bound");
1292 
1293         // Create the codec
1294         let mut codec = Codec::new(io);
1295 
1296         if let Some(max) = self.builder.settings.max_frame_size() {
1297             codec.set_max_recv_frame_size(max as usize);
1298         }
1299 
1300         if let Some(max) = self.builder.settings.max_header_list_size() {
1301             codec.set_max_recv_header_list_size(max as usize);
1302         }
1303 
1304         // Send initial settings frame
1305         codec
1306             .buffer(self.builder.settings.clone().into())
1307             .expect("invalid SETTINGS frame");
1308 
1309         let inner = proto::Connection::new(codec, proto::Config {
1310             next_stream_id: self.builder.stream_id,
1311             initial_max_send_streams: self.builder.initial_max_send_streams,
1312             reset_stream_duration: self.builder.reset_stream_duration,
1313             reset_stream_max: self.builder.reset_stream_max,
1314             settings: self.builder.settings.clone(),
1315         });
1316         let send_request = SendRequest {
1317             inner: inner.streams().clone(),
1318             pending: None,
1319         };
1320 
1321         let mut connection = Connection { inner };
1322         if let Some(sz) = self.builder.initial_target_connection_window_size {
1323             connection.set_target_window_size(sz);
1324         }
1325 
1326         Ok(Async::Ready((send_request, connection)))
1327     }
1328 }
1329 
1330 impl<T, B> fmt::Debug for Handshake<T, B>
1331 where
1332     T: AsyncRead + AsyncWrite,
1333     T: fmt::Debug,
1334     B: fmt::Debug + IntoBuf,
1335     B::Buf: fmt::Debug + IntoBuf,
1336 {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result1337     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1338         write!(fmt, "client::Handshake")
1339     }
1340 }
1341 
1342 // ===== impl ResponseFuture =====
1343 
1344 impl Future for ResponseFuture {
1345     type Item = Response<RecvStream>;
1346     type Error = ::Error;
1347 
poll(&mut self) -> Poll<Self::Item, Self::Error>1348     fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
1349         let (parts, _) = try_ready!(self.inner.poll_response()).into_parts();
1350         let body = RecvStream::new(ReleaseCapacity::new(self.inner.clone()));
1351 
1352         Ok(Response::from_parts(parts, body).into())
1353     }
1354 }
1355 
1356 impl ResponseFuture {
1357     /// Returns the stream ID of the response stream.
1358     ///
1359     /// # Panics
1360     ///
1361     /// If the lock on the stream store has been poisoned.
stream_id(&self) -> ::StreamId1362     pub fn stream_id(&self) -> ::StreamId {
1363         ::StreamId::from_internal(self.inner.stream_id())
1364     }
1365 }
1366 
1367 // ===== impl Peer =====
1368 
1369 impl Peer {
convert_send_message( id: StreamId, request: Request<()>, end_of_stream: bool) -> Result<Headers, SendError>1370     pub fn convert_send_message(
1371         id: StreamId,
1372         request: Request<()>,
1373         end_of_stream: bool) -> Result<Headers, SendError>
1374     {
1375         use http::request::Parts;
1376 
1377         let (
1378             Parts {
1379                 method,
1380                 uri,
1381                 headers,
1382                 version,
1383                 ..
1384             },
1385             _,
1386         ) = request.into_parts();
1387 
1388         let is_connect = method == Method::CONNECT;
1389 
1390         // Build the set pseudo header set. All requests will include `method`
1391         // and `path`.
1392         let mut pseudo = Pseudo::request(method, uri);
1393 
1394         if pseudo.scheme.is_none() {
1395             // If the scheme is not set, then there are a two options.
1396             //
1397             // 1) Authority is not set. In this case, a request was issued with
1398             //    a relative URI. This is permitted **only** when forwarding
1399             //    HTTP 1.x requests. If the HTTP version is set to 2.0, then
1400             //    this is an error.
1401             //
1402             // 2) Authority is set, then the HTTP method *must* be CONNECT.
1403             //
1404             // It is not possible to have a scheme but not an authority set (the
1405             // `http` crate does not allow it).
1406             //
1407             if pseudo.authority.is_none() {
1408                 if version == Version::HTTP_2 {
1409                     return Err(UserError::MissingUriSchemeAndAuthority.into());
1410                 } else {
1411                     // This is acceptable as per the above comment. However,
1412                     // HTTP/2.0 requires that a scheme is set. Since we are
1413                     // forwarding an HTTP 1.1 request, the scheme is set to
1414                     // "http".
1415                     pseudo.set_scheme(uri::Scheme::HTTP);
1416                 }
1417             } else if !is_connect {
1418                 // TODO: Error
1419             }
1420         }
1421 
1422         // Create the HEADERS frame
1423         let mut frame = Headers::new(id, pseudo, headers);
1424 
1425         if end_of_stream {
1426             frame.set_end_stream()
1427         }
1428 
1429         Ok(frame)
1430     }
1431 }
1432 
1433 impl proto::Peer for Peer {
1434     type Poll = Response<()>;
1435 
dyn() -> proto::DynPeer1436     fn dyn() -> proto::DynPeer {
1437         proto::DynPeer::Client
1438     }
1439 
is_server() -> bool1440     fn is_server() -> bool {
1441         false
1442     }
1443 
convert_poll_message(headers: Headers) -> Result<Self::Poll, RecvError>1444     fn convert_poll_message(headers: Headers) -> Result<Self::Poll, RecvError> {
1445         let mut b = Response::builder();
1446 
1447         let stream_id = headers.stream_id();
1448         let (pseudo, fields) = headers.into_parts();
1449 
1450         b.version(Version::HTTP_2);
1451 
1452         if let Some(status) = pseudo.status {
1453             b.status(status);
1454         }
1455 
1456         let mut response = match b.body(()) {
1457             Ok(response) => response,
1458             Err(_) => {
1459                 // TODO: Should there be more specialized handling for different
1460                 // kinds of errors
1461                 return Err(RecvError::Stream {
1462                     id: stream_id,
1463                     reason: Reason::PROTOCOL_ERROR,
1464                 });
1465             },
1466         };
1467 
1468         *response.headers_mut() = fields;
1469 
1470         Ok(response)
1471     }
1472 }
1473