1 use std::fmt; 2 use std::io::{self}; 3 use std::marker::PhantomData; 4 5 use bytes::{Buf, Bytes}; 6 use http::header::{HeaderValue, CONNECTION}; 7 use http::{HeaderMap, Method, Version}; 8 use tokio::io::{AsyncRead, AsyncWrite}; 9 10 use super::io::Buffered; 11 use super::{Decoder, Encode, EncodedBuf, Encoder, Http1Transaction, ParseContext, Wants}; 12 use crate::common::{task, Pin, Poll, Unpin}; 13 use crate::headers::connection_keep_alive; 14 use crate::proto::{BodyLength, DecodedLength, MessageHead}; 15 use crate::Result; 16 17 const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; 18 19 /// This handles a connection, which will have been established over an 20 /// `AsyncRead + AsyncWrite` (like a socket), and will likely include multiple 21 /// `Transaction`s over HTTP. 22 /// 23 /// The connection will determine when a message begins and ends as well as 24 /// determine if this connection can be kept alive after the message, 25 /// or if it is complete. 26 pub(crate) struct Conn<I, B, T> { 27 io: Buffered<I, EncodedBuf<B>>, 28 state: State, 29 _marker: PhantomData<fn(T)>, 30 } 31 32 impl<I, B, T> Conn<I, B, T> 33 where 34 I: AsyncRead + AsyncWrite + Unpin, 35 B: Buf, 36 T: Http1Transaction, 37 { new(io: I) -> Conn<I, B, T>38 pub fn new(io: I) -> Conn<I, B, T> { 39 Conn { 40 io: Buffered::new(io), 41 state: State { 42 allow_half_close: false, 43 cached_headers: None, 44 error: None, 45 keep_alive: KA::Busy, 46 method: None, 47 title_case_headers: false, 48 notify_read: false, 49 reading: Reading::Init, 50 writing: Writing::Init, 51 upgrade: None, 52 // We assume a modern world where the remote speaks HTTP/1.1. 53 // If they tell us otherwise, we'll downgrade in `read_head`. 54 version: Version::HTTP_11, 55 }, 56 _marker: PhantomData, 57 } 58 } 59 set_flush_pipeline(&mut self, enabled: bool)60 pub fn set_flush_pipeline(&mut self, enabled: bool) { 61 self.io.set_flush_pipeline(enabled); 62 } 63 set_max_buf_size(&mut self, max: usize)64 pub fn set_max_buf_size(&mut self, max: usize) { 65 self.io.set_max_buf_size(max); 66 } 67 set_read_buf_exact_size(&mut self, sz: usize)68 pub fn set_read_buf_exact_size(&mut self, sz: usize) { 69 self.io.set_read_buf_exact_size(sz); 70 } 71 set_write_strategy_flatten(&mut self)72 pub fn set_write_strategy_flatten(&mut self) { 73 self.io.set_write_strategy_flatten(); 74 } 75 set_write_strategy_queue(&mut self)76 pub fn set_write_strategy_queue(&mut self) { 77 self.io.set_write_strategy_queue(); 78 } 79 set_title_case_headers(&mut self)80 pub fn set_title_case_headers(&mut self) { 81 self.state.title_case_headers = true; 82 } 83 set_allow_half_close(&mut self)84 pub(crate) fn set_allow_half_close(&mut self) { 85 self.state.allow_half_close = true; 86 } 87 into_inner(self) -> (I, Bytes)88 pub fn into_inner(self) -> (I, Bytes) { 89 self.io.into_inner() 90 } 91 pending_upgrade(&mut self) -> Option<crate::upgrade::Pending>92 pub fn pending_upgrade(&mut self) -> Option<crate::upgrade::Pending> { 93 self.state.upgrade.take() 94 } 95 is_read_closed(&self) -> bool96 pub fn is_read_closed(&self) -> bool { 97 self.state.is_read_closed() 98 } 99 is_write_closed(&self) -> bool100 pub fn is_write_closed(&self) -> bool { 101 self.state.is_write_closed() 102 } 103 can_read_head(&self) -> bool104 pub fn can_read_head(&self) -> bool { 105 match self.state.reading { 106 Reading::Init => { 107 if T::should_read_first() { 108 true 109 } else { 110 match self.state.writing { 111 Writing::Init => false, 112 _ => true, 113 } 114 } 115 } 116 _ => false, 117 } 118 } 119 can_read_body(&self) -> bool120 pub fn can_read_body(&self) -> bool { 121 match self.state.reading { 122 Reading::Body(..) | Reading::Continue(..) => true, 123 _ => false, 124 } 125 } 126 should_error_on_eof(&self) -> bool127 fn should_error_on_eof(&self) -> bool { 128 // If we're idle, it's probably just the connection closing gracefully. 129 T::should_error_on_parse_eof() && !self.state.is_idle() 130 } 131 has_h2_prefix(&self) -> bool132 fn has_h2_prefix(&self) -> bool { 133 let read_buf = self.io.read_buf(); 134 read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE 135 } 136 poll_read_head( &mut self, cx: &mut task::Context<'_>, ) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>>137 pub(super) fn poll_read_head( 138 &mut self, 139 cx: &mut task::Context<'_>, 140 ) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>> { 141 debug_assert!(self.can_read_head()); 142 trace!("Conn::read_head"); 143 144 let msg = match ready!(self.io.parse::<T>( 145 cx, 146 ParseContext { 147 cached_headers: &mut self.state.cached_headers, 148 req_method: &mut self.state.method, 149 } 150 )) { 151 Ok(msg) => msg, 152 Err(e) => return self.on_read_head_error(e), 153 }; 154 155 // Note: don't deconstruct `msg` into local variables, it appears 156 // the optimizer doesn't remove the extra copies. 157 158 debug!("incoming body is {}", msg.decode); 159 160 self.state.busy(); 161 self.state.keep_alive &= msg.keep_alive; 162 self.state.version = msg.head.version; 163 164 let mut wants = if msg.wants_upgrade { 165 Wants::UPGRADE 166 } else { 167 Wants::EMPTY 168 }; 169 170 if msg.decode == DecodedLength::ZERO { 171 if msg.expect_continue { 172 debug!("ignoring expect-continue since body is empty"); 173 } 174 self.state.reading = Reading::KeepAlive; 175 if !T::should_read_first() { 176 self.try_keep_alive(cx); 177 } 178 } else if msg.expect_continue { 179 self.state.reading = Reading::Continue(Decoder::new(msg.decode)); 180 wants = wants.add(Wants::EXPECT); 181 } else { 182 self.state.reading = Reading::Body(Decoder::new(msg.decode)); 183 } 184 185 Poll::Ready(Some(Ok((msg.head, msg.decode, wants)))) 186 } 187 on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>>188 fn on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>> { 189 // If we are currently waiting on a message, then an empty 190 // message should be reported as an error. If not, it is just 191 // the connection closing gracefully. 192 let must_error = self.should_error_on_eof(); 193 self.close_read(); 194 self.io.consume_leading_lines(); 195 let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty(); 196 if was_mid_parse || must_error { 197 // We check if the buf contains the h2 Preface 198 debug!( 199 "parse error ({}) with {} bytes", 200 e, 201 self.io.read_buf().len() 202 ); 203 match self.on_parse_error(e) { 204 Ok(()) => Poll::Pending, // XXX: wat? 205 Err(e) => Poll::Ready(Some(Err(e))), 206 } 207 } else { 208 debug!("read eof"); 209 self.close_write(); 210 Poll::Ready(None) 211 } 212 } 213 poll_read_body( &mut self, cx: &mut task::Context<'_>, ) -> Poll<Option<io::Result<Bytes>>>214 pub fn poll_read_body( 215 &mut self, 216 cx: &mut task::Context<'_>, 217 ) -> Poll<Option<io::Result<Bytes>>> { 218 debug_assert!(self.can_read_body()); 219 220 let (reading, ret) = match self.state.reading { 221 Reading::Body(ref mut decoder) => { 222 match ready!(decoder.decode(cx, &mut self.io)) { 223 Ok(slice) => { 224 let (reading, chunk) = if decoder.is_eof() { 225 debug!("incoming body completed"); 226 ( 227 Reading::KeepAlive, 228 if !slice.is_empty() { 229 Some(Ok(slice)) 230 } else { 231 None 232 }, 233 ) 234 } else if slice.is_empty() { 235 error!("incoming body unexpectedly ended"); 236 // This should be unreachable, since all 3 decoders 237 // either set eof=true or return an Err when reading 238 // an empty slice... 239 (Reading::Closed, None) 240 } else { 241 return Poll::Ready(Some(Ok(slice))); 242 }; 243 (reading, Poll::Ready(chunk)) 244 } 245 Err(e) => { 246 debug!("incoming body decode error: {}", e); 247 (Reading::Closed, Poll::Ready(Some(Err(e)))) 248 } 249 } 250 } 251 Reading::Continue(ref decoder) => { 252 // Write the 100 Continue if not already responded... 253 if let Writing::Init = self.state.writing { 254 trace!("automatically sending 100 Continue"); 255 let cont = b"HTTP/1.1 100 Continue\r\n\r\n"; 256 self.io.headers_buf().extend_from_slice(cont); 257 } 258 259 // And now recurse once in the Reading::Body state... 260 self.state.reading = Reading::Body(decoder.clone()); 261 return self.poll_read_body(cx); 262 } 263 _ => unreachable!("poll_read_body invalid state: {:?}", self.state.reading), 264 }; 265 266 self.state.reading = reading; 267 self.try_keep_alive(cx); 268 ret 269 } 270 wants_read_again(&mut self) -> bool271 pub fn wants_read_again(&mut self) -> bool { 272 let ret = self.state.notify_read; 273 self.state.notify_read = false; 274 ret 275 } 276 poll_read_keep_alive(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>277 pub fn poll_read_keep_alive(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { 278 debug_assert!(!self.can_read_head() && !self.can_read_body()); 279 280 if self.is_read_closed() { 281 Poll::Pending 282 } else if self.is_mid_message() { 283 self.mid_message_detect_eof(cx) 284 } else { 285 self.require_empty_read(cx) 286 } 287 } 288 is_mid_message(&self) -> bool289 fn is_mid_message(&self) -> bool { 290 match (&self.state.reading, &self.state.writing) { 291 (&Reading::Init, &Writing::Init) => false, 292 _ => true, 293 } 294 } 295 296 // This will check to make sure the io object read is empty. 297 // 298 // This should only be called for Clients wanting to enter the idle 299 // state. require_empty_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>300 fn require_empty_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { 301 debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed()); 302 debug_assert!(!self.is_mid_message()); 303 debug_assert!(T::is_client()); 304 305 if !self.io.read_buf().is_empty() { 306 debug!("received an unexpected {} bytes", self.io.read_buf().len()); 307 return Poll::Ready(Err(crate::Error::new_unexpected_message())); 308 } 309 310 let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?; 311 312 if num_read == 0 { 313 let ret = if self.should_error_on_eof() { 314 trace!("found unexpected EOF on busy connection: {:?}", self.state); 315 Poll::Ready(Err(crate::Error::new_incomplete())) 316 } else { 317 trace!("found EOF on idle connection, closing"); 318 Poll::Ready(Ok(())) 319 }; 320 321 // order is important: should_error needs state BEFORE close_read 322 self.state.close_read(); 323 return ret; 324 } 325 326 debug!( 327 "received unexpected {} bytes on an idle connection", 328 num_read 329 ); 330 Poll::Ready(Err(crate::Error::new_unexpected_message())) 331 } 332 mid_message_detect_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>333 fn mid_message_detect_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { 334 debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed()); 335 debug_assert!(self.is_mid_message()); 336 337 if self.state.allow_half_close || !self.io.read_buf().is_empty() { 338 return Poll::Pending; 339 } 340 341 let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?; 342 343 if num_read == 0 { 344 trace!("found unexpected EOF on busy connection: {:?}", self.state); 345 self.state.close_read(); 346 Poll::Ready(Err(crate::Error::new_incomplete())) 347 } else { 348 Poll::Ready(Ok(())) 349 } 350 } 351 force_io_read(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<usize>>352 fn force_io_read(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<usize>> { 353 debug_assert!(!self.state.is_read_closed()); 354 355 let result = ready!(self.io.poll_read_from_io(cx)); 356 Poll::Ready(result.map_err(|e| { 357 trace!("force_io_read; io error = {:?}", e); 358 self.state.close(); 359 e 360 })) 361 } 362 maybe_notify(&mut self, cx: &mut task::Context<'_>)363 fn maybe_notify(&mut self, cx: &mut task::Context<'_>) { 364 // its possible that we returned NotReady from poll() without having 365 // exhausted the underlying Io. We would have done this when we 366 // determined we couldn't keep reading until we knew how writing 367 // would finish. 368 369 match self.state.reading { 370 Reading::Continue(..) | Reading::Body(..) | Reading::KeepAlive | Reading::Closed => { 371 return 372 } 373 Reading::Init => (), 374 }; 375 376 match self.state.writing { 377 Writing::Body(..) => return, 378 Writing::Init | Writing::KeepAlive | Writing::Closed => (), 379 } 380 381 if !self.io.is_read_blocked() { 382 if self.io.read_buf().is_empty() { 383 match self.io.poll_read_from_io(cx) { 384 Poll::Ready(Ok(n)) => { 385 if n == 0 { 386 trace!("maybe_notify; read eof"); 387 if self.state.is_idle() { 388 self.state.close(); 389 } else { 390 self.close_read() 391 } 392 return; 393 } 394 } 395 Poll::Pending => { 396 trace!("maybe_notify; read_from_io blocked"); 397 return; 398 } 399 Poll::Ready(Err(e)) => { 400 trace!("maybe_notify; read_from_io error: {}", e); 401 self.state.close(); 402 self.state.error = Some(crate::Error::new_io(e)); 403 } 404 } 405 } 406 self.state.notify_read = true; 407 } 408 } 409 try_keep_alive(&mut self, cx: &mut task::Context<'_>)410 fn try_keep_alive(&mut self, cx: &mut task::Context<'_>) { 411 self.state.try_keep_alive::<T>(); 412 self.maybe_notify(cx); 413 } 414 can_write_head(&self) -> bool415 pub fn can_write_head(&self) -> bool { 416 if !T::should_read_first() { 417 if let Reading::Closed = self.state.reading { 418 return false; 419 } 420 } 421 match self.state.writing { 422 Writing::Init => true, 423 _ => false, 424 } 425 } 426 can_write_body(&self) -> bool427 pub fn can_write_body(&self) -> bool { 428 match self.state.writing { 429 Writing::Body(..) => true, 430 Writing::Init | Writing::KeepAlive | Writing::Closed => false, 431 } 432 } 433 can_buffer_body(&self) -> bool434 pub fn can_buffer_body(&self) -> bool { 435 self.io.can_buffer() 436 } 437 write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>)438 pub fn write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>) { 439 if let Some(encoder) = self.encode_head(head, body) { 440 self.state.writing = if !encoder.is_eof() { 441 Writing::Body(encoder) 442 } else if encoder.is_last() { 443 Writing::Closed 444 } else { 445 Writing::KeepAlive 446 }; 447 } 448 } 449 write_full_msg(&mut self, head: MessageHead<T::Outgoing>, body: B)450 pub fn write_full_msg(&mut self, head: MessageHead<T::Outgoing>, body: B) { 451 if let Some(encoder) = 452 self.encode_head(head, Some(BodyLength::Known(body.remaining() as u64))) 453 { 454 let is_last = encoder.is_last(); 455 // Make sure we don't write a body if we weren't actually allowed 456 // to do so, like because its a HEAD request. 457 if !encoder.is_eof() { 458 encoder.danger_full_buf(body, self.io.write_buf()); 459 } 460 self.state.writing = if is_last { 461 Writing::Closed 462 } else { 463 Writing::KeepAlive 464 } 465 } 466 } 467 encode_head( &mut self, mut head: MessageHead<T::Outgoing>, body: Option<BodyLength>, ) -> Option<Encoder>468 fn encode_head( 469 &mut self, 470 mut head: MessageHead<T::Outgoing>, 471 body: Option<BodyLength>, 472 ) -> Option<Encoder> { 473 debug_assert!(self.can_write_head()); 474 475 if !T::should_read_first() { 476 self.state.busy(); 477 } 478 479 self.enforce_version(&mut head); 480 481 let buf = self.io.headers_buf(); 482 match super::role::encode_headers::<T>( 483 Encode { 484 head: &mut head, 485 body, 486 keep_alive: self.state.wants_keep_alive(), 487 req_method: &mut self.state.method, 488 title_case_headers: self.state.title_case_headers, 489 }, 490 buf, 491 ) { 492 Ok(encoder) => { 493 debug_assert!(self.state.cached_headers.is_none()); 494 debug_assert!(head.headers.is_empty()); 495 self.state.cached_headers = Some(head.headers); 496 Some(encoder) 497 } 498 Err(err) => { 499 self.state.error = Some(err); 500 self.state.writing = Writing::Closed; 501 None 502 } 503 } 504 } 505 506 // Fix keep-alives when Connection: keep-alive header is not present fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>)507 fn fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>) { 508 let outgoing_is_keep_alive = head 509 .headers 510 .get(CONNECTION) 511 .map(connection_keep_alive) 512 .unwrap_or(false); 513 514 if !outgoing_is_keep_alive { 515 match head.version { 516 // If response is version 1.0 and keep-alive is not present in the response, 517 // disable keep-alive so the server closes the connection 518 Version::HTTP_10 => self.state.disable_keep_alive(), 519 // If response is version 1.1 and keep-alive is wanted, add 520 // Connection: keep-alive header when not present 521 Version::HTTP_11 => { 522 if self.state.wants_keep_alive() { 523 head.headers 524 .insert(CONNECTION, HeaderValue::from_static("keep-alive")); 525 } 526 } 527 _ => (), 528 } 529 } 530 } 531 532 // If we know the remote speaks an older version, we try to fix up any messages 533 // to work with our older peer. enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>)534 fn enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>) { 535 if let Version::HTTP_10 = self.state.version { 536 // Fixes response or connection when keep-alive header is not present 537 self.fix_keep_alive(head); 538 // If the remote only knows HTTP/1.0, we should force ourselves 539 // to do only speak HTTP/1.0 as well. 540 head.version = Version::HTTP_10; 541 } 542 // If the remote speaks HTTP/1.1, then it *should* be fine with 543 // both HTTP/1.0 and HTTP/1.1 from us. So again, we just let 544 // the user's headers be. 545 } 546 write_body(&mut self, chunk: B)547 pub fn write_body(&mut self, chunk: B) { 548 debug_assert!(self.can_write_body() && self.can_buffer_body()); 549 // empty chunks should be discarded at Dispatcher level 550 debug_assert!(chunk.remaining() != 0); 551 552 let state = match self.state.writing { 553 Writing::Body(ref mut encoder) => { 554 self.io.buffer(encoder.encode(chunk)); 555 556 if encoder.is_eof() { 557 if encoder.is_last() { 558 Writing::Closed 559 } else { 560 Writing::KeepAlive 561 } 562 } else { 563 return; 564 } 565 } 566 _ => unreachable!("write_body invalid state: {:?}", self.state.writing), 567 }; 568 569 self.state.writing = state; 570 } 571 write_body_and_end(&mut self, chunk: B)572 pub fn write_body_and_end(&mut self, chunk: B) { 573 debug_assert!(self.can_write_body() && self.can_buffer_body()); 574 // empty chunks should be discarded at Dispatcher level 575 debug_assert!(chunk.remaining() != 0); 576 577 let state = match self.state.writing { 578 Writing::Body(ref encoder) => { 579 let can_keep_alive = encoder.encode_and_end(chunk, self.io.write_buf()); 580 if can_keep_alive { 581 Writing::KeepAlive 582 } else { 583 Writing::Closed 584 } 585 } 586 _ => unreachable!("write_body invalid state: {:?}", self.state.writing), 587 }; 588 589 self.state.writing = state; 590 } 591 end_body(&mut self) -> Result<()>592 pub fn end_body(&mut self) -> Result<()> { 593 debug_assert!(self.can_write_body()); 594 595 let state = match self.state.writing { 596 Writing::Body(ref mut encoder) => { 597 // end of stream, that means we should try to eof 598 match encoder.end() { 599 Ok(end) => { 600 if let Some(end) = end { 601 self.io.buffer(end); 602 } 603 if encoder.is_last() { 604 Writing::Closed 605 } else { 606 Writing::KeepAlive 607 } 608 } 609 Err(_not_eof) => { 610 return Err(crate::Error::new_user_body( 611 crate::Error::new_body_write_aborted(), 612 )) 613 } 614 } 615 } 616 _ => return Ok(()), 617 }; 618 619 self.state.writing = state; 620 Ok(()) 621 } 622 623 // When we get a parse error, depending on what side we are, we might be able 624 // to write a response before closing the connection. 625 // 626 // - Client: there is nothing we can do 627 // - Server: if Response hasn't been written yet, we can send a 4xx response on_parse_error(&mut self, err: crate::Error) -> crate::Result<()>628 fn on_parse_error(&mut self, err: crate::Error) -> crate::Result<()> { 629 if let Writing::Init = self.state.writing { 630 if self.has_h2_prefix() { 631 return Err(crate::Error::new_version_h2()); 632 } 633 if let Some(msg) = T::on_error(&err) { 634 // Drop the cached headers so as to not trigger a debug 635 // assert in `write_head`... 636 self.state.cached_headers.take(); 637 self.write_head(msg, None); 638 self.state.error = Some(err); 639 return Ok(()); 640 } 641 } 642 643 // fallback is pass the error back up 644 Err(err) 645 } 646 poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>>647 pub fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { 648 ready!(Pin::new(&mut self.io).poll_flush(cx))?; 649 self.try_keep_alive(cx); 650 trace!("flushed({}): {:?}", T::LOG, self.state); 651 Poll::Ready(Ok(())) 652 } 653 poll_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>>654 pub fn poll_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { 655 match ready!(Pin::new(self.io.io_mut()).poll_shutdown(cx)) { 656 Ok(()) => { 657 trace!("shut down IO complete"); 658 Poll::Ready(Ok(())) 659 } 660 Err(e) => { 661 debug!("error shutting down IO: {}", e); 662 Poll::Ready(Err(e)) 663 } 664 } 665 } 666 667 /// If the read side can be cheaply drained, do so. Otherwise, close. poll_drain_or_close_read(&mut self, cx: &mut task::Context<'_>)668 pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut task::Context<'_>) { 669 let _ = self.poll_read_body(cx); 670 671 // If still in Reading::Body, just give up 672 match self.state.reading { 673 Reading::Init | Reading::KeepAlive => { 674 trace!("body drained"); 675 return; 676 } 677 _ => self.close_read(), 678 } 679 } 680 close_read(&mut self)681 pub fn close_read(&mut self) { 682 self.state.close_read(); 683 } 684 close_write(&mut self)685 pub fn close_write(&mut self) { 686 self.state.close_write(); 687 } 688 disable_keep_alive(&mut self)689 pub fn disable_keep_alive(&mut self) { 690 if self.state.is_idle() { 691 trace!("disable_keep_alive; closing idle connection"); 692 self.state.close(); 693 } else { 694 trace!("disable_keep_alive; in-progress connection"); 695 self.state.disable_keep_alive(); 696 } 697 } 698 take_error(&mut self) -> crate::Result<()>699 pub fn take_error(&mut self) -> crate::Result<()> { 700 if let Some(err) = self.state.error.take() { 701 Err(err) 702 } else { 703 Ok(()) 704 } 705 } 706 on_upgrade(&mut self) -> crate::upgrade::OnUpgrade707 pub(super) fn on_upgrade(&mut self) -> crate::upgrade::OnUpgrade { 708 trace!("{}: prepare possible HTTP upgrade", T::LOG); 709 self.state.prepare_upgrade() 710 } 711 } 712 713 impl<I, B: Buf, T> fmt::Debug for Conn<I, B, T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result714 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 715 f.debug_struct("Conn") 716 .field("state", &self.state) 717 .field("io", &self.io) 718 .finish() 719 } 720 } 721 722 // B and T are never pinned 723 impl<I: Unpin, B, T> Unpin for Conn<I, B, T> {} 724 725 struct State { 726 allow_half_close: bool, 727 /// Re-usable HeaderMap to reduce allocating new ones. 728 cached_headers: Option<HeaderMap>, 729 /// If an error occurs when there wasn't a direct way to return it 730 /// back to the user, this is set. 731 error: Option<crate::Error>, 732 /// Current keep-alive status. 733 keep_alive: KA, 734 /// If mid-message, the HTTP Method that started it. 735 /// 736 /// This is used to know things such as if the message can include 737 /// a body or not. 738 method: Option<Method>, 739 title_case_headers: bool, 740 /// Set to true when the Dispatcher should poll read operations 741 /// again. See the `maybe_notify` method for more. 742 notify_read: bool, 743 /// State of allowed reads 744 reading: Reading, 745 /// State of allowed writes 746 writing: Writing, 747 /// An expected pending HTTP upgrade. 748 upgrade: Option<crate::upgrade::Pending>, 749 /// Either HTTP/1.0 or 1.1 connection 750 version: Version, 751 } 752 753 #[derive(Debug)] 754 enum Reading { 755 Init, 756 Continue(Decoder), 757 Body(Decoder), 758 KeepAlive, 759 Closed, 760 } 761 762 enum Writing { 763 Init, 764 Body(Encoder), 765 KeepAlive, 766 Closed, 767 } 768 769 impl fmt::Debug for State { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result770 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 771 let mut builder = f.debug_struct("State"); 772 builder 773 .field("reading", &self.reading) 774 .field("writing", &self.writing) 775 .field("keep_alive", &self.keep_alive); 776 777 // Only show error field if it's interesting... 778 if let Some(ref error) = self.error { 779 builder.field("error", error); 780 } 781 782 if self.allow_half_close { 783 builder.field("allow_half_close", &true); 784 } 785 786 // Purposefully leaving off other fields.. 787 788 builder.finish() 789 } 790 } 791 792 impl fmt::Debug for Writing { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result793 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 794 match *self { 795 Writing::Init => f.write_str("Init"), 796 Writing::Body(ref enc) => f.debug_tuple("Body").field(enc).finish(), 797 Writing::KeepAlive => f.write_str("KeepAlive"), 798 Writing::Closed => f.write_str("Closed"), 799 } 800 } 801 } 802 803 impl std::ops::BitAndAssign<bool> for KA { bitand_assign(&mut self, enabled: bool)804 fn bitand_assign(&mut self, enabled: bool) { 805 if !enabled { 806 trace!("remote disabling keep-alive"); 807 *self = KA::Disabled; 808 } 809 } 810 } 811 812 #[derive(Clone, Copy, Debug)] 813 enum KA { 814 Idle, 815 Busy, 816 Disabled, 817 } 818 819 impl Default for KA { default() -> KA820 fn default() -> KA { 821 KA::Busy 822 } 823 } 824 825 impl KA { idle(&mut self)826 fn idle(&mut self) { 827 *self = KA::Idle; 828 } 829 busy(&mut self)830 fn busy(&mut self) { 831 *self = KA::Busy; 832 } 833 disable(&mut self)834 fn disable(&mut self) { 835 *self = KA::Disabled; 836 } 837 status(&self) -> KA838 fn status(&self) -> KA { 839 *self 840 } 841 } 842 843 impl State { close(&mut self)844 fn close(&mut self) { 845 trace!("State::close()"); 846 self.reading = Reading::Closed; 847 self.writing = Writing::Closed; 848 self.keep_alive.disable(); 849 } 850 close_read(&mut self)851 fn close_read(&mut self) { 852 trace!("State::close_read()"); 853 self.reading = Reading::Closed; 854 self.keep_alive.disable(); 855 } 856 close_write(&mut self)857 fn close_write(&mut self) { 858 trace!("State::close_write()"); 859 self.writing = Writing::Closed; 860 self.keep_alive.disable(); 861 } 862 wants_keep_alive(&self) -> bool863 fn wants_keep_alive(&self) -> bool { 864 if let KA::Disabled = self.keep_alive.status() { 865 false 866 } else { 867 true 868 } 869 } 870 try_keep_alive<T: Http1Transaction>(&mut self)871 fn try_keep_alive<T: Http1Transaction>(&mut self) { 872 match (&self.reading, &self.writing) { 873 (&Reading::KeepAlive, &Writing::KeepAlive) => { 874 if let KA::Busy = self.keep_alive.status() { 875 self.idle::<T>(); 876 } else { 877 trace!( 878 "try_keep_alive({}): could keep-alive, but status = {:?}", 879 T::LOG, 880 self.keep_alive 881 ); 882 self.close(); 883 } 884 } 885 (&Reading::Closed, &Writing::KeepAlive) | (&Reading::KeepAlive, &Writing::Closed) => { 886 self.close() 887 } 888 _ => (), 889 } 890 } 891 disable_keep_alive(&mut self)892 fn disable_keep_alive(&mut self) { 893 self.keep_alive.disable() 894 } 895 busy(&mut self)896 fn busy(&mut self) { 897 if let KA::Disabled = self.keep_alive.status() { 898 return; 899 } 900 self.keep_alive.busy(); 901 } 902 idle<T: Http1Transaction>(&mut self)903 fn idle<T: Http1Transaction>(&mut self) { 904 debug_assert!(!self.is_idle(), "State::idle() called while idle"); 905 906 self.method = None; 907 self.keep_alive.idle(); 908 if self.is_idle() { 909 self.reading = Reading::Init; 910 self.writing = Writing::Init; 911 912 // !T::should_read_first() means Client. 913 // 914 // If Client connection has just gone idle, the Dispatcher 915 // should try the poll loop one more time, so as to poll the 916 // pending requests stream. 917 if !T::should_read_first() { 918 self.notify_read = true; 919 } 920 } else { 921 self.close(); 922 } 923 } 924 is_idle(&self) -> bool925 fn is_idle(&self) -> bool { 926 if let KA::Idle = self.keep_alive.status() { 927 true 928 } else { 929 false 930 } 931 } 932 is_read_closed(&self) -> bool933 fn is_read_closed(&self) -> bool { 934 match self.reading { 935 Reading::Closed => true, 936 _ => false, 937 } 938 } 939 is_write_closed(&self) -> bool940 fn is_write_closed(&self) -> bool { 941 match self.writing { 942 Writing::Closed => true, 943 _ => false, 944 } 945 } 946 prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade947 fn prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade { 948 let (tx, rx) = crate::upgrade::pending(); 949 self.upgrade = Some(tx); 950 rx 951 } 952 } 953 954 #[cfg(test)] 955 mod tests { 956 #[cfg(feature = "nightly")] 957 #[bench] bench_read_head_short(b: &mut ::test::Bencher)958 fn bench_read_head_short(b: &mut ::test::Bencher) { 959 use super::*; 960 let s = b"GET / HTTP/1.1\r\nHost: localhost:8080\r\n\r\n"; 961 let len = s.len(); 962 b.bytes = len as u64; 963 964 // an empty IO, we'll be skipping and using the read buffer anyways 965 let io = tokio_test::io::Builder::new().build(); 966 let mut conn = Conn::<_, bytes::Bytes, crate::proto::h1::ServerTransaction>::new(io); 967 *conn.io.read_buf_mut() = ::bytes::BytesMut::from(&s[..]); 968 conn.state.cached_headers = Some(HeaderMap::with_capacity(2)); 969 970 let mut rt = tokio::runtime::Builder::new() 971 .enable_all() 972 .basic_scheduler() 973 .build() 974 .unwrap(); 975 976 b.iter(|| { 977 rt.block_on(futures_util::future::poll_fn(|cx| { 978 match conn.poll_read_head(cx) { 979 Poll::Ready(Some(Ok(x))) => { 980 ::test::black_box(&x); 981 let mut headers = x.0.headers; 982 headers.clear(); 983 conn.state.cached_headers = Some(headers); 984 } 985 f => panic!("expected Ready(Some(Ok(..))): {:?}", f), 986 } 987 988 conn.io.read_buf_mut().reserve(1); 989 unsafe { 990 conn.io.read_buf_mut().set_len(len); 991 } 992 conn.state.reading = Reading::Init; 993 Poll::Ready(()) 994 })); 995 }); 996 } 997 998 /* 999 //TODO: rewrite these using dispatch... someday... 1000 use futures::{Async, Future, Stream, Sink}; 1001 use futures::future; 1002 1003 use proto::{self, ClientTransaction, MessageHead, ServerTransaction}; 1004 use super::super::Encoder; 1005 use mock::AsyncIo; 1006 1007 use super::{Conn, Decoder, Reading, Writing}; 1008 use ::uri::Uri; 1009 1010 use std::str::FromStr; 1011 1012 #[test] 1013 fn test_conn_init_read() { 1014 let good_message = b"GET / HTTP/1.1\r\n\r\n".to_vec(); 1015 let len = good_message.len(); 1016 let io = AsyncIo::new_buf(good_message, len); 1017 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1018 1019 match conn.poll().unwrap() { 1020 Async::Ready(Some(Frame::Message { message, body: false })) => { 1021 assert_eq!(message, MessageHead { 1022 subject: ::proto::RequestLine(::Get, Uri::from_str("/").unwrap()), 1023 .. MessageHead::default() 1024 }) 1025 }, 1026 f => panic!("frame is not Frame::Message: {:?}", f) 1027 } 1028 } 1029 1030 #[test] 1031 fn test_conn_parse_partial() { 1032 let _: Result<(), ()> = future::lazy(|| { 1033 let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec(); 1034 let io = AsyncIo::new_buf(good_message, 10); 1035 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1036 assert!(conn.poll().unwrap().is_not_ready()); 1037 conn.io.io_mut().block_in(50); 1038 let async = conn.poll().unwrap(); 1039 assert!(async.is_ready()); 1040 match async { 1041 Async::Ready(Some(Frame::Message { .. })) => (), 1042 f => panic!("frame is not Message: {:?}", f), 1043 } 1044 Ok(()) 1045 }).wait(); 1046 } 1047 1048 #[test] 1049 fn test_conn_init_read_eof_idle() { 1050 let io = AsyncIo::new_buf(vec![], 1); 1051 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1052 conn.state.idle(); 1053 1054 match conn.poll().unwrap() { 1055 Async::Ready(None) => {}, 1056 other => panic!("frame is not None: {:?}", other) 1057 } 1058 } 1059 1060 #[test] 1061 fn test_conn_init_read_eof_idle_partial_parse() { 1062 let io = AsyncIo::new_buf(b"GET / HTTP/1.1".to_vec(), 100); 1063 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1064 conn.state.idle(); 1065 1066 match conn.poll() { 1067 Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {}, 1068 other => panic!("unexpected frame: {:?}", other) 1069 } 1070 } 1071 1072 #[test] 1073 fn test_conn_init_read_eof_busy() { 1074 let _: Result<(), ()> = future::lazy(|| { 1075 // server ignores 1076 let io = AsyncIo::new_eof(); 1077 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1078 conn.state.busy(); 1079 1080 match conn.poll().unwrap() { 1081 Async::Ready(None) => {}, 1082 other => panic!("unexpected frame: {:?}", other) 1083 } 1084 1085 // client 1086 let io = AsyncIo::new_eof(); 1087 let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io); 1088 conn.state.busy(); 1089 1090 match conn.poll() { 1091 Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {}, 1092 other => panic!("unexpected frame: {:?}", other) 1093 } 1094 Ok(()) 1095 }).wait(); 1096 } 1097 1098 #[test] 1099 fn test_conn_body_finish_read_eof() { 1100 let _: Result<(), ()> = future::lazy(|| { 1101 let io = AsyncIo::new_eof(); 1102 let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io); 1103 conn.state.busy(); 1104 conn.state.writing = Writing::KeepAlive; 1105 conn.state.reading = Reading::Body(Decoder::length(0)); 1106 1107 match conn.poll() { 1108 Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (), 1109 other => panic!("unexpected frame: {:?}", other) 1110 } 1111 1112 // conn eofs, but tokio-proto will call poll() again, before calling flush() 1113 // the conn eof in this case is perfectly fine 1114 1115 match conn.poll() { 1116 Ok(Async::Ready(None)) => (), 1117 other => panic!("unexpected frame: {:?}", other) 1118 } 1119 Ok(()) 1120 }).wait(); 1121 } 1122 1123 #[test] 1124 fn test_conn_message_empty_body_read_eof() { 1125 let _: Result<(), ()> = future::lazy(|| { 1126 let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec(), 1024); 1127 let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io); 1128 conn.state.busy(); 1129 conn.state.writing = Writing::KeepAlive; 1130 1131 match conn.poll() { 1132 Ok(Async::Ready(Some(Frame::Message { body: false, .. }))) => (), 1133 other => panic!("unexpected frame: {:?}", other) 1134 } 1135 1136 // conn eofs, but tokio-proto will call poll() again, before calling flush() 1137 // the conn eof in this case is perfectly fine 1138 1139 match conn.poll() { 1140 Ok(Async::Ready(None)) => (), 1141 other => panic!("unexpected frame: {:?}", other) 1142 } 1143 Ok(()) 1144 }).wait(); 1145 } 1146 1147 #[test] 1148 fn test_conn_read_body_end() { 1149 let _: Result<(), ()> = future::lazy(|| { 1150 let io = AsyncIo::new_buf(b"POST / HTTP/1.1\r\nContent-Length: 5\r\n\r\n12345".to_vec(), 1024); 1151 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1152 conn.state.busy(); 1153 1154 match conn.poll() { 1155 Ok(Async::Ready(Some(Frame::Message { body: true, .. }))) => (), 1156 other => panic!("unexpected frame: {:?}", other) 1157 } 1158 1159 match conn.poll() { 1160 Ok(Async::Ready(Some(Frame::Body { chunk: Some(_) }))) => (), 1161 other => panic!("unexpected frame: {:?}", other) 1162 } 1163 1164 // When the body is done, `poll` MUST return a `Body` frame with chunk set to `None` 1165 match conn.poll() { 1166 Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (), 1167 other => panic!("unexpected frame: {:?}", other) 1168 } 1169 1170 match conn.poll() { 1171 Ok(Async::NotReady) => (), 1172 other => panic!("unexpected frame: {:?}", other) 1173 } 1174 Ok(()) 1175 }).wait(); 1176 } 1177 1178 #[test] 1179 fn test_conn_closed_read() { 1180 let io = AsyncIo::new_buf(vec![], 0); 1181 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1182 conn.state.close(); 1183 1184 match conn.poll().unwrap() { 1185 Async::Ready(None) => {}, 1186 other => panic!("frame is not None: {:?}", other) 1187 } 1188 } 1189 1190 #[test] 1191 fn test_conn_body_write_length() { 1192 let _ = pretty_env_logger::try_init(); 1193 let _: Result<(), ()> = future::lazy(|| { 1194 let io = AsyncIo::new_buf(vec![], 0); 1195 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1196 let max = super::super::io::DEFAULT_MAX_BUFFER_SIZE + 4096; 1197 conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64)); 1198 1199 assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; max].into()) }).unwrap().is_ready()); 1200 assert!(!conn.can_buffer_body()); 1201 1202 assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; 1024 * 8].into()) }).unwrap().is_not_ready()); 1203 1204 conn.io.io_mut().block_in(1024 * 3); 1205 assert!(conn.poll_complete().unwrap().is_not_ready()); 1206 conn.io.io_mut().block_in(1024 * 3); 1207 assert!(conn.poll_complete().unwrap().is_not_ready()); 1208 conn.io.io_mut().block_in(max * 2); 1209 assert!(conn.poll_complete().unwrap().is_ready()); 1210 1211 assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'c'; 1024 * 8].into()) }).unwrap().is_ready()); 1212 Ok(()) 1213 }).wait(); 1214 } 1215 1216 #[test] 1217 fn test_conn_body_write_chunked() { 1218 let _: Result<(), ()> = future::lazy(|| { 1219 let io = AsyncIo::new_buf(vec![], 4096); 1220 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1221 conn.state.writing = Writing::Body(Encoder::chunked()); 1222 1223 assert!(conn.start_send(Frame::Body { chunk: Some("headers".into()) }).unwrap().is_ready()); 1224 assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'x'; 8192].into()) }).unwrap().is_ready()); 1225 Ok(()) 1226 }).wait(); 1227 } 1228 1229 #[test] 1230 fn test_conn_body_flush() { 1231 let _: Result<(), ()> = future::lazy(|| { 1232 let io = AsyncIo::new_buf(vec![], 1024 * 1024 * 5); 1233 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1234 conn.state.writing = Writing::Body(Encoder::length(1024 * 1024)); 1235 assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 1024].into()) }).unwrap().is_ready()); 1236 assert!(!conn.can_buffer_body()); 1237 conn.io.io_mut().block_in(1024 * 1024 * 5); 1238 assert!(conn.poll_complete().unwrap().is_ready()); 1239 assert!(conn.can_buffer_body()); 1240 assert!(conn.io.io_mut().flushed()); 1241 1242 Ok(()) 1243 }).wait(); 1244 } 1245 1246 #[test] 1247 fn test_conn_parking() { 1248 use std::sync::Arc; 1249 use futures::executor::Notify; 1250 use futures::executor::NotifyHandle; 1251 1252 struct Car { 1253 permit: bool, 1254 } 1255 impl Notify for Car { 1256 fn notify(&self, _id: usize) { 1257 assert!(self.permit, "unparked without permit"); 1258 } 1259 } 1260 1261 fn car(permit: bool) -> NotifyHandle { 1262 Arc::new(Car { 1263 permit: permit, 1264 }).into() 1265 } 1266 1267 // test that once writing is done, unparks 1268 let f = future::lazy(|| { 1269 let io = AsyncIo::new_buf(vec![], 4096); 1270 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1271 conn.state.reading = Reading::KeepAlive; 1272 assert!(conn.poll().unwrap().is_not_ready()); 1273 1274 conn.state.writing = Writing::KeepAlive; 1275 assert!(conn.poll_complete().unwrap().is_ready()); 1276 Ok::<(), ()>(()) 1277 }); 1278 ::futures::executor::spawn(f).poll_future_notify(&car(true), 0).unwrap(); 1279 1280 1281 // test that flushing when not waiting on read doesn't unpark 1282 let f = future::lazy(|| { 1283 let io = AsyncIo::new_buf(vec![], 4096); 1284 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1285 conn.state.writing = Writing::KeepAlive; 1286 assert!(conn.poll_complete().unwrap().is_ready()); 1287 Ok::<(), ()>(()) 1288 }); 1289 ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap(); 1290 1291 1292 // test that flushing and writing isn't done doesn't unpark 1293 let f = future::lazy(|| { 1294 let io = AsyncIo::new_buf(vec![], 4096); 1295 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1296 conn.state.reading = Reading::KeepAlive; 1297 assert!(conn.poll().unwrap().is_not_ready()); 1298 conn.state.writing = Writing::Body(Encoder::length(5_000)); 1299 assert!(conn.poll_complete().unwrap().is_ready()); 1300 Ok::<(), ()>(()) 1301 }); 1302 ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap(); 1303 } 1304 1305 #[test] 1306 fn test_conn_closed_write() { 1307 let io = AsyncIo::new_buf(vec![], 0); 1308 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1309 conn.state.close(); 1310 1311 match conn.start_send(Frame::Body { chunk: Some(b"foobar".to_vec().into()) }) { 1312 Err(_e) => {}, 1313 other => panic!("did not return Err: {:?}", other) 1314 } 1315 1316 assert!(conn.state.is_write_closed()); 1317 } 1318 1319 #[test] 1320 fn test_conn_write_empty_chunk() { 1321 let io = AsyncIo::new_buf(vec![], 0); 1322 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1323 conn.state.writing = Writing::KeepAlive; 1324 1325 assert!(conn.start_send(Frame::Body { chunk: None }).unwrap().is_ready()); 1326 assert!(conn.start_send(Frame::Body { chunk: Some(Vec::new().into()) }).unwrap().is_ready()); 1327 conn.start_send(Frame::Body { chunk: Some(vec![b'a'].into()) }).unwrap_err(); 1328 } 1329 */ 1330 } 1331