1 use std::{ 2 collections::VecDeque, 3 error::Error as StdError, 4 fmt, 5 future::Future, 6 io, mem, net, 7 pin::Pin, 8 rc::Rc, 9 task::{Context, Poll}, 10 }; 11 12 use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed, FramedParts}; 13 use actix_rt::time::{sleep_until, Instant, Sleep}; 14 use actix_service::Service; 15 use bitflags::bitflags; 16 use bytes::{Buf, BytesMut}; 17 use futures_core::ready; 18 use log::{error, trace}; 19 use pin_project::pin_project; 20 21 use crate::{ 22 body::{AnyBody, BodySize, MessageBody}, 23 config::ServiceConfig, 24 error::{DispatchError, ParseError, PayloadError}, 25 service::HttpFlow, 26 OnConnectData, Request, Response, StatusCode, 27 }; 28 29 use super::{ 30 codec::Codec, 31 payload::{Payload, PayloadSender, PayloadStatus}, 32 Message, MessageType, 33 }; 34 35 const LW_BUFFER_SIZE: usize = 1024; 36 const HW_BUFFER_SIZE: usize = 1024 * 8; 37 const MAX_PIPELINED_MESSAGES: usize = 16; 38 39 bitflags! { 40 pub struct Flags: u8 { 41 const STARTED = 0b0000_0001; 42 const KEEPALIVE = 0b0000_0010; 43 const SHUTDOWN = 0b0000_0100; 44 const READ_DISCONNECT = 0b0000_1000; 45 const WRITE_DISCONNECT = 0b0001_0000; 46 } 47 } 48 49 #[pin_project] 50 /// Dispatcher for HTTP/1.1 protocol 51 pub struct Dispatcher<T, S, B, X, U> 52 where 53 S: Service<Request>, 54 S::Error: Into<Response<AnyBody>>, 55 56 B: MessageBody, 57 B::Error: Into<Box<dyn StdError>>, 58 59 X: Service<Request, Response = Request>, 60 X::Error: Into<Response<AnyBody>>, 61 62 U: Service<(Request, Framed<T, Codec>), Response = ()>, 63 U::Error: fmt::Display, 64 { 65 #[pin] 66 inner: DispatcherState<T, S, B, X, U>, 67 68 #[cfg(test)] 69 poll_count: u64, 70 } 71 72 #[pin_project(project = DispatcherStateProj)] 73 enum DispatcherState<T, S, B, X, U> 74 where 75 S: Service<Request>, 76 S::Error: Into<Response<AnyBody>>, 77 78 B: MessageBody, 79 B::Error: Into<Box<dyn StdError>>, 80 81 X: Service<Request, Response = Request>, 82 X::Error: Into<Response<AnyBody>>, 83 84 U: Service<(Request, Framed<T, Codec>), Response = ()>, 85 U::Error: fmt::Display, 86 { 87 Normal(#[pin] InnerDispatcher<T, S, B, X, U>), 88 Upgrade(#[pin] U::Future), 89 } 90 91 #[pin_project(project = InnerDispatcherProj)] 92 struct InnerDispatcher<T, S, B, X, U> 93 where 94 S: Service<Request>, 95 S::Error: Into<Response<AnyBody>>, 96 97 B: MessageBody, 98 B::Error: Into<Box<dyn StdError>>, 99 100 X: Service<Request, Response = Request>, 101 X::Error: Into<Response<AnyBody>>, 102 103 U: Service<(Request, Framed<T, Codec>), Response = ()>, 104 U::Error: fmt::Display, 105 { 106 flow: Rc<HttpFlow<S, X, U>>, 107 on_connect_data: OnConnectData, 108 flags: Flags, 109 peer_addr: Option<net::SocketAddr>, 110 error: Option<DispatchError>, 111 112 #[pin] 113 state: State<S, B, X>, 114 payload: Option<PayloadSender>, 115 messages: VecDeque<DispatcherMessage>, 116 117 ka_expire: Instant, 118 #[pin] 119 ka_timer: Option<Sleep>, 120 121 io: Option<T>, 122 read_buf: BytesMut, 123 write_buf: BytesMut, 124 codec: Codec, 125 } 126 127 enum DispatcherMessage { 128 Item(Request), 129 Upgrade(Request), 130 Error(Response<()>), 131 } 132 133 #[pin_project(project = StateProj)] 134 enum State<S, B, X> 135 where 136 S: Service<Request>, 137 X: Service<Request, Response = Request>, 138 139 B: MessageBody, 140 B::Error: Into<Box<dyn StdError>>, 141 { 142 None, 143 ExpectCall(#[pin] X::Future), 144 ServiceCall(#[pin] S::Future), 145 SendPayload(#[pin] B), 146 SendErrorPayload(#[pin] AnyBody), 147 } 148 149 impl<S, B, X> State<S, B, X> 150 where 151 S: Service<Request>, 152 153 X: Service<Request, Response = Request>, 154 155 B: MessageBody, 156 B::Error: Into<Box<dyn StdError>>, 157 { is_empty(&self) -> bool158 fn is_empty(&self) -> bool { 159 matches!(self, State::None) 160 } 161 } 162 163 enum PollResponse { 164 Upgrade(Request), 165 DoNothing, 166 DrainWriteBuf, 167 } 168 169 impl<T, S, B, X, U> Dispatcher<T, S, B, X, U> 170 where 171 T: AsyncRead + AsyncWrite + Unpin, 172 173 S: Service<Request>, 174 S::Error: Into<Response<AnyBody>>, 175 S::Response: Into<Response<B>>, 176 177 B: MessageBody, 178 B::Error: Into<Box<dyn StdError>>, 179 180 X: Service<Request, Response = Request>, 181 X::Error: Into<Response<AnyBody>>, 182 183 U: Service<(Request, Framed<T, Codec>), Response = ()>, 184 U::Error: fmt::Display, 185 { 186 /// Create HTTP/1 dispatcher. new( io: T, config: ServiceConfig, flow: Rc<HttpFlow<S, X, U>>, on_connect_data: OnConnectData, peer_addr: Option<net::SocketAddr>, ) -> Self187 pub(crate) fn new( 188 io: T, 189 config: ServiceConfig, 190 flow: Rc<HttpFlow<S, X, U>>, 191 on_connect_data: OnConnectData, 192 peer_addr: Option<net::SocketAddr>, 193 ) -> Self { 194 let flags = if config.keep_alive_enabled() { 195 Flags::KEEPALIVE 196 } else { 197 Flags::empty() 198 }; 199 200 // keep-alive timer 201 let (ka_expire, ka_timer) = match config.keep_alive_timer() { 202 Some(delay) => (delay.deadline(), Some(delay)), 203 None => (config.now(), None), 204 }; 205 206 Dispatcher { 207 inner: DispatcherState::Normal(InnerDispatcher { 208 read_buf: BytesMut::with_capacity(HW_BUFFER_SIZE), 209 write_buf: BytesMut::with_capacity(HW_BUFFER_SIZE), 210 payload: None, 211 state: State::None, 212 error: None, 213 messages: VecDeque::new(), 214 io: Some(io), 215 codec: Codec::new(config), 216 flow, 217 on_connect_data, 218 flags, 219 peer_addr, 220 ka_expire, 221 ka_timer, 222 }), 223 224 #[cfg(test)] 225 poll_count: 0, 226 } 227 } 228 } 229 230 impl<T, S, B, X, U> InnerDispatcher<T, S, B, X, U> 231 where 232 T: AsyncRead + AsyncWrite + Unpin, 233 234 S: Service<Request>, 235 S::Error: Into<Response<AnyBody>>, 236 S::Response: Into<Response<B>>, 237 238 B: MessageBody, 239 B::Error: Into<Box<dyn StdError>>, 240 241 X: Service<Request, Response = Request>, 242 X::Error: Into<Response<AnyBody>>, 243 244 U: Service<(Request, Framed<T, Codec>), Response = ()>, 245 U::Error: fmt::Display, 246 { can_read(&self, cx: &mut Context<'_>) -> bool247 fn can_read(&self, cx: &mut Context<'_>) -> bool { 248 if self.flags.contains(Flags::READ_DISCONNECT) { 249 false 250 } else if let Some(ref info) = self.payload { 251 info.need_read(cx) == PayloadStatus::Read 252 } else { 253 true 254 } 255 } 256 257 // if checked is set to true, delay disconnect until all tasks have finished. client_disconnected(self: Pin<&mut Self>)258 fn client_disconnected(self: Pin<&mut Self>) { 259 let this = self.project(); 260 this.flags 261 .insert(Flags::READ_DISCONNECT | Flags::WRITE_DISCONNECT); 262 if let Some(mut payload) = this.payload.take() { 263 payload.set_error(PayloadError::Incomplete(None)); 264 } 265 } 266 poll_flush( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), io::Error>>267 fn poll_flush( 268 self: Pin<&mut Self>, 269 cx: &mut Context<'_>, 270 ) -> Poll<Result<(), io::Error>> { 271 let InnerDispatcherProj { io, write_buf, .. } = self.project(); 272 let mut io = Pin::new(io.as_mut().unwrap()); 273 274 let len = write_buf.len(); 275 let mut written = 0; 276 277 while written < len { 278 match io.as_mut().poll_write(cx, &write_buf[written..])? { 279 Poll::Ready(0) => { 280 return Poll::Ready(Err(io::Error::new( 281 io::ErrorKind::WriteZero, 282 "", 283 ))) 284 } 285 Poll::Ready(n) => written += n, 286 Poll::Pending => { 287 write_buf.advance(written); 288 return Poll::Pending; 289 } 290 } 291 } 292 293 // everything has written to io. clear buffer. 294 write_buf.clear(); 295 296 // flush the io and check if get blocked. 297 io.poll_flush(cx) 298 } 299 send_response_inner( self: Pin<&mut Self>, message: Response<()>, body: &impl MessageBody, ) -> Result<BodySize, DispatchError>300 fn send_response_inner( 301 self: Pin<&mut Self>, 302 message: Response<()>, 303 body: &impl MessageBody, 304 ) -> Result<BodySize, DispatchError> { 305 let size = body.size(); 306 let mut this = self.project(); 307 this.codec 308 .encode(Message::Item((message, size)), &mut this.write_buf) 309 .map_err(|err| { 310 if let Some(mut payload) = this.payload.take() { 311 payload.set_error(PayloadError::Incomplete(None)); 312 } 313 DispatchError::Io(err) 314 })?; 315 316 this.flags.set(Flags::KEEPALIVE, this.codec.keepalive()); 317 318 Ok(size) 319 } 320 send_response( mut self: Pin<&mut Self>, message: Response<()>, body: B, ) -> Result<(), DispatchError>321 fn send_response( 322 mut self: Pin<&mut Self>, 323 message: Response<()>, 324 body: B, 325 ) -> Result<(), DispatchError> { 326 let size = self.as_mut().send_response_inner(message, &body)?; 327 let state = match size { 328 BodySize::None | BodySize::Empty => State::None, 329 _ => State::SendPayload(body), 330 }; 331 self.project().state.set(state); 332 Ok(()) 333 } 334 send_error_response( mut self: Pin<&mut Self>, message: Response<()>, body: AnyBody, ) -> Result<(), DispatchError>335 fn send_error_response( 336 mut self: Pin<&mut Self>, 337 message: Response<()>, 338 body: AnyBody, 339 ) -> Result<(), DispatchError> { 340 let size = self.as_mut().send_response_inner(message, &body)?; 341 let state = match size { 342 BodySize::None | BodySize::Empty => State::None, 343 _ => State::SendErrorPayload(body), 344 }; 345 self.project().state.set(state); 346 Ok(()) 347 } 348 send_continue(self: Pin<&mut Self>)349 fn send_continue(self: Pin<&mut Self>) { 350 self.project() 351 .write_buf 352 .extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n"); 353 } 354 poll_response( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Result<PollResponse, DispatchError>355 fn poll_response( 356 mut self: Pin<&mut Self>, 357 cx: &mut Context<'_>, 358 ) -> Result<PollResponse, DispatchError> { 359 'res: loop { 360 let mut this = self.as_mut().project(); 361 match this.state.as_mut().project() { 362 // no future is in InnerDispatcher state. pop next message. 363 StateProj::None => match this.messages.pop_front() { 364 // handle request message. 365 Some(DispatcherMessage::Item(req)) => { 366 // Handle `EXPECT: 100-Continue` header 367 if req.head().expect() { 368 // set InnerDispatcher state and continue loop to poll it. 369 let task = this.flow.expect.call(req); 370 this.state.set(State::ExpectCall(task)); 371 } else { 372 // the same as expect call. 373 let task = this.flow.service.call(req); 374 this.state.set(State::ServiceCall(task)); 375 }; 376 } 377 378 // handle error message. 379 Some(DispatcherMessage::Error(res)) => { 380 // send_response would update InnerDispatcher state to SendPayload or 381 // None(If response body is empty). 382 // continue loop to poll it. 383 self.as_mut().send_error_response(res, AnyBody::Empty)?; 384 } 385 386 // return with upgrade request and poll it exclusively. 387 Some(DispatcherMessage::Upgrade(req)) => { 388 return Ok(PollResponse::Upgrade(req)); 389 } 390 391 // all messages are dealt with. 392 None => return Ok(PollResponse::DoNothing), 393 }, 394 StateProj::ServiceCall(fut) => match fut.poll(cx) { 395 // service call resolved. send response. 396 Poll::Ready(Ok(res)) => { 397 let (res, body) = res.into().replace_body(()); 398 self.as_mut().send_response(res, body)?; 399 } 400 401 // send service call error as response 402 Poll::Ready(Err(err)) => { 403 let res: Response<AnyBody> = err.into(); 404 let (res, body) = res.replace_body(()); 405 self.as_mut().send_error_response(res, body)?; 406 } 407 408 // service call pending and could be waiting for more chunk messages. 409 // (pipeline message limit and/or payload can_read limit) 410 Poll::Pending => { 411 // no new message is decoded and no new payload is feed. 412 // nothing to do except waiting for new incoming data from client. 413 if !self.as_mut().poll_request(cx)? { 414 return Ok(PollResponse::DoNothing); 415 } 416 // otherwise keep loop. 417 } 418 }, 419 420 StateProj::SendPayload(mut stream) => { 421 // keep populate writer buffer until buffer size limit hit, 422 // get blocked or finished. 423 while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE { 424 match stream.as_mut().poll_next(cx) { 425 Poll::Ready(Some(Ok(item))) => { 426 this.codec.encode( 427 Message::Chunk(Some(item)), 428 &mut this.write_buf, 429 )?; 430 } 431 432 Poll::Ready(None) => { 433 this.codec 434 .encode(Message::Chunk(None), &mut this.write_buf)?; 435 // payload stream finished. 436 // set state to None and handle next message 437 this.state.set(State::None); 438 continue 'res; 439 } 440 441 Poll::Ready(Some(Err(err))) => { 442 return Err(DispatchError::Body(err.into())) 443 } 444 445 Poll::Pending => return Ok(PollResponse::DoNothing), 446 } 447 } 448 // buffer is beyond max size. 449 // return and try to write the whole buffer to io stream. 450 return Ok(PollResponse::DrainWriteBuf); 451 } 452 453 StateProj::SendErrorPayload(mut stream) => { 454 // TODO: de-dupe impl with SendPayload 455 456 // keep populate writer buffer until buffer size limit hit, 457 // get blocked or finished. 458 while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE { 459 match stream.as_mut().poll_next(cx) { 460 Poll::Ready(Some(Ok(item))) => { 461 this.codec.encode( 462 Message::Chunk(Some(item)), 463 &mut this.write_buf, 464 )?; 465 } 466 467 Poll::Ready(None) => { 468 this.codec 469 .encode(Message::Chunk(None), &mut this.write_buf)?; 470 // payload stream finished. 471 // set state to None and handle next message 472 this.state.set(State::None); 473 continue 'res; 474 } 475 476 Poll::Ready(Some(Err(err))) => { 477 return Err(DispatchError::Service(err.into())) 478 } 479 480 Poll::Pending => return Ok(PollResponse::DoNothing), 481 } 482 } 483 // buffer is beyond max size. 484 // return and try to write the whole buffer to io stream. 485 return Ok(PollResponse::DrainWriteBuf); 486 } 487 488 StateProj::ExpectCall(fut) => match fut.poll(cx) { 489 // expect resolved. write continue to buffer and set InnerDispatcher state 490 // to service call. 491 Poll::Ready(Ok(req)) => { 492 this.write_buf 493 .extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n"); 494 let fut = this.flow.service.call(req); 495 this.state.set(State::ServiceCall(fut)); 496 } 497 498 // send expect error as response 499 Poll::Ready(Err(err)) => { 500 let res: Response<AnyBody> = err.into(); 501 let (res, body) = res.replace_body(()); 502 self.as_mut().send_error_response(res, body)?; 503 } 504 505 // expect must be solved before progress can be made. 506 Poll::Pending => return Ok(PollResponse::DoNothing), 507 }, 508 } 509 } 510 } 511 handle_request( mut self: Pin<&mut Self>, req: Request, cx: &mut Context<'_>, ) -> Result<(), DispatchError>512 fn handle_request( 513 mut self: Pin<&mut Self>, 514 req: Request, 515 cx: &mut Context<'_>, 516 ) -> Result<(), DispatchError> { 517 // Handle `EXPECT: 100-Continue` header 518 let mut this = self.as_mut().project(); 519 if req.head().expect() { 520 // set dispatcher state so the future is pinned. 521 let task = this.flow.expect.call(req); 522 this.state.set(State::ExpectCall(task)); 523 } else { 524 // the same as above. 525 let task = this.flow.service.call(req); 526 this.state.set(State::ServiceCall(task)); 527 }; 528 529 // eagerly poll the future for once(or twice if expect is resolved immediately). 530 loop { 531 match self.as_mut().project().state.project() { 532 StateProj::ExpectCall(fut) => { 533 match fut.poll(cx) { 534 // expect is resolved. continue loop and poll the service call branch. 535 Poll::Ready(Ok(req)) => { 536 self.as_mut().send_continue(); 537 let mut this = self.as_mut().project(); 538 let task = this.flow.service.call(req); 539 this.state.set(State::ServiceCall(task)); 540 continue; 541 } 542 // future is pending. return Ok(()) to notify that a new state is 543 // set and the outer loop should be continue. 544 Poll::Pending => return Ok(()), 545 // future is error. send response and return a result. On success 546 // to notify the dispatcher a new state is set and the outer loop 547 // should be continue. 548 Poll::Ready(Err(err)) => { 549 let res: Response<AnyBody> = err.into(); 550 let (res, body) = res.replace_body(()); 551 return self.send_error_response(res, body); 552 } 553 } 554 } 555 StateProj::ServiceCall(fut) => { 556 // return no matter the service call future's result. 557 return match fut.poll(cx) { 558 // future is resolved. send response and return a result. On success 559 // to notify the dispatcher a new state is set and the outer loop 560 // should be continue. 561 Poll::Ready(Ok(res)) => { 562 let (res, body) = res.into().replace_body(()); 563 self.send_response(res, body) 564 } 565 // see the comment on ExpectCall state branch's Pending. 566 Poll::Pending => Ok(()), 567 // see the comment on ExpectCall state branch's Ready(Err(err)). 568 Poll::Ready(Err(err)) => { 569 let res: Response<AnyBody> = err.into(); 570 let (res, body) = res.replace_body(()); 571 self.send_error_response(res, body) 572 } 573 }; 574 } 575 _ => unreachable!( 576 "State must be set to ServiceCall or ExceptCall in handle_request" 577 ), 578 } 579 } 580 } 581 582 /// Process one incoming request. poll_request( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Result<bool, DispatchError>583 fn poll_request( 584 mut self: Pin<&mut Self>, 585 cx: &mut Context<'_>, 586 ) -> Result<bool, DispatchError> { 587 // limit amount of non-processed requests 588 if self.messages.len() >= MAX_PIPELINED_MESSAGES || !self.can_read(cx) { 589 return Ok(false); 590 } 591 592 let mut updated = false; 593 let mut this = self.as_mut().project(); 594 loop { 595 match this.codec.decode(&mut this.read_buf) { 596 Ok(Some(msg)) => { 597 updated = true; 598 this.flags.insert(Flags::STARTED); 599 600 match msg { 601 Message::Item(mut req) => { 602 req.head_mut().peer_addr = *this.peer_addr; 603 604 // merge on_connect_ext data into request extensions 605 this.on_connect_data.merge_into(&mut req); 606 607 match this.codec.message_type() { 608 // Request is upgradable. add upgrade message and break. 609 // everything remain in read buffer would be handed to 610 // upgraded Request. 611 MessageType::Stream if this.flow.upgrade.is_some() => { 612 this.messages 613 .push_back(DispatcherMessage::Upgrade(req)); 614 break; 615 } 616 617 // Request is not upgradable. 618 MessageType::Payload | MessageType::Stream => { 619 /* 620 PayloadSender and Payload are smart pointers share the 621 same state. 622 PayloadSender is attached to dispatcher and used to sink 623 new chunked request data to state. 624 Payload is attached to Request and passed to Service::call 625 where the state can be collected and consumed. 626 */ 627 let (ps, pl) = Payload::create(false); 628 let (req1, _) = 629 req.replace_payload(crate::Payload::H1(pl)); 630 req = req1; 631 *this.payload = Some(ps); 632 } 633 634 // Request has no payload. 635 MessageType::None => {} 636 } 637 638 // handle request early when no future in InnerDispatcher state. 639 if this.state.is_empty() { 640 self.as_mut().handle_request(req, cx)?; 641 this = self.as_mut().project(); 642 } else { 643 this.messages.push_back(DispatcherMessage::Item(req)); 644 } 645 } 646 Message::Chunk(Some(chunk)) => { 647 if let Some(ref mut payload) = this.payload { 648 payload.feed_data(chunk); 649 } else { 650 error!( 651 "Internal server error: unexpected payload chunk" 652 ); 653 this.flags.insert(Flags::READ_DISCONNECT); 654 this.messages.push_back(DispatcherMessage::Error( 655 Response::internal_server_error().drop_body(), 656 )); 657 *this.error = Some(DispatchError::InternalError); 658 break; 659 } 660 } 661 Message::Chunk(None) => { 662 if let Some(mut payload) = this.payload.take() { 663 payload.feed_eof(); 664 } else { 665 error!("Internal server error: unexpected eof"); 666 this.flags.insert(Flags::READ_DISCONNECT); 667 this.messages.push_back(DispatcherMessage::Error( 668 Response::internal_server_error().drop_body(), 669 )); 670 *this.error = Some(DispatchError::InternalError); 671 break; 672 } 673 } 674 } 675 } 676 // decode is partial and buffer is not full yet. 677 // break and wait for more read. 678 Ok(None) => break, 679 Err(ParseError::Io(err)) => { 680 self.as_mut().client_disconnected(); 681 this = self.as_mut().project(); 682 *this.error = Some(DispatchError::Io(err)); 683 break; 684 } 685 Err(ParseError::TooLarge) => { 686 if let Some(mut payload) = this.payload.take() { 687 payload.set_error(PayloadError::Overflow); 688 } 689 // Requests overflow buffer size should be responded with 431 690 this.messages.push_back(DispatcherMessage::Error( 691 Response::with_body( 692 StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE, 693 (), 694 ), 695 )); 696 this.flags.insert(Flags::READ_DISCONNECT); 697 *this.error = Some(ParseError::TooLarge.into()); 698 break; 699 } 700 Err(err) => { 701 if let Some(mut payload) = this.payload.take() { 702 payload.set_error(PayloadError::EncodingCorrupted); 703 } 704 705 // Malformed requests should be responded with 400 706 this.messages.push_back(DispatcherMessage::Error( 707 Response::bad_request().drop_body(), 708 )); 709 this.flags.insert(Flags::READ_DISCONNECT); 710 *this.error = Some(err.into()); 711 break; 712 } 713 } 714 } 715 716 if updated && this.ka_timer.is_some() { 717 if let Some(expire) = this.codec.config().keep_alive_expire() { 718 *this.ka_expire = expire; 719 } 720 } 721 Ok(updated) 722 } 723 724 /// keep-alive timer poll_keepalive( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Result<(), DispatchError>725 fn poll_keepalive( 726 mut self: Pin<&mut Self>, 727 cx: &mut Context<'_>, 728 ) -> Result<(), DispatchError> { 729 let mut this = self.as_mut().project(); 730 731 // when a branch is not explicit return early it's meant to fall through 732 // and return as Ok(()) 733 match this.ka_timer.as_mut().as_pin_mut() { 734 None => { 735 // conditionally go into shutdown timeout 736 if this.flags.contains(Flags::SHUTDOWN) { 737 if let Some(deadline) = this.codec.config().client_disconnect_timer() 738 { 739 // write client disconnect time out and poll again to 740 // go into Some<Pin<&mut Sleep>> branch 741 this.ka_timer.set(Some(sleep_until(deadline))); 742 return self.poll_keepalive(cx); 743 } 744 } 745 } 746 Some(mut timer) => { 747 // only operate when keep-alive timer is resolved. 748 if timer.as_mut().poll(cx).is_ready() { 749 // got timeout during shutdown, drop connection 750 if this.flags.contains(Flags::SHUTDOWN) { 751 return Err(DispatchError::DisconnectTimeout); 752 // exceed deadline. check for any outstanding tasks 753 } else if timer.deadline() >= *this.ka_expire { 754 // have no task at hand. 755 if this.state.is_empty() && this.write_buf.is_empty() { 756 if this.flags.contains(Flags::STARTED) { 757 trace!("Keep-alive timeout, close connection"); 758 this.flags.insert(Flags::SHUTDOWN); 759 760 // start shutdown timeout 761 if let Some(deadline) = 762 this.codec.config().client_disconnect_timer() 763 { 764 timer.as_mut().reset(deadline); 765 let _ = timer.poll(cx); 766 } else { 767 // no shutdown timeout, drop socket 768 this.flags.insert(Flags::WRITE_DISCONNECT); 769 } 770 } else { 771 // timeout on first request (slow request) return 408 772 trace!("Slow request timeout"); 773 let _ = self.as_mut().send_error_response( 774 Response::with_body(StatusCode::REQUEST_TIMEOUT, ()), 775 AnyBody::Empty, 776 ); 777 this = self.project(); 778 this.flags.insert(Flags::STARTED | Flags::SHUTDOWN); 779 } 780 // still have unfinished task. try to reset and register keep-alive. 781 } else if let Some(deadline) = 782 this.codec.config().keep_alive_expire() 783 { 784 timer.as_mut().reset(deadline); 785 let _ = timer.poll(cx); 786 } 787 // timer resolved but still have not met the keep-alive expire deadline. 788 // reset and register for later wakeup. 789 } else { 790 timer.as_mut().reset(*this.ka_expire); 791 let _ = timer.poll(cx); 792 } 793 } 794 } 795 } 796 Ok(()) 797 } 798 799 /// Returns true when io stream can be disconnected after write to it. 800 /// 801 /// It covers these conditions: 802 /// 803 /// - `std::io::ErrorKind::ConnectionReset` after partial read. 804 /// - all data read done. 805 #[inline(always)] read_available( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Result<bool, DispatchError>806 fn read_available( 807 self: Pin<&mut Self>, 808 cx: &mut Context<'_>, 809 ) -> Result<bool, DispatchError> { 810 let this = self.project(); 811 812 if this.flags.contains(Flags::READ_DISCONNECT) { 813 return Ok(false); 814 }; 815 816 let mut io = Pin::new(this.io.as_mut().unwrap()); 817 818 let mut read_some = false; 819 820 loop { 821 // Return early when read buf exceed decoder's max buffer size. 822 if this.read_buf.len() >= super::decoder::MAX_BUFFER_SIZE { 823 /* 824 At this point it's not known IO stream is still scheduled 825 to be waked up. so force wake up dispatcher just in case. 826 827 Reason: 828 AsyncRead mostly would only have guarantee wake up 829 when the poll_read return Poll::Pending. 830 831 Case: 832 When read_buf is beyond max buffer size the early return 833 could be successfully be parsed as a new Request. 834 This case would not generate ParseError::TooLarge 835 and at this point IO stream is not fully read to Pending 836 and would result in dispatcher stuck until timeout (KA) 837 838 Note: 839 This is a perf choice to reduce branch on 840 <Request as MessageType>::decode. 841 842 A Request head too large to parse is only checked on 843 httparse::Status::Partial condition. 844 */ 845 if this.payload.is_none() { 846 /* 847 When dispatcher has a payload the responsibility of 848 wake up it would be shift to h1::payload::Payload. 849 850 Reason: 851 Self wake up when there is payload would waste poll 852 and/or result in over read. 853 854 Case: 855 When payload is (partial) dropped by user there is 856 no need to do read anymore. 857 At this case read_buf could always remain beyond 858 MAX_BUFFER_SIZE and self wake up would be busy poll 859 dispatcher and waste resource. 860 861 */ 862 cx.waker().wake_by_ref(); 863 } 864 865 return Ok(false); 866 } 867 868 // grow buffer if necessary. 869 let remaining = this.read_buf.capacity() - this.read_buf.len(); 870 if remaining < LW_BUFFER_SIZE { 871 this.read_buf.reserve(HW_BUFFER_SIZE - remaining); 872 } 873 874 match actix_codec::poll_read_buf(io.as_mut(), cx, this.read_buf) { 875 Poll::Ready(Ok(n)) => { 876 if n == 0 { 877 return Ok(true); 878 } 879 read_some = true; 880 } 881 Poll::Pending => return Ok(false), 882 Poll::Ready(Err(err)) => { 883 return match err.kind() { 884 io::ErrorKind::WouldBlock => Ok(false), 885 io::ErrorKind::ConnectionReset if read_some => Ok(true), 886 _ => Err(DispatchError::Io(err)), 887 } 888 } 889 } 890 } 891 } 892 893 /// call upgrade service with request. upgrade(self: Pin<&mut Self>, req: Request) -> U::Future894 fn upgrade(self: Pin<&mut Self>, req: Request) -> U::Future { 895 let this = self.project(); 896 let mut parts = FramedParts::with_read_buf( 897 this.io.take().unwrap(), 898 mem::take(this.codec), 899 mem::take(this.read_buf), 900 ); 901 parts.write_buf = mem::take(this.write_buf); 902 let framed = Framed::from_parts(parts); 903 this.flow.upgrade.as_ref().unwrap().call((req, framed)) 904 } 905 } 906 907 impl<T, S, B, X, U> Future for Dispatcher<T, S, B, X, U> 908 where 909 T: AsyncRead + AsyncWrite + Unpin, 910 911 S: Service<Request>, 912 S::Error: Into<Response<AnyBody>>, 913 S::Response: Into<Response<B>>, 914 915 B: MessageBody, 916 B::Error: Into<Box<dyn StdError>>, 917 918 X: Service<Request, Response = Request>, 919 X::Error: Into<Response<AnyBody>>, 920 921 U: Service<(Request, Framed<T, Codec>), Response = ()>, 922 U::Error: fmt::Display, 923 { 924 type Output = Result<(), DispatchError>; 925 926 #[inline] poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>927 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 928 let this = self.as_mut().project(); 929 930 #[cfg(test)] 931 { 932 *this.poll_count += 1; 933 } 934 935 match this.inner.project() { 936 DispatcherStateProj::Normal(mut inner) => { 937 inner.as_mut().poll_keepalive(cx)?; 938 939 if inner.flags.contains(Flags::SHUTDOWN) { 940 if inner.flags.contains(Flags::WRITE_DISCONNECT) { 941 Poll::Ready(Ok(())) 942 } else { 943 // flush buffer and wait on blocked. 944 ready!(inner.as_mut().poll_flush(cx))?; 945 Pin::new(inner.project().io.as_mut().unwrap()) 946 .poll_shutdown(cx) 947 .map_err(DispatchError::from) 948 } 949 } else { 950 // read from io stream and fill read buffer. 951 let should_disconnect = inner.as_mut().read_available(cx)?; 952 953 inner.as_mut().poll_request(cx)?; 954 955 // io stream should to be closed. 956 if should_disconnect { 957 let inner = inner.as_mut().project(); 958 inner.flags.insert(Flags::READ_DISCONNECT); 959 if let Some(mut payload) = inner.payload.take() { 960 payload.feed_eof(); 961 } 962 }; 963 964 loop { 965 // poll_response and populate write buffer. 966 // drain indicate if write buffer should be emptied before next run. 967 let drain = match inner.as_mut().poll_response(cx)? { 968 PollResponse::DrainWriteBuf => true, 969 PollResponse::DoNothing => false, 970 // upgrade request and goes Upgrade variant of DispatcherState. 971 PollResponse::Upgrade(req) => { 972 let upgrade = inner.upgrade(req); 973 self.as_mut() 974 .project() 975 .inner 976 .set(DispatcherState::Upgrade(upgrade)); 977 return self.poll(cx); 978 } 979 }; 980 981 // we didn't get WouldBlock from write operation, 982 // so data get written to kernel completely (macOS) 983 // and we have to write again otherwise response can get stuck 984 // 985 // TODO: what? is WouldBlock good or bad? 986 // want to find a reference for this macOS behavior 987 if inner.as_mut().poll_flush(cx)?.is_pending() || !drain { 988 break; 989 } 990 } 991 992 // client is gone 993 if inner.flags.contains(Flags::WRITE_DISCONNECT) { 994 return Poll::Ready(Ok(())); 995 } 996 997 let is_empty = inner.state.is_empty(); 998 999 let inner_p = inner.as_mut().project(); 1000 // read half is closed and we do not processing any responses 1001 if inner_p.flags.contains(Flags::READ_DISCONNECT) && is_empty { 1002 inner_p.flags.insert(Flags::SHUTDOWN); 1003 } 1004 1005 // keep-alive and stream errors 1006 if is_empty && inner_p.write_buf.is_empty() { 1007 if let Some(err) = inner_p.error.take() { 1008 Poll::Ready(Err(err)) 1009 } 1010 // disconnect if keep-alive is not enabled 1011 else if inner_p.flags.contains(Flags::STARTED) 1012 && !inner_p.flags.intersects(Flags::KEEPALIVE) 1013 { 1014 inner_p.flags.insert(Flags::SHUTDOWN); 1015 self.poll(cx) 1016 } 1017 // disconnect if shutdown 1018 else if inner_p.flags.contains(Flags::SHUTDOWN) { 1019 self.poll(cx) 1020 } else { 1021 Poll::Pending 1022 } 1023 } else { 1024 Poll::Pending 1025 } 1026 } 1027 } 1028 DispatcherStateProj::Upgrade(fut) => fut.poll(cx).map_err(|e| { 1029 error!("Upgrade handler error: {}", e); 1030 DispatchError::Upgrade 1031 }), 1032 } 1033 } 1034 } 1035 1036 #[cfg(test)] 1037 mod tests { 1038 use std::str; 1039 1040 use actix_service::fn_service; 1041 use actix_utils::future::{ready, Ready}; 1042 use bytes::Bytes; 1043 use futures_util::future::lazy; 1044 1045 use super::*; 1046 use crate::{ 1047 error::Error, 1048 h1::{ExpectHandler, UpgradeHandler}, 1049 http::Method, 1050 test::{TestBuffer, TestSeqBuffer}, 1051 HttpMessage, KeepAlive, 1052 }; 1053 find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option<usize>1054 fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option<usize> { 1055 haystack[from..] 1056 .windows(needle.len()) 1057 .position(|window| window == needle) 1058 } 1059 stabilize_date_header(payload: &mut [u8])1060 fn stabilize_date_header(payload: &mut [u8]) { 1061 let mut from = 0; 1062 1063 while let Some(pos) = find_slice(&payload, b"date", from) { 1064 payload[(from + pos)..(from + pos + 35)] 1065 .copy_from_slice(b"date: Thu, 01 Jan 1970 12:34:56 UTC"); 1066 from += 35; 1067 } 1068 } 1069 ok_service() -> impl Service<Request, Response = Response<AnyBody>, Error = Error>1070 fn ok_service() -> impl Service<Request, Response = Response<AnyBody>, Error = Error> 1071 { 1072 fn_service(|_req: Request| ready(Ok::<_, Error>(Response::ok()))) 1073 } 1074 echo_path_service( ) -> impl Service<Request, Response = Response<AnyBody>, Error = Error>1075 fn echo_path_service( 1076 ) -> impl Service<Request, Response = Response<AnyBody>, Error = Error> { 1077 fn_service(|req: Request| { 1078 let path = req.path().as_bytes(); 1079 ready(Ok::<_, Error>( 1080 Response::ok().set_body(AnyBody::from_slice(path)), 1081 )) 1082 }) 1083 } 1084 echo_payload_service( ) -> impl Service<Request, Response = Response<Bytes>, Error = Error>1085 fn echo_payload_service( 1086 ) -> impl Service<Request, Response = Response<Bytes>, Error = Error> { 1087 fn_service(|mut req: Request| { 1088 Box::pin(async move { 1089 use futures_util::stream::StreamExt as _; 1090 1091 let mut pl = req.take_payload(); 1092 let mut body = BytesMut::new(); 1093 while let Some(chunk) = pl.next().await { 1094 body.extend_from_slice(chunk.unwrap().chunk()) 1095 } 1096 1097 Ok::<_, Error>(Response::ok().set_body(body.freeze())) 1098 }) 1099 }) 1100 } 1101 1102 #[actix_rt::test] test_req_parse_err()1103 async fn test_req_parse_err() { 1104 lazy(|cx| { 1105 let buf = TestBuffer::new("GET /test HTTP/1\r\n\r\n"); 1106 1107 let services = HttpFlow::new(ok_service(), ExpectHandler, None); 1108 1109 let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( 1110 buf, 1111 ServiceConfig::default(), 1112 services, 1113 OnConnectData::default(), 1114 None, 1115 ); 1116 1117 actix_rt::pin!(h1); 1118 1119 match h1.as_mut().poll(cx) { 1120 Poll::Pending => panic!(), 1121 Poll::Ready(res) => assert!(res.is_err()), 1122 } 1123 1124 if let DispatcherStateProj::Normal(inner) = h1.project().inner.project() { 1125 assert!(inner.flags.contains(Flags::READ_DISCONNECT)); 1126 assert_eq!( 1127 &inner.project().io.take().unwrap().write_buf[..26], 1128 b"HTTP/1.1 400 Bad Request\r\n" 1129 ); 1130 } 1131 }) 1132 .await; 1133 } 1134 1135 #[actix_rt::test] test_pipelining()1136 async fn test_pipelining() { 1137 lazy(|cx| { 1138 let buf = TestBuffer::new( 1139 "\ 1140 GET /abcd HTTP/1.1\r\n\r\n\ 1141 GET /def HTTP/1.1\r\n\r\n\ 1142 ", 1143 ); 1144 1145 let cfg = ServiceConfig::new(KeepAlive::Disabled, 1, 1, false, None); 1146 1147 let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); 1148 1149 let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( 1150 buf, 1151 cfg, 1152 services, 1153 OnConnectData::default(), 1154 None, 1155 ); 1156 1157 actix_rt::pin!(h1); 1158 1159 assert!(matches!(&h1.inner, DispatcherState::Normal(_))); 1160 1161 match h1.as_mut().poll(cx) { 1162 Poll::Pending => panic!("first poll should not be pending"), 1163 Poll::Ready(res) => assert!(res.is_ok()), 1164 } 1165 1166 // polls: initial => shutdown 1167 assert_eq!(h1.poll_count, 2); 1168 1169 if let DispatcherStateProj::Normal(inner) = h1.project().inner.project() { 1170 let res = &mut inner.project().io.take().unwrap().write_buf[..]; 1171 stabilize_date_header(res); 1172 1173 let exp = b"\ 1174 HTTP/1.1 200 OK\r\n\ 1175 content-length: 5\r\n\ 1176 connection: close\r\n\ 1177 date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\ 1178 /abcd\ 1179 HTTP/1.1 200 OK\r\n\ 1180 content-length: 4\r\n\ 1181 connection: close\r\n\ 1182 date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\ 1183 /def\ 1184 "; 1185 1186 assert_eq!(res.to_vec(), exp.to_vec()); 1187 } 1188 }) 1189 .await; 1190 1191 lazy(|cx| { 1192 let buf = TestBuffer::new( 1193 "\ 1194 GET /abcd HTTP/1.1\r\n\r\n\ 1195 GET /def HTTP/1\r\n\r\n\ 1196 ", 1197 ); 1198 1199 let cfg = ServiceConfig::new(KeepAlive::Disabled, 1, 1, false, None); 1200 1201 let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); 1202 1203 let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( 1204 buf, 1205 cfg, 1206 services, 1207 OnConnectData::default(), 1208 None, 1209 ); 1210 1211 actix_rt::pin!(h1); 1212 1213 assert!(matches!(&h1.inner, DispatcherState::Normal(_))); 1214 1215 match h1.as_mut().poll(cx) { 1216 Poll::Pending => panic!("first poll should not be pending"), 1217 Poll::Ready(res) => assert!(res.is_err()), 1218 } 1219 1220 // polls: initial => shutdown 1221 assert_eq!(h1.poll_count, 1); 1222 1223 if let DispatcherStateProj::Normal(inner) = h1.project().inner.project() { 1224 let res = &mut inner.project().io.take().unwrap().write_buf[..]; 1225 stabilize_date_header(res); 1226 1227 let exp = b"\ 1228 HTTP/1.1 200 OK\r\n\ 1229 content-length: 5\r\n\ 1230 connection: close\r\n\ 1231 date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\ 1232 /abcd\ 1233 HTTP/1.1 400 Bad Request\r\n\ 1234 content-length: 0\r\n\ 1235 connection: close\r\n\ 1236 date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\ 1237 "; 1238 1239 assert_eq!(res.to_vec(), exp.to_vec()); 1240 } 1241 }) 1242 .await; 1243 } 1244 1245 #[actix_rt::test] test_expect()1246 async fn test_expect() { 1247 lazy(|cx| { 1248 let mut buf = TestSeqBuffer::empty(); 1249 let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None); 1250 1251 let services = HttpFlow::new(echo_payload_service(), ExpectHandler, None); 1252 1253 let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( 1254 buf.clone(), 1255 cfg, 1256 services, 1257 OnConnectData::default(), 1258 None, 1259 ); 1260 1261 buf.extend_read_buf( 1262 "\ 1263 POST /upload HTTP/1.1\r\n\ 1264 Content-Length: 5\r\n\ 1265 Expect: 100-continue\r\n\ 1266 \r\n\ 1267 ", 1268 ); 1269 1270 actix_rt::pin!(h1); 1271 1272 assert!(h1.as_mut().poll(cx).is_pending()); 1273 assert!(matches!(&h1.inner, DispatcherState::Normal(_))); 1274 1275 // polls: manual 1276 assert_eq!(h1.poll_count, 1); 1277 eprintln!("poll count: {}", h1.poll_count); 1278 1279 if let DispatcherState::Normal(ref inner) = h1.inner { 1280 let io = inner.io.as_ref().unwrap(); 1281 let res = &io.write_buf()[..]; 1282 assert_eq!( 1283 str::from_utf8(res).unwrap(), 1284 "HTTP/1.1 100 Continue\r\n\r\n" 1285 ); 1286 } 1287 1288 buf.extend_read_buf("12345"); 1289 assert!(h1.as_mut().poll(cx).is_ready()); 1290 1291 // polls: manual manual shutdown 1292 assert_eq!(h1.poll_count, 3); 1293 1294 if let DispatcherState::Normal(ref inner) = h1.inner { 1295 let io = inner.io.as_ref().unwrap(); 1296 let mut res = (&io.write_buf()[..]).to_owned(); 1297 stabilize_date_header(&mut res); 1298 1299 assert_eq!( 1300 str::from_utf8(&res).unwrap(), 1301 "\ 1302 HTTP/1.1 100 Continue\r\n\ 1303 \r\n\ 1304 HTTP/1.1 200 OK\r\n\ 1305 content-length: 5\r\n\ 1306 connection: close\r\n\ 1307 date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\ 1308 \r\n\ 1309 12345\ 1310 " 1311 ); 1312 } 1313 }) 1314 .await; 1315 } 1316 1317 #[actix_rt::test] test_eager_expect()1318 async fn test_eager_expect() { 1319 lazy(|cx| { 1320 let mut buf = TestSeqBuffer::empty(); 1321 let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None); 1322 1323 let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); 1324 1325 let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( 1326 buf.clone(), 1327 cfg, 1328 services, 1329 OnConnectData::default(), 1330 None, 1331 ); 1332 1333 buf.extend_read_buf( 1334 "\ 1335 POST /upload HTTP/1.1\r\n\ 1336 Content-Length: 5\r\n\ 1337 Expect: 100-continue\r\n\ 1338 \r\n\ 1339 ", 1340 ); 1341 1342 actix_rt::pin!(h1); 1343 1344 assert!(h1.as_mut().poll(cx).is_ready()); 1345 assert!(matches!(&h1.inner, DispatcherState::Normal(_))); 1346 1347 // polls: manual shutdown 1348 assert_eq!(h1.poll_count, 2); 1349 1350 if let DispatcherState::Normal(ref inner) = h1.inner { 1351 let io = inner.io.as_ref().unwrap(); 1352 let mut res = (&io.write_buf()[..]).to_owned(); 1353 stabilize_date_header(&mut res); 1354 1355 // Despite the content-length header and even though the request payload has not 1356 // been sent, this test expects a complete service response since the payload 1357 // is not used at all. The service passed to dispatcher is path echo and doesn't 1358 // consume payload bytes. 1359 assert_eq!( 1360 str::from_utf8(&res).unwrap(), 1361 "\ 1362 HTTP/1.1 100 Continue\r\n\ 1363 \r\n\ 1364 HTTP/1.1 200 OK\r\n\ 1365 content-length: 7\r\n\ 1366 connection: close\r\n\ 1367 date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\ 1368 \r\n\ 1369 /upload\ 1370 " 1371 ); 1372 } 1373 }) 1374 .await; 1375 } 1376 1377 #[actix_rt::test] test_upgrade()1378 async fn test_upgrade() { 1379 struct TestUpgrade; 1380 1381 impl<T> Service<(Request, Framed<T, Codec>)> for TestUpgrade { 1382 type Response = (); 1383 type Error = Error; 1384 type Future = Ready<Result<Self::Response, Self::Error>>; 1385 1386 actix_service::always_ready!(); 1387 1388 fn call(&self, (req, _framed): (Request, Framed<T, Codec>)) -> Self::Future { 1389 assert_eq!(req.method(), Method::GET); 1390 assert!(req.upgrade()); 1391 assert_eq!(req.headers().get("upgrade").unwrap(), "websocket"); 1392 ready(Ok(())) 1393 } 1394 } 1395 1396 lazy(|cx| { 1397 let mut buf = TestSeqBuffer::empty(); 1398 let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None); 1399 1400 let services = HttpFlow::new(ok_service(), ExpectHandler, Some(TestUpgrade)); 1401 1402 let h1 = Dispatcher::<_, _, _, _, TestUpgrade>::new( 1403 buf.clone(), 1404 cfg, 1405 services, 1406 OnConnectData::default(), 1407 None, 1408 ); 1409 1410 buf.extend_read_buf( 1411 "\ 1412 GET /ws HTTP/1.1\r\n\ 1413 Connection: Upgrade\r\n\ 1414 Upgrade: websocket\r\n\ 1415 \r\n\ 1416 ", 1417 ); 1418 1419 actix_rt::pin!(h1); 1420 1421 assert!(h1.as_mut().poll(cx).is_ready()); 1422 assert!(matches!(&h1.inner, DispatcherState::Upgrade(_))); 1423 1424 // polls: manual shutdown 1425 assert_eq!(h1.poll_count, 2); 1426 }) 1427 .await; 1428 } 1429 } 1430