1 use std::fmt;
2 use std::io::{self};
3 use std::marker::PhantomData;
4 
5 use bytes::{Buf, Bytes};
6 use http::header::{HeaderValue, CONNECTION};
7 use http::{HeaderMap, Method, Version};
8 use tokio::io::{AsyncRead, AsyncWrite};
9 
10 use super::io::Buffered;
11 use super::{Decoder, Encode, EncodedBuf, Encoder, Http1Transaction, ParseContext, Wants};
12 use crate::common::{task, Pin, Poll, Unpin};
13 use crate::headers::connection_keep_alive;
14 use crate::proto::{BodyLength, DecodedLength, MessageHead};
15 use crate::Result;
16 
/// The client connection preface that opens an HTTP/2 connection.
///
/// `has_h2_prefix` compares the start of the read buffer against these bytes
/// so that `on_parse_error` can report an HTTP/2 version error instead of a
/// generic parse failure.
const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
18 
/// This handles a connection, which will have been established over an
/// `AsyncRead + AsyncWrite` (like a socket), and will likely include multiple
/// `Transaction`s over HTTP.
///
/// The connection will determine when a message begins and ends as well as
/// determine if this connection can be kept alive after the message,
/// or if it is complete.
pub(crate) struct Conn<I, B, T> {
    /// The transport `I`, wrapped in the buffering layer used for all reads
    /// and writes.
    io: Buffered<I, EncodedBuf<B>>,
    /// Read/write progress, keep-alive status and per-message bookkeeping.
    state: State,
    /// Ties the connection to its transaction type `T` without storing a `T`.
    _marker: PhantomData<fn(T)>,
}
31 
32 impl<I, B, T> Conn<I, B, T>
33 where
34     I: AsyncRead + AsyncWrite + Unpin,
35     B: Buf,
36     T: Http1Transaction,
37 {
new(io: I) -> Conn<I, B, T>38     pub fn new(io: I) -> Conn<I, B, T> {
39         Conn {
40             io: Buffered::new(io),
41             state: State {
42                 allow_half_close: false,
43                 cached_headers: None,
44                 error: None,
45                 keep_alive: KA::Busy,
46                 method: None,
47                 title_case_headers: false,
48                 notify_read: false,
49                 reading: Reading::Init,
50                 writing: Writing::Init,
51                 upgrade: None,
52                 // We assume a modern world where the remote speaks HTTP/1.1.
53                 // If they tell us otherwise, we'll downgrade in `read_head`.
54                 version: Version::HTTP_11,
55             },
56             _marker: PhantomData,
57         }
58     }
59 
    /// Forwards the flush-pipeline setting to the buffered IO.
    pub fn set_flush_pipeline(&mut self, enabled: bool) {
        self.io.set_flush_pipeline(enabled);
    }
63 
    /// Forwards the maximum buffer size `max` to the buffered IO.
    pub fn set_max_buf_size(&mut self, max: usize) {
        self.io.set_max_buf_size(max);
    }
67 
    /// Forwards the exact read-buffer size `sz` to the buffered IO.
    pub fn set_read_buf_exact_size(&mut self, sz: usize) {
        self.io.set_read_buf_exact_size(sz);
    }
71 
    /// Selects the buffered IO's "flatten" write strategy.
    pub fn set_write_strategy_flatten(&mut self) {
        self.io.set_write_strategy_flatten();
    }
75 
    /// Selects the buffered IO's "queue" write strategy.
    pub fn set_write_strategy_queue(&mut self) {
        self.io.set_write_strategy_queue();
    }
79 
    /// Turns on the title-case-headers flag, which `encode_head` hands to the
    /// header encoder for every outgoing message.
    pub fn set_title_case_headers(&mut self) {
        self.state.title_case_headers = true;
    }
83 
    /// Allows half-closed connections: once set, `mid_message_detect_eof`
    /// no longer probes the transport for EOF while a message is in flight.
    pub(crate) fn set_allow_half_close(&mut self) {
        self.state.allow_half_close = true;
    }
87 
    /// Consumes the connection and returns what `Buffered::into_inner`
    /// yields: the transport plus a `Bytes`.
    // NOTE(review): the `Bytes` is presumably the already-read but unparsed
    // data -- confirm against `Buffered::into_inner`.
    pub fn into_inner(self) -> (I, Bytes) {
        self.io.into_inner()
    }
91 
    /// Takes the pending HTTP upgrade out of the state, if there is one,
    /// leaving `None` behind.
    pub fn pending_upgrade(&mut self) -> Option<crate::upgrade::Pending> {
        self.state.upgrade.take()
    }
95 
    /// Reports whether the state considers the read side closed.
    pub fn is_read_closed(&self) -> bool {
        self.state.is_read_closed()
    }
99 
    /// Reports whether the state considers the write side closed.
    pub fn is_write_closed(&self) -> bool {
        self.state.is_write_closed()
    }
103 
can_read_head(&self) -> bool104     pub fn can_read_head(&self) -> bool {
105         match self.state.reading {
106             Reading::Init => {
107                 if T::should_read_first() {
108                     true
109                 } else {
110                     match self.state.writing {
111                         Writing::Init => false,
112                         _ => true,
113                     }
114                 }
115             }
116             _ => false,
117         }
118     }
119 
can_read_body(&self) -> bool120     pub fn can_read_body(&self) -> bool {
121         match self.state.reading {
122             Reading::Body(..) | Reading::Continue(..) => true,
123             _ => false,
124         }
125     }
126 
should_error_on_eof(&self) -> bool127     fn should_error_on_eof(&self) -> bool {
128         // If we're idle, it's probably just the connection closing gracefully.
129         T::should_error_on_parse_eof() && !self.state.is_idle()
130     }
131 
has_h2_prefix(&self) -> bool132     fn has_h2_prefix(&self) -> bool {
133         let read_buf = self.io.read_buf();
134         read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE
135     }
136 
    /// Polls for the next incoming message head.
    ///
    /// Must only be called while `can_read_head()` holds (checked in debug
    /// builds). Returns `Poll::Pending` while the parser needs more data.
    /// On success it yields the parsed head, the decoded body length and the
    /// `Wants` flags (upgrade / expect-continue) derived from the message.
    /// A parse failure is delegated to `on_read_head_error`, whose result is
    /// returned unchanged.
    ///
    /// Side effects on success: the connection is marked busy, keep-alive is
    /// narrowed by what the message allows, the peer's version is recorded,
    /// and the read state advances to `KeepAlive`, `Continue` or `Body`.
    pub(super) fn poll_read_head(
        &mut self,
        cx: &mut task::Context<'_>,
    ) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>> {
        debug_assert!(self.can_read_head());
        trace!("Conn::read_head");

        let msg = match ready!(self.io.parse::<T>(
            cx,
            ParseContext {
                cached_headers: &mut self.state.cached_headers,
                req_method: &mut self.state.method,
            }
        )) {
            Ok(msg) => msg,
            Err(e) => return self.on_read_head_error(e),
        };

        // Note: don't deconstruct `msg` into local variables, it appears
        // the optimizer doesn't remove the extra copies.

        debug!("incoming body is {}", msg.decode);

        self.state.busy();
        // `&=` on `KA` only ever disables keep-alive, it never re-enables it.
        self.state.keep_alive &= msg.keep_alive;
        self.state.version = msg.head.version;

        let mut wants = if msg.wants_upgrade {
            Wants::UPGRADE
        } else {
            Wants::EMPTY
        };

        if msg.decode == DecodedLength::ZERO {
            // No body to read: the incoming message is already complete.
            if msg.expect_continue {
                debug!("ignoring expect-continue since body is empty");
            }
            self.state.reading = Reading::KeepAlive;
            if !T::should_read_first() {
                self.try_keep_alive(cx);
            }
        } else if msg.expect_continue {
            // Hold the decoder until the body is first polled; that is when
            // `poll_read_body` may emit the `100 Continue`.
            self.state.reading = Reading::Continue(Decoder::new(msg.decode));
            wants = wants.add(Wants::EXPECT);
        } else {
            self.state.reading = Reading::Body(Decoder::new(msg.decode));
        }

        Poll::Ready(Some(Ok((msg.head, msg.decode, wants))))
    }
187 
on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>>188     fn on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>> {
189         // If we are currently waiting on a message, then an empty
190         // message should be reported as an error. If not, it is just
191         // the connection closing gracefully.
192         let must_error = self.should_error_on_eof();
193         self.close_read();
194         self.io.consume_leading_lines();
195         let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty();
196         if was_mid_parse || must_error {
197             // We check if the buf contains the h2 Preface
198             debug!(
199                 "parse error ({}) with {} bytes",
200                 e,
201                 self.io.read_buf().len()
202             );
203             match self.on_parse_error(e) {
204                 Ok(()) => Poll::Pending, // XXX: wat?
205                 Err(e) => Poll::Ready(Some(Err(e))),
206             }
207         } else {
208             debug!("read eof");
209             self.close_write();
210             Poll::Ready(None)
211         }
212     }
213 
    /// Polls for the next chunk of the incoming body.
    ///
    /// Must only be called while `can_read_body()` holds (checked in debug
    /// builds). Returns:
    ///
    /// - `Poll::Pending` while the decoder is waiting for more data,
    /// - `Ready(Some(Ok(chunk)))` for a non-empty chunk,
    /// - `Ready(None)` once the body has ended without a final chunk,
    /// - `Ready(Some(Err(_)))` on a decode error, which also closes reading.
    ///
    /// In the `Reading::Continue` state a `100 Continue` is queued first
    /// (only if nothing has been written yet), then the state becomes
    /// `Reading::Body` and the poll is retried once.
    pub fn poll_read_body(
        &mut self,
        cx: &mut task::Context<'_>,
    ) -> Poll<Option<io::Result<Bytes>>> {
        debug_assert!(self.can_read_body());

        let (reading, ret) = match self.state.reading {
            Reading::Body(ref mut decoder) => {
                match ready!(decoder.decode(cx, &mut self.io)) {
                    Ok(slice) => {
                        let (reading, chunk) = if decoder.is_eof() {
                            debug!("incoming body completed");
                            (
                                Reading::KeepAlive,
                                if !slice.is_empty() {
                                    Some(Ok(slice))
                                } else {
                                    None
                                },
                            )
                        } else if slice.is_empty() {
                            error!("incoming body unexpectedly ended");
                            // This should be unreachable, since all 3 decoders
                            // either set eof=true or return an Err when reading
                            // an empty slice...
                            (Reading::Closed, None)
                        } else {
                            // Mid-body chunk: the state is unchanged, so skip
                            // the state update and keep-alive check below.
                            return Poll::Ready(Some(Ok(slice)));
                        };
                        (reading, Poll::Ready(chunk))
                    }
                    Err(e) => {
                        debug!("incoming body decode error: {}", e);
                        (Reading::Closed, Poll::Ready(Some(Err(e))))
                    }
                }
            }
            Reading::Continue(ref decoder) => {
                // Write the 100 Continue if not already responded...
                if let Writing::Init = self.state.writing {
                    trace!("automatically sending 100 Continue");
                    let cont = b"HTTP/1.1 100 Continue\r\n\r\n";
                    self.io.headers_buf().extend_from_slice(cont);
                }

                // And now recurse once in the Reading::Body state...
                self.state.reading = Reading::Body(decoder.clone());
                return self.poll_read_body(cx);
            }
            _ => unreachable!("poll_read_body invalid state: {:?}", self.state.reading),
        };

        // Only reached when the body ended or failed: record the new read
        // state and re-evaluate keep-alive.
        self.state.reading = reading;
        self.try_keep_alive(cx);
        ret
    }
270 
wants_read_again(&mut self) -> bool271     pub fn wants_read_again(&mut self) -> bool {
272         let ret = self.state.notify_read;
273         self.state.notify_read = false;
274         ret
275     }
276 
poll_read_keep_alive(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>277     pub fn poll_read_keep_alive(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
278         debug_assert!(!self.can_read_head() && !self.can_read_body());
279 
280         if self.is_read_closed() {
281             Poll::Pending
282         } else if self.is_mid_message() {
283             self.mid_message_detect_eof(cx)
284         } else {
285             self.require_empty_read(cx)
286         }
287     }
288 
is_mid_message(&self) -> bool289     fn is_mid_message(&self) -> bool {
290         match (&self.state.reading, &self.state.writing) {
291             (&Reading::Init, &Writing::Init) => false,
292             _ => true,
293         }
294     }
295 
296     // This will check to make sure the io object read is empty.
297     //
298     // This should only be called for Clients wanting to enter the idle
299     // state.
require_empty_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>300     fn require_empty_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
301         debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
302         debug_assert!(!self.is_mid_message());
303         debug_assert!(T::is_client());
304 
305         if !self.io.read_buf().is_empty() {
306             debug!("received an unexpected {} bytes", self.io.read_buf().len());
307             return Poll::Ready(Err(crate::Error::new_unexpected_message()));
308         }
309 
310         let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
311 
312         if num_read == 0 {
313             let ret = if self.should_error_on_eof() {
314                 trace!("found unexpected EOF on busy connection: {:?}", self.state);
315                 Poll::Ready(Err(crate::Error::new_incomplete()))
316             } else {
317                 trace!("found EOF on idle connection, closing");
318                 Poll::Ready(Ok(()))
319             };
320 
321             // order is important: should_error needs state BEFORE close_read
322             self.state.close_read();
323             return ret;
324         }
325 
326         debug!(
327             "received unexpected {} bytes on an idle connection",
328             num_read
329         );
330         Poll::Ready(Err(crate::Error::new_unexpected_message()))
331     }
332 
mid_message_detect_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>333     fn mid_message_detect_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
334         debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
335         debug_assert!(self.is_mid_message());
336 
337         if self.state.allow_half_close || !self.io.read_buf().is_empty() {
338             return Poll::Pending;
339         }
340 
341         let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
342 
343         if num_read == 0 {
344             trace!("found unexpected EOF on busy connection: {:?}", self.state);
345             self.state.close_read();
346             Poll::Ready(Err(crate::Error::new_incomplete()))
347         } else {
348             Poll::Ready(Ok(()))
349         }
350     }
351 
force_io_read(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<usize>>352     fn force_io_read(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<usize>> {
353         debug_assert!(!self.state.is_read_closed());
354 
355         let result = ready!(self.io.poll_read_from_io(cx));
356         Poll::Ready(result.map_err(|e| {
357             trace!("force_io_read; io error = {:?}", e);
358             self.state.close();
359             e
360         }))
361     }
362 
    /// Sets `notify_read` when the read side should be polled again.
    ///
    /// This only applies when reading is back in `Init` and no body is being
    /// written. If the read buffer is empty, one read is attempted first:
    /// an EOF closes the connection (fully if idle, read side only
    /// otherwise), a pending read leaves the flag untouched, and an IO error
    /// closes the state and stores the error before the flag is set.
    fn maybe_notify(&mut self, cx: &mut task::Context<'_>) {
        // It's possible that we returned NotReady from poll() without having
        // exhausted the underlying Io. We would have done this when we
        // determined we couldn't keep reading until we knew how writing
        // would finish.

        match self.state.reading {
            Reading::Continue(..) | Reading::Body(..) | Reading::KeepAlive | Reading::Closed => {
                return
            }
            Reading::Init => (),
        };

        match self.state.writing {
            Writing::Body(..) => return,
            Writing::Init | Writing::KeepAlive | Writing::Closed => (),
        }

        if !self.io.is_read_blocked() {
            if self.io.read_buf().is_empty() {
                match self.io.poll_read_from_io(cx) {
                    Poll::Ready(Ok(n)) => {
                        if n == 0 {
                            trace!("maybe_notify; read eof");
                            if self.state.is_idle() {
                                self.state.close();
                            } else {
                                self.close_read()
                            }
                            return;
                        }
                    }
                    Poll::Pending => {
                        trace!("maybe_notify; read_from_io blocked");
                        return;
                    }
                    Poll::Ready(Err(e)) => {
                        trace!("maybe_notify; read_from_io error: {}", e);
                        self.state.close();
                        // Kept for `take_error`; execution falls through so
                        // that `notify_read` is still set below.
                        self.state.error = Some(crate::Error::new_io(e));
                    }
                }
            }
            self.state.notify_read = true;
        }
    }
409 
    /// Lets the state attempt its keep-alive transition for `T`, then checks
    /// via `maybe_notify` whether the read side needs another poll.
    fn try_keep_alive(&mut self, cx: &mut task::Context<'_>) {
        self.state.try_keep_alive::<T>();
        self.maybe_notify(cx);
    }
414 
can_write_head(&self) -> bool415     pub fn can_write_head(&self) -> bool {
416         if !T::should_read_first() {
417             if let Reading::Closed = self.state.reading {
418                 return false;
419             }
420         }
421         match self.state.writing {
422             Writing::Init => true,
423             _ => false,
424         }
425     }
426 
can_write_body(&self) -> bool427     pub fn can_write_body(&self) -> bool {
428         match self.state.writing {
429             Writing::Body(..) => true,
430             Writing::Init | Writing::KeepAlive | Writing::Closed => false,
431         }
432     }
433 
    /// Reports whether the buffered IO can currently accept more data.
    pub fn can_buffer_body(&self) -> bool {
        self.io.can_buffer()
    }
437 
write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>)438     pub fn write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>) {
439         if let Some(encoder) = self.encode_head(head, body) {
440             self.state.writing = if !encoder.is_eof() {
441                 Writing::Body(encoder)
442             } else if encoder.is_last() {
443                 Writing::Closed
444             } else {
445                 Writing::KeepAlive
446             };
447         }
448     }
449 
write_full_msg(&mut self, head: MessageHead<T::Outgoing>, body: B)450     pub fn write_full_msg(&mut self, head: MessageHead<T::Outgoing>, body: B) {
451         if let Some(encoder) =
452             self.encode_head(head, Some(BodyLength::Known(body.remaining() as u64)))
453         {
454             let is_last = encoder.is_last();
455             // Make sure we don't write a body if we weren't actually allowed
456             // to do so, like because its a HEAD request.
457             if !encoder.is_eof() {
458                 encoder.danger_full_buf(body, self.io.write_buf());
459             }
460             self.state.writing = if is_last {
461                 Writing::Closed
462             } else {
463                 Writing::KeepAlive
464             }
465         }
466     }
467 
    /// Encodes `head` into the header buffer and returns the body encoder.
    ///
    /// Must only be called while `can_write_head()` holds (checked in debug
    /// builds). For transaction types that write first, the connection is
    /// marked busy. The head is adjusted for an HTTP/1.0 peer by
    /// `enforce_version` before encoding.
    ///
    /// On success the head's (by then emptied) header map is kept in
    /// `cached_headers` for reuse. On failure the error is stored in the
    /// state, the write side is closed, and `None` is returned.
    fn encode_head(
        &mut self,
        mut head: MessageHead<T::Outgoing>,
        body: Option<BodyLength>,
    ) -> Option<Encoder> {
        debug_assert!(self.can_write_head());

        if !T::should_read_first() {
            self.state.busy();
        }

        self.enforce_version(&mut head);

        let buf = self.io.headers_buf();
        match super::role::encode_headers::<T>(
            Encode {
                head: &mut head,
                body,
                keep_alive: self.state.wants_keep_alive(),
                req_method: &mut self.state.method,
                title_case_headers: self.state.title_case_headers,
            },
            buf,
        ) {
            Ok(encoder) => {
                debug_assert!(self.state.cached_headers.is_none());
                debug_assert!(head.headers.is_empty());
                // Keep the header map around to reduce allocating new ones.
                self.state.cached_headers = Some(head.headers);
                Some(encoder)
            }
            Err(err) => {
                self.state.error = Some(err);
                self.state.writing = Writing::Closed;
                None
            }
        }
    }
