1 //! A cache of services.
2 
3 use super::error;
4 use futures_core::Stream;
5 use futures_util::stream::FuturesUnordered;
6 pub use indexmap::Equivalent;
7 use indexmap::IndexMap;
8 use std::future::Future;
9 use std::hash::Hash;
10 use std::pin::Pin;
11 use std::task::{Context, Poll};
12 use tokio::sync::oneshot;
13 use tower_service::Service;
14 use tracing::{debug, trace};
15 
16 /// Drives readiness over a set of services.
17 ///
18 /// The cache maintains two internal data structures:
19 ///
20 /// * a set of  _pending_ services that have not yet become ready; and
21 /// * a set of _ready_ services that have previously polled ready.
22 ///
23 /// As each `S` typed `Service` is added to the cache via `ReadyCache::push`, it
24 /// is added to the _pending set_. As `ReadyCache::poll_pending` is invoked,
25 /// pending services are polled and added to the _ready set_.
26 ///
27 /// `ReadyCache::call_ready` (or `ReadyCache::call_ready_index`) dispatches a
28 /// request to the specified service, but panics if the specified service is not
29 /// in the ready set. The `ReadyCache::check_*` functions can be used to ensure
30 /// that a service is ready before dispatching a request.
31 ///
32 /// The ready set can hold services for an abitrarily long time. During this
33 /// time, the runtime may process events that invalidate that ready state (for
34 /// instance, if a keepalive detects a lost connection). In such cases, callers
35 /// should use `ReadyCache::check_ready` (or `ReadyCache::check_ready_index`)
36 /// immediately before dispatching a request to ensure that the service has not
37 /// become unavailable.
38 ///
39 /// Once `ReadyCache::call_ready*` is invoked, the service is placed back into
40 /// the _pending_ set to be driven to readiness again.
41 ///
42 /// When `ReadyCache::check_ready*` returns `false`, it indicates that the
43 /// specified service is _not_ ready. If an error is returned, this indicats that
44 /// the server failed and has been removed from the cache entirely.
45 ///
46 /// `ReadyCache::evict` can be used to remove a service from the cache (by key),
47 /// though the service may not be dropped (if it is currently pending) until
48 /// `ReadyCache::poll_pending` is invoked.
49 ///
50 /// Note that the by-index accessors are provided to support use cases (like
51 /// power-of-two-choices load balancing) where the caller does not care to keep
52 /// track of each service's key. Instead, it needs only to access _some_ ready
53 /// service. In such a case, it should be noted that calls to
54 /// `ReadyCache::poll_pending` and `ReadyCache::evict` may perturb the order of
55 /// the ready set, so any cached indexes should be discarded after such a call.
56 #[derive(Debug)]
57 pub struct ReadyCache<K, S, Req>
58 where
59     K: Eq + Hash,
60 {
61     /// A stream of services that are not yet ready.
62     pending: FuturesUnordered<Pending<K, S, Req>>,
63     /// An index of cancelation handles for pending streams.
64     pending_cancel_txs: IndexMap<K, CancelTx>,
65 
66     /// Services that have previously become ready. Readiness can become stale,
67     /// so a given service should be polled immediately before use.
68     ///
69     /// The cancelation oneshot is preserved (though unused) while the service is
70     /// ready so that it need not be reallocated each time a request is
71     /// dispatched.
72     ready: IndexMap<K, (S, CancelPair)>,
73 }
74 
75 // Safety: This is safe because we do not use `Pin::new_unchecked`.
76 impl<S, K: Eq + Hash, Req> Unpin for ReadyCache<K, S, Req> {}
77 
78 type CancelRx = oneshot::Receiver<()>;
79 type CancelTx = oneshot::Sender<()>;
80 type CancelPair = (CancelTx, CancelRx);
81 
/// Why a `Pending` future finished without yielding a ready service.
///
/// Both variants carry the key of the affected service.
#[derive(Debug)]
enum PendingError<K, E> {
    /// The service failed while being polled for readiness.
    Inner(K, E),
    /// The service was canceled (evicted, or replaced by a newer service under
    /// the same key) before it became ready.
    Canceled(K),
}
87 
88 /// A Future that becomes satisfied when an `S`-typed service is ready.
89 ///
90 /// May fail due to cancelation, i.e. if the service is evicted from the balancer.
91 #[derive(Debug)]
92 struct Pending<K, S, Req> {
93     key: Option<K>,
94     cancel: Option<CancelRx>,
95     ready: Option<S>,
96     _pd: std::marker::PhantomData<Req>,
97 }
98 
99 // === ReadyCache ===
100 
101 impl<K, S, Req> Default for ReadyCache<K, S, Req>
102 where
103     K: Eq + Hash,
104     S: Service<Req>,
105 {
default() -> Self106     fn default() -> Self {
107         Self {
108             ready: IndexMap::default(),
109             pending: FuturesUnordered::new(),
110             pending_cancel_txs: IndexMap::default(),
111         }
112     }
113 }
114 
115 impl<K, S, Req> ReadyCache<K, S, Req>
116 where
117     K: Eq + Hash,
118 {
119     /// Returns the total number of services in the cache.
len(&self) -> usize120     pub fn len(&self) -> usize {
121         self.ready_len() + self.pending_len()
122     }
123 
124     /// Returns the number of services in the ready set.
ready_len(&self) -> usize125     pub fn ready_len(&self) -> usize {
126         self.ready.len()
127     }
128 
129     /// Returns the number of services in the unready set.
pending_len(&self) -> usize130     pub fn pending_len(&self) -> usize {
131         self.pending.len()
132     }
133 
134     /// Returns true iff the given key is in the unready set.
pending_contains<Q: Hash + Equivalent<K>>(&self, key: &Q) -> bool135     pub fn pending_contains<Q: Hash + Equivalent<K>>(&self, key: &Q) -> bool {
136         self.pending_cancel_txs.contains_key(key)
137     }
138 
139     /// Obtains a reference to a service in the ready set by key.
get_ready<Q: Hash + Equivalent<K>>(&self, key: &Q) -> Option<(usize, &K, &S)>140     pub fn get_ready<Q: Hash + Equivalent<K>>(&self, key: &Q) -> Option<(usize, &K, &S)> {
141         self.ready.get_full(key).map(|(i, k, v)| (i, k, &v.0))
142     }
143 
144     /// Obtains a mutable reference to a service in the ready set by key.
get_ready_mut<Q: Hash + Equivalent<K>>( &mut self, key: &Q, ) -> Option<(usize, &K, &mut S)>145     pub fn get_ready_mut<Q: Hash + Equivalent<K>>(
146         &mut self,
147         key: &Q,
148     ) -> Option<(usize, &K, &mut S)> {
149         self.ready
150             .get_full_mut(key)
151             .map(|(i, k, v)| (i, k, &mut v.0))
152     }
153 
154     /// Obtains a reference to a service in the ready set by index.
get_ready_index(&self, idx: usize) -> Option<(&K, &S)>155     pub fn get_ready_index(&self, idx: usize) -> Option<(&K, &S)> {
156         self.ready.get_index(idx).map(|(k, v)| (k, &v.0))
157     }
158 
159     /// Obtains a mutable reference to a service in the ready set by index.
get_ready_index_mut(&mut self, idx: usize) -> Option<(&mut K, &mut S)>160     pub fn get_ready_index_mut(&mut self, idx: usize) -> Option<(&mut K, &mut S)> {
161         self.ready.get_index_mut(idx).map(|(k, v)| (k, &mut v.0))
162     }
163 
164     /// Evicts an item from the cache.
165     ///
166     /// Returns true if a service was marked for eviction.
167     ///
168     /// Services are dropped from the ready set immediately. Services in the
169     /// pending set are marked for cancellation, but `ReadyCache::poll_pending`
170     /// must be called to cause the service to be dropped.
evict<Q: Hash + Equivalent<K>>(&mut self, key: &Q) -> bool171     pub fn evict<Q: Hash + Equivalent<K>>(&mut self, key: &Q) -> bool {
172         let canceled = if let Some(c) = self.pending_cancel_txs.swap_remove(key) {
173             c.send(()).expect("cancel receiver lost");
174             true
175         } else {
176             false
177         };
178 
179         self.ready
180             .swap_remove_full(key)
181             .map(|_| true)
182             .unwrap_or(canceled)
183     }
184 }
185 
186 impl<K, S, Req> ReadyCache<K, S, Req>
187 where
188     K: Clone + Eq + Hash,
189     S: Service<Req>,
190     <S as Service<Req>>::Error: Into<error::Error>,
191     S::Error: Into<error::Error>,
192 {
193     /// Pushes a new service onto the pending set.
194     ///
195     /// The service will be promoted to the ready set as `poll_pending` is invoked.
196     ///
197     /// Note that this does **not** remove services from the ready set. Once the
198     /// old service is used, it will be dropped instead of being added back to
199     /// the pending set; OR, when the new service becomes ready, it will replace
200     /// the prior service in the ready set.
push(&mut self, key: K, svc: S)201     pub fn push(&mut self, key: K, svc: S) {
202         let cancel = oneshot::channel();
203         self.push_pending(key, svc, cancel);
204     }
205 
push_pending(&mut self, key: K, svc: S, (cancel_tx, cancel_rx): CancelPair)206     fn push_pending(&mut self, key: K, svc: S, (cancel_tx, cancel_rx): CancelPair) {
207         if let Some(c) = self.pending_cancel_txs.insert(key.clone(), cancel_tx) {
208             // If there is already a service for this key, cancel it.
209             c.send(()).expect("cancel receiver lost");
210         }
211         self.pending.push(Pending {
212             key: Some(key),
213             cancel: Some(cancel_rx),
214             ready: Some(svc),
215             _pd: std::marker::PhantomData,
216         });
217     }
218 
219     /// Polls services pending readiness, adding ready services to the ready set.
220     ///
221     /// Returns `Async::Ready` when there are no remaining unready services.
222     /// `poll_pending` should be called again after `push_service` or
223     /// `call_ready_index` are invoked.
224     ///
225     /// Failures indicate that an individual pending service failed to become
226     /// ready (and has been removed from the cache). In such a case,
227     /// `poll_pending` should typically be called again to continue driving
228     /// pending services to readiness.
poll_pending(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), error::Failed<K>>>229     pub fn poll_pending(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), error::Failed<K>>> {
230         loop {
231             match Pin::new(&mut self.pending).poll_next(cx) {
232                 Poll::Pending => return Poll::Pending,
233                 Poll::Ready(None) => return Poll::Ready(Ok(())),
234                 Poll::Ready(Some(Ok((key, svc, cancel_rx)))) => {
235                     trace!("endpoint ready");
236                     let cancel_tx = self.pending_cancel_txs.swap_remove(&key);
237                     if let Some(cancel_tx) = cancel_tx {
238                         // Keep track of the cancelation so that it need not be
239                         // recreated after the service is used.
240                         self.ready.insert(key, (svc, (cancel_tx, cancel_rx)));
241                     } else {
242                         // This should not technically be possible. We must have decided to cancel
243                         // a Service (by sending on the CancelTx), yet that same service then
244                         // returns Ready. Since polling a Pending _first_ polls the CancelRx, that
245                         // _should_ always see our CancelTx send. Yet empirically, that isn't true:
246                         //
247                         //   https://github.com/tower-rs/tower/issues/415
248                         //
249                         // So, we instead detect the endpoint as canceled at this point. That
250                         // should be fine, since the oneshot is only really there to ensure that
251                         // the Pending is polled again anyway.
252                         //
253                         // We assert that this can't happen in debug mode so that hopefully one day
254                         // we can find a test that triggers this reliably.
255                         debug_assert!(cancel_tx.is_some());
256                         debug!("canceled endpoint removed when ready");
257                     }
258                 }
259                 Poll::Ready(Some(Err(PendingError::Canceled(_)))) => {
260                     debug!("endpoint canceled");
261                     // The cancellation for this service was removed in order to
262                     // cause this cancellation.
263                 }
264                 Poll::Ready(Some(Err(PendingError::Inner(key, e)))) => {
265                     let cancel_tx = self.pending_cancel_txs.swap_remove(&key);
266                     if let Some(_) = cancel_tx {
267                         return Err(error::Failed(key, e.into())).into();
268                     } else {
269                         // See comment for the same clause under Ready(Some(Ok)).
270                         debug_assert!(cancel_tx.is_some());
271                         debug!("canceled endpoint removed on error");
272                     }
273                 }
274             }
275         }
276     }
277 
278     /// Checks whether the referenced endpoint is ready.
279     ///
280     /// Returns true if the endpoint is ready and false if it is not. An error is
281     /// returned if the endpoint fails.
check_ready<Q: Hash + Equivalent<K>>( &mut self, cx: &mut Context<'_>, key: &Q, ) -> Result<bool, error::Failed<K>>282     pub fn check_ready<Q: Hash + Equivalent<K>>(
283         &mut self,
284         cx: &mut Context<'_>,
285         key: &Q,
286     ) -> Result<bool, error::Failed<K>> {
287         match self.ready.get_full_mut(key) {
288             Some((index, _, _)) => self.check_ready_index(cx, index),
289             None => Ok(false),
290         }
291     }
292 
293     /// Checks whether the referenced endpoint is ready.
294     ///
295     /// If the service is no longer ready, it is moved back into the pending set
296     /// and `false` is returned.
297     ///
298     /// If the service errors, it is removed and dropped and the error is returned.
check_ready_index( &mut self, cx: &mut Context<'_>, index: usize, ) -> Result<bool, error::Failed<K>>299     pub fn check_ready_index(
300         &mut self,
301         cx: &mut Context<'_>,
302         index: usize,
303     ) -> Result<bool, error::Failed<K>> {
304         let svc = match self.ready.get_index_mut(index) {
305             None => return Ok(false),
306             Some((_, (svc, _))) => svc,
307         };
308         match svc.poll_ready(cx) {
309             Poll::Ready(Ok(())) => Ok(true),
310             Poll::Pending => {
311                 // became unready; so move it back there.
312                 let (key, (svc, cancel)) = self
313                     .ready
314                     .swap_remove_index(index)
315                     .expect("invalid ready index");
316 
317                 // If a new version of this service has been added to the
318                 // unready set, don't overwrite it.
319                 if !self.pending_contains(&key) {
320                     self.push_pending(key, svc, cancel);
321                 }
322 
323                 Ok(false)
324             }
325             Poll::Ready(Err(e)) => {
326                 // failed, so drop it.
327                 let (key, _) = self
328                     .ready
329                     .swap_remove_index(index)
330                     .expect("invalid ready index");
331                 Err(error::Failed(key, e.into()))
332             }
333         }
334     }
335 
336     /// Calls a ready service by key.
337     ///
338     /// # Panics
339     ///
340     /// If the specified key does not exist in the ready
call_ready<Q: Hash + Equivalent<K>>(&mut self, key: &Q, req: Req) -> S::Future341     pub fn call_ready<Q: Hash + Equivalent<K>>(&mut self, key: &Q, req: Req) -> S::Future {
342         let (index, _, _) = self
343             .ready
344             .get_full_mut(key)
345             .expect("check_ready was not called");
346         self.call_ready_index(index, req)
347     }
348 
349     /// Calls a ready service by index.
350     ///
351     /// # Panics
352     ///
353     /// If the specified index is out of range.
call_ready_index(&mut self, index: usize, req: Req) -> S::Future354     pub fn call_ready_index(&mut self, index: usize, req: Req) -> S::Future {
355         let (key, (mut svc, cancel)) = self
356             .ready
357             .swap_remove_index(index)
358             .expect("check_ready_index was not called");
359 
360         let fut = svc.call(req);
361 
362         // If a new version of this service has been added to the
363         // unready set, don't overwrite it.
364         if !self.pending_contains(&key) {
365             self.push_pending(key, svc, cancel);
366         }
367 
368         fut
369     }
370 }
371 
372 // === Pending ===
373 
374 // Safety: No use unsafe access therefore this is safe.
375 impl<K, S, Req> Unpin for Pending<K, S, Req> {}
376 
377 impl<K, S, Req> Future for Pending<K, S, Req>
378 where
379     S: Service<Req>,
380 {
381     type Output = Result<(K, S, CancelRx), PendingError<K, S::Error>>;
382 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>383     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
384         let mut fut = self.cancel.as_mut().expect("polled after complete");
385         if let Poll::Ready(r) = Pin::new(&mut fut).poll(cx) {
386             assert!(r.is_ok(), "cancel sender lost");
387             let key = self.key.take().expect("polled after complete");
388             return Err(PendingError::Canceled(key)).into();
389         }
390 
391         match self
392             .ready
393             .as_mut()
394             .expect("polled after ready")
395             .poll_ready(cx)
396         {
397             Poll::Pending => Poll::Pending,
398             Poll::Ready(Ok(())) => {
399                 let key = self.key.take().expect("polled after complete");
400                 let cancel = self.cancel.take().expect("polled after complete");
401                 Ok((key, self.ready.take().expect("polled after ready"), cancel)).into()
402             }
403             Poll::Ready(Err(e)) => {
404                 let key = self.key.take().expect("polled after compete");
405                 Err(PendingError::Inner(key, e)).into()
406             }
407         }
408     }
409 }
410