1 use std::fmt; 2 use std::io; 3 use std::marker::PhantomData; 4 #[cfg(all(feature = "server", feature = "runtime"))] 5 use std::time::Duration; 6 7 use bytes::{Buf, Bytes}; 8 use http::header::{HeaderValue, CONNECTION}; 9 use http::{HeaderMap, Method, Version}; 10 use httparse::ParserConfig; 11 use tokio::io::{AsyncRead, AsyncWrite}; 12 #[cfg(all(feature = "server", feature = "runtime"))] 13 use tokio::time::Sleep; 14 use tracing::{debug, error, trace}; 15 16 use super::io::Buffered; 17 use super::{Decoder, Encode, EncodedBuf, Encoder, Http1Transaction, ParseContext, Wants}; 18 use crate::body::DecodedLength; 19 use crate::common::{task, Pin, Poll, Unpin}; 20 use crate::headers::connection_keep_alive; 21 use crate::proto::{BodyLength, MessageHead}; 22 23 const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; 24 25 /// This handles a connection, which will have been established over an 26 /// `AsyncRead + AsyncWrite` (like a socket), and will likely include multiple 27 /// `Transaction`s over HTTP. 28 /// 29 /// The connection will determine when a message begins and ends as well as 30 /// determine if this connection can be kept alive after the message, 31 /// or if it is complete. 32 pub(crate) struct Conn<I, B, T> { 33 io: Buffered<I, EncodedBuf<B>>, 34 state: State, 35 _marker: PhantomData<fn(T)>, 36 } 37 38 impl<I, B, T> Conn<I, B, T> 39 where 40 I: AsyncRead + AsyncWrite + Unpin, 41 B: Buf, 42 T: Http1Transaction, 43 { new(io: I) -> Conn<I, B, T>44 pub(crate) fn new(io: I) -> Conn<I, B, T> { 45 Conn { 46 io: Buffered::new(io), 47 state: State { 48 allow_half_close: false, 49 cached_headers: None, 50 error: None, 51 keep_alive: KA::Busy, 52 method: None, 53 h1_parser_config: ParserConfig::default(), 54 #[cfg(all(feature = "server", feature = "runtime"))] 55 h1_header_read_timeout: None, 56 #[cfg(all(feature = "server", feature = "runtime"))] 57 h1_header_read_timeout_fut: None, 58 #[cfg(all(feature = "server", feature = "runtime"))] 59 h1_header_read_timeout_running: false, 60 preserve_header_case: false, 61 title_case_headers: false, 62 h09_responses: false, 63 #[cfg(feature = "ffi")] 64 on_informational: None, 65 #[cfg(feature = "ffi")] 66 raw_headers: false, 67 notify_read: false, 68 reading: Reading::Init, 69 writing: Writing::Init, 70 upgrade: None, 71 // We assume a modern world where the remote speaks HTTP/1.1. 72 // If they tell us otherwise, we'll downgrade in `read_head`. 73 version: Version::HTTP_11, 74 }, 75 _marker: PhantomData, 76 } 77 } 78 79 #[cfg(feature = "server")] set_flush_pipeline(&mut self, enabled: bool)80 pub(crate) fn set_flush_pipeline(&mut self, enabled: bool) { 81 self.io.set_flush_pipeline(enabled); 82 } 83 set_write_strategy_queue(&mut self)84 pub(crate) fn set_write_strategy_queue(&mut self) { 85 self.io.set_write_strategy_queue(); 86 } 87 set_max_buf_size(&mut self, max: usize)88 pub(crate) fn set_max_buf_size(&mut self, max: usize) { 89 self.io.set_max_buf_size(max); 90 } 91 92 #[cfg(feature = "client")] set_read_buf_exact_size(&mut self, sz: usize)93 pub(crate) fn set_read_buf_exact_size(&mut self, sz: usize) { 94 self.io.set_read_buf_exact_size(sz); 95 } 96 set_write_strategy_flatten(&mut self)97 pub(crate) fn set_write_strategy_flatten(&mut self) { 98 self.io.set_write_strategy_flatten(); 99 } 100 101 #[cfg(feature = "client")] set_h1_parser_config(&mut self, parser_config: ParserConfig)102 pub(crate) fn set_h1_parser_config(&mut self, parser_config: ParserConfig) { 103 self.state.h1_parser_config = parser_config; 104 } 105 set_title_case_headers(&mut self)106 pub(crate) fn set_title_case_headers(&mut self) { 107 self.state.title_case_headers = true; 108 } 109 set_preserve_header_case(&mut self)110 pub(crate) fn set_preserve_header_case(&mut self) { 111 self.state.preserve_header_case = true; 112 } 113 114 #[cfg(feature = "client")] set_h09_responses(&mut self)115 pub(crate) fn set_h09_responses(&mut self) { 116 self.state.h09_responses = true; 117 } 118 119 #[cfg(all(feature = "server", feature = "runtime"))] set_http1_header_read_timeout(&mut self, val: Duration)120 pub(crate) fn set_http1_header_read_timeout(&mut self, val: Duration) { 121 self.state.h1_header_read_timeout = Some(val); 122 } 123 124 #[cfg(feature = "server")] set_allow_half_close(&mut self)125 pub(crate) fn set_allow_half_close(&mut self) { 126 self.state.allow_half_close = true; 127 } 128 129 #[cfg(feature = "ffi")] set_raw_headers(&mut self, enabled: bool)130 pub(crate) fn set_raw_headers(&mut self, enabled: bool) { 131 self.state.raw_headers = enabled; 132 } 133 into_inner(self) -> (I, Bytes)134 pub(crate) fn into_inner(self) -> (I, Bytes) { 135 self.io.into_inner() 136 } 137 pending_upgrade(&mut self) -> Option<crate::upgrade::Pending>138 pub(crate) fn pending_upgrade(&mut self) -> Option<crate::upgrade::Pending> { 139 self.state.upgrade.take() 140 } 141 is_read_closed(&self) -> bool142 pub(crate) fn is_read_closed(&self) -> bool { 143 self.state.is_read_closed() 144 } 145 is_write_closed(&self) -> bool146 pub(crate) fn is_write_closed(&self) -> bool { 147 self.state.is_write_closed() 148 } 149 can_read_head(&self) -> bool150 pub(crate) fn can_read_head(&self) -> bool { 151 match self.state.reading { 152 Reading::Init => { 153 if T::should_read_first() { 154 true 155 } else { 156 match self.state.writing { 157 Writing::Init => false, 158 _ => true, 159 } 160 } 161 } 162 _ => false, 163 } 164 } 165 can_read_body(&self) -> bool166 pub(crate) fn can_read_body(&self) -> bool { 167 match self.state.reading { 168 Reading::Body(..) | Reading::Continue(..) => true, 169 _ => false, 170 } 171 } 172 should_error_on_eof(&self) -> bool173 fn should_error_on_eof(&self) -> bool { 174 // If we're idle, it's probably just the connection closing gracefully. 175 T::should_error_on_parse_eof() && !self.state.is_idle() 176 } 177 has_h2_prefix(&self) -> bool178 fn has_h2_prefix(&self) -> bool { 179 let read_buf = self.io.read_buf(); 180 read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE 181 } 182 poll_read_head( &mut self, cx: &mut task::Context<'_>, ) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>>183 pub(super) fn poll_read_head( 184 &mut self, 185 cx: &mut task::Context<'_>, 186 ) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>> { 187 debug_assert!(self.can_read_head()); 188 trace!("Conn::read_head"); 189 190 let msg = match ready!(self.io.parse::<T>( 191 cx, 192 ParseContext { 193 cached_headers: &mut self.state.cached_headers, 194 req_method: &mut self.state.method, 195 h1_parser_config: self.state.h1_parser_config.clone(), 196 #[cfg(all(feature = "server", feature = "runtime"))] 197 h1_header_read_timeout: self.state.h1_header_read_timeout, 198 #[cfg(all(feature = "server", feature = "runtime"))] 199 h1_header_read_timeout_fut: &mut self.state.h1_header_read_timeout_fut, 200 #[cfg(all(feature = "server", feature = "runtime"))] 201 h1_header_read_timeout_running: &mut self.state.h1_header_read_timeout_running, 202 preserve_header_case: self.state.preserve_header_case, 203 h09_responses: self.state.h09_responses, 204 #[cfg(feature = "ffi")] 205 on_informational: &mut self.state.on_informational, 206 #[cfg(feature = "ffi")] 207 raw_headers: self.state.raw_headers, 208 } 209 )) { 210 Ok(msg) => msg, 211 Err(e) => return self.on_read_head_error(e), 212 }; 213 214 // Note: don't deconstruct `msg` into local variables, it appears 215 // the optimizer doesn't remove the extra copies. 216 217 debug!("incoming body is {}", msg.decode); 218 219 // Prevent accepting HTTP/0.9 responses after the initial one, if any. 220 self.state.h09_responses = false; 221 222 // Drop any OnInformational callbacks, we're done there! 223 #[cfg(feature = "ffi")] 224 { 225 self.state.on_informational = None; 226 } 227 228 self.state.busy(); 229 self.state.keep_alive &= msg.keep_alive; 230 self.state.version = msg.head.version; 231 232 let mut wants = if msg.wants_upgrade { 233 Wants::UPGRADE 234 } else { 235 Wants::EMPTY 236 }; 237 238 if msg.decode == DecodedLength::ZERO { 239 if msg.expect_continue { 240 debug!("ignoring expect-continue since body is empty"); 241 } 242 self.state.reading = Reading::KeepAlive; 243 if !T::should_read_first() { 244 self.try_keep_alive(cx); 245 } 246 } else if msg.expect_continue { 247 self.state.reading = Reading::Continue(Decoder::new(msg.decode)); 248 wants = wants.add(Wants::EXPECT); 249 } else { 250 self.state.reading = Reading::Body(Decoder::new(msg.decode)); 251 } 252 253 Poll::Ready(Some(Ok((msg.head, msg.decode, wants)))) 254 } 255 on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>>256 fn on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>> { 257 // If we are currently waiting on a message, then an empty 258 // message should be reported as an error. If not, it is just 259 // the connection closing gracefully. 260 let must_error = self.should_error_on_eof(); 261 self.close_read(); 262 self.io.consume_leading_lines(); 263 let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty(); 264 if was_mid_parse || must_error { 265 // We check if the buf contains the h2 Preface 266 debug!( 267 "parse error ({}) with {} bytes", 268 e, 269 self.io.read_buf().len() 270 ); 271 match self.on_parse_error(e) { 272 Ok(()) => Poll::Pending, // XXX: wat? 273 Err(e) => Poll::Ready(Some(Err(e))), 274 } 275 } else { 276 debug!("read eof"); 277 self.close_write(); 278 Poll::Ready(None) 279 } 280 } 281 poll_read_body( &mut self, cx: &mut task::Context<'_>, ) -> Poll<Option<io::Result<Bytes>>>282 pub(crate) fn poll_read_body( 283 &mut self, 284 cx: &mut task::Context<'_>, 285 ) -> Poll<Option<io::Result<Bytes>>> { 286 debug_assert!(self.can_read_body()); 287 288 let (reading, ret) = match self.state.reading { 289 Reading::Body(ref mut decoder) => { 290 match ready!(decoder.decode(cx, &mut self.io)) { 291 Ok(slice) => { 292 let (reading, chunk) = if decoder.is_eof() { 293 debug!("incoming body completed"); 294 ( 295 Reading::KeepAlive, 296 if !slice.is_empty() { 297 Some(Ok(slice)) 298 } else { 299 None 300 }, 301 ) 302 } else if slice.is_empty() { 303 error!("incoming body unexpectedly ended"); 304 // This should be unreachable, since all 3 decoders 305 // either set eof=true or return an Err when reading 306 // an empty slice... 307 (Reading::Closed, None) 308 } else { 309 return Poll::Ready(Some(Ok(slice))); 310 }; 311 (reading, Poll::Ready(chunk)) 312 } 313 Err(e) => { 314 debug!("incoming body decode error: {}", e); 315 (Reading::Closed, Poll::Ready(Some(Err(e)))) 316 } 317 } 318 } 319 Reading::Continue(ref decoder) => { 320 // Write the 100 Continue if not already responded... 321 if let Writing::Init = self.state.writing { 322 trace!("automatically sending 100 Continue"); 323 let cont = b"HTTP/1.1 100 Continue\r\n\r\n"; 324 self.io.headers_buf().extend_from_slice(cont); 325 } 326 327 // And now recurse once in the Reading::Body state... 328 self.state.reading = Reading::Body(decoder.clone()); 329 return self.poll_read_body(cx); 330 } 331 _ => unreachable!("poll_read_body invalid state: {:?}", self.state.reading), 332 }; 333 334 self.state.reading = reading; 335 self.try_keep_alive(cx); 336 ret 337 } 338 wants_read_again(&mut self) -> bool339 pub(crate) fn wants_read_again(&mut self) -> bool { 340 let ret = self.state.notify_read; 341 self.state.notify_read = false; 342 ret 343 } 344 poll_read_keep_alive( &mut self, cx: &mut task::Context<'_>, ) -> Poll<crate::Result<()>>345 pub(crate) fn poll_read_keep_alive( 346 &mut self, 347 cx: &mut task::Context<'_>, 348 ) -> Poll<crate::Result<()>> { 349 debug_assert!(!self.can_read_head() && !self.can_read_body()); 350 351 if self.is_read_closed() { 352 Poll::Pending 353 } else if self.is_mid_message() { 354 self.mid_message_detect_eof(cx) 355 } else { 356 self.require_empty_read(cx) 357 } 358 } 359 is_mid_message(&self) -> bool360 fn is_mid_message(&self) -> bool { 361 match (&self.state.reading, &self.state.writing) { 362 (&Reading::Init, &Writing::Init) => false, 363 _ => true, 364 } 365 } 366 367 // This will check to make sure the io object read is empty. 368 // 369 // This should only be called for Clients wanting to enter the idle 370 // state. require_empty_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>371 fn require_empty_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { 372 debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed()); 373 debug_assert!(!self.is_mid_message()); 374 debug_assert!(T::is_client()); 375 376 if !self.io.read_buf().is_empty() { 377 debug!("received an unexpected {} bytes", self.io.read_buf().len()); 378 return Poll::Ready(Err(crate::Error::new_unexpected_message())); 379 } 380 381 let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?; 382 383 if num_read == 0 { 384 let ret = if self.should_error_on_eof() { 385 trace!("found unexpected EOF on busy connection: {:?}", self.state); 386 Poll::Ready(Err(crate::Error::new_incomplete())) 387 } else { 388 trace!("found EOF on idle connection, closing"); 389 Poll::Ready(Ok(())) 390 }; 391 392 // order is important: should_error needs state BEFORE close_read 393 self.state.close_read(); 394 return ret; 395 } 396 397 debug!( 398 "received unexpected {} bytes on an idle connection", 399 num_read 400 ); 401 Poll::Ready(Err(crate::Error::new_unexpected_message())) 402 } 403 mid_message_detect_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>404 fn mid_message_detect_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { 405 debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed()); 406 debug_assert!(self.is_mid_message()); 407 408 if self.state.allow_half_close || !self.io.read_buf().is_empty() { 409 return Poll::Pending; 410 } 411 412 let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?; 413 414 if num_read == 0 { 415 trace!("found unexpected EOF on busy connection: {:?}", self.state); 416 self.state.close_read(); 417 Poll::Ready(Err(crate::Error::new_incomplete())) 418 } else { 419 Poll::Ready(Ok(())) 420 } 421 } 422 force_io_read(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<usize>>423 fn force_io_read(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<usize>> { 424 debug_assert!(!self.state.is_read_closed()); 425 426 let result = ready!(self.io.poll_read_from_io(cx)); 427 Poll::Ready(result.map_err(|e| { 428 trace!("force_io_read; io error = {:?}", e); 429 self.state.close(); 430 e 431 })) 432 } 433 maybe_notify(&mut self, cx: &mut task::Context<'_>)434 fn maybe_notify(&mut self, cx: &mut task::Context<'_>) { 435 // its possible that we returned NotReady from poll() without having 436 // exhausted the underlying Io. We would have done this when we 437 // determined we couldn't keep reading until we knew how writing 438 // would finish. 439 440 match self.state.reading { 441 Reading::Continue(..) | Reading::Body(..) | Reading::KeepAlive | Reading::Closed => { 442 return 443 } 444 Reading::Init => (), 445 }; 446 447 match self.state.writing { 448 Writing::Body(..) => return, 449 Writing::Init | Writing::KeepAlive | Writing::Closed => (), 450 } 451 452 if !self.io.is_read_blocked() { 453 if self.io.read_buf().is_empty() { 454 match self.io.poll_read_from_io(cx) { 455 Poll::Ready(Ok(n)) => { 456 if n == 0 { 457 trace!("maybe_notify; read eof"); 458 if self.state.is_idle() { 459 self.state.close(); 460 } else { 461 self.close_read() 462 } 463 return; 464 } 465 } 466 Poll::Pending => { 467 trace!("maybe_notify; read_from_io blocked"); 468 return; 469 } 470 Poll::Ready(Err(e)) => { 471 trace!("maybe_notify; read_from_io error: {}", e); 472 self.state.close(); 473 self.state.error = Some(crate::Error::new_io(e)); 474 } 475 } 476 } 477 self.state.notify_read = true; 478 } 479 } 480 try_keep_alive(&mut self, cx: &mut task::Context<'_>)481 fn try_keep_alive(&mut self, cx: &mut task::Context<'_>) { 482 self.state.try_keep_alive::<T>(); 483 self.maybe_notify(cx); 484 } 485 can_write_head(&self) -> bool486 pub(crate) fn can_write_head(&self) -> bool { 487 if !T::should_read_first() { 488 if let Reading::Closed = self.state.reading { 489 return false; 490 } 491 } 492 match self.state.writing { 493 Writing::Init => self.io.can_headers_buf(), 494 _ => false, 495 } 496 } 497 can_write_body(&self) -> bool498 pub(crate) fn can_write_body(&self) -> bool { 499 match self.state.writing { 500 Writing::Body(..) => true, 501 Writing::Init | Writing::KeepAlive | Writing::Closed => false, 502 } 503 } 504 can_buffer_body(&self) -> bool505 pub(crate) fn can_buffer_body(&self) -> bool { 506 self.io.can_buffer() 507 } 508 write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>)509 pub(crate) fn write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>) { 510 if let Some(encoder) = self.encode_head(head, body) { 511 self.state.writing = if !encoder.is_eof() { 512 Writing::Body(encoder) 513 } else if encoder.is_last() { 514 Writing::Closed 515 } else { 516 Writing::KeepAlive 517 }; 518 } 519 } 520 write_full_msg(&mut self, head: MessageHead<T::Outgoing>, body: B)521 pub(crate) fn write_full_msg(&mut self, head: MessageHead<T::Outgoing>, body: B) { 522 if let Some(encoder) = 523 self.encode_head(head, Some(BodyLength::Known(body.remaining() as u64))) 524 { 525 let is_last = encoder.is_last(); 526 // Make sure we don't write a body if we weren't actually allowed 527 // to do so, like because its a HEAD request. 528 if !encoder.is_eof() { 529 encoder.danger_full_buf(body, self.io.write_buf()); 530 } 531 self.state.writing = if is_last { 532 Writing::Closed 533 } else { 534 Writing::KeepAlive 535 } 536 } 537 } 538 encode_head( &mut self, mut head: MessageHead<T::Outgoing>, body: Option<BodyLength>, ) -> Option<Encoder>539 fn encode_head( 540 &mut self, 541 mut head: MessageHead<T::Outgoing>, 542 body: Option<BodyLength>, 543 ) -> Option<Encoder> { 544 debug_assert!(self.can_write_head()); 545 546 if !T::should_read_first() { 547 self.state.busy(); 548 } 549 550 self.enforce_version(&mut head); 551 552 let buf = self.io.headers_buf(); 553 match super::role::encode_headers::<T>( 554 Encode { 555 head: &mut head, 556 body, 557 #[cfg(feature = "server")] 558 keep_alive: self.state.wants_keep_alive(), 559 req_method: &mut self.state.method, 560 title_case_headers: self.state.title_case_headers, 561 }, 562 buf, 563 ) { 564 Ok(encoder) => { 565 debug_assert!(self.state.cached_headers.is_none()); 566 debug_assert!(head.headers.is_empty()); 567 self.state.cached_headers = Some(head.headers); 568 569 #[cfg(feature = "ffi")] 570 { 571 self.state.on_informational = 572 head.extensions.remove::<crate::ffi::OnInformational>(); 573 } 574 575 Some(encoder) 576 } 577 Err(err) => { 578 self.state.error = Some(err); 579 self.state.writing = Writing::Closed; 580 None 581 } 582 } 583 } 584 585 // Fix keep-alives when Connection: keep-alive header is not present fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>)586 fn fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>) { 587 let outgoing_is_keep_alive = head 588 .headers 589 .get(CONNECTION) 590 .map(connection_keep_alive) 591 .unwrap_or(false); 592 593 if !outgoing_is_keep_alive { 594 match head.version { 595 // If response is version 1.0 and keep-alive is not present in the response, 596 // disable keep-alive so the server closes the connection 597 Version::HTTP_10 => self.state.disable_keep_alive(), 598 // If response is version 1.1 and keep-alive is wanted, add 599 // Connection: keep-alive header when not present 600 Version::HTTP_11 => { 601 if self.state.wants_keep_alive() { 602 head.headers 603 .insert(CONNECTION, HeaderValue::from_static("keep-alive")); 604 } 605 } 606 _ => (), 607 } 608 } 609 } 610 611 // If we know the remote speaks an older version, we try to fix up any messages 612 // to work with our older peer. enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>)613 fn enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>) { 614 if let Version::HTTP_10 = self.state.version { 615 // Fixes response or connection when keep-alive header is not present 616 self.fix_keep_alive(head); 617 // If the remote only knows HTTP/1.0, we should force ourselves 618 // to do only speak HTTP/1.0 as well. 619 head.version = Version::HTTP_10; 620 } 621 // If the remote speaks HTTP/1.1, then it *should* be fine with 622 // both HTTP/1.0 and HTTP/1.1 from us. So again, we just let 623 // the user's headers be. 624 } 625 write_body(&mut self, chunk: B)626 pub(crate) fn write_body(&mut self, chunk: B) { 627 debug_assert!(self.can_write_body() && self.can_buffer_body()); 628 // empty chunks should be discarded at Dispatcher level 629 debug_assert!(chunk.remaining() != 0); 630 631 let state = match self.state.writing { 632 Writing::Body(ref mut encoder) => { 633 self.io.buffer(encoder.encode(chunk)); 634 635 if encoder.is_eof() { 636 if encoder.is_last() { 637 Writing::Closed 638 } else { 639 Writing::KeepAlive 640 } 641 } else { 642 return; 643 } 644 } 645 _ => unreachable!("write_body invalid state: {:?}", self.state.writing), 646 }; 647 648 self.state.writing = state; 649 } 650 write_body_and_end(&mut self, chunk: B)651 pub(crate) fn write_body_and_end(&mut self, chunk: B) { 652 debug_assert!(self.can_write_body() && self.can_buffer_body()); 653 // empty chunks should be discarded at Dispatcher level 654 debug_assert!(chunk.remaining() != 0); 655 656 let state = match self.state.writing { 657 Writing::Body(ref encoder) => { 658 let can_keep_alive = encoder.encode_and_end(chunk, self.io.write_buf()); 659 if can_keep_alive { 660 Writing::KeepAlive 661 } else { 662 Writing::Closed 663 } 664 } 665 _ => unreachable!("write_body invalid state: {:?}", self.state.writing), 666 }; 667 668 self.state.writing = state; 669 } 670 end_body(&mut self) -> crate::Result<()>671 pub(crate) fn end_body(&mut self) -> crate::Result<()> { 672 debug_assert!(self.can_write_body()); 673 674 let mut res = Ok(()); 675 let state = match self.state.writing { 676 Writing::Body(ref mut encoder) => { 677 // end of stream, that means we should try to eof 678 match encoder.end() { 679 Ok(end) => { 680 if let Some(end) = end { 681 self.io.buffer(end); 682 } 683 if encoder.is_last() || encoder.is_close_delimited() { 684 Writing::Closed 685 } else { 686 Writing::KeepAlive 687 } 688 } 689 Err(_not_eof) => { 690 res = Err(crate::Error::new_user_body( 691 crate::Error::new_body_write_aborted(), 692 )); 693 Writing::Closed 694 } 695 } 696 } 697 _ => return Ok(()), 698 }; 699 700 self.state.writing = state; 701 res 702 } 703 704 // When we get a parse error, depending on what side we are, we might be able 705 // to write a response before closing the connection. 706 // 707 // - Client: there is nothing we can do 708 // - Server: if Response hasn't been written yet, we can send a 4xx response on_parse_error(&mut self, err: crate::Error) -> crate::Result<()>709 fn on_parse_error(&mut self, err: crate::Error) -> crate::Result<()> { 710 if let Writing::Init = self.state.writing { 711 if self.has_h2_prefix() { 712 return Err(crate::Error::new_version_h2()); 713 } 714 if let Some(msg) = T::on_error(&err) { 715 // Drop the cached headers so as to not trigger a debug 716 // assert in `write_head`... 717 self.state.cached_headers.take(); 718 self.write_head(msg, None); 719 self.state.error = Some(err); 720 return Ok(()); 721 } 722 } 723 724 // fallback is pass the error back up 725 Err(err) 726 } 727 poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>>728 pub(crate) fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { 729 ready!(Pin::new(&mut self.io).poll_flush(cx))?; 730 self.try_keep_alive(cx); 731 trace!("flushed({}): {:?}", T::LOG, self.state); 732 Poll::Ready(Ok(())) 733 } 734 poll_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>>735 pub(crate) fn poll_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { 736 match ready!(Pin::new(self.io.io_mut()).poll_shutdown(cx)) { 737 Ok(()) => { 738 trace!("shut down IO complete"); 739 Poll::Ready(Ok(())) 740 } 741 Err(e) => { 742 debug!("error shutting down IO: {}", e); 743 Poll::Ready(Err(e)) 744 } 745 } 746 } 747 748 /// If the read side can be cheaply drained, do so. Otherwise, close. poll_drain_or_close_read(&mut self, cx: &mut task::Context<'_>)749 pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut task::Context<'_>) { 750 let _ = self.poll_read_body(cx); 751 752 // If still in Reading::Body, just give up 753 match self.state.reading { 754 Reading::Init | Reading::KeepAlive => { 755 trace!("body drained"); 756 return; 757 } 758 _ => self.close_read(), 759 } 760 } 761 close_read(&mut self)762 pub(crate) fn close_read(&mut self) { 763 self.state.close_read(); 764 } 765 close_write(&mut self)766 pub(crate) fn close_write(&mut self) { 767 self.state.close_write(); 768 } 769 770 #[cfg(feature = "server")] disable_keep_alive(&mut self)771 pub(crate) fn disable_keep_alive(&mut self) { 772 if self.state.is_idle() { 773 trace!("disable_keep_alive; closing idle connection"); 774 self.state.close(); 775 } else { 776 trace!("disable_keep_alive; in-progress connection"); 777 self.state.disable_keep_alive(); 778 } 779 } 780 take_error(&mut self) -> crate::Result<()>781 pub(crate) fn take_error(&mut self) -> crate::Result<()> { 782 if let Some(err) = self.state.error.take() { 783 Err(err) 784 } else { 785 Ok(()) 786 } 787 } 788 on_upgrade(&mut self) -> crate::upgrade::OnUpgrade789 pub(super) fn on_upgrade(&mut self) -> crate::upgrade::OnUpgrade { 790 trace!("{}: prepare possible HTTP upgrade", T::LOG); 791 self.state.prepare_upgrade() 792 } 793 } 794 795 impl<I, B: Buf, T> fmt::Debug for Conn<I, B, T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result796 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 797 f.debug_struct("Conn") 798 .field("state", &self.state) 799 .field("io", &self.io) 800 .finish() 801 } 802 } 803 804 // B and T are never pinned 805 impl<I: Unpin, B, T> Unpin for Conn<I, B, T> {} 806 807 struct State { 808 allow_half_close: bool, 809 /// Re-usable HeaderMap to reduce allocating new ones. 810 cached_headers: Option<HeaderMap>, 811 /// If an error occurs when there wasn't a direct way to return it 812 /// back to the user, this is set. 813 error: Option<crate::Error>, 814 /// Current keep-alive status. 815 keep_alive: KA, 816 /// If mid-message, the HTTP Method that started it. 817 /// 818 /// This is used to know things such as if the message can include 819 /// a body or not. 820 method: Option<Method>, 821 h1_parser_config: ParserConfig, 822 #[cfg(all(feature = "server", feature = "runtime"))] 823 h1_header_read_timeout: Option<Duration>, 824 #[cfg(all(feature = "server", feature = "runtime"))] 825 h1_header_read_timeout_fut: Option<Pin<Box<Sleep>>>, 826 #[cfg(all(feature = "server", feature = "runtime"))] 827 h1_header_read_timeout_running: bool, 828 preserve_header_case: bool, 829 title_case_headers: bool, 830 h09_responses: bool, 831 /// If set, called with each 1xx informational response received for 832 /// the current request. MUST be unset after a non-1xx response is 833 /// received. 834 #[cfg(feature = "ffi")] 835 on_informational: Option<crate::ffi::OnInformational>, 836 #[cfg(feature = "ffi")] 837 raw_headers: bool, 838 /// Set to true when the Dispatcher should poll read operations 839 /// again. See the `maybe_notify` method for more. 840 notify_read: bool, 841 /// State of allowed reads 842 reading: Reading, 843 /// State of allowed writes 844 writing: Writing, 845 /// An expected pending HTTP upgrade. 846 upgrade: Option<crate::upgrade::Pending>, 847 /// Either HTTP/1.0 or 1.1 connection 848 version: Version, 849 } 850 851 #[derive(Debug)] 852 enum Reading { 853 Init, 854 Continue(Decoder), 855 Body(Decoder), 856 KeepAlive, 857 Closed, 858 } 859 860 enum Writing { 861 Init, 862 Body(Encoder), 863 KeepAlive, 864 Closed, 865 } 866 867 impl fmt::Debug for State { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result868 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 869 let mut builder = f.debug_struct("State"); 870 builder 871 .field("reading", &self.reading) 872 .field("writing", &self.writing) 873 .field("keep_alive", &self.keep_alive); 874 875 // Only show error field if it's interesting... 876 if let Some(ref error) = self.error { 877 builder.field("error", error); 878 } 879 880 if self.allow_half_close { 881 builder.field("allow_half_close", &true); 882 } 883 884 // Purposefully leaving off other fields.. 885 886 builder.finish() 887 } 888 } 889 890 impl fmt::Debug for Writing { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result891 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 892 match *self { 893 Writing::Init => f.write_str("Init"), 894 Writing::Body(ref enc) => f.debug_tuple("Body").field(enc).finish(), 895 Writing::KeepAlive => f.write_str("KeepAlive"), 896 Writing::Closed => f.write_str("Closed"), 897 } 898 } 899 } 900 901 impl std::ops::BitAndAssign<bool> for KA { bitand_assign(&mut self, enabled: bool)902 fn bitand_assign(&mut self, enabled: bool) { 903 if !enabled { 904 trace!("remote disabling keep-alive"); 905 *self = KA::Disabled; 906 } 907 } 908 } 909 910 #[derive(Clone, Copy, Debug)] 911 enum KA { 912 Idle, 913 Busy, 914 Disabled, 915 } 916 917 impl Default for KA { default() -> KA918 fn default() -> KA { 919 KA::Busy 920 } 921 } 922 923 impl KA { idle(&mut self)924 fn idle(&mut self) { 925 *self = KA::Idle; 926 } 927 busy(&mut self)928 fn busy(&mut self) { 929 *self = KA::Busy; 930 } 931 disable(&mut self)932 fn disable(&mut self) { 933 *self = KA::Disabled; 934 } 935 status(&self) -> KA936 fn status(&self) -> KA { 937 *self 938 } 939 } 940 941 impl State { close(&mut self)942 fn close(&mut self) { 943 trace!("State::close()"); 944 self.reading = Reading::Closed; 945 self.writing = Writing::Closed; 946 self.keep_alive.disable(); 947 } 948 close_read(&mut self)949 fn close_read(&mut self) { 950 trace!("State::close_read()"); 951 self.reading = Reading::Closed; 952 self.keep_alive.disable(); 953 } 954 close_write(&mut self)955 fn close_write(&mut self) { 956 trace!("State::close_write()"); 957 self.writing = Writing::Closed; 958 self.keep_alive.disable(); 959 } 960 wants_keep_alive(&self) -> bool961 fn wants_keep_alive(&self) -> bool { 962 if let KA::Disabled = self.keep_alive.status() { 963 false 964 } else { 965 true 966 } 967 } 968 try_keep_alive<T: Http1Transaction>(&mut self)969 fn try_keep_alive<T: Http1Transaction>(&mut self) { 970 match (&self.reading, &self.writing) { 971 (&Reading::KeepAlive, &Writing::KeepAlive) => { 972 if let KA::Busy = self.keep_alive.status() { 973 self.idle::<T>(); 974 } else { 975 trace!( 976 "try_keep_alive({}): could keep-alive, but status = {:?}", 977 T::LOG, 978 self.keep_alive 979 ); 980 self.close(); 981 } 982 } 983 (&Reading::Closed, &Writing::KeepAlive) | (&Reading::KeepAlive, &Writing::Closed) => { 984 self.close() 985 } 986 _ => (), 987 } 988 } 989 disable_keep_alive(&mut self)990 fn disable_keep_alive(&mut self) { 991 self.keep_alive.disable() 992 } 993 busy(&mut self)994 fn busy(&mut self) { 995 if let KA::Disabled = self.keep_alive.status() { 996 return; 997 } 998 self.keep_alive.busy(); 999 } 1000 idle<T: Http1Transaction>(&mut self)1001 fn idle<T: Http1Transaction>(&mut self) { 1002 debug_assert!(!self.is_idle(), "State::idle() called while idle"); 1003 1004 self.method = None; 1005 self.keep_alive.idle(); 1006 if self.is_idle() { 1007 self.reading = Reading::Init; 1008 self.writing = Writing::Init; 1009 1010 // !T::should_read_first() means Client. 1011 // 1012 // If Client connection has just gone idle, the Dispatcher 1013 // should try the poll loop one more time, so as to poll the 1014 // pending requests stream. 1015 if !T::should_read_first() { 1016 self.notify_read = true; 1017 } 1018 } else { 1019 self.close(); 1020 } 1021 } 1022 is_idle(&self) -> bool1023 fn is_idle(&self) -> bool { 1024 if let KA::Idle = self.keep_alive.status() { 1025 true 1026 } else { 1027 false 1028 } 1029 } 1030 is_read_closed(&self) -> bool1031 fn is_read_closed(&self) -> bool { 1032 match self.reading { 1033 Reading::Closed => true, 1034 _ => false, 1035 } 1036 } 1037 is_write_closed(&self) -> bool1038 fn is_write_closed(&self) -> bool { 1039 match self.writing { 1040 Writing::Closed => true, 1041 _ => false, 1042 } 1043 } 1044 prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade1045 fn prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade { 1046 let (tx, rx) = crate::upgrade::pending(); 1047 self.upgrade = Some(tx); 1048 rx 1049 } 1050 } 1051 1052 #[cfg(test)] 1053 mod tests { 1054 #[cfg(feature = "nightly")] 1055 #[bench] bench_read_head_short(b: &mut ::test::Bencher)1056 fn bench_read_head_short(b: &mut ::test::Bencher) { 1057 use super::*; 1058 let s = b"GET / HTTP/1.1\r\nHost: localhost:8080\r\n\r\n"; 1059 let len = s.len(); 1060 b.bytes = len as u64; 1061 1062 // an empty IO, we'll be skipping and using the read buffer anyways 1063 let io = tokio_test::io::Builder::new().build(); 1064 let mut conn = Conn::<_, bytes::Bytes, crate::proto::h1::ServerTransaction>::new(io); 1065 *conn.io.read_buf_mut() = ::bytes::BytesMut::from(&s[..]); 1066 conn.state.cached_headers = Some(HeaderMap::with_capacity(2)); 1067 1068 let rt = tokio::runtime::Builder::new_current_thread() 1069 .enable_all() 1070 .build() 1071 .unwrap(); 1072 1073 b.iter(|| { 1074 rt.block_on(futures_util::future::poll_fn(|cx| { 1075 match conn.poll_read_head(cx) { 1076 Poll::Ready(Some(Ok(x))) => { 1077 ::test::black_box(&x); 1078 let mut headers = x.0.headers; 1079 headers.clear(); 1080 conn.state.cached_headers = Some(headers); 1081 } 1082 f => panic!("expected Ready(Some(Ok(..))): {:?}", f), 1083 } 1084 1085 conn.io.read_buf_mut().reserve(1); 1086 unsafe { 1087 conn.io.read_buf_mut().set_len(len); 1088 } 1089 conn.state.reading = Reading::Init; 1090 Poll::Ready(()) 1091 })); 1092 }); 1093 } 1094 1095 /* 1096 //TODO: rewrite these using dispatch... someday... 1097 use futures::{Async, Future, Stream, Sink}; 1098 use futures::future; 1099 1100 use proto::{self, ClientTransaction, MessageHead, ServerTransaction}; 1101 use super::super::Encoder; 1102 use mock::AsyncIo; 1103 1104 use super::{Conn, Decoder, Reading, Writing}; 1105 use ::uri::Uri; 1106 1107 use std::str::FromStr; 1108 1109 #[test] 1110 fn test_conn_init_read() { 1111 let good_message = b"GET / HTTP/1.1\r\n\r\n".to_vec(); 1112 let len = good_message.len(); 1113 let io = AsyncIo::new_buf(good_message, len); 1114 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1115 1116 match conn.poll().unwrap() { 1117 Async::Ready(Some(Frame::Message { message, body: false })) => { 1118 assert_eq!(message, MessageHead { 1119 subject: ::proto::RequestLine(::Get, Uri::from_str("/").unwrap()), 1120 .. MessageHead::default() 1121 }) 1122 }, 1123 f => panic!("frame is not Frame::Message: {:?}", f) 1124 } 1125 } 1126 1127 #[test] 1128 fn test_conn_parse_partial() { 1129 let _: Result<(), ()> = future::lazy(|| { 1130 let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec(); 1131 let io = AsyncIo::new_buf(good_message, 10); 1132 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1133 assert!(conn.poll().unwrap().is_not_ready()); 1134 conn.io.io_mut().block_in(50); 1135 let async = conn.poll().unwrap(); 1136 assert!(async.is_ready()); 1137 match async { 1138 Async::Ready(Some(Frame::Message { .. })) => (), 1139 f => panic!("frame is not Message: {:?}", f), 1140 } 1141 Ok(()) 1142 }).wait(); 1143 } 1144 1145 #[test] 1146 fn test_conn_init_read_eof_idle() { 1147 let io = AsyncIo::new_buf(vec![], 1); 1148 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1149 conn.state.idle(); 1150 1151 match conn.poll().unwrap() { 1152 Async::Ready(None) => {}, 1153 other => panic!("frame is not None: {:?}", other) 1154 } 1155 } 1156 1157 #[test] 1158 fn test_conn_init_read_eof_idle_partial_parse() { 1159 let io = AsyncIo::new_buf(b"GET / HTTP/1.1".to_vec(), 100); 1160 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1161 conn.state.idle(); 1162 1163 match conn.poll() { 1164 Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {}, 1165 other => panic!("unexpected frame: {:?}", other) 1166 } 1167 } 1168 1169 #[test] 1170 fn test_conn_init_read_eof_busy() { 1171 let _: Result<(), ()> = future::lazy(|| { 1172 // server ignores 1173 let io = AsyncIo::new_eof(); 1174 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1175 conn.state.busy(); 1176 1177 match conn.poll().unwrap() { 1178 Async::Ready(None) => {}, 1179 other => panic!("unexpected frame: {:?}", other) 1180 } 1181 1182 // client 1183 let io = AsyncIo::new_eof(); 1184 let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io); 1185 conn.state.busy(); 1186 1187 match conn.poll() { 1188 Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {}, 1189 other => panic!("unexpected frame: {:?}", other) 1190 } 1191 Ok(()) 1192 }).wait(); 1193 } 1194 1195 #[test] 1196 fn test_conn_body_finish_read_eof() { 1197 let _: Result<(), ()> = future::lazy(|| { 1198 let io = AsyncIo::new_eof(); 1199 let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io); 1200 conn.state.busy(); 1201 conn.state.writing = Writing::KeepAlive; 1202 conn.state.reading = Reading::Body(Decoder::length(0)); 1203 1204 match conn.poll() { 1205 Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (), 1206 other => panic!("unexpected frame: {:?}", other) 1207 } 1208 1209 // conn eofs, but tokio-proto will call poll() again, before calling flush() 1210 // the conn eof in this case is perfectly fine 1211 1212 match conn.poll() { 1213 Ok(Async::Ready(None)) => (), 1214 other => panic!("unexpected frame: {:?}", other) 1215 } 1216 Ok(()) 1217 }).wait(); 1218 } 1219 1220 #[test] 1221 fn test_conn_message_empty_body_read_eof() { 1222 let _: Result<(), ()> = future::lazy(|| { 1223 let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec(), 1024); 1224 let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io); 1225 conn.state.busy(); 1226 conn.state.writing = Writing::KeepAlive; 1227 1228 match conn.poll() { 1229 Ok(Async::Ready(Some(Frame::Message { body: false, .. }))) => (), 1230 other => panic!("unexpected frame: {:?}", other) 1231 } 1232 1233 // conn eofs, but tokio-proto will call poll() again, before calling flush() 1234 // the conn eof in this case is perfectly fine 1235 1236 match conn.poll() { 1237 Ok(Async::Ready(None)) => (), 1238 other => panic!("unexpected frame: {:?}", other) 1239 } 1240 Ok(()) 1241 }).wait(); 1242 } 1243 1244 #[test] 1245 fn test_conn_read_body_end() { 1246 let _: Result<(), ()> = future::lazy(|| { 1247 let io = AsyncIo::new_buf(b"POST / HTTP/1.1\r\nContent-Length: 5\r\n\r\n12345".to_vec(), 1024); 1248 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1249 conn.state.busy(); 1250 1251 match conn.poll() { 1252 Ok(Async::Ready(Some(Frame::Message { body: true, .. }))) => (), 1253 other => panic!("unexpected frame: {:?}", other) 1254 } 1255 1256 match conn.poll() { 1257 Ok(Async::Ready(Some(Frame::Body { chunk: Some(_) }))) => (), 1258 other => panic!("unexpected frame: {:?}", other) 1259 } 1260 1261 // When the body is done, `poll` MUST return a `Body` frame with chunk set to `None` 1262 match conn.poll() { 1263 Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (), 1264 other => panic!("unexpected frame: {:?}", other) 1265 } 1266 1267 match conn.poll() { 1268 Ok(Async::NotReady) => (), 1269 other => panic!("unexpected frame: {:?}", other) 1270 } 1271 Ok(()) 1272 }).wait(); 1273 } 1274 1275 #[test] 1276 fn test_conn_closed_read() { 1277 let io = AsyncIo::new_buf(vec![], 0); 1278 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1279 conn.state.close(); 1280 1281 match conn.poll().unwrap() { 1282 Async::Ready(None) => {}, 1283 other => panic!("frame is not None: {:?}", other) 1284 } 1285 } 1286 1287 #[test] 1288 fn test_conn_body_write_length() { 1289 let _ = pretty_env_logger::try_init(); 1290 let _: Result<(), ()> = future::lazy(|| { 1291 let io = AsyncIo::new_buf(vec![], 0); 1292 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1293 let max = super::super::io::DEFAULT_MAX_BUFFER_SIZE + 4096; 1294 conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64)); 1295 1296 assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; max].into()) }).unwrap().is_ready()); 1297 assert!(!conn.can_buffer_body()); 1298 1299 assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; 1024 * 8].into()) }).unwrap().is_not_ready()); 1300 1301 conn.io.io_mut().block_in(1024 * 3); 1302 assert!(conn.poll_complete().unwrap().is_not_ready()); 1303 conn.io.io_mut().block_in(1024 * 3); 1304 assert!(conn.poll_complete().unwrap().is_not_ready()); 1305 conn.io.io_mut().block_in(max * 2); 1306 assert!(conn.poll_complete().unwrap().is_ready()); 1307 1308 assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'c'; 1024 * 8].into()) }).unwrap().is_ready()); 1309 Ok(()) 1310 }).wait(); 1311 } 1312 1313 #[test] 1314 fn test_conn_body_write_chunked() { 1315 let _: Result<(), ()> = future::lazy(|| { 1316 let io = AsyncIo::new_buf(vec![], 4096); 1317 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1318 conn.state.writing = Writing::Body(Encoder::chunked()); 1319 1320 assert!(conn.start_send(Frame::Body { chunk: Some("headers".into()) }).unwrap().is_ready()); 1321 assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'x'; 8192].into()) }).unwrap().is_ready()); 1322 Ok(()) 1323 }).wait(); 1324 } 1325 1326 #[test] 1327 fn test_conn_body_flush() { 1328 let _: Result<(), ()> = future::lazy(|| { 1329 let io = AsyncIo::new_buf(vec![], 1024 * 1024 * 5); 1330 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1331 conn.state.writing = Writing::Body(Encoder::length(1024 * 1024)); 1332 assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 1024].into()) }).unwrap().is_ready()); 1333 assert!(!conn.can_buffer_body()); 1334 conn.io.io_mut().block_in(1024 * 1024 * 5); 1335 assert!(conn.poll_complete().unwrap().is_ready()); 1336 assert!(conn.can_buffer_body()); 1337 assert!(conn.io.io_mut().flushed()); 1338 1339 Ok(()) 1340 }).wait(); 1341 } 1342 1343 #[test] 1344 fn test_conn_parking() { 1345 use std::sync::Arc; 1346 use futures::executor::Notify; 1347 use futures::executor::NotifyHandle; 1348 1349 struct Car { 1350 permit: bool, 1351 } 1352 impl Notify for Car { 1353 fn notify(&self, _id: usize) { 1354 assert!(self.permit, "unparked without permit"); 1355 } 1356 } 1357 1358 fn car(permit: bool) -> NotifyHandle { 1359 Arc::new(Car { 1360 permit: permit, 1361 }).into() 1362 } 1363 1364 // test that once writing is done, unparks 1365 let f = future::lazy(|| { 1366 let io = AsyncIo::new_buf(vec![], 4096); 1367 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1368 conn.state.reading = Reading::KeepAlive; 1369 assert!(conn.poll().unwrap().is_not_ready()); 1370 1371 conn.state.writing = Writing::KeepAlive; 1372 assert!(conn.poll_complete().unwrap().is_ready()); 1373 Ok::<(), ()>(()) 1374 }); 1375 ::futures::executor::spawn(f).poll_future_notify(&car(true), 0).unwrap(); 1376 1377 1378 // test that flushing when not waiting on read doesn't unpark 1379 let f = future::lazy(|| { 1380 let io = AsyncIo::new_buf(vec![], 4096); 1381 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1382 conn.state.writing = Writing::KeepAlive; 1383 assert!(conn.poll_complete().unwrap().is_ready()); 1384 Ok::<(), ()>(()) 1385 }); 1386 ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap(); 1387 1388 1389 // test that flushing and writing isn't done doesn't unpark 1390 let f = future::lazy(|| { 1391 let io = AsyncIo::new_buf(vec![], 4096); 1392 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1393 conn.state.reading = Reading::KeepAlive; 1394 assert!(conn.poll().unwrap().is_not_ready()); 1395 conn.state.writing = Writing::Body(Encoder::length(5_000)); 1396 assert!(conn.poll_complete().unwrap().is_ready()); 1397 Ok::<(), ()>(()) 1398 }); 1399 ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap(); 1400 } 1401 1402 #[test] 1403 fn test_conn_closed_write() { 1404 let io = AsyncIo::new_buf(vec![], 0); 1405 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1406 conn.state.close(); 1407 1408 match conn.start_send(Frame::Body { chunk: Some(b"foobar".to_vec().into()) }) { 1409 Err(_e) => {}, 1410 other => panic!("did not return Err: {:?}", other) 1411 } 1412 1413 assert!(conn.state.is_write_closed()); 1414 } 1415 1416 #[test] 1417 fn test_conn_write_empty_chunk() { 1418 let io = AsyncIo::new_buf(vec![], 0); 1419 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); 1420 conn.state.writing = Writing::KeepAlive; 1421 1422 assert!(conn.start_send(Frame::Body { chunk: None }).unwrap().is_ready()); 1423 assert!(conn.start_send(Frame::Body { chunk: Some(Vec::new().into()) }).unwrap().is_ready()); 1424 conn.start_send(Frame::Body { chunk: Some(vec![b'a'].into()) }).unwrap_err(); 1425 } 1426 */ 1427 } 1428