1 use std::{
2     collections::VecDeque,
3     error::Error as StdError,
4     fmt,
5     future::Future,
6     io, mem, net,
7     pin::Pin,
8     rc::Rc,
9     task::{Context, Poll},
10 };
11 
12 use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed, FramedParts};
13 use actix_rt::time::{sleep_until, Instant, Sleep};
14 use actix_service::Service;
15 use bitflags::bitflags;
16 use bytes::{Buf, BytesMut};
17 use futures_core::ready;
18 use log::{error, trace};
19 use pin_project::pin_project;
20 
21 use crate::{
22     body::{AnyBody, BodySize, MessageBody},
23     config::ServiceConfig,
24     error::{DispatchError, ParseError, PayloadError},
25     service::HttpFlow,
26     OnConnectData, Request, Response, StatusCode,
27 };
28 
29 use super::{
30     codec::Codec,
31     payload::{Payload, PayloadSender, PayloadStatus},
32     Message, MessageType,
33 };
34 
// NOTE(review): not referenced in the visible part of this file; presumably the low-water
// mark (in bytes) used by the read path further down — confirm against its use site.
const LW_BUFFER_SIZE: usize = 1024;
// Initial capacity, in bytes, of the dispatcher's read and write buffers
// (see `Dispatcher::new`).
const HW_BUFFER_SIZE: usize = 1024 * 8;
// Upper bound on decoded-but-unprocessed messages: `poll_request` refuses to decode more
// once the message queue has reached this length.
const MAX_PIPELINED_MESSAGES: usize = 16;
38 
bitflags! {
    /// Connection-level status bits tracked by the HTTP/1 dispatcher.
    pub struct Flags: u8 {
        /// At least one message has been decoded on this connection (set in `poll_request`),
        /// or a slow-request timeout response has been issued. The keep-alive timer uses it
        /// to tell an idle keep-alive connection apart from a slow first request.
        const STARTED            = 0b0000_0001;
        /// Keep-alive is in effect. Seeded from the service config and refreshed from the
        /// codec after every response head is encoded.
        const KEEPALIVE          = 0b0000_0010;
        /// The connection is shutting down; a timer expiry while this is set is reported as
        /// `DispatchError::DisconnectTimeout`.
        const SHUTDOWN           = 0b0000_0100;
        /// No further requests are read from the peer (`can_read` returns `false`). Set on
        /// client disconnect and after request parsing errors.
        const READ_DISCONNECT    = 0b0000_1000;
        /// Set on client disconnect and on keep-alive timeout without a disconnect timer.
        /// NOTE(review): the code that reacts to this bit is outside the visible chunk.
        const WRITE_DISCONNECT   = 0b0001_0000;
    }
}
48 
#[pin_project]
/// Dispatcher for HTTP/1.1 protocol
///
/// Thin wrapper around `DispatcherState`: the connection is driven either by the regular
/// request/response dispatcher or by the future returned from the upgrade service.
///
/// Type parameters:
/// - `T`: the connection I/O type (handed to the upgrade service as `Framed<T, Codec>`).
/// - `S`: the main service, called with each decoded `Request`.
/// - `B`: the response body type produced (via `Into<Response<B>>`) by `S`.
/// - `X`: the service called for requests carrying an `Expect` header; it yields the
///   request back on success.
/// - `U`: the upgrade service, called with the request and the framed I/O.
pub struct Dispatcher<T, S, B, X, U>
where
    S: Service<Request>,
    S::Error: Into<Response<AnyBody>>,

    B: MessageBody,
    B::Error: Into<Box<dyn StdError>>,

    X: Service<Request, Response = Request>,
    X::Error: Into<Response<AnyBody>>,

    U: Service<(Request, Framed<T, Codec>), Response = ()>,
    U::Error: fmt::Display,
{
    /// Current mode of the connection (normal dispatch or upgrade future).
    #[pin]
    inner: DispatcherState<T, S, B, X, U>,

    /// Test-only counter, compiled only under `cfg(test)`; starts at 0 in `new`.
    #[cfg(test)]
    poll_count: u64,
}
71 
/// The two modes a connection can be in.
///
/// `pin_project` generates the `DispatcherStateProj` projection type used to match on a
/// pinned value of this enum.
#[pin_project(project = DispatcherStateProj)]
enum DispatcherState<T, S, B, X, U>
where
    S: Service<Request>,
    S::Error: Into<Response<AnyBody>>,

    B: MessageBody,
    B::Error: Into<Box<dyn StdError>>,

    X: Service<Request, Response = Request>,
    X::Error: Into<Response<AnyBody>>,

    U: Service<(Request, Framed<T, Codec>), Response = ()>,
    U::Error: fmt::Display,
{
    /// Regular HTTP/1 request/response processing.
    Normal(#[pin] InnerDispatcher<T, S, B, X, U>),
    /// The connection is owned by the upgrade service's future.
    Upgrade(#[pin] U::Future),
}
90 
/// State of a connection in normal (non-upgraded) HTTP/1 mode: services, buffers, codec,
/// the in-flight task and the keep-alive timer.
#[pin_project(project = InnerDispatcherProj)]
struct InnerDispatcher<T, S, B, X, U>
where
    S: Service<Request>,
    S::Error: Into<Response<AnyBody>>,

    B: MessageBody,
    B::Error: Into<Box<dyn StdError>>,

    X: Service<Request, Response = Request>,
    X::Error: Into<Response<AnyBody>>,

    U: Service<(Request, Framed<T, Codec>), Response = ()>,
    U::Error: fmt::Display,
{
    /// Shared bundle of the main (`service`), `expect` and optional `upgrade` services.
    flow: Rc<HttpFlow<S, X, U>>,
    /// Per-connection data merged into every decoded request's extensions.
    on_connect_data: OnConnectData,
    /// Connection status bits; see `Flags`.
    flags: Flags,
    /// Peer address copied into each request head.
    peer_addr: Option<net::SocketAddr>,
    /// Error recorded by `poll_request` when decoding fails.
    /// NOTE(review): the code that consumes this value is outside the visible chunk.
    error: Option<DispatchError>,

    /// Task currently being driven (expect call, service call or body streaming).
    #[pin]
    state: State<S, B, X>,
    /// Sender half for the body of the request currently being received, if any.
    payload: Option<PayloadSender>,
    /// Decoded messages waiting for `state` to become `State::None`.
    messages: VecDeque<DispatcherMessage>,

    /// Deadline the keep-alive timer is compared against; refreshed whenever
    /// `poll_request` decodes something.
    ka_expire: Instant,
    /// Keep-alive / shutdown timer; `None` when the config provides no keep-alive timer.
    #[pin]
    ka_timer: Option<Sleep>,

    /// The connection I/O. `poll_flush` unwraps it, so it must be `Some` while flushing.
    /// NOTE(review): presumably taken out when the connection is upgraded — confirm.
    io: Option<T>,
    /// Bytes received from the peer that the codec has not decoded yet.
    read_buf: BytesMut,
    /// Encoded response bytes not yet written to `io`.
    write_buf: BytesMut,
    /// HTTP/1 codec used to decode requests and encode responses.
    codec: Codec,
}
126 
/// A decoded item queued by `poll_request` for later handling by `poll_response`.
enum DispatcherMessage {
    /// A regular request to be passed to the expect or main service.
    Item(Request),
    /// A request to be handed to the upgrade service.
    Upgrade(Request),
    /// An error response head; `poll_response` sends it with an empty body.
    Error(Response<()>),
}
132 
/// The task the dispatcher is currently driving for the request at the head of the
/// pipeline.
#[pin_project(project = StateProj)]
enum State<S, B, X>
where
    S: Service<Request>,
    X: Service<Request, Response = Request>,

    B: MessageBody,
    B::Error: Into<Box<dyn StdError>>,
{
    /// Nothing in flight; the next queued message can be picked up.
    None,
    /// Waiting for the expect service; on success `100 Continue` is written and the
    /// request moves on to the main service.
    ExpectCall(#[pin] X::Future),
    /// Waiting for the main service to produce a response.
    ServiceCall(#[pin] S::Future),
    /// Streaming the body of a service response into the write buffer.
    SendPayload(#[pin] B),
    /// Streaming the body of an error response into the write buffer.
    SendErrorPayload(#[pin] AnyBody),
}
148 
149 impl<S, B, X> State<S, B, X>
150 where
151     S: Service<Request>,
152 
153     X: Service<Request, Response = Request>,
154 
155     B: MessageBody,
156     B::Error: Into<Box<dyn StdError>>,
157 {
is_empty(&self) -> bool158     fn is_empty(&self) -> bool {
159         matches!(self, State::None)
160     }
161 }
162 
/// Outcome of one `poll_response` pass.
enum PollResponse {
    /// An upgrade request reached the head of the queue and must be polled exclusively.
    Upgrade(Request),
    /// No further progress can be made on responses right now.
    DoNothing,
    /// The write buffer reached its size limit and should be written out to the I/O
    /// stream before more body data is encoded.
    DrainWriteBuf,
}
168 
169 impl<T, S, B, X, U> Dispatcher<T, S, B, X, U>
170 where
171     T: AsyncRead + AsyncWrite + Unpin,
172 
173     S: Service<Request>,
174     S::Error: Into<Response<AnyBody>>,
175     S::Response: Into<Response<B>>,
176 
177     B: MessageBody,
178     B::Error: Into<Box<dyn StdError>>,
179 
180     X: Service<Request, Response = Request>,
181     X::Error: Into<Response<AnyBody>>,
182 
183     U: Service<(Request, Framed<T, Codec>), Response = ()>,
184     U::Error: fmt::Display,
185 {
186     /// Create HTTP/1 dispatcher.
new( io: T, config: ServiceConfig, flow: Rc<HttpFlow<S, X, U>>, on_connect_data: OnConnectData, peer_addr: Option<net::SocketAddr>, ) -> Self187     pub(crate) fn new(
188         io: T,
189         config: ServiceConfig,
190         flow: Rc<HttpFlow<S, X, U>>,
191         on_connect_data: OnConnectData,
192         peer_addr: Option<net::SocketAddr>,
193     ) -> Self {
194         let flags = if config.keep_alive_enabled() {
195             Flags::KEEPALIVE
196         } else {
197             Flags::empty()
198         };
199 
200         // keep-alive timer
201         let (ka_expire, ka_timer) = match config.keep_alive_timer() {
202             Some(delay) => (delay.deadline(), Some(delay)),
203             None => (config.now(), None),
204         };
205 
206         Dispatcher {
207             inner: DispatcherState::Normal(InnerDispatcher {
208                 read_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
209                 write_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
210                 payload: None,
211                 state: State::None,
212                 error: None,
213                 messages: VecDeque::new(),
214                 io: Some(io),
215                 codec: Codec::new(config),
216                 flow,
217                 on_connect_data,
218                 flags,
219                 peer_addr,
220                 ka_expire,
221                 ka_timer,
222             }),
223 
224             #[cfg(test)]
225             poll_count: 0,
226         }
227     }
228 }
229 
230 impl<T, S, B, X, U> InnerDispatcher<T, S, B, X, U>
231 where
232     T: AsyncRead + AsyncWrite + Unpin,
233 
234     S: Service<Request>,
235     S::Error: Into<Response<AnyBody>>,
236     S::Response: Into<Response<B>>,
237 
238     B: MessageBody,
239     B::Error: Into<Box<dyn StdError>>,
240 
241     X: Service<Request, Response = Request>,
242     X::Error: Into<Response<AnyBody>>,
243 
244     U: Service<(Request, Framed<T, Codec>), Response = ()>,
245     U::Error: fmt::Display,
246 {
can_read(&self, cx: &mut Context<'_>) -> bool247     fn can_read(&self, cx: &mut Context<'_>) -> bool {
248         if self.flags.contains(Flags::READ_DISCONNECT) {
249             false
250         } else if let Some(ref info) = self.payload {
251             info.need_read(cx) == PayloadStatus::Read
252         } else {
253             true
254         }
255     }
256 
257     // if checked is set to true, delay disconnect until all tasks have finished.
client_disconnected(self: Pin<&mut Self>)258     fn client_disconnected(self: Pin<&mut Self>) {
259         let this = self.project();
260         this.flags
261             .insert(Flags::READ_DISCONNECT | Flags::WRITE_DISCONNECT);
262         if let Some(mut payload) = this.payload.take() {
263             payload.set_error(PayloadError::Incomplete(None));
264         }
265     }
266 
poll_flush( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), io::Error>>267     fn poll_flush(
268         self: Pin<&mut Self>,
269         cx: &mut Context<'_>,
270     ) -> Poll<Result<(), io::Error>> {
271         let InnerDispatcherProj { io, write_buf, .. } = self.project();
272         let mut io = Pin::new(io.as_mut().unwrap());
273 
274         let len = write_buf.len();
275         let mut written = 0;
276 
277         while written < len {
278             match io.as_mut().poll_write(cx, &write_buf[written..])? {
279                 Poll::Ready(0) => {
280                     return Poll::Ready(Err(io::Error::new(
281                         io::ErrorKind::WriteZero,
282                         "",
283                     )))
284                 }
285                 Poll::Ready(n) => written += n,
286                 Poll::Pending => {
287                     write_buf.advance(written);
288                     return Poll::Pending;
289                 }
290             }
291         }
292 
293         // everything has written to io. clear buffer.
294         write_buf.clear();
295 
296         // flush the io and check if get blocked.
297         io.poll_flush(cx)
298     }
299 
send_response_inner( self: Pin<&mut Self>, message: Response<()>, body: &impl MessageBody, ) -> Result<BodySize, DispatchError>300     fn send_response_inner(
301         self: Pin<&mut Self>,
302         message: Response<()>,
303         body: &impl MessageBody,
304     ) -> Result<BodySize, DispatchError> {
305         let size = body.size();
306         let mut this = self.project();
307         this.codec
308             .encode(Message::Item((message, size)), &mut this.write_buf)
309             .map_err(|err| {
310                 if let Some(mut payload) = this.payload.take() {
311                     payload.set_error(PayloadError::Incomplete(None));
312                 }
313                 DispatchError::Io(err)
314             })?;
315 
316         this.flags.set(Flags::KEEPALIVE, this.codec.keepalive());
317 
318         Ok(size)
319     }
320 
send_response( mut self: Pin<&mut Self>, message: Response<()>, body: B, ) -> Result<(), DispatchError>321     fn send_response(
322         mut self: Pin<&mut Self>,
323         message: Response<()>,
324         body: B,
325     ) -> Result<(), DispatchError> {
326         let size = self.as_mut().send_response_inner(message, &body)?;
327         let state = match size {
328             BodySize::None | BodySize::Empty => State::None,
329             _ => State::SendPayload(body),
330         };
331         self.project().state.set(state);
332         Ok(())
333     }
334 
send_error_response( mut self: Pin<&mut Self>, message: Response<()>, body: AnyBody, ) -> Result<(), DispatchError>335     fn send_error_response(
336         mut self: Pin<&mut Self>,
337         message: Response<()>,
338         body: AnyBody,
339     ) -> Result<(), DispatchError> {
340         let size = self.as_mut().send_response_inner(message, &body)?;
341         let state = match size {
342             BodySize::None | BodySize::Empty => State::None,
343             _ => State::SendErrorPayload(body),
344         };
345         self.project().state.set(state);
346         Ok(())
347     }
348 
send_continue(self: Pin<&mut Self>)349     fn send_continue(self: Pin<&mut Self>) {
350         self.project()
351             .write_buf
352             .extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n");
353     }
354 
    /// Drive the response side of the connection as far as possible.
    ///
    /// Loops over the current `State`: starts the next queued message when idle, polls
    /// the expect / service futures, and streams response bodies into the write buffer.
    ///
    /// Returns:
    /// - `PollResponse::Upgrade(req)` when an upgrade request reaches the head of the queue,
    /// - `PollResponse::DrainWriteBuf` when the write buffer has reached
    ///   `payload::MAX_BUFFER_SIZE` while streaming a body,
    /// - `PollResponse::DoNothing` when no further progress is possible right now.
    ///
    /// Errors from encoding, from the response body stream, or from `poll_request` are
    /// returned as `DispatchError`.
    fn poll_response(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Result<PollResponse, DispatchError> {
        'res: loop {
            let mut this = self.as_mut().project();
            match this.state.as_mut().project() {
                // no task is in flight; pop the next queued message.
                StateProj::None => match this.messages.pop_front() {
                    // handle request message.
                    Some(DispatcherMessage::Item(req)) => {
                        // Handle `EXPECT: 100-Continue` header
                        if req.head().expect() {
                            // store the expect future in the state and loop to poll it.
                            let task = this.flow.expect.call(req);
                            this.state.set(State::ExpectCall(task));
                        } else {
                            // same as above, but going straight to the main service.
                            let task = this.flow.service.call(req);
                            this.state.set(State::ServiceCall(task));
                        };
                    }

                    // handle error message.
                    Some(DispatcherMessage::Error(res)) => {
                        // send_error_response sets the state to SendErrorPayload, or to
                        // None when the body is empty (always the case here, since the
                        // body passed is `AnyBody::Empty`). Continue the loop afterwards.
                        self.as_mut().send_error_response(res, AnyBody::Empty)?;
                    }

                    // return with upgrade request and poll it exclusively.
                    Some(DispatcherMessage::Upgrade(req)) => {
                        return Ok(PollResponse::Upgrade(req));
                    }

                    // all messages are dealt with.
                    None => return Ok(PollResponse::DoNothing),
                },
                StateProj::ServiceCall(fut) => match fut.poll(cx) {
                    // service call resolved. send response.
                    Poll::Ready(Ok(res)) => {
                        let (res, body) = res.into().replace_body(());
                        self.as_mut().send_response(res, body)?;
                    }

                    // send service call error as response
                    Poll::Ready(Err(err)) => {
                        let res: Response<AnyBody> = err.into();
                        let (res, body) = res.replace_body(());
                        self.as_mut().send_error_response(res, body)?;
                    }

                    // the service call is pending and may be waiting for more request
                    // body chunks (subject to the pipeline limit and `can_read`).
                    Poll::Pending => {
                        // if nothing new was decoded and no payload was fed, there is
                        // nothing to do except wait for more data from the client.
                        if !self.as_mut().poll_request(cx)? {
                            return Ok(PollResponse::DoNothing);
                        }
                        // otherwise keep looping so the service future is polled again.
                    }
                },

                StateProj::SendPayload(mut stream) => {
                    // keep filling the write buffer until the size limit is hit, the
                    // body stream is pending, or the body is finished.
                    while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE {
                        match stream.as_mut().poll_next(cx) {
                            Poll::Ready(Some(Ok(item))) => {
                                this.codec.encode(
                                    Message::Chunk(Some(item)),
                                    &mut this.write_buf,
                                )?;
                            }

                            Poll::Ready(None) => {
                                this.codec
                                    .encode(Message::Chunk(None), &mut this.write_buf)?;
                                // payload stream finished.
                                // set state to None and handle next message
                                this.state.set(State::None);
                                continue 'res;
                            }

                            Poll::Ready(Some(Err(err))) => {
                                return Err(DispatchError::Body(err.into()))
                            }

                            Poll::Pending => return Ok(PollResponse::DoNothing),
                        }
                    }
                    // the buffer has reached its size limit.
                    // return and let the caller write it out to the io stream.
                    return Ok(PollResponse::DrainWriteBuf);
                }

                StateProj::SendErrorPayload(mut stream) => {
                    // TODO: de-dupe impl with SendPayload

                    // keep filling the write buffer until the size limit is hit, the
                    // body stream is pending, or the body is finished.
                    while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE {
                        match stream.as_mut().poll_next(cx) {
                            Poll::Ready(Some(Ok(item))) => {
                                this.codec.encode(
                                    Message::Chunk(Some(item)),
                                    &mut this.write_buf,
                                )?;
                            }

                            Poll::Ready(None) => {
                                this.codec
                                    .encode(Message::Chunk(None), &mut this.write_buf)?;
                                // payload stream finished.
                                // set state to None and handle next message
                                this.state.set(State::None);
                                continue 'res;
                            }

                            // unlike SendPayload, body errors here are reported as
                            // `DispatchError::Service`.
                            Poll::Ready(Some(Err(err))) => {
                                return Err(DispatchError::Service(err.into()))
                            }

                            Poll::Pending => return Ok(PollResponse::DoNothing),
                        }
                    }
                    // the buffer has reached its size limit.
                    // return and let the caller write it out to the io stream.
                    return Ok(PollResponse::DrainWriteBuf);
                }

                StateProj::ExpectCall(fut) => match fut.poll(cx) {
                    // expect resolved: write `100 Continue` to the buffer and move the
                    // state on to the service call.
                    Poll::Ready(Ok(req)) => {
                        this.write_buf
                            .extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n");
                        let fut = this.flow.service.call(req);
                        this.state.set(State::ServiceCall(fut));
                    }

                    // send expect error as response
                    Poll::Ready(Err(err)) => {
                        let res: Response<AnyBody> = err.into();
                        let (res, body) = res.replace_body(());
                        self.as_mut().send_error_response(res, body)?;
                    }

                    // expect must be resolved before progress can be made.
                    Poll::Pending => return Ok(PollResponse::DoNothing),
                },
            }
        }
    }
511 
    /// Start processing `req` immediately instead of queueing it.
    ///
    /// Stores the expect future (when the request head asks for it) or the service future
    /// in the dispatcher state, then polls it eagerly: once for a service call, or twice
    /// when the expect future resolves right away. If the polled future resolves, the
    /// response head is encoded via `send_response` / `send_error_response`, whose result
    /// is returned; if it is pending, `Ok(())` is returned with the future left in state.
    ///
    /// The visible caller (`poll_request`) only invokes this when the state is empty.
    fn handle_request(
        mut self: Pin<&mut Self>,
        req: Request,
        cx: &mut Context<'_>,
    ) -> Result<(), DispatchError> {
        // Handle `EXPECT: 100-Continue` header
        let mut this = self.as_mut().project();
        if req.head().expect() {
            // set dispatcher state so the future is pinned.
            let task = this.flow.expect.call(req);
            this.state.set(State::ExpectCall(task));
        } else {
            // the same as above.
            let task = this.flow.service.call(req);
            this.state.set(State::ServiceCall(task));
        };

        // eagerly poll the future once (or twice if expect resolves immediately).
        loop {
            match self.as_mut().project().state.project() {
                StateProj::ExpectCall(fut) => {
                    match fut.poll(cx) {
                        // expect is resolved: queue `100 Continue`, switch to the
                        // service call and loop to poll it.
                        Poll::Ready(Ok(req)) => {
                            self.as_mut().send_continue();
                            let mut this = self.as_mut().project();
                            let task = this.flow.service.call(req);
                            this.state.set(State::ServiceCall(task));
                            continue;
                        }
                        // future is pending. return Ok(()) to signal that a new state
                        // has been set and the caller's loop should continue.
                        Poll::Pending => return Ok(()),
                        // future failed. send the error as a response and return the
                        // result; on success this also signals that a new state is set.
                        Poll::Ready(Err(err)) => {
                            let res: Response<AnyBody> = err.into();
                            let (res, body) = res.replace_body(());
                            return self.send_error_response(res, body);
                        }
                    }
                }
                StateProj::ServiceCall(fut) => {
                    // return no matter what the service call future's result is.
                    return match fut.poll(cx) {
                        // future is resolved. send the response and return the result;
                        // on success this signals that a new state is set.
                        Poll::Ready(Ok(res)) => {
                            let (res, body) = res.into().replace_body(());
                            self.send_response(res, body)
                        }
                        // see the comment on ExpectCall state branch's Pending.
                        Poll::Pending => Ok(()),
                        // see the comment on ExpectCall state branch's Ready(Err(err)).
                        Poll::Ready(Err(err)) => {
                            let res: Response<AnyBody> = err.into();
                            let (res, body) = res.replace_body(());
                            self.send_error_response(res, body)
                        }
                    };
                }
                // NOTE(review): the panic message below says "ExceptCall"; it refers to
                // the `ExpectCall` state. Left unchanged because it is a runtime string.
                _ => unreachable!(
                    "State must be set to ServiceCall or ExceptCall in handle_request"
                ),
            }
        }
    }
581 
    /// Decode as many messages as possible from the read buffer.
    ///
    /// Does nothing (returns `Ok(false)`) when the pipelined message queue is full or
    /// `can_read` reports that reading is not allowed. Otherwise decodes in a loop:
    /// - request heads are handled immediately if no task is in flight, or queued,
    /// - upgrade requests are queued and stop the loop,
    /// - body chunks / EOF are fed to the current payload sender,
    /// - decode errors queue an error response (except I/O errors), set
    ///   `READ_DISCONNECT`, record the error in `self.error` and stop the loop.
    ///
    /// Returns `Ok(true)` if at least one message was decoded. An `Err` is only returned
    /// when `handle_request` fails.
    fn poll_request(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Result<bool, DispatchError> {
        // limit amount of non-processed requests
        if self.messages.len() >= MAX_PIPELINED_MESSAGES || !self.can_read(cx) {
            return Ok(false);
        }

        let mut updated = false;
        let mut this = self.as_mut().project();
        loop {
            match this.codec.decode(&mut this.read_buf) {
                Ok(Some(msg)) => {
                    updated = true;
                    this.flags.insert(Flags::STARTED);

                    match msg {
                        Message::Item(mut req) => {
                            req.head_mut().peer_addr = *this.peer_addr;

                            // merge on_connect_ext data into request extensions
                            this.on_connect_data.merge_into(&mut req);

                            match this.codec.message_type() {
                                // Request is upgradable: queue an upgrade message and
                                // stop decoding. Whatever remains in the read buffer is
                                // left for the upgraded request.
                                MessageType::Stream if this.flow.upgrade.is_some() => {
                                    this.messages
                                        .push_back(DispatcherMessage::Upgrade(req));
                                    break;
                                }

                                // Request is not upgradable.
                                MessageType::Payload | MessageType::Stream => {
                                    /*
                                    PayloadSender and Payload are handles that share the
                                    same state.
                                    PayloadSender is kept by the dispatcher and used to
                                    push newly decoded request body chunks into that state.
                                    Payload is attached to the Request and passed to
                                    Service::call, where the data can be consumed.
                                    */
                                    let (ps, pl) = Payload::create(false);
                                    let (req1, _) =
                                        req.replace_payload(crate::Payload::H1(pl));
                                    req = req1;
                                    *this.payload = Some(ps);
                                }

                                // Request has no payload.
                                MessageType::None => {}
                            }

                            // handle the request right away when no task is in flight;
                            // otherwise queue it behind the current one.
                            if this.state.is_empty() {
                                self.as_mut().handle_request(req, cx)?;
                                // `handle_request` consumed the projection; re-project.
                                this = self.as_mut().project();
                            } else {
                                this.messages.push_back(DispatcherMessage::Item(req));
                            }
                        }
                        Message::Chunk(Some(chunk)) => {
                            if let Some(ref mut payload) = this.payload {
                                payload.feed_data(chunk);
                            } else {
                                // a body chunk arrived although no payload is expected.
                                error!(
                                    "Internal server error: unexpected payload chunk"
                                );
                                this.flags.insert(Flags::READ_DISCONNECT);
                                this.messages.push_back(DispatcherMessage::Error(
                                    Response::internal_server_error().drop_body(),
                                ));
                                *this.error = Some(DispatchError::InternalError);
                                break;
                            }
                        }
                        Message::Chunk(None) => {
                            // end of body: taking the sender also clears `self.payload`.
                            if let Some(mut payload) = this.payload.take() {
                                payload.feed_eof();
                            } else {
                                error!("Internal server error: unexpected eof");
                                this.flags.insert(Flags::READ_DISCONNECT);
                                this.messages.push_back(DispatcherMessage::Error(
                                    Response::internal_server_error().drop_body(),
                                ));
                                *this.error = Some(DispatchError::InternalError);
                                break;
                            }
                        }
                    }
                }
                // the buffered data does not form a complete message yet.
                // break and wait for more data to be read.
                Ok(None) => break,
                // I/O error from the decoder: treat the client as disconnected; no
                // error response is queued.
                Err(ParseError::Io(err)) => {
                    self.as_mut().client_disconnected();
                    this = self.as_mut().project();
                    *this.error = Some(DispatchError::Io(err));
                    break;
                }
                Err(ParseError::TooLarge) => {
                    if let Some(mut payload) = this.payload.take() {
                        payload.set_error(PayloadError::Overflow);
                    }
                    // Requests that overflow the buffer size are answered with 431
                    this.messages.push_back(DispatcherMessage::Error(
                        Response::with_body(
                            StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE,
                            (),
                        ),
                    ));
                    this.flags.insert(Flags::READ_DISCONNECT);
                    *this.error = Some(ParseError::TooLarge.into());
                    break;
                }
                Err(err) => {
                    if let Some(mut payload) = this.payload.take() {
                        payload.set_error(PayloadError::EncodingCorrupted);
                    }

                    // Malformed requests are answered with 400
                    this.messages.push_back(DispatcherMessage::Error(
                        Response::bad_request().drop_body(),
                    ));
                    this.flags.insert(Flags::READ_DISCONNECT);
                    *this.error = Some(err.into());
                    break;
                }
            }
        }

        // activity was seen: push the keep-alive expiry forward, but only when a
        // keep-alive timer exists and the config provides a new expiry.
        if updated && this.ka_timer.is_some() {
            if let Some(expire) = this.codec.config().keep_alive_expire() {
                *this.ka_expire = expire;
            }
        }
        Ok(updated)
    }
723 
    /// Polls the timer that is shared by the keep-alive, slow-request and shutdown
    /// timeouts.
    ///
    /// Depending on the dispatcher flags and on whether the timer has fired, this
    /// either arms the client disconnect timer, moves the dispatcher into `SHUTDOWN`
    /// after an idle keep-alive period, answers a first request that never completed
    /// with `408 Request Timeout`, or re-arms the timer for a later wake-up.
    ///
    /// # Errors
    ///
    /// Returns [`DispatchError::DisconnectTimeout`] when the timer fires while the
    /// dispatcher is already flagged as `SHUTDOWN`.
    fn poll_keepalive(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Result<(), DispatchError> {
        let mut this = self.as_mut().project();

        // Branches that do not return explicitly are meant to fall through to the
        // final `Ok(())`.
        match this.ka_timer.as_mut().as_pin_mut() {
            None => {
                // No timer is armed. One is only created here when the dispatcher is
                // shutting down and a client disconnect timeout is configured.
                if this.flags.contains(Flags::SHUTDOWN) {
                    if let Some(deadline) = this.codec.config().client_disconnect_timer()
                    {
                        // Arm the client disconnect timeout and poll again so that
                        // the `Some(Pin<&mut Sleep>)` branch below gets to poll it.
                        this.ka_timer.set(Some(sleep_until(deadline)));
                        return self.poll_keepalive(cx);
                    }
                }
            }
            Some(mut timer) => {
                // Only act once the timer has resolved.
                if timer.as_mut().poll(cx).is_ready() {
                    if this.flags.contains(Flags::SHUTDOWN) {
                        // Timed out while shutting down: drop the connection.
                        return Err(DispatchError::DisconnectTimeout);
                    } else if timer.deadline() >= *this.ka_expire {
                        // The timer deadline is not earlier than the recorded
                        // keep-alive expiry; check for outstanding work.
                        if this.state.is_empty() && this.write_buf.is_empty() {
                            // Nothing in flight and nothing left to write.
                            if this.flags.contains(Flags::STARTED) {
                                trace!("Keep-alive timeout, close connection");
                                this.flags.insert(Flags::SHUTDOWN);

                                // Start the shutdown timeout by re-using this timer.
                                if let Some(deadline) =
                                    this.codec.config().client_disconnect_timer()
                                {
                                    timer.as_mut().reset(deadline);
                                    // Poll result is ignored; the call is made to
                                    // register the new deadline for wake-up.
                                    let _ = timer.poll(cx);
                                } else {
                                    // No shutdown timeout configured: drop the socket.
                                    this.flags.insert(Flags::WRITE_DISCONNECT);
                                }
                            } else {
                                // Timeout before `STARTED` was set (slow request):
                                // respond with 408.
                                trace!("Slow request timeout");
                                let _ = self.as_mut().send_error_response(
                                    Response::with_body(StatusCode::REQUEST_TIMEOUT, ()),
                                    AnyBody::Empty,
                                );
                                // `self` was re-borrowed for the call above, so the
                                // projection has to be taken again.
                                this = self.project();
                                this.flags.insert(Flags::STARTED | Flags::SHUTDOWN);
                            }
                        } else if let Some(deadline) =
                            this.codec.config().keep_alive_expire()
                        {
                            // There is still unfinished work: reset the timer to a
                            // fresh keep-alive deadline and register it.
                            timer.as_mut().reset(deadline);
                            let _ = timer.poll(cx);
                        }
                    } else {
                        // The timer resolved before the keep-alive expiry was
                        // reached: reset it to that expiry and register for a later
                        // wake-up.
                        timer.as_mut().reset(*this.ka_expire);
                        let _ = timer.poll(cx);
                    }
                }
            }
        }
        Ok(())
    }
798 
799     /// Returns true when io stream can be disconnected after write to it.
800     ///
801     /// It covers these conditions:
802     ///
803     /// - `std::io::ErrorKind::ConnectionReset` after partial read.
804     /// - all data read done.
805     #[inline(always)]
read_available( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Result<bool, DispatchError>806     fn read_available(
807         self: Pin<&mut Self>,
808         cx: &mut Context<'_>,
809     ) -> Result<bool, DispatchError> {
810         let this = self.project();
811 
812         if this.flags.contains(Flags::READ_DISCONNECT) {
813             return Ok(false);
814         };
815 
816         let mut io = Pin::new(this.io.as_mut().unwrap());
817 
818         let mut read_some = false;
819 
820         loop {
821             // Return early when read buf exceed decoder's max buffer size.
822             if this.read_buf.len() >= super::decoder::MAX_BUFFER_SIZE {
823                 /*
824                  At this point it's not known IO stream is still scheduled
825                  to be waked up. so force wake up dispatcher just in case.
826 
827                  Reason:
828                  AsyncRead mostly would only have guarantee wake up
829                  when the poll_read return Poll::Pending.
830 
831                  Case:
832                  When read_buf is beyond max buffer size the early return
833                  could be successfully be parsed as a new Request.
834                  This case would not generate ParseError::TooLarge
835                  and at this point IO stream is not fully read to Pending
836                  and would result in dispatcher stuck until timeout (KA)
837 
838                  Note:
839                  This is a perf choice to reduce branch on
840                  <Request as MessageType>::decode.
841 
842                  A Request head too large to parse is only checked on
843                  httparse::Status::Partial condition.
844                 */
845                 if this.payload.is_none() {
846                     /*
847                     When dispatcher has a payload the responsibility of
848                     wake up it would be shift to h1::payload::Payload.
849 
850                     Reason:
851                     Self wake up when there is payload would waste poll
852                     and/or result in over read.
853 
854                     Case:
855                     When payload is (partial) dropped by user there is
856                     no need to do read anymore.
857                     At this case read_buf could always remain beyond
858                     MAX_BUFFER_SIZE and self wake up would be busy poll
859                     dispatcher and waste resource.
860 
861                     */
862                     cx.waker().wake_by_ref();
863                 }
864 
865                 return Ok(false);
866             }
867 
868             // grow buffer if necessary.
869             let remaining = this.read_buf.capacity() - this.read_buf.len();
870             if remaining < LW_BUFFER_SIZE {
871                 this.read_buf.reserve(HW_BUFFER_SIZE - remaining);
872             }
873 
874             match actix_codec::poll_read_buf(io.as_mut(), cx, this.read_buf) {
875                 Poll::Ready(Ok(n)) => {
876                     if n == 0 {
877                         return Ok(true);
878                     }
879                     read_some = true;
880                 }
881                 Poll::Pending => return Ok(false),
882                 Poll::Ready(Err(err)) => {
883                     return match err.kind() {
884                         io::ErrorKind::WouldBlock => Ok(false),
885                         io::ErrorKind::ConnectionReset if read_some => Ok(true),
886                         _ => Err(DispatchError::Io(err)),
887                     }
888                 }
889             }
890         }
891     }
892 
893     /// call upgrade service with request.
upgrade(self: Pin<&mut Self>, req: Request) -> U::Future894     fn upgrade(self: Pin<&mut Self>, req: Request) -> U::Future {
895         let this = self.project();
896         let mut parts = FramedParts::with_read_buf(
897             this.io.take().unwrap(),
898             mem::take(this.codec),
899             mem::take(this.read_buf),
900         );
901         parts.write_buf = mem::take(this.write_buf);
902         let framed = Framed::from_parts(parts);
903         this.flow.upgrade.as_ref().unwrap().call((req, framed))
904     }
905 }
906 
/// Drives an HTTP/1 connection: reads requests, polls responses, flushes the write
/// buffer and shuts the connection down, or delegates to the upgrade future once the
/// dispatcher state has been switched to `Upgrade`.
impl<T, S, B, X, U> Future for Dispatcher<T, S, B, X, U>
where
    T: AsyncRead + AsyncWrite + Unpin,

    S: Service<Request>,
    S::Error: Into<Response<AnyBody>>,
    S::Response: Into<Response<B>>,

    B: MessageBody,
    B::Error: Into<Box<dyn StdError>>,

    X: Service<Request, Response = Request>,
    X::Error: Into<Response<AnyBody>>,

    U: Service<(Request, Framed<T, Codec>), Response = ()>,
    U::Error: fmt::Display,
{
    type Output = Result<(), DispatchError>;

    /// Polls the dispatcher once.
    ///
    /// In the `Normal` state this polls the keep-alive timer first and then either
    /// continues an ongoing shutdown (flush, then `poll_shutdown`) or runs one
    /// read / request / response / flush cycle. In the `Upgrade` state the upgrade
    /// future is polled and its error is logged and mapped to
    /// `DispatchError::Upgrade`.
    #[inline]
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.as_mut().project();

        // Test builds count every call to `poll`, including the recursive ones below.
        #[cfg(test)]
        {
            *this.poll_count += 1;
        }

        match this.inner.project() {
            DispatcherStateProj::Normal(mut inner) => {
                inner.as_mut().poll_keepalive(cx)?;

                if inner.flags.contains(Flags::SHUTDOWN) {
                    if inner.flags.contains(Flags::WRITE_DISCONNECT) {
                        // The write half is flagged as disconnected: finish without
                        // flushing or shutting down the io stream.
                        Poll::Ready(Ok(()))
                    } else {
                        // Flush the write buffer (waiting while blocked), then shut
                        // down the io stream.
                        ready!(inner.as_mut().poll_flush(cx))?;
                        Pin::new(inner.project().io.as_mut().unwrap())
                            .poll_shutdown(cx)
                            .map_err(DispatchError::from)
                    }
                } else {
                    // Read from the io stream and fill the read buffer.
                    let should_disconnect = inner.as_mut().read_available(cx)?;

                    inner.as_mut().poll_request(cx)?;

                    // The io stream should be closed: flag the read half as
                    // disconnected and signal EOF to an active payload.
                    if should_disconnect {
                        let inner = inner.as_mut().project();
                        inner.flags.insert(Flags::READ_DISCONNECT);
                        if let Some(mut payload) = inner.payload.take() {
                            payload.feed_eof();
                        }
                    };

                    loop {
                        // Poll the response and populate the write buffer.
                        // `drain` indicates whether the write buffer should be
                        // emptied before the next iteration.
                        let drain = match inner.as_mut().poll_response(cx)? {
                            PollResponse::DrainWriteBuf => true,
                            PollResponse::DoNothing => false,
                            // Upgrade request: switch to the `Upgrade` variant of
                            // `DispatcherState` and poll again right away.
                            PollResponse::Upgrade(req) => {
                                let upgrade = inner.upgrade(req);
                                self.as_mut()
                                    .project()
                                    .inner
                                    .set(DispatcherState::Upgrade(upgrade));
                                return self.poll(cx);
                            }
                        };

                        // we didn't get WouldBlock from write operation,
                        // so data get written to kernel completely (macOS)
                        // and we have to write again otherwise response can get stuck
                        //
                        // TODO: what? is WouldBlock good or bad?
                        // want to find a reference for this macOS behavior
                        //
                        // The loop ends when the flush is pending or when no drain
                        // was requested.
                        if inner.as_mut().poll_flush(cx)?.is_pending() || !drain {
                            break;
                        }
                    }

                    // The client is gone.
                    if inner.flags.contains(Flags::WRITE_DISCONNECT) {
                        return Poll::Ready(Ok(()));
                    }

                    let is_empty = inner.state.is_empty();

                    let inner_p = inner.as_mut().project();
                    // The read half is closed and no response is being processed.
                    if inner_p.flags.contains(Flags::READ_DISCONNECT) && is_empty {
                        inner_p.flags.insert(Flags::SHUTDOWN);
                    }

                    // Keep-alive and stream errors are only handled once the
                    // dispatcher is idle and everything has been written.
                    if is_empty && inner_p.write_buf.is_empty() {
                        if let Some(err) = inner_p.error.take() {
                            // A stored error ends the dispatcher.
                            Poll::Ready(Err(err))
                        }
                        // Disconnect if keep-alive is not enabled.
                        else if inner_p.flags.contains(Flags::STARTED)
                            && !inner_p.flags.intersects(Flags::KEEPALIVE)
                        {
                            inner_p.flags.insert(Flags::SHUTDOWN);
                            self.poll(cx)
                        }
                        // Disconnect if shutdown was requested; the recursive poll
                        // takes the `SHUTDOWN` branch above.
                        else if inner_p.flags.contains(Flags::SHUTDOWN) {
                            self.poll(cx)
                        } else {
                            Poll::Pending
                        }
                    } else {
                        Poll::Pending
                    }
                }
            }
            DispatcherStateProj::Upgrade(fut) => fut.poll(cx).map_err(|e| {
                error!("Upgrade handler error: {}", e);
                DispatchError::Upgrade
            }),
        }
    }
}
1035 
1036 #[cfg(test)]
1037 mod tests {
1038     use std::str;
1039 
1040     use actix_service::fn_service;
1041     use actix_utils::future::{ready, Ready};
1042     use bytes::Bytes;
1043     use futures_util::future::lazy;
1044 
1045     use super::*;
1046     use crate::{
1047         error::Error,
1048         h1::{ExpectHandler, UpgradeHandler},
1049         http::Method,
1050         test::{TestBuffer, TestSeqBuffer},
1051         HttpMessage, KeepAlive,
1052     };
1053 
find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option<usize>1054     fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option<usize> {
1055         haystack[from..]
1056             .windows(needle.len())
1057             .position(|window| window == needle)
1058     }
1059 
stabilize_date_header(payload: &mut [u8])1060     fn stabilize_date_header(payload: &mut [u8]) {
1061         let mut from = 0;
1062 
1063         while let Some(pos) = find_slice(&payload, b"date", from) {
1064             payload[(from + pos)..(from + pos + 35)]
1065                 .copy_from_slice(b"date: Thu, 01 Jan 1970 12:34:56 UTC");
1066             from += 35;
1067         }
1068     }
1069 
ok_service() -> impl Service<Request, Response = Response<AnyBody>, Error = Error>1070     fn ok_service() -> impl Service<Request, Response = Response<AnyBody>, Error = Error>
1071     {
1072         fn_service(|_req: Request| ready(Ok::<_, Error>(Response::ok())))
1073     }
1074 
echo_path_service( ) -> impl Service<Request, Response = Response<AnyBody>, Error = Error>1075     fn echo_path_service(
1076     ) -> impl Service<Request, Response = Response<AnyBody>, Error = Error> {
1077         fn_service(|req: Request| {
1078             let path = req.path().as_bytes();
1079             ready(Ok::<_, Error>(
1080                 Response::ok().set_body(AnyBody::from_slice(path)),
1081             ))
1082         })
1083     }
1084 
echo_payload_service( ) -> impl Service<Request, Response = Response<Bytes>, Error = Error>1085     fn echo_payload_service(
1086     ) -> impl Service<Request, Response = Response<Bytes>, Error = Error> {
1087         fn_service(|mut req: Request| {
1088             Box::pin(async move {
1089                 use futures_util::stream::StreamExt as _;
1090 
1091                 let mut pl = req.take_payload();
1092                 let mut body = BytesMut::new();
1093                 while let Some(chunk) = pl.next().await {
1094                     body.extend_from_slice(chunk.unwrap().chunk())
1095                 }
1096 
1097                 Ok::<_, Error>(Response::ok().set_body(body.freeze()))
1098             })
1099         })
1100     }
1101 
1102     #[actix_rt::test]
test_req_parse_err()1103     async fn test_req_parse_err() {
1104         lazy(|cx| {
1105             let buf = TestBuffer::new("GET /test HTTP/1\r\n\r\n");
1106 
1107             let services = HttpFlow::new(ok_service(), ExpectHandler, None);
1108 
1109             let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
1110                 buf,
1111                 ServiceConfig::default(),
1112                 services,
1113                 OnConnectData::default(),
1114                 None,
1115             );
1116 
1117             actix_rt::pin!(h1);
1118 
1119             match h1.as_mut().poll(cx) {
1120                 Poll::Pending => panic!(),
1121                 Poll::Ready(res) => assert!(res.is_err()),
1122             }
1123 
1124             if let DispatcherStateProj::Normal(inner) = h1.project().inner.project() {
1125                 assert!(inner.flags.contains(Flags::READ_DISCONNECT));
1126                 assert_eq!(
1127                     &inner.project().io.take().unwrap().write_buf[..26],
1128                     b"HTTP/1.1 400 Bad Request\r\n"
1129                 );
1130             }
1131         })
1132         .await;
1133     }
1134 
1135     #[actix_rt::test]
test_pipelining()1136     async fn test_pipelining() {
1137         lazy(|cx| {
1138             let buf = TestBuffer::new(
1139                 "\
1140                 GET /abcd HTTP/1.1\r\n\r\n\
1141                 GET /def HTTP/1.1\r\n\r\n\
1142                 ",
1143             );
1144 
1145             let cfg = ServiceConfig::new(KeepAlive::Disabled, 1, 1, false, None);
1146 
1147             let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
1148 
1149             let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
1150                 buf,
1151                 cfg,
1152                 services,
1153                 OnConnectData::default(),
1154                 None,
1155             );
1156 
1157             actix_rt::pin!(h1);
1158 
1159             assert!(matches!(&h1.inner, DispatcherState::Normal(_)));
1160 
1161             match h1.as_mut().poll(cx) {
1162                 Poll::Pending => panic!("first poll should not be pending"),
1163                 Poll::Ready(res) => assert!(res.is_ok()),
1164             }
1165 
1166             // polls: initial => shutdown
1167             assert_eq!(h1.poll_count, 2);
1168 
1169             if let DispatcherStateProj::Normal(inner) = h1.project().inner.project() {
1170                 let res = &mut inner.project().io.take().unwrap().write_buf[..];
1171                 stabilize_date_header(res);
1172 
1173                 let exp = b"\
1174                 HTTP/1.1 200 OK\r\n\
1175                 content-length: 5\r\n\
1176                 connection: close\r\n\
1177                 date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\
1178                 /abcd\
1179                 HTTP/1.1 200 OK\r\n\
1180                 content-length: 4\r\n\
1181                 connection: close\r\n\
1182                 date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\
1183                 /def\
1184                 ";
1185 
1186                 assert_eq!(res.to_vec(), exp.to_vec());
1187             }
1188         })
1189         .await;
1190 
1191         lazy(|cx| {
1192             let buf = TestBuffer::new(
1193                 "\
1194                 GET /abcd HTTP/1.1\r\n\r\n\
1195                 GET /def HTTP/1\r\n\r\n\
1196                 ",
1197             );
1198 
1199             let cfg = ServiceConfig::new(KeepAlive::Disabled, 1, 1, false, None);
1200 
1201             let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
1202 
1203             let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
1204                 buf,
1205                 cfg,
1206                 services,
1207                 OnConnectData::default(),
1208                 None,
1209             );
1210 
1211             actix_rt::pin!(h1);
1212 
1213             assert!(matches!(&h1.inner, DispatcherState::Normal(_)));
1214 
1215             match h1.as_mut().poll(cx) {
1216                 Poll::Pending => panic!("first poll should not be pending"),
1217                 Poll::Ready(res) => assert!(res.is_err()),
1218             }
1219 
1220             // polls: initial => shutdown
1221             assert_eq!(h1.poll_count, 1);
1222 
1223             if let DispatcherStateProj::Normal(inner) = h1.project().inner.project() {
1224                 let res = &mut inner.project().io.take().unwrap().write_buf[..];
1225                 stabilize_date_header(res);
1226 
1227                 let exp = b"\
1228                 HTTP/1.1 200 OK\r\n\
1229                 content-length: 5\r\n\
1230                 connection: close\r\n\
1231                 date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\
1232                 /abcd\
1233                 HTTP/1.1 400 Bad Request\r\n\
1234                 content-length: 0\r\n\
1235                 connection: close\r\n\
1236                 date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\
1237                 ";
1238 
1239                 assert_eq!(res.to_vec(), exp.to_vec());
1240             }
1241         })
1242         .await;
1243     }
1244 
1245     #[actix_rt::test]
test_expect()1246     async fn test_expect() {
1247         lazy(|cx| {
1248             let mut buf = TestSeqBuffer::empty();
1249             let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None);
1250 
1251             let services = HttpFlow::new(echo_payload_service(), ExpectHandler, None);
1252 
1253             let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
1254                 buf.clone(),
1255                 cfg,
1256                 services,
1257                 OnConnectData::default(),
1258                 None,
1259             );
1260 
1261             buf.extend_read_buf(
1262                 "\
1263                 POST /upload HTTP/1.1\r\n\
1264                 Content-Length: 5\r\n\
1265                 Expect: 100-continue\r\n\
1266                 \r\n\
1267                 ",
1268             );
1269 
1270             actix_rt::pin!(h1);
1271 
1272             assert!(h1.as_mut().poll(cx).is_pending());
1273             assert!(matches!(&h1.inner, DispatcherState::Normal(_)));
1274 
1275             // polls: manual
1276             assert_eq!(h1.poll_count, 1);
1277             eprintln!("poll count: {}", h1.poll_count);
1278 
1279             if let DispatcherState::Normal(ref inner) = h1.inner {
1280                 let io = inner.io.as_ref().unwrap();
1281                 let res = &io.write_buf()[..];
1282                 assert_eq!(
1283                     str::from_utf8(res).unwrap(),
1284                     "HTTP/1.1 100 Continue\r\n\r\n"
1285                 );
1286             }
1287 
1288             buf.extend_read_buf("12345");
1289             assert!(h1.as_mut().poll(cx).is_ready());
1290 
1291             // polls: manual manual shutdown
1292             assert_eq!(h1.poll_count, 3);
1293 
1294             if let DispatcherState::Normal(ref inner) = h1.inner {
1295                 let io = inner.io.as_ref().unwrap();
1296                 let mut res = (&io.write_buf()[..]).to_owned();
1297                 stabilize_date_header(&mut res);
1298 
1299                 assert_eq!(
1300                     str::from_utf8(&res).unwrap(),
1301                     "\
1302                     HTTP/1.1 100 Continue\r\n\
1303                     \r\n\
1304                     HTTP/1.1 200 OK\r\n\
1305                     content-length: 5\r\n\
1306                     connection: close\r\n\
1307                     date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\
1308                     \r\n\
1309                     12345\
1310                     "
1311                 );
1312             }
1313         })
1314         .await;
1315     }
1316 
1317     #[actix_rt::test]
test_eager_expect()1318     async fn test_eager_expect() {
1319         lazy(|cx| {
1320             let mut buf = TestSeqBuffer::empty();
1321             let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None);
1322 
1323             let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
1324 
1325             let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
1326                 buf.clone(),
1327                 cfg,
1328                 services,
1329                 OnConnectData::default(),
1330                 None,
1331             );
1332 
1333             buf.extend_read_buf(
1334                 "\
1335                 POST /upload HTTP/1.1\r\n\
1336                 Content-Length: 5\r\n\
1337                 Expect: 100-continue\r\n\
1338                 \r\n\
1339                 ",
1340             );
1341 
1342             actix_rt::pin!(h1);
1343 
1344             assert!(h1.as_mut().poll(cx).is_ready());
1345             assert!(matches!(&h1.inner, DispatcherState::Normal(_)));
1346 
1347             // polls: manual shutdown
1348             assert_eq!(h1.poll_count, 2);
1349 
1350             if let DispatcherState::Normal(ref inner) = h1.inner {
1351                 let io = inner.io.as_ref().unwrap();
1352                 let mut res = (&io.write_buf()[..]).to_owned();
1353                 stabilize_date_header(&mut res);
1354 
1355                 // Despite the content-length header and even though the request payload has not
1356                 // been sent, this test expects a complete service response since the payload
1357                 // is not used at all. The service passed to dispatcher is path echo and doesn't
1358                 // consume payload bytes.
1359                 assert_eq!(
1360                     str::from_utf8(&res).unwrap(),
1361                     "\
1362                     HTTP/1.1 100 Continue\r\n\
1363                     \r\n\
1364                     HTTP/1.1 200 OK\r\n\
1365                     content-length: 7\r\n\
1366                     connection: close\r\n\
1367                     date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\
1368                     \r\n\
1369                     /upload\
1370                     "
1371                 );
1372             }
1373         })
1374         .await;
1375     }
1376 
1377     #[actix_rt::test]
test_upgrade()1378     async fn test_upgrade() {
1379         struct TestUpgrade;
1380 
1381         impl<T> Service<(Request, Framed<T, Codec>)> for TestUpgrade {
1382             type Response = ();
1383             type Error = Error;
1384             type Future = Ready<Result<Self::Response, Self::Error>>;
1385 
1386             actix_service::always_ready!();
1387 
1388             fn call(&self, (req, _framed): (Request, Framed<T, Codec>)) -> Self::Future {
1389                 assert_eq!(req.method(), Method::GET);
1390                 assert!(req.upgrade());
1391                 assert_eq!(req.headers().get("upgrade").unwrap(), "websocket");
1392                 ready(Ok(()))
1393             }
1394         }
1395 
1396         lazy(|cx| {
1397             let mut buf = TestSeqBuffer::empty();
1398             let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None);
1399 
1400             let services = HttpFlow::new(ok_service(), ExpectHandler, Some(TestUpgrade));
1401 
1402             let h1 = Dispatcher::<_, _, _, _, TestUpgrade>::new(
1403                 buf.clone(),
1404                 cfg,
1405                 services,
1406                 OnConnectData::default(),
1407                 None,
1408             );
1409 
1410             buf.extend_read_buf(
1411                 "\
1412                 GET /ws HTTP/1.1\r\n\
1413                 Connection: Upgrade\r\n\
1414                 Upgrade: websocket\r\n\
1415                 \r\n\
1416                 ",
1417             );
1418 
1419             actix_rt::pin!(h1);
1420 
1421             assert!(h1.as_mut().poll(cx).is_ready());
1422             assert!(matches!(&h1.inner, DispatcherState::Upgrade(_)));
1423 
1424             // polls: manual shutdown
1425             assert_eq!(h1.poll_count, 2);
1426         })
1427         .await;
1428     }
1429 }
1430