1 //! Pre-emptively retry requests which have been outstanding for longer
2 //! than a given latency percentile.
3 
4 #![warn(
5     missing_debug_implementations,
6     missing_docs,
7     rust_2018_idioms,
8     unreachable_pub
9 )]
10 
11 use crate::filter::Filter;
12 use futures_util::future;
13 use pin_project::pin_project;
14 use std::sync::{Arc, Mutex};
15 use std::time::Duration;
16 use std::{
17     pin::Pin,
18     task::{Context, Poll},
19 };
20 use tracing::error;
21 
22 mod delay;
23 mod latency;
24 mod rotating_histogram;
25 mod select;
26 
27 use delay::Delay;
28 use latency::Latency;
29 use rotating_histogram::RotatingHistogram;
30 use select::Select;
31 
// Shared, rotating latency histogram, protected by a mutex so both copies
// of the wrapped service can record into it.
type Histo = Arc<Mutex<RotatingHistogram>>;
// The fully composed hedge stack: a `Select` that races the original
// (latency-recorded) service against a policy-filtered, delayed clone of
// the same service.
type Service<S, P> = select::Select<
    SelectPolicy<P>,
    Latency<Histo, S>,
    Delay<DelayPolicy, Filter<Latency<Histo, S>, PolicyPredicate<P>>>,
>;
/// A middleware that pre-emptively retries requests which have been outstanding
/// for longer than a given latency percentile.  If either of the original
/// future or the retry future completes, that value is used.
///
/// A hedge request is only issued when the configured [`Policy`] allows the
/// request to be cloned and retried.
#[derive(Debug)]
pub struct Hedge<S, P>(Service<S, P>);
43 
/// The Future returned by the hedge Service.
///
/// Wraps the inner select future and converts its error into the boxed
/// [`Error`] type that [`Hedge`] exposes to callers.
#[pin_project]
#[derive(Debug)]
pub struct Future<S, Request>
where
    S: tower_service::Service<Request>,
{
    /// Future of the underlying select stack.
    #[pin]
    inner: S::Future,
}
54 
55 type Error = Box<dyn std::error::Error + Send + Sync>;
56 
/// A policy which describes which requests can be cloned and then whether those
/// requests should be retried.
pub trait Policy<Request> {
    /// clone_request is called when the request is first received to determine
    /// if the request is retryable.
    ///
    /// Returning `None` marks the request as not retryable: no hedge request
    /// will be issued for it.
    fn clone_request(&self, req: &Request) -> Option<Request>;
    /// can_retry is called after the hedge timeout to determine if the hedge
    /// retry should be issued.
    ///
    /// Returning `false` leaves the hedge branch pending forever, so only the
    /// original request can complete.
    fn can_retry(&self, req: &Request) -> bool;
}
67 
// NOTE: these are pub only because they appear inside a Future<F>

