1 use std::fmt;
2 use std::io;
3 use std::marker::PhantomData;
4 #[cfg(all(feature = "server", feature = "runtime"))]
5 use std::time::Duration;
6 
7 use bytes::{Buf, Bytes};
8 use http::header::{HeaderValue, CONNECTION};
9 use http::{HeaderMap, Method, Version};
10 use httparse::ParserConfig;
11 use tokio::io::{AsyncRead, AsyncWrite};
12 #[cfg(all(feature = "server", feature = "runtime"))]
13 use tokio::time::Sleep;
14 use tracing::{debug, error, trace};
15 
16 use super::io::Buffered;
17 use super::{Decoder, Encode, EncodedBuf, Encoder, Http1Transaction, ParseContext, Wants};
18 use crate::body::DecodedLength;
19 use crate::common::{task, Pin, Poll, Unpin};
20 use crate::headers::connection_keep_alive;
21 use crate::proto::{BodyLength, MessageHead};
22 
23 const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
24 
25 /// This handles a connection, which will have been established over an
26 /// `AsyncRead + AsyncWrite` (like a socket), and will likely include multiple
27 /// `Transaction`s over HTTP.
28 ///
29 /// The connection will determine when a message begins and ends as well as
30 /// determine if this connection can be kept alive after the message,
31 /// or if it is complete.
32 pub(crate) struct Conn<I, B, T> {
33     io: Buffered<I, EncodedBuf<B>>,
34     state: State,
35     _marker: PhantomData<fn(T)>,
36 }
37 
38 impl<I, B, T> Conn<I, B, T>
39 where
40     I: AsyncRead + AsyncWrite + Unpin,
41     B: Buf,
42     T: Http1Transaction,
43 {
new(io: I) -> Conn<I, B, T>44     pub(crate) fn new(io: I) -> Conn<I, B, T> {
45         Conn {
46             io: Buffered::new(io),
47             state: State {
48                 allow_half_close: false,
49                 cached_headers: None,
50                 error: None,
51                 keep_alive: KA::Busy,
52                 method: None,
53                 h1_parser_config: ParserConfig::default(),
54                 #[cfg(all(feature = "server", feature = "runtime"))]
55                 h1_header_read_timeout: None,
56                 #[cfg(all(feature = "server", feature = "runtime"))]
57                 h1_header_read_timeout_fut: None,
58                 #[cfg(all(feature = "server", feature = "runtime"))]
59                 h1_header_read_timeout_running: false,
60                 preserve_header_case: false,
61                 title_case_headers: false,
62                 h09_responses: false,
63                 #[cfg(feature = "ffi")]
64                 on_informational: None,
65                 #[cfg(feature = "ffi")]
66                 raw_headers: false,
67                 notify_read: false,
68                 reading: Reading::Init,
69                 writing: Writing::Init,
70                 upgrade: None,
71                 // We assume a modern world where the remote speaks HTTP/1.1.
72                 // If they tell us otherwise, we'll downgrade in `read_head`.
73                 version: Version::HTTP_11,
74             },
75             _marker: PhantomData,
76         }
77     }
78 
79     #[cfg(feature = "server")]
set_flush_pipeline(&mut self, enabled: bool)80     pub(crate) fn set_flush_pipeline(&mut self, enabled: bool) {
81         self.io.set_flush_pipeline(enabled);
82     }
83 
set_write_strategy_queue(&mut self)84     pub(crate) fn set_write_strategy_queue(&mut self) {
85         self.io.set_write_strategy_queue();
86     }
87 
set_max_buf_size(&mut self, max: usize)88     pub(crate) fn set_max_buf_size(&mut self, max: usize) {
89         self.io.set_max_buf_size(max);
90     }
91 
92     #[cfg(feature = "client")]
set_read_buf_exact_size(&mut self, sz: usize)93     pub(crate) fn set_read_buf_exact_size(&mut self, sz: usize) {
94         self.io.set_read_buf_exact_size(sz);
95     }
96 
set_write_strategy_flatten(&mut self)97     pub(crate) fn set_write_strategy_flatten(&mut self) {
98         self.io.set_write_strategy_flatten();
99     }
100 
101     #[cfg(feature = "client")]
set_h1_parser_config(&mut self, parser_config: ParserConfig)102     pub(crate) fn set_h1_parser_config(&mut self, parser_config: ParserConfig) {
103         self.state.h1_parser_config = parser_config;
104     }
105 
set_title_case_headers(&mut self)106     pub(crate) fn set_title_case_headers(&mut self) {
107         self.state.title_case_headers = true;
108     }
109 
set_preserve_header_case(&mut self)110     pub(crate) fn set_preserve_header_case(&mut self) {
111         self.state.preserve_header_case = true;
112     }
113 
114     #[cfg(feature = "client")]
set_h09_responses(&mut self)115     pub(crate) fn set_h09_responses(&mut self) {
116         self.state.h09_responses = true;
117     }
118 
119     #[cfg(all(feature = "server", feature = "runtime"))]
set_http1_header_read_timeout(&mut self, val: Duration)120     pub(crate) fn set_http1_header_read_timeout(&mut self, val: Duration) {
121         self.state.h1_header_read_timeout = Some(val);
122     }
123 
124     #[cfg(feature = "server")]
set_allow_half_close(&mut self)125     pub(crate) fn set_allow_half_close(&mut self) {
126         self.state.allow_half_close = true;
127     }
128 
129     #[cfg(feature = "ffi")]
set_raw_headers(&mut self, enabled: bool)130     pub(crate) fn set_raw_headers(&mut self, enabled: bool) {
131         self.state.raw_headers = enabled;
132     }
133 
into_inner(self) -> (I, Bytes)134     pub(crate) fn into_inner(self) -> (I, Bytes) {
135         self.io.into_inner()
136     }
137 
pending_upgrade(&mut self) -> Option<crate::upgrade::Pending>138     pub(crate) fn pending_upgrade(&mut self) -> Option<crate::upgrade::Pending> {
139         self.state.upgrade.take()
140     }
141 
is_read_closed(&self) -> bool142     pub(crate) fn is_read_closed(&self) -> bool {
143         self.state.is_read_closed()
144     }
145 
is_write_closed(&self) -> bool146     pub(crate) fn is_write_closed(&self) -> bool {
147         self.state.is_write_closed()
148     }
149 
can_read_head(&self) -> bool150     pub(crate) fn can_read_head(&self) -> bool {
151         match self.state.reading {
152             Reading::Init => {
153                 if T::should_read_first() {
154                     true
155                 } else {
156                     match self.state.writing {
157                         Writing::Init => false,
158                         _ => true,
159                     }
160                 }
161             }
162             _ => false,
163         }
164     }
165 
can_read_body(&self) -> bool166     pub(crate) fn can_read_body(&self) -> bool {
167         match self.state.reading {
168             Reading::Body(..) | Reading::Continue(..) => true,
169             _ => false,
170         }
171     }
172 
should_error_on_eof(&self) -> bool173     fn should_error_on_eof(&self) -> bool {
174         // If we're idle, it's probably just the connection closing gracefully.
175         T::should_error_on_parse_eof() && !self.state.is_idle()
176     }
177 
has_h2_prefix(&self) -> bool178     fn has_h2_prefix(&self) -> bool {
179         let read_buf = self.io.read_buf();
180         read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE
181     }
182 
poll_read_head( &mut self, cx: &mut task::Context<'_>, ) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>>183     pub(super) fn poll_read_head(
184         &mut self,
185         cx: &mut task::Context<'_>,
186     ) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>> {
187         debug_assert!(self.can_read_head());
188         trace!("Conn::read_head");
189 
190         let msg = match ready!(self.io.parse::<T>(
191             cx,
192             ParseContext {
193                 cached_headers: &mut self.state.cached_headers,
194                 req_method: &mut self.state.method,
195                 h1_parser_config: self.state.h1_parser_config.clone(),
196                 #[cfg(all(feature = "server", feature = "runtime"))]
197                 h1_header_read_timeout: self.state.h1_header_read_timeout,
198                 #[cfg(all(feature = "server", feature = "runtime"))]
199                 h1_header_read_timeout_fut: &mut self.state.h1_header_read_timeout_fut,
200                 #[cfg(all(feature = "server", feature = "runtime"))]
201                 h1_header_read_timeout_running: &mut self.state.h1_header_read_timeout_running,
202                 preserve_header_case: self.state.preserve_header_case,
203                 h09_responses: self.state.h09_responses,
204                 #[cfg(feature = "ffi")]
205                 on_informational: &mut self.state.on_informational,
206                 #[cfg(feature = "ffi")]
207                 raw_headers: self.state.raw_headers,
208             }
209         )) {
210             Ok(msg) => msg,
211             Err(e) => return self.on_read_head_error(e),
212         };
213 
214         // Note: don't deconstruct `msg` into local variables, it appears
215         // the optimizer doesn't remove the extra copies.
216 
217         debug!("incoming body is {}", msg.decode);
218 
219         // Prevent accepting HTTP/0.9 responses after the initial one, if any.
220         self.state.h09_responses = false;
221 
222         // Drop any OnInformational callbacks, we're done there!
223         #[cfg(feature = "ffi")]
224         {
225             self.state.on_informational = None;
226         }
227 
228         self.state.busy();
229         self.state.keep_alive &= msg.keep_alive;
230         self.state.version = msg.head.version;
231 
232         let mut wants = if msg.wants_upgrade {
233             Wants::UPGRADE
234         } else {
235             Wants::EMPTY
236         };
237 
238         if msg.decode == DecodedLength::ZERO {
239             if msg.expect_continue {
240                 debug!("ignoring expect-continue since body is empty");
241             }
242             self.state.reading = Reading::KeepAlive;
243             if !T::should_read_first() {
244                 self.try_keep_alive(cx);
245             }
246         } else if msg.expect_continue {
247             self.state.reading = Reading::Continue(Decoder::new(msg.decode));
248             wants = wants.add(Wants::EXPECT);
249         } else {
250             self.state.reading = Reading::Body(Decoder::new(msg.decode));
251         }
252 
253         Poll::Ready(Some(Ok((msg.head, msg.decode, wants))))
254     }
255 
on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>>256     fn on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>> {
257         // If we are currently waiting on a message, then an empty
258         // message should be reported as an error. If not, it is just
259         // the connection closing gracefully.
260         let must_error = self.should_error_on_eof();
261         self.close_read();
262         self.io.consume_leading_lines();
263         let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty();
264         if was_mid_parse || must_error {
265             // We check if the buf contains the h2 Preface
266             debug!(
267                 "parse error ({}) with {} bytes",
268                 e,
269                 self.io.read_buf().len()
270             );
271             match self.on_parse_error(e) {
272                 Ok(()) => Poll::Pending, // XXX: wat?
273                 Err(e) => Poll::Ready(Some(Err(e))),
274             }
275         } else {
276             debug!("read eof");
277             self.close_write();
278             Poll::Ready(None)
279         }
280     }
281 
poll_read_body( &mut self, cx: &mut task::Context<'_>, ) -> Poll<Option<io::Result<Bytes>>>282     pub(crate) fn poll_read_body(
283         &mut self,
284         cx: &mut task::Context<'_>,
285     ) -> Poll<Option<io::Result<Bytes>>> {
286         debug_assert!(self.can_read_body());
287 
288         let (reading, ret) = match self.state.reading {
289             Reading::Body(ref mut decoder) => {
290                 match ready!(decoder.decode(cx, &mut self.io)) {
291                     Ok(slice) => {
292                         let (reading, chunk) = if decoder.is_eof() {
293                             debug!("incoming body completed");
294                             (
295                                 Reading::KeepAlive,
296                                 if !slice.is_empty() {
297                                     Some(Ok(slice))
298                                 } else {
299                                     None
300                                 },
301                             )
302                         } else if slice.is_empty() {
303                             error!("incoming body unexpectedly ended");
304                             // This should be unreachable, since all 3 decoders
305                             // either set eof=true or return an Err when reading
306                             // an empty slice...
307                             (Reading::Closed, None)
308                         } else {
309                             return Poll::Ready(Some(Ok(slice)));
310                         };
311                         (reading, Poll::Ready(chunk))
312                     }
313                     Err(e) => {
314                         debug!("incoming body decode error: {}", e);
315                         (Reading::Closed, Poll::Ready(Some(Err(e))))
316                     }
317                 }
318             }
319             Reading::Continue(ref decoder) => {
320                 // Write the 100 Continue if not already responded...
321                 if let Writing::Init = self.state.writing {
322                     trace!("automatically sending 100 Continue");
323                     let cont = b"HTTP/1.1 100 Continue\r\n\r\n";
324                     self.io.headers_buf().extend_from_slice(cont);
325                 }
326 
327                 // And now recurse once in the Reading::Body state...
328                 self.state.reading = Reading::Body(decoder.clone());
329                 return self.poll_read_body(cx);
330             }
331             _ => unreachable!("poll_read_body invalid state: {:?}", self.state.reading),
332         };
333 
334         self.state.reading = reading;
335         self.try_keep_alive(cx);
336         ret
337     }
338 
wants_read_again(&mut self) -> bool339     pub(crate) fn wants_read_again(&mut self) -> bool {
340         let ret = self.state.notify_read;
341         self.state.notify_read = false;
342         ret
343     }
344 
poll_read_keep_alive( &mut self, cx: &mut task::Context<'_>, ) -> Poll<crate::Result<()>>345     pub(crate) fn poll_read_keep_alive(
346         &mut self,
347         cx: &mut task::Context<'_>,
348     ) -> Poll<crate::Result<()>> {
349         debug_assert!(!self.can_read_head() && !self.can_read_body());
350 
351         if self.is_read_closed() {
352             Poll::Pending
353         } else if self.is_mid_message() {
354             self.mid_message_detect_eof(cx)
355         } else {
356             self.require_empty_read(cx)
357         }
358     }
359 
is_mid_message(&self) -> bool360     fn is_mid_message(&self) -> bool {
361         match (&self.state.reading, &self.state.writing) {
362             (&Reading::Init, &Writing::Init) => false,
363             _ => true,
364         }
365     }
366 
367     // This will check to make sure the io object read is empty.
368     //
369     // This should only be called for Clients wanting to enter the idle
370     // state.
require_empty_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>371     fn require_empty_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
372         debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
373         debug_assert!(!self.is_mid_message());
374         debug_assert!(T::is_client());
375 
376         if !self.io.read_buf().is_empty() {
377             debug!("received an unexpected {} bytes", self.io.read_buf().len());
378             return Poll::Ready(Err(crate::Error::new_unexpected_message()));
379         }
380 
381         let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
382 
383         if num_read == 0 {
384             let ret = if self.should_error_on_eof() {
385                 trace!("found unexpected EOF on busy connection: {:?}", self.state);
386                 Poll::Ready(Err(crate::Error::new_incomplete()))
387             } else {
388                 trace!("found EOF on idle connection, closing");
389                 Poll::Ready(Ok(()))
390             };
391 
392             // order is important: should_error needs state BEFORE close_read
393             self.state.close_read();
394             return ret;
395         }
396 
397         debug!(
398             "received unexpected {} bytes on an idle connection",
399             num_read
400         );
401         Poll::Ready(Err(crate::Error::new_unexpected_message()))
402     }
403 
mid_message_detect_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>404     fn mid_message_detect_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
405         debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
406         debug_assert!(self.is_mid_message());
407 
408         if self.state.allow_half_close || !self.io.read_buf().is_empty() {
409             return Poll::Pending;
410         }
411 
412         let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
413 
414         if num_read == 0 {
415             trace!("found unexpected EOF on busy connection: {:?}", self.state);
416             self.state.close_read();
417             Poll::Ready(Err(crate::Error::new_incomplete()))
418         } else {
419             Poll::Ready(Ok(()))
420         }
421     }
422 
force_io_read(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<usize>>423     fn force_io_read(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<usize>> {
424         debug_assert!(!self.state.is_read_closed());
425 
426         let result = ready!(self.io.poll_read_from_io(cx));
427         Poll::Ready(result.map_err(|e| {
428             trace!("force_io_read; io error = {:?}", e);
429             self.state.close();
430             e
431         }))
432     }
433 
maybe_notify(&mut self, cx: &mut task::Context<'_>)434     fn maybe_notify(&mut self, cx: &mut task::Context<'_>) {
435         // its possible that we returned NotReady from poll() without having
436         // exhausted the underlying Io. We would have done this when we
437         // determined we couldn't keep reading until we knew how writing
438         // would finish.
439 
440         match self.state.reading {
441             Reading::Continue(..) | Reading::Body(..) | Reading::KeepAlive | Reading::Closed => {
442                 return
443             }
444             Reading::Init => (),
445         };
446 
447         match self.state.writing {
448             Writing::Body(..) => return,
449             Writing::Init | Writing::KeepAlive | Writing::Closed => (),
450         }
451 
452         if !self.io.is_read_blocked() {
453             if self.io.read_buf().is_empty() {
454                 match self.io.poll_read_from_io(cx) {
455                     Poll::Ready(Ok(n)) => {
456                         if n == 0 {
457                             trace!("maybe_notify; read eof");
458                             if self.state.is_idle() {
459                                 self.state.close();
460                             } else {
461                                 self.close_read()
462                             }
463                             return;
464                         }
465                     }
466                     Poll::Pending => {
467                         trace!("maybe_notify; read_from_io blocked");
468                         return;
469                     }
470                     Poll::Ready(Err(e)) => {
471                         trace!("maybe_notify; read_from_io error: {}", e);
472                         self.state.close();
473                         self.state.error = Some(crate::Error::new_io(e));
474                     }
475                 }
476             }
477             self.state.notify_read = true;
478         }
479     }
480 
try_keep_alive(&mut self, cx: &mut task::Context<'_>)481     fn try_keep_alive(&mut self, cx: &mut task::Context<'_>) {
482         self.state.try_keep_alive::<T>();
483         self.maybe_notify(cx);
484     }
485 
can_write_head(&self) -> bool486     pub(crate) fn can_write_head(&self) -> bool {
487         if !T::should_read_first() {
488             if let Reading::Closed = self.state.reading {
489                 return false;
490             }
491         }
492         match self.state.writing {
493             Writing::Init => self.io.can_headers_buf(),
494             _ => false,
495         }
496     }
497 
can_write_body(&self) -> bool498     pub(crate) fn can_write_body(&self) -> bool {
499         match self.state.writing {
500             Writing::Body(..) => true,
501             Writing::Init | Writing::KeepAlive | Writing::Closed => false,
502         }
503     }
504 
can_buffer_body(&self) -> bool505     pub(crate) fn can_buffer_body(&self) -> bool {
506         self.io.can_buffer()
507     }
508 
write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>)509     pub(crate) fn write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>) {
510         if let Some(encoder) = self.encode_head(head, body) {
511             self.state.writing = if !encoder.is_eof() {
512                 Writing::Body(encoder)
513             } else if encoder.is_last() {
514                 Writing::Closed
515             } else {
516                 Writing::KeepAlive
517             };
518         }
519     }
520 
write_full_msg(&mut self, head: MessageHead<T::Outgoing>, body: B)521     pub(crate) fn write_full_msg(&mut self, head: MessageHead<T::Outgoing>, body: B) {
522         if let Some(encoder) =
523             self.encode_head(head, Some(BodyLength::Known(body.remaining() as u64)))
524         {
525             let is_last = encoder.is_last();
526             // Make sure we don't write a body if we weren't actually allowed
527             // to do so, like because its a HEAD request.
528             if !encoder.is_eof() {
529                 encoder.danger_full_buf(body, self.io.write_buf());
530             }
531             self.state.writing = if is_last {
532                 Writing::Closed
533             } else {
534                 Writing::KeepAlive
535             }
536         }
537     }
538 
encode_head( &mut self, mut head: MessageHead<T::Outgoing>, body: Option<BodyLength>, ) -> Option<Encoder>539     fn encode_head(
540         &mut self,
541         mut head: MessageHead<T::Outgoing>,
542         body: Option<BodyLength>,
543     ) -> Option<Encoder> {
544         debug_assert!(self.can_write_head());
545 
546         if !T::should_read_first() {
547             self.state.busy();
548         }
549 
550         self.enforce_version(&mut head);
551 
552         let buf = self.io.headers_buf();
553         match super::role::encode_headers::<T>(
554             Encode {
555                 head: &mut head,
556                 body,
557                 #[cfg(feature = "server")]
558                 keep_alive: self.state.wants_keep_alive(),
559                 req_method: &mut self.state.method,
560                 title_case_headers: self.state.title_case_headers,
561             },
562             buf,
563         ) {
564             Ok(encoder) => {
565                 debug_assert!(self.state.cached_headers.is_none());
566                 debug_assert!(head.headers.is_empty());
567                 self.state.cached_headers = Some(head.headers);
568 
569                 #[cfg(feature = "ffi")]
570                 {
571                     self.state.on_informational =
572                         head.extensions.remove::<crate::ffi::OnInformational>();
573                 }
574 
575                 Some(encoder)
576             }
577             Err(err) => {
578                 self.state.error = Some(err);
579                 self.state.writing = Writing::Closed;
580                 None
581             }
582         }
583     }
584 
585     // Fix keep-alives when Connection: keep-alive header is not present
fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>)586     fn fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>) {
587         let outgoing_is_keep_alive = head
588             .headers
589             .get(CONNECTION)
590             .map(connection_keep_alive)
591             .unwrap_or(false);
592 
593         if !outgoing_is_keep_alive {
594             match head.version {
595                 // If response is version 1.0 and keep-alive is not present in the response,
596                 // disable keep-alive so the server closes the connection
597                 Version::HTTP_10 => self.state.disable_keep_alive(),
598                 // If response is version 1.1 and keep-alive is wanted, add
599                 // Connection: keep-alive header when not present
600                 Version::HTTP_11 => {
601                     if self.state.wants_keep_alive() {
602                         head.headers
603                             .insert(CONNECTION, HeaderValue::from_static("keep-alive"));
604                     }
605                 }
606                 _ => (),
607             }
608         }
609     }
610 
611     // If we know the remote speaks an older version, we try to fix up any messages
612     // to work with our older peer.
enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>)613     fn enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>) {
614         if let Version::HTTP_10 = self.state.version {
615             // Fixes response or connection when keep-alive header is not present
616             self.fix_keep_alive(head);
617             // If the remote only knows HTTP/1.0, we should force ourselves
618             // to do only speak HTTP/1.0 as well.
619             head.version = Version::HTTP_10;
620         }
621         // If the remote speaks HTTP/1.1, then it *should* be fine with
622         // both HTTP/1.0 and HTTP/1.1 from us. So again, we just let
623         // the user's headers be.
624     }
625 
write_body(&mut self, chunk: B)626     pub(crate) fn write_body(&mut self, chunk: B) {
627         debug_assert!(self.can_write_body() && self.can_buffer_body());
628         // empty chunks should be discarded at Dispatcher level
629         debug_assert!(chunk.remaining() != 0);
630 
631         let state = match self.state.writing {
632             Writing::Body(ref mut encoder) => {
633                 self.io.buffer(encoder.encode(chunk));
634 
635                 if encoder.is_eof() {
636                     if encoder.is_last() {
637                         Writing::Closed
638                     } else {
639                         Writing::KeepAlive
640                     }
641                 } else {
642                     return;
643                 }
644             }
645             _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
646         };
647 
648         self.state.writing = state;
649     }
650 
write_body_and_end(&mut self, chunk: B)651     pub(crate) fn write_body_and_end(&mut self, chunk: B) {
652         debug_assert!(self.can_write_body() && self.can_buffer_body());
653         // empty chunks should be discarded at Dispatcher level
654         debug_assert!(chunk.remaining() != 0);
655 
656         let state = match self.state.writing {
657             Writing::Body(ref encoder) => {
658                 let can_keep_alive = encoder.encode_and_end(chunk, self.io.write_buf());
659                 if can_keep_alive {
660                     Writing::KeepAlive
661                 } else {
662                     Writing::Closed
663                 }
664             }
665             _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
666         };
667 
668         self.state.writing = state;
669     }
670 
end_body(&mut self) -> crate::Result<()>671     pub(crate) fn end_body(&mut self) -> crate::Result<()> {
672         debug_assert!(self.can_write_body());
673 
674         let mut res = Ok(());
675         let state = match self.state.writing {
676             Writing::Body(ref mut encoder) => {
677                 // end of stream, that means we should try to eof
678                 match encoder.end() {
679                     Ok(end) => {
680                         if let Some(end) = end {
681                             self.io.buffer(end);
682                         }
683                         if encoder.is_last() || encoder.is_close_delimited() {
684                             Writing::Closed
685                         } else {
686                             Writing::KeepAlive
687                         }
688                     }
689                     Err(_not_eof) => {
690                         res = Err(crate::Error::new_user_body(
691                             crate::Error::new_body_write_aborted(),
692                         ));
693                         Writing::Closed
694                     }
695                 }
696             }
697             _ => return Ok(()),
698         };
699 
700         self.state.writing = state;
701         res
702     }
703 
704     // When we get a parse error, depending on what side we are, we might be able
705     // to write a response before closing the connection.
706     //
707     // - Client: there is nothing we can do
708     // - Server: if Response hasn't been written yet, we can send a 4xx response
on_parse_error(&mut self, err: crate::Error) -> crate::Result<()>709     fn on_parse_error(&mut self, err: crate::Error) -> crate::Result<()> {
710         if let Writing::Init = self.state.writing {
711             if self.has_h2_prefix() {
712                 return Err(crate::Error::new_version_h2());
713             }
714             if let Some(msg) = T::on_error(&err) {
715                 // Drop the cached headers so as to not trigger a debug
716                 // assert in `write_head`...
717                 self.state.cached_headers.take();
718                 self.write_head(msg, None);
719                 self.state.error = Some(err);
720                 return Ok(());
721             }
722         }
723 
724         // fallback is pass the error back up
725         Err(err)
726     }
727 
poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>>728     pub(crate) fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
729         ready!(Pin::new(&mut self.io).poll_flush(cx))?;
730         self.try_keep_alive(cx);
731         trace!("flushed({}): {:?}", T::LOG, self.state);
732         Poll::Ready(Ok(()))
733     }
734 
poll_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>>735     pub(crate) fn poll_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
736         match ready!(Pin::new(self.io.io_mut()).poll_shutdown(cx)) {
737             Ok(()) => {
738                 trace!("shut down IO complete");
739                 Poll::Ready(Ok(()))
740             }
741             Err(e) => {
742                 debug!("error shutting down IO: {}", e);
743                 Poll::Ready(Err(e))
744             }
745         }
746     }
747 
748     /// If the read side can be cheaply drained, do so. Otherwise, close.
poll_drain_or_close_read(&mut self, cx: &mut task::Context<'_>)749     pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut task::Context<'_>) {
750         let _ = self.poll_read_body(cx);
751 
752         // If still in Reading::Body, just give up
753         match self.state.reading {
754             Reading::Init | Reading::KeepAlive => {
755                 trace!("body drained");
756                 return;
757             }
758             _ => self.close_read(),
759         }
760     }
761 
close_read(&mut self)762     pub(crate) fn close_read(&mut self) {
763         self.state.close_read();
764     }
765 
close_write(&mut self)766     pub(crate) fn close_write(&mut self) {
767         self.state.close_write();
768     }
769 
770     #[cfg(feature = "server")]
disable_keep_alive(&mut self)771     pub(crate) fn disable_keep_alive(&mut self) {
772         if self.state.is_idle() {
773             trace!("disable_keep_alive; closing idle connection");
774             self.state.close();
775         } else {
776             trace!("disable_keep_alive; in-progress connection");
777             self.state.disable_keep_alive();
778         }
779     }
780 
take_error(&mut self) -> crate::Result<()>781     pub(crate) fn take_error(&mut self) -> crate::Result<()> {
782         if let Some(err) = self.state.error.take() {
783             Err(err)
784         } else {
785             Ok(())
786         }
787     }
788 
on_upgrade(&mut self) -> crate::upgrade::OnUpgrade789     pub(super) fn on_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
790         trace!("{}: prepare possible HTTP upgrade", T::LOG);
791         self.state.prepare_upgrade()
792     }
793 }
794 
795 impl<I, B: Buf, T> fmt::Debug for Conn<I, B, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result796     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
797         f.debug_struct("Conn")
798             .field("state", &self.state)
799             .field("io", &self.io)
800             .finish()
801     }
802 }
803 
804 // B and T are never pinned
805 impl<I: Unpin, B, T> Unpin for Conn<I, B, T> {}
806 
807 struct State {
808     allow_half_close: bool,
809     /// Re-usable HeaderMap to reduce allocating new ones.
810     cached_headers: Option<HeaderMap>,
811     /// If an error occurs when there wasn't a direct way to return it
812     /// back to the user, this is set.
813     error: Option<crate::Error>,
814     /// Current keep-alive status.
815     keep_alive: KA,
816     /// If mid-message, the HTTP Method that started it.
817     ///
818     /// This is used to know things such as if the message can include
819     /// a body or not.
820     method: Option<Method>,
821     h1_parser_config: ParserConfig,
822     #[cfg(all(feature = "server", feature = "runtime"))]
823     h1_header_read_timeout: Option<Duration>,
824     #[cfg(all(feature = "server", feature = "runtime"))]
825     h1_header_read_timeout_fut: Option<Pin<Box<Sleep>>>,
826     #[cfg(all(feature = "server", feature = "runtime"))]
827     h1_header_read_timeout_running: bool,
828     preserve_header_case: bool,
829     title_case_headers: bool,
830     h09_responses: bool,
831     /// If set, called with each 1xx informational response received for
832     /// the current request. MUST be unset after a non-1xx response is
833     /// received.
834     #[cfg(feature = "ffi")]
835     on_informational: Option<crate::ffi::OnInformational>,
836     #[cfg(feature = "ffi")]
837     raw_headers: bool,
838     /// Set to true when the Dispatcher should poll read operations
839     /// again. See the `maybe_notify` method for more.
840     notify_read: bool,
841     /// State of allowed reads
842     reading: Reading,
843     /// State of allowed writes
844     writing: Writing,
845     /// An expected pending HTTP upgrade.
846     upgrade: Option<crate::upgrade::Pending>,
847     /// Either HTTP/1.0 or 1.1 connection
848     version: Version,
849 }
850 
851 #[derive(Debug)]
852 enum Reading {
853     Init,
854     Continue(Decoder),
855     Body(Decoder),
856     KeepAlive,
857     Closed,
858 }
859 
860 enum Writing {
861     Init,
862     Body(Encoder),
863     KeepAlive,
864     Closed,
865 }
866 
867 impl fmt::Debug for State {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result868     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
869         let mut builder = f.debug_struct("State");
870         builder
871             .field("reading", &self.reading)
872             .field("writing", &self.writing)
873             .field("keep_alive", &self.keep_alive);
874 
875         // Only show error field if it's interesting...
876         if let Some(ref error) = self.error {
877             builder.field("error", error);
878         }
879 
880         if self.allow_half_close {
881             builder.field("allow_half_close", &true);
882         }
883 
884         // Purposefully leaving off other fields..
885 
886         builder.finish()
887     }
888 }
889 
890 impl fmt::Debug for Writing {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result891     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
892         match *self {
893             Writing::Init => f.write_str("Init"),
894             Writing::Body(ref enc) => f.debug_tuple("Body").field(enc).finish(),
895             Writing::KeepAlive => f.write_str("KeepAlive"),
896             Writing::Closed => f.write_str("Closed"),
897         }
898     }
899 }
900 
901 impl std::ops::BitAndAssign<bool> for KA {
bitand_assign(&mut self, enabled: bool)902     fn bitand_assign(&mut self, enabled: bool) {
903         if !enabled {
904             trace!("remote disabling keep-alive");
905             *self = KA::Disabled;
906         }
907     }
908 }
909 
910 #[derive(Clone, Copy, Debug)]
911 enum KA {
912     Idle,
913     Busy,
914     Disabled,
915 }
916 
917 impl Default for KA {
default() -> KA918     fn default() -> KA {
919         KA::Busy
920     }
921 }
922 
923 impl KA {
idle(&mut self)924     fn idle(&mut self) {
925         *self = KA::Idle;
926     }
927 
busy(&mut self)928     fn busy(&mut self) {
929         *self = KA::Busy;
930     }
931 
disable(&mut self)932     fn disable(&mut self) {
933         *self = KA::Disabled;
934     }
935 
status(&self) -> KA936     fn status(&self) -> KA {
937         *self
938     }
939 }
940 
941 impl State {
close(&mut self)942     fn close(&mut self) {
943         trace!("State::close()");
944         self.reading = Reading::Closed;
945         self.writing = Writing::Closed;
946         self.keep_alive.disable();
947     }
948 
close_read(&mut self)949     fn close_read(&mut self) {
950         trace!("State::close_read()");
951         self.reading = Reading::Closed;
952         self.keep_alive.disable();
953     }
954 
close_write(&mut self)955     fn close_write(&mut self) {
956         trace!("State::close_write()");
957         self.writing = Writing::Closed;
958         self.keep_alive.disable();
959     }
960 
wants_keep_alive(&self) -> bool961     fn wants_keep_alive(&self) -> bool {
962         if let KA::Disabled = self.keep_alive.status() {
963             false
964         } else {
965             true
966         }
967     }
968 
try_keep_alive<T: Http1Transaction>(&mut self)969     fn try_keep_alive<T: Http1Transaction>(&mut self) {
970         match (&self.reading, &self.writing) {
971             (&Reading::KeepAlive, &Writing::KeepAlive) => {
972                 if let KA::Busy = self.keep_alive.status() {
973                     self.idle::<T>();
974                 } else {
975                     trace!(
976                         "try_keep_alive({}): could keep-alive, but status = {:?}",
977                         T::LOG,
978                         self.keep_alive
979                     );
980                     self.close();
981                 }
982             }
983             (&Reading::Closed, &Writing::KeepAlive) | (&Reading::KeepAlive, &Writing::Closed) => {
984                 self.close()
985             }
986             _ => (),
987         }
988     }
989 
disable_keep_alive(&mut self)990     fn disable_keep_alive(&mut self) {
991         self.keep_alive.disable()
992     }
993 
busy(&mut self)994     fn busy(&mut self) {
995         if let KA::Disabled = self.keep_alive.status() {
996             return;
997         }
998         self.keep_alive.busy();
999     }
1000 
idle<T: Http1Transaction>(&mut self)1001     fn idle<T: Http1Transaction>(&mut self) {
1002         debug_assert!(!self.is_idle(), "State::idle() called while idle");
1003 
1004         self.method = None;
1005         self.keep_alive.idle();
1006         if self.is_idle() {
1007             self.reading = Reading::Init;
1008             self.writing = Writing::Init;
1009 
1010             // !T::should_read_first() means Client.
1011             //
1012             // If Client connection has just gone idle, the Dispatcher
1013             // should try the poll loop one more time, so as to poll the
1014             // pending requests stream.
1015             if !T::should_read_first() {
1016                 self.notify_read = true;
1017             }
1018         } else {
1019             self.close();
1020         }
1021     }
1022 
is_idle(&self) -> bool1023     fn is_idle(&self) -> bool {
1024         if let KA::Idle = self.keep_alive.status() {
1025             true
1026         } else {
1027             false
1028         }
1029     }
1030 
is_read_closed(&self) -> bool1031     fn is_read_closed(&self) -> bool {
1032         match self.reading {
1033             Reading::Closed => true,
1034             _ => false,
1035         }
1036     }
1037 
is_write_closed(&self) -> bool1038     fn is_write_closed(&self) -> bool {
1039         match self.writing {
1040             Writing::Closed => true,
1041             _ => false,
1042         }
1043     }
1044 
prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade1045     fn prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
1046         let (tx, rx) = crate::upgrade::pending();
1047         self.upgrade = Some(tx);
1048         rx
1049     }
1050 }
1051 
1052 #[cfg(test)]
1053 mod tests {
1054     #[cfg(feature = "nightly")]
1055     #[bench]
bench_read_head_short(b: &mut ::test::Bencher)1056     fn bench_read_head_short(b: &mut ::test::Bencher) {
1057         use super::*;
1058         let s = b"GET / HTTP/1.1\r\nHost: localhost:8080\r\n\r\n";
1059         let len = s.len();
1060         b.bytes = len as u64;
1061 
1062         // an empty IO, we'll be skipping and using the read buffer anyways
1063         let io = tokio_test::io::Builder::new().build();
1064         let mut conn = Conn::<_, bytes::Bytes, crate::proto::h1::ServerTransaction>::new(io);
1065         *conn.io.read_buf_mut() = ::bytes::BytesMut::from(&s[..]);
1066         conn.state.cached_headers = Some(HeaderMap::with_capacity(2));
1067 
1068         let rt = tokio::runtime::Builder::new_current_thread()
1069             .enable_all()
1070             .build()
1071             .unwrap();
1072 
1073         b.iter(|| {
1074             rt.block_on(futures_util::future::poll_fn(|cx| {
1075                 match conn.poll_read_head(cx) {
1076                     Poll::Ready(Some(Ok(x))) => {
1077                         ::test::black_box(&x);
1078                         let mut headers = x.0.headers;
1079                         headers.clear();
1080                         conn.state.cached_headers = Some(headers);
1081                     }
1082                     f => panic!("expected Ready(Some(Ok(..))): {:?}", f),
1083                 }
1084 
1085                 conn.io.read_buf_mut().reserve(1);
1086                 unsafe {
1087                     conn.io.read_buf_mut().set_len(len);
1088                 }
1089                 conn.state.reading = Reading::Init;
1090                 Poll::Ready(())
1091             }));
1092         });
1093     }
1094 
1095     /*
1096     //TODO: rewrite these using dispatch... someday...
1097     use futures::{Async, Future, Stream, Sink};
1098     use futures::future;
1099 
1100     use proto::{self, ClientTransaction, MessageHead, ServerTransaction};
1101     use super::super::Encoder;
1102     use mock::AsyncIo;
1103 
1104     use super::{Conn, Decoder, Reading, Writing};
1105     use ::uri::Uri;
1106 
1107     use std::str::FromStr;
1108 
1109     #[test]
1110     fn test_conn_init_read() {
1111         let good_message = b"GET / HTTP/1.1\r\n\r\n".to_vec();
1112         let len = good_message.len();
1113         let io = AsyncIo::new_buf(good_message, len);
1114         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1115 
1116         match conn.poll().unwrap() {
1117             Async::Ready(Some(Frame::Message { message, body: false })) => {
1118                 assert_eq!(message, MessageHead {
1119                     subject: ::proto::RequestLine(::Get, Uri::from_str("/").unwrap()),
1120                     .. MessageHead::default()
1121                 })
1122             },
1123             f => panic!("frame is not Frame::Message: {:?}", f)
1124         }
1125     }
1126 
1127     #[test]
1128     fn test_conn_parse_partial() {
1129         let _: Result<(), ()> = future::lazy(|| {
1130             let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec();
1131             let io = AsyncIo::new_buf(good_message, 10);
1132             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1133             assert!(conn.poll().unwrap().is_not_ready());
1134             conn.io.io_mut().block_in(50);
1135             let async = conn.poll().unwrap();
1136             assert!(async.is_ready());
1137             match async {
1138                 Async::Ready(Some(Frame::Message { .. })) => (),
1139                 f => panic!("frame is not Message: {:?}", f),
1140             }
1141             Ok(())
1142         }).wait();
1143     }
1144 
1145     #[test]
1146     fn test_conn_init_read_eof_idle() {
1147         let io = AsyncIo::new_buf(vec![], 1);
1148         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1149         conn.state.idle();
1150 
1151         match conn.poll().unwrap() {
1152             Async::Ready(None) => {},
1153             other => panic!("frame is not None: {:?}", other)
1154         }
1155     }
1156 
1157     #[test]
1158     fn test_conn_init_read_eof_idle_partial_parse() {
1159         let io = AsyncIo::new_buf(b"GET / HTTP/1.1".to_vec(), 100);
1160         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1161         conn.state.idle();
1162 
1163         match conn.poll() {
1164             Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
1165             other => panic!("unexpected frame: {:?}", other)
1166         }
1167     }
1168 
1169     #[test]
1170     fn test_conn_init_read_eof_busy() {
1171         let _: Result<(), ()> = future::lazy(|| {
1172             // server ignores
1173             let io = AsyncIo::new_eof();
1174             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1175             conn.state.busy();
1176 
1177             match conn.poll().unwrap() {
1178                 Async::Ready(None) => {},
1179                 other => panic!("unexpected frame: {:?}", other)
1180             }
1181 
1182             // client
1183             let io = AsyncIo::new_eof();
1184             let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1185             conn.state.busy();
1186 
1187             match conn.poll() {
1188                 Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
1189                 other => panic!("unexpected frame: {:?}", other)
1190             }
1191             Ok(())
1192         }).wait();
1193     }
1194 
1195     #[test]
1196     fn test_conn_body_finish_read_eof() {
1197         let _: Result<(), ()> = future::lazy(|| {
1198             let io = AsyncIo::new_eof();
1199             let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1200             conn.state.busy();
1201             conn.state.writing = Writing::KeepAlive;
1202             conn.state.reading = Reading::Body(Decoder::length(0));
1203 
1204             match conn.poll() {
1205                 Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
1206                 other => panic!("unexpected frame: {:?}", other)
1207             }
1208 
1209             // conn eofs, but tokio-proto will call poll() again, before calling flush()
1210             // the conn eof in this case is perfectly fine
1211 
1212             match conn.poll() {
1213                 Ok(Async::Ready(None)) => (),
1214                 other => panic!("unexpected frame: {:?}", other)
1215             }
1216             Ok(())
1217         }).wait();
1218     }
1219 
1220     #[test]
1221     fn test_conn_message_empty_body_read_eof() {
1222         let _: Result<(), ()> = future::lazy(|| {
1223             let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec(), 1024);
1224             let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1225             conn.state.busy();
1226             conn.state.writing = Writing::KeepAlive;
1227 
1228             match conn.poll() {
1229                 Ok(Async::Ready(Some(Frame::Message { body: false, .. }))) => (),
1230                 other => panic!("unexpected frame: {:?}", other)
1231             }
1232 
1233             // conn eofs, but tokio-proto will call poll() again, before calling flush()
1234             // the conn eof in this case is perfectly fine
1235 
1236             match conn.poll() {
1237                 Ok(Async::Ready(None)) => (),
1238                 other => panic!("unexpected frame: {:?}", other)
1239             }
1240             Ok(())
1241         }).wait();
1242     }
1243 
1244     #[test]
1245     fn test_conn_read_body_end() {
1246         let _: Result<(), ()> = future::lazy(|| {
1247             let io = AsyncIo::new_buf(b"POST / HTTP/1.1\r\nContent-Length: 5\r\n\r\n12345".to_vec(), 1024);
1248             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1249             conn.state.busy();
1250 
1251             match conn.poll() {
1252                 Ok(Async::Ready(Some(Frame::Message { body: true, .. }))) => (),
1253                 other => panic!("unexpected frame: {:?}", other)
1254             }
1255 
1256             match conn.poll() {
1257                 Ok(Async::Ready(Some(Frame::Body { chunk: Some(_) }))) => (),
1258                 other => panic!("unexpected frame: {:?}", other)
1259             }
1260 
1261             // When the body is done, `poll` MUST return a `Body` frame with chunk set to `None`
1262             match conn.poll() {
1263                 Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
1264                 other => panic!("unexpected frame: {:?}", other)
1265             }
1266 
1267             match conn.poll() {
1268                 Ok(Async::NotReady) => (),
1269                 other => panic!("unexpected frame: {:?}", other)
1270             }
1271             Ok(())
1272         }).wait();
1273     }
1274 
1275     #[test]
1276     fn test_conn_closed_read() {
1277         let io = AsyncIo::new_buf(vec![], 0);
1278         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1279         conn.state.close();
1280 
1281         match conn.poll().unwrap() {
1282             Async::Ready(None) => {},
1283             other => panic!("frame is not None: {:?}", other)
1284         }
1285     }
1286 
1287     #[test]
1288     fn test_conn_body_write_length() {
1289         let _ = pretty_env_logger::try_init();
1290         let _: Result<(), ()> = future::lazy(|| {
1291             let io = AsyncIo::new_buf(vec![], 0);
1292             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1293             let max = super::super::io::DEFAULT_MAX_BUFFER_SIZE + 4096;
1294             conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64));
1295 
1296             assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; max].into()) }).unwrap().is_ready());
1297             assert!(!conn.can_buffer_body());
1298 
1299             assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; 1024 * 8].into()) }).unwrap().is_not_ready());
1300 
1301             conn.io.io_mut().block_in(1024 * 3);
1302             assert!(conn.poll_complete().unwrap().is_not_ready());
1303             conn.io.io_mut().block_in(1024 * 3);
1304             assert!(conn.poll_complete().unwrap().is_not_ready());
1305             conn.io.io_mut().block_in(max * 2);
1306             assert!(conn.poll_complete().unwrap().is_ready());
1307 
1308             assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'c'; 1024 * 8].into()) }).unwrap().is_ready());
1309             Ok(())
1310         }).wait();
1311     }
1312 
1313     #[test]
1314     fn test_conn_body_write_chunked() {
1315         let _: Result<(), ()> = future::lazy(|| {
1316             let io = AsyncIo::new_buf(vec![], 4096);
1317             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1318             conn.state.writing = Writing::Body(Encoder::chunked());
1319 
1320             assert!(conn.start_send(Frame::Body { chunk: Some("headers".into()) }).unwrap().is_ready());
1321             assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'x'; 8192].into()) }).unwrap().is_ready());
1322             Ok(())
1323         }).wait();
1324     }
1325 
1326     #[test]
1327     fn test_conn_body_flush() {
1328         let _: Result<(), ()> = future::lazy(|| {
1329             let io = AsyncIo::new_buf(vec![], 1024 * 1024 * 5);
1330             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1331             conn.state.writing = Writing::Body(Encoder::length(1024 * 1024));
1332             assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 1024].into()) }).unwrap().is_ready());
1333             assert!(!conn.can_buffer_body());
1334             conn.io.io_mut().block_in(1024 * 1024 * 5);
1335             assert!(conn.poll_complete().unwrap().is_ready());
1336             assert!(conn.can_buffer_body());
1337             assert!(conn.io.io_mut().flushed());
1338 
1339             Ok(())
1340         }).wait();
1341     }
1342 
1343     #[test]
1344     fn test_conn_parking() {
1345         use std::sync::Arc;
1346         use futures::executor::Notify;
1347         use futures::executor::NotifyHandle;
1348 
1349         struct Car {
1350             permit: bool,
1351         }
1352         impl Notify for Car {
1353             fn notify(&self, _id: usize) {
1354                 assert!(self.permit, "unparked without permit");
1355             }
1356         }
1357 
1358         fn car(permit: bool) -> NotifyHandle {
1359             Arc::new(Car {
1360                 permit: permit,
1361             }).into()
1362         }
1363 
1364         // test that once writing is done, unparks
1365         let f = future::lazy(|| {
1366             let io = AsyncIo::new_buf(vec![], 4096);
1367             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1368             conn.state.reading = Reading::KeepAlive;
1369             assert!(conn.poll().unwrap().is_not_ready());
1370 
1371             conn.state.writing = Writing::KeepAlive;
1372             assert!(conn.poll_complete().unwrap().is_ready());
1373             Ok::<(), ()>(())
1374         });
1375         ::futures::executor::spawn(f).poll_future_notify(&car(true), 0).unwrap();
1376 
1377 
1378         // test that flushing when not waiting on read doesn't unpark
1379         let f = future::lazy(|| {
1380             let io = AsyncIo::new_buf(vec![], 4096);
1381             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1382             conn.state.writing = Writing::KeepAlive;
1383             assert!(conn.poll_complete().unwrap().is_ready());
1384             Ok::<(), ()>(())
1385         });
1386         ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();
1387 
1388 
1389         // test that flushing and writing isn't done doesn't unpark
1390         let f = future::lazy(|| {
1391             let io = AsyncIo::new_buf(vec![], 4096);
1392             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1393             conn.state.reading = Reading::KeepAlive;
1394             assert!(conn.poll().unwrap().is_not_ready());
1395             conn.state.writing = Writing::Body(Encoder::length(5_000));
1396             assert!(conn.poll_complete().unwrap().is_ready());
1397             Ok::<(), ()>(())
1398         });
1399         ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();
1400     }
1401 
1402     #[test]
1403     fn test_conn_closed_write() {
1404         let io = AsyncIo::new_buf(vec![], 0);
1405         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1406         conn.state.close();
1407 
1408         match conn.start_send(Frame::Body { chunk: Some(b"foobar".to_vec().into()) }) {
1409             Err(_e) => {},
1410             other => panic!("did not return Err: {:?}", other)
1411         }
1412 
1413         assert!(conn.state.is_write_closed());
1414     }
1415 
1416     #[test]
1417     fn test_conn_write_empty_chunk() {
1418         let io = AsyncIo::new_buf(vec![], 0);
1419         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1420         conn.state.writing = Writing::KeepAlive;
1421 
1422         assert!(conn.start_send(Frame::Body { chunk: None }).unwrap().is_ready());
1423         assert!(conn.start_send(Frame::Body { chunk: Some(Vec::new().into()) }).unwrap().is_ready());
1424         conn.start_send(Frame::Body { chunk: Some(vec![b'a'].into()) }).unwrap_err();
1425     }
1426     */
1427 }
1428