1 //! Implement Tor's sort-of-Pareto estimator for circuit build timeouts.
2 //!
3 //! Our build times don't truly follow a
4 //! [Pareto](https://en.wikipedia.org/wiki/Pareto_distribution)
5 //! distribution; instead they seem to be closer to a
6 //! [Fréchet](https://en.wikipedia.org/wiki/Fr%C3%A9chet_distribution)
7 //! distribution.  But those are hard to work with, and we only care
8 //! about the right tail, so we're using Pareto instead.
9 //!
10 //! This estimator also includes several heuristics and kludges to
11 //! try to behave better on unreliable networks.
12 //! For more information on the exact algorithms and their rationales,
13 //! see [`path-spec.txt`](https://gitlab.torproject.org/tpo/core/torspec/-/blob/master/path-spec.txt).
14 
15 use bounded_vec_deque::BoundedVecDeque;
16 use serde::{Deserialize, Serialize};
17 use std::collections::{BTreeMap, HashMap};
18 use std::convert::TryInto;
19 use std::time::Duration;
20 use tor_netdir::params::NetParameters;
21 
22 use super::Action;
23 use tor_persist::JsonValue;
24 
/// Upper bound on the number of circuit build time observations we keep.
const TIME_HISTORY_LEN: usize = 1_000;

/// Default capacity of the success-versus-timeout history, used until
/// parameters tell us otherwise.
const SUCCESS_HISTORY_DEFAULT_LEN: usize = 20;

/// Width of one bucket in the build-time histogram, in milliseconds.
const BUCKET_WIDTH_MSEC: u32 = 10;
34 
/// A circuit build time or timeout duration, measured in milliseconds.
///
/// The value is stored as a `u32`, so durations above `u32::MAX`
/// milliseconds (about 49.7 days) cannot be represented; we don't care
/// about tracking timeouts that long.
///
/// `#[serde(transparent)]` makes this (de)serialize exactly like the inner
/// `u32`.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
#[serde(transparent)]
struct MsecDuration(u32);
42 
43 impl MsecDuration {
44     /// Convert a Duration into a MsecDuration, saturating
45     /// extra-high values to u32::MAX milliseconds.
new_saturating(d: &Duration) -> Self46     fn new_saturating(d: &Duration) -> Self {
47         let msec = std::cmp::min(d.as_millis(), u128::from(u32::MAX)) as u32;
48         MsecDuration(msec)
49     }
50 }
51 
/// Module to hold calls to const_assert.
///
/// This is a separate module so we can change the clippy warnings on it:
/// `clippy::checked_conversions` would otherwise flag the
/// `<= u16::MAX as usize` comparison below and suggest a `try_from` check,
/// which can't be used inside a compile-time assertion.
#[allow(clippy::checked_conversions)]
mod assertion {
    use static_assertions::const_assert;
    // If this assertion is untrue, then we can't safely use u16 fields in
    // time_histogram: a single bucket could then hold more entries than a
    // u16 count can represent.
    const_assert!(super::TIME_HISTORY_LEN <= u16::MAX as usize);
}
62 
/// A history of circuit timeout observations, used to estimate our
/// likely circuit timeouts.
///
/// It holds two independent records: recent circuit build times (together
/// with a histogram of them), and recent success-versus-timeout outcomes.
#[derive(Debug, Clone)]
struct History {
    /// Our most recent observed circuit construction times.
    ///
    /// For the purpose of this estimator, a circuit counts as
    /// "constructed" when a certain "significant" hop (typically the third)
    /// is completed.
    ///
    /// This deque is bounded: once full, pushing a new observation evicts
    /// the oldest one.
    time_history: BoundedVecDeque<MsecDuration>,

    /// A histogram representation of the values in [`History::time_history`].
    ///
    /// This histogram is implemented as a sparse map from the center
    /// value of each histogram bucket to the number of entries in
    /// that bucket.  It is completely derivable from time_history; we
    /// keep it separate here for efficiency.
    ///
    /// Counts are `u16`; the const assertion in the `assertion` module
    /// checks that `TIME_HISTORY_LEN` fits in a `u16`.
    time_histogram: BTreeMap<MsecDuration, u16>,

