//! `tor-dirmgr`: Code to fetch, store, and update Tor directory information.
//!
//! # Overview
//!
//! This crate is part of
//! [Arti](https://gitlab.torproject.org/tpo/core/arti/), a project to
//! implement [Tor](https://www.torproject.org/) in Rust.
//!
//! In its current design, Tor requires a set of up-to-date
//! authenticated directory documents in order to build multi-hop
//! anonymized circuits through the network.
//!
//! This directory manager crate is responsible for figuring out which
//! directory information we lack, downloading what we're missing, and
//! keeping a cache of it on disk.
//!
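//! # Example
//!
//! Here is a hedged sketch of how an application might bootstrap a
//! directory.  The cache path is an illustrative assumption, and the
//! `runtime` and `circmgr` values are presumed to come from
//! `tor-rtcompat` and `tor-circmgr` respectively:
//!
//! ```ignore
//! use std::path::Path;
//! use std::sync::Arc;
//! use tor_dirmgr::{DirMgr, DirMgrConfig};
//!
//! async fn bootstrap<R: tor_rtcompat::Runtime>(
//!     runtime: R,
//!     circmgr: Arc<tor_circmgr::CircMgr<R>>,
//! ) -> tor_dirmgr::Result<()> {
//!     let config = DirMgrConfig::builder()
//!         .cache_path(Path::new("/var/lib/arti/dir")) // hypothetical location
//!         .build()
//!         .expect("invalid directory configuration");
//!     // Does not return until the directory is usable; afterwards, a
//!     // background task keeps it up to date.
//!     let dirmgr = DirMgr::bootstrap_from_config(config, runtime, circmgr).await?;
//!     let _netdir = dirmgr.netdir();
//!     Ok(())
//! }
//! ```
//!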
//! # Compile-time features
//!
//! `mmap` (default) -- Use memory mapping to reduce the memory load for
//! reading large directory objects from disk.
//!
//! `static` -- Try to link with a static copy of sqlite3.
//!
//! `routerdesc` -- (Incomplete) support for downloading and storing
//!      router descriptors.

#![deny(missing_docs)]
#![warn(noop_method_call)]
#![deny(unreachable_pub)]
#![deny(clippy::await_holding_lock)]
#![deny(clippy::cargo_common_metadata)]
#![deny(clippy::cast_lossless)]
#![deny(clippy::checked_conversions)]
#![warn(clippy::clone_on_ref_ptr)]
#![warn(clippy::cognitive_complexity)]
#![deny(clippy::debug_assert_with_mut_call)]
#![deny(clippy::exhaustive_enums)]
#![deny(clippy::exhaustive_structs)]
#![deny(clippy::expl_impl_clone_on_copy)]
#![deny(clippy::fallible_impl_from)]
#![deny(clippy::implicit_clone)]
#![deny(clippy::large_stack_arrays)]
#![warn(clippy::manual_ok_or)]
#![deny(clippy::missing_docs_in_private_items)]
#![deny(clippy::missing_panics_doc)]
#![warn(clippy::needless_borrow)]
#![warn(clippy::needless_pass_by_value)]
#![warn(clippy::option_option)]
#![warn(clippy::rc_buffer)]
#![deny(clippy::ref_option_ref)]
#![warn(clippy::semicolon_if_nothing_returned)]
#![warn(clippy::trait_duplication_in_bounds)]
#![deny(clippy::unnecessary_wraps)]
#![warn(clippy::unseparated_literal_suffix)]
#![deny(clippy::unwrap_used)]

pub mod authority;
mod bootstrap;
mod config;
mod docid;
mod docmeta;
mod err;
mod event;
mod retry;
mod shared_ref;
mod state;
mod storage;

use crate::docid::{CacheUsage, ClientRequest, DocQuery};
use crate::shared_ref::SharedMutArc;
use crate::storage::sqlite::SqliteStore;
pub use retry::DownloadSchedule;
use tor_circmgr::CircMgr;
use tor_netdir::NetDir;
use tor_netdoc::doc::netstatus::ConsensusFlavor;

use futures::{channel::oneshot, task::SpawnExt};
use tor_rtcompat::{Runtime, SleepProviderExt};
use tracing::{info, trace, warn};

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::{collections::HashMap, sync::Weak};
use std::{fmt::Debug, time::SystemTime};

pub use authority::{Authority, AuthorityBuilder};
pub use config::{
    DirMgrConfig, DirMgrConfigBuilder, DownloadScheduleConfig, DownloadScheduleConfigBuilder,
    NetworkConfig, NetworkConfigBuilder,
};
pub use docid::DocId;
pub use err::Error;
pub use event::DirEvent;
pub use storage::DocumentText;
pub use tor_netdir::fallback::{FallbackDir, FallbackDirBuilder};

/// A Result as returned by this crate.
pub type Result<T> = std::result::Result<T, Error>;

/// A directory manager to download, fetch, and cache a Tor directory.
///
/// A DirMgr can operate in three modes:
///   * In **offline** mode, it only reads from the cache, and can
///     only read once.
///   * In **read-only** mode, it reads from the cache, but checks
///     whether it can acquire an associated lock file.  If it can, then
///     it enters read-write mode.  If not, it checks the cache
///     periodically for new information.
///   * In **read-write** mode, it knows that no other process will be
///     writing to the cache, and it takes responsibility for fetching
///     data from the network and updating the directory with new
///     directory information.
pub struct DirMgr<R: Runtime> {
    /// Configuration information: where to find directories, how to
    /// validate them, and so on.
    config: DirMgrConfig,
    /// Handle to our sqlite cache.
    // XXXX I'd like to use an rwlock, but that's not feasible, since
    // rusqlite::Connection isn't Sync.
    store: Mutex<SqliteStore>,
    /// Our latest sufficiently bootstrapped directory, if we have one.
    ///
    /// We use the RwLock so that we can give this out to a bunch of other
    /// users, and replace it once a new directory is bootstrapped.
    netdir: SharedMutArc<NetDir>,

    /// A flag that gets set whenever the _consensus_ part of `netdir` has
    /// changed.
    netdir_consensus_changed: AtomicBool,

    /// A flag that gets set whenever the _descriptors_ part of `netdir` has
    /// changed without adding a new consensus.
    netdir_descriptors_changed: AtomicBool,

    /// A publisher handle, used to inform others about changes in the
    /// status of this directory handle.
    publisher: event::Publisher,

    /// A circuit manager, if this DirMgr supports downloading.
    circmgr: Option<Arc<CircMgr<R>>>,

    /// Our asynchronous runtime.
    runtime: R,
}

impl<R: Runtime> DirMgr<R> {
    /// Try to load the directory from disk, without launching any
    /// kind of update process.
    ///
    /// This function runs in **offline** mode: it will give an error
    /// if the result is not up-to-date, or not fully downloaded.
    ///
    /// In general, you shouldn't use this function in a long-running
    /// program; it's only suitable for command-line or batch tools.
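    ///
    /// A hedged usage sketch (the `runtime` and `config` values are assumed
    /// to exist already; see the crate-level example):
    ///
    /// ```ignore
    /// // Offline mode: read the cache once, and fail if it doesn't hold a
    /// // complete, timely directory.
    /// let netdir = DirMgr::load_once(runtime, config).await?;
    /// ```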
    // TODO: I wish this function didn't have to be async or take a runtime.
    pub async fn load_once(runtime: R, config: DirMgrConfig) -> Result<Arc<NetDir>> {
        let dirmgr = Arc::new(Self::from_config(config, runtime, None, true)?);

        // TODO: add some way to return a directory that isn't up-to-date
        let _success = dirmgr.load_directory().await?;

        dirmgr.opt_netdir().ok_or(Error::DirectoryNotPresent)
    }

    /// Return a current netdir, either loading it or bootstrapping it
    /// as needed.
    ///
    /// Like load_once, but will try to bootstrap (or wait for another
    /// process to bootstrap) if we don't have an up-to-date
    /// bootstrapped directory.
    ///
    /// In general, you shouldn't use this function in a long-running
    /// program; it's only suitable for command-line or batch tools.
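    ///
    /// A hedged sketch (assumes `config`, `runtime`, and `circmgr` already
    /// exist; see the crate-level example for how they might be created):
    ///
    /// ```ignore
    /// let netdir = DirMgr::load_or_bootstrap_once(config, runtime, circmgr).await?;
    /// ```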
    pub async fn load_or_bootstrap_once(
        config: DirMgrConfig,
        runtime: R,
        circmgr: Arc<CircMgr<R>>,
    ) -> Result<Arc<NetDir>> {
        let dirmgr = DirMgr::bootstrap_from_config(config, runtime, circmgr).await?;
        Ok(dirmgr.netdir())
    }

    /// Return a new directory manager from a given configuration,
    /// bootstrapping from the network as necessary.
    ///
    /// This function will not return until the directory is
    /// bootstrapped enough to build circuits.  It will also launch a
    /// background task that fetches any missing information, and that
    /// replaces the directory when a new one is available.
    pub async fn bootstrap_from_config(
        config: DirMgrConfig,
        runtime: R,
        circmgr: Arc<CircMgr<R>>,
    ) -> Result<Arc<Self>> {
        let dirmgr = Arc::new(DirMgr::from_config(
            config,
            runtime.clone(),
            Some(circmgr),
            false,
        )?);

        // Try to load from the cache.
        let have_directory = dirmgr.load_directory().await?;

        let (mut sender, receiver) = if have_directory {
            info!("Loaded a good directory from cache.");
            (None, None)
        } else {
            info!("Didn't get usable directory from cache.");
            let (sender, receiver) = oneshot::channel();
            (Some(sender), Some(receiver))
        };

        // Whether we loaded or not, we now start downloading.
        let dirmgr_weak = Arc::downgrade(&dirmgr);
        runtime.spawn(async move {
            // NOTE: This is a daemon task.  It should eventually get
            // treated as one.

            // TODO: don't warn when these are Error::ManagerDropped: that
            // means that the DirMgr has been shut down.
            if let Err(e) = Self::reload_until_owner(&dirmgr_weak, &mut sender).await {
                warn!("Unrecovered error while waiting for bootstrap: {}", e);
            } else if let Err(e) = Self::download_forever(dirmgr_weak, sender).await {
                warn!("Unrecovered error while downloading: {}", e);
            }
        })?;

        if let Some(receiver) = receiver {
            match receiver.await {
                Ok(()) => {
                    info!("We have enough information to build circuits.");
                }
                Err(_) => {
                    warn!("Bootstrapping task exited before finishing.");
                    return Err(Error::CantAdvanceState);
                }
            }
        }

        Ok(dirmgr)
    }

    /// Try forever to either lock the storage (and thereby become the
    /// owner), or to reload the database.
    ///
    /// If we begin to have a bootstrapped directory, send a
    /// message using `on_complete`.
    ///
    /// If we eventually become the owner, return Ok().
    async fn reload_until_owner(
        weak: &Weak<Self>,
        on_complete: &mut Option<oneshot::Sender<()>>,
    ) -> Result<()> {
        let mut logged = false;
        let mut bootstrapped;
        let runtime;
        {
            let dirmgr = upgrade_weak_ref(weak)?;
            runtime = dirmgr.runtime.clone();
            bootstrapped = dirmgr.netdir.get().is_some();
        }

        loop {
            {
                let dirmgr = upgrade_weak_ref(weak)?;
                trace!("Trying to take ownership of the directory cache lock");
                if dirmgr.try_upgrade_to_readwrite()? {
                    // We now own the lock!  (Maybe we owned it before; the
                    // upgrade_to_readwrite() function is idempotent.)  We can
                    // do our own bootstrapping.
                    if logged {
                        info!("The previous owning process has given up the lock. We are now in charge of managing the directory.");
                    }
                    return Ok(());
                }
            }

            if !logged {
                logged = true;
                if bootstrapped {
                    info!("Another process is managing the directory. We'll use its cache.");
                } else {
                    info!("Another process is bootstrapping the directory. Waiting till it finishes or exits.");
                }
            }

            // We don't own the lock.  Somebody else owns the cache.  They
            // should be updating it.  Wait a bit, then try again.
            let pause = if bootstrapped {
                std::time::Duration::new(120, 0)
            } else {
                std::time::Duration::new(5, 0)
            };
            runtime.sleep(pause).await;
            // TODO: instead of loading the whole thing we should have a
            // database entry that says when the last update was, or use
            // our state functions.
            {
                let dirmgr = upgrade_weak_ref(weak)?;
                trace!("Trying to load from the directory cache");
                if dirmgr.load_directory().await? {
                    // Successfully loaded a bootstrapped directory.
                    if let Some(send_done) = on_complete.take() {
                        let _ = send_done.send(());
                    }
                    if !bootstrapped {
                        info!("The directory is now bootstrapped.");
                    }
                    bootstrapped = true;
                }
            }
        }
    }

    /// Try to fetch our directory info and keep it updated, indefinitely.
    ///
    /// If we begin to have a bootstrapped directory, send a
    /// message using `on_complete`.
    async fn download_forever(
        weak: Weak<Self>,
        mut on_complete: Option<oneshot::Sender<()>>,
    ) -> Result<()> {
        let mut state: Box<dyn DirState> = Box::new(state::GetConsensusState::new(
            Weak::clone(&weak),
            CacheUsage::CacheOkay,
        )?);

        let (retry_config, runtime) = {
            let dirmgr = upgrade_weak_ref(&weak)?;
            (
                *dirmgr.config.schedule().retry_bootstrap(),
                dirmgr.runtime.clone(),
            )
        };

        loop {
            let mut usable = false;
            let mut retry_delay = retry_config.schedule();

            'retry_attempt: for _ in retry_config.attempts() {
                let (newstate, recoverable_err) =
                    bootstrap::download(Weak::clone(&weak), state, &mut on_complete).await?;
                state = newstate;

                if let Some(err) = recoverable_err {
                    if state.is_ready(Readiness::Usable) {
                        usable = true;
                        info!("Unable to completely download a directory: {}.  Nevertheless, the directory is usable, so we'll pause for now.", err);
                        break 'retry_attempt;
                    }

                    let delay = retry_delay.next_delay(&mut rand::thread_rng());
                    warn!(
                        "Unable to download a usable directory: {}.  We will restart in {:?}.",
                        err, delay
                    );
                    runtime.sleep(delay).await;
                    state = state.reset()?;
                } else {
                    info!("Directory is complete.");
                    usable = true;
                    break 'retry_attempt;
                }
            }

            if !usable {
                // We ran out of attempts.
                warn!(
                    "We failed {} times to bootstrap a directory. We're going to give up.",
                    retry_config.n_attempts()
                );
                return Err(Error::CantAdvanceState);
            } else {
                // Report success, if appropriate.
                if let Some(send_done) = on_complete.take() {
                    let _ = send_done.send(());
                }
            }

            let reset_at = state.reset_time();
            match reset_at {
                Some(t) => runtime.sleep_until_wallclock(t).await,
                None => return Ok(()),
            }
            state = state.reset()?;
        }
    }

    /// Get a reference to the circuit manager, if we have one.
    fn circmgr(&self) -> Result<Arc<CircMgr<R>>> {
        self.circmgr
            .as_ref()
            .map(Arc::clone)
            .ok_or(Error::NoDownloadSupport)
    }

    /// Try to make this a directory manager with read-write access to its
    /// storage.
    ///
    /// Return true if we got the lock, or if we already had it.
    ///
    /// Return false if another process has the lock.
    fn try_upgrade_to_readwrite(&self) -> Result<bool> {
        self.store
            .lock()
            .expect("Directory storage lock poisoned")
            .upgrade_to_readwrite()
    }

    /// Return a reference to the store, if it is currently read-write.
    fn store_if_rw(&self) -> Option<&Mutex<SqliteStore>> {
        let rw = !self
            .store
            .lock()
            .expect("Directory storage lock poisoned")
            .is_readonly();
        // A race-condition is possible here, but I believe it's harmless.
        if rw {
            Some(&self.store)
        } else {
            None
        }
    }

    /// Construct a DirMgr from a DirMgrConfig.
    fn from_config(
        config: DirMgrConfig,
        runtime: R,
        circmgr: Option<Arc<CircMgr<R>>>,
        readonly: bool,
    ) -> Result<Self> {
        let store = Mutex::new(config.open_sqlite_store(readonly)?);
        let netdir = SharedMutArc::new();
        let netdir_consensus_changed = AtomicBool::new(false);
        let netdir_descriptors_changed = AtomicBool::new(false);
        let publisher = event::Publisher::new();
        Ok(DirMgr {
            config,
            store,
            netdir,
            netdir_consensus_changed,
            netdir_descriptors_changed,
            publisher,
            circmgr,
            runtime,
        })
    }

    /// Load the latest non-pending non-expired directory from the
    /// cache, if it is newer than the one we have.
    ///
    /// Return false if there is no such consensus.
    async fn load_directory(self: &Arc<Self>) -> Result<bool> {
        let state = state::GetConsensusState::new(Arc::downgrade(self), CacheUsage::CacheOnly)?;
        let _ = bootstrap::load(Arc::clone(self), Box::new(state)).await?;

        Ok(self.netdir.get().is_some())
    }

    /// Return an Arc handle to our latest directory, if we have one.
    ///
    /// This is a private method, since by the time anybody else has a
    /// handle to a DirMgr, the NetDir should definitely be
    /// bootstrapped.
    fn opt_netdir(&self) -> Option<Arc<NetDir>> {
        self.netdir.get()
    }

    /// Return an Arc handle to our latest directory, if we have one.
    // TODO: Add variants of this that make sure that it's up-to-date?
    pub fn netdir(&self) -> Arc<NetDir> {
        self.opt_netdir().expect("DirMgr was not bootstrapped!")
    }

    /// Return a new asynchronous stream about events taking place with
    /// this directory manager.
    ///
    /// The caller must regularly process events from this stream to
    /// prevent it from blocking.
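    ///
    /// A hedged sketch of consuming the stream (assumes `dirmgr` is a
    /// bootstrapped `DirMgr` handle, and uses `futures::StreamExt`):
    ///
    /// ```ignore
    /// use futures::stream::StreamExt;
    ///
    /// let mut events = dirmgr.events();
    /// while let Some(ev) = events.next().await {
    ///     match ev {
    ///         DirEvent::NewConsensus => println!("got a new consensus"),
    ///         DirEvent::NewDescriptors => println!("got new descriptors"),
    ///         _ => {}
    ///     }
    /// }
    /// ```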
    pub fn events(&self) -> impl futures::Stream<Item = DirEvent> {
        self.publisher.subscribe()
    }

    /// Try to load the text of a single document described by `doc` from
    /// storage.
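    ///
    /// A hedged sketch (assumes `dirmgr` is a `DirMgr` handle; the digest
    /// value is a placeholder):
    ///
    /// ```ignore
    /// // Look up one cached microdescriptor by its digest.
    /// let digest = [0_u8; 32]; // placeholder digest
    /// if let Some(doc) = dirmgr.text(&DocId::Microdesc(digest))? {
    ///     println!("found: {:?}", doc.as_str());
    /// }
    /// ```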
    pub fn text(&self, doc: &DocId) -> Result<Option<DocumentText>> {
        use itertools::Itertools;
        let mut result = HashMap::new();
        let query = (*doc).into();
        self.load_documents_into(&query, &mut result)?;
        let item = result.into_iter().at_most_one().map_err(|_| {
            Error::CacheCorruption("Found more than one entry in storage for given docid")
        })?;
        if let Some((docid, doctext)) = item {
            if &docid != doc {
                return Err(Error::CacheCorruption(
                    "Item from storage had incorrect docid.",
                ));
            }
            Ok(Some(doctext))
        } else {
            Ok(None)
        }
    }

    /// Load the text for a collection of documents.
    ///
    /// If many of the documents have the same type, this can be more
    /// efficient than calling [`text`](Self::text).
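    ///
    /// A hedged sketch (placeholder digests; assumes `dirmgr` is a `DirMgr`
    /// handle):
    ///
    /// ```ignore
    /// let docs = dirmgr.texts(vec![
    ///     DocId::Microdesc([0_u8; 32]),
    ///     DocId::Microdesc([1_u8; 32]),
    /// ])?;
    /// // `docs` maps each DocId that was found in the cache to its text.
    /// ```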
    pub fn texts<T>(&self, docs: T) -> Result<HashMap<DocId, DocumentText>>
    where
        T: IntoIterator<Item = DocId>,
    {
        let partitioned = docid::partition_by_type(docs);
        let mut result = HashMap::new();
        for (_, query) in partitioned.into_iter() {
            self.load_documents_into(&query, &mut result)?;
        }
        Ok(result)
    }

    /// If the consensus has changed, notify any subscribers.
    // TODO: I don't like all the different places in `bootstrap`
    // where we have to call this function.  Can we simplify it or
    // clean it up somehow?  Maybe we can build some kind of intelligence into
    // shared_ref?
    pub(crate) async fn notify(&self) {
        if self.netdir_consensus_changed.swap(false, Ordering::SeqCst) {
            self.publisher.send(DirEvent::NewConsensus).await;
        }
        if self
            .netdir_descriptors_changed
            .swap(false, Ordering::SeqCst)
        {
            self.publisher.send(DirEvent::NewDescriptors).await;
        }
    }

    /// Load all the documents for a single DocumentQuery from the store.
    fn load_documents_into(
        &self,
        query: &DocQuery,
        result: &mut HashMap<DocId, DocumentText>,
    ) -> Result<()> {
        use DocQuery::*;
        let store = self.store.lock().expect("Directory storage lock poisoned");
        match query {
            LatestConsensus {
                flavor,
                cache_usage,
            } => {
                if *cache_usage == CacheUsage::MustDownload {
                    // Do nothing: we don't want a cached consensus.
                    trace!("MustDownload is set; not checking for cached consensus.");
                } else if let Some(c) =
                    store.latest_consensus(*flavor, cache_usage.pending_requirement())?
                {
                    trace!("Found a reasonable consensus in the cache");
                    let id = DocId::LatestConsensus {
                        flavor: *flavor,
                        cache_usage: *cache_usage,
                    };
                    result.insert(id, c.into());
                }
            }
            AuthCert(ids) => result.extend(
                store
                    .authcerts(ids)?
                    .into_iter()
                    .map(|(id, c)| (DocId::AuthCert(id), DocumentText::from_string(c))),
            ),
            Microdesc(digests) => {
                result.extend(
                    store
                        .microdescs(digests)?
                        .into_iter()
                        .map(|(id, md)| (DocId::Microdesc(id), DocumentText::from_string(md))),
                );
            }
            #[cfg(feature = "routerdesc")]
            RouterDesc(digests) => result.extend(
                store
                    .routerdescs(digests)?
                    .into_iter()
                    .map(|(id, rd)| (DocId::RouterDesc(id), DocumentText::from_string(rd))),
            ),
        }
        Ok(())
    }

    /// Convert a DocQuery into a set of ClientRequests, suitable for sending
    /// to a directory cache.
    ///
    /// This conversion has to be a function of the dirmgr, since it may
    /// require knowledge about our current state.
    fn query_into_requests(&self, q: DocQuery) -> Result<Vec<ClientRequest>> {
        let mut res = Vec::new();
        for q in q.split_for_download() {
            match q {
                DocQuery::LatestConsensus { flavor, .. } => {
                    res.push(self.make_consensus_request(flavor)?);
                }
                DocQuery::AuthCert(ids) => {
                    res.push(ClientRequest::AuthCert(ids.into_iter().collect()));
                }
                DocQuery::Microdesc(ids) => {
                    res.push(ClientRequest::Microdescs(ids.into_iter().collect()));
                }
                #[cfg(feature = "routerdesc")]
                DocQuery::RouterDesc(ids) => {
                    res.push(ClientRequest::RouterDescs(ids.into_iter().collect()));
                }
            }
        }
        Ok(res)
    }

    /// Construct an appropriate ClientRequest to download a consensus
    /// of the given flavor.
    fn make_consensus_request(&self, flavor: ConsensusFlavor) -> Result<ClientRequest> {
        #![allow(clippy::unnecessary_wraps)]
        let mut request = tor_dirclient::request::ConsensusRequest::new(flavor);

        let r = self.store.lock().expect("Directory storage lock poisoned");
        match r.latest_consensus_meta(flavor) {
            Ok(Some(meta)) => {
                request.set_last_consensus_date(meta.lifetime().valid_after());
                request.push_old_consensus_digest(*meta.sha3_256_of_signed());
            }
            Ok(None) => {}
            Err(e) => {
                warn!("Error loading directory metadata: {}", e);
            }
        }

        Ok(ClientRequest::Consensus(request))
    }

    /// Given a request we sent and the response we got from a
    /// directory server, see whether we should expand that response
    /// into "something larger".
    ///
    /// Currently, this handles expanding consensus diffs, and nothing
    /// else.  We do it at this stage of our downloading operation
    /// because it requires access to the store.
    fn expand_response_text(&self, req: &ClientRequest, text: String) -> Result<String> {
        if let ClientRequest::Consensus(req) = req {
            if tor_consdiff::looks_like_diff(&text) {
                if let Some(old_d) = req.old_consensus_digests().next() {
                    let db_val = {
                        let s = self.store.lock().expect("Directory storage lock poisoned");
                        s.consensus_by_sha3_digest_of_signed_part(old_d)?
                    };
                    if let Some((old_consensus, meta)) = db_val {
                        info!("Applying a consensus diff");
                        let new_consensus = tor_consdiff::apply_diff(
                            old_consensus.as_str()?,
                            &text,
                            Some(*meta.sha3_256_of_signed()),
                        )?;
                        new_consensus.check_digest()?;
                        return Ok(new_consensus.to_string());
                    }
                }
                return Err(Error::Unwanted(
                    "Received a consensus diff we did not ask for",
                ));
            }
        }
        Ok(text)
    }
}

/// A degree of readiness for a given directory state object.
#[derive(Debug, Copy, Clone)]
enum Readiness {
    /// There is no more information to download.
    Complete,
    /// There is more information to download, but we don't need to.
    Usable,
}

/// A "state" object used to represent our progress in downloading a
/// directory.
///
/// These state objects are not meant to know about the network, or
/// how to fetch documents at all.  Instead, they keep track of what
/// information they are missing, and what to do when they get that
/// information.
///
/// Every state object has two possible transitions: "resetting", and
/// "advancing".  Advancing happens when a state has no more work to
/// do, and needs to transform into a different kind of object.
/// Resetting happens when this state needs to go back to an initial
/// state in order to start over -- either because of an error or
/// because the information it has downloaded is no longer timely.
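///
/// A hedged sketch of how a caller might drive one of these states (the
/// real drivers live in the `bootstrap` module; `fetch_somehow` is a
/// hypothetical stand-in for cache or network retrieval):
///
/// ```ignore
/// while !state.is_ready(Readiness::Complete) {
///     let missing = state.missing_docs();
///     let (text, request) = fetch_somehow(missing)?;
///     state.add_from_download(&text, &request, None)?;
///     if state.can_advance() {
///         state = state.advance()?;
///     }
/// }
/// ```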
trait DirState: Send {
    /// Return a human-readable description of this state.
    fn describe(&self) -> String;
    /// Return a list of the documents we're missing.
    ///
    /// If every document on this list were to be loaded or downloaded, then
    /// the state should either become "ready to advance", or "complete."
    ///
    /// This list should never _grow_ on a given state; only advancing
    /// or resetting the state should add new DocIds that weren't
    /// there before.
    fn missing_docs(&self) -> Vec<DocId>;
    /// Describe whether this state has reached `ready` status.
    fn is_ready(&self, ready: Readiness) -> bool;
    /// Return true if this state can advance to another state via its
    /// `advance` method.
    fn can_advance(&self) -> bool;
    /// Add one or more documents from our cache; returns 'true' if there
    /// was any change in this state.
    ///
    /// If `storage` is provided, then we should write any state changes into
    /// it.  (We don't read from it in this method.)
    fn add_from_cache(
        &mut self,
        docs: HashMap<DocId, DocumentText>,
        storage: Option<&Mutex<SqliteStore>>,
    ) -> Result<bool>;

    /// Add information that we have just downloaded to this state; returns
    /// 'true' if there was any change in this state.
    ///
    /// This method receives a copy of the original request, and
    /// should reject any documents that do not pertain to it.
    ///
    /// If `storage` is provided, then we should write any accepted documents
    /// into `storage` so they can be saved in a cache.
    // TODO: It might be good to say "there was a change but also an
    // error" in this API if possible.
    // TODO: It would be better to not have this function be async,
    // once the `must_not_suspend` lint is stable.
    // TODO: this should take a "DirSource" too.
    fn add_from_download(
        &mut self,
        text: &str,
        request: &ClientRequest,
        storage: Option<&Mutex<SqliteStore>>,
    ) -> Result<bool>;
    /// Return a configuration for attempting downloads.
    fn dl_config(&self) -> Result<DownloadSchedule>;
    /// If possible, advance to the next state.
    fn advance(self: Box<Self>) -> Result<Box<dyn DirState>>;
    /// Return a time (if any) when downloaders should stop attempting to
    /// advance this state, and should instead reset it and start over.
    fn reset_time(&self) -> Option<SystemTime>;
    /// Reset this state and start over.
    fn reset(self: Box<Self>) -> Result<Box<dyn DirState>>;
}

/// Try to upgrade a weak reference to a DirMgr, and give an error on
/// failure.
fn upgrade_weak_ref<T>(weak: &Weak<T>) -> Result<Arc<T>> {
    Weak::upgrade(weak).ok_or(Error::ManagerDropped)
}

#[cfg(test)]
mod test {
    #![allow(clippy::unwrap_used)]
    use super::*;
    use crate::docmeta::{AuthCertMeta, ConsensusMeta};
    use std::time::Duration;
    use tempfile::TempDir;
    use tor_netdoc::doc::{authcert::AuthCertKeyIds, netstatus::Lifetime};

    pub(crate) fn new_mgr<R: Runtime>(runtime: R) -> (TempDir, DirMgr<R>) {
        let dir = TempDir::new().unwrap();
        let config = DirMgrConfig::builder()
            .cache_path(dir.path())
            .build()
            .unwrap();
        let dirmgr = DirMgr::from_config(config, runtime, None, false).unwrap();

        (dir, dirmgr)
    }

    #[test]
    fn failing_accessors() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let (_tempdir, mgr) = new_mgr(rt);

            assert!(mgr.circmgr().is_err());
            assert!(mgr.opt_netdir().is_none());
        });
    }

    #[test]
    fn load_and_store_internals() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let (_tempdir, mgr) = new_mgr(rt);

            let now = SystemTime::now();
            let tomorrow = now + Duration::from_secs(86400);
            let later = tomorrow + Duration::from_secs(86400);

            // Seed the storage with a bunch of junk.
            let d1 = [5_u8; 32];
            let d2 = [7; 32];
            let d3 = [42; 32];
            let d4 = [99; 20];
            let d5 = [12; 20];
            let certid1 = AuthCertKeyIds {
                id_fingerprint: d4.into(),
                sk_fingerprint: d5.into(),
            };
            let certid2 = AuthCertKeyIds {
                id_fingerprint: d5.into(),
                sk_fingerprint: d4.into(),
            };

            {
                let mut store = mgr.store.lock().unwrap();

                store
                    .store_microdescs(
                        vec![
                            ("Fake micro 1", &d1),
                            ("Fake micro 2", &d2),
                            ("Fake micro 3", &d3),
                        ],
                        now,
                    )
                    .unwrap();

                #[cfg(feature = "routerdesc")]
                store
                    .store_routerdescs(vec![("Fake rd1", now, &d4), ("Fake rd2", now, &d5)])
                    .unwrap();

                store
                    .store_authcerts(&[
                        (
                            AuthCertMeta::new(certid1, now, tomorrow),
                            "Fake certificate one",
                        ),
                        (
                            AuthCertMeta::new(certid2, now, tomorrow),
                            "Fake certificate two",
                        ),
                    ])
                    .unwrap();

                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, tomorrow, later).unwrap(),
                    [102; 32],
                    [103; 32],
                );
                store
                    .store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
                    .unwrap();
            }

            // Try to get it with text().
            let t1 = mgr.text(&DocId::Microdesc(d1)).unwrap().unwrap();
            assert_eq!(t1.as_str(), Ok("Fake micro 1"));

            let t2 = mgr
                .text(&DocId::LatestConsensus {
                    flavor: ConsensusFlavor::Microdesc,
                    cache_usage: CacheUsage::CacheOkay,
                })
                .unwrap()
                .unwrap();
            assert_eq!(t2.as_str(), Ok("Fake consensus!"));

            let t3 = mgr.text(&DocId::Microdesc([255; 32])).unwrap();
            assert!(t3.is_none());

            // Now try texts()
            let d_bogus = DocId::Microdesc([255; 32]);
            let res = mgr
                .texts(vec![
                    DocId::Microdesc(d2),
                    DocId::Microdesc(d3),
                    d_bogus,
                    DocId::AuthCert(certid2),
                    #[cfg(feature = "routerdesc")]
                    DocId::RouterDesc(d5),
                ])
                .unwrap();
            assert_eq!(
                res.get(&DocId::Microdesc(d2)).unwrap().as_str(),
                Ok("Fake micro 2")
            );
            assert_eq!(
                res.get(&DocId::Microdesc(d3)).unwrap().as_str(),
                Ok("Fake micro 3")
            );
            assert!(res.get(&d_bogus).is_none());
            assert_eq!(
                res.get(&DocId::AuthCert(certid2)).unwrap().as_str(),
                Ok("Fake certificate two")
            );
            #[cfg(feature = "routerdesc")]
            assert_eq!(
                res.get(&DocId::RouterDesc(d5)).unwrap().as_str(),
                Ok("Fake rd2")
            );
        });
    }
    #[test]
    fn make_consensus_request() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let (_tempdir, mgr) = new_mgr(rt);

            let now = SystemTime::now();
            let tomorrow = now + Duration::from_secs(86400);
            let later = tomorrow + Duration::from_secs(86400);

            // Try with an empty store.
            let req = mgr
                .make_consensus_request(ConsensusFlavor::Microdesc)
                .unwrap();
            match req {
                ClientRequest::Consensus(r) => {
                    assert_eq!(r.old_consensus_digests().count(), 0);
                    assert_eq!(r.last_consensus_date(), None);
                }
                _ => panic!("Wrong request type"),
            }

            // Add a fake consensus record.
            let d_prev = [42; 32];
            {
                let mut store = mgr.store.lock().unwrap();

                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, tomorrow, later).unwrap(),
                    d_prev,
                    [103; 32],
                );
                store
                    .store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
                    .unwrap();
            }

            // Now try again.
            let req = mgr
                .make_consensus_request(ConsensusFlavor::Microdesc)
                .unwrap();
            match req {
                ClientRequest::Consensus(r) => {
                    let ds: Vec<_> = r.old_consensus_digests().collect();
                    assert_eq!(ds.len(), 1);
                    assert_eq!(ds[0], &d_prev);
                    assert_eq!(r.last_consensus_date(), Some(now));
                }
                _ => panic!("Wrong request type"),
            }
        });
    }

    #[test]
    fn make_other_requests() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            use rand::Rng;
            let (_tempdir, mgr) = new_mgr(rt);

            let certid1 = AuthCertKeyIds {
                id_fingerprint: [99; 20].into(),
                sk_fingerprint: [100; 20].into(),
            };
            let mut rng = rand::thread_rng();
            #[cfg(feature = "routerdesc")]
            let rd_ids: Vec<[u8; 20]> = (0..1000).map(|_| rng.gen()).collect();
            let md_ids: Vec<[u8; 32]> = (0..1000).map(|_| rng.gen()).collect();

            // Try an authcert.
            let query = DocQuery::AuthCert(vec![certid1]);
            let reqs = mgr.query_into_requests(query).unwrap();
            assert_eq!(reqs.len(), 1);
            let req = &reqs[0];
            if let ClientRequest::AuthCert(r) = req {
                assert_eq!(r.keys().next(), Some(&certid1));
            } else {
                panic!();
            }

            // Try a bunch of mds.
            let query = DocQuery::Microdesc(md_ids);
            let reqs = mgr.query_into_requests(query).unwrap();
            assert_eq!(reqs.len(), 2);
            assert!(matches!(reqs[0], ClientRequest::Microdescs(_)));

            // Try a bunch of rds.
            #[cfg(feature = "routerdesc")]
            {
                let query = DocQuery::RouterDesc(rd_ids);
                let reqs = mgr.query_into_requests(query).unwrap();
                assert_eq!(reqs.len(), 2);
                assert!(matches!(reqs[0], ClientRequest::RouterDescs(_)));
            }
        });
    }

    #[test]
    fn expand_response() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let (_tempdir, mgr) = new_mgr(rt);

            // Try a simple request: nothing should happen.
            let q = DocId::Microdesc([99; 32]).into();
            let r = &mgr.query_into_requests(q).unwrap()[0];
            let expanded = mgr.expand_response_text(r, "ABC".to_string());
            assert_eq!(&expanded.unwrap(), "ABC");

            // Try a consensus response that doesn't look like a diff in
            // response to a query that doesn't ask for one.
            let latest_id = DocId::LatestConsensus {
                flavor: ConsensusFlavor::Microdesc,
                cache_usage: CacheUsage::CacheOkay,
            };
            let q: DocQuery = latest_id.into();
            let r = &mgr.query_into_requests(q.clone()).unwrap()[0];
            let expanded = mgr.expand_response_text(r, "DEF".to_string());
            assert_eq!(&expanded.unwrap(), "DEF");

            // Now stick some metadata and a string into the storage so that
            // we can ask for a diff.
            {
                let mut store = mgr.store.lock().unwrap();
                let now = SystemTime::now();
                let day = Duration::from_secs(86400);
                let d_in = [0x99; 32]; // This one, we can fake.
                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, now + day, now + 2 * day).unwrap(),
                    d_in,
                    d_in,
                );
                store
                    .store_consensus(
                        &cmeta,
                        ConsensusFlavor::Microdesc,
                        false,
                        "line 1\nline2\nline 3\n",
                    )
                    .unwrap();
            }

            // Try expanding something that isn't a consensus, even if we'd like
            // one.
            let r = &mgr.query_into_requests(q).unwrap()[0];
            let expanded = mgr.expand_response_text(r, "hello".to_string());
            assert_eq!(&expanded.unwrap(), "hello");

            // Finally, try "expanding" a diff (by applying it and checking the
            // digest).
            let diff = "network-status-diff-version 1
hash 9999999999999999999999999999999999999999999999999999999999999999 8382374ca766873eb0d2530643191c6eaa2c5e04afa554cbac349b5d0592d300
2c
replacement line
.
".to_string();
            let expanded = mgr.expand_response_text(r, diff);

            assert_eq!(expanded.unwrap(), "line 1\nreplacement line\nline 3\n");

            // If the digest is wrong, that should get rejected.
            let diff = "network-status-diff-version 1
hash 9999999999999999999999999999999999999999999999999999999999999999 9999999999999999999999999999999999999999999999999999999999999999
2c
replacement line
.
".to_string();
            let expanded = mgr.expand_response_text(r, diff);
            assert!(expanded.is_err());
        });
    }
}