1 use std::fmt;
2 use std::io::{self};
3 use std::marker::PhantomData;
4 
5 use bytes::{Buf, Bytes};
6 use http::header::{HeaderValue, CONNECTION};
7 use http::{HeaderMap, Method, Version};
8 use tokio::io::{AsyncRead, AsyncWrite};
9 
10 use super::io::Buffered;
11 use super::{Decoder, Encode, EncodedBuf, Encoder, Http1Transaction, ParseContext, Wants};
12 use crate::common::{task, Pin, Poll, Unpin};
13 use crate::headers::connection_keep_alive;
14 use crate::proto::{BodyLength, DecodedLength, MessageHead};
15 
/// The HTTP/2 client connection preface (RFC 7540, section 3.5).
///
/// `Conn::has_h2_prefix` compares the start of the read buffer against these
/// bytes so that a failed HTTP/1 parse caused by an HTTP/2 peer can be
/// reported as a version error rather than as a generic parse error.
const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
17 
/// This handles a connection, which will have been established over an
/// `AsyncRead + AsyncWrite` (like a socket), and will likely include multiple
/// `Transaction`s over HTTP.
///
/// The connection will determine when a message begins and ends as well as
/// determine if this connection can be kept alive after the message,
/// or if it is complete.
pub(crate) struct Conn<I, B, T> {
    /// The transport `I`, wrapped with read and write buffering.
    io: Buffered<I, EncodedBuf<B>>,
    /// Read/write progress, keep-alive status and other per-connection flags.
    state: State,
    /// Records the transaction type `T` without storing a value of it.
    _marker: PhantomData<fn(T)>,
}
30 
31 impl<I, B, T> Conn<I, B, T>
32 where
33     I: AsyncRead + AsyncWrite + Unpin,
34     B: Buf,
35     T: Http1Transaction,
36 {
new(io: I) -> Conn<I, B, T>37     pub fn new(io: I) -> Conn<I, B, T> {
38         Conn {
39             io: Buffered::new(io),
40             state: State {
41                 allow_half_close: false,
42                 cached_headers: None,
43                 error: None,
44                 keep_alive: KA::Busy,
45                 method: None,
46                 title_case_headers: false,
47                 notify_read: false,
48                 reading: Reading::Init,
49                 writing: Writing::Init,
50                 upgrade: None,
51                 // We assume a modern world where the remote speaks HTTP/1.1.
52                 // If they tell us otherwise, we'll downgrade in `read_head`.
53                 version: Version::HTTP_11,
54             },
55             _marker: PhantomData,
56         }
57     }
58 
set_flush_pipeline(&mut self, enabled: bool)59     pub fn set_flush_pipeline(&mut self, enabled: bool) {
60         self.io.set_flush_pipeline(enabled);
61     }
62 
set_max_buf_size(&mut self, max: usize)63     pub fn set_max_buf_size(&mut self, max: usize) {
64         self.io.set_max_buf_size(max);
65     }
66 
set_read_buf_exact_size(&mut self, sz: usize)67     pub fn set_read_buf_exact_size(&mut self, sz: usize) {
68         self.io.set_read_buf_exact_size(sz);
69     }
70 
set_write_strategy_flatten(&mut self)71     pub fn set_write_strategy_flatten(&mut self) {
72         self.io.set_write_strategy_flatten();
73     }
74 
set_title_case_headers(&mut self)75     pub fn set_title_case_headers(&mut self) {
76         self.state.title_case_headers = true;
77     }
78 
set_allow_half_close(&mut self)79     pub(crate) fn set_allow_half_close(&mut self) {
80         self.state.allow_half_close = true;
81     }
82 
into_inner(self) -> (I, Bytes)83     pub fn into_inner(self) -> (I, Bytes) {
84         self.io.into_inner()
85     }
86 
pending_upgrade(&mut self) -> Option<crate::upgrade::Pending>87     pub fn pending_upgrade(&mut self) -> Option<crate::upgrade::Pending> {
88         self.state.upgrade.take()
89     }
90 
is_read_closed(&self) -> bool91     pub fn is_read_closed(&self) -> bool {
92         self.state.is_read_closed()
93     }
94 
is_write_closed(&self) -> bool95     pub fn is_write_closed(&self) -> bool {
96         self.state.is_write_closed()
97     }
98 
can_read_head(&self) -> bool99     pub fn can_read_head(&self) -> bool {
100         match self.state.reading {
101             Reading::Init => {
102                 if T::should_read_first() {
103                     true
104                 } else {
105                     match self.state.writing {
106                         Writing::Init => false,
107                         _ => true,
108                     }
109                 }
110             }
111             _ => false,
112         }
113     }
114 
can_read_body(&self) -> bool115     pub fn can_read_body(&self) -> bool {
116         match self.state.reading {
117             Reading::Body(..) | Reading::Continue(..) => true,
118             _ => false,
119         }
120     }
121 
should_error_on_eof(&self) -> bool122     fn should_error_on_eof(&self) -> bool {
123         // If we're idle, it's probably just the connection closing gracefully.
124         T::should_error_on_parse_eof() && !self.state.is_idle()
125     }
126 
has_h2_prefix(&self) -> bool127     fn has_h2_prefix(&self) -> bool {
128         let read_buf = self.io.read_buf();
129         read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE
130     }
131 
    /// Polls the buffered IO for a complete message head and advances the
    /// read state accordingly.
    ///
    /// Returns `Poll::Pending` while the parser has not produced a result.
    /// On success it yields the parsed head, the decoded body length and a
    /// `Wants` value recording whether an upgrade and/or a `100 Continue`
    /// is wanted. A parse failure is routed through `on_read_head_error`.
    ///
    /// Callers are expected to check `can_read_head()` first; this is only
    /// enforced by a debug assertion.
    pub(super) fn poll_read_head(
        &mut self,
        cx: &mut task::Context<'_>,
    ) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>> {
        debug_assert!(self.can_read_head());
        trace!("Conn::read_head");

        let msg = match ready!(self.io.parse::<T>(
            cx,
            ParseContext {
                cached_headers: &mut self.state.cached_headers,
                req_method: &mut self.state.method,
            }
        )) {
            Ok(msg) => msg,
            Err(e) => return self.on_read_head_error(e),
        };

        // Note: don't deconstruct `msg` into local variables, it appears
        // the optimizer doesn't remove the extra copies.

        debug!("incoming body is {}", msg.decode);

        // A message has started: mark the connection busy, let the message
        // turn keep-alive off (it can never turn it back on), and remember
        // the version the remote used.
        self.state.busy();
        self.state.keep_alive &= msg.keep_alive;
        self.state.version = msg.head.version;

        let mut wants = if msg.wants_upgrade {
            Wants::UPGRADE
        } else {
            Wants::EMPTY
        };

        if msg.decode == DecodedLength::ZERO {
            // No body to read, so the read side is already finished.
            if msg.expect_continue {
                debug!("ignoring expect-continue since body is empty");
            }
            self.state.reading = Reading::KeepAlive;
            if !T::should_read_first() {
                self.try_keep_alive(cx);
            }
        } else if msg.expect_continue {
            // The body is only read after the `100 Continue` handling in
            // `poll_read_body`.
            self.state.reading = Reading::Continue(Decoder::new(msg.decode));
            wants = wants.add(Wants::EXPECT);
        } else {
            self.state.reading = Reading::Body(Decoder::new(msg.decode));
        }

        Poll::Ready(Some(Ok((msg.head, msg.decode, wants))))
    }
182 
on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>>183     fn on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>> {
184         // If we are currently waiting on a message, then an empty
185         // message should be reported as an error. If not, it is just
186         // the connection closing gracefully.
187         let must_error = self.should_error_on_eof();
188         self.close_read();
189         self.io.consume_leading_lines();
190         let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty();
191         if was_mid_parse || must_error {
192             // We check if the buf contains the h2 Preface
193             debug!(
194                 "parse error ({}) with {} bytes",
195                 e,
196                 self.io.read_buf().len()
197             );
198             match self.on_parse_error(e) {
199                 Ok(()) => Poll::Pending, // XXX: wat?
200                 Err(e) => Poll::Ready(Some(Err(e))),
201             }
202         } else {
203             debug!("read eof");
204             self.close_write();
205             Poll::Ready(None)
206         }
207     }
208 
    /// Polls for the next chunk of the incoming message body.
    ///
    /// Yields `Some(Ok(chunk))` for a non-empty chunk, `None` once the
    /// decoder reports EOF with nothing left to hand out, and `Some(Err(_))`
    /// when decoding fails (which also moves the read side to
    /// `Reading::Closed`).
    ///
    /// In `Reading::Continue`, a `100 Continue` interim response is queued
    /// first (only while the write side is still `Writing::Init`), the state
    /// becomes `Reading::Body`, and the poll is retried once.
    ///
    /// Callers are expected to check `can_read_body()` first; this is only
    /// enforced by a debug assertion.
    pub fn poll_read_body(
        &mut self,
        cx: &mut task::Context<'_>,
    ) -> Poll<Option<io::Result<Bytes>>> {
        debug_assert!(self.can_read_body());

        // Each arm either returns directly or produces the next read state
        // together with the value to return after the state is updated.
        let (reading, ret) = match self.state.reading {
            Reading::Body(ref mut decoder) => {
                match decoder.decode(cx, &mut self.io) {
                    Poll::Ready(Ok(slice)) => {
                        let (reading, chunk) = if decoder.is_eof() {
                            debug!("incoming body completed");
                            (
                                Reading::KeepAlive,
                                if !slice.is_empty() {
                                    Some(Ok(slice))
                                } else {
                                    None
                                },
                            )
                        } else if slice.is_empty() {
                            error!("incoming body unexpectedly ended");
                            // This should be unreachable, since all 3 decoders
                            // either set eof=true or return an Err when reading
                            // an empty slice...
                            (Reading::Closed, None)
                        } else {
                            // More body is expected: hand out the chunk and
                            // leave the state untouched.
                            return Poll::Ready(Some(Ok(slice)));
                        };
                        (reading, Poll::Ready(chunk))
                    }
                    Poll::Pending => return Poll::Pending,
                    Poll::Ready(Err(e)) => {
                        debug!("incoming body decode error: {}", e);
                        (Reading::Closed, Poll::Ready(Some(Err(e))))
                    }
                }
            }
            Reading::Continue(ref decoder) => {
                // Write the 100 Continue if not already responded...
                if let Writing::Init = self.state.writing {
                    trace!("automatically sending 100 Continue");
                    let cont = b"HTTP/1.1 100 Continue\r\n\r\n";
                    self.io.headers_buf().extend_from_slice(cont);
                }

                // And now recurse once in the Reading::Body state...
                self.state.reading = Reading::Body(decoder.clone());
                return self.poll_read_body(cx);
            }
            _ => unreachable!("poll_read_body invalid state: {:?}", self.state.reading),
        };

        // The body finished or failed: record the new state and re-evaluate
        // keep-alive.
        self.state.reading = reading;
        self.try_keep_alive(cx);
        ret
    }
266 
wants_read_again(&mut self) -> bool267     pub fn wants_read_again(&mut self) -> bool {
268         let ret = self.state.notify_read;
269         self.state.notify_read = false;
270         ret
271     }
272 
poll_read_keep_alive(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>273     pub fn poll_read_keep_alive(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
274         debug_assert!(!self.can_read_head() && !self.can_read_body());
275 
276         if self.is_read_closed() {
277             Poll::Pending
278         } else if self.is_mid_message() {
279             self.mid_message_detect_eof(cx)
280         } else {
281             self.require_empty_read(cx)
282         }
283     }
284 
is_mid_message(&self) -> bool285     fn is_mid_message(&self) -> bool {
286         match (&self.state.reading, &self.state.writing) {
287             (&Reading::Init, &Writing::Init) => false,
288             _ => true,
289         }
290     }
291 
292     // This will check to make sure the io object read is empty.
293     //
294     // This should only be called for Clients wanting to enter the idle
295     // state.
require_empty_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>296     fn require_empty_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
297         debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
298         debug_assert!(!self.is_mid_message());
299         debug_assert!(T::is_client());
300 
301         if !self.io.read_buf().is_empty() {
302             debug!("received an unexpected {} bytes", self.io.read_buf().len());
303             return Poll::Ready(Err(crate::Error::new_unexpected_message()));
304         }
305 
306         let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
307 
308         if num_read == 0 {
309             let ret = if self.should_error_on_eof() {
310                 trace!("found unexpected EOF on busy connection: {:?}", self.state);
311                 Poll::Ready(Err(crate::Error::new_incomplete()))
312             } else {
313                 trace!("found EOF on idle connection, closing");
314                 Poll::Ready(Ok(()))
315             };
316 
317             // order is important: should_error needs state BEFORE close_read
318             self.state.close_read();
319             return ret;
320         }
321 
322         debug!(
323             "received unexpected {} bytes on an idle connection",
324             num_read
325         );
326         Poll::Ready(Err(crate::Error::new_unexpected_message()))
327     }
328 
mid_message_detect_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>329     fn mid_message_detect_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
330         debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
331         debug_assert!(self.is_mid_message());
332 
333         if self.state.allow_half_close || !self.io.read_buf().is_empty() {
334             return Poll::Pending;
335         }
336 
337         let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
338 
339         if num_read == 0 {
340             trace!("found unexpected EOF on busy connection: {:?}", self.state);
341             self.state.close_read();
342             Poll::Ready(Err(crate::Error::new_incomplete()))
343         } else {
344             Poll::Ready(Ok(()))
345         }
346     }
347 
force_io_read(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<usize>>348     fn force_io_read(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<usize>> {
349         debug_assert!(!self.state.is_read_closed());
350 
351         let result = ready!(self.io.poll_read_from_io(cx));
352         Poll::Ready(result.map_err(|e| {
353             trace!("force_io_read; io error = {:?}", e);
354             self.state.close();
355             e
356         }))
357     }
358 
    /// Sets `notify_read` when the read side should be polled again.
    ///
    /// Nothing happens unless the read side is in `Reading::Init`, the write
    /// side is not in the middle of a body, and the buffered IO is not
    /// read-blocked. If the read buffer is empty, one read is attempted:
    /// an EOF or a pending read returns without notifying, while an IO error
    /// closes the state, stores the error, and still sets the flag.
    fn maybe_notify(&mut self, cx: &mut task::Context<'_>) {
        // it's possible that we returned NotReady from poll() without having
        // exhausted the underlying Io. We would have done this when we
        // determined we couldn't keep reading until we knew how writing
        // would finish.

        match self.state.reading {
            Reading::Continue(..) | Reading::Body(..) | Reading::KeepAlive | Reading::Closed => {
                return
            }
            Reading::Init => (),
        };

        match self.state.writing {
            Writing::Body(..) => return,
            Writing::Init | Writing::KeepAlive | Writing::Closed => (),
        }

        if !self.io.is_read_blocked() {
            if self.io.read_buf().is_empty() {
                match self.io.poll_read_from_io(cx) {
                    Poll::Ready(Ok(n)) => {
                        if n == 0 {
                            trace!("maybe_notify; read eof");
                            // EOF: an idle connection is closed entirely,
                            // a busy one only loses its read side.
                            if self.state.is_idle() {
                                self.state.close();
                            } else {
                                self.close_read()
                            }
                            return;
                        }
                    }
                    Poll::Pending => {
                        trace!("maybe_notify; read_from_io blocked");
                        return;
                    }
                    Poll::Ready(Err(e)) => {
                        trace!("maybe_notify; read_from_io error: {}", e);
                        self.state.close();
                        // Stored here and handed out later by `take_error`.
                        self.state.error = Some(crate::Error::new_io(e));
                    }
                }
            }
            self.state.notify_read = true;
        }
    }
405 
try_keep_alive(&mut self, cx: &mut task::Context<'_>)406     fn try_keep_alive(&mut self, cx: &mut task::Context<'_>) {
407         self.state.try_keep_alive::<T>();
408         self.maybe_notify(cx);
409     }
410 
can_write_head(&self) -> bool411     pub fn can_write_head(&self) -> bool {
412         if !T::should_read_first() {
413             if let Reading::Closed = self.state.reading {
414                 return false;
415             }
416         }
417         match self.state.writing {
418             Writing::Init => true,
419             _ => false,
420         }
421     }
422 
can_write_body(&self) -> bool423     pub fn can_write_body(&self) -> bool {
424         match self.state.writing {
425             Writing::Body(..) => true,
426             Writing::Init | Writing::KeepAlive | Writing::Closed => false,
427         }
428     }
429 
can_buffer_body(&self) -> bool430     pub fn can_buffer_body(&self) -> bool {
431         self.io.can_buffer()
432     }
433 
write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>)434     pub fn write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>) {
435         if let Some(encoder) = self.encode_head(head, body) {
436             self.state.writing = if !encoder.is_eof() {
437                 Writing::Body(encoder)
438             } else if encoder.is_last() {
439                 Writing::Closed
440             } else {
441                 Writing::KeepAlive
442             };
443         }
444     }
445 
write_full_msg(&mut self, head: MessageHead<T::Outgoing>, body: B)446     pub fn write_full_msg(&mut self, head: MessageHead<T::Outgoing>, body: B) {
447         if let Some(encoder) =
448             self.encode_head(head, Some(BodyLength::Known(body.remaining() as u64)))
449         {
450             let is_last = encoder.is_last();
451             // Make sure we don't write a body if we weren't actually allowed
452             // to do so, like because its a HEAD request.
453             if !encoder.is_eof() {
454                 encoder.danger_full_buf(body, self.io.write_buf());
455             }
456             self.state.writing = if is_last {
457                 Writing::Closed
458             } else {
459                 Writing::KeepAlive
460             }
461         }
462     }
463 
    /// Encodes `head` into the headers buffer and returns the body encoder.
    ///
    /// Returns `None` when encoding fails; in that case the error is stored
    /// in the state (see `take_error`) and the write side is closed. On
    /// success the header map left in `head` is kept in `cached_headers`.
    ///
    /// Callers are expected to check `can_write_head()` first; this is only
    /// enforced by a debug assertion.
    fn encode_head(
        &mut self,
        mut head: MessageHead<T::Outgoing>,
        body: Option<BodyLength>,
    ) -> Option<Encoder> {
        debug_assert!(self.can_write_head());

        // A transaction type that writes first starts its message here, so
        // this is where the connection becomes busy.
        if !T::should_read_first() {
            self.state.busy();
        }

        self.enforce_version(&mut head);

        let buf = self.io.headers_buf();
        match T::encode(
            Encode {
                head: &mut head,
                body,
                keep_alive: self.state.wants_keep_alive(),
                req_method: &mut self.state.method,
                title_case_headers: self.state.title_case_headers,
            },
            buf,
        ) {
            Ok(encoder) => {
                debug_assert!(self.state.cached_headers.is_none());
                debug_assert!(head.headers.is_empty());
                // Keep the header map around for later use.
                self.state.cached_headers = Some(head.headers);
                Some(encoder)
            }
            Err(err) => {
                self.state.error = Some(err);
                self.state.writing = Writing::Closed;
                None
            }
        }
    }
