1 use std::error::Error as StdError; 2 3 use bytes::{Buf, Bytes}; 4 use futures::{Async, Future, Poll, Stream}; 5 use http::{Request, Response, StatusCode}; 6 use tokio_io::{AsyncRead, AsyncWrite}; 7 8 use body::{Body, Payload}; 9 use body::internal::FullDataArg; 10 use common::{Never, YieldNow}; 11 use proto::{BodyLength, DecodedLength, Conn, Dispatched, MessageHead, RequestHead, RequestLine, ResponseHead}; 12 use super::Http1Transaction; 13 use service::Service; 14 15 pub(crate) struct Dispatcher<D, Bs: Payload, I, T> { 16 conn: Conn<I, Bs::Data, T>, 17 dispatch: D, 18 body_tx: Option<::body::Sender>, 19 body_rx: Option<Bs>, 20 is_closing: bool, 21 /// If the poll loop reaches its max spin count, it will yield by notifying 22 /// the task immediately. This will cache that `Task`, since it usually is 23 /// the same one. 24 yield_now: YieldNow, 25 } 26 27 pub(crate) trait Dispatch { 28 type PollItem; 29 type PollBody; 30 type PollError; 31 type RecvItem; poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Self::PollBody)>, Self::PollError>32 fn poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Self::PollBody)>, Self::PollError>; recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()>33 fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()>; poll_ready(&mut self) -> Poll<(), ()>34 fn poll_ready(&mut self) -> Poll<(), ()>; should_poll(&self) -> bool35 fn should_poll(&self) -> bool; 36 } 37 38 pub struct Server<S: Service> { 39 in_flight: Option<S::Future>, 40 pub(crate) service: S, 41 } 42 43 pub struct Client<B> { 44 callback: Option<::client::dispatch::Callback<Request<B>, Response<Body>>>, 45 rx: ClientRx<B>, 46 } 47 48 type ClientRx<B> = ::client::dispatch::Receiver<Request<B>, Response<Body>>; 49 50 impl<D, Bs, I, T> Dispatcher<D, Bs, I, T> 51 where 52 D: Dispatch<PollItem=MessageHead<T::Outgoing>, PollBody=Bs, RecvItem=MessageHead<T::Incoming>>, 53 D::PollError: Into<Box<dyn StdError + Send + Sync>>, 54 I: AsyncRead + AsyncWrite, 55 T: Http1Transaction, 56 Bs: Payload, 57 { new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self58 pub fn new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self { 59 Dispatcher { 60 conn: conn, 61 dispatch: dispatch, 62 body_tx: None, 63 body_rx: None, 64 is_closing: false, 65 yield_now: YieldNow::new(), 66 } 67 } 68 disable_keep_alive(&mut self)69 pub fn disable_keep_alive(&mut self) { 70 self.conn.disable_keep_alive() 71 } 72 into_inner(self) -> (I, Bytes, D)73 pub fn into_inner(self) -> (I, Bytes, D) { 74 let (io, buf) = self.conn.into_inner(); 75 (io, buf, self.dispatch) 76 } 77 78 /// Run this dispatcher until HTTP says this connection is done, 79 /// but don't call `AsyncWrite::shutdown` on the underlying IO. 80 /// 81 /// This is useful for old-style HTTP upgrades, but ignores 82 /// newer-style upgrade API. poll_without_shutdown(&mut self) -> Poll<(), ::Error>83 pub fn poll_without_shutdown(&mut self) -> Poll<(), ::Error> { 84 self.poll_catch(false) 85 .map(|x| { 86 x.map(|ds| if let Dispatched::Upgrade(pending) = ds { 87 pending.manual(); 88 }) 89 }) 90 } 91 poll_catch(&mut self, should_shutdown: bool) -> Poll<Dispatched, ::Error>92 fn poll_catch(&mut self, should_shutdown: bool) -> Poll<Dispatched, ::Error> { 93 self.poll_inner(should_shutdown).or_else(|e| { 94 // An error means we're shutting down either way. 95 // We just try to give the error to the user, 96 // and close the connection with an Ok. If we 97 // cannot give it to the user, then return the Err. 98 self.dispatch.recv_msg(Err(e))?; 99 Ok(Async::Ready(Dispatched::Shutdown)) 100 }) 101 } 102 poll_inner(&mut self, should_shutdown: bool) -> Poll<Dispatched, ::Error>103 fn poll_inner(&mut self, should_shutdown: bool) -> Poll<Dispatched, ::Error> { 104 T::update_date(); 105 106 try_ready!(self.poll_loop()); 107 108 if self.is_done() { 109 if let Some(pending) = self.conn.pending_upgrade() { 110 self.conn.take_error()?; 111 return Ok(Async::Ready(Dispatched::Upgrade(pending))); 112 } else if should_shutdown { 113 try_ready!(self.conn.shutdown().map_err(::Error::new_shutdown)); 114 } 115 self.conn.take_error()?; 116 Ok(Async::Ready(Dispatched::Shutdown)) 117 } else { 118 Ok(Async::NotReady) 119 } 120 } 121 poll_loop(&mut self) -> Poll<(), ::Error>122 fn poll_loop(&mut self) -> Poll<(), ::Error> { 123 // Limit the looping on this connection, in case it is ready far too 124 // often, so that other futures don't starve. 125 // 126 // 16 was chosen arbitrarily, as that is number of pipelined requests 127 // benchmarks often use. Perhaps it should be a config option instead. 128 for _ in 0..16 { 129 self.poll_read()?; 130 self.poll_write()?; 131 self.poll_flush()?; 132 133 // This could happen if reading paused before blocking on IO, 134 // such as getting to the end of a framed message, but then 135 // writing/flushing set the state back to Init. In that case, 136 // if the read buffer still had bytes, we'd want to try poll_read 137 // again, or else we wouldn't ever be woken up again. 138 // 139 // Using this instead of task::current() and notify() inside 140 // the Conn is noticeably faster in pipelined benchmarks. 141 if !self.conn.wants_read_again() { 142 //break; 143 return Ok(Async::Ready(())); 144 } 145 } 146 147 trace!("poll_loop yielding (self = {:p})", self); 148 149 match self.yield_now.poll_yield() { 150 Ok(Async::NotReady) => Ok(Async::NotReady), 151 // maybe with `!` this can be cleaner... 152 // but for now, just doing this to eliminate branches 153 Ok(Async::Ready(never)) | 154 Err(never) => match never {} 155 } 156 } 157 poll_read(&mut self) -> Poll<(), ::Error>158 fn poll_read(&mut self) -> Poll<(), ::Error> { 159 loop { 160 if self.is_closing { 161 return Ok(Async::Ready(())); 162 } else if self.conn.can_read_head() { 163 try_ready!(self.poll_read_head()); 164 } else if let Some(mut body) = self.body_tx.take() { 165 if self.conn.can_read_body() { 166 match body.poll_ready() { 167 Ok(Async::Ready(())) => (), 168 Ok(Async::NotReady) => { 169 self.body_tx = Some(body); 170 return Ok(Async::NotReady); 171 }, 172 Err(_canceled) => { 173 // user doesn't care about the body 174 // so we should stop reading 175 trace!("body receiver dropped before eof, closing"); 176 self.conn.close_read(); 177 return Ok(Async::Ready(())); 178 } 179 } 180 match self.conn.read_body() { 181 Ok(Async::Ready(Some(chunk))) => { 182 match body.send_data(chunk) { 183 Ok(()) => { 184 self.body_tx = Some(body); 185 }, 186 Err(_canceled) => { 187 if self.conn.can_read_body() { 188 trace!("body receiver dropped before eof, closing"); 189 self.conn.close_read(); 190 } 191 } 192 } 193 }, 194 Ok(Async::Ready(None)) => { 195 // just drop, the body will close automatically 196 }, 197 Ok(Async::NotReady) => { 198 self.body_tx = Some(body); 199 return Ok(Async::NotReady); 200 } 201 Err(e) => { 202 body.send_error(::Error::new_body(e)); 203 } 204 } 205 } else { 206 // just drop, the body will close automatically 207 } 208 } else { 209 return self.conn.read_keep_alive(); 210 } 211 } 212 } 213 poll_read_head(&mut self) -> Poll<(), ::Error>214 fn poll_read_head(&mut self) -> Poll<(), ::Error> { 215 // can dispatch receive, or does it still care about, an incoming message? 216 match self.dispatch.poll_ready() { 217 Ok(Async::Ready(())) => (), 218 Ok(Async::NotReady) => return Ok(Async::NotReady), // service might not be ready 219 Err(()) => { 220 trace!("dispatch no longer receiving messages"); 221 self.close(); 222 return Ok(Async::Ready(())); 223 } 224 } 225 // dispatch is ready for a message, try to read one 226 match self.conn.read_head() { 227 Ok(Async::Ready(Some((head, body_len, wants_upgrade)))) => { 228 let mut body = match body_len { 229 DecodedLength::ZERO => Body::empty(), 230 other => { 231 let (tx, rx) = Body::new_channel(other.into_opt()); 232 self.body_tx = Some(tx); 233 rx 234 }, 235 }; 236 if wants_upgrade { 237 body.set_on_upgrade(self.conn.on_upgrade()); 238 } 239 self.dispatch.recv_msg(Ok((head, body)))?; 240 Ok(Async::Ready(())) 241 } 242 Ok(Async::Ready(None)) => { 243 // read eof, conn will start to shutdown automatically 244 Ok(Async::Ready(())) 245 } 246 Ok(Async::NotReady) => Ok(Async::NotReady), 247 Err(err) => { 248 debug!("read_head error: {}", err); 249 self.dispatch.recv_msg(Err(err))?; 250 // if here, the dispatcher gave the user the error 251 // somewhere else. we still need to shutdown, but 252 // not as a second error. 253 Ok(Async::Ready(())) 254 } 255 } 256 } 257 poll_write(&mut self) -> Poll<(), ::Error>258 fn poll_write(&mut self) -> Poll<(), ::Error> { 259 loop { 260 if self.is_closing { 261 return Ok(Async::Ready(())); 262 } else if self.body_rx.is_none() && self.conn.can_write_head() && self.dispatch.should_poll() { 263 if let Some((head, mut body)) = try_ready!(self.dispatch.poll_msg().map_err(::Error::new_user_service)) { 264 // Check if the body knows its full data immediately. 265 // 266 // If so, we can skip a bit of bookkeeping that streaming 267 // bodies need to do. 268 if let Some(full) = body.__hyper_full_data(FullDataArg(())).0 { 269 self.conn.write_full_msg(head, full); 270 return Ok(Async::Ready(())); 271 } 272 let body_type = if body.is_end_stream() { 273 self.body_rx = None; 274 None 275 } else { 276 let btype = body.content_length() 277 .map(BodyLength::Known) 278 .or_else(|| Some(BodyLength::Unknown)); 279 self.body_rx = Some(body); 280 btype 281 }; 282 self.conn.write_head(head, body_type); 283 } else { 284 self.close(); 285 return Ok(Async::Ready(())); 286 } 287 } else if !self.conn.can_buffer_body() { 288 try_ready!(self.poll_flush()); 289 } else if let Some(mut body) = self.body_rx.take() { 290 if !self.conn.can_write_body() { 291 trace!( 292 "no more write body allowed, user body is_end_stream = {}", 293 body.is_end_stream(), 294 ); 295 continue; 296 } 297 match body.poll_data().map_err(::Error::new_user_body)? { 298 Async::Ready(Some(chunk)) => { 299 let eos = body.is_end_stream(); 300 if eos { 301 if chunk.remaining() == 0 { 302 trace!("discarding empty chunk"); 303 self.conn.end_body(); 304 } else { 305 self.conn.write_body_and_end(chunk); 306 } 307 } else { 308 self.body_rx = Some(body); 309 if chunk.remaining() == 0 { 310 trace!("discarding empty chunk"); 311 continue; 312 } 313 self.conn.write_body(chunk); 314 } 315 }, 316 Async::Ready(None) => { 317 self.conn.end_body(); 318 }, 319 Async::NotReady => { 320 self.body_rx = Some(body); 321 return Ok(Async::NotReady); 322 } 323 } 324 } else { 325 return Ok(Async::NotReady); 326 } 327 } 328 } 329 poll_flush(&mut self) -> Poll<(), ::Error>330 fn poll_flush(&mut self) -> Poll<(), ::Error> { 331 self.conn.flush().map_err(|err| { 332 debug!("error writing: {}", err); 333 ::Error::new_body_write(err) 334 }) 335 } 336 close(&mut self)337 fn close(&mut self) { 338 self.is_closing = true; 339 self.conn.close_read(); 340 self.conn.close_write(); 341 } 342 is_done(&self) -> bool343 fn is_done(&self) -> bool { 344 if self.is_closing { 345 return true; 346 } 347 348 let read_done = self.conn.is_read_closed(); 349 350 if !T::should_read_first() && read_done { 351 // a client that cannot read may was well be done. 352 true 353 } else { 354 let write_done = self.conn.is_write_closed() || 355 (!self.dispatch.should_poll() && self.body_rx.is_none()); 356 read_done && write_done 357 } 358 } 359 } 360 361 impl<D, Bs, I, T> Future for Dispatcher<D, Bs, I, T> 362 where 363 D: Dispatch<PollItem=MessageHead<T::Outgoing>, PollBody=Bs, RecvItem=MessageHead<T::Incoming>>, 364 D::PollError: Into<Box<dyn StdError + Send + Sync>>, 365 I: AsyncRead + AsyncWrite, 366 T: Http1Transaction, 367 Bs: Payload, 368 { 369 type Item = Dispatched; 370 type Error = ::Error; 371 372 #[inline] poll(&mut self) -> Poll<Self::Item, Self::Error>373 fn poll(&mut self) -> Poll<Self::Item, Self::Error> { 374 self.poll_catch(true) 375 } 376 } 377 378 // ===== impl Server ===== 379 380 impl<S> Server<S> where S: Service { new(service: S) -> Server<S>381 pub fn new(service: S) -> Server<S> { 382 Server { 383 in_flight: None, 384 service: service, 385 } 386 } into_service(self) -> S387 pub fn into_service(self) -> S { 388 self.service 389 } 390 } 391 392 impl<S, Bs> Dispatch for Server<S> 393 where 394 S: Service<ReqBody=Body, ResBody=Bs>, 395 S::Error: Into<Box<dyn StdError + Send + Sync>>, 396 Bs: Payload, 397 { 398 type PollItem = MessageHead<StatusCode>; 399 type PollBody = Bs; 400 type PollError = S::Error; 401 type RecvItem = RequestHead; 402 poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Self::PollBody)>, Self::PollError>403 fn poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Self::PollBody)>, Self::PollError> { 404 if let Some(mut fut) = self.in_flight.take() { 405 let resp = match fut.poll()? { 406 Async::Ready(res) => res, 407 Async::NotReady => { 408 self.in_flight = Some(fut); 409 return Ok(Async::NotReady); 410 } 411 }; 412 let (parts, body) = resp.into_parts(); 413 let head = MessageHead { 414 version: parts.version, 415 subject: parts.status, 416 headers: parts.headers, 417 }; 418 Ok(Async::Ready(Some((head, body)))) 419 } else { 420 unreachable!("poll_msg shouldn't be called if no inflight"); 421 } 422 } 423 recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()>424 fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()> { 425 let (msg, body) = msg?; 426 let mut req = Request::new(body); 427 *req.method_mut() = msg.subject.0; 428 *req.uri_mut() = msg.subject.1; 429 *req.headers_mut() = msg.headers; 430 *req.version_mut() = msg.version; 431 self.in_flight = Some(self.service.call(req)); 432 Ok(()) 433 } 434 poll_ready(&mut self) -> Poll<(), ()>435 fn poll_ready(&mut self) -> Poll<(), ()> { 436 if self.in_flight.is_some() { 437 Ok(Async::NotReady) 438 } else { 439 self.service.poll_ready() 440 .map_err(|_e| { 441 // FIXME: return error value. 442 trace!("service closed"); 443 }) 444 } 445 } 446 should_poll(&self) -> bool447 fn should_poll(&self) -> bool { 448 self.in_flight.is_some() 449 } 450 } 451 452 // ===== impl Client ===== 453 454 455 impl<B> Client<B> { new(rx: ClientRx<B>) -> Client<B>456 pub fn new(rx: ClientRx<B>) -> Client<B> { 457 Client { 458 callback: None, 459 rx: rx, 460 } 461 } 462 } 463 464 impl<B> Dispatch for Client<B> 465 where 466 B: Payload, 467 { 468 type PollItem = RequestHead; 469 type PollBody = B; 470 type PollError = Never; 471 type RecvItem = ResponseHead; 472 poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Self::PollBody)>, Never>473 fn poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Self::PollBody)>, Never> { 474 match self.rx.poll() { 475 Ok(Async::Ready(Some((req, mut cb)))) => { 476 // check that future hasn't been canceled already 477 match cb.poll_cancel().expect("poll_cancel cannot error") { 478 Async::Ready(()) => { 479 trace!("request canceled"); 480 Ok(Async::Ready(None)) 481 }, 482 Async::NotReady => { 483 let (parts, body) = req.into_parts(); 484 let head = RequestHead { 485 version: parts.version, 486 subject: RequestLine(parts.method, parts.uri), 487 headers: parts.headers, 488 }; 489 self.callback = Some(cb); 490 Ok(Async::Ready(Some((head, body)))) 491 } 492 } 493 }, 494 Ok(Async::Ready(None)) => { 495 trace!("client tx closed"); 496 // user has dropped sender handle 497 Ok(Async::Ready(None)) 498 }, 499 Ok(Async::NotReady) => Ok(Async::NotReady), 500 Err(never) => match never {}, 501 } 502 } 503 recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()>504 fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()> { 505 match msg { 506 Ok((msg, body)) => { 507 if let Some(cb) = self.callback.take() { 508 let mut res = Response::new(body); 509 *res.status_mut() = msg.subject; 510 *res.headers_mut() = msg.headers; 511 *res.version_mut() = msg.version; 512 let _ = cb.send(Ok(res)); 513 Ok(()) 514 } else { 515 // Getting here is likely a bug! An error should have happened 516 // in Conn::require_empty_read() before ever parsing a 517 // full message! 518 Err(::Error::new_unexpected_message()) 519 } 520 }, 521 Err(err) => { 522 if let Some(cb) = self.callback.take() { 523 let _ = cb.send(Err((err, None))); 524 Ok(()) 525 } else if let Ok(Async::Ready(Some((req, cb)))) = self.rx.poll() { 526 trace!("canceling queued request with connection error: {}", err); 527 // in this case, the message was never even started, so it's safe to tell 528 // the user that the request was completely canceled 529 let _ = cb.send(Err((::Error::new_canceled().with(err), Some(req)))); 530 Ok(()) 531 } else { 532 Err(err) 533 } 534 } 535 } 536 } 537 poll_ready(&mut self) -> Poll<(), ()>538 fn poll_ready(&mut self) -> Poll<(), ()> { 539 match self.callback { 540 Some(ref mut cb) => match cb.poll_cancel() { 541 Ok(Async::Ready(())) => { 542 trace!("callback receiver has dropped"); 543 Err(()) 544 }, 545 Ok(Async::NotReady) => Ok(Async::Ready(())), 546 Err(_) => unreachable!("oneshot poll_cancel cannot error"), 547 }, 548 None => Err(()), 549 } 550 } 551 should_poll(&self) -> bool552 fn should_poll(&self) -> bool { 553 self.callback.is_none() 554 } 555 } 556 557 #[cfg(test)] 558 mod tests { 559 extern crate pretty_env_logger; 560 561 use super::*; 562 use mock::AsyncIo; 563 use proto::h1::ClientTransaction; 564 565 #[test] client_read_bytes_before_writing_request()566 fn client_read_bytes_before_writing_request() { 567 let _ = pretty_env_logger::try_init(); 568 ::futures::lazy(|| { 569 // Block at 0 for now, but we will release this response before 570 // the request is ready to write later... 571 let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\n\r\n".to_vec(), 0); 572 let (mut tx, rx) = ::client::dispatch::channel(); 573 let conn = Conn::<_, ::Chunk, ClientTransaction>::new(io); 574 let mut dispatcher = Dispatcher::new(Client::new(rx), conn); 575 576 // First poll is needed to allow tx to send... 577 assert!(dispatcher.poll().expect("nothing is ready").is_not_ready()); 578 // Unblock our IO, which has a response before we've sent request! 579 dispatcher.conn.io_mut().block_in(100); 580 581 let res_rx = tx.try_send(::Request::new(::Body::empty())).unwrap(); 582 583 let a1 = dispatcher.poll().expect("error should be sent on channel"); 584 assert!(a1.is_ready(), "dispatcher should be closed"); 585 let err = res_rx.wait() 586 .expect("callback poll") 587 .expect_err("callback response"); 588 589 match (err.0.kind(), err.1) { 590 (&::error::Kind::Canceled, Some(_)) => (), 591 other => panic!("expected Canceled, got {:?}", other), 592 } 593 Ok::<(), ()>(()) 594 }).wait().unwrap(); 595 } 596 597 #[test] body_empty_chunks_ignored()598 fn body_empty_chunks_ignored() { 599 let _ = pretty_env_logger::try_init(); 600 ::futures::lazy(|| { 601 let io = AsyncIo::new_buf(vec![], 0); 602 let (mut tx, rx) = ::client::dispatch::channel(); 603 let conn = Conn::<_, ::Chunk, ClientTransaction>::new(io); 604 let mut dispatcher = Dispatcher::new(Client::new(rx), conn); 605 606 // First poll is needed to allow tx to send... 607 assert!(dispatcher.poll().expect("nothing is ready").is_not_ready()); 608 609 let body = ::Body::wrap_stream(::futures::stream::once(Ok::<_, ::Error>(""))); 610 611 let _res_rx = tx.try_send(::Request::new(body)).unwrap(); 612 613 dispatcher.poll().expect("empty body shouldn't panic"); 614 Ok::<(), ()>(()) 615 }).wait().unwrap(); 616 } 617 } 618