1 //! `Stream<Item = Request>` + `Service<Request>` => `Stream<Item = Response>`.
2 
3 use super::{common, Error};
4 use futures_core::Stream;
5 use futures_util::stream::FuturesOrdered;
6 use pin_project::pin_project;
7 use std::{
8     future::Future,
9     pin::Pin,
10     task::{Context, Poll},
11 };
12 use tower_service::Service;
13 
14 /// This is a `futures::Stream` of responses resulting from calling the wrapped `tower::Service`
15 /// for each request received on the wrapped `Stream`.
16 ///
17 /// ```rust
18 /// # use std::task::{Poll, Context};
19 /// # use std::cell::Cell;
20 /// # use std::error::Error;
21 /// # use std::rc::Rc;
22 /// #
23 /// use futures_util::future::{ready, Ready};
24 /// use futures_util::StreamExt;
25 /// use tower_service::Service;
26 /// use tower::util::ServiceExt;
27 /// use tokio::prelude::*;
28 ///
29 /// // First, we need to have a Service to process our requests.
30 /// #[derive(Debug, Eq, PartialEq)]
31 /// struct FirstLetter;
32 /// impl Service<&'static str> for FirstLetter {
33 ///      type Response = &'static str;
34 ///      type Error = Box<dyn Error + Send + Sync>;
35 ///      type Future = Ready<Result<Self::Response, Self::Error>>;
36 ///
37 ///      fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
38 ///          Poll::Ready(Ok(()))
39 ///      }
40 ///
41 ///      fn call(&mut self, req: &'static str) -> Self::Future {
42 ///          ready(Ok(&req[..1]))
43 ///      }
44 /// }
45 ///
46 /// #[tokio::main]
47 /// async fn main() {
48 ///     // Next, we need a Stream of requests.
49 ///     let (mut reqs, rx) = tokio::sync::mpsc::unbounded_channel();
50 ///     // Note that we have to help Rust out here by telling it what error type to use.
51 ///     // Specifically, it has to be From<Service::Error> + From<Stream::Error>.
52 ///     let mut rsps = FirstLetter.call_all(rx);
53 ///
54 ///     // Now, let's send a few requests and then check that we get the corresponding responses.
55 ///     reqs.send("one");
56 ///     reqs.send("two");
57 ///     reqs.send("three");
58 ///     drop(reqs);
59 ///
60 ///     // We then loop over the response Strem that we get back from call_all.
61 ///     let mut i = 0usize;
62 ///     while let Some(rsp) = rsps.next().await {
63 ///         // Each response is a Result (we could also have used TryStream::try_next)
64 ///         match (i + 1, rsp.unwrap()) {
65 ///             (1, "o") |
66 ///             (2, "t") |
67 ///             (3, "t") => {}
68 ///             (n, i) => {
69 ///                 unreachable!("{}. response was '{}'", n, i);
70 ///             }
71 ///         }
72 ///         i += 1;
73 ///     }
74 ///
75 ///     // And at the end, we can get the Service back when there are no more requests.
76 ///     assert_eq!(rsps.into_inner(), FirstLetter);
77 /// }
78 /// ```
79 #[pin_project]
80 #[derive(Debug)]
81 pub struct CallAll<Svc, S>
82 where
83     Svc: Service<S::Item>,
84     S: Stream,
85 {
86     #[pin]
87     inner: common::CallAll<Svc, S, FuturesOrdered<Svc::Future>>,
88 }
89 
90 impl<Svc, S> CallAll<Svc, S>
91 where
92     Svc: Service<S::Item>,
93     Svc::Error: Into<Error>,
94     S: Stream,
95 {
96     /// Create new `CallAll` combinator.
97     ///
98     /// Each request yielded by `stread` is passed to `svc`, and the resulting responses are
99     /// yielded in the same order by the implementation of `Stream` for `CallAll`.
new(service: Svc, stream: S) -> CallAll<Svc, S>100     pub fn new(service: Svc, stream: S) -> CallAll<Svc, S> {
101         CallAll {
102             inner: common::CallAll::new(service, stream, FuturesOrdered::new()),
103         }
104     }
105 
106     /// Extract the wrapped `Service`.
107     ///
108     /// # Panics
109     ///
110     /// Panics if `take_service` was already called.
into_inner(self) -> Svc111     pub fn into_inner(self) -> Svc {
112         self.inner.into_inner()
113     }
114 
115     /// Extract the wrapped `Service`.
116     ///
117     /// This `CallAll` can no longer be used after this function has been called.
118     ///
119     /// # Panics
120     ///
121     /// Panics if `take_service` was already called.
take_service(self: Pin<&mut Self>) -> Svc122     pub fn take_service(self: Pin<&mut Self>) -> Svc {
123         self.project().inner.take_service()
124     }
125 
126     /// Return responses as they are ready, regardless of the initial order.
127     ///
128     /// This function must be called before the stream is polled.
129     ///
130     /// # Panics
131     ///
132     /// Panics if `poll` was called.
unordered(self) -> super::CallAllUnordered<Svc, S>133     pub fn unordered(self) -> super::CallAllUnordered<Svc, S> {
134         self.inner.unordered()
135     }
136 }
137 
138 impl<Svc, S> Stream for CallAll<Svc, S>
139 where
140     Svc: Service<S::Item>,
141     Svc::Error: Into<Error>,
142     S: Stream,
143 {
144     type Item = Result<Svc::Response, Error>;
145 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>146     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
147         self.project().inner.poll_next(cx)
148     }
149 }
150 
151 impl<F: Future> common::Drive<F> for FuturesOrdered<F> {
is_empty(&self) -> bool152     fn is_empty(&self) -> bool {
153         FuturesOrdered::is_empty(self)
154     }
155 
push(&mut self, future: F)156     fn push(&mut self, future: F) {
157         FuturesOrdered::push(self, future)
158     }
159 
poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>>160     fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>> {
161         Stream::poll_next(Pin::new(self), cx)
162     }
163 }
164