// Predicate adapter that consults the user's [`Policy`] to decide whether
// the hedge request should actually be issued once its delay elapses.
#[doc(hidden)]
#[derive(Clone, Debug)]
pub struct PolicyPredicate<P>(P);
// Delay policy: waits for a percentile of the recorded latency histogram
// before issuing the hedge request.
#[doc(hidden)]
#[derive(Debug)]
pub struct DelayPolicy {
    // Shared latency histogram read for the delay duration.
    histo: Histo,
    // Percentile (e.g. 0.95) at which the hedge delay is taken.
    latency_percentile: f32,
}
// Select policy: clones the request (per the user [`Policy`]) only once
// the histogram holds at least `min_data_points` samples.
#[doc(hidden)]
#[derive(Debug)]
pub struct SelectPolicy<P> {
    policy: P,
    histo: Histo,
    // Minimum number of recorded latencies before hedging is attempted.
    min_data_points: u64,
}
86 
87 impl<S, P> Hedge<S, P> {
88     /// Create a new hedge middleware.
new<Request>( service: S, policy: P, min_data_points: u64, latency_percentile: f32, period: Duration, ) -> Hedge<S, P> where S: tower_service::Service<Request> + Clone, S::Error: Into<Error>, P: Policy<Request> + Clone,89     pub fn new<Request>(
90         service: S,
91         policy: P,
92         min_data_points: u64,
93         latency_percentile: f32,
94         period: Duration,
95     ) -> Hedge<S, P>
96     where
97         S: tower_service::Service<Request> + Clone,
98         S::Error: Into<Error>,
99         P: Policy<Request> + Clone,
100     {
101         let histo = Arc::new(Mutex::new(RotatingHistogram::new(period)));
102         Self::new_with_histo(service, policy, min_data_points, latency_percentile, histo)
103     }
104 
105     /// A hedge middleware with a prepopulated latency histogram.  This is usedful
106     /// for integration tests.
new_with_mock_latencies<Request>( service: S, policy: P, min_data_points: u64, latency_percentile: f32, period: Duration, latencies_ms: &[u64], ) -> Hedge<S, P> where S: tower_service::Service<Request> + Clone, S::Error: Into<Error>, P: Policy<Request> + Clone,107     pub fn new_with_mock_latencies<Request>(
108         service: S,
109         policy: P,
110         min_data_points: u64,
111         latency_percentile: f32,
112         period: Duration,
113         latencies_ms: &[u64],
114     ) -> Hedge<S, P>
115     where
116         S: tower_service::Service<Request> + Clone,
117         S::Error: Into<Error>,
118         P: Policy<Request> + Clone,
119     {
120         let histo = Arc::new(Mutex::new(RotatingHistogram::new(period)));
121         {
122             let mut locked = histo.lock().unwrap();
123             for latency in latencies_ms.iter() {
124                 locked.read().record(*latency).unwrap();
125             }
126         }
127         Self::new_with_histo(service, policy, min_data_points, latency_percentile, histo)
128     }
129 
new_with_histo<Request>( service: S, policy: P, min_data_points: u64, latency_percentile: f32, histo: Histo, ) -> Hedge<S, P> where S: tower_service::Service<Request> + Clone, S::Error: Into<Error>, P: Policy<Request> + Clone,130     fn new_with_histo<Request>(
131         service: S,
132         policy: P,
133         min_data_points: u64,
134         latency_percentile: f32,
135         histo: Histo,
136     ) -> Hedge<S, P>
137     where
138         S: tower_service::Service<Request> + Clone,
139         S::Error: Into<Error>,
140         P: Policy<Request> + Clone,
141     {
142         // Clone the underlying service and wrap both copies in a middleware that
143         // records the latencies in a rotating histogram.
144         let recorded_a = Latency::new(histo.clone(), service.clone());
145         let recorded_b = Latency::new(histo.clone(), service);
146 
147         // Check policy to see if the hedge request should be issued.
148         let filtered = Filter::new(recorded_b, PolicyPredicate(policy.clone()));
149 
150         // Delay the second request by a percentile of the recorded request latency
151         // histogram.
152         let delay_policy = DelayPolicy {
153             histo: histo.clone(),
154             latency_percentile,
155         };
156         let delayed = Delay::new(delay_policy, filtered);
157 
158         // If the request is retryable, issue two requests -- the second one delayed
159         // by a latency percentile.  Use the first result to complete.
160         let select_policy = SelectPolicy {
161             policy,
162             histo,
163             min_data_points,
164         };
165         Hedge(Select::new(select_policy, recorded_a, delayed))
166     }
167 }
168 
169 impl<S, P, Request> tower_service::Service<Request> for Hedge<S, P>
170 where
171     S: tower_service::Service<Request> + Clone,
172     S::Error: Into<Error>,
173     P: Policy<Request> + Clone,
174 {
175     type Response = S::Response;
176     type Error = Error;
177     type Future = Future<Service<S, P>, Request>;
178 
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>179     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
180         self.0.poll_ready(cx)
181     }
182 
call(&mut self, request: Request) -> Self::Future183     fn call(&mut self, request: Request) -> Self::Future {
184         Future {
185             inner: self.0.call(request),
186         }
187     }
188 }
189 
190 impl<S, Request> std::future::Future for Future<S, Request>
191 where
192     S: tower_service::Service<Request>,
193     S::Error: Into<Error>,
194 {
195     type Output = Result<S::Response, Error>;
196 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>197     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
198         self.project().inner.poll(cx).map_err(Into::into)
199     }
200 }
201 
// NOTE: `Duration::as_millis` truncates the sub-millisecond remainder,
// whereas the hedge delay must round *up*, so this hand-rolled conversion
// is kept deliberately (the old TODO to replace it was misleading).
const NANOS_PER_MILLI: u32 = 1_000_000;
const MILLIS_PER_SEC: u64 = 1_000;
/// Converts a `Duration` to whole milliseconds, rounding the sub-second
/// component up and saturating at `u64::MAX` rather than overflowing.
fn millis(duration: Duration) -> u64 {
    // Round up. subsec_nanos() < 1e9, so this addition cannot overflow u32.
    let millis = (duration.subsec_nanos() + NANOS_PER_MILLI - 1) / NANOS_PER_MILLI;
    duration
        .as_secs()
        .saturating_mul(MILLIS_PER_SEC)
        .saturating_add(u64::from(millis))
}
213 
214 impl latency::Record for Histo {
record(&mut self, latency: Duration)215     fn record(&mut self, latency: Duration) {
216         let mut locked = self.lock().unwrap();
217         locked.write().record(millis(latency)).unwrap_or_else(|e| {
218             error!("Failed to write to hedge histogram: {:?}", e);
219         })
220     }
221 }
222 
223 impl<P, Request> crate::filter::Predicate<Request> for PolicyPredicate<P>
224 where
225     P: Policy<Request>,
226 {
227     type Future = future::Either<
228         future::Ready<Result<(), crate::filter::error::Error>>,
229         future::Pending<Result<(), crate::filter::error::Error>>,
230     >;
231 
check(&mut self, request: &Request) -> Self::Future232     fn check(&mut self, request: &Request) -> Self::Future {
233         if self.0.can_retry(request) {
234             future::Either::Left(future::ready(Ok(())))
235         } else {
236             // If the hedge retry should not be issued, we simply want to wait
237             // for the result of the original request.  Therefore we don't want
238             // to return an error here.  Instead, we use future::pending to ensure
239             // that the original request wins the select.
240             future::Either::Right(future::pending())
241         }
242     }
243 }
244 
245 impl<Request> delay::Policy<Request> for DelayPolicy {
delay(&self, _req: &Request) -> Duration246     fn delay(&self, _req: &Request) -> Duration {
247         let mut locked = self.histo.lock().unwrap();
248         let millis = locked
249             .read()
250             .value_at_quantile(self.latency_percentile.into());
251         Duration::from_millis(millis)
252     }
253 }
254 
255 impl<P, Request> select::Policy<Request> for SelectPolicy<P>
256 where
257     P: Policy<Request>,
258 {
clone_request(&self, req: &Request) -> Option<Request>259     fn clone_request(&self, req: &Request) -> Option<Request> {
260         self.policy.clone_request(req).filter(|_| {
261             let mut locked = self.histo.lock().unwrap();
262             // Do not attempt a retry if there are insufficiently many data
263             // points in the histogram.
264             locked.read().len() >= self.min_data_points
265         })
266     }
267 }
268