1 //! Functions to download or load directory objects, using the
2 //! state machines in the `states` module.
3
4 use std::{
5 collections::HashMap,
6 sync::{Arc, Weak},
7 time::{Duration, SystemTime},
8 };
9
10 use crate::{
11 docid::{self, ClientRequest},
12 upgrade_weak_ref, DirMgr, DirState, DocId, DocumentText, Error, Readiness, Result,
13 };
14
15 use futures::channel::oneshot;
16 use futures::FutureExt;
17 use futures::StreamExt;
18 use tor_dirclient::DirResponse;
19 use tor_rtcompat::{Runtime, SleepProviderExt};
20 use tracing::{info, trace, warn};
21
22 #[cfg(test)]
23 use once_cell::sync::Lazy;
24 #[cfg(test)]
25 use std::sync::Mutex;
26
27 /// Try to read a set of documents from `dirmgr` by ID.
load_all<R: Runtime>( dirmgr: &DirMgr<R>, missing: Vec<DocId>, ) -> Result<HashMap<DocId, DocumentText>>28 fn load_all<R: Runtime>(
29 dirmgr: &DirMgr<R>,
30 missing: Vec<DocId>,
31 ) -> Result<HashMap<DocId, DocumentText>> {
32 let mut loaded = HashMap::new();
33 for query in docid::partition_by_type(missing.into_iter()).values() {
34 dirmgr.load_documents_into(query, &mut loaded)?;
35 }
36 Ok(loaded)
37 }
38
/// Testing helper: if this is Some, then we return it in place of any
/// response to fetch_single.
///
/// Note that only one test uses this: otherwise there would be a race
/// condition. :p
//
// NOTE(review): this is process-global mutable state, so tests that set it
// must not run concurrently with other tests exercising fetch_single.
#[cfg(test)]
static CANNED_RESPONSE: Lazy<Mutex<Option<String>>> = Lazy::new(|| Mutex::new(None));
46
47 /// Launch a single client request and get an associated response.
fetch_single<R: Runtime>( dirmgr: Arc<DirMgr<R>>, request: ClientRequest, ) -> Result<(ClientRequest, DirResponse)>48 async fn fetch_single<R: Runtime>(
49 dirmgr: Arc<DirMgr<R>>,
50 request: ClientRequest,
51 ) -> Result<(ClientRequest, DirResponse)> {
52 #[cfg(test)]
53 {
54 let m = CANNED_RESPONSE.lock().expect("Poisoned mutex");
55 if let Some(s) = m.as_ref() {
56 return Ok((request, DirResponse::from_body(s)));
57 }
58 }
59 let circmgr = dirmgr.circmgr()?;
60 let cur_netdir = dirmgr.opt_netdir();
61 let dirinfo = match cur_netdir {
62 Some(ref netdir) => netdir.as_ref().into(),
63 None => dirmgr.config.fallbacks().into(),
64 };
65 let resource =
66 tor_dirclient::get_resource(request.as_requestable(), dirinfo, &dirmgr.runtime, circmgr)
67 .await?;
68
69 Ok((request, resource))
70 }
71
72 /// Launch a set of download requests for a set of missing objects in
73 /// `missing`, and return each request along with the response it received.
74 ///
75 /// Don't launch more than `parallelism` requests at once.
fetch_multiple<R: Runtime>( dirmgr: Arc<DirMgr<R>>, missing: Vec<DocId>, parallelism: usize, ) -> Result<Vec<(ClientRequest, DirResponse)>>76 async fn fetch_multiple<R: Runtime>(
77 dirmgr: Arc<DirMgr<R>>,
78 missing: Vec<DocId>,
79 parallelism: usize,
80 ) -> Result<Vec<(ClientRequest, DirResponse)>> {
81 let mut requests = Vec::new();
82 for (_type, query) in docid::partition_by_type(missing.into_iter()) {
83 requests.extend(dirmgr.query_into_requests(query)?);
84 }
85
86 // TODO: instead of waiting for all the queries to finish, we
87 // could stream the responses back or something.
88 let responses: Vec<Result<(ClientRequest, DirResponse)>> = futures::stream::iter(requests)
89 .map(|query| fetch_single(Arc::clone(&dirmgr), query))
90 .buffer_unordered(parallelism)
91 .collect()
92 .await;
93
94 let mut useful_responses = Vec::new();
95 for r in responses {
96 match r {
97 Ok(x) => useful_responses.push(x),
98 // TODO: in this case we might want to stop using this source.
99 Err(e) => warn!("error while downloading: {:?}", e),
100 }
101 }
102
103 Ok(useful_responses)
104 }
105
106 /// Try tp update `state` by loading cached information from `dirmgr`.
107 /// Return true if anything changed.
load_once<R: Runtime>( dirmgr: &Arc<DirMgr<R>>, state: &mut Box<dyn DirState>, ) -> Result<bool>108 async fn load_once<R: Runtime>(
109 dirmgr: &Arc<DirMgr<R>>,
110 state: &mut Box<dyn DirState>,
111 ) -> Result<bool> {
112 let missing = state.missing_docs();
113 let outcome = if missing.is_empty() {
114 trace!("Found no missing documents; can't advance current state");
115 Ok(false)
116 } else {
117 trace!(
118 "Found {} missing documents; trying to load them",
119 missing.len()
120 );
121 let documents = load_all(dirmgr, missing)?;
122 state.add_from_cache(documents, dirmgr.store_if_rw())
123 };
124 dirmgr.notify().await;
125 outcome
126 }
127
128 /// Try to load as much state as possible for a provided `state` from the
129 /// cache in `dirmgr`, advancing the state to the extent possible.
130 ///
131 /// No downloads are performed; the provided state will not be reset.
load<R: Runtime>( dirmgr: Arc<DirMgr<R>>, mut state: Box<dyn DirState>, ) -> Result<Box<dyn DirState>>132 pub(crate) async fn load<R: Runtime>(
133 dirmgr: Arc<DirMgr<R>>,
134 mut state: Box<dyn DirState>,
135 ) -> Result<Box<dyn DirState>> {
136 let mut safety_counter = 0_usize;
137 loop {
138 trace!(state=%state.describe(), "Loading from cache");
139 let changed = load_once(&dirmgr, &mut state).await?;
140
141 if state.can_advance() {
142 state = state.advance()?;
143 dirmgr.notify().await;
144 safety_counter = 0;
145 } else {
146 if !changed {
147 break;
148 }
149 safety_counter += 1;
150 assert!(
151 safety_counter < 100,
152 "Spent 100 iterations in the same state: this is a bug"
153 );
154 }
155 }
156
157 Ok(state)
158 }
159
160 /// Helper: Make a set of download attempts for the current directory state,
161 /// and on success feed their results into the state object.
162 ///
163 /// This can launch one or more download requests, but will not launch more
164 /// than `parallelism` requests at a time.
165 ///
166 /// Return true if the state reports that it changed.
download_attempt<R: Runtime>( dirmgr: &Arc<DirMgr<R>>, state: &mut Box<dyn DirState>, parallelism: usize, ) -> Result<bool>167 async fn download_attempt<R: Runtime>(
168 dirmgr: &Arc<DirMgr<R>>,
169 state: &mut Box<dyn DirState>,
170 parallelism: usize,
171 ) -> Result<bool> {
172 let mut changed = false;
173 let missing = state.missing_docs();
174 let fetched = fetch_multiple(Arc::clone(dirmgr), missing, parallelism).await?;
175 for (client_req, dir_response) in fetched {
176 let text = String::from_utf8(dir_response.into_output())?;
177 match dirmgr.expand_response_text(&client_req, text) {
178 Ok(text) => {
179 let outcome = state.add_from_download(&text, &client_req, Some(&dirmgr.store));
180 dirmgr.notify().await;
181 match outcome {
182 Ok(b) => changed |= b,
183 // TODO: in this case we might want to stop using this source.
184 Err(e) => warn!("error while adding directory info: {}", e),
185 }
186 }
187 Err(e) => {
188 // TODO: in this case we might want to stop using this source.
189 warn!("Error when expanding directory text: {}", e);
190 }
191 }
192 }
193
194 Ok(changed)
195 }
196
/// Download information into a DirState state machine until it is
/// ["complete"](Readiness::Complete), or until we hit a
/// non-recoverable error.
///
/// Use `dirmgr` to load from the cache or to launch downloads.
///
/// Keep resetting the state as needed.
///
/// The first time that the state becomes ["usable"](Readiness::Usable),
/// notify the sender in `on_usable`.
///
/// Return Err only on a non-recoverable error.  On an error that
/// merits another bootstrap attempt with the same state, return the
/// state and an Error object in an option.
pub(crate) async fn download<R: Runtime>(
    dirmgr: Weak<DirMgr<R>>,
    mut state: Box<dyn DirState>,
    on_usable: &mut Option<oneshot::Sender<()>>,
) -> Result<(Box<dyn DirState>, Option<Error>)> {
    // Clone the runtime up front: we only hold a Weak reference to the
    // DirMgr, and upgrade_weak_ref errors out if it has been dropped.
    let runtime = upgrade_weak_ref(&dirmgr)?.runtime.clone();

    // Outer loop: one iteration per state-machine state (or per reset).
    'next_state: loop {
        let retry_config = state.dl_config()?;
        let parallelism = retry_config.parallelism();

        // In theory this could be inside the loop below maybe?  If we
        // want to drop the restriction that the missing() members of a
        // state must never grow, then we'll need to move it inside.
        {
            // Upgrade only for the duration of this block so we don't keep
            // the DirMgr alive across awaits on sleeps below.
            let dirmgr = upgrade_weak_ref(&dirmgr)?;
            load_once(&dirmgr, &mut state).await?;
        }

        // Skip the downloads if we can...
        if state.can_advance() {
            state = state.advance()?;
            continue 'next_state;
        }
        if state.is_ready(Readiness::Complete) {
            return Ok((state, None));
        }

        let mut retry = retry_config.schedule();

        // Make several attempts to fetch whatever we're missing,
        // until either we can advance, or we've got a complete
        // document, or we run out of tries, or we run out of time.
        'next_attempt: for attempt in retry_config.attempts() {
            info!("{}: {}", attempt + 1, state.describe());
            // Clamp the reset time so a missing/far-future reset time
            // can't make us wait forever.
            let reset_time = no_more_than_a_week_from(SystemTime::now(), state.reset_time());

            {
                let dirmgr = upgrade_weak_ref(&dirmgr)?;
                // Race the download attempt against the reset deadline.
                // (The `changed` value produced by a successful attempt is
                // discarded here; readiness is re-checked below instead.)
                futures::select_biased! {
                    outcome = download_attempt(&dirmgr, &mut state, parallelism.into()).fuse() => {
                        match outcome {
                            Err(e) => {
                                warn!("Error while downloading: {}", e);
                                continue 'next_attempt;
                            }
                            Ok(changed) => {
                                changed
                            }
                        }
                    }
                    _ = runtime.sleep_until_wallclock(reset_time).fuse() => {
                        // We need to reset. This can happen if (for
                        // example) we're downloading the last few
                        // microdescriptors on a consensus that now
                        // we're ready to replace.
                        state = state.reset()?;
                        continue 'next_state;
                    },
                };
            }

            // Exit if there is nothing more to download.
            if state.is_ready(Readiness::Complete) {
                return Ok((state, None));
            }

            // Report usable-ness if appropriate.
            if on_usable.is_some() && state.is_ready(Readiness::Usable) {
                // Unwrap should be safe due to parent `.is_some()` check
                #[allow(clippy::unwrap_used)]
                let _ = on_usable.take().unwrap().send(());
            }

            if state.can_advance() {
                // We have enough info to advance to another state.
                state = state.advance()?;
                upgrade_weak_ref(&dirmgr)?.notify().await;
                continue 'next_state;
            } else {
                // We should wait a bit, and then retry.
                // TODO: we shouldn't wait on the final attempt.
                let reset_time = no_more_than_a_week_from(SystemTime::now(), state.reset_time());
                let delay = retry.next_delay(&mut rand::thread_rng());
                // Sleep for the retry delay, but bail out to a reset if the
                // reset deadline arrives first.
                futures::select_biased! {
                    _ = runtime.sleep_until_wallclock(reset_time).fuse() => {
                        state = state.reset()?;
                        continue 'next_state;
                    }
                    _ = FutureExt::fuse(runtime.sleep(delay)) => {}
                };
            }
        }

        // We didn't advance the state, after all the retries.
        warn!(n_attempts=retry_config.n_attempts(),
              state=%state.describe(),
              "Unable to advance downloading state");
        return Ok((state, Some(Error::CantAdvanceState)));
    }
}
312
/// Helper: Clamp `v` so that it is no more than one week from `now`.
///
/// If `v` is absent, return the time that's one week from now.
///
/// We use this to determine a reset time when no reset time is
/// available, or when it is too far in the future.
fn no_more_than_a_week_from(now: SystemTime, v: Option<SystemTime>) -> SystemTime {
    // Upper bound: exactly seven days after `now`.
    let cap = now + Duration::from_secs(7 * 86400);
    v.map_or(cap, |t| t.min(cap))
}
326
#[cfg(test)]
mod test {
    #![allow(clippy::unwrap_used)]
    use super::*;
    use crate::test::new_mgr;
    use crate::{DownloadSchedule, SqliteStore};
    use std::convert::TryInto;
    use std::sync::Mutex;
    use tor_netdoc::doc::microdesc::MdDigest;