505 
506     // Fix keep-alives when Connection: keep-alive header is not present
fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>)507     fn fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>) {
508         let outgoing_is_keep_alive = head
509             .headers
510             .get(CONNECTION)
511             .map(connection_keep_alive)
512             .unwrap_or(false);
513 
514         if !outgoing_is_keep_alive {
515             match head.version {
516                 // If response is version 1.0 and keep-alive is not present in the response,
517                 // disable keep-alive so the server closes the connection
518                 Version::HTTP_10 => self.state.disable_keep_alive(),
519                 // If response is version 1.1 and keep-alive is wanted, add
520                 // Connection: keep-alive header when not present
521                 Version::HTTP_11 => {
522                     if self.state.wants_keep_alive() {
523                         head.headers
524                             .insert(CONNECTION, HeaderValue::from_static("keep-alive"));
525                     }
526                 }
527                 _ => (),
528             }
529         }
530     }
531 
532     // If we know the remote speaks an older version, we try to fix up any messages
533     // to work with our older peer.
enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>)534     fn enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>) {
535         if let Version::HTTP_10 = self.state.version {
536             // Fixes response or connection when keep-alive header is not present
537             self.fix_keep_alive(head);
538             // If the remote only knows HTTP/1.0, we should force ourselves
539             // to do only speak HTTP/1.0 as well.
540             head.version = Version::HTTP_10;
541         }
542         // If the remote speaks HTTP/1.1, then it *should* be fine with
543         // both HTTP/1.0 and HTTP/1.1 from us. So again, we just let
544         // the user's headers be.
545     }
546 
write_body(&mut self, chunk: B)547     pub fn write_body(&mut self, chunk: B) {
548         debug_assert!(self.can_write_body() && self.can_buffer_body());
549         // empty chunks should be discarded at Dispatcher level
550         debug_assert!(chunk.remaining() != 0);
551 
552         let state = match self.state.writing {
553             Writing::Body(ref mut encoder) => {
554                 self.io.buffer(encoder.encode(chunk));
555 
556                 if encoder.is_eof() {
557                     if encoder.is_last() {
558                         Writing::Closed
559                     } else {
560                         Writing::KeepAlive
561                     }
562                 } else {
563                     return;
564                 }
565             }
566             _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
567         };
568 
569         self.state.writing = state;
570     }
571 
write_body_and_end(&mut self, chunk: B)572     pub fn write_body_and_end(&mut self, chunk: B) {
573         debug_assert!(self.can_write_body() && self.can_buffer_body());
574         // empty chunks should be discarded at Dispatcher level
575         debug_assert!(chunk.remaining() != 0);
576 
577         let state = match self.state.writing {
578             Writing::Body(ref encoder) => {
579                 let can_keep_alive = encoder.encode_and_end(chunk, self.io.write_buf());
580                 if can_keep_alive {
581                     Writing::KeepAlive
582                 } else {
583                     Writing::Closed
584                 }
585             }
586             _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
587         };
588 
589         self.state.writing = state;
590     }
591 
end_body(&mut self) -> Result<()>592     pub fn end_body(&mut self) -> Result<()> {
593         debug_assert!(self.can_write_body());
594 
595         let state = match self.state.writing {
596             Writing::Body(ref mut encoder) => {
597                 // end of stream, that means we should try to eof
598                 match encoder.end() {
599                     Ok(end) => {
600                         if let Some(end) = end {
601                             self.io.buffer(end);
602                         }
603                         if encoder.is_last() {
604                             Writing::Closed
605                         } else {
606                             Writing::KeepAlive
607                         }
608                     }
609                     Err(_not_eof) => {
610                         return Err(crate::Error::new_user_body(
611                             crate::Error::new_body_write_aborted(),
612                         ))
613                     }
614                 }
615             }
616             _ => return Ok(()),
617         };
618 
619         self.state.writing = state;
620         Ok(())
621     }
622 
623     // When we get a parse error, depending on what side we are, we might be able
624     // to write a response before closing the connection.
625     //
626     // - Client: there is nothing we can do
627     // - Server: if Response hasn't been written yet, we can send a 4xx response
on_parse_error(&mut self, err: crate::Error) -> crate::Result<()>628     fn on_parse_error(&mut self, err: crate::Error) -> crate::Result<()> {
629         if let Writing::Init = self.state.writing {
630             if self.has_h2_prefix() {
631                 return Err(crate::Error::new_version_h2());
632             }
633             if let Some(msg) = T::on_error(&err) {
634                 // Drop the cached headers so as to not trigger a debug
635                 // assert in `write_head`...
636                 self.state.cached_headers.take();
637                 self.write_head(msg, None);
638                 self.state.error = Some(err);
639                 return Ok(());
640             }
641         }
642 
643         // fallback is pass the error back up
644         Err(err)
645     }
646 
    /// Flushes the buffered IO.
    ///
    /// Returns `Pending` until the flush completes and propagates any IO
    /// error. After a successful flush the keep-alive state is re-evaluated
    /// through `try_keep_alive`.
    pub fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
        ready!(Pin::new(&mut self.io).poll_flush(cx))?;
        self.try_keep_alive(cx);
        trace!("flushed({}): {:?}", T::LOG, self.state);
        Poll::Ready(Ok(()))
    }
