1 //! `Stream<Item = Request>` + `Service<Request>` => `Stream<Item = Response>`. 2 3 use super::{common, Error}; 4 use futures_core::Stream; 5 use futures_util::stream::FuturesOrdered; 6 use pin_project::pin_project; 7 use std::{ 8 future::Future, 9 pin::Pin, 10 task::{Context, Poll}, 11 }; 12 use tower_service::Service; 13 14 /// This is a `futures::Stream` of responses resulting from calling the wrapped `tower::Service` 15 /// for each request received on the wrapped `Stream`. 16 /// 17 /// ```rust 18 /// # use std::task::{Poll, Context}; 19 /// # use std::cell::Cell; 20 /// # use std::error::Error; 21 /// # use std::rc::Rc; 22 /// # 23 /// use futures_util::future::{ready, Ready}; 24 /// use futures_util::StreamExt; 25 /// use tower_service::Service; 26 /// use tower::util::ServiceExt; 27 /// use tokio::prelude::*; 28 /// 29 /// // First, we need to have a Service to process our requests. 30 /// #[derive(Debug, Eq, PartialEq)] 31 /// struct FirstLetter; 32 /// impl Service<&'static str> for FirstLetter { 33 /// type Response = &'static str; 34 /// type Error = Box<dyn Error + Send + Sync>; 35 /// type Future = Ready<Result<Self::Response, Self::Error>>; 36 /// 37 /// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 38 /// Poll::Ready(Ok(())) 39 /// } 40 /// 41 /// fn call(&mut self, req: &'static str) -> Self::Future { 42 /// ready(Ok(&req[..1])) 43 /// } 44 /// } 45 /// 46 /// #[tokio::main] 47 /// async fn main() { 48 /// // Next, we need a Stream of requests. 49 /// let (mut reqs, rx) = tokio::sync::mpsc::unbounded_channel(); 50 /// // Note that we have to help Rust out here by telling it what error type to use. 51 /// // Specifically, it has to be From<Service::Error> + From<Stream::Error>. 52 /// let mut rsps = FirstLetter.call_all(rx); 53 /// 54 /// // Now, let's send a few requests and then check that we get the corresponding responses. 55 /// reqs.send("one"); 56 /// reqs.send("two"); 57 /// reqs.send("three"); 58 /// drop(reqs); 59 /// 60 /// // We then loop over the response Strem that we get back from call_all. 61 /// let mut i = 0usize; 62 /// while let Some(rsp) = rsps.next().await { 63 /// // Each response is a Result (we could also have used TryStream::try_next) 64 /// match (i + 1, rsp.unwrap()) { 65 /// (1, "o") | 66 /// (2, "t") | 67 /// (3, "t") => {} 68 /// (n, i) => { 69 /// unreachable!("{}. response was '{}'", n, i); 70 /// } 71 /// } 72 /// i += 1; 73 /// } 74 /// 75 /// // And at the end, we can get the Service back when there are no more requests. 76 /// assert_eq!(rsps.into_inner(), FirstLetter); 77 /// } 78 /// ``` 79 #[pin_project] 80 #[derive(Debug)] 81 pub struct CallAll<Svc, S> 82 where 83 Svc: Service<S::Item>, 84 S: Stream, 85 { 86 #[pin] 87 inner: common::CallAll<Svc, S, FuturesOrdered<Svc::Future>>, 88 } 89 90 impl<Svc, S> CallAll<Svc, S> 91 where 92 Svc: Service<S::Item>, 93 Svc::Error: Into<Error>, 94 S: Stream, 95 { 96 /// Create new `CallAll` combinator. 97 /// 98 /// Each request yielded by `stread` is passed to `svc`, and the resulting responses are 99 /// yielded in the same order by the implementation of `Stream` for `CallAll`. new(service: Svc, stream: S) -> CallAll<Svc, S>100 pub fn new(service: Svc, stream: S) -> CallAll<Svc, S> { 101 CallAll { 102 inner: common::CallAll::new(service, stream, FuturesOrdered::new()), 103 } 104 } 105 106 /// Extract the wrapped `Service`. 107 /// 108 /// # Panics 109 /// 110 /// Panics if `take_service` was already called. into_inner(self) -> Svc111 pub fn into_inner(self) -> Svc { 112 self.inner.into_inner() 113 } 114 115 /// Extract the wrapped `Service`. 116 /// 117 /// This `CallAll` can no longer be used after this function has been called. 118 /// 119 /// # Panics 120 /// 121 /// Panics if `take_service` was already called. take_service(self: Pin<&mut Self>) -> Svc122 pub fn take_service(self: Pin<&mut Self>) -> Svc { 123 self.project().inner.take_service() 124 } 125 126 /// Return responses as they are ready, regardless of the initial order. 127 /// 128 /// This function must be called before the stream is polled. 129 /// 130 /// # Panics 131 /// 132 /// Panics if `poll` was called. unordered(self) -> super::CallAllUnordered<Svc, S>133 pub fn unordered(self) -> super::CallAllUnordered<Svc, S> { 134 self.inner.unordered() 135 } 136 } 137 138 impl<Svc, S> Stream for CallAll<Svc, S> 139 where 140 Svc: Service<S::Item>, 141 Svc::Error: Into<Error>, 142 S: Stream, 143 { 144 type Item = Result<Svc::Response, Error>; 145 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>146 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 147 self.project().inner.poll_next(cx) 148 } 149 } 150 151 impl<F: Future> common::Drive<F> for FuturesOrdered<F> { is_empty(&self) -> bool152 fn is_empty(&self) -> bool { 153 FuturesOrdered::is_empty(self) 154 } 155 push(&mut self, future: F)156 fn push(&mut self, future: F) { 157 FuturesOrdered::push(self, future) 158 } 159 poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>>160 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>> { 161 Stream::poll_next(Pin::new(self), cx) 162 } 163 } 164