1 //! `tor-dirmgr`: Code to fetch, store, and update Tor directory information.
2 //!
3 //! # Overview
4 //!
5 //! This crate is part of
6 //! [Arti](https://gitlab.torproject.org/tpo/core/arti/), a project to
7 //! implement [Tor](https://www.torproject.org/) in Rust.
8 //!
9 //! In its current design, Tor requires a set of up-to-date
10 //! authenticated directory documents in order to build multi-hop
11 //! anonymized circuits through the network.
12 //!
13 //! This directory manager crate is responsible for figuring out which
14 //! directory information we lack, downloading what we're missing, and
15 //! keeping a cache of it on disk.
16 //!
17 //! # Compile-time features
18 //!
19 //! `mmap` (default) -- Use memory mapping to reduce the memory load for
20 //! reading large directory objects from disk.
21 //!
22 //! `static` -- Try to link with a static copy of sqlite3.
23 //!
24 //! `routerdesc` -- (Incomplete) support for downloading and storing
25 //! router descriptors.
26
27 #![deny(missing_docs)]
28 #![warn(noop_method_call)]
29 #![deny(unreachable_pub)]
30 #![deny(clippy::await_holding_lock)]
31 #![deny(clippy::cargo_common_metadata)]
32 #![deny(clippy::cast_lossless)]
33 #![deny(clippy::checked_conversions)]
34 #![warn(clippy::clone_on_ref_ptr)]
35 #![warn(clippy::cognitive_complexity)]
36 #![deny(clippy::debug_assert_with_mut_call)]
37 #![deny(clippy::exhaustive_enums)]
38 #![deny(clippy::exhaustive_structs)]
39 #![deny(clippy::expl_impl_clone_on_copy)]
40 #![deny(clippy::fallible_impl_from)]
41 #![deny(clippy::implicit_clone)]
42 #![deny(clippy::large_stack_arrays)]
43 #![warn(clippy::manual_ok_or)]
44 #![deny(clippy::missing_docs_in_private_items)]
45 #![deny(clippy::missing_panics_doc)]
46 #![warn(clippy::needless_borrow)]
47 #![warn(clippy::needless_pass_by_value)]
48 #![warn(clippy::option_option)]
49 #![warn(clippy::rc_buffer)]
50 #![deny(clippy::ref_option_ref)]
51 #![warn(clippy::semicolon_if_nothing_returned)]
52 #![warn(clippy::trait_duplication_in_bounds)]
53 #![deny(clippy::unnecessary_wraps)]
54 #![warn(clippy::unseparated_literal_suffix)]
55 #![deny(clippy::unwrap_used)]
56
57 pub mod authority;
58 mod bootstrap;
59 mod config;
60 mod docid;
61 mod docmeta;
62 mod err;
63 mod event;
64 mod retry;
65 mod shared_ref;
66 mod state;
67 mod storage;
68
69 use crate::docid::{CacheUsage, ClientRequest, DocQuery};
70 use crate::shared_ref::SharedMutArc;
71 use crate::storage::sqlite::SqliteStore;
72 pub use retry::DownloadSchedule;
73 use tor_circmgr::CircMgr;
74 use tor_netdir::NetDir;
75 use tor_netdoc::doc::netstatus::ConsensusFlavor;
76
77 use futures::{channel::oneshot, task::SpawnExt};
78 use tor_rtcompat::{Runtime, SleepProviderExt};
79 use tracing::{info, trace, warn};
80
81 use std::sync::atomic::{AtomicBool, Ordering};
82 use std::sync::{Arc, Mutex};
83 use std::{collections::HashMap, sync::Weak};
84 use std::{fmt::Debug, time::SystemTime};
85
86 pub use authority::{Authority, AuthorityBuilder};
87 pub use config::{
88 DirMgrConfig, DirMgrConfigBuilder, DownloadScheduleConfig, DownloadScheduleConfigBuilder,
89 NetworkConfig, NetworkConfigBuilder,
90 };
91 pub use docid::DocId;
92 pub use err::Error;
93 pub use event::DirEvent;
94 pub use storage::DocumentText;
95 pub use tor_netdir::fallback::{FallbackDir, FallbackDirBuilder};
96
/// A Result as returned by this crate.
///
/// This is [`std::result::Result`] with the error type fixed to this
/// crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
99
/// A directory manager to download, fetch, and cache a Tor directory.
///
/// A DirMgr can operate in three modes:
/// * In **offline** mode, it only reads from the cache, and can
///   only read once.
/// * In **read-only** mode, it reads from the cache, but checks
///   whether it can acquire an associated lock file. If it can, then
///   it enters read-write mode. If not, it checks the cache
///   periodically for new information.
/// * In **read-write** mode, it knows that no other process will be
///   writing to the cache, and it takes responsibility for fetching
///   data from the network and updating the directory with new
///   directory information.
pub struct DirMgr<R: Runtime> {
    /// Configuration information: where to find directories, how to
    /// validate them, and so on.
    config: DirMgrConfig,
    /// Handle to our sqlite cache.
    // XXXX I'd like to use an rwlock, but that's not feasible, since
    // rusqlite::Connection isn't Sync.
    store: Mutex<SqliteStore>,
    /// Our latest sufficiently bootstrapped directory, if we have one.
    ///
    /// We keep it in a [`SharedMutArc`] so that we can give this out to a
    /// bunch of other users, and replace it once a new directory is
    /// bootstrapped.
    netdir: SharedMutArc<NetDir>,

