1 //! Pre-emptively retry requests which have been outstanding for longer
2 //! than a given latency percentile.
3
4 #![warn(
5 missing_debug_implementations,
6 missing_docs,
7 rust_2018_idioms,
8 unreachable_pub
9 )]
10
11 use crate::filter::Filter;
12 use futures_util::future;
13 use pin_project::pin_project;
14 use std::sync::{Arc, Mutex};
15 use std::time::Duration;
16 use std::{
17 pin::Pin,
18 task::{Context, Poll},
19 };
20 use tracing::error;
21
22 mod delay;
23 mod latency;
24 mod rotating_histogram;
25 mod select;
26
27 use delay::Delay;
28 use latency::Latency;
29 use rotating_histogram::RotatingHistogram;
30 use select::Select;
31
/// Shared, mutex-guarded rotating latency histogram. Cloned (via `Arc`)
/// into every layer of the hedge stack so they all record into, and read
/// from, the same latency data.
type Histo = Arc<Mutex<RotatingHistogram>>;
/// The composed middleware stack backing [`Hedge`]: a `Select` that races
/// the original (latency-recorded) service against a policy-filtered,
/// percentile-delayed copy of the same service.
type Service<S, P> = select::Select<
    SelectPolicy<P>,
    Latency<Histo, S>,
    Delay<DelayPolicy, Filter<Latency<Histo, S>, PolicyPredicate<P>>>,
>;
/// A middleware that pre-emptively retries requests which have been outstanding
/// for longer than a given latency percentile. If either of the original
/// future or the retry future completes, that value is used.
// Newtype over the composed select/delay/filter stack; see `Service` alias.
#[derive(Debug)]
pub struct Hedge<S, P>(Service<S, P>);
43
/// The Future returned by the hedge Service.
#[pin_project]
#[derive(Debug)]
pub struct Future<S, Request>
where
    S: tower_service::Service<Request>,
{
    // The inner (select) service's future. Structurally pinned via
    // pin-project so `poll` can delegate without unsafe code.
    #[pin]
    inner: S::Future,
}
54
/// Boxed, type-erased error produced by the hedge middleware; inner
/// service errors are converted into this via `Into`.
type Error = Box<dyn std::error::Error + Send + Sync>;
56
/// A policy which describes which requests can be cloned and then whether those
/// requests should be retried.
pub trait Policy<Request> {
    /// clone_request is called when the request is first received to determine
    /// if the request is retryable.
    ///
    /// Returning `None` marks the request as non-retryable; no hedge
    /// request will be issued for it.
    fn clone_request(&self, req: &Request) -> Option<Request>;
    /// can_retry is called after the hedge timeout to determine if the hedge
    /// retry should be issued.
    fn can_retry(&self, req: &Request) -> bool;
}
67
// NOTE: these are pub only because they appear inside a Future<F>

// Adapts a `Policy` into a filter predicate: see the `Predicate` impl below.
#[doc(hidden)]
#[derive(Clone, Debug)]
pub struct PolicyPredicate<P>(P);
// Delays the hedge request by a percentile of the recorded latency histogram.
#[doc(hidden)]
#[derive(Debug)]
pub struct DelayPolicy {
    histo: Histo,
    // Quantile (e.g. 0.9) of recorded latencies used as the hedge delay.
    latency_percentile: f32,
}
// Decides whether a request may be cloned for hedging at all.
#[doc(hidden)]
#[derive(Debug)]
pub struct SelectPolicy<P> {
    policy: P,
    histo: Histo,
    // Minimum number of recorded data points required before hedging.
    min_data_points: u64,
}
86
impl<S, P> Hedge<S, P> {
    /// Create a new hedge middleware.
    ///
    /// Hedge retries begin only after `min_data_points` latencies have been
    /// recorded; the hedge request is delayed by the `latency_percentile`
    /// of the observed latencies; `period` is the rotation period of the
    /// underlying latency histogram.
    pub fn new<Request>(
        service: S,
        policy: P,
        min_data_points: u64,
        latency_percentile: f32,
        period: Duration,
    ) -> Hedge<S, P>
    where
        S: tower_service::Service<Request> + Clone,
        S::Error: Into<Error>,
        P: Policy<Request> + Clone,
    {
        let histo = Arc::new(Mutex::new(RotatingHistogram::new(period)));
        Self::new_with_histo(service, policy, min_data_points, latency_percentile, histo)
    }

    /// A hedge middleware with a prepopulated latency histogram. This is useful
    /// for integration tests.
    pub fn new_with_mock_latencies<Request>(
        service: S,
        policy: P,
        min_data_points: u64,
        latency_percentile: f32,
        period: Duration,
        latencies_ms: &[u64],
    ) -> Hedge<S, P>
    where
        S: tower_service::Service<Request> + Clone,
        S::Error: Into<Error>,
        P: Policy<Request> + Clone,
    {
        let histo = Arc::new(Mutex::new(RotatingHistogram::new(period)));
        {
            // Seed the histogram before the middleware is built so the
            // mock latencies are visible to the first requests.
            let mut locked = histo.lock().unwrap();
            for latency in latencies_ms.iter() {
                locked.read().record(*latency).unwrap();
            }
        }
        Self::new_with_histo(service, policy, min_data_points, latency_percentile, histo)
    }