501 
502     // Fix keep-alives when Connection: keep-alive header is not present
fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>)503     fn fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>) {
504         let outgoing_is_keep_alive = head
505             .headers
506             .get(CONNECTION)
507             .map(connection_keep_alive)
508             .unwrap_or(false);
509 
510         if !outgoing_is_keep_alive {
511             match head.version {
512                 // If response is version 1.0 and keep-alive is not present in the response,
513                 // disable keep-alive so the server closes the connection
514                 Version::HTTP_10 => self.state.disable_keep_alive(),
515                 // If response is version 1.1 and keep-alive is wanted, add
516                 // Connection: keep-alive header when not present
517                 Version::HTTP_11 => {
518                     if self.state.wants_keep_alive() {
519                         head.headers
520                             .insert(CONNECTION, HeaderValue::from_static("keep-alive"));
521                     }
522                 }
523                 _ => (),
524             }
525         }
526     }
527 
528     // If we know the remote speaks an older version, we try to fix up any messages
529     // to work with our older peer.
enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>)530     fn enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>) {
531         if let Version::HTTP_10 = self.state.version {
532             // Fixes response or connection when keep-alive header is not present
533             self.fix_keep_alive(head);
534             // If the remote only knows HTTP/1.0, we should force ourselves
535             // to do only speak HTTP/1.0 as well.
536             head.version = Version::HTTP_10;
537         }
538         // If the remote speaks HTTP/1.1, then it *should* be fine with
539         // both HTTP/1.0 and HTTP/1.1 from us. So again, we just let
540         // the user's headers be.
541     }
542 
write_body(&mut self, chunk: B)543     pub fn write_body(&mut self, chunk: B) {
544         debug_assert!(self.can_write_body() && self.can_buffer_body());
545         // empty chunks should be discarded at Dispatcher level
546         debug_assert!(chunk.remaining() != 0);
547 
548         let state = match self.state.writing {
549             Writing::Body(ref mut encoder) => {
550                 self.io.buffer(encoder.encode(chunk));
551 
552                 if encoder.is_eof() {
553                     if encoder.is_last() {
554                         Writing::Closed
555                     } else {
556                         Writing::KeepAlive
557                     }
558                 } else {
559                     return;
560                 }
561             }
562             _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
563         };
564 
565         self.state.writing = state;
566     }
567 
write_body_and_end(&mut self, chunk: B)568     pub fn write_body_and_end(&mut self, chunk: B) {
569         debug_assert!(self.can_write_body() && self.can_buffer_body());
570         // empty chunks should be discarded at Dispatcher level
571         debug_assert!(chunk.remaining() != 0);
572 
573         let state = match self.state.writing {
574             Writing::Body(ref encoder) => {
575                 let can_keep_alive = encoder.encode_and_end(chunk, self.io.write_buf());
576                 if can_keep_alive {
577                     Writing::KeepAlive
578                 } else {
579                     Writing::Closed
580                 }
581             }
582             _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
583         };
584 
585         self.state.writing = state;
586     }
587 
end_body(&mut self)588     pub fn end_body(&mut self) {
589         debug_assert!(self.can_write_body());
590 
591         let state = match self.state.writing {
592             Writing::Body(ref mut encoder) => {
593                 // end of stream, that means we should try to eof
594                 match encoder.end() {
595                     Ok(end) => {
596                         if let Some(end) = end {
597                             self.io.buffer(end);
598                         }
599                         if encoder.is_last() {
600                             Writing::Closed
601                         } else {
602                             Writing::KeepAlive
603                         }
604                     }
605                     Err(_not_eof) => Writing::Closed,
606                 }
607             }
608             _ => return,
609         };
610 
611         self.state.writing = state;
612     }
613 
614     // When we get a parse error, depending on what side we are, we might be able
615     // to write a response before closing the connection.
616     //
617     // - Client: there is nothing we can do
618     // - Server: if Response hasn't been written yet, we can send a 4xx response
on_parse_error(&mut self, err: crate::Error) -> crate::Result<()>619     fn on_parse_error(&mut self, err: crate::Error) -> crate::Result<()> {
620         if let Writing::Init = self.state.writing {
621             if self.has_h2_prefix() {
622                 return Err(crate::Error::new_version_h2());
623             }
624             if let Some(msg) = T::on_error(&err) {
625                 // Drop the cached headers so as to not trigger a debug
626                 // assert in `write_head`...
627                 self.state.cached_headers.take();
628                 self.write_head(msg, None);
629                 self.state.error = Some(err);
630                 return Ok(());
631             }
632         }
633 
634         // fallback is pass the error back up
635         Err(err)
636     }
637 
poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>>638     pub fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
639         ready!(Pin::new(&mut self.io).poll_flush(cx))?;
640         self.try_keep_alive(cx);
641         trace!("flushed({}): {:?}", T::LOG, self.state);
642         Poll::Ready(Ok(()))
643     }
644 
poll_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>>645     pub fn poll_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
646         match ready!(Pin::new(self.io.io_mut()).poll_shutdown(cx)) {
647             Ok(()) => {
648                 trace!("shut down IO complete");
649                 Poll::Ready(Ok(()))
650             }
651             Err(e) => {
652                 debug!("error shutting down IO: {}", e);
653                 Poll::Ready(Err(e))
654             }
655         }
656     }
657 
658     /// If the read side can be cheaply drained, do so. Otherwise, close.
poll_drain_or_close_read(&mut self, cx: &mut task::Context<'_>)659     pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut task::Context<'_>) {
660         let _ = self.poll_read_body(cx);
661 
662         // If still in Reading::Body, just give up
663         match self.state.reading {
664             Reading::Init | Reading::KeepAlive => {
665                 trace!("body drained");
666                 return;
667             }
668             _ => self.close_read(),
669         }
670     }
671 
close_read(&mut self)672     pub fn close_read(&mut self) {
673         self.state.close_read();
674     }
675 
close_write(&mut self)676     pub fn close_write(&mut self) {
677         self.state.close_write();
678     }
679 
disable_keep_alive(&mut self)680     pub fn disable_keep_alive(&mut self) {
681         if self.state.is_idle() {
682             trace!("disable_keep_alive; closing idle connection");
683             self.state.close();
684         } else {
685             trace!("disable_keep_alive; in-progress connection");
686             self.state.disable_keep_alive();
687         }
688     }
689 
take_error(&mut self) -> crate::Result<()>690     pub fn take_error(&mut self) -> crate::Result<()> {
691         if let Some(err) = self.state.error.take() {
692             Err(err)
693         } else {
694             Ok(())
695         }
696     }
697 
on_upgrade(&mut self) -> crate::upgrade::OnUpgrade698     pub(super) fn on_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
699         trace!("{}: prepare possible HTTP upgrade", T::LOG);
700         self.state.prepare_upgrade()
701     }
702 }
703 
704 impl<I, B: Buf, T> fmt::Debug for Conn<I, B, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result705     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
706         f.debug_struct("Conn")
707             .field("state", &self.state)
708             .field("io", &self.io)
709             .finish()
710     }
711 }
712 
// B and T are never pinned, so `Conn` is `Unpin` whenever the transport `I`
// is; no bound is placed on `B` or `T`.
impl<I: Unpin, B, T> Unpin for Conn<I, B, T> {}
715 
/// Mutable bookkeeping for a single `Conn`.
struct State {
    /// When set, `mid_message_detect_eof` stays pending instead of probing
    /// the transport for EOF while a message is in flight.
    allow_half_close: bool,
    /// Re-usable HeaderMap to reduce allocating new ones.
    cached_headers: Option<HeaderMap>,
    /// If an error occurs when there wasn't a direct way to return it
    /// back to the user, this is set.
    error: Option<crate::Error>,
    /// Current keep-alive status.
    keep_alive: KA,
    /// If mid-message, the HTTP Method that started it.
    ///
    /// This is used to know things such as if the message can include
    /// a body or not.
    method: Option<Method>,
    /// Passed to the encoder with every outgoing message head.
    title_case_headers: bool,
    /// Set to true when the Dispatcher should poll read operations
    /// again. See the `maybe_notify` method for more.
    notify_read: bool,
    /// State of allowed reads
    reading: Reading,
    /// State of allowed writes
    writing: Writing,
    /// An expected pending HTTP upgrade.
    upgrade: Option<crate::upgrade::Pending>,
    /// Either HTTP/1.0 or 1.1 connection
    version: Version,
}
743 
/// Progress of the read side of the connection.
#[derive(Debug)]
enum Reading {
    /// No message is being read; a new head may be read from this state.
    Init,
    /// A head asking for `100 Continue` was read; the decoder is kept until
    /// the body is first polled, at which point the state becomes `Body`.
    Continue(Decoder),
    /// A message body is being decoded.
    Body(Decoder),
    /// The current message has been fully read.
    KeepAlive,
    /// Nothing more will be read.
    Closed,
}
752 
/// Progress of the write side of the connection.
enum Writing {
    /// No message is being written; a new head may be written from this
    /// state.
    Init,
    /// A message body is being encoded.
    Body(Encoder),
    /// The current message has been fully written and was not marked as the
    /// last one.
    KeepAlive,
    /// Nothing more will be written.
    Closed,
}
759 
760 impl fmt::Debug for State {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result761     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
762         let mut builder = f.debug_struct("State");
763         builder
764             .field("reading", &self.reading)
765             .field("writing", &self.writing)
766             .field("keep_alive", &self.keep_alive);
767 
768         // Only show error field if it's interesting...
769         if let Some(ref error) = self.error {
770             builder.field("error", error);
771         }
772 
773         if self.allow_half_close {
774             builder.field("allow_half_close", &true);
775         }
776 
777         // Purposefully leaving off other fields..
778 
779         builder.finish()
780     }
781 }
782 
783 impl fmt::Debug for Writing {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result784     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
785         match *self {
786             Writing::Init => f.write_str("Init"),
787             Writing::Body(ref enc) => f.debug_tuple("Body").field(enc).finish(),
788             Writing::KeepAlive => f.write_str("KeepAlive"),
789             Writing::Closed => f.write_str("Closed"),
790         }
791     }
792 }
793 
794 impl std::ops::BitAndAssign<bool> for KA {
bitand_assign(&mut self, enabled: bool)795     fn bitand_assign(&mut self, enabled: bool) {
796         if !enabled {
797             trace!("remote disabling keep-alive");
798             *self = KA::Disabled;
799         }
800     }
801 }
802 
/// Keep-alive status tracked for a connection.
#[derive(Clone, Copy, Debug)]
enum KA {
    /// Set by [`KA::idle`].
    Idle,
    /// The default status; also set by [`KA::busy`].
    Busy,
    /// Keep-alive has been turned off; set by [`KA::disable`].
    Disabled,
}

