use std::fmt;
use std::io::{self};
use std::marker::PhantomData;

use bytes::{Buf, Bytes};
use http::header::{HeaderValue, CONNECTION};
use http::{HeaderMap, Method, Version};
use tokio::io::{AsyncRead, AsyncWrite};

use super::io::Buffered;
use super::{Decoder, Encode, EncodedBuf, Encoder, Http1Transaction, ParseContext, Wants};
use crate::common::{task, Pin, Poll, Unpin};
use crate::headers::connection_keep_alive;
use crate::proto::{BodyLength, DecodedLength, MessageHead};

const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";

/// This handles a connection, which will have been established over an
/// `AsyncRead + AsyncWrite` (like a socket), and will likely include multiple
/// `Transaction`s over HTTP.
///
/// The connection will determine when a message begins and ends as well as
/// determine if this connection can be kept alive after the message,
/// or if it is complete.
pub(crate) struct Conn<I, B, T> {
    io: Buffered<I, EncodedBuf<B>>,
    state: State,
    _marker: PhantomData<fn(T)>,
}

impl<I, B, T> Conn<I, B, T>
where
    I: AsyncRead + AsyncWrite + Unpin,
    B: Buf,
    T: Http1Transaction,
{
    pub fn new(io: I) -> Conn<I, B, T> {
        Conn {
            io: Buffered::new(io),
            state: State {
                allow_half_close: false,
                cached_headers: None,
                error: None,
                keep_alive: KA::Busy,
                method: None,
                title_case_headers: false,
                notify_read: false,
                reading: Reading::Init,
                writing: Writing::Init,
                upgrade: None,
                // We assume a modern world where the remote speaks HTTP/1.1.
                // If they tell us otherwise, we'll downgrade in `read_head`.
                version: Version::HTTP_11,
            },
            _marker: PhantomData,
        }
    }

    pub fn set_flush_pipeline(&mut self, enabled: bool) {
        self.io.set_flush_pipeline(enabled);
    }

    pub fn set_max_buf_size(&mut self, max: usize) {
        self.io.set_max_buf_size(max);
    }

    pub fn set_read_buf_exact_size(&mut self, sz: usize) {
        self.io.set_read_buf_exact_size(sz);
    }

    pub fn set_write_strategy_flatten(&mut self) {
        self.io.set_write_strategy_flatten();
    }

    pub fn set_title_case_headers(&mut self) {
        self.state.title_case_headers = true;
    }

    pub(crate) fn set_allow_half_close(&mut self) {
        self.state.allow_half_close = true;
    }

    pub fn into_inner(self) -> (I, Bytes) {
        self.io.into_inner()
    }

    pub fn pending_upgrade(&mut self) -> Option<crate::upgrade::Pending> {
        self.state.upgrade.take()
    }

    pub fn is_read_closed(&self) -> bool {
        self.state.is_read_closed()
    }

    pub fn is_write_closed(&self) -> bool {
        self.state.is_write_closed()
    }

    pub fn can_read_head(&self) -> bool {
        match self.state.reading {
            Reading::Init => {
                if T::should_read_first() {
                    true
                } else {
                    match self.state.writing {
                        Writing::Init => false,
                        _ => true,
                    }
                }
            }
            _ => false,
        }
    }

    pub fn can_read_body(&self) -> bool {
        match self.state.reading {
            Reading::Body(..) | Reading::Continue(..) => true,
            _ => false,
        }
    }

    fn should_error_on_eof(&self) -> bool {
        // If we're idle, it's probably just the connection closing gracefully.
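        // Whether an EOF during parsing is ever an error is up to the
        // transaction type (`should_error_on_parse_eof`); even when it is,
        // an idle connection being closed is not treated as one.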
        T::should_error_on_parse_eof() && !self.state.is_idle()
    }

    fn has_h2_prefix(&self) -> bool {
        let read_buf = self.io.read_buf();
        read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE
    }

    pub(super) fn poll_read_head(
        &mut self,
        cx: &mut task::Context<'_>,
    ) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>> {
        debug_assert!(self.can_read_head());
        trace!("Conn::read_head");

        let msg = match ready!(self.io.parse::<T>(
            cx,
            ParseContext {
                cached_headers: &mut self.state.cached_headers,
                req_method: &mut self.state.method,
            }
        )) {
            Ok(msg) => msg,
            Err(e) => return self.on_read_head_error(e),
        };

        // Note: don't deconstruct `msg` into local variables, it appears
        // the optimizer doesn't remove the extra copies.

        debug!("incoming body is {}", msg.decode);

        self.state.busy();
        self.state.keep_alive &= msg.keep_alive;
        self.state.version = msg.head.version;

        let mut wants = if msg.wants_upgrade {
            Wants::UPGRADE
        } else {
            Wants::EMPTY
        };

        if msg.decode == DecodedLength::ZERO {
            if msg.expect_continue {
                debug!("ignoring expect-continue since body is empty");
            }
            self.state.reading = Reading::KeepAlive;
            if !T::should_read_first() {
                self.try_keep_alive(cx);
            }
        } else if msg.expect_continue {
            self.state.reading = Reading::Continue(Decoder::new(msg.decode));
            wants = wants.add(Wants::EXPECT);
        } else {
            self.state.reading = Reading::Body(Decoder::new(msg.decode));
        }

        Poll::Ready(Some(Ok((msg.head, msg.decode, wants))))
    }

    fn on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>> {
        // If we are currently waiting on a message, then an empty
        // message should be reported as an error. If not, it is just
        // the connection closing gracefully.
        let must_error = self.should_error_on_eof();
        self.close_read();
        self.io.consume_leading_lines();
        let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty();
        if was_mid_parse || must_error {
            // We check if the buf contains the h2 Preface
            debug!(
                "parse error ({}) with {} bytes",
                e,
                self.io.read_buf().len()
            );
            match self.on_parse_error(e) {
                Ok(()) => Poll::Pending, // XXX: wat?
                Err(e) => Poll::Ready(Some(Err(e))),
            }
        } else {
            debug!("read eof");
            self.close_write();
            Poll::Ready(None)
        }
    }

    pub fn poll_read_body(
        &mut self,
        cx: &mut task::Context<'_>,
    ) -> Poll<Option<io::Result<Bytes>>> {
        debug_assert!(self.can_read_body());

        let (reading, ret) = match self.state.reading {
            Reading::Body(ref mut decoder) => {
                match decoder.decode(cx, &mut self.io) {
                    Poll::Ready(Ok(slice)) => {
                        let (reading, chunk) = if decoder.is_eof() {
                            debug!("incoming body completed");
                            (
                                Reading::KeepAlive,
                                if !slice.is_empty() {
                                    Some(Ok(slice))
                                } else {
                                    None
                                },
                            )
                        } else if slice.is_empty() {
                            error!("incoming body unexpectedly ended");
                            // This should be unreachable, since all 3 decoders
                            // either set eof=true or return an Err when reading
                            // an empty slice...
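                            // If it does happen anyway, close the read side
                            // rather than looping on an empty body.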
                            (Reading::Closed, None)
                        } else {
                            return Poll::Ready(Some(Ok(slice)));
                        };
                        (reading, Poll::Ready(chunk))
                    }
                    Poll::Pending => return Poll::Pending,
                    Poll::Ready(Err(e)) => {
                        debug!("incoming body decode error: {}", e);
                        (Reading::Closed, Poll::Ready(Some(Err(e))))
                    }
                }
            }
            Reading::Continue(ref decoder) => {
                // Write the 100 Continue if not already responded...
                if let Writing::Init = self.state.writing {
                    trace!("automatically sending 100 Continue");
                    let cont = b"HTTP/1.1 100 Continue\r\n\r\n";
                    self.io.headers_buf().extend_from_slice(cont);
                }

                // And now recurse once in the Reading::Body state...
                self.state.reading = Reading::Body(decoder.clone());
                return self.poll_read_body(cx);
            }
            _ => unreachable!("poll_read_body invalid state: {:?}", self.state.reading),
        };

        self.state.reading = reading;
        self.try_keep_alive(cx);
        ret
    }

    pub fn wants_read_again(&mut self) -> bool {
        let ret = self.state.notify_read;
        self.state.notify_read = false;
        ret
    }

    pub fn poll_read_keep_alive(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
        debug_assert!(!self.can_read_head() && !self.can_read_body());

        if self.is_read_closed() {
            Poll::Pending
        } else if self.is_mid_message() {
            self.mid_message_detect_eof(cx)
        } else {
            self.require_empty_read(cx)
        }
    }

    fn is_mid_message(&self) -> bool {
        match (&self.state.reading, &self.state.writing) {
            (&Reading::Init, &Writing::Init) => false,
            _ => true,
        }
    }

    // This will check to make sure the io object read is empty.
    //
    // This should only be called for Clients wanting to enter the idle
    // state.
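    //
    // An idle client connection has no request in flight, so the peer has no
    // reason to be sending us bytes: anything read here (or an unexpected EOF
    // while we still consider the connection busy) means it cannot be reused.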
    fn require_empty_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
        debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
        debug_assert!(!self.is_mid_message());
        debug_assert!(T::is_client());

        if !self.io.read_buf().is_empty() {
            debug!("received an unexpected {} bytes", self.io.read_buf().len());
            return Poll::Ready(Err(crate::Error::new_unexpected_message()));
        }

        let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;

        if num_read == 0 {
            let ret = if self.should_error_on_eof() {
                trace!("found unexpected EOF on busy connection: {:?}", self.state);
                Poll::Ready(Err(crate::Error::new_incomplete()))
            } else {
                trace!("found EOF on idle connection, closing");
                Poll::Ready(Ok(()))
            };

            // order is important: should_error needs state BEFORE close_read
            self.state.close_read();
            return ret;
        }

        debug!(
            "received unexpected {} bytes on an idle connection",
            num_read
        );
        Poll::Ready(Err(crate::Error::new_unexpected_message()))
    }

    fn mid_message_detect_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
        debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
        debug_assert!(self.is_mid_message());

        if self.state.allow_half_close || !self.io.read_buf().is_empty() {
            return Poll::Pending;
        }

        let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;

        if num_read == 0 {
            trace!("found unexpected EOF on busy connection: {:?}", self.state);
            self.state.close_read();
            Poll::Ready(Err(crate::Error::new_incomplete()))
        } else {
            Poll::Ready(Ok(()))
        }
    }

    fn force_io_read(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<usize>> {
        debug_assert!(!self.state.is_read_closed());

        let result = ready!(self.io.poll_read_from_io(cx));
        Poll::Ready(result.map_err(|e| {
            trace!("force_io_read; io error = {:?}", e);
            self.state.close();
            e
        }))
    }

    fn maybe_notify(&mut self, cx: &mut task::Context<'_>) {
        // it's possible that we returned NotReady from poll() without having
        // exhausted the underlying Io. We would have done this when we
        // determined we couldn't keep reading until we knew how writing
        // would finish.

        match self.state.reading {
            Reading::Continue(..) | Reading::Body(..) | Reading::KeepAlive | Reading::Closed => {
                return
            }
            Reading::Init => (),
        };

        match self.state.writing {
            Writing::Body(..) => return,
            Writing::Init | Writing::KeepAlive | Writing::Closed => (),
        }

        if !self.io.is_read_blocked() {
            if self.io.read_buf().is_empty() {
                match self.io.poll_read_from_io(cx) {
                    Poll::Ready(Ok(n)) => {
                        if n == 0 {
                            trace!("maybe_notify; read eof");
                            if self.state.is_idle() {
                                self.state.close();
                            } else {
                                self.close_read()
                            }
                            return;
                        }
                    }
                    Poll::Pending => {
                        trace!("maybe_notify; read_from_io blocked");
                        return;
                    }
                    Poll::Ready(Err(e)) => {
                        trace!("maybe_notify; read_from_io error: {}", e);
                        self.state.close();
                        self.state.error = Some(crate::Error::new_io(e));
                    }
                }
            }
            self.state.notify_read = true;
        }
    }

    fn try_keep_alive(&mut self, cx: &mut task::Context<'_>) {
        self.state.try_keep_alive::<T>();
        self.maybe_notify(cx);
    }

    pub fn can_write_head(&self) -> bool {
        if !T::should_read_first() {
            if let Reading::Closed = self.state.reading {
                return false;
            }
        }
        match self.state.writing {
            Writing::Init => true,
            _ => false,
        }
    }

    pub fn can_write_body(&self) -> bool {
        match self.state.writing {
            Writing::Body(..) => true,
            Writing::Init | Writing::KeepAlive | Writing::Closed => false,
        }
    }

    pub fn can_buffer_body(&self) -> bool {
        self.io.can_buffer()
    }

    pub fn write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>) {
        if let Some(encoder) = self.encode_head(head, body) {
            self.state.writing = if !encoder.is_eof() {
                Writing::Body(encoder)
            } else if encoder.is_last() {
                Writing::Closed
            } else {
                Writing::KeepAlive
            };
        }
    }

    pub fn write_full_msg(&mut self, head: MessageHead<T::Outgoing>, body: B) {
        if let Some(encoder) =
            self.encode_head(head, Some(BodyLength::Known(body.remaining() as u64)))
        {
            let is_last = encoder.is_last();
            // Make sure we don't write a body if we weren't actually allowed
            // to do so, like because it's a HEAD request.
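            // (The encoder is already at EOF in that case, so the guard below
            // skips buffering the body bytes entirely.)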
            if !encoder.is_eof() {
                encoder.danger_full_buf(body, self.io.write_buf());
            }
            self.state.writing = if is_last {
                Writing::Closed
            } else {
                Writing::KeepAlive
            }
        }
    }

    fn encode_head(
        &mut self,
        mut head: MessageHead<T::Outgoing>,
        body: Option<BodyLength>,
    ) -> Option<Encoder> {
        debug_assert!(self.can_write_head());

        if !T::should_read_first() {
            self.state.busy();
        }

        self.enforce_version(&mut head);

        let buf = self.io.headers_buf();
        match T::encode(
            Encode {
                head: &mut head,
                body,
                keep_alive: self.state.wants_keep_alive(),
                req_method: &mut self.state.method,
                title_case_headers: self.state.title_case_headers,
            },
            buf,
        ) {
            Ok(encoder) => {
                debug_assert!(self.state.cached_headers.is_none());
                debug_assert!(head.headers.is_empty());
                self.state.cached_headers = Some(head.headers);
                Some(encoder)
            }
            Err(err) => {
                self.state.error = Some(err);
                self.state.writing = Writing::Closed;
                None
            }
        }
    }

    // Fix keep-alives when Connection: keep-alive header is not present
    fn fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>) {
        let outgoing_is_keep_alive = head
            .headers
            .get(CONNECTION)
            .map(connection_keep_alive)
            .unwrap_or(false);

        if !outgoing_is_keep_alive {
            match head.version {
                // If response is version 1.0 and keep-alive is not present in the response,
                // disable keep-alive so the server closes the connection
                Version::HTTP_10 => self.state.disable_keep_alive(),
                // If response is version 1.1 and keep-alive is wanted, add
                // Connection: keep-alive header when not present
                Version::HTTP_11 => {
                    if self.state.wants_keep_alive() {
                        head.headers
                            .insert(CONNECTION, HeaderValue::from_static("keep-alive"));
                    }
                }
                _ => (),
            }
        }
    }

    // If we know the remote speaks an older version, we try to fix up any messages
    // to work with our older peer.
    fn enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>) {
        if let Version::HTTP_10 = self.state.version {
            // Fixes response or connection when keep-alive header is not present
            self.fix_keep_alive(head);
            // If the remote only knows HTTP/1.0, we should force ourselves
            // to speak only HTTP/1.0 as well.
            head.version = Version::HTTP_10;
        }
        // If the remote speaks HTTP/1.1, then it *should* be fine with
        // both HTTP/1.0 and HTTP/1.1 from us. So again, we just let
        // the user's headers be.
    }

    pub fn write_body(&mut self, chunk: B) {
        debug_assert!(self.can_write_body() && self.can_buffer_body());
        // empty chunks should be discarded at Dispatcher level
        debug_assert!(chunk.remaining() != 0);

        let state = match self.state.writing {
            Writing::Body(ref mut encoder) => {
                self.io.buffer(encoder.encode(chunk));

                if encoder.is_eof() {
                    if encoder.is_last() {
                        Writing::Closed
                    } else {
                        Writing::KeepAlive
                    }
                } else {
                    return;
                }
            }
            _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
        };

        self.state.writing = state;
    }

    pub fn write_body_and_end(&mut self, chunk: B) {
        debug_assert!(self.can_write_body() && self.can_buffer_body());
        // empty chunks should be discarded at Dispatcher level
        debug_assert!(chunk.remaining() != 0);

        let state = match self.state.writing {
            Writing::Body(ref encoder) => {
                let can_keep_alive = encoder.encode_and_end(chunk, self.io.write_buf());
                if can_keep_alive {
                    Writing::KeepAlive
                } else {
                    Writing::Closed
                }
            }
            _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
        };

        self.state.writing = state;
    }

    pub fn end_body(&mut self) {
        debug_assert!(self.can_write_body());

        let state = match self.state.writing {
            Writing::Body(ref mut encoder) => {
                // End of stream; try to write the encoder's EOF marker, if any.
                match encoder.end() {
                    Ok(end) => {
                        if let Some(end) = end {
                            self.io.buffer(end);
                        }
                        if encoder.is_last() {
                            Writing::Closed
                        } else {
                            Writing::KeepAlive
                        }
                    }
                    Err(_not_eof) => Writing::Closed,
                }
            }
            _ => return,
        };

        self.state.writing = state;
    }

    // When we get a parse error, depending on which side we are, we might be able
    // to write a response before closing the connection.
    //
    // - Client: there is nothing we can do
    // - Server: if a Response hasn't been written yet, we can send a 4xx response
    fn on_parse_error(&mut self, err: crate::Error) -> crate::Result<()> {
        if let Writing::Init = self.state.writing {
            if self.has_h2_prefix() {
                return Err(crate::Error::new_version_h2());
            }
            if let Some(msg) = T::on_error(&err) {
                // Drop the cached headers so as to not trigger a debug
                // assert in `write_head`...
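                // (`encode_head` debug-asserts that the cache is empty before
                // it stashes the headers of the message we're about to write.)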
                self.state.cached_headers.take();
                self.write_head(msg, None);
                self.state.error = Some(err);
                return Ok(());
            }
        }

        // Fallback: pass the error back up.
        Err(err)
    }

    pub fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
        ready!(Pin::new(&mut self.io).poll_flush(cx))?;
        self.try_keep_alive(cx);
        trace!("flushed({}): {:?}", T::LOG, self.state);
        Poll::Ready(Ok(()))
    }

    pub fn poll_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
        match ready!(Pin::new(self.io.io_mut()).poll_shutdown(cx)) {
            Ok(()) => {
                trace!("shut down IO complete");
                Poll::Ready(Ok(()))
            }
            Err(e) => {
                debug!("error shutting down IO: {}", e);
                Poll::Ready(Err(e))
            }
        }
    }

    pub fn close_read(&mut self) {
        self.state.close_read();
    }

    pub fn close_write(&mut self) {
        self.state.close_write();
    }

    pub fn disable_keep_alive(&mut self) {
        if self.state.is_idle() {
            trace!("disable_keep_alive; closing idle connection");
            self.state.close();
        } else {
            trace!("disable_keep_alive; in-progress connection");
            self.state.disable_keep_alive();
        }
    }

    pub fn take_error(&mut self) -> crate::Result<()> {
        if let Some(err) = self.state.error.take() {
            Err(err)
        } else {
            Ok(())
        }
    }

    pub(super) fn on_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
        trace!("{}: prepare possible HTTP upgrade", T::LOG);
        self.state.prepare_upgrade()
    }
}

impl<I, B: Buf, T> fmt::Debug for Conn<I, B, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Conn")
            .field("state", &self.state)
            .field("io", &self.io)
            .finish()
    }
}

// B and T are never pinned
impl<I: Unpin, B, T> Unpin for Conn<I, B, T> {}

struct State {
    allow_half_close: bool,
    /// Re-usable HeaderMap to reduce allocating new ones.
    cached_headers: Option<HeaderMap>,
    /// If an error occurs when there wasn't a direct way to return it
    /// back to the user, this is set.
    error: Option<crate::Error>,
    /// Current keep-alive status.
    keep_alive: KA,
    /// If mid-message, the HTTP Method that started it.
    ///
    /// This is used to know things such as if the message can include
    /// a body or not.
    method: Option<Method>,
    title_case_headers: bool,
    /// Set to true when the Dispatcher should poll read operations
    /// again. See the `maybe_notify` method for more.
    notify_read: bool,
    /// State of allowed reads
    reading: Reading,
    /// State of allowed writes
    writing: Writing,
    /// An expected pending HTTP upgrade.
    upgrade: Option<crate::upgrade::Pending>,
    /// Either HTTP/1.0 or 1.1 connection
    version: Version,
}

#[derive(Debug)]
enum Reading {
    Init,
    Continue(Decoder),
    Body(Decoder),
    KeepAlive,
    Closed,
}

enum Writing {
    Init,
    Body(Encoder),
    KeepAlive,
    Closed,
}

impl fmt::Debug for State {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("State");
        builder
            .field("reading", &self.reading)
            .field("writing", &self.writing)
            .field("keep_alive", &self.keep_alive);

        // Only show error field if it's interesting...
        if let Some(ref error) = self.error {
            builder.field("error", error);
        }

        if self.allow_half_close {
            builder.field("allow_half_close", &true);
        }

        // Purposefully leaving off other fields..

        builder.finish()
    }
}

impl fmt::Debug for Writing {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            Writing::Init => f.write_str("Init"),
            Writing::Body(ref enc) => f.debug_tuple("Body").field(enc).finish(),
            Writing::KeepAlive => f.write_str("KeepAlive"),
            Writing::Closed => f.write_str("Closed"),
        }
    }
}

impl std::ops::BitAndAssign<bool> for KA {
    fn bitand_assign(&mut self, enabled: bool) {
        if !enabled {
            trace!("remote disabling keep-alive");
            *self = KA::Disabled;
        }
    }
}

#[derive(Clone, Copy, Debug)]
enum KA {
    Idle,
    Busy,
    Disabled,
}

impl Default for KA {
    fn default() -> KA {
        KA::Busy
    }
}

impl KA {
    fn idle(&mut self) {
        *self = KA::Idle;
    }

    fn busy(&mut self) {
        *self = KA::Busy;
    }

    fn disable(&mut self) {
        *self = KA::Disabled;
    }

    fn status(&self) -> KA {
        *self
    }
}

impl State {
    fn close(&mut self) {
        trace!("State::close()");
        self.reading = Reading::Closed;
        self.writing = Writing::Closed;
        self.keep_alive.disable();
    }

    fn close_read(&mut self) {
        trace!("State::close_read()");
        self.reading = Reading::Closed;
        self.keep_alive.disable();
    }

    fn close_write(&mut self) {
        trace!("State::close_write()");
        self.writing = Writing::Closed;
        self.keep_alive.disable();
    }

    fn wants_keep_alive(&self) -> bool {
        if let KA::Disabled = self.keep_alive.status() {
            false
        } else {
            true
        }
    }

    fn try_keep_alive<T: Http1Transaction>(&mut self) {
        match (&self.reading, &self.writing) {
            (&Reading::KeepAlive, &Writing::KeepAlive) => {
                if let KA::Busy = self.keep_alive.status() {
                    self.idle::<T>();
                } else {
                    trace!(
                        "try_keep_alive({}): could keep-alive, but status = {:?}",
                        T::LOG,
                        self.keep_alive
                    );
                    self.close();
                }
            }
            (&Reading::Closed, &Writing::KeepAlive) | (&Reading::KeepAlive, &Writing::Closed) => {
                self.close()
            }
            _ => (),
        }
    }

    fn disable_keep_alive(&mut self) {
        self.keep_alive.disable()
    }

    fn busy(&mut self) {
        if let KA::Disabled = self.keep_alive.status() {
            return;
        }
        self.keep_alive.busy();
    }

    fn idle<T: Http1Transaction>(&mut self) {
        debug_assert!(!self.is_idle(), "State::idle() called while idle");

        self.method = None;
        self.keep_alive.idle();
        if self.is_idle() {
            self.reading = Reading::Init;
            self.writing = Writing::Init;

            // !T::should_read_first() means Client.
            //
            // If Client connection has just gone idle, the Dispatcher
            // should try the poll loop one more time, so as to poll the
            // pending requests stream.
            if !T::should_read_first() {
                self.notify_read = true;
            }
        } else {
            self.close();
        }
    }

    fn is_idle(&self) -> bool {
        if let KA::Idle = self.keep_alive.status() {
            true
        } else {
            false
        }
    }

    fn is_read_closed(&self) -> bool {
        match self.reading {
            Reading::Closed => true,
            _ => false,
        }
    }

    fn is_write_closed(&self) -> bool {
        match self.writing {
            Writing::Closed => true,
            _ => false,
        }
    }

    fn prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
        let (tx, rx) = crate::upgrade::pending();
        self.upgrade = Some(tx);
        rx
    }
}

#[cfg(test)]
mod tests {
    #[cfg(feature = "nightly")]
    #[bench]
    fn bench_read_head_short(b: &mut ::test::Bencher) {
        use super::*;
        let s = b"GET / HTTP/1.1\r\nHost: localhost:8080\r\n\r\n";
        let len = s.len();
        b.bytes = len as u64;

        // an empty IO, we'll be skipping and using the read buffer anyways
        let io = tokio_test::io::Builder::new().build();
        let mut conn = Conn::<_, bytes::Bytes, crate::proto::h1::ServerTransaction>::new(io);
        *conn.io.read_buf_mut() = ::bytes::BytesMut::from(&s[..]);
        conn.state.cached_headers = Some(HeaderMap::with_capacity(2));

        let mut rt = tokio::runtime::Builder::new()
            .enable_all()
            .basic_scheduler()
            .build()
            .unwrap();

        b.iter(|| {
            rt.block_on(futures_util::future::poll_fn(|cx| {
                match conn.poll_read_head(cx) {
                    Poll::Ready(Some(Ok(x))) => {
                        ::test::black_box(&x);
                        let mut headers = x.0.headers;
                        headers.clear();
                        conn.state.cached_headers = Some(headers);
                    }
                    f => panic!("expected Ready(Some(Ok(..))): {:?}", f),
                }

                conn.io.read_buf_mut().reserve(1);
                unsafe {
                    conn.io.read_buf_mut().set_len(len);
                }
                conn.state.reading = Reading::Init;
                Poll::Ready(())
            }));
        });
    }

    /*
    //TODO: rewrite these using dispatch... someday...
    use futures::{Async, Future, Stream, Sink};
    use futures::future;

    use proto::{self, ClientTransaction, MessageHead, ServerTransaction};
    use super::super::Encoder;
    use mock::AsyncIo;

    use super::{Conn, Decoder, Reading, Writing};
    use ::uri::Uri;

    use std::str::FromStr;

    #[test]
    fn test_conn_init_read() {
        let good_message = b"GET / HTTP/1.1\r\n\r\n".to_vec();
        let len = good_message.len();
        let io = AsyncIo::new_buf(good_message, len);
        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);

        match conn.poll().unwrap() {
            Async::Ready(Some(Frame::Message { message, body: false })) => {
                assert_eq!(message, MessageHead {
                    subject: ::proto::RequestLine(::Get, Uri::from_str("/").unwrap()),
                    .. MessageHead::default()
                })
            },
            f => panic!("frame is not Frame::Message: {:?}", f)
        }
    }

    #[test]
    fn test_conn_parse_partial() {
        let _: Result<(), ()> = future::lazy(|| {
            let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec();
            let io = AsyncIo::new_buf(good_message, 10);
            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
            assert!(conn.poll().unwrap().is_not_ready());
            conn.io.io_mut().block_in(50);
            let async = conn.poll().unwrap();
            assert!(async.is_ready());
            match async {
                Async::Ready(Some(Frame::Message { .. })) => (),
                f => panic!("frame is not Message: {:?}", f),
            }
            Ok(())
        }).wait();
    }

    #[test]
    fn test_conn_init_read_eof_idle() {
        let io = AsyncIo::new_buf(vec![], 1);
        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
        conn.state.idle();

        match conn.poll().unwrap() {
            Async::Ready(None) => {},
            other => panic!("frame is not None: {:?}", other)
        }
    }

    #[test]
    fn test_conn_init_read_eof_idle_partial_parse() {
        let io = AsyncIo::new_buf(b"GET / HTTP/1.1".to_vec(), 100);
        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
        conn.state.idle();

        match conn.poll() {
            Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
            other => panic!("unexpected frame: {:?}", other)
        }
    }

    #[test]
    fn test_conn_init_read_eof_busy() {
        let _: Result<(), ()> = future::lazy(|| {
            // server ignores
            let io = AsyncIo::new_eof();
            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
            conn.state.busy();

            match conn.poll().unwrap() {
                Async::Ready(None) => {},
                other => panic!("unexpected frame: {:?}", other)
            }

            // client
            let io = AsyncIo::new_eof();
            let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
            conn.state.busy();

            match conn.poll() {
                Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
                other => panic!("unexpected frame: {:?}", other)
            }
            Ok(())
        }).wait();
    }

    #[test]
    fn test_conn_body_finish_read_eof() {
        let _: Result<(), ()> = future::lazy(|| {
            let io = AsyncIo::new_eof();
            let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
            conn.state.busy();
            conn.state.writing = Writing::KeepAlive;
            conn.state.reading = Reading::Body(Decoder::length(0));

            match conn.poll() {
                Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
                other => panic!("unexpected frame: {:?}", other)
            }

            // conn eofs, but tokio-proto will call poll() again, before calling flush()
            // the conn eof in this case is perfectly fine

            match conn.poll() {
                Ok(Async::Ready(None)) => (),
                other => panic!("unexpected frame: {:?}", other)
            }
            Ok(())
        }).wait();
    }

    #[test]
    fn test_conn_message_empty_body_read_eof() {
        let _: Result<(), ()> = future::lazy(|| {
            let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec(), 1024);
            let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
            conn.state.busy();
            conn.state.writing = Writing::KeepAlive;

            match conn.poll() {
                Ok(Async::Ready(Some(Frame::Message { body: false, .. }))) => (),
                other => panic!("unexpected frame: {:?}", other)
            }

            // conn eofs, but tokio-proto will call poll() again, before calling flush()
            // the conn eof in this case is perfectly fine

            match conn.poll() {
                Ok(Async::Ready(None)) => (),
                other => panic!("unexpected frame: {:?}", other)
            }
            Ok(())
        }).wait();
    }

    #[test]
    fn test_conn_read_body_end() {
        let _: Result<(), ()> = future::lazy(|| {
            let io = AsyncIo::new_buf(b"POST / HTTP/1.1\r\nContent-Length: 5\r\n\r\n12345".to_vec(), 1024);
            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
            conn.state.busy();

            match conn.poll() {
                Ok(Async::Ready(Some(Frame::Message { body: true, .. }))) => (),
                other => panic!("unexpected frame: {:?}", other)
            }

            match conn.poll() {
                Ok(Async::Ready(Some(Frame::Body { chunk: Some(_) }))) => (),
                other => panic!("unexpected frame: {:?}", other)
            }

            // When the body is done, `poll` MUST return a `Body` frame with chunk set to `None`
            match conn.poll() {
                Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
                other => panic!("unexpected frame: {:?}", other)
            }

            match conn.poll() {
                Ok(Async::NotReady) => (),
                other => panic!("unexpected frame: {:?}", other)
            }
            Ok(())
        }).wait();
    }

    #[test]
    fn test_conn_closed_read() {
        let io = AsyncIo::new_buf(vec![], 0);
        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
        conn.state.close();

        match conn.poll().unwrap() {
            Async::Ready(None) => {},
            other => panic!("frame is not None: {:?}", other)
        }
    }

    #[test]
    fn test_conn_body_write_length() {
        let _ = pretty_env_logger::try_init();
        let _: Result<(), ()> = future::lazy(|| {
            let io = AsyncIo::new_buf(vec![], 0);
            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
            let max = super::super::io::DEFAULT_MAX_BUFFER_SIZE + 4096;
            conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64));

            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; max].into()) }).unwrap().is_ready());
            assert!(!conn.can_buffer_body());

            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; 1024 * 8].into()) }).unwrap().is_not_ready());

            conn.io.io_mut().block_in(1024 * 3);
            assert!(conn.poll_complete().unwrap().is_not_ready());
            conn.io.io_mut().block_in(1024 * 3);
            assert!(conn.poll_complete().unwrap().is_not_ready());
            conn.io.io_mut().block_in(max * 2);
            assert!(conn.poll_complete().unwrap().is_ready());

            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'c'; 1024 * 8].into()) }).unwrap().is_ready());
            Ok(())
        }).wait();
    }

    #[test]
    fn test_conn_body_write_chunked() {
        let _: Result<(), ()> = future::lazy(|| {
            let io = AsyncIo::new_buf(vec![], 4096);
            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
            conn.state.writing = Writing::Body(Encoder::chunked());

            assert!(conn.start_send(Frame::Body { chunk: Some("headers".into()) }).unwrap().is_ready());
            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'x'; 8192].into()) }).unwrap().is_ready());
            Ok(())
        }).wait();
    }

    #[test]
    fn test_conn_body_flush() {
        let _: Result<(), ()> = future::lazy(|| {
            let io = AsyncIo::new_buf(vec![], 1024 * 1024 * 5);
            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
            conn.state.writing = Writing::Body(Encoder::length(1024 * 1024));
            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 1024].into()) }).unwrap().is_ready());
            assert!(!conn.can_buffer_body());
            conn.io.io_mut().block_in(1024 * 1024 * 5);
            assert!(conn.poll_complete().unwrap().is_ready());
            assert!(conn.can_buffer_body());
            assert!(conn.io.io_mut().flushed());

            Ok(())
        }).wait();
    }

    #[test]
    fn test_conn_parking() {
        use std::sync::Arc;
        use futures::executor::Notify;
        use futures::executor::NotifyHandle;

        struct Car {
            permit: bool,
        }
        impl Notify for Car {
            fn notify(&self, _id: usize) {
                assert!(self.permit, "unparked without permit");
            }
        }

        fn car(permit: bool) -> NotifyHandle {
            Arc::new(Car {
                permit: permit,
            }).into()
        }

        // test that once writing is done, unparks
        let f = future::lazy(|| {
            let io = AsyncIo::new_buf(vec![], 4096);
            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
            conn.state.reading = Reading::KeepAlive;
            assert!(conn.poll().unwrap().is_not_ready());

            conn.state.writing = Writing::KeepAlive;
            assert!(conn.poll_complete().unwrap().is_ready());
            Ok::<(), ()>(())
        });
        ::futures::executor::spawn(f).poll_future_notify(&car(true), 0).unwrap();

        // test that flushing when not waiting on read doesn't unpark
        let f = future::lazy(|| {
            let io = AsyncIo::new_buf(vec![], 4096);
            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
            conn.state.writing = Writing::KeepAlive;
            assert!(conn.poll_complete().unwrap().is_ready());
            Ok::<(), ()>(())
        });
        ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();

        // test that flushing when writing isn't done doesn't unpark
        let f = future::lazy(|| {
            let io = AsyncIo::new_buf(vec![], 4096);
            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
            conn.state.reading = Reading::KeepAlive;
            assert!(conn.poll().unwrap().is_not_ready());
            conn.state.writing = Writing::Body(Encoder::length(5_000));
            assert!(conn.poll_complete().unwrap().is_ready());
            Ok::<(), ()>(())
        });
        ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();
    }

    #[test]
    fn test_conn_closed_write() {
        let io = AsyncIo::new_buf(vec![], 0);
        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
        conn.state.close();

        match conn.start_send(Frame::Body { chunk: Some(b"foobar".to_vec().into()) }) {
            Err(_e) => {},
            other => panic!("did not return Err: {:?}", other)
        }

        assert!(conn.state.is_write_closed());
    }

    #[test]
    fn test_conn_write_empty_chunk() {
        let io = AsyncIo::new_buf(vec![], 0);
        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
        conn.state.writing = Writing::KeepAlive;

        assert!(conn.start_send(Frame::Body { chunk: None }).unwrap().is_ready());
        assert!(conn.start_send(Frame::Body { chunk: Some(Vec::new().into()) }).unwrap().is_ready());
        conn.start_send(Frame::Body { chunk: Some(vec![b'a'].into()) }).unwrap_err();
    }
    */
}
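
// A minimal sketch (not part of the original file) exercising the private `KA`
// flag under `&=`, which `poll_read_head` relies on when folding the parsed
// message's keep-alive hint into the connection state. The module name is made
// up for illustration only.
#[cfg(test)]
mod ka_sketch {
    use super::KA;

    #[test]
    fn bitand_assign_only_disables() {
        let mut ka = KA::Busy;

        // `&= true` (remote allows keep-alive) leaves the state untouched...
        ka &= true;
        match ka {
            KA::Busy => (),
            other => panic!("expected KA::Busy, got {:?}", other),
        }

        // ...while `&= false` (e.g. `Connection: close`) disables it for good.
        ka &= false;
        match ka {
            KA::Disabled => (),
            other => panic!("expected KA::Disabled, got {:?}", other),
        }
    }
}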