    /// Our most recent circuit timeout statuses.
    ///
    /// Each `true` value represents a successfully completed circuit
    /// (all hops).  Each `false` value represents a circuit that
    /// timed out after having completed at least one hop.
    success_history: BoundedVecDeque<bool>,
}
89 
90 impl History {
91     /// Initialize a new empty `History` with no observations.
new_empty() -> Self92     fn new_empty() -> Self {
93         History {
94             time_history: BoundedVecDeque::new(TIME_HISTORY_LEN),
95             time_histogram: BTreeMap::new(),
96             success_history: BoundedVecDeque::new(SUCCESS_HISTORY_DEFAULT_LEN),
97         }
98     }
99 
100     /// Remove all observations from this `History`.
clear(&mut self)101     fn clear(&mut self) {
102         self.time_history.clear();
103         self.time_histogram.clear();
104         self.success_history.clear();
105     }
106 
107     /// Change the number of successes to record in our success
108     /// history to `n`.
set_success_history_len(&mut self, n: usize)109     fn set_success_history_len(&mut self, n: usize) {
110         if n < self.success_history.len() {
111             self.success_history
112                 .drain(0..(self.success_history.len() - n));
113         }
114         self.success_history.set_max_len(n);
115     }
116 
117     /// Change the number of circuit time observations to record in
118     /// our time history to `n`.
119     ///
120     /// This is a testing-only function.
121     #[cfg(test)]
set_time_history_len(&mut self, n: usize)122     fn set_time_history_len(&mut self, n: usize) {
123         self.time_history.set_max_len(n);
124     }
125 
126     /// Construct a new `History` from an iterator representing a sparse
127     /// histogram of values.
128     ///
129     /// The input must be a sequence of `(D,N)` tuples, where each `D`
130     /// represents a circuit build duration, and `N` represents the
131     /// number of observations with that duration.
132     ///
133     /// These observations are shuffled into a random order, then
134     /// added to a new History.
from_sparse_histogram<I>(iter: I) -> Self where I: Iterator<Item = (MsecDuration, u16)>,135     fn from_sparse_histogram<I>(iter: I) -> Self
136     where
137         I: Iterator<Item = (MsecDuration, u16)>,
138     {
139         // XXXX if the input is bogus, then this could be a huge array.
140         let mut observations = Vec::new();
141         for (d, n) in iter {
142             for _ in 0..n {
143                 observations.push(d);
144             }
145         }
146         use rand::seq::SliceRandom;
147         let mut rng = rand::thread_rng();
148         observations[..].shuffle(&mut rng);
149 
150         let mut result = History::new_empty();
151         for obs in observations {
152             result.add_time(obs);
153         }
154         result
155     }
156 
157     /// Return an iterator yielding a sparse histogram of the circuit build
158     /// time values in this `History`.
159     ///
160     /// Each histogram entry is a `(D,N)` tuple, where `D` is the
161     /// center of a histogram bucket, and `N` is the number of
162     /// observations in that bucket.
163     ///
164     /// Buckets with `N=0` are omitted.  Buckets are yielded in order.
sparse_histogram(&self) -> impl Iterator<Item = (MsecDuration, u16)> + '_165     fn sparse_histogram(&self) -> impl Iterator<Item = (MsecDuration, u16)> + '_ {
166         self.time_histogram.iter().map(|(d, n)| (*d, *n))
167     }
168 
169     /// Return the center value for the bucket containing `time`.
bucket_center(time: MsecDuration) -> MsecDuration170     fn bucket_center(time: MsecDuration) -> MsecDuration {
171         let idx = time.0 / BUCKET_WIDTH_MSEC;
172         let msec = (idx * BUCKET_WIDTH_MSEC) + (BUCKET_WIDTH_MSEC) / 2;
173         MsecDuration(msec)
174     }
175 
176     /// Increment the histogram bucket containing `time` by one.
inc_bucket(&mut self, time: MsecDuration)177     fn inc_bucket(&mut self, time: MsecDuration) {
178         let center = History::bucket_center(time);
179         *self.time_histogram.entry(center).or_insert(0) += 1;
180     }
181 
182     /// Decrement the histogram bucket containing `time` by one, removing
183     /// it if it becomes 0.
dec_bucket(&mut self, time: MsecDuration)184     fn dec_bucket(&mut self, time: MsecDuration) {
185         use std::collections::btree_map::Entry;
186         let center = History::bucket_center(time);
187         match self.time_histogram.entry(center) {
188             Entry::Vacant(_) => {
189                 // this is a bug.
190             }
191             Entry::Occupied(e) if e.get() <= &1 => {
192                 e.remove();
193             }
194             Entry::Occupied(mut e) => {
195                 *e.get_mut() -= 1;
196             }
197         }
198     }
199 
200     /// Add `time` to our list of circuit build time observations, and
201     /// adjust the histogram accordingly.
add_time(&mut self, time: MsecDuration)202     fn add_time(&mut self, time: MsecDuration) {
203         match self.time_history.push_back(time) {
204             None => {}
205             Some(removed_time) => {
206                 // `removed_time` just fell off the end of the deque:
207                 // remove it from the histogram.
208                 self.dec_bucket(removed_time);
209             }
210         }
211         self.inc_bucket(time);
212     }
213 
214     /// Return the number of observations in our time history.
215     ///
216     /// This will always be `<= TIME_HISTORY_LEN`.
n_times(&self) -> usize217     fn n_times(&self) -> usize {
218         self.time_history.len()
219     }
220 
221     /// Record a success (true) or timeout (false) in our record of whether
222     /// circuits timed out or not.
add_success(&mut self, succeeded: bool)223     fn add_success(&mut self, succeeded: bool) {
224         self.success_history.push_back(succeeded);
225     }
226 
227     /// Return the number of timeouts recorded in our success history.
n_recent_timeouts(&self) -> usize228     fn n_recent_timeouts(&self) -> usize {
229         self.success_history.iter().filter(|x| !**x).count()
230     }
231 
232     /// Helper: return the `n` most frequent histogram bins.
n_most_frequent_bins(&self, n: usize) -> Vec<(MsecDuration, u16)>233     fn n_most_frequent_bins(&self, n: usize) -> Vec<(MsecDuration, u16)> {
234         use itertools::Itertools;
235         // we use cmp::Reverse here so that we can use k_smallest as
236         // if it were "k_largest".
237         use std::cmp::Reverse;
238 
239         // We want the buckets that have the _largest_ counts; we want
240         // to break ties in favor of the _smallest_ values.  So we
241         // apply Reverse only to the counts before passing the tuples
242         // to k_smallest.
243 
244         self.sparse_histogram()
245             .map(|(center, count)| (Reverse(count), center))
246             // (k_smallest runs in O(n_bins * lg(n))
247             .k_smallest(n)
248             .into_iter()
249             .map(|(Reverse(count), center)| (center, count))
250             .collect()
251     }
252 
253     /// Return an estimator for the `X_m` of our Pareto distribution,
254     /// by looking at the `n_modes` most frequently filled histogram
255     /// bins.
256     ///
257     /// It is not a true `X_m` value, since there are definitely
258     /// values less than this, but it seems to work as a decent
259     /// heuristic.
260     ///
261     /// Return `None` if we have no observations.
estimate_xm(&self, n_modes: usize) -> Option<u32>262     fn estimate_xm(&self, n_modes: usize) -> Option<u32> {
263         // From path-spec:
264         //   Tor clients compute the Xm parameter using the weighted
265         //   average of the the midpoints of the 'cbtnummodes' (10)
266         //   most frequently occurring 10ms histogram bins.
267 
268         // The most frequently used bins.
269         let bins = self.n_most_frequent_bins(n_modes);
270         // Total number of observations in these bins.
271         let n_observations: u16 = bins.iter().map(|(_, n)| n).sum();
272         // Sum of all observations in these bins.
273         let total_observations: u64 = bins
274             .iter()
275             .map(|(d, n)| u64::from(d.0 * u32::from(*n)))
276             .sum();
277 
278         if n_observations == 0 {
279             None
280         } else {
281             Some((total_observations / u64::from(n_observations)) as u32)
282         }
283     }
284 
285     /// Compute a maximum-likelihood pareto distribution based on this
286     /// history, computing `X_m` based on the `n_modes` most frequent
287     /// histograms.
288     ///
289     /// Return None if we have no observations.
pareto_estimate(&self, n_modes: usize) -> Option<ParetoDist>290     fn pareto_estimate(&self, n_modes: usize) -> Option<ParetoDist> {
291         let xm = self.estimate_xm(n_modes)?;
292 
293         // From path-spec:
294         //     alpha = n/(Sum_n{ln(MAX(Xm, x_i))} - n*ln(Xm))
295 
296         let n = self.time_history.len();
297         let sum_of_log_observations: f64 = self
298             .time_history
299             .iter()
300             .map(|m| f64::from(std::cmp::max(m.0, xm)).ln())
301             .sum();
302         let sum_of_log_xm = (n as f64) * f64::from(xm).ln();
303 
304         // We're computing 1/alpha here, instead of alpha.  This avoids
305         // division by zero, and has the advantage of being what our
306         // quantile estimator actually needs.
307         let inv_alpha = (sum_of_log_observations - sum_of_log_xm) / (n as f64);
308 
309         Some(ParetoDist {
310             x_m: f64::from(xm),
311             inv_alpha,
312         })
313     }
314 }
315 
/// A Pareto distribution, used to turn our build-time history into
/// timeout values.
///
/// All values (the scale `x_m` and every quantile) are in milliseconds.
#[derive(Debug)]
struct ParetoDist {
    /// The scale parameter `X_m`: the lower bound of the distribution.
    x_m: f64,
    /// The reciprocal of the shape parameter `alpha`.
    ///
    /// Keeping `1/alpha` rather than `alpha` lets [`ParetoDist::quantile`]
    /// use it directly as an exponent.
    inv_alpha: f64,
}