    /// A flag that gets set whenever the _consensus_ part of `netdir` has
    /// changed.
    ///
    /// Cleared (and turned into a [`DirEvent::NewConsensus`] event) by
    /// `notify`.
    netdir_consensus_changed: AtomicBool,

    /// A flag that gets set whenever the _descriptors_ part of `netdir` has
    /// changed without adding a new consensus.
    ///
    /// Cleared (and turned into a [`DirEvent::NewDescriptors`] event) by
    /// `notify`.
    netdir_descriptors_changed: AtomicBool,

    /// A publisher handle, used to inform others about changes in the
    /// status of this directory handle.
    publisher: event::Publisher,

    /// A circuit manager, if this DirMgr supports downloading.
    ///
    /// `None` in offline mode (see `load_once`).
    circmgr: Option<Arc<CircMgr<R>>>,

    /// Our asynchronous runtime.
    runtime: R,
}
145
146 impl<R: Runtime> DirMgr<R> {
147 /// Try to load the directory from disk, without launching any
148 /// kind of update process.
149 ///
150 /// This function runs in **offline** mode: it will give an error
151 /// if the result is not up-to-date, or not fully downloaded.
152 ///
153 /// In general, you shouldn't use this function in a long-running
154 /// program; it's only suitable for command-line or batch tools.
155 // TODO: I wish this function didn't have to be async or take a runtime.
load_once(runtime: R, config: DirMgrConfig) -> Result<Arc<NetDir>>156 pub async fn load_once(runtime: R, config: DirMgrConfig) -> Result<Arc<NetDir>> {
157 let dirmgr = Arc::new(Self::from_config(config, runtime, None, true)?);
158
159 // TODO: add some way to return a directory that isn't up-to-date
160 let _success = dirmgr.load_directory().await?;
161
162 dirmgr.opt_netdir().ok_or(Error::DirectoryNotPresent)
163 }
164
165 /// Return a current netdir, either loading it or bootstrapping it
166 /// as needed.
167 ///
168 /// Like load_once, but will try to bootstrap (or wait for another
169 /// process to bootstrap) if we don't have an up-to-date
170 /// bootstrapped directory.
171 ///
172 /// In general, you shouldn't use this function in a long-running
173 /// program; it's only suitable for command-line or batch tools.
load_or_bootstrap_once( config: DirMgrConfig, runtime: R, circmgr: Arc<CircMgr<R>>, ) -> Result<Arc<NetDir>>174 pub async fn load_or_bootstrap_once(
175 config: DirMgrConfig,
176 runtime: R,
177 circmgr: Arc<CircMgr<R>>,
178 ) -> Result<Arc<NetDir>> {
179 let dirmgr = DirMgr::bootstrap_from_config(config, runtime, circmgr).await?;
180 Ok(dirmgr.netdir())
181 }
182
183 /// Return a new directory manager from a given configuration,
184 /// bootstrapping from the network as necessary.
185 ///
186 /// This function will to return until the directory is
187 /// bootstrapped enough to build circuits. It will also launch a
188 /// background task that fetches any missing information, and that
189 /// replaces the directory when a new one is available.
bootstrap_from_config( config: DirMgrConfig, runtime: R, circmgr: Arc<CircMgr<R>>, ) -> Result<Arc<Self>>190 pub async fn bootstrap_from_config(
191 config: DirMgrConfig,
192 runtime: R,
193 circmgr: Arc<CircMgr<R>>,
194 ) -> Result<Arc<Self>> {
195 let dirmgr = Arc::new(DirMgr::from_config(
196 config,
197 runtime.clone(),
198 Some(circmgr),
199 false,
200 )?);
201
202 // Try to load from the cache.
203 let have_directory = dirmgr.load_directory().await?;
204
205 let (mut sender, receiver) = if have_directory {
206 info!("Loaded a good directory from cache.");
207 (None, None)
208 } else {
209 info!("Didn't get usable directory from cache.");
210 let (sender, receiver) = oneshot::channel();
211 (Some(sender), Some(receiver))
212 };
213
214 // Whether we loaded or not, we now start downloading.
215 let dirmgr_weak = Arc::downgrade(&dirmgr);
216 runtime.spawn(async move {
217 // NOTE: This is a daemon task. It should eventually get
218 // treated as one.
219
220 // TODO: don't warn when these are Error::ManagerDropped: that
221 // means that the DirMgr has been shut down.
222 if let Err(e) = Self::reload_until_owner(&dirmgr_weak, &mut sender).await {
223 warn!("Unrecovered error while waiting for bootstrap: {}", e);
224 } else if let Err(e) = Self::download_forever(dirmgr_weak, sender).await {
225 warn!("Unrecovered error while downloading: {}", e);
226 }
227 })?;
228
229 if let Some(receiver) = receiver {
230 match receiver.await {
231 Ok(()) => {
232 info!("We have enough information to build circuits.");
233 }
234 Err(_) => {
235 warn!("Bootstrapping task exited before finishing.");
236 return Err(Error::CantAdvanceState);
237 }
238 }
239 }
240
241 Ok(dirmgr)
242 }
243
244 /// Try forever to either lock the storage (and thereby become the
245 /// owner), or to reload the database.
246 ///
247 /// If we have begin to have a bootstrapped directory, send a
248 /// message using `on_complete`.
249 ///
250 /// If we eventually become the owner, return Ok().
reload_until_owner( weak: &Weak<Self>, on_complete: &mut Option<oneshot::Sender<()>>, ) -> Result<()>251 async fn reload_until_owner(
252 weak: &Weak<Self>,
253 on_complete: &mut Option<oneshot::Sender<()>>,
254 ) -> Result<()> {
255 let mut logged = false;
256 let mut bootstrapped;
257 let runtime;
258 {
259 let dirmgr = upgrade_weak_ref(weak)?;
260 runtime = dirmgr.runtime.clone();
261 bootstrapped = dirmgr.netdir.get().is_some();
262 }
263
264 loop {
265 {
266 let dirmgr = upgrade_weak_ref(weak)?;
267 trace!("Trying to take ownership of the directory cache lock");
268 if dirmgr.try_upgrade_to_readwrite()? {
269 // We now own the lock! (Maybe we owned it before; the
270 // upgrade_to_readwrite() function is idempotent.) We can
271 // do our own bootstrapping.
272 if logged {
273 info!("The previous owning process has given up the lock. We are now in charge of managing the directory.");
274 }
275 return Ok(());
276 }
277 }
278
279 if !logged {
280 logged = true;
281 if bootstrapped {
282 info!("Another process is managing the directory. We'll use its cache.");
283 } else {
284 info!("Another process is bootstrapping the directory. Waiting till it finishes or exits.");
285 }
286 }
287
288 // We don't own the lock. Somebody else owns the cache. They
289 // should be updating it. Wait a bit, then try again.
290 let pause = if bootstrapped {
291 std::time::Duration::new(120, 0)
292 } else {
293 std::time::Duration::new(5, 0)
294 };
295 runtime.sleep(pause).await;
296 // TODO: instead of loading the whole thing we should have a
297 // database entry that says when the last update was, or use
298 // our state functions.
299 {
300 let dirmgr = upgrade_weak_ref(weak)?;
301 trace!("Trying to load from the directory cache");
302 if dirmgr.load_directory().await? {
303 // Successfully loaded a bootstrapped directory.
304 if let Some(send_done) = on_complete.take() {
305 let _ = send_done.send(());
306 }
307 if !bootstrapped {
308 info!("The directory is now bootstrapped.");
309 }
310 bootstrapped = true;
311 }
312 }
313 }
314 }
315
316 /// Try to fetch our directory info and keep it updated, indefinitely.
317 ///
318 /// If we have begin to have a bootstrapped directory, send a
319 /// message using `on_complete`.
download_forever( weak: Weak<Self>, mut on_complete: Option<oneshot::Sender<()>>, ) -> Result<()>320 async fn download_forever(
321 weak: Weak<Self>,
322 mut on_complete: Option<oneshot::Sender<()>>,
323 ) -> Result<()> {
324 let mut state: Box<dyn DirState> = Box::new(state::GetConsensusState::new(
325 Weak::clone(&weak),
326 CacheUsage::CacheOkay,
327 )?);
328
329 let (retry_config, runtime) = {
330 let dirmgr = upgrade_weak_ref(&weak)?;
331 (
332 *dirmgr.config.schedule().retry_bootstrap(),
333 dirmgr.runtime.clone(),
334 )
335 };
336
337 loop {
338 let mut usable = false;
339 let mut retry_delay = retry_config.schedule();
340
341 'retry_attempt: for _ in retry_config.attempts() {
342 let (newstate, recoverable_err) =
343 bootstrap::download(Weak::clone(&weak), state, &mut on_complete).await?;
344 state = newstate;
345
346 if let Some(err) = recoverable_err {
347 if state.is_ready(Readiness::Usable) {
348 usable = true;
349 info!("Unable to completely download a directory: {}. Nevertheless, the directory is usable, so we'll pause for now.", err);
350 break 'retry_attempt;
351 }
352
353 let delay = retry_delay.next_delay(&mut rand::thread_rng());
354 warn!(
355 "Unable to download a usable directory: {}. We will restart in {:?}.",
356 err, delay
357 );
358 runtime.sleep(delay).await;
359 state = state.reset()?;
360 } else {
361 info!("Directory is complete.");
362 usable = true;
363 break 'retry_attempt;
364 }
365 }
366
367 if !usable {
368 // we ran out of attempts.
369 warn!(
370 "We failed {} times to bootstrap a directory. We're going to give up.",
371 retry_config.n_attempts()
372 );
373 return Err(Error::CantAdvanceState);
374 } else {
375 // Report success, if appropriate.
376 if let Some(send_done) = on_complete.take() {
377 let _ = send_done.send(());
378 }
379 }
380
381 let reset_at = state.reset_time();
382 match reset_at {
383 Some(t) => runtime.sleep_until_wallclock(t).await,
384 None => return Ok(()),
385 }
386 state = state.reset()?;
387 }
388 }
389
390 /// Get a reference to the circuit manager, if we have one.
circmgr(&self) -> Result<Arc<CircMgr<R>>>391 fn circmgr(&self) -> Result<Arc<CircMgr<R>>> {
392 self.circmgr
393 .as_ref()
394 .map(Arc::clone)
395 .ok_or(Error::NoDownloadSupport)
396 }
397
398 /// Try to make this a directory manager with read-write access to its
399 /// storage.
400 ///
401 /// Return true if we got the lock, or if we already had it.
402 ///
403 /// Return false if another process has the lock
try_upgrade_to_readwrite(&self) -> Result<bool>404 fn try_upgrade_to_readwrite(&self) -> Result<bool> {
405 self.store
406 .lock()
407 .expect("Directory storage lock poisoned")
408 .upgrade_to_readwrite()
409 }
410
411 /// Return a reference to the store, if it is currently read-write.
store_if_rw(&self) -> Option<&Mutex<SqliteStore>>412 fn store_if_rw(&self) -> Option<&Mutex<SqliteStore>> {
413 let rw = !self
414 .store
415 .lock()
416 .expect("Directory storage lock poisoned")
417 .is_readonly();
418 // A race-condition is possible here, but I believe it's harmless.
419 if rw {
420 Some(&self.store)
421 } else {
422 None
423 }
424 }
425
426 /// Construct a DirMgr from a DirMgrConfig.
from_config( config: DirMgrConfig, runtime: R, circmgr: Option<Arc<CircMgr<R>>>, readonly: bool, ) -> Result<Self>427 fn from_config(
428 config: DirMgrConfig,
429 runtime: R,
430 circmgr: Option<Arc<CircMgr<R>>>,
431 readonly: bool,
432 ) -> Result<Self> {
433 let store = Mutex::new(config.open_sqlite_store(readonly)?);
434 let netdir = SharedMutArc::new();
435 let netdir_consensus_changed = AtomicBool::new(false);
436 let netdir_descriptors_changed = AtomicBool::new(false);
437 let publisher = event::Publisher::new();
438 Ok(DirMgr {
439 config,
440 store,
441 netdir,
442 netdir_consensus_changed,
443 netdir_descriptors_changed,
444 publisher,
445 circmgr,
446 runtime,
447 })
448 }
449
450 /// Load the latest non-pending non-expired directory from the
451 /// cache, if it is newer than the one we have.
452 ///
453 /// Return false if there is no such consensus.
load_directory(self: &Arc<Self>) -> Result<bool>454 async fn load_directory(self: &Arc<Self>) -> Result<bool> {
455 let state = state::GetConsensusState::new(Arc::downgrade(self), CacheUsage::CacheOnly)?;
456 let _ = bootstrap::load(Arc::clone(self), Box::new(state)).await?;
457
458 Ok(self.netdir.get().is_some())
459 }
460
461 /// Return an Arc handle to our latest directory, if we have one.
462 ///
463 /// This is a private method, since by the time anybody else has a
464 /// handle to a DirMgr, the NetDir should definitely be
465 /// bootstrapped.
opt_netdir(&self) -> Option<Arc<NetDir>>466 fn opt_netdir(&self) -> Option<Arc<NetDir>> {
467 self.netdir.get()
468 }
469
470 /// Return an Arc handle to our latest directory, if we have one.
471 // TODO: Add variants of this that make sure that it's up-to-date?
netdir(&self) -> Arc<NetDir>472 pub fn netdir(&self) -> Arc<NetDir> {
473 self.opt_netdir().expect("DirMgr was not bootstrapped!")
474 }
475
476 /// Return a new asynchronous stream about events taking place with
477 /// this directory manager.
478 ///
479 /// The caller must regularly process events from this stream to
480 /// prevent it from blocking.
events(&self) -> impl futures::Stream<Item = DirEvent>481 pub fn events(&self) -> impl futures::Stream<Item = DirEvent> {
482 self.publisher.subscribe()
483 }
484
485 /// Try to load the text of a single document described by `doc` from
486 /// storage.
text(&self, doc: &DocId) -> Result<Option<DocumentText>>487 pub fn text(&self, doc: &DocId) -> Result<Option<DocumentText>> {
488 use itertools::Itertools;
489 let mut result = HashMap::new();
490 let query = (*doc).into();
491 self.load_documents_into(&query, &mut result)?;
492 let item = result.into_iter().at_most_one().map_err(|_| {
493 Error::CacheCorruption("Found more than one entry in storage for given docid")
494 })?;
495 if let Some((docid, doctext)) = item {
496 if &docid != doc {
497 return Err(Error::CacheCorruption(
498 "Item from storage had incorrect docid.",
499 ));
500 }
501 Ok(Some(doctext))
502 } else {
503 Ok(None)
504 }
505 }
506
507 /// Load the text for a collection of documents.
508 ///
509 /// If many of the documents have the same type, this can be more
510 /// efficient than calling [`text`](Self::text).
texts<T>(&self, docs: T) -> Result<HashMap<DocId, DocumentText>> where T: IntoIterator<Item = DocId>,511 pub fn texts<T>(&self, docs: T) -> Result<HashMap<DocId, DocumentText>>
512 where
513 T: IntoIterator<Item = DocId>,
514 {
515 let partitioned = docid::partition_by_type(docs);
516 let mut result = HashMap::new();
517 for (_, query) in partitioned.into_iter() {
518 self.load_documents_into(&query, &mut result)?;
519 }
520 Ok(result)
521 }
522
523 /// If the consensus has changed, notify any subscribers.
524 // TODO: I don't like all the different places in `bootstrap`
525 // where we have to call this function. Can we simplify it or
526 // clean it up somehow? Maybe we can build some kind of intelligence into
527 // shared_ref?
notify(&self)528 pub(crate) async fn notify(&self) {
529 if self.netdir_consensus_changed.swap(false, Ordering::SeqCst) {
530 self.publisher.send(DirEvent::NewConsensus).await;
531 }
532 if self
533 .netdir_descriptors_changed
534 .swap(false, Ordering::SeqCst)
535 {
536 self.publisher.send(DirEvent::NewDescriptors).await;
537 }
538 }
539
540 /// Load all the documents for a single DocumentQuery from the store.
load_documents_into( &self, query: &DocQuery, result: &mut HashMap<DocId, DocumentText>, ) -> Result<()>541 fn load_documents_into(
542 &self,
543 query: &DocQuery,
544 result: &mut HashMap<DocId, DocumentText>,
545 ) -> Result<()> {
546 use DocQuery::*;
547 let store = self.store.lock().expect("Directory storage lock poisoned");
548 match query {
549 LatestConsensus {
550 flavor,
551 cache_usage,
552 } => {
553 if *cache_usage == CacheUsage::MustDownload {
554 // Do nothing: we don't want a cached consensus.
555 trace!("MustDownload is set; not checking for cached consensus.");
556 } else if let Some(c) =
557 store.latest_consensus(*flavor, cache_usage.pending_requirement())?
558 {
559 trace!("Found a reasonable consensus in the cache");
560 let id = DocId::LatestConsensus {
561 flavor: *flavor,
562 cache_usage: *cache_usage,
563 };
564 result.insert(id, c.into());
565 }
566 }
567 AuthCert(ids) => result.extend(
568 store
569 .authcerts(ids)?
570 .into_iter()
571 .map(|(id, c)| (DocId::AuthCert(id), DocumentText::from_string(c))),
572 ),
573 Microdesc(digests) => {
574 result.extend(
575 store
576 .microdescs(digests)?
577 .into_iter()
578 .map(|(id, md)| (DocId::Microdesc(id), DocumentText::from_string(md))),
579 );
580 }
581 #[cfg(feature = "routerdesc")]
582 RouterDesc(digests) => result.extend(
583 store
584 .routerdescs(digests)?
585 .into_iter()
586 .map(|(id, rd)| (DocId::RouterDesc(id), DocumentText::from_string(rd))),
587 ),
588 }
589 Ok(())
590 }
591
592 /// Convert a DocQuery into a set of ClientRequests, suitable for sending
593 /// to a directory cache.
594 ///
595 /// This conversion has to be a function of the dirmgr, since it may
596 /// require knowledge about our current state.
query_into_requests(&self, q: DocQuery) -> Result<Vec<ClientRequest>>597 fn query_into_requests(&self, q: DocQuery) -> Result<Vec<ClientRequest>> {
598 let mut res = Vec::new();
599 for q in q.split_for_download() {
600 match q {
601 DocQuery::LatestConsensus { flavor, .. } => {
602 res.push(self.make_consensus_request(flavor)?);
603 }
604 DocQuery::AuthCert(ids) => {
605 res.push(ClientRequest::AuthCert(ids.into_iter().collect()));
606 }
607 DocQuery::Microdesc(ids) => {
608 res.push(ClientRequest::Microdescs(ids.into_iter().collect()));
609 }
610 #[cfg(feature = "routerdesc")]
611 DocQuery::RouterDesc(ids) => {
612 res.push(ClientRequest::RouterDescs(ids.into_iter().collect()));
613 }
614 }
615 }
616 Ok(res)
617 }
618
619 /// Construct an appropriate ClientRequest to download a consensus
620 /// of the given flavor.
make_consensus_request(&self, flavor: ConsensusFlavor) -> Result<ClientRequest>621 fn make_consensus_request(&self, flavor: ConsensusFlavor) -> Result<ClientRequest> {
622 #![allow(clippy::unnecessary_wraps)]
623 let mut request = tor_dirclient::request::ConsensusRequest::new(flavor);
624
625 let r = self.store.lock().expect("Directory storage lock poisoned");
626 match r.latest_consensus_meta(flavor) {
627 Ok(Some(meta)) => {
628 request.set_last_consensus_date(meta.lifetime().valid_after());
629 request.push_old_consensus_digest(*meta.sha3_256_of_signed());
630 }
631 Ok(None) => {}
632 Err(e) => {
633 warn!("Error loading directory metadata: {}", e);
634 }
635 }
636
637 Ok(ClientRequest::Consensus(request))
638 }
639
640 /// Given a request we sent and the response we got from a
641 /// directory server, see whether we should expand that response
642 /// into "something larger".
643 ///
644 /// Currently, this handles expanding consensus diffs, and nothing
645 /// else. We do it at this stage of our downloading operation
646 /// because it requires access to the store.
expand_response_text(&self, req: &ClientRequest, text: String) -> Result<String>647 fn expand_response_text(&self, req: &ClientRequest, text: String) -> Result<String> {
648 if let ClientRequest::Consensus(req) = req {
649 if tor_consdiff::looks_like_diff(&text) {
650 if let Some(old_d) = req.old_consensus_digests().next() {
651 let db_val = {
652 let s = self.store.lock().expect("Directory storage lock poisoned");
653 s.consensus_by_sha3_digest_of_signed_part(old_d)?
654 };
655 if let Some((old_consensus, meta)) = db_val {
656 info!("Applying a consensus diff");
657 let new_consensus = tor_consdiff::apply_diff(
658 old_consensus.as_str()?,
659 &text,
660 Some(*meta.sha3_256_of_signed()),
661 )?;
662 new_consensus.check_digest()?;
663 return Ok(new_consensus.to_string());
664 }
665 }
666 return Err(Error::Unwanted(
667 "Received a consensus diff we did not ask for",
668 ));
669 }
670 }
671 Ok(text)
672 }
673 }
674
/// A degree of readiness for a given directory state object.
#[derive(Debug, Copy, Clone)]
enum Readiness {
    /// There is no more information to download.
    Complete,
    /// There is more information to download, but we don't need to
    /// download it before the directory can be used.
    Usable,
}
683
/// A "state" object used to represent our progress in downloading a
/// directory.
///
/// These state objects are not meant to know about the network, or
/// how to fetch documents at all.  Instead, they keep track of what
/// information they are missing, and what to do when they get that
/// information.
///
/// Every state object has two possible transitions: "resetting", and
/// "advancing".  Advancing happens when a state has no more work to
/// do, and needs to transform into a different kind of object.
/// Resetting happens when this state needs to go back to an initial
/// state in order to start over -- either because of an error or
/// because the information it has downloaded is no longer timely.
trait DirState: Send {
    /// Return a human-readable description of this state.
    fn describe(&self) -> String;
    /// Return a list of the documents we're missing.
    ///
    /// If every document on this list were to be loaded or downloaded, then
    /// the state should either become "ready to advance", or "complete."
    ///
    /// This list should never _grow_ on a given state; only advancing
    /// or resetting the state should add new DocIds that weren't
    /// there before.
    fn missing_docs(&self) -> Vec<DocId>;
    /// Describe whether this state has reached `ready` status.
    fn is_ready(&self, ready: Readiness) -> bool;
    /// Return true if this state can advance to another state via its
    /// `advance` method.
    fn can_advance(&self) -> bool;
    /// Add one or more documents from our cache; returns 'true' if there
    /// was any change in this state.
    ///
    /// If `storage` is provided, then we should write any state changes into
    /// it.  (We don't read from it in this method.)
    fn add_from_cache(
        &mut self,
        docs: HashMap<DocId, DocumentText>,
        storage: Option<&Mutex<SqliteStore>>,
    ) -> Result<bool>;