653 
poll_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>>654     pub fn poll_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
655         match ready!(Pin::new(self.io.io_mut()).poll_shutdown(cx)) {
656             Ok(()) => {
657                 trace!("shut down IO complete");
658                 Poll::Ready(Ok(()))
659             }
660             Err(e) => {
661                 debug!("error shutting down IO: {}", e);
662                 Poll::Ready(Err(e))
663             }
664         }
665     }
666 
667     /// If the read side can be cheaply drained, do so. Otherwise, close.
poll_drain_or_close_read(&mut self, cx: &mut task::Context<'_>)668     pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut task::Context<'_>) {
669         let _ = self.poll_read_body(cx);
670 
671         // If still in Reading::Body, just give up
672         match self.state.reading {
673             Reading::Init | Reading::KeepAlive => {
674                 trace!("body drained");
675                 return;
676             }
677             _ => self.close_read(),
678         }
679     }
680 
    /// Closes the read side of the connection state, which also disables
    /// keep-alive (see `State::close_read`).
    pub fn close_read(&mut self) {
        self.state.close_read();
    }
684 
    /// Closes the write side of the connection state.
    pub fn close_write(&mut self) {
        self.state.close_write();
    }
688 
disable_keep_alive(&mut self)689     pub fn disable_keep_alive(&mut self) {
690         if self.state.is_idle() {
691             trace!("disable_keep_alive; closing idle connection");
692             self.state.close();
693         } else {
694             trace!("disable_keep_alive; in-progress connection");
695             self.state.disable_keep_alive();
696         }
697     }
698 
take_error(&mut self) -> crate::Result<()>699     pub fn take_error(&mut self) -> crate::Result<()> {
700         if let Some(err) = self.state.error.take() {
701             Err(err)
702         } else {
703             Ok(())
704         }
705     }
706 
    /// Prepares the state for a possible HTTP upgrade and returns the
    /// `OnUpgrade` handle produced by `State::prepare_upgrade`.
    pub(super) fn on_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
        trace!("{}: prepare possible HTTP upgrade", T::LOG);
        self.state.prepare_upgrade()
    }
