1 use std::fmt;
2 use std::io::{self};
3 use std::marker::PhantomData;
4 
5 use bytes::{Buf, Bytes};
6 use http::header::{HeaderValue, CONNECTION};
7 use http::{HeaderMap, Method, Version};
8 use tokio::io::{AsyncRead, AsyncWrite};
9 
10 use super::io::Buffered;
11 use super::{Decoder, Encode, EncodedBuf, Encoder, Http1Transaction, ParseContext, Wants};
12 use crate::common::{task, Pin, Poll, Unpin};
13 use crate::headers::connection_keep_alive;
14 use crate::proto::{BodyLength, DecodedLength, MessageHead};
15 
16 const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
17 
18 /// This handles a connection, which will have been established over an
19 /// `AsyncRead + AsyncWrite` (like a socket), and will likely include multiple
20 /// `Transaction`s over HTTP.
21 ///
22 /// The connection will determine when a message begins and ends as well as
23 /// determine if this connection can be kept alive after the message,
24 /// or if it is complete.
25 pub(crate) struct Conn<I, B, T> {
26     io: Buffered<I, EncodedBuf<B>>,
27     state: State,
28     _marker: PhantomData<fn(T)>,
29 }
30 
31 impl<I, B, T> Conn<I, B, T>
32 where
33     I: AsyncRead + AsyncWrite + Unpin,
34     B: Buf,
35     T: Http1Transaction,
36 {
37     pub fn new(io: I) -> Conn<I, B, T> {
38         Conn {
39             io: Buffered::new(io),
40             state: State {
41                 allow_half_close: false,
42                 cached_headers: None,
43                 error: None,
44                 keep_alive: KA::Busy,
45                 method: None,
46                 title_case_headers: false,
47                 notify_read: false,
48                 reading: Reading::Init,
49                 writing: Writing::Init,
50                 upgrade: None,
51                 // We assume a modern world where the remote speaks HTTP/1.1.
52                 // If they tell us otherwise, we'll downgrade in `read_head`.
53                 version: Version::HTTP_11,
54             },
55             _marker: PhantomData,
56         }
57     }
58 
59     pub fn set_flush_pipeline(&mut self, enabled: bool) {
60         self.io.set_flush_pipeline(enabled);
61     }
62 
63     pub fn set_max_buf_size(&mut self, max: usize) {
start_txns(void * e)64         self.io.set_max_buf_size(max);
65     }
66 
67     pub fn set_read_buf_exact_size(&mut self, sz: usize) {
68         self.io.set_read_buf_exact_size(sz);
69     }
70 
71     pub fn set_write_strategy_flatten(&mut self) {
72         self.io.set_write_strategy_flatten();
73     }
74 
75     pub fn set_title_case_headers(&mut self) {
76         self.state.title_case_headers = true;
77     }
78 
79     pub(crate) fn set_allow_half_close(&mut self) {
80         self.state.allow_half_close = true;
81     }
start_checkpoints(void)82 
83     pub fn into_inner(self) -> (I, Bytes) {
84         self.io.into_inner()
85     }
86 
87     pub fn pending_upgrade(&mut self) -> Option<crate::upgrade::Pending> {
88         self.state.upgrade.take()
89     }
90 
91     pub fn is_read_closed(&self) -> bool {
92         self.state.is_read_closed()
test_main(int argc,char * const argv[])93     }
94 
95     pub fn is_write_closed(&self) -> bool {
96         self.state.is_write_closed()
97     }
98 
99     pub fn can_read_head(&self) -> bool {
100         match self.state.reading {
101             Reading::Init => {
102                 if T::should_read_first() {
103                     true
104                 } else {
105                     match self.state.writing {
106                         Writing::Init => false,
107                         _ => true,
108                     }
109                 }
110             }
111             _ => false,
112         }
113     }
114 
115     pub fn can_read_body(&self) -> bool {
116         match self.state.reading {
117             Reading::Body(..) | Reading::Continue(..) => true,
118             _ => false,
119         }
120     }
121 
122     fn should_error_on_eof(&self) -> bool {
123         // If we're idle, it's probably just the connection closing gracefully.
124         T::should_error_on_parse_eof() && !self.state.is_idle()
125     }
126 
127     fn has_h2_prefix(&self) -> bool {
128         let read_buf = self.io.read_buf();
129         read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE
130     }
131 
132     pub(super) fn poll_read_head(
133         &mut self,
134         cx: &mut task::Context<'_>,
135     ) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>> {
136         debug_assert!(self.can_read_head());
137         trace!("Conn::read_head");
138 
139         let msg = match ready!(self.io.parse::<T>(
140             cx,
141             ParseContext {
142                 cached_headers: &mut self.state.cached_headers,
143                 req_method: &mut self.state.method,
144             }
145         )) {
146             Ok(msg) => msg,
147             Err(e) => return self.on_read_head_error(e),
148         };
149 
150         // Note: don't deconstruct `msg` into local variables, it appears
151         // the optimizer doesn't remove the extra copies.
152 
153         debug!("incoming body is {}", msg.decode);
154 
155         self.state.busy();
156         self.state.keep_alive &= msg.keep_alive;
157         self.state.version = msg.head.version;
158 
159         let mut wants = if msg.wants_upgrade {
160             Wants::UPGRADE
161         } else {
162             Wants::EMPTY
163         };
164 
165         if msg.decode == DecodedLength::ZERO {
166             if msg.expect_continue {
167                 debug!("ignoring expect-continue since body is empty");
168             }
169             self.state.reading = Reading::KeepAlive;
170             if !T::should_read_first() {
171                 self.try_keep_alive(cx);
172             }
173         } else if msg.expect_continue {
174             self.state.reading = Reading::Continue(Decoder::new(msg.decode));
175             wants = wants.add(Wants::EXPECT);
176         } else {
177             self.state.reading = Reading::Body(Decoder::new(msg.decode));
178         }
179 
180         Poll::Ready(Some(Ok((msg.head, msg.decode, wants))))
181     }
182 
183     fn on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>> {
184         // If we are currently waiting on a message, then an empty
185         // message should be reported as an error. If not, it is just
186         // the connection closing gracefully.
187         let must_error = self.should_error_on_eof();
188         self.close_read();
189         self.io.consume_leading_lines();
190         let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty();
191         if was_mid_parse || must_error {
192             // We check if the buf contains the h2 Preface
193             debug!(
194                 "parse error ({}) with {} bytes",
195                 e,
196                 self.io.read_buf().len()
197             );
198             match self.on_parse_error(e) {
199                 Ok(()) => Poll::Pending, // XXX: wat?
200                 Err(e) => Poll::Ready(Some(Err(e))),
201             }
202         } else {
203             debug!("read eof");
204             self.close_write();
205             Poll::Ready(None)
206         }
207     }
208 
209     pub fn poll_read_body(
210         &mut self,
211         cx: &mut task::Context<'_>,
212     ) -> Poll<Option<io::Result<Bytes>>> {
213         debug_assert!(self.can_read_body());
214 
215         let (reading, ret) = match self.state.reading {
216             Reading::Body(ref mut decoder) => {
217                 match decoder.decode(cx, &mut self.io) {
218                     Poll::Ready(Ok(slice)) => {
219                         let (reading, chunk) = if decoder.is_eof() {
220                             debug!("incoming body completed");
221                             (
222                                 Reading::KeepAlive,
223                                 if !slice.is_empty() {
224                                     Some(Ok(slice))
225                                 } else {
226                                     None
227                                 },
228                             )
229                         } else if slice.is_empty() {
230                             error!("incoming body unexpectedly ended");
231                             // This should be unreachable, since all 3 decoders
232                             // either set eof=true or return an Err when reading
233                             // an empty slice...
234                             (Reading::Closed, None)
235                         } else {
236                             return Poll::Ready(Some(Ok(slice)));
237                         };
238                         (reading, Poll::Ready(chunk))
239                     }
240                     Poll::Pending => return Poll::Pending,
241                     Poll::Ready(Err(e)) => {
242                         debug!("incoming body decode error: {}", e);
243                         (Reading::Closed, Poll::Ready(Some(Err(e))))
244                     }
245                 }
246             }
247             Reading::Continue(ref decoder) => {
248                 // Write the 100 Continue if not already responded...
249                 if let Writing::Init = self.state.writing {
250                     trace!("automatically sending 100 Continue");
251                     let cont = b"HTTP/1.1 100 Continue\r\n\r\n";
252                     self.io.headers_buf().extend_from_slice(cont);
253                 }
254 
255                 // And now recurse once in the Reading::Body state...
256                 self.state.reading = Reading::Body(decoder.clone());
257                 return self.poll_read_body(cx);
258             }
259             _ => unreachable!("poll_read_body invalid state: {:?}", self.state.reading),
260         };
261 
262         self.state.reading = reading;
263         self.try_keep_alive(cx);
264         ret
265     }
266 
267     pub fn wants_read_again(&mut self) -> bool {
268         let ret = self.state.notify_read;
269         self.state.notify_read = false;
270         ret
271     }
272 
273     pub fn poll_read_keep_alive(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
274         debug_assert!(!self.can_read_head() && !self.can_read_body());
275 
276         if self.is_read_closed() {
277             Poll::Pending
278         } else if self.is_mid_message() {
279             self.mid_message_detect_eof(cx)
280         } else {
281             self.require_empty_read(cx)
282         }
283     }
284 
285     fn is_mid_message(&self) -> bool {
286         match (&self.state.reading, &self.state.writing) {
287             (&Reading::Init, &Writing::Init) => false,
288             _ => true,
289         }
290     }
291 
292     // This will check to make sure the io object read is empty.
293     //
294     // This should only be called for Clients wanting to enter the idle
295     // state.
296     fn require_empty_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
297         debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
298         debug_assert!(!self.is_mid_message());
299         debug_assert!(T::is_client());
300 
301         if !self.io.read_buf().is_empty() {
302             debug!("received an unexpected {} bytes", self.io.read_buf().len());
303             return Poll::Ready(Err(crate::Error::new_unexpected_message()));
304         }
305 
306         let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
307 
308         if num_read == 0 {
309             let ret = if self.should_error_on_eof() {
310                 trace!("found unexpected EOF on busy connection: {:?}", self.state);
311                 Poll::Ready(Err(crate::Error::new_incomplete()))
312             } else {
313                 trace!("found EOF on idle connection, closing");
314                 Poll::Ready(Ok(()))
315             };
316 
317             // order is important: should_error needs state BEFORE close_read
318             self.state.close_read();
319             return ret;
320         }
321 
322         debug!(
323             "received unexpected {} bytes on an idle connection",
324             num_read
325         );
326         Poll::Ready(Err(crate::Error::new_unexpected_message()))
327     }
328 
329     fn mid_message_detect_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
330         debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
331         debug_assert!(self.is_mid_message());
332 
333         if self.state.allow_half_close || !self.io.read_buf().is_empty() {
334             return Poll::Pending;
335         }
336 
337         let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
338 
339         if num_read == 0 {
340             trace!("found unexpected EOF on busy connection: {:?}", self.state);
341             self.state.close_read();
342             Poll::Ready(Err(crate::Error::new_incomplete()))
343         } else {
344             Poll::Ready(Ok(()))
345         }
346     }
347 
348     fn force_io_read(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<usize>> {
349         debug_assert!(!self.state.is_read_closed());
350 
351         let result = ready!(self.io.poll_read_from_io(cx));
352         Poll::Ready(result.map_err(|e| {
353             trace!("force_io_read; io error = {:?}", e);
354             self.state.close();
355             e
356         }))
357     }
358 
359     fn maybe_notify(&mut self, cx: &mut task::Context<'_>) {
360         // its possible that we returned NotReady from poll() without having
361         // exhausted the underlying Io. We would have done this when we
362         // determined we couldn't keep reading until we knew how writing
363         // would finish.
364 
365         match self.state.reading {
366             Reading::Continue(..) | Reading::Body(..) | Reading::KeepAlive | Reading::Closed => {
367                 return
368             }
369             Reading::Init => (),
370         };
371 
372         match self.state.writing {
373             Writing::Body(..) => return,
374             Writing::Init | Writing::KeepAlive | Writing::Closed => (),
375         }
376 
377         if !self.io.is_read_blocked() {
378             if self.io.read_buf().is_empty() {
379                 match self.io.poll_read_from_io(cx) {
380                     Poll::Ready(Ok(n)) => {
381                         if n == 0 {
382                             trace!("maybe_notify; read eof");
383                             if self.state.is_idle() {
384                                 self.state.close();
385                             } else {
386                                 self.close_read()
387                             }
388                             return;
389                         }
390                     }
391                     Poll::Pending => {
392                         trace!("maybe_notify; read_from_io blocked");
393                         return;
394                     }
395                     Poll::Ready(Err(e)) => {
396                         trace!("maybe_notify; read_from_io error: {}", e);
397                         self.state.close();
398                         self.state.error = Some(crate::Error::new_io(e));
399                     }
400                 }
401             }
402             self.state.notify_read = true;
403         }
404     }
405 
406     fn try_keep_alive(&mut self, cx: &mut task::Context<'_>) {
407         self.state.try_keep_alive::<T>();
408         self.maybe_notify(cx);
409     }
410 
411     pub fn can_write_head(&self) -> bool {
412         if !T::should_read_first() {
413             if let Reading::Closed = self.state.reading {
414                 return false;
415             }
416         }
417         match self.state.writing {
418             Writing::Init => true,
419             _ => false,
420         }
421     }
422 
423     pub fn can_write_body(&self) -> bool {
424         match self.state.writing {
425             Writing::Body(..) => true,
426             Writing::Init | Writing::KeepAlive | Writing::Closed => false,
427         }
428     }
429 
430     pub fn can_buffer_body(&self) -> bool {
431         self.io.can_buffer()
432     }
433 
434     pub fn write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>) {
435         if let Some(encoder) = self.encode_head(head, body) {
436             self.state.writing = if !encoder.is_eof() {
437                 Writing::Body(encoder)
438             } else if encoder.is_last() {
439                 Writing::Closed
440             } else {
441                 Writing::KeepAlive
442             };
443         }
444     }
445 
446     pub fn write_full_msg(&mut self, head: MessageHead<T::Outgoing>, body: B) {
447         if let Some(encoder) =
448             self.encode_head(head, Some(BodyLength::Known(body.remaining() as u64)))
449         {
450             let is_last = encoder.is_last();
451             // Make sure we don't write a body if we weren't actually allowed
452             // to do so, like because its a HEAD request.
453             if !encoder.is_eof() {
454                 encoder.danger_full_buf(body, self.io.write_buf());
455             }
456             self.state.writing = if is_last {
457                 Writing::Closed
458             } else {
459                 Writing::KeepAlive
460             }
461         }
462     }
463 
464     fn encode_head(
465         &mut self,
466         mut head: MessageHead<T::Outgoing>,
467         body: Option<BodyLength>,
468     ) -> Option<Encoder> {
469         debug_assert!(self.can_write_head());
470 
471         if !T::should_read_first() {
472             self.state.busy();
473         }
474 
475         self.enforce_version(&mut head);
476 
477         let buf = self.io.headers_buf();
478         match T::encode(
479             Encode {
480                 head: &mut head,
481                 body,
482                 keep_alive: self.state.wants_keep_alive(),
483                 req_method: &mut self.state.method,
484                 title_case_headers: self.state.title_case_headers,
485             },
486             buf,
487         ) {
488             Ok(encoder) => {
489                 debug_assert!(self.state.cached_headers.is_none());
490                 debug_assert!(head.headers.is_empty());
491                 self.state.cached_headers = Some(head.headers);
492                 Some(encoder)
493             }
494             Err(err) => {
495                 self.state.error = Some(err);
496                 self.state.writing = Writing::Closed;
497                 None
498             }
499         }
500     }
501 
502     // Fix keep-alives when Connection: keep-alive header is not present
503     fn fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>) {
504         let outgoing_is_keep_alive = head
505             .headers
506             .get(CONNECTION)
507             .map(connection_keep_alive)
508             .unwrap_or(false);
509 
510         if !outgoing_is_keep_alive {
511             match head.version {
512                 // If response is version 1.0 and keep-alive is not present in the response,
513                 // disable keep-alive so the server closes the connection
514                 Version::HTTP_10 => self.state.disable_keep_alive(),
515                 // If response is version 1.1 and keep-alive is wanted, add
516                 // Connection: keep-alive header when not present
517                 Version::HTTP_11 => {
518                     if self.state.wants_keep_alive() {
519                         head.headers
520                             .insert(CONNECTION, HeaderValue::from_static("keep-alive"));
521                     }
522                 }
523                 _ => (),
524             }
525         }
526     }
527 
528     // If we know the remote speaks an older version, we try to fix up any messages
529     // to work with our older peer.
530     fn enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>) {
531         if let Version::HTTP_10 = self.state.version {
532             // Fixes response or connection when keep-alive header is not present
533             self.fix_keep_alive(head);
534             // If the remote only knows HTTP/1.0, we should force ourselves
535             // to do only speak HTTP/1.0 as well.
536             head.version = Version::HTTP_10;
537         }
538         // If the remote speaks HTTP/1.1, then it *should* be fine with
539         // both HTTP/1.0 and HTTP/1.1 from us. So again, we just let
540         // the user's headers be.
541     }
542 
543     pub fn write_body(&mut self, chunk: B) {
544         debug_assert!(self.can_write_body() && self.can_buffer_body());
545         // empty chunks should be discarded at Dispatcher level
546         debug_assert!(chunk.remaining() != 0);
547 
548         let state = match self.state.writing {
549             Writing::Body(ref mut encoder) => {
550                 self.io.buffer(encoder.encode(chunk));
551 
552                 if encoder.is_eof() {
553                     if encoder.is_last() {
554                         Writing::Closed
555                     } else {
556                         Writing::KeepAlive
557                     }
558                 } else {
559                     return;
560                 }
561             }
562             _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
563         };
564 
565         self.state.writing = state;
566     }
567 
568     pub fn write_body_and_end(&mut self, chunk: B) {
569         debug_assert!(self.can_write_body() && self.can_buffer_body());
570         // empty chunks should be discarded at Dispatcher level
571         debug_assert!(chunk.remaining() != 0);
572 
573         let state = match self.state.writing {
574             Writing::Body(ref encoder) => {
575                 let can_keep_alive = encoder.encode_and_end(chunk, self.io.write_buf());
576                 if can_keep_alive {
577                     Writing::KeepAlive
578                 } else {
579                     Writing::Closed
580                 }
581             }
582             _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
583         };
584 
585         self.state.writing = state;
586     }
587 
588     pub fn end_body(&mut self) {
589         debug_assert!(self.can_write_body());
590 
591         let state = match self.state.writing {
592             Writing::Body(ref mut encoder) => {
593                 // end of stream, that means we should try to eof
594                 match encoder.end() {
595                     Ok(end) => {
596                         if let Some(end) = end {
597                             self.io.buffer(end);
598                         }
599                         if encoder.is_last() {
600                             Writing::Closed
601                         } else {
602                             Writing::KeepAlive
603                         }
604                     }
605                     Err(_not_eof) => Writing::Closed,
606                 }
607             }
608             _ => return,
609         };
610 
611         self.state.writing = state;
612     }
613 
614     // When we get a parse error, depending on what side we are, we might be able
615     // to write a response before closing the connection.
616     //
617     // - Client: there is nothing we can do
618     // - Server: if Response hasn't been written yet, we can send a 4xx response
619     fn on_parse_error(&mut self, err: crate::Error) -> crate::Result<()> {
620         if let Writing::Init = self.state.writing {
621             if self.has_h2_prefix() {
622                 return Err(crate::Error::new_version_h2());
623             }
624             if let Some(msg) = T::on_error(&err) {
625                 // Drop the cached headers so as to not trigger a debug
626                 // assert in `write_head`...
627                 self.state.cached_headers.take();
628                 self.write_head(msg, None);
629                 self.state.error = Some(err);
630                 return Ok(());
631             }
632         }
633 
634         // fallback is pass the error back up
635         Err(err)
636     }
637 
638     pub fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
639         ready!(Pin::new(&mut self.io).poll_flush(cx))?;
640         self.try_keep_alive(cx);
641         trace!("flushed({}): {:?}", T::LOG, self.state);
642         Poll::Ready(Ok(()))
643     }
644 
645     pub fn poll_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
646         match ready!(Pin::new(self.io.io_mut()).poll_shutdown(cx)) {
647             Ok(()) => {
648                 trace!("shut down IO complete");
649                 Poll::Ready(Ok(()))
650             }
651             Err(e) => {
652                 debug!("error shutting down IO: {}", e);
653                 Poll::Ready(Err(e))
654             }
655         }
656     }
657 
658     pub fn close_read(&mut self) {
659         self.state.close_read();
660     }
661 
662     pub fn close_write(&mut self) {
663         self.state.close_write();
664     }
665 
666     pub fn disable_keep_alive(&mut self) {
667         if self.state.is_idle() {
668             trace!("disable_keep_alive; closing idle connection");
669             self.state.close();
670         } else {
671             trace!("disable_keep_alive; in-progress connection");
672             self.state.disable_keep_alive();
673         }
674     }
675 
676     pub fn take_error(&mut self) -> crate::Result<()> {
677         if let Some(err) = self.state.error.take() {
678             Err(err)
679         } else {
680             Ok(())
681         }
682     }
683 
684     pub(super) fn on_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
685         trace!("{}: prepare possible HTTP upgrade", T::LOG);
686         self.state.prepare_upgrade()
687     }
688 }
689 
690 impl<I, B: Buf, T> fmt::Debug for Conn<I, B, T> {
691     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
692         f.debug_struct("Conn")
693             .field("state", &self.state)
694             .field("io", &self.io)
695             .finish()
696     }
697 }
698 
699 // B and T are never pinned
700 impl<I: Unpin, B, T> Unpin for Conn<I, B, T> {}
701 
702 struct State {
703     allow_half_close: bool,
704     /// Re-usable HeaderMap to reduce allocating new ones.
705     cached_headers: Option<HeaderMap>,
706     /// If an error occurs when there wasn't a direct way to return it
707     /// back to the user, this is set.
708     error: Option<crate::Error>,
709     /// Current keep-alive status.
710     keep_alive: KA,
711     /// If mid-message, the HTTP Method that started it.
712     ///
713     /// This is used to know things such as if the message can include
714     /// a body or not.
715     method: Option<Method>,
716     title_case_headers: bool,
717     /// Set to true when the Dispatcher should poll read operations
718     /// again. See the `maybe_notify` method for more.
719     notify_read: bool,
720     /// State of allowed reads
721     reading: Reading,
722     /// State of allowed writes
723     writing: Writing,
724     /// An expected pending HTTP upgrade.
725     upgrade: Option<crate::upgrade::Pending>,
726     /// Either HTTP/1.0 or 1.1 connection
727     version: Version,
728 }
729 
730 #[derive(Debug)]
731 enum Reading {
732     Init,
733     Continue(Decoder),
734     Body(Decoder),
735     KeepAlive,
736     Closed,
737 }
738 
739 enum Writing {
740     Init,
741     Body(Encoder),
742     KeepAlive,
743     Closed,
744 }
745 
746 impl fmt::Debug for State {
747     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
748         let mut builder = f.debug_struct("State");
749         builder
750             .field("reading", &self.reading)
751             .field("writing", &self.writing)
752             .field("keep_alive", &self.keep_alive);
753 
754         // Only show error field if it's interesting...
755         if let Some(ref error) = self.error {
756             builder.field("error", error);
757         }
758 
759         if self.allow_half_close {
760             builder.field("allow_half_close", &true);
761         }
762 
763         // Purposefully leaving off other fields..
764 
765         builder.finish()
766     }
767 }
768 
769 impl fmt::Debug for Writing {
770     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
771         match *self {
772             Writing::Init => f.write_str("Init"),
773             Writing::Body(ref enc) => f.debug_tuple("Body").field(enc).finish(),
774             Writing::KeepAlive => f.write_str("KeepAlive"),
775             Writing::Closed => f.write_str("Closed"),
776         }
777     }
778 }
779 
780 impl std::ops::BitAndAssign<bool> for KA {
781     fn bitand_assign(&mut self, enabled: bool) {
782         if !enabled {
783             trace!("remote disabling keep-alive");
784             *self = KA::Disabled;
785         }
786     }
787 }
788 
789 #[derive(Clone, Copy, Debug)]
790 enum KA {
791     Idle,
792     Busy,
793     Disabled,
794 }
795 
796 impl Default for KA {
797     fn default() -> KA {
798         KA::Busy
799     }
800 }
801 
802 impl KA {
803     fn idle(&mut self) {
804         *self = KA::Idle;
805     }
806 
807     fn busy(&mut self) {
808         *self = KA::Busy;
809     }
810 
811     fn disable(&mut self) {
812         *self = KA::Disabled;
813     }
814 
815     fn status(&self) -> KA {
816         *self
817     }
818 }
819 
820 impl State {
821     fn close(&mut self) {
822         trace!("State::close()");
823         self.reading = Reading::Closed;
824         self.writing = Writing::Closed;
825         self.keep_alive.disable();
826     }
827 
828     fn close_read(&mut self) {
829         trace!("State::close_read()");
830         self.reading = Reading::Closed;
831         self.keep_alive.disable();
832     }
833 
834     fn close_write(&mut self) {
835         trace!("State::close_write()");
836         self.writing = Writing::Closed;
837         self.keep_alive.disable();
838     }
839 
840     fn wants_keep_alive(&self) -> bool {
841         if let KA::Disabled = self.keep_alive.status() {
842             false
843         } else {
844             true
845         }
846     }
847 
848     fn try_keep_alive<T: Http1Transaction>(&mut self) {
849         match (&self.reading, &self.writing) {
850             (&Reading::KeepAlive, &Writing::KeepAlive) => {
851                 if let KA::Busy = self.keep_alive.status() {
852                     self.idle::<T>();
853                 } else {
854                     trace!(
855                         "try_keep_alive({}): could keep-alive, but status = {:?}",
856                         T::LOG,
857                         self.keep_alive
858                     );
859                     self.close();
860                 }
861             }
862             (&Reading::Closed, &Writing::KeepAlive) | (&Reading::KeepAlive, &Writing::Closed) => {
863                 self.close()
864             }
865             _ => (),
866         }
867     }
868 
869     fn disable_keep_alive(&mut self) {
870         self.keep_alive.disable()
871     }
872 
873     fn busy(&mut self) {
874         if let KA::Disabled = self.keep_alive.status() {
875             return;
876         }
877         self.keep_alive.busy();
878     }
879 
880     fn idle<T: Http1Transaction>(&mut self) {
881         debug_assert!(!self.is_idle(), "State::idle() called while idle");
882 
883         self.method = None;
884         self.keep_alive.idle();
885         if self.is_idle() {
886             self.reading = Reading::Init;
887             self.writing = Writing::Init;
888 
889             // !T::should_read_first() means Client.
890             //
891             // If Client connection has just gone idle, the Dispatcher
892             // should try the poll loop one more time, so as to poll the
893             // pending requests stream.
894             if !T::should_read_first() {
895                 self.notify_read = true;
896             }
897         } else {
898             self.close();
899         }
900     }
901 
902     fn is_idle(&self) -> bool {
903         if let KA::Idle = self.keep_alive.status() {
904             true
905         } else {
906             false
907         }
908     }
909 
910     fn is_read_closed(&self) -> bool {
911         match self.reading {
912             Reading::Closed => true,
913             _ => false,
914         }
915     }
916 
917     fn is_write_closed(&self) -> bool {
918         match self.writing {
919             Writing::Closed => true,
920             _ => false,
921         }
922     }
923 
924     fn prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
925         let (tx, rx) = crate::upgrade::pending();
926         self.upgrade = Some(tx);
927         rx
928     }
929 }
930 
931 #[cfg(test)]
932 mod tests {
933     #[cfg(feature = "nightly")]
934     #[bench]
935     fn bench_read_head_short(b: &mut ::test::Bencher) {
936         use super::*;
937         let s = b"GET / HTTP/1.1\r\nHost: localhost:8080\r\n\r\n";
938         let len = s.len();
939         b.bytes = len as u64;
940 
941         // an empty IO, we'll be skipping and using the read buffer anyways
942         let io = tokio_test::io::Builder::new().build();
943         let mut conn = Conn::<_, bytes::Bytes, crate::proto::h1::ServerTransaction>::new(io);
944         *conn.io.read_buf_mut() = ::bytes::BytesMut::from(&s[..]);
945         conn.state.cached_headers = Some(HeaderMap::with_capacity(2));
946 
947         let mut rt = tokio::runtime::Builder::new()
948             .enable_all()
949             .basic_scheduler()
950             .build()
951             .unwrap();
952 
953         b.iter(|| {
954             rt.block_on(futures_util::future::poll_fn(|cx| {
955                 match conn.poll_read_head(cx) {
956                     Poll::Ready(Some(Ok(x))) => {
957                         ::test::black_box(&x);
958                         let mut headers = x.0.headers;
959                         headers.clear();
960                         conn.state.cached_headers = Some(headers);
961                     }
962                     f => panic!("expected Ready(Some(Ok(..))): {:?}", f),
963                 }
964 
965                 conn.io.read_buf_mut().reserve(1);
966                 unsafe {
967                     conn.io.read_buf_mut().set_len(len);
968                 }
969                 conn.state.reading = Reading::Init;
970                 Poll::Ready(())
971             }));
972         });
973     }
974 
975     /*
976     //TODO: rewrite these using dispatch... someday...
977     use futures::{Async, Future, Stream, Sink};
978     use futures::future;
979 
980     use proto::{self, ClientTransaction, MessageHead, ServerTransaction};
981     use super::super::Encoder;
982     use mock::AsyncIo;
983 
984     use super::{Conn, Decoder, Reading, Writing};
985     use ::uri::Uri;
986 
987     use std::str::FromStr;
988 
989     #[test]
990     fn test_conn_init_read() {
991         let good_message = b"GET / HTTP/1.1\r\n\r\n".to_vec();
992         let len = good_message.len();
993         let io = AsyncIo::new_buf(good_message, len);
994         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
995 
996         match conn.poll().unwrap() {
997             Async::Ready(Some(Frame::Message { message, body: false })) => {
998                 assert_eq!(message, MessageHead {
999                     subject: ::proto::RequestLine(::Get, Uri::from_str("/").unwrap()),
1000                     .. MessageHead::default()
1001                 })
1002             },
1003             f => panic!("frame is not Frame::Message: {:?}", f)
1004         }
1005     }
1006 
1007     #[test]
1008     fn test_conn_parse_partial() {
1009         let _: Result<(), ()> = future::lazy(|| {
1010             let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec();
1011             let io = AsyncIo::new_buf(good_message, 10);
1012             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1013             assert!(conn.poll().unwrap().is_not_ready());
1014             conn.io.io_mut().block_in(50);
1015             let async = conn.poll().unwrap();
1016             assert!(async.is_ready());
1017             match async {
1018                 Async::Ready(Some(Frame::Message { .. })) => (),
1019                 f => panic!("frame is not Message: {:?}", f),
1020             }
1021             Ok(())
1022         }).wait();
1023     }
1024 
1025     #[test]
1026     fn test_conn_init_read_eof_idle() {
1027         let io = AsyncIo::new_buf(vec![], 1);
1028         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1029         conn.state.idle();
1030 
1031         match conn.poll().unwrap() {
1032             Async::Ready(None) => {},
1033             other => panic!("frame is not None: {:?}", other)
1034         }
1035     }
1036 
1037     #[test]
1038     fn test_conn_init_read_eof_idle_partial_parse() {
1039         let io = AsyncIo::new_buf(b"GET / HTTP/1.1".to_vec(), 100);
1040         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1041         conn.state.idle();
1042 
1043         match conn.poll() {
1044             Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
1045             other => panic!("unexpected frame: {:?}", other)
1046         }
1047     }
1048 
1049     #[test]
1050     fn test_conn_init_read_eof_busy() {
1051         let _: Result<(), ()> = future::lazy(|| {
1052             // server ignores
1053             let io = AsyncIo::new_eof();
1054             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1055             conn.state.busy();
1056 
1057             match conn.poll().unwrap() {
1058                 Async::Ready(None) => {},
1059                 other => panic!("unexpected frame: {:?}", other)
1060             }
1061 
1062             // client
1063             let io = AsyncIo::new_eof();
1064             let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1065             conn.state.busy();
1066 
1067             match conn.poll() {
1068                 Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
1069                 other => panic!("unexpected frame: {:?}", other)
1070             }
1071             Ok(())
1072         }).wait();
1073     }
1074 
1075     #[test]
1076     fn test_conn_body_finish_read_eof() {
1077         let _: Result<(), ()> = future::lazy(|| {
1078             let io = AsyncIo::new_eof();
1079             let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1080             conn.state.busy();
1081             conn.state.writing = Writing::KeepAlive;
1082             conn.state.reading = Reading::Body(Decoder::length(0));
1083 
1084             match conn.poll() {
1085                 Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
1086                 other => panic!("unexpected frame: {:?}", other)
1087             }
1088 
1089             // conn eofs, but tokio-proto will call poll() again, before calling flush()
1090             // the conn eof in this case is perfectly fine
1091 
1092             match conn.poll() {
1093                 Ok(Async::Ready(None)) => (),
1094                 other => panic!("unexpected frame: {:?}", other)
1095             }
1096             Ok(())
1097         }).wait();
1098     }
1099 
1100     #[test]
1101     fn test_conn_message_empty_body_read_eof() {
1102         let _: Result<(), ()> = future::lazy(|| {
1103             let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec(), 1024);
1104             let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1105             conn.state.busy();
1106             conn.state.writing = Writing::KeepAlive;
1107 
1108             match conn.poll() {
1109                 Ok(Async::Ready(Some(Frame::Message { body: false, .. }))) => (),
1110                 other => panic!("unexpected frame: {:?}", other)
1111             }
1112 
1113             // conn eofs, but tokio-proto will call poll() again, before calling flush()
1114             // the conn eof in this case is perfectly fine
1115 
1116             match conn.poll() {
1117                 Ok(Async::Ready(None)) => (),
1118                 other => panic!("unexpected frame: {:?}", other)
1119             }
1120             Ok(())
1121         }).wait();
1122     }
1123 
1124     #[test]
1125     fn test_conn_read_body_end() {
1126         let _: Result<(), ()> = future::lazy(|| {
1127             let io = AsyncIo::new_buf(b"POST / HTTP/1.1\r\nContent-Length: 5\r\n\r\n12345".to_vec(), 1024);
1128             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1129             conn.state.busy();
1130 
1131             match conn.poll() {
1132                 Ok(Async::Ready(Some(Frame::Message { body: true, .. }))) => (),
1133                 other => panic!("unexpected frame: {:?}", other)
1134             }
1135 
1136             match conn.poll() {
1137                 Ok(Async::Ready(Some(Frame::Body { chunk: Some(_) }))) => (),
1138                 other => panic!("unexpected frame: {:?}", other)
1139             }
1140 
1141             // When the body is done, `poll` MUST return a `Body` frame with chunk set to `None`
1142             match conn.poll() {
1143                 Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
1144                 other => panic!("unexpected frame: {:?}", other)
1145             }
1146 
1147             match conn.poll() {
1148                 Ok(Async::NotReady) => (),
1149                 other => panic!("unexpected frame: {:?}", other)
1150             }
1151             Ok(())
1152         }).wait();
1153     }
1154 
1155     #[test]
1156     fn test_conn_closed_read() {
1157         let io = AsyncIo::new_buf(vec![], 0);
1158         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1159         conn.state.close();
1160 
1161         match conn.poll().unwrap() {
1162             Async::Ready(None) => {},
1163             other => panic!("frame is not None: {:?}", other)
1164         }
1165     }
1166 
1167     #[test]
1168     fn test_conn_body_write_length() {
1169         let _ = pretty_env_logger::try_init();
1170         let _: Result<(), ()> = future::lazy(|| {
1171             let io = AsyncIo::new_buf(vec![], 0);
1172             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1173             let max = super::super::io::DEFAULT_MAX_BUFFER_SIZE + 4096;
1174             conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64));
1175 
1176             assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; max].into()) }).unwrap().is_ready());
1177             assert!(!conn.can_buffer_body());
1178 
1179             assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; 1024 * 8].into()) }).unwrap().is_not_ready());
1180 
1181             conn.io.io_mut().block_in(1024 * 3);
1182             assert!(conn.poll_complete().unwrap().is_not_ready());
1183             conn.io.io_mut().block_in(1024 * 3);
1184             assert!(conn.poll_complete().unwrap().is_not_ready());
1185             conn.io.io_mut().block_in(max * 2);
1186             assert!(conn.poll_complete().unwrap().is_ready());
1187 
1188             assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'c'; 1024 * 8].into()) }).unwrap().is_ready());
1189             Ok(())
1190         }).wait();
1191     }
1192 
1193     #[test]
1194     fn test_conn_body_write_chunked() {
1195         let _: Result<(), ()> = future::lazy(|| {
1196             let io = AsyncIo::new_buf(vec![], 4096);
1197             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1198             conn.state.writing = Writing::Body(Encoder::chunked());
1199 
1200             assert!(conn.start_send(Frame::Body { chunk: Some("headers".into()) }).unwrap().is_ready());
1201             assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'x'; 8192].into()) }).unwrap().is_ready());
1202             Ok(())
1203         }).wait();
1204     }
1205 
1206     #[test]
1207     fn test_conn_body_flush() {
1208         let _: Result<(), ()> = future::lazy(|| {
1209             let io = AsyncIo::new_buf(vec![], 1024 * 1024 * 5);
1210             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1211             conn.state.writing = Writing::Body(Encoder::length(1024 * 1024));
1212             assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 1024].into()) }).unwrap().is_ready());
1213             assert!(!conn.can_buffer_body());
1214             conn.io.io_mut().block_in(1024 * 1024 * 5);
1215             assert!(conn.poll_complete().unwrap().is_ready());
1216             assert!(conn.can_buffer_body());
1217             assert!(conn.io.io_mut().flushed());
1218 
1219             Ok(())
1220         }).wait();
1221     }
1222 
1223     #[test]
1224     fn test_conn_parking() {
1225         use std::sync::Arc;
1226         use futures::executor::Notify;
1227         use futures::executor::NotifyHandle;
1228 
1229         struct Car {
1230             permit: bool,
1231         }
1232         impl Notify for Car {
1233             fn notify(&self, _id: usize) {
1234                 assert!(self.permit, "unparked without permit");
1235             }
1236         }
1237 
1238         fn car(permit: bool) -> NotifyHandle {
1239             Arc::new(Car {
1240                 permit: permit,
1241             }).into()
1242         }
1243 
1244         // test that once writing is done, unparks
1245         let f = future::lazy(|| {
1246             let io = AsyncIo::new_buf(vec![], 4096);
1247             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1248             conn.state.reading = Reading::KeepAlive;
1249             assert!(conn.poll().unwrap().is_not_ready());
1250 
1251             conn.state.writing = Writing::KeepAlive;
1252             assert!(conn.poll_complete().unwrap().is_ready());
1253             Ok::<(), ()>(())
1254         });
1255         ::futures::executor::spawn(f).poll_future_notify(&car(true), 0).unwrap();
1256 
1257 
1258         // test that flushing when not waiting on read doesn't unpark
1259         let f = future::lazy(|| {
1260             let io = AsyncIo::new_buf(vec![], 4096);
1261             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1262             conn.state.writing = Writing::KeepAlive;
1263             assert!(conn.poll_complete().unwrap().is_ready());
1264             Ok::<(), ()>(())
1265         });
1266         ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();
1267 
1268 
1269         // test that flushing and writing isn't done doesn't unpark
1270         let f = future::lazy(|| {
1271             let io = AsyncIo::new_buf(vec![], 4096);
1272             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1273             conn.state.reading = Reading::KeepAlive;
1274             assert!(conn.poll().unwrap().is_not_ready());
1275             conn.state.writing = Writing::Body(Encoder::length(5_000));
1276             assert!(conn.poll_complete().unwrap().is_ready());
1277             Ok::<(), ()>(())
1278         });
1279         ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();
1280     }
1281 
1282     #[test]
1283     fn test_conn_closed_write() {
1284         let io = AsyncIo::new_buf(vec![], 0);
1285         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1286         conn.state.close();
1287 
1288         match conn.start_send(Frame::Body { chunk: Some(b"foobar".to_vec().into()) }) {
1289             Err(_e) => {},
1290             other => panic!("did not return Err: {:?}", other)
1291         }
1292 
1293         assert!(conn.state.is_write_closed());
1294     }
1295 
1296     #[test]
1297     fn test_conn_write_empty_chunk() {
1298         let io = AsyncIo::new_buf(vec![], 0);
1299         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1300         conn.state.writing = Writing::KeepAlive;
1301 
1302         assert!(conn.start_send(Frame::Body { chunk: None }).unwrap().is_ready());
1303         assert!(conn.start_send(Frame::Body { chunk: Some(Vec::new().into()) }).unwrap().is_ready());
1304         conn.start_send(Frame::Body { chunk: Some(vec![b'a'].into()) }).unwrap_err();
1305     }
1306     */
1307 }
1308