impl Default for KA {
    /// A new status starts out as `Busy`.
    fn default() -> Self {
        Self::Busy
    }
}

impl KA {
    /// Unconditionally switches to `Idle`.
    fn idle(&mut self) {
        *self = Self::Idle;
    }

    /// Unconditionally switches to `Busy`.
    fn busy(&mut self) {
        *self = Self::Busy;
    }

    /// Unconditionally switches to `Disabled`.
    fn disable(&mut self) {
        *self = Self::Disabled;
    }

    /// Returns a copy of the current status.
    fn status(&self) -> Self {
        *self
    }
}
833 
834 impl State {
close(&mut self)835     fn close(&mut self) {
836         trace!("State::close()");
837         self.reading = Reading::Closed;
838         self.writing = Writing::Closed;
839         self.keep_alive.disable();
840     }
841 
close_read(&mut self)842     fn close_read(&mut self) {
843         trace!("State::close_read()");
844         self.reading = Reading::Closed;
845         self.keep_alive.disable();
846     }
847 
close_write(&mut self)848     fn close_write(&mut self) {
849         trace!("State::close_write()");
850         self.writing = Writing::Closed;
851         self.keep_alive.disable();
852     }
853 
wants_keep_alive(&self) -> bool854     fn wants_keep_alive(&self) -> bool {
855         if let KA::Disabled = self.keep_alive.status() {
856             false
857         } else {
858             true
859         }
860     }
861 
try_keep_alive<T: Http1Transaction>(&mut self)862     fn try_keep_alive<T: Http1Transaction>(&mut self) {
863         match (&self.reading, &self.writing) {
864             (&Reading::KeepAlive, &Writing::KeepAlive) => {
865                 if let KA::Busy = self.keep_alive.status() {
866                     self.idle::<T>();
867                 } else {
868                     trace!(
869                         "try_keep_alive({}): could keep-alive, but status = {:?}",
870                         T::LOG,
871                         self.keep_alive
872                     );
873                     self.close();
874                 }
875             }
876             (&Reading::Closed, &Writing::KeepAlive) | (&Reading::KeepAlive, &Writing::Closed) => {
877                 self.close()
878             }
879             _ => (),
880         }
881     }
882 
disable_keep_alive(&mut self)883     fn disable_keep_alive(&mut self) {
884         self.keep_alive.disable()
885     }
886 
busy(&mut self)887     fn busy(&mut self) {
888         if let KA::Disabled = self.keep_alive.status() {
889             return;
890         }
891         self.keep_alive.busy();
892     }
893 
idle<T: Http1Transaction>(&mut self)894     fn idle<T: Http1Transaction>(&mut self) {
895         debug_assert!(!self.is_idle(), "State::idle() called while idle");
896 
897         self.method = None;
898         self.keep_alive.idle();
899         if self.is_idle() {
900             self.reading = Reading::Init;
901             self.writing = Writing::Init;
902 
903             // !T::should_read_first() means Client.
904             //
905             // If Client connection has just gone idle, the Dispatcher
906             // should try the poll loop one more time, so as to poll the
907             // pending requests stream.
908             if !T::should_read_first() {
909                 self.notify_read = true;
910             }
911         } else {
912             self.close();
913         }
914     }
915 
is_idle(&self) -> bool916     fn is_idle(&self) -> bool {
917         if let KA::Idle = self.keep_alive.status() {
918             true
919         } else {
920             false
921         }
922     }
923 
is_read_closed(&self) -> bool924     fn is_read_closed(&self) -> bool {
925         match self.reading {
926             Reading::Closed => true,
927             _ => false,
928         }
929     }
930 
is_write_closed(&self) -> bool931     fn is_write_closed(&self) -> bool {
932         match self.writing {
933             Writing::Closed => true,
934             _ => false,
935         }
936     }
937 
prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade938     fn prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
939         let (tx, rx) = crate::upgrade::pending();
940         self.upgrade = Some(tx);
941         rx
942     }
943 }
944 
945 #[cfg(test)]
946 mod tests {
    /// Benchmarks `Conn::poll_read_head` on a short `GET` request with a
    /// single `Host` header.
    ///
    /// The request bytes are placed directly into the connection's read
    /// buffer, so the underlying IO object is never expected to supply
    /// data. Only built with the `nightly` feature, since `#[bench]` and
    /// the `test` crate are used.
    #[cfg(feature = "nightly")]
    #[bench]
    fn bench_read_head_short(b: &mut ::test::Bencher) {
        use super::*;
        let s = b"GET / HTTP/1.1\r\nHost: localhost:8080\r\n\r\n";
        let len = s.len();
        // Report throughput in terms of the request size.
        b.bytes = len as u64;

        // an empty IO, we'll be skipping and using the read buffer anyways
        let io = tokio_test::io::Builder::new().build();
        let mut conn = Conn::<_, bytes::Bytes, crate::proto::h1::ServerTransaction>::new(io);
        *conn.io.read_buf_mut() = ::bytes::BytesMut::from(&s[..]);
        // Pre-seed the cached header map with room for two headers.
        conn.state.cached_headers = Some(HeaderMap::with_capacity(2));

        let mut rt = tokio::runtime::Builder::new()
            .enable_all()
            .basic_scheduler()
            .build()
            .unwrap();

        b.iter(|| {
            rt.block_on(futures_util::future::poll_fn(|cx| {
                match conn.poll_read_head(cx) {
                    Poll::Ready(Some(Ok(x))) => {
                        ::test::black_box(&x);
                        // Hand the (emptied) header map back to the
                        // connection so the next iteration can use it again.
                        let mut headers = x.0.headers;
                        headers.clear();
                        conn.state.cached_headers = Some(headers);
                    }
                    f => panic!("expected Ready(Some(Ok(..))): {:?}", f),
                }

                // Restore the read buffer to `len` bytes for the next
                // iteration without copying the request in again.
                conn.io.read_buf_mut().reserve(1);
                // SAFETY / NOTE(review): this assumes that after `reserve`
                // the buffer has capacity for at least `len` bytes and that
                // those bytes still hold the original request -- confirm
                // against the `bytes` version in use.
                unsafe {
                    conn.io.read_buf_mut().set_len(len);
                }
                // Rewind the read state so another head can be parsed.
                conn.state.reading = Reading::Init;
                Poll::Ready(())
            }));
        });
    }
988 
989     /*
990     //TODO: rewrite these using dispatch... someday...
991     use futures::{Async, Future, Stream, Sink};
992     use futures::future;
993 
994     use proto::{self, ClientTransaction, MessageHead, ServerTransaction};
995     use super::super::Encoder;
996     use mock::AsyncIo;
997 
998     use super::{Conn, Decoder, Reading, Writing};
999     use ::uri::Uri;
1000 
1001     use std::str::FromStr;
1002 
1003     #[test]
1004     fn test_conn_init_read() {
1005         let good_message = b"GET / HTTP/1.1\r\n\r\n".to_vec();
1006         let len = good_message.len();
1007         let io = AsyncIo::new_buf(good_message, len);
1008         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1009 
1010         match conn.poll().unwrap() {
1011             Async::Ready(Some(Frame::Message { message, body: false })) => {
1012                 assert_eq!(message, MessageHead {
1013                     subject: ::proto::RequestLine(::Get, Uri::from_str("/").unwrap()),
1014                     .. MessageHead::default()
1015                 })
1016             },
1017             f => panic!("frame is not Frame::Message: {:?}", f)
1018         }
1019     }
1020 
1021     #[test]
1022     fn test_conn_parse_partial() {
1023         let _: Result<(), ()> = future::lazy(|| {
1024             let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec();
1025             let io = AsyncIo::new_buf(good_message, 10);
1026             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1027             assert!(conn.poll().unwrap().is_not_ready());
1028             conn.io.io_mut().block_in(50);
1029             let async = conn.poll().unwrap();
1030             assert!(async.is_ready());
1031             match async {
1032                 Async::Ready(Some(Frame::Message { .. })) => (),
1033                 f => panic!("frame is not Message: {:?}", f),
1034             }
1035             Ok(())
1036         }).wait();
1037     }
1038 
1039     #[test]
1040     fn test_conn_init_read_eof_idle() {
1041         let io = AsyncIo::new_buf(vec![], 1);
1042         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1043         conn.state.idle();
1044 
1045         match conn.poll().unwrap() {
1046             Async::Ready(None) => {},
1047             other => panic!("frame is not None: {:?}", other)
1048         }
1049     }
1050 
1051     #[test]
1052     fn test_conn_init_read_eof_idle_partial_parse() {
1053         let io = AsyncIo::new_buf(b"GET / HTTP/1.1".to_vec(), 100);
1054         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1055         conn.state.idle();
1056 
1057         match conn.poll() {
1058             Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
1059             other => panic!("unexpected frame: {:?}", other)
1060         }
1061     }
1062 
1063     #[test]
1064     fn test_conn_init_read_eof_busy() {
1065         let _: Result<(), ()> = future::lazy(|| {
1066             // server ignores
1067             let io = AsyncIo::new_eof();
1068             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1069             conn.state.busy();
1070 
1071             match conn.poll().unwrap() {
1072                 Async::Ready(None) => {},
1073                 other => panic!("unexpected frame: {:?}", other)
1074             }
1075 
1076             // client
1077             let io = AsyncIo::new_eof();
1078             let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1079             conn.state.busy();
1080 
1081             match conn.poll() {
1082                 Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
1083                 other => panic!("unexpected frame: {:?}", other)
1084             }
1085             Ok(())
1086         }).wait();
1087     }
1088 
1089     #[test]
1090     fn test_conn_body_finish_read_eof() {
1091         let _: Result<(), ()> = future::lazy(|| {
1092             let io = AsyncIo::new_eof();
1093             let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1094             conn.state.busy();
1095             conn.state.writing = Writing::KeepAlive;
1096             conn.state.reading = Reading::Body(Decoder::length(0));
1097 
1098             match conn.poll() {
1099                 Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
1100                 other => panic!("unexpected frame: {:?}", other)
1101             }
1102 
1103             // conn eofs, but tokio-proto will call poll() again, before calling flush()
1104             // the conn eof in this case is perfectly fine
1105 
1106             match conn.poll() {
1107                 Ok(Async::Ready(None)) => (),
1108                 other => panic!("unexpected frame: {:?}", other)
1109             }
1110             Ok(())
1111         }).wait();
1112     }
1113 
1114     #[test]
1115     fn test_conn_message_empty_body_read_eof() {
1116         let _: Result<(), ()> = future::lazy(|| {
1117             let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec(), 1024);
1118             let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1119             conn.state.busy();
1120             conn.state.writing = Writing::KeepAlive;
1121 
1122             match conn.poll() {
1123                 Ok(Async::Ready(Some(Frame::Message { body: false, .. }))) => (),
1124                 other => panic!("unexpected frame: {:?}", other)
1125             }
1126 
1127             // conn eofs, but tokio-proto will call poll() again, before calling flush()
1128             // the conn eof in this case is perfectly fine
1129 
1130             match conn.poll() {
1131                 Ok(Async::Ready(None)) => (),
1132                 other => panic!("unexpected frame: {:?}", other)
1133             }
1134             Ok(())
1135         }).wait();
1136     }
1137 
1138     #[test]
1139     fn test_conn_read_body_end() {
1140         let _: Result<(), ()> = future::lazy(|| {
1141             let io = AsyncIo::new_buf(b"POST / HTTP/1.1\r\nContent-Length: 5\r\n\r\n12345".to_vec(), 1024);
1142             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1143             conn.state.busy();
1144 
1145             match conn.poll() {
1146                 Ok(Async::Ready(Some(Frame::Message { body: true, .. }))) => (),
1147                 other => panic!("unexpected frame: {:?}", other)
1148             }
1149 
1150             match conn.poll() {
1151                 Ok(Async::Ready(Some(Frame::Body { chunk: Some(_) }))) => (),
1152                 other => panic!("unexpected frame: {:?}", other)
1153             }
1154 
1155             // When the body is done, `poll` MUST return a `Body` frame with chunk set to `None`
1156             match conn.poll() {
1157                 Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
1158                 other => panic!("unexpected frame: {:?}", other)
1159             }
1160 
1161             match conn.poll() {
1162                 Ok(Async::NotReady) => (),
1163                 other => panic!("unexpected frame: {:?}", other)
1164             }
1165             Ok(())
1166         }).wait();
1167     }
1168 
1169     #[test]
1170     fn test_conn_closed_read() {
1171         let io = AsyncIo::new_buf(vec![], 0);
1172         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1173         conn.state.close();
1174 
1175         match conn.poll().unwrap() {
1176             Async::Ready(None) => {},
1177             other => panic!("frame is not None: {:?}", other)
1178         }
1179     }
1180 
1181     #[test]
1182     fn test_conn_body_write_length() {
1183         let _ = pretty_env_logger::try_init();
1184         let _: Result<(), ()> = future::lazy(|| {
1185             let io = AsyncIo::new_buf(vec![], 0);
1186             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1187             let max = super::super::io::DEFAULT_MAX_BUFFER_SIZE + 4096;
1188             conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64));
1189 
1190             assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; max].into()) }).unwrap().is_ready());
1191             assert!(!conn.can_buffer_body());
1192 
1193             assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; 1024 * 8].into()) }).unwrap().is_not_ready());
1194 
1195             conn.io.io_mut().block_in(1024 * 3);
1196             assert!(conn.poll_complete().unwrap().is_not_ready());
1197             conn.io.io_mut().block_in(1024 * 3);
1198             assert!(conn.poll_complete().unwrap().is_not_ready());
1199             conn.io.io_mut().block_in(max * 2);
1200             assert!(conn.poll_complete().unwrap().is_ready());
1201 
1202             assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'c'; 1024 * 8].into()) }).unwrap().is_ready());
1203             Ok(())
1204         }).wait();
1205     }
1206 
1207     #[test]
1208     fn test_conn_body_write_chunked() {
1209         let _: Result<(), ()> = future::lazy(|| {
1210             let io = AsyncIo::new_buf(vec![], 4096);
1211             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1212             conn.state.writing = Writing::Body(Encoder::chunked());
1213 
1214             assert!(conn.start_send(Frame::Body { chunk: Some("headers".into()) }).unwrap().is_ready());
1215             assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'x'; 8192].into()) }).unwrap().is_ready());
1216             Ok(())
1217         }).wait();
1218     }
1219 
1220     #[test]
1221     fn test_conn_body_flush() {
1222         let _: Result<(), ()> = future::lazy(|| {
1223             let io = AsyncIo::new_buf(vec![], 1024 * 1024 * 5);
1224             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1225             conn.state.writing = Writing::Body(Encoder::length(1024 * 1024));
1226             assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 1024].into()) }).unwrap().is_ready());
1227             assert!(!conn.can_buffer_body());
1228             conn.io.io_mut().block_in(1024 * 1024 * 5);
1229             assert!(conn.poll_complete().unwrap().is_ready());
1230             assert!(conn.can_buffer_body());
1231             assert!(conn.io.io_mut().flushed());
1232 
1233             Ok(())
1234         }).wait();
1235     }
1236 
1237     #[test]
1238     fn test_conn_parking() {
1239         use std::sync::Arc;
1240         use futures::executor::Notify;
1241         use futures::executor::NotifyHandle;
1242 
1243         struct Car {
1244             permit: bool,
1245         }
1246         impl Notify for Car {
1247             fn notify(&self, _id: usize) {
1248                 assert!(self.permit, "unparked without permit");
1249             }
1250         }
1251 
1252         fn car(permit: bool) -> NotifyHandle {
1253             Arc::new(Car {
1254                 permit: permit,
1255             }).into()
1256         }
1257 
1258         // test that once writing is done, unparks
1259         let f = future::lazy(|| {
1260             let io = AsyncIo::new_buf(vec![], 4096);
1261             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1262             conn.state.reading = Reading::KeepAlive;
1263             assert!(conn.poll().unwrap().is_not_ready());
1264 
1265             conn.state.writing = Writing::KeepAlive;
1266             assert!(conn.poll_complete().unwrap().is_ready());
1267             Ok::<(), ()>(())
1268         });
1269         ::futures::executor::spawn(f).poll_future_notify(&car(true), 0).unwrap();
1270 
1271 
1272         // test that flushing when not waiting on read doesn't unpark
1273         let f = future::lazy(|| {
1274             let io = AsyncIo::new_buf(vec![], 4096);
1275             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1276             conn.state.writing = Writing::KeepAlive;
1277             assert!(conn.poll_complete().unwrap().is_ready());
1278             Ok::<(), ()>(())
1279         });
1280         ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();
1281 
1282 
1283         // test that flushing and writing isn't done doesn't unpark
1284         let f = future::lazy(|| {
1285             let io = AsyncIo::new_buf(vec![], 4096);
1286             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1287             conn.state.reading = Reading::KeepAlive;
1288             assert!(conn.poll().unwrap().is_not_ready());
1289             conn.state.writing = Writing::Body(Encoder::length(5_000));
1290             assert!(conn.poll_complete().unwrap().is_ready());
1291             Ok::<(), ()>(())
1292         });
1293         ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();
1294     }
1295 
1296     #[test]
1297     fn test_conn_closed_write() {
1298         let io = AsyncIo::new_buf(vec![], 0);
1299         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1300         conn.state.close();
1301 
1302         match conn.start_send(Frame::Body { chunk: Some(b"foobar".to_vec().into()) }) {
1303             Err(_e) => {},
1304             other => panic!("did not return Err: {:?}", other)
1305         }
1306 
1307         assert!(conn.state.is_write_closed());
1308     }
1309 
1310     #[test]
1311     fn test_conn_write_empty_chunk() {
1312         let io = AsyncIo::new_buf(vec![], 0);
1313         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1314         conn.state.writing = Writing::KeepAlive;
1315 
1316         assert!(conn.start_send(Frame::Body { chunk: None }).unwrap().is_ready());
1317         assert!(conn.start_send(Frame::Body { chunk: Some(Vec::new().into()) }).unwrap().is_ready());
1318         conn.start_send(Frame::Body { chunk: Some(vec![b'a'].into()) }).unwrap_err();
1319     }
1320     */
1321 }
1322