711 }
712 
713 impl<I, B: Buf, T> fmt::Debug for Conn<I, B, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result714     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
715         f.debug_struct("Conn")
716             .field("state", &self.state)
717             .field("io", &self.io)
718             .finish()
719     }
720 }
721 
// B and T are never pinned, so `Conn` is `Unpin` whenever the transport `I`
// is; no `Unpin` bound is required on `B` or `T`.
impl<I: Unpin, B, T> Unpin for Conn<I, B, T> {}
724 
/// Mutable bookkeeping for a single HTTP/1 connection.
struct State {
    /// When set, `mid_message_detect_eof` does not probe the transport for
    /// EOF while a message is in flight.
    allow_half_close: bool,
    /// Re-usable HeaderMap to reduce allocating new ones.
    cached_headers: Option<HeaderMap>,
    /// If an error occurs when there wasn't a direct way to return it
    /// back to the user, this is set.
    error: Option<crate::Error>,
    /// Current keep-alive status.
    keep_alive: KA,
    /// If mid-message, the HTTP Method that started it.
    ///
    /// This is used to know things such as if the message can include
    /// a body or not.
    method: Option<Method>,
    /// Handed to the header encoder as `Encode::title_case_headers`.
    title_case_headers: bool,
    /// Set to true when the Dispatcher should poll read operations
    /// again. See the `maybe_notify` method for more.
    notify_read: bool,
    /// State of allowed reads
    reading: Reading,
    /// State of allowed writes
    writing: Writing,
    /// An expected pending HTTP upgrade.
    upgrade: Option<crate::upgrade::Pending>,
    /// Either HTTP/1.0 or 1.1 connection
    version: Version,
}
752 
/// Progress of the read side of the connection.
#[derive(Debug)]
enum Reading {
    /// No incoming message is in progress; a head may be parsed next.
    Init,
    /// A head asking for `100 Continue` was parsed. The decoder is held here
    /// until the body is first polled.
    Continue(Decoder),
    /// An incoming body is being decoded.
    Body(Decoder),
    /// The incoming message has been read completely.
    KeepAlive,
    /// The read side is closed.
    Closed,
}
761 
/// Progress of the write side of the connection.
enum Writing {
    /// No outgoing message is in progress; a head may be written next.
    Init,
    /// A head has been written and its body is still being encoded.
    Body(Encoder),
    /// The outgoing message is complete and its encoder did not mark it as
    /// the last one.
    KeepAlive,
    /// The write side is closed.
    Closed,
}
768 
769 impl fmt::Debug for State {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result770     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
771         let mut builder = f.debug_struct("State");
772         builder
773             .field("reading", &self.reading)
774             .field("writing", &self.writing)
775             .field("keep_alive", &self.keep_alive);
776 
777         // Only show error field if it's interesting...
778         if let Some(ref error) = self.error {
779             builder.field("error", error);
780         }
781 
782         if self.allow_half_close {
783             builder.field("allow_half_close", &true);
784         }
785 
786         // Purposefully leaving off other fields..
787 
788         builder.finish()
789     }
790 }
791 
792 impl fmt::Debug for Writing {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result793     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
794         match *self {
795             Writing::Init => f.write_str("Init"),
796             Writing::Body(ref enc) => f.debug_tuple("Body").field(enc).finish(),
797             Writing::KeepAlive => f.write_str("KeepAlive"),
798             Writing::Closed => f.write_str("Closed"),
799         }
800     }
801 }
802 
803 impl std::ops::BitAndAssign<bool> for KA {
bitand_assign(&mut self, enabled: bool)804     fn bitand_assign(&mut self, enabled: bool) {
805         if !enabled {
806             trace!("remote disabling keep-alive");
807             *self = KA::Disabled;
808         }
809     }
810 }
811 
/// Keep-alive status of a connection.
#[derive(Debug, Clone, Copy)]
enum KA {
    Idle,
    Busy,
    Disabled,
}