    #[test]
    fn week() {
        let now = SystemTime::now();
        let one_day = Duration::new(86400, 0);

        // Absent reset time: exactly one week out.
        assert_eq!(no_more_than_a_week_from(now, None), now + one_day * 7);
        // Times within a week (past or future) pass through unchanged.
        assert_eq!(
            no_more_than_a_week_from(now, Some(now + one_day)),
            now + one_day
        );
        assert_eq!(
            no_more_than_a_week_from(now, Some(now - one_day)),
            now - one_day
        );
        // Times beyond a week get clamped to one week.
        assert_eq!(
            no_more_than_a_week_from(now, Some(now + 30 * one_day)),
            now + one_day * 7
        );
    }

    /// A fake implementation of DirState that just wants a fixed set
    /// of microdescriptors.  It doesn't care if it gets them: it just
    /// wants to be told that the IDs exist.
    #[derive(Debug, Clone)]
    struct DemoState {
        // True in "phase 2" (wanting H3..H5); false in "phase 1" (H1, H2).
        second_time_around: bool,
        // Maps each wanted digest to whether we've been told it exists.
        got_items: HashMap<MdDigest, bool>,
    }

    // Constants from Lou Reed
    const H1: MdDigest = *b"satellite's gone up to the skies";
    const H2: MdDigest = *b"things like that drive me out of";
    const H3: MdDigest = *b"my mind i watched it for a littl";
    const H4: MdDigest = *b"while i like to watch things on ";
    const H5: MdDigest = *b"TV Satellite of love Satellite--";

