1 //! A cache of services. 2 3 use super::error; 4 use futures_core::Stream; 5 use futures_util::stream::FuturesUnordered; 6 pub use indexmap::Equivalent; 7 use indexmap::IndexMap; 8 use std::future::Future; 9 use std::hash::Hash; 10 use std::pin::Pin; 11 use std::task::{Context, Poll}; 12 use tokio::sync::oneshot; 13 use tower_service::Service; 14 use tracing::{debug, trace}; 15 16 /// Drives readiness over a set of services. 17 /// 18 /// The cache maintains two internal data structures: 19 /// 20 /// * a set of _pending_ services that have not yet become ready; and 21 /// * a set of _ready_ services that have previously polled ready. 22 /// 23 /// As each `S` typed `Service` is added to the cache via `ReadyCache::push`, it 24 /// is added to the _pending set_. As `ReadyCache::poll_pending` is invoked, 25 /// pending services are polled and added to the _ready set_. 26 /// 27 /// `ReadyCache::call_ready` (or `ReadyCache::call_ready_index`) dispatches a 28 /// request to the specified service, but panics if the specified service is not 29 /// in the ready set. The `ReadyCache::check_*` functions can be used to ensure 30 /// that a service is ready before dispatching a request. 31 /// 32 /// The ready set can hold services for an abitrarily long time. During this 33 /// time, the runtime may process events that invalidate that ready state (for 34 /// instance, if a keepalive detects a lost connection). In such cases, callers 35 /// should use `ReadyCache::check_ready` (or `ReadyCache::check_ready_index`) 36 /// immediately before dispatching a request to ensure that the service has not 37 /// become unavailable. 38 /// 39 /// Once `ReadyCache::call_ready*` is invoked, the service is placed back into 40 /// the _pending_ set to be driven to readiness again. 41 /// 42 /// When `ReadyCache::check_ready*` returns `false`, it indicates that the 43 /// specified service is _not_ ready. If an error is returned, this indicats that 44 /// the server failed and has been removed from the cache entirely. 45 /// 46 /// `ReadyCache::evict` can be used to remove a service from the cache (by key), 47 /// though the service may not be dropped (if it is currently pending) until 48 /// `ReadyCache::poll_pending` is invoked. 49 /// 50 /// Note that the by-index accessors are provided to support use cases (like 51 /// power-of-two-choices load balancing) where the caller does not care to keep 52 /// track of each service's key. Instead, it needs only to access _some_ ready 53 /// service. In such a case, it should be noted that calls to 54 /// `ReadyCache::poll_pending` and `ReadyCache::evict` may perturb the order of 55 /// the ready set, so any cached indexes should be discarded after such a call. 56 #[derive(Debug)] 57 pub struct ReadyCache<K, S, Req> 58 where 59 K: Eq + Hash, 60 { 61 /// A stream of services that are not yet ready. 62 pending: FuturesUnordered<Pending<K, S, Req>>, 63 /// An index of cancelation handles for pending streams. 64 pending_cancel_txs: IndexMap<K, CancelTx>, 65 66 /// Services that have previously become ready. Readiness can become stale, 67 /// so a given service should be polled immediately before use. 68 /// 69 /// The cancelation oneshot is preserved (though unused) while the service is 70 /// ready so that it need not be reallocated each time a request is 71 /// dispatched. 72 ready: IndexMap<K, (S, CancelPair)>, 73 } 74 75 // Safety: This is safe because we do not use `Pin::new_unchecked`. 76 impl<S, K: Eq + Hash, Req> Unpin for ReadyCache<K, S, Req> {} 77 78 type CancelRx = oneshot::Receiver<()>; 79 type CancelTx = oneshot::Sender<()>; 80 type CancelPair = (CancelTx, CancelRx); 81 82 #[derive(Debug)] 83 enum PendingError<K, E> { 84 Canceled(K), 85 Inner(K, E), 86 } 87 88 /// A Future that becomes satisfied when an `S`-typed service is ready. 89 /// 90 /// May fail due to cancelation, i.e. if the service is evicted from the balancer. 91 #[derive(Debug)] 92 struct Pending<K, S, Req> { 93 key: Option<K>, 94 cancel: Option<CancelRx>, 95 ready: Option<S>, 96 _pd: std::marker::PhantomData<Req>, 97 } 98 99 // === ReadyCache === 100 101 impl<K, S, Req> Default for ReadyCache<K, S, Req> 102 where 103 K: Eq + Hash, 104 S: Service<Req>, 105 { default() -> Self106 fn default() -> Self { 107 Self { 108 ready: IndexMap::default(), 109 pending: FuturesUnordered::new(), 110 pending_cancel_txs: IndexMap::default(), 111 } 112 } 113 } 114 115 impl<K, S, Req> ReadyCache<K, S, Req> 116 where 117 K: Eq + Hash, 118 { 119 /// Returns the total number of services in the cache. len(&self) -> usize120 pub fn len(&self) -> usize { 121 self.ready_len() + self.pending_len() 122 } 123 124 /// Returns the number of services in the ready set. ready_len(&self) -> usize125 pub fn ready_len(&self) -> usize { 126 self.ready.len() 127 } 128 129 /// Returns the number of services in the unready set. pending_len(&self) -> usize130 pub fn pending_len(&self) -> usize { 131 self.pending.len() 132 } 133 134 /// Returns true iff the given key is in the unready set. pending_contains<Q: Hash + Equivalent<K>>(&self, key: &Q) -> bool135 pub fn pending_contains<Q: Hash + Equivalent<K>>(&self, key: &Q) -> bool { 136 self.pending_cancel_txs.contains_key(key) 137 } 138 139 /// Obtains a reference to a service in the ready set by key. get_ready<Q: Hash + Equivalent<K>>(&self, key: &Q) -> Option<(usize, &K, &S)>140 pub fn get_ready<Q: Hash + Equivalent<K>>(&self, key: &Q) -> Option<(usize, &K, &S)> { 141 self.ready.get_full(key).map(|(i, k, v)| (i, k, &v.0)) 142 } 143 144 /// Obtains a mutable reference to a service in the ready set by key. get_ready_mut<Q: Hash + Equivalent<K>>( &mut self, key: &Q, ) -> Option<(usize, &K, &mut S)>145 pub fn get_ready_mut<Q: Hash + Equivalent<K>>( 146 &mut self, 147 key: &Q, 148 ) -> Option<(usize, &K, &mut S)> { 149 self.ready 150 .get_full_mut(key) 151 .map(|(i, k, v)| (i, k, &mut v.0)) 152 } 153 154 /// Obtains a reference to a service in the ready set by index. get_ready_index(&self, idx: usize) -> Option<(&K, &S)>155 pub fn get_ready_index(&self, idx: usize) -> Option<(&K, &S)> { 156 self.ready.get_index(idx).map(|(k, v)| (k, &v.0)) 157 } 158 159 /// Obtains a mutable reference to a service in the ready set by index. get_ready_index_mut(&mut self, idx: usize) -> Option<(&mut K, &mut S)>160 pub fn get_ready_index_mut(&mut self, idx: usize) -> Option<(&mut K, &mut S)> { 161 self.ready.get_index_mut(idx).map(|(k, v)| (k, &mut v.0)) 162 } 163 164 /// Evicts an item from the cache. 165 /// 166 /// Returns true if a service was marked for eviction. 167 /// 168 /// Services are dropped from the ready set immediately. Services in the 169 /// pending set are marked for cancellation, but `ReadyCache::poll_pending` 170 /// must be called to cause the service to be dropped. evict<Q: Hash + Equivalent<K>>(&mut self, key: &Q) -> bool171 pub fn evict<Q: Hash + Equivalent<K>>(&mut self, key: &Q) -> bool { 172 let canceled = if let Some(c) = self.pending_cancel_txs.swap_remove(key) { 173 c.send(()).expect("cancel receiver lost"); 174 true 175 } else { 176 false 177 }; 178 179 self.ready 180 .swap_remove_full(key) 181 .map(|_| true) 182 .unwrap_or(canceled) 183 } 184 } 185 186 impl<K, S, Req> ReadyCache<K, S, Req> 187 where 188 K: Clone + Eq + Hash, 189 S: Service<Req>, 190 <S as Service<Req>>::Error: Into<error::Error>, 191 S::Error: Into<error::Error>, 192 { 193 /// Pushes a new service onto the pending set. 194 /// 195 /// The service will be promoted to the ready set as `poll_pending` is invoked. 196 /// 197 /// Note that this does **not** remove services from the ready set. Once the 198 /// old service is used, it will be dropped instead of being added back to 199 /// the pending set; OR, when the new service becomes ready, it will replace 200 /// the prior service in the ready set. push(&mut self, key: K, svc: S)201 pub fn push(&mut self, key: K, svc: S) { 202 let cancel = oneshot::channel(); 203 self.push_pending(key, svc, cancel); 204 } 205 push_pending(&mut self, key: K, svc: S, (cancel_tx, cancel_rx): CancelPair)206 fn push_pending(&mut self, key: K, svc: S, (cancel_tx, cancel_rx): CancelPair) { 207 if let Some(c) = self.pending_cancel_txs.insert(key.clone(), cancel_tx) { 208 // If there is already a service for this key, cancel it. 209 c.send(()).expect("cancel receiver lost"); 210 } 211 self.pending.push(Pending { 212 key: Some(key), 213 cancel: Some(cancel_rx), 214 ready: Some(svc), 215 _pd: std::marker::PhantomData, 216 }); 217 } 218 219 /// Polls services pending readiness, adding ready services to the ready set. 220 /// 221 /// Returns `Async::Ready` when there are no remaining unready services. 222 /// `poll_pending` should be called again after `push_service` or 223 /// `call_ready_index` are invoked. 224 /// 225 /// Failures indicate that an individual pending service failed to become 226 /// ready (and has been removed from the cache). In such a case, 227 /// `poll_pending` should typically be called again to continue driving 228 /// pending services to readiness. poll_pending(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), error::Failed<K>>>229 pub fn poll_pending(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), error::Failed<K>>> { 230 loop { 231 match Pin::new(&mut self.pending).poll_next(cx) { 232 Poll::Pending => return Poll::Pending, 233 Poll::Ready(None) => return Poll::Ready(Ok(())), 234 Poll::Ready(Some(Ok((key, svc, cancel_rx)))) => { 235 trace!("endpoint ready"); 236 let cancel_tx = self.pending_cancel_txs.swap_remove(&key); 237 if let Some(cancel_tx) = cancel_tx { 238 // Keep track of the cancelation so that it need not be 239 // recreated after the service is used. 240 self.ready.insert(key, (svc, (cancel_tx, cancel_rx))); 241 } else { 242 // This should not technically be possible. We must have decided to cancel 243 // a Service (by sending on the CancelTx), yet that same service then 244 // returns Ready. Since polling a Pending _first_ polls the CancelRx, that 245 // _should_ always see our CancelTx send. Yet empirically, that isn't true: 246 // 247 // https://github.com/tower-rs/tower/issues/415 248 // 249 // So, we instead detect the endpoint as canceled at this point. That 250 // should be fine, since the oneshot is only really there to ensure that 251 // the Pending is polled again anyway. 252 // 253 // We assert that this can't happen in debug mode so that hopefully one day 254 // we can find a test that triggers this reliably. 255 debug_assert!(cancel_tx.is_some()); 256 debug!("canceled endpoint removed when ready"); 257 } 258 } 259 Poll::Ready(Some(Err(PendingError::Canceled(_)))) => { 260 debug!("endpoint canceled"); 261 // The cancellation for this service was removed in order to 262 // cause this cancellation. 263 } 264 Poll::Ready(Some(Err(PendingError::Inner(key, e)))) => { 265 let cancel_tx = self.pending_cancel_txs.swap_remove(&key); 266 if let Some(_) = cancel_tx { 267 return Err(error::Failed(key, e.into())).into(); 268 } else { 269 // See comment for the same clause under Ready(Some(Ok)). 270 debug_assert!(cancel_tx.is_some()); 271 debug!("canceled endpoint removed on error"); 272 } 273 } 274 } 275 } 276 } 277 278 /// Checks whether the referenced endpoint is ready. 279 /// 280 /// Returns true if the endpoint is ready and false if it is not. An error is 281 /// returned if the endpoint fails. check_ready<Q: Hash + Equivalent<K>>( &mut self, cx: &mut Context<'_>, key: &Q, ) -> Result<bool, error::Failed<K>>282 pub fn check_ready<Q: Hash + Equivalent<K>>( 283 &mut self, 284 cx: &mut Context<'_>, 285 key: &Q, 286 ) -> Result<bool, error::Failed<K>> { 287 match self.ready.get_full_mut(key) { 288 Some((index, _, _)) => self.check_ready_index(cx, index), 289 None => Ok(false), 290 } 291 } 292 293 /// Checks whether the referenced endpoint is ready. 294 /// 295 /// If the service is no longer ready, it is moved back into the pending set 296 /// and `false` is returned. 297 /// 298 /// If the service errors, it is removed and dropped and the error is returned. check_ready_index( &mut self, cx: &mut Context<'_>, index: usize, ) -> Result<bool, error::Failed<K>>299 pub fn check_ready_index( 300 &mut self, 301 cx: &mut Context<'_>, 302 index: usize, 303 ) -> Result<bool, error::Failed<K>> { 304 let svc = match self.ready.get_index_mut(index) { 305 None => return Ok(false), 306 Some((_, (svc, _))) => svc, 307 }; 308 match svc.poll_ready(cx) { 309 Poll::Ready(Ok(())) => Ok(true), 310 Poll::Pending => { 311 // became unready; so move it back there. 312 let (key, (svc, cancel)) = self 313 .ready 314 .swap_remove_index(index) 315 .expect("invalid ready index"); 316 317 // If a new version of this service has been added to the 318 // unready set, don't overwrite it. 319 if !self.pending_contains(&key) { 320 self.push_pending(key, svc, cancel); 321 } 322 323 Ok(false) 324 } 325 Poll::Ready(Err(e)) => { 326 // failed, so drop it. 327 let (key, _) = self 328 .ready 329 .swap_remove_index(index) 330 .expect("invalid ready index"); 331 Err(error::Failed(key, e.into())) 332 } 333 } 334 } 335 336 /// Calls a ready service by key. 337 /// 338 /// # Panics 339 /// 340 /// If the specified key does not exist in the ready call_ready<Q: Hash + Equivalent<K>>(&mut self, key: &Q, req: Req) -> S::Future341 pub fn call_ready<Q: Hash + Equivalent<K>>(&mut self, key: &Q, req: Req) -> S::Future { 342 let (index, _, _) = self 343 .ready 344 .get_full_mut(key) 345 .expect("check_ready was not called"); 346 self.call_ready_index(index, req) 347 } 348 349 /// Calls a ready service by index. 350 /// 351 /// # Panics 352 /// 353 /// If the specified index is out of range. call_ready_index(&mut self, index: usize, req: Req) -> S::Future354 pub fn call_ready_index(&mut self, index: usize, req: Req) -> S::Future { 355 let (key, (mut svc, cancel)) = self 356 .ready 357 .swap_remove_index(index) 358 .expect("check_ready_index was not called"); 359 360 let fut = svc.call(req); 361 362 // If a new version of this service has been added to the 363 // unready set, don't overwrite it. 364 if !self.pending_contains(&key) { 365 self.push_pending(key, svc, cancel); 366 } 367 368 fut 369 } 370 } 371 372 // === Pending === 373 374 // Safety: No use unsafe access therefore this is safe. 375 impl<K, S, Req> Unpin for Pending<K, S, Req> {} 376 377 impl<K, S, Req> Future for Pending<K, S, Req> 378 where 379 S: Service<Req>, 380 { 381 type Output = Result<(K, S, CancelRx), PendingError<K, S::Error>>; 382 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>383 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 384 let mut fut = self.cancel.as_mut().expect("polled after complete"); 385 if let Poll::Ready(r) = Pin::new(&mut fut).poll(cx) { 386 assert!(r.is_ok(), "cancel sender lost"); 387 let key = self.key.take().expect("polled after complete"); 388 return Err(PendingError::Canceled(key)).into(); 389 } 390 391 match self 392 .ready 393 .as_mut() 394 .expect("polled after ready") 395 .poll_ready(cx) 396 { 397 Poll::Pending => Poll::Pending, 398 Poll::Ready(Ok(())) => { 399 let key = self.key.take().expect("polled after complete"); 400 let cancel = self.cancel.take().expect("polled after complete"); 401 Ok((key, self.ready.take().expect("polled after ready"), cancel)).into() 402 } 403 Poll::Ready(Err(e)) => { 404 let key = self.key.take().expect("polled after compete"); 405 Err(PendingError::Inner(key, e)).into() 406 } 407 } 408 } 409 } 410