impl Default for KA {
    /// The default status is `Busy`.
    fn default() -> Self {
        Self::Busy
    }
}

impl KA {
    /// Unconditionally switches the status to `Idle`.
    fn idle(&mut self) {
        *self = Self::Idle;
    }

    /// Unconditionally switches the status to `Busy`.
    fn busy(&mut self) {
        *self = Self::Busy;
    }

    /// Unconditionally switches the status to `Disabled`.
    fn disable(&mut self) {
        *self = Self::Disabled;
    }

    /// Returns a copy of the current status.
    fn status(&self) -> Self {
        let current = *self;
        current
    }
}
842 
843 impl State {
close(&mut self)844     fn close(&mut self) {
845         trace!("State::close()");
846         self.reading = Reading::Closed;
847         self.writing = Writing::Closed;
848         self.keep_alive.disable();
849     }
850 
close_read(&mut self)851     fn close_read(&mut self) {
852         trace!("State::close_read()");
853         self.reading = Reading::Closed;
854         self.keep_alive.disable();
855     }
856 
close_write(&mut self)857     fn close_write(&mut self) {
858         trace!("State::close_write()");
859         self.writing = Writing::Closed;
860         self.keep_alive.disable();
861     }
862 
wants_keep_alive(&self) -> bool863     fn wants_keep_alive(&self) -> bool {
864         if let KA::Disabled = self.keep_alive.status() {
865             false
866         } else {
867             true
868         }
869     }
870 
try_keep_alive<T: Http1Transaction>(&mut self)871     fn try_keep_alive<T: Http1Transaction>(&mut self) {
872         match (&self.reading, &self.writing) {
873             (&Reading::KeepAlive, &Writing::KeepAlive) => {
874                 if let KA::Busy = self.keep_alive.status() {
875                     self.idle::<T>();
876                 } else {
877                     trace!(
878                         "try_keep_alive({}): could keep-alive, but status = {:?}",
879                         T::LOG,
880                         self.keep_alive
881                     );
882                     self.close();
883                 }
884             }
885             (&Reading::Closed, &Writing::KeepAlive) | (&Reading::KeepAlive, &Writing::Closed) => {
886                 self.close()
887             }
888             _ => (),
889         }
890     }
891 
disable_keep_alive(&mut self)892     fn disable_keep_alive(&mut self) {
893         self.keep_alive.disable()
894     }
895 
busy(&mut self)896     fn busy(&mut self) {
897         if let KA::Disabled = self.keep_alive.status() {
898             return;
899         }
900         self.keep_alive.busy();
901     }
902 
idle<T: Http1Transaction>(&mut self)903     fn idle<T: Http1Transaction>(&mut self) {
904         debug_assert!(!self.is_idle(), "State::idle() called while idle");
905 
906         self.method = None;
907         self.keep_alive.idle();
908         if self.is_idle() {
909             self.reading = Reading::Init;
910             self.writing = Writing::Init;
911 
912             // !T::should_read_first() means Client.
913             //
914             // If Client connection has just gone idle, the Dispatcher
915             // should try the poll loop one more time, so as to poll the
916             // pending requests stream.
917             if !T::should_read_first() {
918                 self.notify_read = true;
919             }
920         } else {
921             self.close();
922         }
923     }
924 
is_idle(&self) -> bool925     fn is_idle(&self) -> bool {
926         if let KA::Idle = self.keep_alive.status() {
927             true
928         } else {
929             false
930         }
931     }
932 
is_read_closed(&self) -> bool933     fn is_read_closed(&self) -> bool {
934         match self.reading {
935             Reading::Closed => true,
936             _ => false,
937         }
938     }
939 
is_write_closed(&self) -> bool940     fn is_write_closed(&self) -> bool {
941         match self.writing {
942             Writing::Closed => true,
943             _ => false,
944         }
945     }
946 
prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade947     fn prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
948         let (tx, rx) = crate::upgrade::pending();
949         self.upgrade = Some(tx);
950         rx
951     }
952 }
953 
954 #[cfg(test)]
955 mod tests {
    /// Benchmarks `Conn::poll_read_head` on a short, fixed GET request.
    ///
    /// The request bytes are placed directly in the connection's read buffer
    /// and the header map is recycled through `cached_headers`, so each
    /// iteration re-parses the same head.
    #[cfg(feature = "nightly")]
    #[bench]
    fn bench_read_head_short(b: &mut ::test::Bencher) {
        use super::*;
        let s = b"GET / HTTP/1.1\r\nHost: localhost:8080\r\n\r\n";
        let len = s.len();
        // Report throughput in terms of the request size.
        b.bytes = len as u64;

        // The mock IO has no scripted reads or writes; the request is seeded
        // straight into the read buffer below instead.
        let io = tokio_test::io::Builder::new().build();
        let mut conn = Conn::<_, bytes::Bytes, crate::proto::h1::ServerTransaction>::new(io);
        *conn.io.read_buf_mut() = ::bytes::BytesMut::from(&s[..]);
        conn.state.cached_headers = Some(HeaderMap::with_capacity(2));

        let mut rt = tokio::runtime::Builder::new()
            .enable_all()
            .basic_scheduler()
            .build()
            .unwrap();

        b.iter(|| {
            rt.block_on(futures_util::future::poll_fn(|cx| {
                match conn.poll_read_head(cx) {
                    Poll::Ready(Some(Ok(x))) => {
                        ::test::black_box(&x);
                        // Hand the (cleared) header map back so the next
                        // iteration can reuse it.
                        let mut headers = x.0.headers;
                        headers.clear();
                        conn.state.cached_headers = Some(headers);
                    }
                    f => panic!("expected Ready(Some(Ok(..))): {:?}", f),
                }

                // Rewind the read buffer for the next iteration.
                // NOTE(review): presumably `reserve(1)` makes `BytesMut`
                // reclaim the already-consumed storage, so that `set_len`
                // re-exposes the original request bytes without copying.
                // This relies on `BytesMut` internals -- confirm before
                // changing.
                conn.io.read_buf_mut().reserve(1);
                // SAFETY (unverified, see note above): assumes the buffer's
                // capacity is at least `len` and that its first `len` bytes
                // are still the initialized request written at setup.
                unsafe {
                    conn.io.read_buf_mut().set_len(len);
                }
                // Reset the read state so the next poll parses a head again.
                conn.state.reading = Reading::Init;
                Poll::Ready(())
            }));
        });
    }
997 
998     /*
999     //TODO: rewrite these using dispatch... someday...
1000     use futures::{Async, Future, Stream, Sink};
1001     use futures::future;
1002 
1003     use proto::{self, ClientTransaction, MessageHead, ServerTransaction};
1004     use super::super::Encoder;
1005     use mock::AsyncIo;
1006 
1007     use super::{Conn, Decoder, Reading, Writing};
1008     use ::uri::Uri;
1009 
1010     use std::str::FromStr;
1011 
1012     #[test]
1013     fn test_conn_init_read() {
1014         let good_message = b"GET / HTTP/1.1\r\n\r\n".to_vec();
1015         let len = good_message.len();
1016         let io = AsyncIo::new_buf(good_message, len);
1017         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1018 
1019         match conn.poll().unwrap() {
1020             Async::Ready(Some(Frame::Message { message, body: false })) => {
1021                 assert_eq!(message, MessageHead {
1022                     subject: ::proto::RequestLine(::Get, Uri::from_str("/").unwrap()),
1023                     .. MessageHead::default()
1024                 })
1025             },
1026             f => panic!("frame is not Frame::Message: {:?}", f)
1027         }
1028     }
1029 
1030     #[test]
1031     fn test_conn_parse_partial() {
1032         let _: Result<(), ()> = future::lazy(|| {
1033             let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec();
1034             let io = AsyncIo::new_buf(good_message, 10);
1035             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1036             assert!(conn.poll().unwrap().is_not_ready());
1037             conn.io.io_mut().block_in(50);
1038             let async = conn.poll().unwrap();
1039             assert!(async.is_ready());
1040             match async {
1041                 Async::Ready(Some(Frame::Message { .. })) => (),
1042                 f => panic!("frame is not Message: {:?}", f),
1043             }
1044             Ok(())
1045         }).wait();
1046     }
1047 
1048     #[test]
1049     fn test_conn_init_read_eof_idle() {
1050         let io = AsyncIo::new_buf(vec![], 1);
1051         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1052         conn.state.idle();
1053 
1054         match conn.poll().unwrap() {
1055             Async::Ready(None) => {},
1056             other => panic!("frame is not None: {:?}", other)
1057         }
1058     }
1059 
1060     #[test]
1061     fn test_conn_init_read_eof_idle_partial_parse() {
1062         let io = AsyncIo::new_buf(b"GET / HTTP/1.1".to_vec(), 100);
1063         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1064         conn.state.idle();
1065 
1066         match conn.poll() {
1067             Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
1068             other => panic!("unexpected frame: {:?}", other)
1069         }
1070     }
1071 
1072     #[test]
1073     fn test_conn_init_read_eof_busy() {
1074         let _: Result<(), ()> = future::lazy(|| {
1075             // server ignores
1076             let io = AsyncIo::new_eof();
1077             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1078             conn.state.busy();
1079 
1080             match conn.poll().unwrap() {
1081                 Async::Ready(None) => {},
1082                 other => panic!("unexpected frame: {:?}", other)
1083             }
1084 
1085             // client
1086             let io = AsyncIo::new_eof();
1087             let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1088             conn.state.busy();
1089 
1090             match conn.poll() {
1091                 Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
1092                 other => panic!("unexpected frame: {:?}", other)
1093             }
1094             Ok(())
1095         }).wait();
1096     }
1097 
1098     #[test]
1099     fn test_conn_body_finish_read_eof() {
1100         let _: Result<(), ()> = future::lazy(|| {
1101             let io = AsyncIo::new_eof();
1102             let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1103             conn.state.busy();
1104             conn.state.writing = Writing::KeepAlive;
1105             conn.state.reading = Reading::Body(Decoder::length(0));
1106 
1107             match conn.poll() {
1108                 Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
1109                 other => panic!("unexpected frame: {:?}", other)
1110             }
1111 
1112             // conn eofs, but tokio-proto will call poll() again, before calling flush()
1113             // the conn eof in this case is perfectly fine
1114 
1115             match conn.poll() {
1116                 Ok(Async::Ready(None)) => (),
1117                 other => panic!("unexpected frame: {:?}", other)
1118             }
1119             Ok(())
1120         }).wait();
1121     }
1122 
1123     #[test]
1124     fn test_conn_message_empty_body_read_eof() {
1125         let _: Result<(), ()> = future::lazy(|| {
1126             let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec(), 1024);
1127             let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1128             conn.state.busy();
1129             conn.state.writing = Writing::KeepAlive;
1130 
1131             match conn.poll() {
1132                 Ok(Async::Ready(Some(Frame::Message { body: false, .. }))) => (),
1133                 other => panic!("unexpected frame: {:?}", other)
1134             }
1135 
1136             // conn eofs, but tokio-proto will call poll() again, before calling flush()
1137             // the conn eof in this case is perfectly fine
1138 
1139             match conn.poll() {
1140                 Ok(Async::Ready(None)) => (),
1141                 other => panic!("unexpected frame: {:?}", other)
1142             }
1143             Ok(())
1144         }).wait();
1145     }
1146 
1147     #[test]
1148     fn test_conn_read_body_end() {
1149         let _: Result<(), ()> = future::lazy(|| {
1150             let io = AsyncIo::new_buf(b"POST / HTTP/1.1\r\nContent-Length: 5\r\n\r\n12345".to_vec(), 1024);
1151             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1152             conn.state.busy();
1153 
1154             match conn.poll() {
1155                 Ok(Async::Ready(Some(Frame::Message { body: true, .. }))) => (),
1156                 other => panic!("unexpected frame: {:?}", other)
1157             }
1158 
1159             match conn.poll() {
1160                 Ok(Async::Ready(Some(Frame::Body { chunk: Some(_) }))) => (),
1161                 other => panic!("unexpected frame: {:?}", other)
1162             }
1163 
1164             // When the body is done, `poll` MUST return a `Body` frame with chunk set to `None`
1165             match conn.poll() {
1166                 Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
1167                 other => panic!("unexpected frame: {:?}", other)
1168             }
1169 
1170             match conn.poll() {
1171                 Ok(Async::NotReady) => (),
1172                 other => panic!("unexpected frame: {:?}", other)
1173             }
1174             Ok(())
1175         }).wait();
1176     }
1177 
1178     #[test]
1179     fn test_conn_closed_read() {
1180         let io = AsyncIo::new_buf(vec![], 0);
1181         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1182         conn.state.close();
1183 
1184         match conn.poll().unwrap() {
1185             Async::Ready(None) => {},
1186             other => panic!("frame is not None: {:?}", other)
1187         }
1188     }
1189 
1190     #[test]
1191     fn test_conn_body_write_length() {
1192         let _ = pretty_env_logger::try_init();
1193         let _: Result<(), ()> = future::lazy(|| {
1194             let io = AsyncIo::new_buf(vec![], 0);
1195             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1196             let max = super::super::io::DEFAULT_MAX_BUFFER_SIZE + 4096;
1197             conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64));
1198 
1199             assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; max].into()) }).unwrap().is_ready());
1200             assert!(!conn.can_buffer_body());
1201 
1202             assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; 1024 * 8].into()) }).unwrap().is_not_ready());
1203 
1204             conn.io.io_mut().block_in(1024 * 3);
1205             assert!(conn.poll_complete().unwrap().is_not_ready());
1206             conn.io.io_mut().block_in(1024 * 3);
1207             assert!(conn.poll_complete().unwrap().is_not_ready());
1208             conn.io.io_mut().block_in(max * 2);
1209             assert!(conn.poll_complete().unwrap().is_ready());
1210 
1211             assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'c'; 1024 * 8].into()) }).unwrap().is_ready());
1212             Ok(())
1213         }).wait();
1214     }
1215 
1216     #[test]
1217     fn test_conn_body_write_chunked() {
1218         let _: Result<(), ()> = future::lazy(|| {
1219             let io = AsyncIo::new_buf(vec![], 4096);
1220             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1221             conn.state.writing = Writing::Body(Encoder::chunked());
1222 
1223             assert!(conn.start_send(Frame::Body { chunk: Some("headers".into()) }).unwrap().is_ready());
1224             assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'x'; 8192].into()) }).unwrap().is_ready());
1225             Ok(())
1226         }).wait();
1227     }
1228 
1229     #[test]
1230     fn test_conn_body_flush() {
1231         let _: Result<(), ()> = future::lazy(|| {
1232             let io = AsyncIo::new_buf(vec![], 1024 * 1024 * 5);
1233             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1234             conn.state.writing = Writing::Body(Encoder::length(1024 * 1024));
1235             assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 1024].into()) }).unwrap().is_ready());
1236             assert!(!conn.can_buffer_body());
1237             conn.io.io_mut().block_in(1024 * 1024 * 5);
1238             assert!(conn.poll_complete().unwrap().is_ready());
1239             assert!(conn.can_buffer_body());
1240             assert!(conn.io.io_mut().flushed());
1241 
1242             Ok(())
1243         }).wait();
1244     }
1245 
1246     #[test]
1247     fn test_conn_parking() {
1248         use std::sync::Arc;
1249         use futures::executor::Notify;
1250         use futures::executor::NotifyHandle;
1251 
1252         struct Car {
1253             permit: bool,
1254         }
1255         impl Notify for Car {
1256             fn notify(&self, _id: usize) {
1257                 assert!(self.permit, "unparked without permit");
1258             }
1259         }
1260 
1261         fn car(permit: bool) -> NotifyHandle {
1262             Arc::new(Car {
1263                 permit: permit,
1264             }).into()
1265         }
1266 
1267         // test that once writing is done, unparks
1268         let f = future::lazy(|| {
1269             let io = AsyncIo::new_buf(vec![], 4096);
1270             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1271             conn.state.reading = Reading::KeepAlive;
1272             assert!(conn.poll().unwrap().is_not_ready());
1273 
1274             conn.state.writing = Writing::KeepAlive;
1275             assert!(conn.poll_complete().unwrap().is_ready());
1276             Ok::<(), ()>(())
1277         });
1278         ::futures::executor::spawn(f).poll_future_notify(&car(true), 0).unwrap();
1279 
1280 
1281         // test that flushing when not waiting on read doesn't unpark
1282         let f = future::lazy(|| {
1283             let io = AsyncIo::new_buf(vec![], 4096);
1284             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1285             conn.state.writing = Writing::KeepAlive;
1286             assert!(conn.poll_complete().unwrap().is_ready());
1287             Ok::<(), ()>(())
1288         });
1289         ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();
1290 
1291 
1292         // test that flushing and writing isn't done doesn't unpark
1293         let f = future::lazy(|| {
1294             let io = AsyncIo::new_buf(vec![], 4096);
1295             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1296             conn.state.reading = Reading::KeepAlive;
1297             assert!(conn.poll().unwrap().is_not_ready());
1298             conn.state.writing = Writing::Body(Encoder::length(5_000));
1299             assert!(conn.poll_complete().unwrap().is_ready());
1300             Ok::<(), ()>(())
1301         });
1302         ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();
1303     }
1304 
1305     #[test]
1306     fn test_conn_closed_write() {
1307         let io = AsyncIo::new_buf(vec![], 0);
1308         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1309         conn.state.close();
1310 
1311         match conn.start_send(Frame::Body { chunk: Some(b"foobar".to_vec().into()) }) {
1312             Err(_e) => {},
1313             other => panic!("did not return Err: {:?}", other)
1314         }
1315 
1316         assert!(conn.state.is_write_closed());
1317     }
1318 
1319     #[test]
1320     fn test_conn_write_empty_chunk() {
1321         let io = AsyncIo::new_buf(vec![], 0);
1322         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1323         conn.state.writing = Writing::KeepAlive;
1324 
1325         assert!(conn.start_send(Frame::Body { chunk: None }).unwrap().is_ready());
1326         assert!(conn.start_send(Frame::Body { chunk: Some(Vec::new().into()) }).unwrap().is_ready());
1327         conn.start_send(Frame::Body { chunk: Some(vec![b'a'].into()) }).unwrap_err();
1328     }
1329     */
1330 }
1331