//! Functions to download or load directory objects, using the
//! state machines in the `states` module.

use std::{
    collections::HashMap,
    sync::{Arc, Weak},
    time::{Duration, SystemTime},
};

use crate::{
    docid::{self, ClientRequest},
    upgrade_weak_ref, DirMgr, DirState, DocId, DocumentText, Error, Readiness, Result,
};

use futures::channel::oneshot;
use futures::FutureExt;
use futures::StreamExt;
use tor_dirclient::DirResponse;
use tor_rtcompat::{Runtime, SleepProviderExt};
use tracing::{info, trace, warn};

#[cfg(test)]
use once_cell::sync::Lazy;
#[cfg(test)]
use std::sync::Mutex;

/// Try to read a set of documents from `dirmgr` by ID.
fn load_all<R: Runtime>(
    dirmgr: &DirMgr<R>,
    missing: Vec<DocId>,
) -> Result<HashMap<DocId, DocumentText>> {
    let mut loaded = HashMap::new();
    for query in docid::partition_by_type(missing.into_iter()).values() {
        dirmgr.load_documents_into(query, &mut loaded)?;
    }
    Ok(loaded)
}

/// Testing helper: if this is Some, then we return it in place of any
/// response to fetch_single.
///
/// Note that only one test uses this: otherwise there would be a race
/// condition. :p
#[cfg(test)]
static CANNED_RESPONSE: Lazy<Mutex<Option<String>>> = Lazy::new(|| Mutex::new(None));

/// Launch a single client request and get an associated response.
async fn fetch_single<R: Runtime>(
    dirmgr: Arc<DirMgr<R>>,
    request: ClientRequest,
) -> Result<(ClientRequest, DirResponse)> {
    #[cfg(test)]
    {
        let m = CANNED_RESPONSE.lock().expect("Poisoned mutex");
        if let Some(s) = m.as_ref() {
            return Ok((request, DirResponse::from_body(s)));
        }
    }
    let circmgr = dirmgr.circmgr()?;
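    // If we already have a network directory, use it to decide where to
    // download from; otherwise fall back to the configured fallback directories.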
    let cur_netdir = dirmgr.opt_netdir();
    let dirinfo = match cur_netdir {
        Some(ref netdir) => netdir.as_ref().into(),
        None => dirmgr.config.fallbacks().into(),
    };
    let resource =
        tor_dirclient::get_resource(request.as_requestable(), dirinfo, &dirmgr.runtime, circmgr)
            .await?;

    Ok((request, resource))
}

/// Launch download requests for the missing objects in `missing`, and
/// return each request along with the response it received.
///
/// Don't launch more than `parallelism` requests at once.
async fn fetch_multiple<R: Runtime>(
    dirmgr: Arc<DirMgr<R>>,
    missing: Vec<DocId>,
    parallelism: usize,
) -> Result<Vec<(ClientRequest, DirResponse)>> {
    let mut requests = Vec::new();
    for (_type, query) in docid::partition_by_type(missing.into_iter()) {
        requests.extend(dirmgr.query_into_requests(query)?);
    }

    // TODO: instead of waiting for all the queries to finish, we
    // could stream the responses back or something.
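    // `buffer_unordered` keeps at most `parallelism` of these requests in
    // flight at any one time.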
    let responses: Vec<Result<(ClientRequest, DirResponse)>> = futures::stream::iter(requests)
        .map(|query| fetch_single(Arc::clone(&dirmgr), query))
        .buffer_unordered(parallelism)
        .collect()
        .await;

    let mut useful_responses = Vec::new();
    for r in responses {
        match r {
            Ok(x) => useful_responses.push(x),
            // TODO: in this case we might want to stop using this source.
            Err(e) => warn!("error while downloading: {:?}", e),
        }
    }

    Ok(useful_responses)
}

/// Try to update `state` by loading cached information from `dirmgr`.
/// Return true if anything changed.
async fn load_once<R: Runtime>(
    dirmgr: &Arc<DirMgr<R>>,
    state: &mut Box<dyn DirState>,
) -> Result<bool> {
    let missing = state.missing_docs();
    let outcome = if missing.is_empty() {
        trace!("Found no missing documents; can't advance current state");
        Ok(false)
    } else {
        trace!(
            "Found {} missing documents; trying to load them",
            missing.len()
        );
        let documents = load_all(dirmgr, missing)?;
        state.add_from_cache(documents, dirmgr.store_if_rw())
    };
    dirmgr.notify().await;
    outcome
}

/// Try to load as much state as possible for a provided `state` from the
/// cache in `dirmgr`, advancing the state to the extent possible.
///
/// No downloads are performed; the provided state will not be reset.
pub(crate) async fn load<R: Runtime>(
    dirmgr: Arc<DirMgr<R>>,
    mut state: Box<dyn DirState>,
) -> Result<Box<dyn DirState>> {
    let mut safety_counter = 0_usize;
    loop {
        trace!(state=%state.describe(), "Loading from cache");
        let changed = load_once(&dirmgr, &mut state).await?;

        if state.can_advance() {
            state = state.advance()?;
            dirmgr.notify().await;
            safety_counter = 0;
        } else {
            if !changed {
                break;
            }
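            // The cache gave us something new, but not enough to advance:
            // keep trying, but only for a bounded number of iterations.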
            safety_counter += 1;
            assert!(
                safety_counter < 100,
                "Spent 100 iterations in the same state: this is a bug"
            );
        }
    }

    Ok(state)
}

/// Helper: Make a set of download attempts for the current directory state,
/// and on success feed their results into the state object.
///
/// This can launch one or more download requests, but will not launch more
/// than `parallelism` requests at a time.
///
/// Return true if the state reports that it changed.
async fn download_attempt<R: Runtime>(
    dirmgr: &Arc<DirMgr<R>>,
    state: &mut Box<dyn DirState>,
    parallelism: usize,
) -> Result<bool> {
    let mut changed = false;
    let missing = state.missing_docs();
    let fetched = fetch_multiple(Arc::clone(dirmgr), missing, parallelism).await?;
    for (client_req, dir_response) in fetched {
        let text = String::from_utf8(dir_response.into_output())?;
        match dirmgr.expand_response_text(&client_req, text) {
            Ok(text) => {
                let outcome = state.add_from_download(&text, &client_req, Some(&dirmgr.store));
                dirmgr.notify().await;
                match outcome {
                    Ok(b) => changed |= b,
                    // TODO: in this case we might want to stop using this source.
                    Err(e) => warn!("error while adding directory info: {}", e),
                }
            }
            Err(e) => {
                // TODO: in this case we might want to stop using this source.
                warn!("Error when expanding directory text: {}", e);
            }
        }
    }

    Ok(changed)
}

/// Download information into a DirState state machine until it is
/// ["complete"](Readiness::Complete), or until we hit a
/// non-recoverable error.
///
/// Use `dirmgr` to load from the cache or to launch downloads.
///
/// Keep resetting the state as needed.
///
/// The first time that the state becomes ["usable"](Readiness::Usable),
/// notify the sender in `on_usable`.
///
/// Return Err only on a non-recoverable error.  On an error that
/// merits another bootstrap attempt with the same state, return the
/// state and an Error object in an option.
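///
/// # Example
///
/// A minimal sketch of how a caller inside this crate might drive this
/// function (mirroring the tests below); `dirmgr` and `initial_state` here
/// are assumed to already exist:
///
/// ```ignore
/// let (tx, _rx) = oneshot::channel();
/// let mut on_usable = Some(tx);
/// let (state, maybe_err) =
///     download(Arc::downgrade(&dirmgr), initial_state, &mut on_usable).await?;
/// ```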
pub(crate) async fn download<R: Runtime>(
    dirmgr: Weak<DirMgr<R>>,
    mut state: Box<dyn DirState>,
    on_usable: &mut Option<oneshot::Sender<()>>,
) -> Result<(Box<dyn DirState>, Option<Error>)> {
    let runtime = upgrade_weak_ref(&dirmgr)?.runtime.clone();

    'next_state: loop {
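        // Each pass through this outer loop handles a single state of the
        // state machine; the 'next_attempt loop below makes repeated download
        // attempts for that state.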
        let retry_config = state.dl_config()?;
        let parallelism = retry_config.parallelism();

        // In theory this could go inside the loop below.  If we ever drop the
        // restriction that the missing() members of a state must never grow,
        // we'll need to move it there.
        {
            let dirmgr = upgrade_weak_ref(&dirmgr)?;
            load_once(&dirmgr, &mut state).await?;
        }

        // Skip the downloads if we can...
        if state.can_advance() {
            state = state.advance()?;
            continue 'next_state;
        }
        if state.is_ready(Readiness::Complete) {
            return Ok((state, None));
        }

        let mut retry = retry_config.schedule();

        // Make several attempts to fetch whatever we're missing,
        // until either we can advance, or we've got a complete
        // document, or we run out of tries, or we run out of time.
        'next_attempt: for attempt in retry_config.attempts() {
            info!("{}: {}", attempt + 1, state.describe());
            let reset_time = no_more_than_a_week_from(SystemTime::now(), state.reset_time());

            {
                let dirmgr = upgrade_weak_ref(&dirmgr)?;
                futures::select_biased! {
                    outcome = download_attempt(&dirmgr, &mut state, parallelism.into()).fuse() => {
                        match outcome {
                            Err(e) => {
                                warn!("Error while downloading: {}", e);
                                continue 'next_attempt;
                            }
                            Ok(changed) => {
                                changed
                            }
                        }
                    }
                    _ = runtime.sleep_until_wallclock(reset_time).fuse() => {
                        // We need to reset. This can happen if (for
                        // example) we're downloading the last few
                        // microdescriptors on a consensus that now
                        // we're ready to replace.
                        state = state.reset()?;
                        continue 'next_state;
                    },
                };
            }

            // Exit if there is nothing more to download.
            if state.is_ready(Readiness::Complete) {
                return Ok((state, None));
            }

            // Report usable-ness if appropriate.
            if on_usable.is_some() && state.is_ready(Readiness::Usable) {
                // Unwrap should be safe due to parent `.is_some()` check
                #[allow(clippy::unwrap_used)]
                let _ = on_usable.take().unwrap().send(());
            }

            if state.can_advance() {
                // We have enough info to advance to another state.
                state = state.advance()?;
                upgrade_weak_ref(&dirmgr)?.notify().await;
                continue 'next_state;
            } else {
                // We should wait a bit, and then retry.
                // TODO: we shouldn't wait on the final attempt.
                let reset_time = no_more_than_a_week_from(SystemTime::now(), state.reset_time());
                let delay = retry.next_delay(&mut rand::thread_rng());
                futures::select_biased! {
                    _ = runtime.sleep_until_wallclock(reset_time).fuse() => {
                        state = state.reset()?;
                        continue 'next_state;
                    }
                    _ = FutureExt::fuse(runtime.sleep(delay)) => {}
                };
            }
        }

        // We didn't advance the state, after all the retries.
        warn!(n_attempts=retry_config.n_attempts(),
              state=%state.describe(),
              "Unable to advance downloading state");
        return Ok((state, Some(Error::CantAdvanceState)));
    }
}

/// Helper: Clamp `v` so that it is no more than one week from `now`.
///
/// If `v` is absent, return the time that's one week from now.
///
/// We use this to determine a reset time when no reset time is
/// available, or when it is too far in the future.
fn no_more_than_a_week_from(now: SystemTime, v: Option<SystemTime>) -> SystemTime {
    let one_week_later = now + Duration::new(86400 * 7, 0);
    match v {
        Some(t) => std::cmp::min(t, one_week_later),
        None => one_week_later,
    }
}

#[cfg(test)]
mod test {
    #![allow(clippy::unwrap_used)]
    use super::*;
    use crate::test::new_mgr;
    use crate::{DownloadSchedule, SqliteStore};
    use std::convert::TryInto;
    use std::sync::Mutex;
    use tor_netdoc::doc::microdesc::MdDigest;

    #[test]
    fn week() {
        let now = SystemTime::now();
        let one_day = Duration::new(86400, 0);

        assert_eq!(no_more_than_a_week_from(now, None), now + one_day * 7);
        assert_eq!(
            no_more_than_a_week_from(now, Some(now + one_day)),
            now + one_day
        );
        assert_eq!(
            no_more_than_a_week_from(now, Some(now - one_day)),
            now - one_day
        );
        assert_eq!(
            no_more_than_a_week_from(now, Some(now + 30 * one_day)),
            now + one_day * 7
        );
    }

    /// A fake implementation of DirState that just wants a fixed set
    /// of microdescriptors.  It doesn't care if it gets them: it just
    /// wants to be told that the IDs exist.
    #[derive(Debug, Clone)]
    struct DemoState {
        second_time_around: bool,
        got_items: HashMap<MdDigest, bool>,
    }

    // Constants from Lou Reed
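    // (Each string is exactly 32 ASCII bytes, the length of an `MdDigest`.)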
    const H1: MdDigest = *b"satellite's gone up to the skies";
    const H2: MdDigest = *b"things like that drive me out of";
    const H3: MdDigest = *b"my mind i watched it for a littl";
    const H4: MdDigest = *b"while i like to watch things on ";
    const H5: MdDigest = *b"TV Satellite of love Satellite--";

    impl DemoState {
        fn new1() -> Self {
            DemoState {
                second_time_around: false,
                got_items: vec![(H1, false), (H2, false)].into_iter().collect(),
            }
        }
        fn new2() -> Self {
            DemoState {
                second_time_around: true,
                got_items: vec![(H3, false), (H4, false), (H5, false)]
                    .into_iter()
                    .collect(),
            }
        }
        fn n_ready(&self) -> usize {
            self.got_items.values().filter(|x| **x).count()
        }
    }

    impl DirState for DemoState {
        fn describe(&self) -> String {
            format!("{:?}", &self)
        }
        fn is_ready(&self, ready: Readiness) -> bool {
            match (ready, self.second_time_around) {
                (_, false) => false,
                (Readiness::Complete, true) => self.n_ready() == self.got_items.len(),
                (Readiness::Usable, true) => self.n_ready() >= self.got_items.len() - 1,
            }
        }
        fn can_advance(&self) -> bool {
            if self.second_time_around {
                false
            } else {
                self.n_ready() == self.got_items.len()
            }
        }
        fn missing_docs(&self) -> Vec<DocId> {
            self.got_items
                .iter()
                .filter_map(|(id, have)| {
                    if *have {
                        None
                    } else {
                        Some(DocId::Microdesc(*id))
                    }
                })
                .collect()
        }
        fn add_from_cache(
            &mut self,
            docs: HashMap<DocId, DocumentText>,
            _storage: Option<&Mutex<SqliteStore>>,
        ) -> Result<bool> {
            let mut changed = false;
            for id in docs.keys() {
                if let DocId::Microdesc(id) = id {
                    if self.got_items.get(id) == Some(&false) {
                        self.got_items.insert(*id, true);
                        changed = true;
                    }
                }
            }
            Ok(changed)
        }
        fn add_from_download(
            &mut self,
            text: &str,
            _request: &ClientRequest,
            _storage: Option<&Mutex<SqliteStore>>,
        ) -> Result<bool> {
            let mut changed = false;
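            // Treat each whitespace-separated token as the hex encoding of a
            // digest we might be waiting for.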
            for token in text.split_ascii_whitespace() {
                if let Ok(v) = hex::decode(token) {
                    if let Ok(id) = v.try_into() {
                        if self.got_items.get(&id) == Some(&false) {
                            self.got_items.insert(id, true);
                            changed = true;
                        }
                    }
                }
            }
            Ok(changed)
        }
        fn dl_config(&self) -> Result<DownloadSchedule> {
            Ok(DownloadSchedule::default())
        }
        fn advance(self: Box<Self>) -> Result<Box<dyn DirState>> {
            if self.can_advance() {
                Ok(Box::new(Self::new2()))
            } else {
                Ok(self)
            }
        }
        fn reset_time(&self) -> Option<SystemTime> {
            None
        }
        fn reset(self: Box<Self>) -> Result<Box<dyn DirState>> {
            Ok(Box::new(Self::new1()))
        }
    }

    #[test]
    fn all_in_cache() {
        // Let's try bootstrapping when everything is in the cache.
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let (_tempdir, mgr) = new_mgr(rt);

            {
                let mut store = mgr.store_if_rw().unwrap().lock().unwrap();
                for h in [H1, H2, H3, H4, H5] {
                    store
                        .store_microdescs(vec![("ignore", &h)], SystemTime::now())
                        .unwrap();
                }
            }
            let mgr = Arc::new(mgr);

            // Try just a load.
            let state = Box::new(DemoState::new1());
            let result = super::load(Arc::clone(&mgr), state).await.unwrap();
            assert!(result.is_ready(Readiness::Complete));

            // Try a bootstrap that could (but won't!) download.
            let state = Box::new(DemoState::new1());

            let mut on_usable = None;
            let result = super::download(Arc::downgrade(&mgr), state, &mut on_usable)
                .await
                .unwrap();
            assert!(result.0.is_ready(Readiness::Complete));
        });
    }

    #[test]
    fn partly_in_cache() {
        // Let's try bootstrapping with all of phase 1 and part of
        // phase 2 in cache.
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let (_tempdir, mgr) = new_mgr(rt);

            {
                let mut store = mgr.store_if_rw().unwrap().lock().unwrap();
                for h in [H1, H2, H3] {
                    store
                        .store_microdescs(vec![("ignore", &h)], SystemTime::now())
                        .unwrap();
                }
            }
            {
                let mut resp = CANNED_RESPONSE.lock().unwrap();
                // H4 and H5.
                *resp = Some(
                    "7768696c652069206c696b6520746f207761746368207468696e6773206f6e20
                     545620536174656c6c697465206f66206c6f766520536174656c6c6974652d2d"
                        .to_owned(),
                );
            }
            let mgr = Arc::new(mgr);
            let mut on_usable = None;

            let state = Box::new(DemoState::new1());
            let result = super::download(Arc::downgrade(&mgr), state, &mut on_usable)
                .await
                .unwrap();
            assert!(result.0.is_ready(Readiness::Complete));
        });
    }
}