    /// Add information that we have just downloaded to this state; returns
    /// 'true' if there was any change in this state.
    ///
    /// This method receives a copy of the original request, and
    /// should reject any documents that do not pertain to it.
    ///
    /// If `storage` is provided, then we should write any accepted documents
    /// into `storage` so they can be saved in a cache.
    // TODO: It might be good to say "there was a change but also an
    // error" in this API if possible.
    // TODO: It would be better to not have this function be async,
    // once the `must_not_suspend` lint is stable.
    // NOTE(review): this method is not declared `async` here, so the TODO
    // above may be stale -- confirm and remove.
    // TODO: this should take a "DirSource" too.
    fn add_from_download(
        &mut self,
        text: &str,
        request: &ClientRequest,
        storage: Option<&Mutex<SqliteStore>>,
    ) -> Result<bool>;
    /// Return a configuration for attempting downloads.
    fn dl_config(&self) -> Result<DownloadSchedule>;
    /// If possible, advance to the next state.
    fn advance(self: Box<Self>) -> Result<Box<dyn DirState>>;
    /// Return a time (if any) when downloaders should stop attempting to
    /// advance this state, and should instead reset it and start over.
    fn reset_time(&self) -> Option<SystemTime>;
    /// Reset this state and start over.
    fn reset(self: Box<Self>) -> Result<Box<dyn DirState>>;
}
755
756 /// Try to upgrade a weak reference to a DirMgr, and give an error on
757 /// failure.
upgrade_weak_ref<T>(weak: &Weak<T>) -> Result<Arc<T>>758 fn upgrade_weak_ref<T>(weak: &Weak<T>) -> Result<Arc<T>> {
759 Weak::upgrade(weak).ok_or(Error::ManagerDropped)
760 }
761
762 #[cfg(test)]
763 mod test {
764 #![allow(clippy::unwrap_used)]
765 use super::*;
766 use crate::docmeta::{AuthCertMeta, ConsensusMeta};
767 use std::time::Duration;
768 use tempfile::TempDir;
769 use tor_netdoc::doc::{authcert::AuthCertKeyIds, netstatus::Lifetime};
770
new_mgr<R: Runtime>(runtime: R) -> (TempDir, DirMgr<R>)771 pub(crate) fn new_mgr<R: Runtime>(runtime: R) -> (TempDir, DirMgr<R>) {
772 let dir = TempDir::new().unwrap();
773 let config = DirMgrConfig::builder()
774 .cache_path(dir.path())
775 .build()
776 .unwrap();
777 let dirmgr = DirMgr::from_config(config, runtime, None, false).unwrap();
778
779 (dir, dirmgr)
780 }
781
782 #[test]
failing_accessors()783 fn failing_accessors() {
784 tor_rtcompat::test_with_one_runtime!(|rt| async {
785 let (_tempdir, mgr) = new_mgr(rt);
786
787 assert!(mgr.circmgr().is_err());
788 assert!(mgr.opt_netdir().is_none());
789 });
790 }
791
792 #[test]
load_and_store_internals()793 fn load_and_store_internals() {
794 tor_rtcompat::test_with_one_runtime!(|rt| async {
795 let (_tempdir, mgr) = new_mgr(rt);
796
797 let now = SystemTime::now();
798 let tomorrow = now + Duration::from_secs(86400);
799 let later = tomorrow + Duration::from_secs(86400);
800
801 // Seed the storage with a bunch of junk.
802 let d1 = [5_u8; 32];
803 let d2 = [7; 32];
804 let d3 = [42; 32];
805 let d4 = [99; 20];
806 let d5 = [12; 20];
807 let certid1 = AuthCertKeyIds {
808 id_fingerprint: d4.into(),
809 sk_fingerprint: d5.into(),
810 };
811 let certid2 = AuthCertKeyIds {
812 id_fingerprint: d5.into(),
813 sk_fingerprint: d4.into(),
814 };
815
816 {
817 let mut store = mgr.store.lock().unwrap();
818
819 store
820 .store_microdescs(
821 vec![
822 ("Fake micro 1", &d1),
823 ("Fake micro 2", &d2),
824 ("Fake micro 3", &d3),
825 ],
826 now,
827 )
828 .unwrap();
829
830 #[cfg(feature = "routerdesc")]
831 store
832 .store_routerdescs(vec![("Fake rd1", now, &d4), ("Fake rd2", now, &d5)])
833 .unwrap();
834
835 store
836 .store_authcerts(&[
837 (
838 AuthCertMeta::new(certid1, now, tomorrow),
839 "Fake certificate one",
840 ),
841 (
842 AuthCertMeta::new(certid2, now, tomorrow),
843 "Fake certificate two",
844 ),
845 ])
846 .unwrap();
847
848 let cmeta = ConsensusMeta::new(
849 Lifetime::new(now, tomorrow, later).unwrap(),
850 [102; 32],
851 [103; 32],
852 );
853 store
854 .store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
855 .unwrap();
856 }
857
858 // Try to get it with text().
859 let t1 = mgr.text(&DocId::Microdesc(d1)).unwrap().unwrap();
860 assert_eq!(t1.as_str(), Ok("Fake micro 1"));
861
862 let t2 = mgr
863 .text(&DocId::LatestConsensus {
864 flavor: ConsensusFlavor::Microdesc,
865 cache_usage: CacheUsage::CacheOkay,
866 })
867 .unwrap()
868 .unwrap();
869 assert_eq!(t2.as_str(), Ok("Fake consensus!"));
870
871 let t3 = mgr.text(&DocId::Microdesc([255; 32])).unwrap();
872 assert!(t3.is_none());
873
874 // Now try texts()
875 let d_bogus = DocId::Microdesc([255; 32]);
876 let res = mgr
877 .texts(vec![
878 DocId::Microdesc(d2),
879 DocId::Microdesc(d3),
880 d_bogus,
881 DocId::AuthCert(certid2),
882 #[cfg(feature = "routerdesc")]
883 DocId::RouterDesc(d5),
884 ])
885 .unwrap();
886 assert_eq!(
887 res.get(&DocId::Microdesc(d2)).unwrap().as_str(),
888 Ok("Fake micro 2")
889 );
890 assert_eq!(
891 res.get(&DocId::Microdesc(d3)).unwrap().as_str(),
892 Ok("Fake micro 3")
893 );
894 assert!(res.get(&d_bogus).is_none());
895 assert_eq!(
896 res.get(&DocId::AuthCert(certid2)).unwrap().as_str(),
897 Ok("Fake certificate two")
898 );
899 #[cfg(feature = "routerdesc")]
900 assert_eq!(
901 res.get(&DocId::RouterDesc(d5)).unwrap().as_str(),
902 Ok("Fake rd2")
903 );
904 });
905 }
906
#[test]
fn make_consensus_request() {
    tor_rtcompat::test_with_one_runtime!(|rt| async {
        let (_tempdir, mgr) = new_mgr(rt);

        // Lifetime for the fake consensus: starts now, then one and two
        // days out.
        let one_day = Duration::from_secs(86400);
        let now = SystemTime::now();
        let tomorrow = now + one_day;
        let later = tomorrow + one_day;

        // With an empty store, the request must not mention any previous
        // consensus: no old digests and no last-consensus date.
        let empty_req = mgr
            .make_consensus_request(ConsensusFlavor::Microdesc)
            .unwrap();
        if let ClientRequest::Consensus(r) = empty_req {
            assert_eq!(r.old_consensus_digests().count(), 0);
            assert_eq!(r.last_consensus_date(), None);
        } else {
            panic!("Wrong request type");
        }

        // Record a fake consensus whose digest we can recognize later.
        let d_prev = [42; 32];
        {
            let mut store = mgr.store.lock().unwrap();

            let lifetime = Lifetime::new(now, tomorrow, later).unwrap();
            let cmeta = ConsensusMeta::new(lifetime, d_prev, [103; 32]);
            store
                .store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
                .unwrap();
        }

        // The next request should reference exactly the stored consensus:
        // its digest, and `now` as the last consensus date.
        let followup_req = mgr
            .make_consensus_request(ConsensusFlavor::Microdesc)
            .unwrap();
        if let ClientRequest::Consensus(r) = followup_req {
            let digests: Vec<_> = r.old_consensus_digests().collect();
            assert_eq!(digests.len(), 1);
            assert_eq!(digests[0], &d_prev);
            assert_eq!(r.last_consensus_date(), Some(now));
        } else {
            panic!("Wrong request type");
        }
    });
}
958
#[test]
fn make_other_requests() {
    tor_rtcompat::test_with_one_runtime!(|rt| async {
        use rand::Rng;
        let (_tempdir, mgr) = new_mgr(rt);

        let certid1 = AuthCertKeyIds {
            id_fingerprint: [99; 20].into(),
            sk_fingerprint: [100; 20].into(),
        };

        // 1000 random identities of each kind. The assertions below expect
        // each batch to be split into two requests.
        let mut rng = rand::thread_rng();
        #[cfg(feature = "routerdesc")]
        let rd_ids: Vec<[u8; 20]> = std::iter::repeat_with(|| rng.gen()).take(1000).collect();
        let md_ids: Vec<[u8; 32]> = std::iter::repeat_with(|| rng.gen()).take(1000).collect();

        // A single authority certificate yields one request for that key.
        let authcert_reqs = mgr
            .query_into_requests(DocQuery::AuthCert(vec![certid1]))
            .unwrap();
        assert_eq!(authcert_reqs.len(), 1);
        match &authcert_reqs[0] {
            ClientRequest::AuthCert(r) => assert_eq!(r.keys().next(), Some(&certid1)),
            _ => panic!(),
        }

        // Many microdescriptors: two microdescriptor requests.
        let md_reqs = mgr
            .query_into_requests(DocQuery::Microdesc(md_ids))
            .unwrap();
        assert_eq!(md_reqs.len(), 2);
        assert!(matches!(md_reqs[0], ClientRequest::Microdescs(_)));

        // Many router descriptors: two router-descriptor requests.
        #[cfg(feature = "routerdesc")]
        {
            let rd_reqs = mgr
                .query_into_requests(DocQuery::RouterDesc(rd_ids))
                .unwrap();
            assert_eq!(rd_reqs.len(), 2);
            assert!(matches!(rd_reqs[0], ClientRequest::RouterDescs(_)));
        }
    });
}
1001
#[test]
fn expand_response() {
    tor_rtcompat::test_with_one_runtime!(|rt| async {
        let (_tempdir, mgr) = new_mgr(rt);

        // A plain (non-consensus) request: the response text should come
        // back unchanged.
        let md_query: DocQuery = DocId::Microdesc([99; 32]).into();
        let md_req = &mgr.query_into_requests(md_query).unwrap()[0];
        assert_eq!(&mgr.expand_response_text(md_req, "ABC".to_string()).unwrap(), "ABC");

        // Before any consensus is stored, a latest-consensus request doesn't
        // ask for a diff, and non-diff text passes through unchanged.
        let consensus_query: DocQuery = DocId::LatestConsensus {
            flavor: ConsensusFlavor::Microdesc,
            cache_usage: CacheUsage::CacheOkay,
        }
        .into();
        let plain_req = &mgr.query_into_requests(consensus_query.clone()).unwrap()[0];
        assert_eq!(&mgr.expand_response_text(plain_req, "DEF".to_string()).unwrap(), "DEF");

        // Store a small fake consensus with known metadata, so that a new
        // request can ask for a diff against it.
        {
            let mut store = mgr.store.lock().unwrap();
            let now = SystemTime::now();
            let day = Duration::from_secs(86400);
            // Made-up digest: [0x99; 32] is the all-9s "from" hash that the
            // diffs below name on their `hash` lines.
            let fake_digest = [0x99; 32];
            let lifetime = Lifetime::new(now, now + day, now + 2 * day).unwrap();
            let cmeta = ConsensusMeta::new(lifetime, fake_digest, fake_digest);
            store
                .store_consensus(
                    &cmeta,
                    ConsensusFlavor::Microdesc,
                    false,
                    "line 1\nline2\nline 3\n",
                )
                .unwrap();
        }

        // Even when we'd accept a diff, a response that isn't one is
        // returned unchanged.
        let diff_req = &mgr.query_into_requests(consensus_query).unwrap()[0];
        assert_eq!(&mgr.expand_response_text(diff_req, "hello".to_string()).unwrap(), "hello");

        // A real diff is applied to the stored consensus, and the result's
        // digest is checked against the second hash on the `hash` line.
        let good_diff = "network-status-diff-version 1
hash 9999999999999999999999999999999999999999999999999999999999999999 8382374ca766873eb0d2530643191c6eaa2c5e04afa554cbac349b5d0592d300
2c
replacement line
.
".to_string();
        assert_eq!(
            mgr.expand_response_text(diff_req, good_diff).unwrap(),
            "line 1\nreplacement line\nline 3\n"
        );

        // A diff whose claimed result digest is wrong must be rejected.
        let bad_diff = "network-status-diff-version 1
hash 9999999999999999999999999999999999999999999999999999999999999999 9999999999999999999999999999999999999999999999999999999999999999
2c
replacement line
.
".to_string();
        assert!(mgr.expand_response_text(diff_req, bad_diff).is_err());
    });
}
1074 }
1075