    impl DemoState {
        /// Phase-1 state: wants H1 and H2.
        fn new1() -> Self {
            DemoState {
                second_time_around: false,
                got_items: vec![(H1, false), (H2, false)].into_iter().collect(),
            }
        }
        /// Phase-2 state: wants H3, H4, and H5.
        fn new2() -> Self {
            DemoState {
                second_time_around: true,
                got_items: vec![(H3, false), (H4, false), (H5, false)]
                    .into_iter()
                    .collect(),
            }
        }
        /// Count how many of the wanted items we have so far.
        fn n_ready(&self) -> usize {
            self.got_items.values().filter(|x| **x).count()
        }
    }

    impl DirState for DemoState {
        fn describe(&self) -> String {
            format!("{:?}", &self)
        }
        fn is_ready(&self, ready: Readiness) -> bool {
            match (ready, self.second_time_around) {
                // Phase 1 is never "ready"; it can only advance.
                (_, false) => false,
                (Readiness::Complete, true) => self.n_ready() == self.got_items.len(),
                // "Usable" tolerates one missing item.
                (Readiness::Usable, true) => self.n_ready() >= self.got_items.len() - 1,
            }
        }
        fn can_advance(&self) -> bool {
            if self.second_time_around {
                // Phase 2 is the last phase: nowhere to advance to.
                false
            } else {
                self.n_ready() == self.got_items.len()
            }
        }
        fn missing_docs(&self) -> Vec<DocId> {
            self.got_items
                .iter()
                .filter_map(|(id, have)| {
                    if *have {
                        None
                    } else {
                        Some(DocId::Microdesc(*id))
                    }
                })
                .collect()
        }
        fn add_from_cache(
            &mut self,
            docs: HashMap<DocId, DocumentText>,
            _storage: Option<&Mutex<SqliteStore>>,
        ) -> Result<bool> {
            // Only the keys matter: mark any wanted digest as present.
            let mut changed = false;
            for id in docs.keys() {
                if let DocId::Microdesc(id) = id {
                    if self.got_items.get(id) == Some(&false) {
                        self.got_items.insert(*id, true);
                        changed = true;
                    }
                }
            }
            Ok(changed)
        }
        fn add_from_download(
            &mut self,
            text: &str,
            _request: &ClientRequest,
            _storage: Option<&Mutex<SqliteStore>>,
        ) -> Result<bool> {
            // Each whitespace-separated token is a hex-encoded digest.
            let mut changed = false;
            for token in text.split_ascii_whitespace() {
                if let Ok(v) = hex::decode(token) {
                    if let Ok(id) = v.try_into() {
                        if self.got_items.get(&id) == Some(&false) {
                            self.got_items.insert(id, true);
                            changed = true;
                        }
                    }
                }
            }
            Ok(changed)
        }
        fn dl_config(&self) -> Result<DownloadSchedule> {
            Ok(DownloadSchedule::default())
        }
        fn advance(self: Box<Self>) -> Result<Box<dyn DirState>> {
            if self.can_advance() {
                Ok(Box::new(Self::new2()))
            } else {
                Ok(self)
            }
        }
        fn reset_time(&self) -> Option<SystemTime> {
            None
        }
        fn reset(self: Box<Self>) -> Result<Box<dyn DirState>> {
            Ok(Box::new(Self::new1()))
        }
    }