impl ParetoDist {
    /// Evaluate the inverse CDF (quantile function) of this distribution.
    ///
    /// Returns the value `v` below which a fraction `q` of the
    /// distribution lies, namely `x_m / (1 - q)^(1/alpha)`.  A `q` outside
    /// `[0.0, 1.0]` is clamped into that range first.  Note that `q == 1.0`
    /// yields `+inf` when `x_m` and `inv_alpha` are both positive.
    fn quantile(&self, q: f64) -> f64 {
        let clamped_q = q.clamp(0.0, 1.0);
        let tail_fraction = 1.0 - clamped_q;
        let denominator = tail_fraction.powf(self.inv_alpha);
        self.x_m / denominator
    }
}
342 
/// A set of parameters determining the behavior of a ParetoTimeoutEstimator.
///
/// These are typically derived from a set of consensus parameters (via the
/// `From<&NetParameters>` implementation); the `Default` implementation
/// supplies hard-coded values for an estimator that has not yet received
/// any.
#[derive(Clone, Debug)]
pub(crate) struct Params {
    /// Should we use our estimates when deciding on circuit timeouts?
    ///
    /// When this is false, our timeouts are fixed to the default.
    use_estimates: bool,
    /// How many observations must we have made before we can use our
    /// Pareto estimators to guess a good set of timeouts?
    min_observations: u16,
    /// Which hop is the "significant hop" we should use when recording circuit
    /// build times?  (Watch out! This is zero-indexed.)
    significant_hop: u8,
    /// A quantile (in range [0.0,1.0]) describing a point in the
    /// Pareto distribution to use when determining when a circuit
    /// should be treated as having "timed out".
    ///
    /// (A "timed out" circuit continues building for measurement
    /// purposes, but can't be used for traffic.)
    timeout_quantile: f64,
    /// A quantile (in range [0.0,1.0]) describing a point in the Pareto
    /// distribution to use when determining when a circuit should be
    /// "abandoned".
    ///
    /// (An "abandoned" circuit is stopped entirely, and not included
    /// in measurements.)
    abandon_quantile: f64,
    /// Default `(timeout, abandon)` values to return from the `timeouts`
    /// function when we have no observations, or when `use_estimates` is
    /// false.
    ///
    /// These also seed the estimator's fallback timeouts.
    default_thresholds: (Duration, Duration),
    /// Number of histogram buckets to use when determining the Xm estimate.
    ///
    /// (See [`History::estimate_xm`] for details.)
    n_modes_for_xm: usize,
    /// How many entries do we record in our success/timeout history?
    success_history_len: usize,
    /// How many timeouts should we allow in our success/timeout history
    /// before we assume that network has changed in a way that makes
    /// our estimates completely wrong?
    reset_after_timeouts: usize,
    /// Minimum base timeout to ever infer or return.
    ///
    /// Applied as a lower bound to both the timeout and abandon values
    /// computed from the Pareto estimate.
    min_timeout: Duration,
}
388 
389 impl Default for Params {
default() -> Self390     fn default() -> Self {
391         Params {
392             use_estimates: true,
393             min_observations: 100,
394             significant_hop: 2,
395             timeout_quantile: 0.80,
396             abandon_quantile: 0.99,
397             default_thresholds: (Duration::from_secs(60), Duration::from_secs(60)),
398             n_modes_for_xm: 10,
399             success_history_len: SUCCESS_HISTORY_DEFAULT_LEN,
400             reset_after_timeouts: 18,
401             min_timeout: Duration::from_millis(10),
402         }
403     }
404 }
405 
406 impl From<&NetParameters> for Params {
from(p: &NetParameters) -> Params407     fn from(p: &NetParameters) -> Params {
408         // Because of the underlying bounds, the "unwrap_or_else"
409         // conversions here should be impossible, and the "as"
410         // conversions should always be in-range.
411 
412         let timeout = p
413             .cbt_initial_timeout
414             .try_into()
415             .unwrap_or_else(|_| Duration::from_secs(60));
416         let learning_disabled: bool = p.cbt_learning_disabled.into();
417         Params {
418             use_estimates: !learning_disabled,
419             min_observations: p.cbt_min_circs_for_estimate.get() as u16,
420             significant_hop: 2,
421             timeout_quantile: p.cbt_timeout_quantile.as_fraction(),
422             abandon_quantile: p.cbt_abandon_quantile.as_fraction(),
423             default_thresholds: (timeout, timeout),
424             n_modes_for_xm: p.cbt_num_xm_modes.get() as usize,
425             success_history_len: p.cbt_success_count.get() as usize,
426             reset_after_timeouts: p.cbt_max_timeouts.get() as usize,
427             min_timeout: p
428                 .cbt_min_timeout
429                 .try_into()
430                 .unwrap_or_else(|_| Duration::from_millis(10)),
431         }
432     }
433 }
434 
/// Tor's default circuit build timeout estimator.
///
/// This object records a set of observed circuit build times, and
/// uses it to determine good values for how long we should allow
/// circuits to build.
///
/// For full details of the algorithms used, see
/// [`path-spec.txt`](https://gitlab.torproject.org/tpo/core/torspec/-/blob/master/path-spec.txt).
pub(crate) struct ParetoTimeoutEstimator {
    /// Our observations for circuit build times and success/failure
    /// history.
    history: History,

