1 use std::fmt; 2 use std::io::{self}; 3 use std::marker::PhantomData; 4 5 use bytes::{Buf, Bytes}; 6 use http::header::{HeaderValue, CONNECTION}; 7 use http::{HeaderMap, Method, Version}; 8 use tokio::io::{AsyncRead, AsyncWrite}; 9 10 use super::io::Buffered; 11 use super::{Decoder, Encode, EncodedBuf, Encoder, Http1Transaction, ParseContext, Wants}; 12 use crate::common::{task, Pin, Poll, Unpin}; 13 use crate::headers::connection_keep_alive; 14 use crate::proto::{BodyLength, DecodedLength, MessageHead}; 15 16 const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; 17 18 /// This handles a connection, which will have been established over an 19 /// `AsyncRead + AsyncWrite` (like a socket), and will likely include multiple 20 /// `Transaction`s over HTTP. 21 /// 22 /// The connection will determine when a message begins and ends as well as 23 /// determine if this connection can be kept alive after the message, 24 /// or if it is complete. 25 pub(crate) struct Conn<I, B, T> { 26 io: Buffered<I, EncodedBuf<B>>, 27 state: State, 28 _marker: PhantomData<fn(T)>, 29 } 30 31 impl<I, B, T> Conn<I, B, T> 32 where 33 I: AsyncRead + AsyncWrite + Unpin, 34 B: Buf, 35 T: Http1Transaction, 36 { new(io: I) -> Conn<I, B, T>37 pub fn new(io: I) -> Conn<I, B, T> { 38 Conn { 39 io: Buffered::new(io), 40 state: State { 41 allow_half_close: false, 42 cached_headers: None, 43 error: None, 44 keep_alive: KA::Busy, 45 method: None, 46 title_case_headers: false, 47 notify_read: false, 48 reading: Reading::Init, 49 writing: Writing::Init, 50 upgrade: None, 51 // We assume a modern world where the remote speaks HTTP/1.1. 52 // If they tell us otherwise, we'll downgrade in `read_head`. 53 version: Version::HTTP_11, 54 }, 55 _marker: PhantomData, 56 } 57 } 58 set_flush_pipeline(&mut self, enabled: bool)59 pub fn set_flush_pipeline(&mut self, enabled: bool) { 60 self.io.set_flush_pipeline(enabled); 61 } 62 set_max_buf_size(&mut self, max: usize)63 pub fn set_max_buf_size(&mut self, max: usize) { 64 self.io.set_max_buf_size(max); 65 } 66 set_read_buf_exact_size(&mut self, sz: usize)67 pub fn set_read_buf_exact_size(&mut self, sz: usize) { 68 self.io.set_read_buf_exact_size(sz); 69 } 70 set_write_strategy_flatten(&mut self)71 pub fn set_write_strategy_flatten(&mut self) { 72 self.io.set_write_strategy_flatten(); 73 } 74 set_title_case_headers(&mut self)75 pub fn set_title_case_headers(&mut self) { 76 self.state.title_case_headers = true; 77 } 78 set_allow_half_close(&mut self)79 pub(crate) fn set_allow_half_close(&mut self) { 80 self.state.allow_half_close = true; 81 } 82 into_inner(self) -> (I, Bytes)83 pub fn into_inner(self) -> (I, Bytes) { 84 self.io.into_inner() 85 } 86 pending_upgrade(&mut self) -> Option<crate::upgrade::Pending>87 pub fn pending_upgrade(&mut self) -> Option<crate::upgrade::Pending> { 88 self.state.upgrade.take() 89 } 90 is_read_closed(&self) -> bool91 pub fn is_read_closed(&self) -> bool { 92 self.state.is_read_closed() 93 } 94 is_write_closed(&self) -> bool95 pub fn is_write_closed(&self) -> bool { 96 self.state.is_write_closed() 97 } 98 can_read_head(&self) -> bool99 pub fn can_read_head(&self) -> bool { 100 match self.state.reading { 101 Reading::Init => { 102 if T::should_read_first() { 103 true 104 } else { 105 match self.state.writing { 106 Writing::Init => false, 107 _ => true, 108 } 109 } 110 } 111 _ => false, 112 } 113 } 114 can_read_body(&self) -> bool115 pub fn can_read_body(&self) -> bool { 116 match self.state.reading { 117 Reading::Body(..) | Reading::Continue(..) => true, 118 _ => false, 119 } 120 } 121 should_error_on_eof(&self) -> bool122 fn should_error_on_eof(&self) -> bool { 123 // If we're idle, it's probably just the connection closing gracefully. 124 T::should_error_on_parse_eof() && !self.state.is_idle() 125 } 126 has_h2_prefix(&self) -> bool127 fn has_h2_prefix(&self) -> bool { 128 let read_buf = self.io.read_buf(); 129 read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE 130 } 131 poll_read_head( &mut self, cx: &mut task::Context<'_>, ) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>>132 pub(super) fn poll_read_head( 133 &mut self, 134 cx: &mut task::Context<'_>, 135 ) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>> { 136 debug_assert!(self.can_read_head()); 137 trace!("Conn::read_head"); 138 139 let msg = match ready!(self.io.parse::<T>( 140 cx, 141 ParseContext { 142 cached_headers: &mut self.state.cached_headers, 143 req_method: &mut self.state.method, 144 } 145 )) { 146 Ok(msg) => msg, 147 Err(e) => return self.on_read_head_error(e), 148 }; 149 150 // Note: don't deconstruct `msg` into local variables, it appears 151 // the optimizer doesn't remove the extra copies. 152 153 debug!("incoming body is {}", msg.decode); 154 155 self.state.busy(); 156 self.state.keep_alive &= msg.keep_alive; 157 self.state.version = msg.head.version; 158 159 let mut wants = if msg.wants_upgrade { 160 Wants::UPGRADE 161 } else { 162 Wants::EMPTY 163 }; 164 165 if msg.decode == DecodedLength::ZERO { 166 if msg.expect_continue { 167 debug!("ignoring expect-continue since body is empty"); 168 } 169 self.state.reading = Reading::KeepAlive; 170 if !T::should_read_first() { 171 self.try_keep_alive(cx); 172 } 173 } else if msg.expect_continue { 174 self.state.reading = Reading::Continue(Decoder::new(msg.decode)); 175 wants = wants.add(Wants::EXPECT); 176 } else { 177 self.state.reading = Reading::Body(Decoder::new(msg.decode)); 178 } 179 180 Poll::Ready(Some(Ok((msg.head, msg.decode, wants)))) 181 } 182 on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>>183 fn on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>> { 184 // If we are currently waiting on a message, then an empty 185 // message should be reported as an error. If not, it is just 186 // the connection closing gracefully. 187 let must_error = self.should_error_on_eof(); 188 self.close_read(); 189 self.io.consume_leading_lines(); 190 let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty(); 191 if was_mid_parse || must_error { 192 // We check if the buf contains the h2 Preface 193 debug!( 194 "parse error ({}) with {} bytes", 195 e, 196 self.io.read_buf().len() 197 ); 198 match self.on_parse_error(e) { 199 Ok(()) => Poll::Pending, // XXX: wat? 200 Err(e) => Poll::Ready(Some(Err(e))), 201 } 202 } else { 203 debug!("read eof"); 204 self.close_write(); 205 Poll::Ready(None) 206 } 207 } 208 poll_read_body( &mut self, cx: &mut task::Context<'_>, ) -> Poll<Option<io::Result<Bytes>>>209 pub fn poll_read_body( 210 &mut self, 211 cx: &mut task::Context<'_>, 212 ) -> Poll<Option<io::Result<Bytes>>> { 213 debug_assert!(self.can_read_body()); 214 215 let (reading, ret) = match self.state.reading { 216 Reading::Body(ref mut decoder) => { 217 match decoder.decode(cx, &mut self.io) { 218 Poll::Ready(Ok(slice)) => { 219 let (reading, chunk) = if decoder.is_eof() { 220 debug!("incoming body completed"); 221 ( 222 Reading::KeepAlive, 223 if !slice.is_empty() { 224 Some(Ok(slice)) 225 } else { 226 None 227 }, 228 ) 229 } else if slice.is_empty() { 230 error!("incoming body unexpectedly ended"); 231 // This should be unreachable, since all 3 decoders 232 // either set eof=true or return an Err when reading 233 // an empty slice... 234 (Reading::Closed, None) 235 } else { 236 return Poll::Ready(Some(Ok(slice))); 237 }; 238 (reading, Poll::Ready(chunk)) 239 } 240 Poll::Pending => return Poll::Pending, 241 Poll::Ready(Err(e)) => { 242 debug!("incoming body decode error: {}", e); 243 (Reading::Closed, Poll::Ready(Some(Err(e)))) 244 } 245 } 246 } 247 Reading::Continue(ref decoder) => { 248 // Write the 100 Continue if not already responded... 249 if let Writing::Init = self.state.writing { 250 trace!("automatically sending 100 Continue"); 251 let cont = b"HTTP/1.1 100 Continue\r\n\r\n"; 252 self.io.headers_buf().extend_from_slice(cont); 253 } 254 255 // And now recurse once in the Reading::Body state... 256 self.state.reading = Reading::Body(decoder.clone()); 257 return self.poll_read_body(cx); 258 } 259 _ => unreachable!("poll_read_body invalid state: {:?}", self.state.reading), 260 }; 261 262 self.state.reading = reading; 263 self.try_keep_alive(cx); 264 ret 265 } 266 wants_read_again(&mut self) -> bool267 pub fn wants_read_again(&mut self) -> bool { 268 let ret = self.state.notify_read; 269 self.state.notify_read = false; 270 ret 271 } 272 poll_read_keep_alive(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>273 pub fn poll_read_keep_alive(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { 274 debug_assert!(!self.can_read_head() && !self.can_read_body()); 275 276 if self.is_read_closed() { 277 Poll::Pending 278 } else if self.is_mid_message() { 279 self.mid_message_detect_eof(cx) 280 } else { 281 self.require_empty_read(cx) 282 } 283 } 284 is_mid_message(&self) -> bool285 fn is_mid_message(&self) -> bool { 286 match (&self.state.reading, &self.state.writing) { 287 (&Reading::Init, &Writing::Init) => false, 288 _ => true, 289 } 290 } 291 292 // This will check to make sure the io object read is empty. 293 // 294 // This should only be called for Clients wanting to enter the idle 295 // state. require_empty_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>296 fn require_empty_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { 297 debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed()); 298 debug_assert!(!self.is_mid_message()); 299 debug_assert!(T::is_client()); 300 301 if !self.io.read_buf().is_empty() { 302 debug!("received an unexpected {} bytes", self.io.read_buf().len()); 303 return Poll::Ready(Err(crate::Error::new_unexpected_message())); 304 } 305 306 let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?; 307 308 if num_read == 0 { 309 let ret = if self.should_error_on_eof() { 310 trace!("found unexpected EOF on busy connection: {:?}", self.state); 311 Poll::Ready(Err(crate::Error::new_incomplete())) 312 } else { 313 trace!("found EOF on idle connection, closing"); 314 Poll::Ready(Ok(())) 315 }; 316 317 // order is important: should_error needs state BEFORE close_read 318 self.state.close_read(); 319 return ret; 320 } 321 322 debug!( 323 "received unexpected {} bytes on an idle connection", 324 num_read 325 ); 326 Poll::Ready(Err(crate::Error::new_unexpected_message())) 327 } 328 mid_message_detect_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>329 fn mid_message_detect_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { 330 debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed()); 331 debug_assert!(self.is_mid_message()); 332 333 if self.state.allow_half_close || !self.io.read_buf().is_empty() { 334 return Poll::Pending; 335 } 336 337 let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?; 338 339 if num_read == 0 { 340 trace!("found unexpected EOF on busy connection: {:?}", self.state); 341 self.state.close_read(); 342 Poll::Ready(Err(crate::Error::new_incomplete())) 343 } else { 344 Poll::Ready(Ok(())) 345 } 346 } 347 force_io_read(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<usize>>348 fn force_io_read(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<usize>> { 349 debug_assert!(!self.state.is_read_closed()); 350 351 let result = ready!(self.io.poll_read_from_io(cx)); 352 Poll::Ready(result.map_err(|e| { 353 trace!("force_io_read; io error = {:?}", e); 354 self.state.close(); 355 e 356 })) 357 } 358 maybe_notify(&mut self, cx: &mut task::Context<'_>)359 fn maybe_notify(&mut self, cx: &mut task::Context<'_>) { 360 // its possible that we returned NotReady from poll() without having 361 // exhausted the underlying Io. We would have done this when we 362 // determined we couldn't keep reading until we knew how writing 363 // would finish. 364 365 match self.state.reading { 366 Reading::Continue(..) | Reading::Body(..) | Reading::KeepAlive | Reading::Closed => { 367 return 368 } 369 Reading::Init => (), 370 }; 371 372 match self.state.writing { 373 Writing::Body(..) => return, 374 Writing::Init | Writing::KeepAlive | Writing::Closed => (), 375 } 376 377 if !self.io.is_read_blocked() { 378 if self.io.read_buf().is_empty() { 379 match self.io.poll_read_from_io(cx) { 380 Poll::Ready(Ok(n)) => { 381 if n == 0 { 382 trace!("maybe_notify; read eof"); 383 if self.state.is_idle() { 384 self.state.close(); 385 } else { 386 self.close_read() 387 } 388 return; 389 } 390 } 391 Poll::Pending => { 392 trace!("maybe_notify; read_from_io blocked"); 393 return; 394 } 395 Poll::Ready(Err(e)) => { 396 trace!("maybe_notify; read_from_io error: {}", e); 397 self.state.close(); 398 self.state.error = Some(crate::Error::new_io(e)); 399 } 400 } 401 } 402 self.state.notify_read = true; 403 } 404 } 405 try_keep_alive(&mut self, cx: &mut task::Context<'_>)406 fn try_keep_alive(&mut self, cx: &mut task::Context<'_>) { 407 self.state.try_keep_alive::<T>(); 408 self.maybe_notify(cx); 409 } 410 can_write_head(&self) -> bool411 pub fn can_write_head(&self) -> bool { 412 if !T::should_read_first() { 413 if let Reading::Closed = self.state.reading { 414 return false; 415 } 416 } 417 match self.state.writing { 418 Writing::Init => true, 419 _ => false, 420 } 421 } 422 can_write_body(&self) -> bool423 pub fn can_write_body(&self) -> bool { 424 match self.state.writing { 425 Writing::Body(..) => true, 426 Writing::Init | Writing::KeepAlive | Writing::Closed => false, 427 } 428 } 429 can_buffer_body(&self) -> bool430 pub fn can_buffer_body(&self) -> bool { 431 self.io.can_buffer() 432 } 433 write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>)434 pub fn write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>) { 435 if let Some(encoder) = self.encode_head(head, body) { 436 self.state.writing = if !encoder.is_eof() { 437 Writing::Body(encoder) 438 } else if encoder.is_last() { 439 Writing::Closed 440 } else { 441 Writing::KeepAlive 442 }; 443 } 444 } 445 write_full_msg(&mut self, head: MessageHead<T::Outgoing>, body: B)446 pub fn write_full_msg(&mut self, head: MessageHead<T::Outgoing>, body: B) { 447 if let Some(encoder) = 448 self.encode_head(head, Some(BodyLength::Known(body.remaining() as u64))) 449 { 450 let is_last = encoder.is_last(); 451 // Make sure we don't write a body if we weren't actually allowed 452 // to do so, like because its a HEAD request. 453 if !encoder.is_eof() { 454 encoder.danger_full_buf(body, self.io.write_buf()); 455 } 456 self.state.writing = if is_last { 457 Writing::Closed 458 } else { 459 Writing::KeepAlive 460 } 461 } 462 } 463 encode_head( &mut self, mut head: MessageHead<T::Outgoing>, body: Option<BodyLength>, ) -> Option<Encoder>464 fn encode_head( 465 &mut self, 466 mut head: MessageHead<T::Outgoing>, 467 body: Option<BodyLength>, 468 ) -> Option<Encoder> { 469 debug_assert!(self.can_write_head()); 470 471 if !T::should_read_first() { 472 self.state.busy(); 473 } 474 475 self.enforce_version(&mut head); 476 477 let buf = self.io.headers_buf(); 478 match T::encode( 479 Encode { 480 head: &mut head, 481 body, 482 keep_alive: self.state.wants_keep_alive(), 483 req_method: &mut self.state.method, 484 title_case_headers: self.state.title_case_headers, 485 }, 486 buf, 487 ) { 488 Ok(encoder) => { 489 debug_assert!(self.state.cached_headers.is_none()); 490 debug_assert!(head.headers.is_empty()); 491 self.state.cached_headers = Some(head.headers); 492 Some(encoder) 493 } 494 Err(err) => { 495 self.state.error = Some(err); 496 self.state.writing = Writing::Closed; 497 None 498 } 499 } 500 } 501 502 // Fix keep-alives when Connection: keep-alive header is not present fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>)503 fn fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>) { 504 let outgoing_is_keep_alive = head 505 .headers 506 .get(CONNECTION) 507 .map(connection_keep_alive) 508 .unwrap_or(false); 509 510 if !outgoing_is_keep_alive { 511 match head.version { 512 // If response is version 1.0 and keep-alive is not present in the response, 513 // disable keep-alive so the server closes the connection 514 Version::HTTP_10 => self.state.disable_keep_alive(), 515 // If response is version 1.1 and keep-alive is wanted, add 516 // Connection: keep-alive header when not present 517 Version::HTTP_11 => { 518 if self.state.wants_keep_alive() { 519 head.headers 520 .insert(CONNECTION, HeaderValue::from_static("keep-alive")); 521 } 522 } 523 _ => (), 524 } 525 } 526 } 527 528 // If we know the remote speaks an older version, we try to fix up any messages 529 // to work with our older peer. enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>)530 fn enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>) { 531 if let Version::HTTP_10 = self.state.version { 532 // Fixes response or connection when keep-alive header is not present 533 self.fix_keep_alive(head); 534 // If the remote only knows HTTP/1.0, we should force ourselves 535 // to do only speak HTTP/1.0 as well. 536 head.version = Version::HTTP_10; 537 } 538 // If the remote speaks HTTP/1.1, then it *should* be fine with 539 // both HTTP/1.0 and HTTP/1.1 from us. So again, we just let 540 // the user's headers be. 541 } 542 write_body(&mut self, chunk: B)543 pub fn write_body(&mut self, chunk: B) { 544 debug_assert!(self.can_write_body() && self.can_buffer_body()); 545 // empty chunks should be discarded at Dispatcher level 546 debug_assert!(chunk.remaining() != 0); 547 548 let state = match self.state.writing { 549 Writing::Body(ref mut encoder) => { 550 self.io.buffer(encoder.encode(chunk)); 551 552 if encoder.is_eof() { 553 if encoder.is_last() { 554 Writing::Closed 555 } else { 556 Writing::KeepAlive 557 } 558 } else { 559 return; 560 } 561 } 562 _ => unreachable!("write_body invalid state: {:?}", self.state.writing), 563 }; 564 565 self.state.writing = state; 566 } 567 write_body_and_end(&mut self, chunk: B)568 pub fn write_body_and_end(&mut self, chunk: B) { 569 debug_assert!(self.can_write_body() && self.can_buffer_body()); 570 // empty chunks should be discarded at Dispatcher level 571 debug_assert!(chunk.remaining() != 0); 572 573 let state = match self.state.writing { 574 Writing::Body(ref encoder) => { 575 let can_keep_alive = encoder.encode_and_end(chunk, self.io.write_buf()); 576 if can_keep_alive { 577 Writing::KeepAlive 578 } else { 579 Writing::Closed 580 } 581 } 582 _ => unreachable!("write_body invalid state: {:?}", self.state.writing), 583 }; 584 585 self.state.writing = state; 586 } 587 end_body(&mut self)588 pub fn end_body(&mut self) { 589 debug_assert!(self.can_write_body()); 590 591 let state = match self.state.writing { 592 Writing::Body(ref mut encoder) => { 593 // end of stream, that means we should try to eof 594 match encoder.end() { 595 Ok(end) => { 596 if let Some(end) = end { 597 self.io.buffer(end); 598 } 599 if encoder.is_last() { 600 Writing::Closed 601 } else { 602 Writing::KeepAlive 603 } 604 } 605 Err(_not_eof) => Writing::Closed, 606 } 607 } 608 _ => return, 609 }; 610 611 self.state.writing = state; 612 } 613 614 // When we get a parse error, depending on what side we are, we might be able 615 // to write a response before closing the connection. 616 // 617 // - Client: there is nothing we can do 618 // - Server: if Response hasn't been written yet, we can send a 4xx response on_parse_error(&mut self, err: crate::Error) -> crate::Result<()>619 fn on_parse_error(&mut self, err: crate::Error) -> crate::Result<()> { 620 if let Writing::Init = self.state.writing { 621 if self.has_h2_prefix() { 622 return Err(crate::Error::new_version_h2()); 623 } 624 if let Some(msg) = T::on_error(&err) { 625 // Drop the cached headers so as to not trigger a debug 626 // assert in `write_head`... 627 self.state.cached_headers.take(); 628 self.write_head(msg, None); 629 self.state.error = Some(err); 630 return Ok(()); 631 } 632 } 633 634 // fallback is pass the error back up 635 Err(err) 636 } 637 poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>>638 pub fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { 639 ready!(Pin::new(&mut self.io).poll_flush(cx))?; 640 self.try_keep_alive(cx); 641 trace!("flushed({}): {:?}", T::LOG, self.state); 642 Poll::Ready(Ok(())) 643 } 644 poll_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>>645 pub fn poll_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { 646 match ready!(Pin::new(self.io.io_mut()).poll_shutdown(cx)) { 647 Ok(()) => { 648 trace!("shut down IO complete"); 649 Poll::Ready(Ok(())) 650 } 651 Err(e) => { 652 debug!("error shutting down IO: {}", e); 653 Poll::Ready(Err(e)) 654 } 655 } 656 } 657 658 /// If the read side can be cheaply drained, do so. Otherwise, close. poll_drain_or_close_read(&mut self, cx: &mut task::Context<'_>)659 pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut task::Context<'_>) { 660 let _ = self.poll_read_body(cx); 661 662 // If still in Reading::Body, just give up 663 match self.state.reading { 664 Reading::Init | Reading::KeepAlive => { 665 trace!("body drained"); 666 return; 667 } 668 _ => self.close_read(), 669 } 670 } 671 close_read(&mut self)672 pub fn close_read(&mut self) { 673 self.state.close_read(); 674 } 675 close_write(&mut self)676 pub fn close_write(&mut self) { 677 self.state.close_write(); 678 } 679 disable_keep_alive(&mut self)680 pub fn disable_keep_alive(&mut self) { 681 if self.state.is_idle() { 682 trace!("disable_keep_alive; closing idle connection"); 683 self.state.close(); 684 } else { 685 trace!("disable_keep_alive; in-progress connection"); 686 self.state.disable_keep_alive(); 687 } 688 } 689 take_error(&mut self) -> crate::Result<()>690 pub fn take_error(&mut self) -> crate::Result<()> { 691 if let Some(err) = self.state.error.take() { 692 Err(err) 693 } else { 694 Ok(()) 695 } 696 } 697 on_upgrade(&mut self) -> crate::upgrade::OnUpgrade698 pub(super) fn on_upgrade(&mut self) -> crate::upgrade::OnUpgrade { 699 trace!("{}: prepare possible HTTP upgrade", T::LOG); 700 self.state.prepare_upgrade() 701 } 702 } 703 704 impl<I, B: Buf, T> fmt::Debug for Conn<I, B, T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result705 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 706 f.debug_struct("Conn") 707 .field("state", &self.state) 708 .field("io", &self.io) 709 .finish() 710 } 711 } 712 713 // B and T are never pinned 714 impl<I: Unpin, B, T> Unpin for Conn<I, B, T> {} 715 716 struct State { 717 allow_half_close: bool, 718 /// Re-usable HeaderMap to reduce allocating new ones. 719 cached_headers: Option<HeaderMap>, 720 /// If an error occurs when there wasn't a direct way to return it 721 /// back to the user, this is set. 722 error: Option<crate::Error>, 723 /// Current keep-alive status. 724 keep_alive: KA, 725 /// If mid-message, the HTTP Method that started it. 726 /// 727 /// This is used to know things such as if the message can include 728 /// a body or not. 729 method: Option<Method>, 730 title_case_headers: bool, 731 /// Set to true when the Dispatcher should poll read operations 732 /// again. See the `maybe_notify` method for more. 733 notify_read: bool, 734 /// State of allowed reads 735 reading: Reading, 736 /// State of allowed writes 737 writing: Writing, 738 /// An expected pending HTTP upgrade. 739 upgrade: Option<crate::upgrade::Pending>, 740 /// Either HTTP/1.0 or 1.1 connection 741 version: Version, 742 } 743 744 #[derive(Debug)] 745 enum Reading { 746 Init, 747 Continue(Decoder), 748 Body(Decoder), 749 KeepAlive, 750 Closed, 751 } 752 753 enum Writing { 754 Init, 755 Body(Encoder), 756 KeepAlive, 757 Closed, 758 } 759 760 impl fmt::Debug for State { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result761 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 762 let mut builder = f.debug_struct("State"); 763 builder 764 .field("reading", &self.reading) 765 .field("writing", &self.writing) 766 .field("keep_alive", &self.keep_alive); 767 768 // Only show error field if it's interesting... 769 if let Some(ref error) = self.error { 770 builder.field("error", error); 771 } 772 773 if self.allow_half_close { 774 builder.field("allow_half_close", &true); 775 } 776 777 // Purposefully leaving off other fields.. 778 779 builder.finish() 780 } 781 } 782 783 impl fmt::Debug for Writing { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result784 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 785 match *self { 786 Writing::Init => f.write_str("Init"), 787 Writing::Body(ref enc) => f.debug_tuple("Body").field(enc).finish(), 788 Writing::KeepAlive => f.write_str("KeepAlive"), 789 Writing::Closed => f.write_str("Closed"), 790 } 791 } 792 } 793 794 impl std::ops::BitAndAssign<bool> for KA { bitand_assign(&mut self, enabled: bool)795 fn bitand_assign(&mut self, enabled: bool) { 796 if !enabled { 797 trace!("remote disabling keep-alive"); 798 *self = KA::Disabled; 799 } 800 } 801 } 802 803 #[derive(Clone, Copy, Debug)] 804 enum KA { 805 Idle, 806 Busy, 807 Disabled, 808 } 809 810 impl Default for KA { default() -> KA811 fn default() -> KA { 812 KA::Busy 813 } 814 } 815 816 impl KA { idle(&mut self)817 fn idle(&mut self) { 818 *self = KA::Idle; 819 } 820 busy(&mut self)821 fn busy(&mut self) { 822 *self = KA::Busy; 823 } 824 disable(&mut self)825 fn disable(&mut self) { 826 *self = KA::Disabled; 827 } 828 status(&self) -> KA829 fn status(&self) -> KA { 830 *self 831 } 832 } 833 834 impl State { close(&mut self)835 fn close(&mut self) { 836 trace!("State::close()"); 837 self.reading = Reading::Closed; 838 self.writing = Writing::Closed; 839 self.keep_alive.disable(); 840 } 841 close_read(&mut self)842 fn close_read(&mut self) { 843 trace!("State::close_read()"); 844 self.reading = Reading::Closed; 845 self.keep_alive.disable(); 846 } 847 close_write(&mut self)848 fn close_write(&mut self) { 849 trace!("State::close_write()"); 850 self.writing = Writing::Closed; 851 self.keep_alive.disable(); 852 } 853 wants_keep_alive(&self) -> bool854 fn wants_keep_alive(&self) -> bool { 855 if let KA::Disabled = self.keep_alive.status() { 856 false 857 } else { 858 true 859 } 860 } 861 try_keep_alive<T: Http1Transaction>(&mut self)862 fn try_keep_alive<T: Http1Transaction>(&mut self) { 863 match (&self.reading, &self.writing) { 864 (&Reading::KeepAlive, &Writing::KeepAlive) => { 865 if let KA::Busy = self.keep_alive.status() { 866 self.idle::<T>(); 867 } else { 868 trace!( 869 "try_keep_alive({}): could keep-alive, but status = {:?}", 870 T::LOG, 871 self.keep_alive 872 ); 873 self.close(); 874 } 875 } 876 (&Reading::Closed, &Writing::KeepAlive) | (&Reading::KeepAlive, &Writing::Closed) => { 877 self.close() 878 } 879 _ => (), 880 } 881 } 882 disable_keep_alive(&mut self)883 fn disable_keep_alive(&mut self) { 884 self.keep_alive.disable() 885 } 886 busy(&mut self)887 fn busy(&mut self) { 888 if let KA::Disabled = self.keep_alive.status() { 889 return; 890 } 891 self.keep_alive.busy(); 892 } 893 idle<T: Http1Transaction>(&mut self)894 fn idle<T: Http1Transaction>(&mut self) { 895 debug_assert!(!self.is_idle(), "State::idle() called while idle"); 896 897 self.method = None; 898 self.keep_alive.idle(); 899 if self.is_idle() { 900 self.reading = Reading::Init; 901 self.writing = Writing::Init; 902 903 // !T::should_read_first() means Client. 904 // 905 // If Client connection has just gone idle, the Dispatcher 906 // should try the poll loop one more time, so as to poll the 907 // pending requests stream. 908 if !T::should_read_first() { 909 self.notify_read = true; 910 } 911 } else { 912 self.close(); 913 } 914 } 915 is_idle(&self) -> bool916 fn is_idle(&self) -> bool { 917 if let KA::Idle = self.keep_alive.status() { 918 true 919 } else { 920 false 921 } 922 } 923 is_read_closed(&self) -> bool924 fn is_read_closed(&self) -> bool { 925 match self.reading { 926 Reading::Closed => true, 927 _ => false, 928 } 929 } 930 is_write_closed(&self) -> bool931 fn is_write_closed(&self) -> bool { 932 match self.writing { 933 Writing::Closed => true, 934 _ => false, 935 } 936 } 937 prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade938 fn prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade { 939 let (tx, rx) = crate::upgrade::pending(); 940 self.upgrade = Some(tx); 941 rx 942 } 943 } 944 945 #[cfg(test)] 946 mod tests { 947 #[cfg(feature = "nightly")] 948 #[bench] bench_read_head_short(b: &mut ::test::Bencher)949 fn bench_read_head_short(b: &mut ::test::Bencher) { 950 use super::*; 951 let s = b"GET / HTTP/1.1\r\nHost: localhost:8080\r\n\r\n"; 952 let len = s.len(); 953 b.bytes = len as u64; 954 955 // an empty IO, we'll be skipping and using the read buffer anyways 956 let io = tokio_test::io::Builder::new().build(); 957 let mut conn = Conn::<_, bytes::Bytes, crate::proto::h1::ServerTransaction>::new(io); 958 *conn.io.read_buf_mut() = ::bytes::BytesMut::from(&s[..]); 959 conn.state.cached_headers = Some(HeaderMap::with_capacity(2)); 960 961 let mut rt = tokio::runtime::Builder::new() 962 .enable_all() 963 .basic_scheduler() 964 .build() 965 .unwrap(); 966 967 b.iter(|| { 968 rt.block_on(futures_util::future::poll_fn(|cx| { 969 match conn.poll_read_head(cx) { 970 Poll::Ready(Some(Ok(x))) => { 971 ::test::black_box(&x); 972 let mut headers = x.0.headers; 973 headers.clear(); 974 conn.state.cached_headers = Some(headers); 975 } 976 f => panic!("expected Ready(Some(Ok(..))): {:?}", f), 977 } 978 979 conn.io.read_buf_mut().reserve(1); 980 unsafe { 981 conn.io.read_buf_mut().set_len(len); 982 } 983 conn.state.reading = Reading::Init; 984 Poll::Ready(()) 985 })); 986 }); 987 } 988 989 /* 990 //TODO: rewrite these using dispatch... someday... 991 use futures::{Async, Future, Stream, Sink}; 992 use futures::future; 993 994 use proto::{self, ClientTransaction, MessageHead, ServerTransaction}; 995 use super::super::Encoder; 996 use mock::AsyncIo; 997 998 use super::{Conn, Decoder, Reading, Writing}; 999 use ::uri::Uri; 1000 1001 use std::str::FromStr; 1002 1003 #[test] 1004 fn test_conn_init_read() { 1005 let good_message = b"GET / HTTP/1.1\r\n\r\n".to_vec(); 1006 let len = good_message.len(); 1007 let io = AsyncIo::new_buf(good_message, len); 1008 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1009 1010 match conn.poll().unwrap() { 1011 Async::Ready(Some(Frame::Message { message, body: false })) => { 1012 assert_eq!(message, MessageHead { 1013 subject: ::proto::RequestLine(::Get, Uri::from_str("/").unwrap()), 1014 .. MessageHead::default() 1015 }) 1016 }, 1017 f => panic!("frame is not Frame::Message: {:?}", f) 1018 } 1019 } 1020 1021 #[test] 1022 fn test_conn_parse_partial() { 1023 let _: Result<(), ()> = future::lazy(|| { 1024 let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec(); 1025 let io = AsyncIo::new_buf(good_message, 10); 1026 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1027 assert!(conn.poll().unwrap().is_not_ready()); 1028 conn.io.io_mut().block_in(50); 1029 let async = conn.poll().unwrap(); 1030 assert!(async.is_ready()); 1031 match async { 1032 Async::Ready(Some(Frame::Message { .. })) => (), 1033 f => panic!("frame is not Message: {:?}", f), 1034 } 1035 Ok(()) 1036 }).wait(); 1037 } 1038 1039 #[test] 1040 fn test_conn_init_read_eof_idle() { 1041 let io = AsyncIo::new_buf(vec![], 1); 1042 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1043 conn.state.idle(); 1044 1045 match conn.poll().unwrap() { 1046 Async::Ready(None) => {}, 1047 other => panic!("frame is not None: {:?}", other) 1048 } 1049 } 1050 1051 #[test] 1052 fn test_conn_init_read_eof_idle_partial_parse() { 1053 let io = AsyncIo::new_buf(b"GET / HTTP/1.1".to_vec(), 100); 1054 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1055 conn.state.idle(); 1056 1057 match conn.poll() { 1058 Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {}, 1059 other => panic!("unexpected frame: {:?}", other) 1060 } 1061 } 1062 1063 #[test] 1064 fn test_conn_init_read_eof_busy() { 1065 let _: Result<(), ()> = future::lazy(|| { 1066 // server ignores 1067 let io = AsyncIo::new_eof(); 1068 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1069 conn.state.busy(); 1070 1071 match conn.poll().unwrap() { 1072 Async::Ready(None) => {}, 1073 other => panic!("unexpected frame: {:?}", other) 1074 } 1075 1076 // client 1077 let io = AsyncIo::new_eof(); 1078 let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io); 1079 conn.state.busy(); 1080 1081 match conn.poll() { 1082 Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {}, 1083 other => panic!("unexpected frame: {:?}", other) 1084 } 1085 Ok(()) 1086 }).wait(); 1087 } 1088 1089 #[test] 1090 fn test_conn_body_finish_read_eof() { 1091 let _: Result<(), ()> = future::lazy(|| { 1092 let io = AsyncIo::new_eof(); 1093 let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io); 1094 conn.state.busy(); 1095 conn.state.writing = Writing::KeepAlive; 1096 conn.state.reading = Reading::Body(Decoder::length(0)); 1097 1098 match conn.poll() { 1099 Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (), 1100 other => panic!("unexpected frame: {:?}", other) 1101 } 1102 1103 // conn eofs, but tokio-proto will call poll() again, before calling flush() 1104 // the conn eof in this case is perfectly fine 1105 1106 match conn.poll() { 1107 Ok(Async::Ready(None)) => (), 1108 other => panic!("unexpected frame: {:?}", other) 1109 } 1110 Ok(()) 1111 }).wait(); 1112 } 1113 1114 #[test] 1115 fn test_conn_message_empty_body_read_eof() { 1116 let _: Result<(), ()> = future::lazy(|| { 1117 let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec(), 1024); 1118 let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io); 1119 conn.state.busy(); 1120 conn.state.writing = Writing::KeepAlive; 1121 1122 match conn.poll() { 1123 Ok(Async::Ready(Some(Frame::Message { body: false, .. }))) => (), 1124 other => panic!("unexpected frame: {:?}", other) 1125 } 1126 1127 // conn eofs, but tokio-proto will call poll() again, before calling flush() 1128 // the conn eof in this case is perfectly fine 1129 1130 match conn.poll() { 1131 Ok(Async::Ready(None)) => (), 1132 other => panic!("unexpected frame: {:?}", other) 1133 } 1134 Ok(()) 1135 }).wait(); 1136 } 1137 1138 #[test] 1139 fn test_conn_read_body_end() { 1140 let _: Result<(), ()> = future::lazy(|| { 1141 let io = AsyncIo::new_buf(b"POST / HTTP/1.1\r\nContent-Length: 5\r\n\r\n12345".to_vec(), 1024); 1142 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1143 conn.state.busy(); 1144 1145 match conn.poll() { 1146 Ok(Async::Ready(Some(Frame::Message { body: true, .. }))) => (), 1147 other => panic!("unexpected frame: {:?}", other) 1148 } 1149 1150 match conn.poll() { 1151 Ok(Async::Ready(Some(Frame::Body { chunk: Some(_) }))) => (), 1152 other => panic!("unexpected frame: {:?}", other) 1153 } 1154 1155 // When the body is done, `poll` MUST return a `Body` frame with chunk set to `None` 1156 match conn.poll() { 1157 Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (), 1158 other => panic!("unexpected frame: {:?}", other) 1159 } 1160 1161 match conn.poll() { 1162 Ok(Async::NotReady) => (), 1163 other => panic!("unexpected frame: {:?}", other) 1164 } 1165 Ok(()) 1166 }).wait(); 1167 } 1168 1169 #[test] 1170 fn test_conn_closed_read() { 1171 let io = AsyncIo::new_buf(vec![], 0); 1172 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1173 conn.state.close(); 1174 1175 match conn.poll().unwrap() { 1176 Async::Ready(None) => {}, 1177 other => panic!("frame is not None: {:?}", other) 1178 } 1179 } 1180 1181 #[test] 1182 fn test_conn_body_write_length() { 1183 let _ = pretty_env_logger::try_init(); 1184 let _: Result<(), ()> = future::lazy(|| { 1185 let io = AsyncIo::new_buf(vec![], 0); 1186 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1187 let max = super::super::io::DEFAULT_MAX_BUFFER_SIZE + 4096; 1188 conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64)); 1189 1190 assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; max].into()) }).unwrap().is_ready()); 1191 assert!(!conn.can_buffer_body()); 1192 1193 assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; 1024 * 8].into()) }).unwrap().is_not_ready()); 1194 1195 conn.io.io_mut().block_in(1024 * 3); 1196 assert!(conn.poll_complete().unwrap().is_not_ready()); 1197 conn.io.io_mut().block_in(1024 * 3); 1198 assert!(conn.poll_complete().unwrap().is_not_ready()); 1199 conn.io.io_mut().block_in(max * 2); 1200 assert!(conn.poll_complete().unwrap().is_ready()); 1201 1202 assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'c'; 1024 * 8].into()) }).unwrap().is_ready()); 1203 Ok(()) 1204 }).wait(); 1205 } 1206 1207 #[test] 1208 fn test_conn_body_write_chunked() { 1209 let _: Result<(), ()> = future::lazy(|| { 1210 let io = AsyncIo::new_buf(vec![], 4096); 1211 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1212 conn.state.writing = Writing::Body(Encoder::chunked()); 1213 1214 assert!(conn.start_send(Frame::Body { chunk: Some("headers".into()) }).unwrap().is_ready()); 1215 assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'x'; 8192].into()) }).unwrap().is_ready()); 1216 Ok(()) 1217 }).wait(); 1218 } 1219 1220 #[test] 1221 fn test_conn_body_flush() { 1222 let _: Result<(), ()> = future::lazy(|| { 1223 let io = AsyncIo::new_buf(vec![], 1024 * 1024 * 5); 1224 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1225 conn.state.writing = Writing::Body(Encoder::length(1024 * 1024)); 1226 assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 1024].into()) }).unwrap().is_ready()); 1227 assert!(!conn.can_buffer_body()); 1228 conn.io.io_mut().block_in(1024 * 1024 * 5); 1229 assert!(conn.poll_complete().unwrap().is_ready()); 1230 assert!(conn.can_buffer_body()); 1231 assert!(conn.io.io_mut().flushed()); 1232 1233 Ok(()) 1234 }).wait(); 1235 } 1236 1237 #[test] 1238 fn test_conn_parking() { 1239 use std::sync::Arc; 1240 use futures::executor::Notify; 1241 use futures::executor::NotifyHandle; 1242 1243 struct Car { 1244 permit: bool, 1245 } 1246 impl Notify for Car { 1247 fn notify(&self, _id: usize) { 1248 assert!(self.permit, "unparked without permit"); 1249 } 1250 } 1251 1252 fn car(permit: bool) -> NotifyHandle { 1253 Arc::new(Car { 1254 permit: permit, 1255 }).into() 1256 } 1257 1258 // test that once writing is done, unparks 1259 let f = future::lazy(|| { 1260 let io = AsyncIo::new_buf(vec![], 4096); 1261 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1262 conn.state.reading = Reading::KeepAlive; 1263 assert!(conn.poll().unwrap().is_not_ready()); 1264 1265 conn.state.writing = Writing::KeepAlive; 1266 assert!(conn.poll_complete().unwrap().is_ready()); 1267 Ok::<(), ()>(()) 1268 }); 1269 ::futures::executor::spawn(f).poll_future_notify(&car(true), 0).unwrap(); 1270 1271 1272 // test that flushing when not waiting on read doesn't unpark 1273 let f = future::lazy(|| { 1274 let io = AsyncIo::new_buf(vec![], 4096); 1275 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1276 conn.state.writing = Writing::KeepAlive; 1277 assert!(conn.poll_complete().unwrap().is_ready()); 1278 Ok::<(), ()>(()) 1279 }); 1280 ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap(); 1281 1282 1283 // test that flushing and writing isn't done doesn't unpark 1284 let f = future::lazy(|| { 1285 let io = AsyncIo::new_buf(vec![], 4096); 1286 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1287 conn.state.reading = Reading::KeepAlive; 1288 assert!(conn.poll().unwrap().is_not_ready()); 1289 conn.state.writing = Writing::Body(Encoder::length(5_000)); 1290 assert!(conn.poll_complete().unwrap().is_ready()); 1291 Ok::<(), ()>(()) 1292 }); 1293 ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap(); 1294 } 1295 1296 #[test] 1297 fn test_conn_closed_write() { 1298 let io = AsyncIo::new_buf(vec![], 0); 1299 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1300 conn.state.close(); 1301 1302 match conn.start_send(Frame::Body { chunk: Some(b"foobar".to_vec().into()) }) { 1303 Err(_e) => {}, 1304 other => panic!("did not return Err: {:?}", other) 1305 } 1306 1307 assert!(conn.state.is_write_closed()); 1308 } 1309 1310 #[test] 1311 fn test_conn_write_empty_chunk() { 1312 let io = AsyncIo::new_buf(vec![], 0); 1313 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1314 conn.state.writing = Writing::KeepAlive; 1315 1316 assert!(conn.start_send(Frame::Body { chunk: None }).unwrap().is_ready()); 1317 assert!(conn.start_send(Frame::Body { chunk: Some(Vec::new().into()) }).unwrap().is_ready()); 1318 conn.start_send(Frame::Body { chunk: Some(vec![b'a'].into()) }).unwrap_err(); 1319 } 1320 */ 1321 } 1322