    #[test]
    fn all_in_cache() {
        // Let's try bootstrapping when everything is in the cache.
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let (_tempdir, mgr) = new_mgr(rt);

            {
                // Pre-seed the store with all five digests.
                let mut store = mgr.store_if_rw().unwrap().lock().unwrap();
                for h in [H1, H2, H3, H4, H5] {
                    store
                        .store_microdescs(vec![("ignore", &h)], SystemTime::now())
                        .unwrap();
                }
            }
            let mgr = Arc::new(mgr);

            // Try just a load.
            let state = Box::new(DemoState::new1());
            let result = super::load(Arc::clone(&mgr), state).await.unwrap();
            assert!(result.is_ready(Readiness::Complete));

            // Try a bootstrap that could (but won't!) download.
            let state = Box::new(DemoState::new1());

            let mut on_usable = None;
            let result = super::download(Arc::downgrade(&mgr), state, &mut on_usable)
                .await
                .unwrap();
            assert!(result.0.is_ready(Readiness::Complete));
        });
    }

    #[test]
    fn partly_in_cache() {
        // Let's try bootstrapping with all of phase1 and part of
        // phase 2 in cache.
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let (_tempdir, mgr) = new_mgr(rt);

            {
                // H1-H3 come from the cache...
                let mut store = mgr.store_if_rw().unwrap().lock().unwrap();
                for h in [H1, H2, H3] {
                    store
                        .store_microdescs(vec![("ignore", &h)], SystemTime::now())
                        .unwrap();
                }
            }
            {
                let mut resp = CANNED_RESPONSE.lock().unwrap();
                // H4 and H5.
                *resp = Some(
                    "7768696c652069206c696b6520746f207761746368207468696e6773206f6e20
                     545620536174656c6c697465206f66206c6f766520536174656c6c6974652d2d"
                        .to_owned(),
                );
            }
            let mgr = Arc::new(mgr);
            let mut on_usable = None;

            let state = Box::new(DemoState::new1());
            let result = super::download(Arc::downgrade(&mgr), state, &mut on_usable)
                .await
                .unwrap();
            assert!(result.0.is_ready(Readiness::Complete));
        });
    }
}
543