    // Shared constructor: wires the recorded/filtered/delayed/select stack
    // around the provided histogram.
    fn new_with_histo<Request>(
        service: S,
        policy: P,
        min_data_points: u64,
        latency_percentile: f32,
        histo: Histo,
    ) -> Hedge<S, P>
    where
        S: tower_service::Service<Request> + Clone,
        S::Error: Into<Error>,
        P: Policy<Request> + Clone,
    {
        // Clone the underlying service and wrap both copies in a middleware that
        // records the latencies in a rotating histogram.
        let recorded_a = Latency::new(histo.clone(), service.clone());
        let recorded_b = Latency::new(histo.clone(), service);

        // Check policy to see if the hedge request should be issued.
        let filtered = Filter::new(recorded_b, PolicyPredicate(policy.clone()));

        // Delay the second request by a percentile of the recorded request latency
        // histogram.
        let delay_policy = DelayPolicy {
            histo: histo.clone(),
            latency_percentile,
        };
        let delayed = Delay::new(delay_policy, filtered);

        // If the request is retryable, issue two requests -- the second one delayed
        // by a latency percentile. Use the first result to complete.
        let select_policy = SelectPolicy {
            policy,
            histo,
            min_data_points,
        };
        Hedge(Select::new(select_policy, recorded_a, delayed))
    }
}
168
impl<S, P, Request> tower_service::Service<Request> for Hedge<S, P>
where
    S: tower_service::Service<Request> + Clone,
    S::Error: Into<Error>,
    P: Policy<Request> + Clone,
{
    type Response = S::Response;
    type Error = Error;
    type Future = Future<Service<S, P>, Request>;

    // Readiness is delegated entirely to the inner select stack.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.0.poll_ready(cx)
    }

    // Dispatch to the inner stack, wrapping its future so errors can be
    // boxed into the crate-wide `Error` type when polled.
    fn call(&mut self, request: Request) -> Self::Future {
        Future {
            inner: self.0.call(request),
        }
    }
}
189
impl<S, Request> std::future::Future for Future<S, Request>
where
    S: tower_service::Service<Request>,
    S::Error: Into<Error>,
{
    type Output = Result<S::Response, Error>;

    // Poll the pinned inner future and convert its error into the boxed
    // `Error` type exposed by this middleware.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.project().inner.poll(cx).map_err(Into::into)
    }
}
201
// `Duration::as_millis` is stable (since Rust 1.33) but TRUNCATES the
// sub-second component; this helper deliberately rounds it *up* so that
// sub-millisecond latencies still register as at least 1ms in the
// histogram. Do not replace it with `as_millis` without accounting for
// that difference.
const NANOS_PER_MILLI: u32 = 1_000_000;
const MILLIS_PER_SEC: u64 = 1_000;

/// Converts a `Duration` to whole milliseconds, rounding the sub-second
/// part up and saturating at `u64::MAX` instead of overflowing.
fn millis(duration: Duration) -> u64 {
    // Round up: subsec_nanos() < 1_000_000_000, so adding
    // NANOS_PER_MILLI - 1 cannot overflow a u32.
    let subsec_millis = (duration.subsec_nanos() + NANOS_PER_MILLI - 1) / NANOS_PER_MILLI;
    duration
        .as_secs()
        .saturating_mul(MILLIS_PER_SEC)
        .saturating_add(u64::from(subsec_millis))
}
213
214 impl latency::Record for Histo {
record(&mut self, latency: Duration)215 fn record(&mut self, latency: Duration) {
216 let mut locked = self.lock().unwrap();
217 locked.write().record(millis(latency)).unwrap_or_else(|e| {
218 error!("Failed to write to hedge histogram: {:?}", e);
219 })
220 }
221 }
222
223 impl<P, Request> crate::filter::Predicate<Request> for PolicyPredicate<P>
224 where
225 P: Policy<Request>,
226 {
227 type Future = future::Either<
228 future::Ready<Result<(), crate::filter::error::Error>>,
229 future::Pending<Result<(), crate::filter::error::Error>>,
230 >;
231
check(&mut self, request: &Request) -> Self::Future232 fn check(&mut self, request: &Request) -> Self::Future {
233 if self.0.can_retry(request) {
234 future::Either::Left(future::ready(Ok(())))
235 } else {
236 // If the hedge retry should not be issued, we simply want to wait
237 // for the result of the original request. Therefore we don't want
238 // to return an error here. Instead, we use future::pending to ensure
239 // that the original request wins the select.
240 future::Either::Right(future::pending())
241 }
242 }
243 }
244
245 impl<Request> delay::Policy<Request> for DelayPolicy {
delay(&self, _req: &Request) -> Duration246 fn delay(&self, _req: &Request) -> Duration {
247 let mut locked = self.histo.lock().unwrap();
248 let millis = locked
249 .read()
250 .value_at_quantile(self.latency_percentile.into());
251 Duration::from_millis(millis)
252 }
253 }
254
255 impl<P, Request> select::Policy<Request> for SelectPolicy<P>
256 where
257 P: Policy<Request>,
258 {
clone_request(&self, req: &Request) -> Option<Request>259 fn clone_request(&self, req: &Request) -> Option<Request> {
260 self.policy.clone_request(req).filter(|_| {
261 let mut locked = self.histo.lock().unwrap();
262 // Do not attempt a retry if there are insufficiently many data
263 // points in the histogram.
264 locked.read().len() >= self.min_data_points
265 })
266 }
267 }
268