    /// Our most recent `(timeout, abandon)` estimate, if we have one that
    /// is up-to-date.
    ///
    /// (We reset this to None whenever we get a new build-time
    /// observation, and whenever the history is cleared after too many
    /// timeouts.)
    timeouts: Option<(Duration, Duration)>,

    /// The timeouts that we use when we do not have sufficient observations
    /// to conclude anything about our circuit build times.
    ///
    /// These start out as `p.default_thresholds`.  They are doubled when the
    /// history is reset after too many timeouts, if the timeout in use at
    /// that point was at least as long as these fallbacks.
    fallback_timeouts: (Duration, Duration),

    /// A set of parameters to use in computing circuit build timeout
    /// estimates.
    p: Params,
}
465 
466 impl Default for ParetoTimeoutEstimator {
default() -> Self467     fn default() -> Self {
468         Self::from_history(History::new_empty())
469     }
470 }
471 
/// An object used to serialize our timeout history for persistent state.
///
/// Because of `#[serde(default)]`, any field missing from a stored state is
/// filled in with its `Default` value when deserializing.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[serde(default)]
pub(crate) struct ParetoTimeoutState {
    /// A version field used to help encoding and decoding.
    ///
    /// `build_state` writes `1` here; it is not consulted when loading
    /// (`from_state` only uses `histogram`).
    #[allow(dead_code)]
    version: usize,
    /// A record of observed build times, as returned by `sparse_histogram()`:
    /// `(bucket center, count)` pairs.
    histogram: Vec<(MsecDuration, u16)>,
    /// The current base timeout estimate: kept for reference.
    ///
    /// It is exposed through `latest_estimate`, but is not used to rebuild
    /// the estimator.
    current_timeout: Option<MsecDuration>,

    /// Fields from the state file that was used to make this `ParetoTimeoutState` that
    /// this version of Arti doesn't understand.
    ///
    /// `#[serde(flatten)]` collects them here so they survive a
    /// load/save round trip.
    #[serde(flatten)]
    unknown_fields: HashMap<String, JsonValue>,
}
489 
490 impl ParetoTimeoutState {
491     /// Return the latest base timeout estimate, as recorded in this state.
latest_estimate(&self) -> Option<Duration>492     pub(crate) fn latest_estimate(&self) -> Option<Duration> {
493         self.current_timeout
494             .map(|m| Duration::from_millis(m.0.into()))
495     }
496 }
497 
498 impl ParetoTimeoutEstimator {
499     /// Construct a new ParetoTimeoutEstimator from the provided history
500     /// object.
from_history(history: History) -> Self501     fn from_history(history: History) -> Self {
502         let p = Params::default();
503         ParetoTimeoutEstimator {
504             history,
505             timeouts: None,
506             fallback_timeouts: p.default_thresholds,
507             p,
508         }
509     }
510 
511     /// Create a new ParetoTimeoutEstimator based on a loaded
512     /// ParetoTimeoutState.
from_state(state: ParetoTimeoutState) -> Self513     pub(crate) fn from_state(state: ParetoTimeoutState) -> Self {
514         let history = History::from_sparse_histogram(state.histogram.into_iter());
515         Self::from_history(history)
516     }
517 
518     /// Compute an unscaled basic pair of timeouts for a circuit of
519     /// the "normal" length.
520     ///
521     /// Return a cached value if we have no observations since the
522     /// last time this function was called.
base_timeouts(&mut self) -> (Duration, Duration)523     fn base_timeouts(&mut self) -> (Duration, Duration) {
524         if let Some(x) = self.timeouts {
525             // Great; we have a cached value.
526             return x;
527         }
528 
529         if self.history.n_times() < self.p.min_observations as usize {
530             // We don't have enough values to estimate.
531             return self.fallback_timeouts;
532         }
533 
534         // Here we're going to compute the timeouts, cache them, and
535         // return them.
536         let dist = match self.history.pareto_estimate(self.p.n_modes_for_xm) {
537             Some(dist) => dist,
538             None => {
539                 return self.fallback_timeouts;
540             }
541         };
542         let timeout_threshold = dist.quantile(self.p.timeout_quantile);
543         let abandon_threshold = dist
544             .quantile(self.p.abandon_quantile)
545             .max(timeout_threshold);
546 
547         let timeouts = (
548             Duration::from_secs_f64(timeout_threshold / 1000.0).max(self.p.min_timeout),
549             Duration::from_secs_f64(abandon_threshold / 1000.0).max(self.p.min_timeout),
550         );
551         self.timeouts = Some(timeouts);
552 
553         timeouts
554     }
555 }
556 
557 impl super::TimeoutEstimator for ParetoTimeoutEstimator {
update_params(&mut self, p: &NetParameters)558     fn update_params(&mut self, p: &NetParameters) {
559         let parameters = p.into();
560         self.p = parameters;
561         let new_success_len = self.p.success_history_len;
562         self.history.set_success_history_len(new_success_len);
563     }
564 
note_hop_completed(&mut self, hop: u8, delay: Duration, is_last: bool)565     fn note_hop_completed(&mut self, hop: u8, delay: Duration, is_last: bool) {
566         if hop == self.p.significant_hop {
567             let time = MsecDuration::new_saturating(&delay);
568             self.history.add_time(time);
569             self.timeouts.take();
570         }
571         if is_last {
572             self.history.add_success(true);
573         }
574     }
575 
note_circ_timeout(&mut self, hop: u8, _delay: Duration)576     fn note_circ_timeout(&mut self, hop: u8, _delay: Duration) {
577         // XXXXX This only counts if we have recent-enough
578         // activity.  See circuit_build_times_network_check_live.
579         if hop > 0 {
580             self.history.add_success(false);
581             if self.history.n_recent_timeouts() > self.p.reset_after_timeouts {
582                 let base_timeouts = self.base_timeouts();
583                 self.history.clear();
584                 self.timeouts.take();
585                 // If we already had a timeout that was at least the
586                 // length of our fallback timeouts, we should double
587                 // those fallback timeouts.
588                 if base_timeouts.0 >= self.fallback_timeouts.0 {
589                     self.fallback_timeouts.0 *= 2;
590                     self.fallback_timeouts.1 *= 2;
591                 }
592             }
593         }
594     }
595 
timeouts(&mut self, action: &Action) -> (Duration, Duration)596     fn timeouts(&mut self, action: &Action) -> (Duration, Duration) {
597         let (base_t, base_a) = if self.p.use_estimates {
598             self.base_timeouts()
599         } else {
600             // If we aren't using this estimator, then just return the
601             // default thresholds from our parameters.
602             return self.p.default_thresholds;
603         };
604 
605         let reference_action = Action::BuildCircuit {
606             length: self.p.significant_hop as usize + 1,
607         };
608         debug_assert!(reference_action.timeout_scale() > 0);
609 
610         let multiplier =
611             (action.timeout_scale() as f64) / (reference_action.timeout_scale() as f64);
612 
613         // TODO-SPEC The spec define any of self.  Tor doesn't multiply the
614         // abandon timeout.
615         // XXXX `mul_f64()` can panic if we overflow Duration.
616         (base_t.mul_f64(multiplier), base_a.mul_f64(multiplier))
617     }
618 
learning_timeouts(&self) -> bool619     fn learning_timeouts(&self) -> bool {
620         self.p.use_estimates && self.history.n_times() < self.p.min_observations.into()
621     }
622 
build_state(&mut self) -> Option<ParetoTimeoutState>623     fn build_state(&mut self) -> Option<ParetoTimeoutState> {
624         let cur_timeout = MsecDuration::new_saturating(&self.base_timeouts().0);
625         Some(ParetoTimeoutState {
626             version: 1,
627             histogram: self.history.sparse_histogram().collect(),
628             current_timeout: Some(cur_timeout),
629             unknown_fields: Default::default(),
630         })
631     }
632 }
633 
634 #[cfg(test)]
635 mod test {
636     #![allow(clippy::unwrap_used)]
637     use super::*;
638     use crate::timeouts::TimeoutEstimator;
639 
640     /// Return an action to build a 3-hop circuit.
b3() -> Action641     fn b3() -> Action {
642         Action::BuildCircuit { length: 3 }
643     }
644 
645     impl From<u32> for MsecDuration {
from(v: u32) -> Self646         fn from(v: u32) -> Self {
647             Self(v)
648         }
649     }
650 
651     #[test]
ms_partial_cmp()652     fn ms_partial_cmp() {
653         #![allow(clippy::eq_op)]
654         let myriad: MsecDuration = 10_000.into();
655         let lakh: MsecDuration = 100_000.into();
656         let crore: MsecDuration = 10_000_000.into();
657 
658         assert!(myriad < lakh);
659         assert!(myriad == myriad);
660         assert!(crore > lakh);
661         assert!(crore >= crore);
662         assert!(crore <= crore);
663     }
664 
665     #[test]
history_lowlev()666     fn history_lowlev() {
667         assert_eq!(History::bucket_center(1.into()), 5.into());
668         assert_eq!(History::bucket_center(903.into()), 905.into());
669         assert_eq!(History::bucket_center(0.into()), 5.into());
670         assert_eq!(History::bucket_center(u32::MAX.into()), 4294967295.into());
671 
672         let mut h = History::new_empty();
673         h.inc_bucket(7.into());
674         h.inc_bucket(8.into());
675         h.inc_bucket(9.into());
676         h.inc_bucket(10.into());
677         h.inc_bucket(11.into());
678         h.inc_bucket(12.into());
679         h.inc_bucket(13.into());
680         h.inc_bucket(299.into());
681         assert_eq!(h.time_histogram.get(&5.into()), Some(&3));
682         assert_eq!(h.time_histogram.get(&15.into()), Some(&4));
683         assert_eq!(h.time_histogram.get(&25.into()), None);
684         assert_eq!(h.time_histogram.get(&295.into()), Some(&1));
685 
686         h.dec_bucket(299.into());
687         h.dec_bucket(24.into());
688         h.dec_bucket(12.into());
689 
690         assert_eq!(h.time_histogram.get(&15.into()), Some(&3));
691         assert_eq!(h.time_histogram.get(&25.into()), None);
692         assert_eq!(h.time_histogram.get(&295.into()), None);
693 
694         h.add_success(true);
695         h.add_success(false);
696         assert_eq!(h.success_history.len(), 2);
697 
698         h.clear();
699         assert_eq!(h.time_histogram.len(), 0);
700         assert_eq!(h.time_history.len(), 0);
701         assert_eq!(h.success_history.len(), 0);
702     }
703 
704     #[test]
time_observation_management()705     fn time_observation_management() {
706         let mut h = History::new_empty();
707         h.set_time_history_len(8); // to make it easier to overflow.
708 
709         h.add_time(300.into());
710         h.add_time(500.into());
711         h.add_time(542.into());
712         h.add_time(305.into());
713         h.add_time(543.into());
714         h.add_time(307.into());
715 
716         assert_eq!(h.n_times(), 6);
717         let v = h.n_most_frequent_bins(10);
718         assert_eq!(&v[..], [(305.into(), 3), (545.into(), 2), (505.into(), 1)]);
719         let v = h.n_most_frequent_bins(2);
720         assert_eq!(&v[..], [(305.into(), 3), (545.into(), 2)]);
721 
722         let v: Vec<_> = h.sparse_histogram().collect();
723         assert_eq!(&v[..], [(305.into(), 3), (505.into(), 1), (545.into(), 2)]);
724 
725         h.add_time(212.into());
726         h.add_time(203.into());
727         // now we replace the first couple of older elements.
728         h.add_time(617.into());
729         h.add_time(413.into());
730 
731         assert_eq!(h.n_times(), 8);
732 
733         let v: Vec<_> = h.sparse_histogram().collect();
734         assert_eq!(
735             &v[..],
736             [
737                 (205.into(), 1),
738                 (215.into(), 1),
739                 (305.into(), 2),
740                 (415.into(), 1),
741                 (545.into(), 2),
742                 (615.into(), 1)
743             ]
744         );
745 
746         let h2 = History::from_sparse_histogram(v.clone().into_iter());
747         let v2: Vec<_> = h2.sparse_histogram().collect();
748         assert_eq!(v, v2);
749     }
750 
751     #[test]
success_observation_mechanism()752     fn success_observation_mechanism() {
753         let mut h = History::new_empty();
754         h.set_success_history_len(20);
755 
756         assert_eq!(h.n_recent_timeouts(), 0);
757         h.add_success(true);
758         assert_eq!(h.n_recent_timeouts(), 0);
759         h.add_success(false);
760         assert_eq!(h.n_recent_timeouts(), 1);
761         for _ in 0..200 {
762             h.add_success(false);
763         }
764         assert_eq!(h.n_recent_timeouts(), 20);
765         h.add_success(true);
766         h.add_success(true);
767         h.add_success(true);
768         assert_eq!(h.n_recent_timeouts(), 20 - 3);
769 
770         h.set_success_history_len(10);
771         assert_eq!(h.n_recent_timeouts(), 10 - 3);
772     }
773 
774     #[test]
xm_calculation()775     fn xm_calculation() {
776         let mut h = History::new_empty();
777         assert_eq!(h.estimate_xm(2), None);
778 
779         for n in &[300, 500, 542, 305, 543, 307, 212, 203, 617, 413] {
780             h.add_time(MsecDuration(*n));
781         }
782 
783         let v = h.n_most_frequent_bins(2);
784         assert_eq!(&v[..], [(305.into(), 3), (545.into(), 2)]);
785         let est = (305 * 3 + 545 * 2) / 5;
786         assert_eq!(h.estimate_xm(2), Some(est));
787         assert_eq!(est, 401);
788     }
789 
790     #[test]
pareto_estimate()791     fn pareto_estimate() {
792         let mut h = History::new_empty();
793         assert!(h.pareto_estimate(2).is_none());
794 
795         for n in &[300, 500, 542, 305, 543, 307, 212, 203, 617, 413] {
796             h.add_time(MsecDuration(*n));
797         }
798         let expected_log_sum: f64 = [401, 500, 542, 401, 543, 401, 401, 401, 617, 413]
799             .iter()
800             .map(|x| f64::from(*x).ln())
801             .sum();
802         let expected_log_xm: f64 = (401_f64).ln() * 10.0;
803         let expected_alpha = 10.0 / (expected_log_sum - expected_log_xm);
804         let expected_inv_alpha = 1.0 / expected_alpha;
805 
806         let p = h.pareto_estimate(2).unwrap();
807 
808         // We can't do "eq" with floats, so we'll do "very close".
809         assert!((401.0 - p.x_m).abs() < 1.0e-9);
810         assert!((expected_inv_alpha - p.inv_alpha).abs() < 1.0e-9);
811 
812         let q60 = p.quantile(0.60);
813         let q99 = p.quantile(0.99);
814 
815         assert!((q60 - 451.127) < 0.001);
816         assert!((q99 - 724.841) < 0.001);
817     }
818 
819     #[test]
pareto_estimate_timeout()820     fn pareto_estimate_timeout() {
821         let mut est = ParetoTimeoutEstimator::default();
822 
823         assert_eq!(
824             est.timeouts(&b3()),
825             (Duration::from_secs(60), Duration::from_secs(60))
826         );
827         // Set the parameters up to mimic the situation in
828         // `pareto_estimate` above.
829         est.p.min_observations = 0;
830         est.p.n_modes_for_xm = 2;
831         assert_eq!(
832             est.timeouts(&b3()),
833             (Duration::from_secs(60), Duration::from_secs(60))
834         );
835 
836         for msec in &[300, 500, 542, 305, 543, 307, 212, 203, 617, 413] {
837             let d = Duration::from_millis(*msec);
838             est.note_hop_completed(2, d, true);
839         }
840 
841         let t = est.timeouts(&b3());
842         assert_eq!(t.0.as_micros(), 493_169);
843         assert_eq!(t.1.as_micros(), 724_841);
844 
845         let t2 = est.timeouts(&b3());
846         assert_eq!(t2, t);
847 
848         let t2 = est.timeouts(&Action::BuildCircuit { length: 4 });
849         assert_eq!(t2.0, t.0.mul_f64(10.0 / 6.0));
850         assert_eq!(t2.1, t.1.mul_f64(10.0 / 6.0));
851     }
852 
853     #[test]
pareto_estimate_clear()854     fn pareto_estimate_clear() {
855         let mut est = ParetoTimeoutEstimator::default();
856 
857         // Set the parameters up to mimic the situation in
858         // `pareto_estimate` above.
859         let params = NetParameters::from_map(&"cbtmincircs=1 cbtnummodes=2".parse().unwrap());
860         est.update_params(&params);
861 
862         assert_eq!(est.timeouts(&b3()).0.as_micros(), 60_000_000);
863         assert!(est.learning_timeouts());
864 
865         for msec in &[300, 500, 542, 305, 543, 307, 212, 203, 617, 413] {
866             let d = Duration::from_millis(*msec);
867             est.note_hop_completed(2, d, true);
868         }
869         assert_ne!(est.timeouts(&b3()).0.as_micros(), 60_000_000);
870         assert!(!est.learning_timeouts());
871         assert_eq!(est.history.n_recent_timeouts(), 0);
872 
873         // 17 timeouts happen and we're still getting real numbers...
874         for _ in 0..18 {
875             est.note_circ_timeout(2, Duration::from_secs(2000));
876         }
877         assert_ne!(est.timeouts(&b3()).0.as_micros(), 60_000_000);
878 
879         // ... but 18 means "reset".
880         est.note_circ_timeout(2, Duration::from_secs(2000));
881         assert_eq!(est.timeouts(&b3()).0.as_micros(), 60_000_000);
882 
883         // And if we fail 18 bunch more times, it doubles.
884         for _ in 0..20 {
885             est.note_circ_timeout(2, Duration::from_secs(2000));
886         }
887         assert_eq!(est.timeouts(&b3()).0.as_micros(), 120_000_000);
888     }
889 
890     #[test]
default_params()891     fn default_params() {
892         let p1 = Params::default();
893         let p2 = Params::from(&tor_netdir::params::NetParameters::default());
894         // discount version of derive(eq)
895         assert_eq!(format!("{:?}", p1), format!("{:?}", p2));
896     }
897 
898     #[test]
state_conversion()899     fn state_conversion() {
900         // We have tests elsewhere for converting to and from
901         // histograms, so all we really need to ddo here is make sure
902         // that the histogram conversion happens.
903 
904         use rand::Rng;
905         let mut est = ParetoTimeoutEstimator::default();
906         let mut rng = rand::thread_rng();
907         for _ in 0..1000 {
908             let d = Duration::from_millis(rng.gen_range(10..3_000));
909             est.note_hop_completed(2, d, true);
910         }
911 
912         let state = est.build_state().unwrap();
913         assert_eq!(state.version, 1);
914         assert!(state.current_timeout.is_some());
915 
916         let mut est2 = ParetoTimeoutEstimator::from_state(state);
917         let act = Action::BuildCircuit { length: 3 };
918         // This isn't going to be exact, since we're recording histogram bins
919         // instead of exact timeouts.
920         let ms1 = est.timeouts(&act).0.as_millis() as i32;
921         let ms2 = est2.timeouts(&act).0.as_millis() as i32;
922         assert!((ms1 - ms2).abs() < 50);
923     }
924 
925     // TODO: add tests from Tor.
926 }
927