1 //! Net document storage backed by sqlite3.
2 //!
3 //! We store most objects in sqlite tables, except for very large ones,
4 //! which we store as "blob" files in a separate directory.
5 
6 use crate::docmeta::{AuthCertMeta, ConsensusMeta};
7 use crate::storage::InputString;
8 use crate::{Error, Result};
9 
10 use tor_netdoc::doc::authcert::AuthCertKeyIds;
11 use tor_netdoc::doc::microdesc::MdDigest;
12 use tor_netdoc::doc::netstatus::{ConsensusFlavor, Lifetime};
13 #[cfg(feature = "routerdesc")]
14 use tor_netdoc::doc::routerdesc::RdDigest;
15 
16 use std::collections::HashMap;
17 use std::convert::TryInto;
18 use std::path::{self, Path, PathBuf};
19 use std::time::SystemTime;
20 
21 use rusqlite::{params, OpenFlags, OptionalExtension, Transaction};
22 use time::OffsetDateTime;
23 use tracing::trace;
24 
25 #[cfg(target_family = "unix")]
26 use std::os::unix::fs::DirBuilderExt;
27 
28 /// Local directory cache using a Sqlite3 connection.
/// Local directory cache using a Sqlite3 connection.
pub(crate) struct SqliteStore {
    /// Connection to the sqlite3 database.
    conn: rusqlite::Connection,
    /// Location for the sqlite3 database; used to reopen it.
    ///
    /// (This is `None` for stores created via [`SqliteStore::from_conn`],
    /// e.g. memory-backed stores used in testing.)
    sql_path: Option<PathBuf>,
    /// Location to store blob files.
    path: PathBuf,
    /// Lockfile to prevent concurrent write attempts from different
    /// processes.
    ///
    /// If this is None we aren't using a lockfile.  Watch out!
    ///
    /// (sqlite supports that with connection locking, but we want to
    /// be a little more coarse-grained here)
    lockfile: Option<fslock::LockFile>,
}
45 
46 impl SqliteStore {
    /// Construct or open a new SqliteStore at some location on disk.
    /// The provided location must be a directory, or a possible
    /// location for a directory: the directory will be created if
    /// necessary.
    ///
    /// If readonly is true, the result will be a read-only store.
    /// Otherwise, when readonly is false, the result may be
    /// read-only or read-write, depending on whether we can acquire
    /// the lock.
    ///
    /// # Limitations:
    ///
    /// The file locking that we use to ensure that only one dirmgr is
    /// writing to a given storage directory at a time is currently
    /// _per process_. Therefore, you might get unexpected results if
    /// two SqliteStores are created in the same process with the
    /// same path.
    pub(crate) fn from_path<P: AsRef<Path>>(path: P, mut readonly: bool) -> Result<Self> {
        let path = path.as_ref();
        let sqlpath = path.join("dir.sqlite3");
        let blobpath = path.join("dir_blobs/");
        let lockpath = path.join("dir.lock");

        if !readonly {
            let mut builder = std::fs::DirBuilder::new();
            // Make the blob directory private to this user on unix.
            #[cfg(target_family = "unix")]
            builder.mode(0o700);
            builder.recursive(true).create(&blobpath).map_err(|err| {
                Error::StorageError(format!("Creating directory at {:?}: {}", &blobpath, err))
            })?;
        }

        // Open the lockfile unconditionally; if we were asked for a
        // read-write store but somebody else holds the lock, we fall
        // back to read-only rather than failing.
        let mut lockfile = fslock::LockFile::open(&lockpath)?;
        if !readonly && !lockfile.try_lock()? {
            readonly = true; // we couldn't get the lock!
        };
        let flags = if readonly {
            OpenFlags::SQLITE_OPEN_READ_ONLY
        } else {
            OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE
        };
        let conn = rusqlite::Connection::open_with_flags(&sqlpath, flags)?;
        let mut store = SqliteStore::from_conn(conn, &blobpath)?;
        // Remember the database path and lockfile so that
        // `upgrade_to_readwrite` can reopen the connection later.
        store.sql_path = Some(sqlpath);
        store.lockfile = Some(lockfile);
        Ok(store)
    }
94 
95     /// Construct a new SqliteStore from a database connection and a location
96     /// for blob files.
97     ///
98     /// Used for testing with a memory-backed database.
from_conn<P>(conn: rusqlite::Connection, path: P) -> Result<Self> where P: AsRef<Path>,99     pub(crate) fn from_conn<P>(conn: rusqlite::Connection, path: P) -> Result<Self>
100     where
101         P: AsRef<Path>,
102     {
103         let path = path.as_ref().to_path_buf();
104         let mut result = SqliteStore {
105             conn,
106             path,
107             lockfile: None,
108             sql_path: None,
109         };
110 
111         result.check_schema()?;
112 
113         Ok(result)
114     }
115 
116     /// Return true if this store is opened in read-only mode.
is_readonly(&self) -> bool117     pub(crate) fn is_readonly(&self) -> bool {
118         match &self.lockfile {
119             Some(f) => !f.owns_lock(),
120             None => false,
121         }
122     }
123 
124     /// Try to upgrade from a read-only connection to a read-write connection.
125     ///
126     /// Return true on success; false if another process had the lock.
upgrade_to_readwrite(&mut self) -> Result<bool>127     pub(crate) fn upgrade_to_readwrite(&mut self) -> Result<bool> {
128         if self.is_readonly() && self.sql_path.is_some() {
129             let lf = self
130                 .lockfile
131                 .as_mut()
132                 .expect("No lockfile open; cannot upgrade to read-write storage");
133             if !lf.try_lock()? {
134                 // Somebody else has the lock.
135                 return Ok(false);
136             }
137             // Unwrap should be safe due to parent `.is_some()` check
138             #[allow(clippy::unwrap_used)]
139             match rusqlite::Connection::open(self.sql_path.as_ref().unwrap()) {
140                 Ok(conn) => {
141                     self.conn = conn;
142                 }
143                 Err(e) => {
144                     let _ignore = lf.unlock();
145                     return Err(e.into());
146                 }
147             }
148         }
149         Ok(true)
150     }
151 
    /// Check whether this database has a schema format we can read, and
    /// install or upgrade the schema if necessary.
    fn check_schema(&mut self) -> Result<()> {
        let tx = self.conn.transaction()?;
        // Count user-defined tables; sqlite's internal tables are all
        // named 'sqlite_...' and are excluded.
        let db_n_tables: u32 = tx.query_row(
            "SELECT COUNT(name) FROM sqlite_master
             WHERE type='table'
             AND name NOT LIKE 'sqlite_%'",
            [],
            |row| row.get(0),
        )?;
        let db_exists = db_n_tables > 0;

        if !db_exists {
            // Fresh database: install the v0 schema, then apply the
            // v0->v1 migration, all in one transaction.
            tx.execute_batch(INSTALL_V0_SCHEMA)?;
            tx.execute_batch(UPDATE_SCHEMA_V0_TO_V1)?;
            tx.commit()?;
            return Ok(());
        }

        // Read the stored schema version, and the oldest version that
        // can still read this schema.
        let (version, readable_by): (u32, u32) = tx.query_row(
            "SELECT version, readable_by FROM TorSchemaMeta
             WHERE name = 'TorDirStorage'",
            [],
            |row| Ok((row.get(0)?, row.get(1)?)),
        )?;

        if version < SCHEMA_VERSION {
            // Update the schema.
            tx.execute_batch(UPDATE_SCHEMA_V0_TO_V1)?;
            tx.commit()?;
            return Ok(());
        } else if readable_by > SCHEMA_VERSION {
            // The schema is too new for us to understand.
            return Err(Error::UnrecognizedSchema);
        }

        // Dropping `tx` here rolls back the transaction, but nothing was done.
        Ok(())
    }
191 
192     /// Delete all completely-expired objects from the database.
193     ///
194     /// This is pretty conservative, and only removes things that are
195     /// definitely past their good-by date.
expire_all(&mut self) -> Result<()>196     pub(crate) fn expire_all(&mut self) -> Result<()> {
197         let tx = self.conn.transaction()?;
198         let expired_blobs: Vec<String> = {
199             let mut stmt = tx.prepare(FIND_EXPIRED_EXTDOCS)?;
200             let names = stmt
201                 .query_map([], |row| row.get::<_, String>(0))?
202                 .filter_map(std::result::Result::ok)
203                 .collect();
204             names
205         };
206 
207         tx.execute(DROP_OLD_EXTDOCS, [])?;
208         tx.execute(DROP_OLD_MICRODESCS, [])?;
209         tx.execute(DROP_OLD_AUTHCERTS, [])?;
210         tx.execute(DROP_OLD_CONSENSUSES, [])?;
211         tx.execute(DROP_OLD_ROUTERDESCS, [])?;
212         tx.commit()?;
213         for name in expired_blobs {
214             let fname = self.blob_fname(name);
215             if let Ok(fname) = fname {
216                 let _ignore = std::fs::remove_file(fname);
217             }
218         }
219         Ok(())
220     }
221 
222     /// Return the correct filename for a given blob, based on the filename
223     /// from the ExtDocs table.
blob_fname<P>(&self, path: P) -> Result<PathBuf> where P: AsRef<Path>,224     fn blob_fname<P>(&self, path: P) -> Result<PathBuf>
225     where
226         P: AsRef<Path>,
227     {
228         let path = path.as_ref();
229         if !path
230             .components()
231             .all(|c| matches!(c, path::Component::Normal(_)))
232         {
233             return Err(Error::CacheCorruption("Invalid path in database"));
234         }
235 
236         let mut result = self.path.clone();
237         result.push(path);
238         Ok(result)
239     }
240 
241     /// Read a blob from disk, mapping it if possible.
read_blob<P>(&self, path: P) -> Result<InputString> where P: AsRef<Path>,242     fn read_blob<P>(&self, path: P) -> Result<InputString>
243     where
244         P: AsRef<Path>,
245     {
246         let path = path.as_ref();
247         let full_path = self.blob_fname(path)?;
248         InputString::load(&full_path).map_err(|err| {
249             Error::StorageError(format!(
250                 "Loading blob {:?} from storage at {:?}: {}",
251                 path, full_path, err
252             ))
253         })
254     }
255 
    /// Write a file to disk as a blob, and record it in the ExtDocs table.
    ///
    /// Return a SavedBlobHandle that describes where the blob is, and which
    /// can be used either to commit the blob or delete it.
    fn save_blob_internal(
        &mut self,
        contents: &[u8],
        doctype: &str,
        dtype: &str,
        digest: &[u8],
        expires: OffsetDateTime,
    ) -> Result<SavedBlobHandle<'_>> {
        // Build the "dtype-hexdigest" string used as the ExtDocs key,
        // and the "doctype:dtype-hexdigest" blob filename.
        let digest = hex::encode(digest);
        let digeststr = format!("{}-{}", dtype, digest);
        let fname = format!("{}:{}", doctype, digeststr);
        let full_path = self.blob_fname(&fname)?;

        // Create the unlinker *before* writing, so that if anything
        // fails before the caller commits, dropping the handle removes
        // the orphaned file.
        let unlinker = Unlinker::new(&full_path);
        std::fs::write(full_path, contents)?;

        // The row insertion stays uncommitted until the caller commits
        // the transaction in the returned handle.
        let tx = self.conn.unchecked_transaction()?;
        tx.execute(INSERT_EXTDOC, params![digeststr, expires, dtype, fname])?;

        Ok(SavedBlobHandle {
            tx,
            fname,
            digeststr,
            unlinker,
        })
    }
286 
287     /// Save a blob to disk and commit it.
288     #[cfg(test)]
save_blob( &mut self, contents: &[u8], doctype: &str, dtype: &str, digest: &[u8], expires: OffsetDateTime, ) -> Result<String>289     fn save_blob(
290         &mut self,
291         contents: &[u8],
292         doctype: &str,
293         dtype: &str,
294         digest: &[u8],
295         expires: OffsetDateTime,
296     ) -> Result<String> {
297         let h = self.save_blob_internal(contents, doctype, dtype, digest, expires)?;
298         let SavedBlobHandle {
299             tx,
300             digeststr,
301             fname,
302             unlinker,
303         } = h;
304         let _ = digeststr;
305         tx.commit()?;
306         unlinker.forget();
307         Ok(fname)
308     }
309 
310     /// Write a consensus to disk.
store_consensus( &mut self, cmeta: &ConsensusMeta, flavor: ConsensusFlavor, pending: bool, contents: &str, ) -> Result<()>311     pub(crate) fn store_consensus(
312         &mut self,
313         cmeta: &ConsensusMeta,
314         flavor: ConsensusFlavor,
315         pending: bool,
316         contents: &str,
317     ) -> Result<()> {
318         let lifetime = cmeta.lifetime();
319         let sha3_of_signed = cmeta.sha3_256_of_signed();
320         let sha3_of_whole = cmeta.sha3_256_of_whole();
321         let valid_after: OffsetDateTime = lifetime.valid_after().into();
322         let fresh_until: OffsetDateTime = lifetime.fresh_until().into();
323         let valid_until: OffsetDateTime = lifetime.valid_until().into();
324 
325         /// How long to keep a consensus around after it has expired
326         const CONSENSUS_LIFETIME: time::Duration = time::Duration::days(4);
327 
328         // After a few days have passed, a consensus is no good for
329         // anything at all, not even diffs.
330         let expires = valid_until + CONSENSUS_LIFETIME;
331 
332         let doctype = format!("con:{}", flavor.name());
333 
334         let h = self.save_blob_internal(
335             contents.as_bytes(),
336             &doctype,
337             "sha3-256",
338             &sha3_of_whole[..],
339             expires,
340         )?;
341         h.tx.execute(
342             INSERT_CONSENSUS,
343             params![
344                 valid_after,
345                 fresh_until,
346                 valid_until,
347                 flavor.name(),
348                 pending,
349                 hex::encode(&sha3_of_signed),
350                 h.digeststr
351             ],
352         )?;
353         h.tx.commit()?;
354         h.unlinker.forget();
355         Ok(())
356     }
357 
358     /// Return the information about the latest non-pending consensus,
359     /// including its valid-after time and digest.
latest_consensus_meta( &self, flavor: ConsensusFlavor, ) -> Result<Option<ConsensusMeta>>360     pub(crate) fn latest_consensus_meta(
361         &self,
362         flavor: ConsensusFlavor,
363     ) -> Result<Option<ConsensusMeta>> {
364         let mut stmt = self.conn.prepare(FIND_LATEST_CONSENSUS_META)?;
365         let mut rows = stmt.query(params![flavor.name()])?;
366         if let Some(row) = rows.next()? {
367             Ok(Some(cmeta_from_row(row)?))
368         } else {
369             Ok(None)
370         }
371     }
372 
373     /// Return the valid-after time for the latest non non-pending consensus,
374     #[cfg(test)]
375     // We should revise the tests to use latest_consensus_meta instead.
latest_consensus_time(&self, flavor: ConsensusFlavor) -> Result<Option<OffsetDateTime>>376     fn latest_consensus_time(&self, flavor: ConsensusFlavor) -> Result<Option<OffsetDateTime>> {
377         Ok(self
378             .latest_consensus_meta(flavor)?
379             .map(|m| m.lifetime().valid_after().into()))
380     }
381 
382     /// Load the latest consensus from disk.
383     ///
384     /// If `pending` is given, we will only return a consensus with
385     /// the given "pending" status.  (A pending consensus doesn't have
386     /// enough descriptors yet.)  If `pending_ok` is None, we'll
387     /// return a consensus with any pending status.
latest_consensus( &self, flavor: ConsensusFlavor, pending: Option<bool>, ) -> Result<Option<InputString>>388     pub(crate) fn latest_consensus(
389         &self,
390         flavor: ConsensusFlavor,
391         pending: Option<bool>,
392     ) -> Result<Option<InputString>> {
393         trace!(?flavor, ?pending, "Loading latest consensus from cache");
394         let rv: Option<(OffsetDateTime, OffsetDateTime, String)>;
395         rv = match pending {
396             None => self
397                 .conn
398                 .query_row(FIND_CONSENSUS, params![flavor.name()], |row| row.try_into())
399                 .optional()?,
400             Some(pending_val) => self
401                 .conn
402                 .query_row(
403                     FIND_CONSENSUS_P,
404                     params![pending_val, flavor.name()],
405                     |row| row.try_into(),
406                 )
407                 .optional()?,
408         };
409 
410         if let Some((_va, _vu, filename)) = rv {
411             self.read_blob(filename).map(Option::Some)
412         } else {
413             Ok(None)
414         }
415     }
416 
417     /// Try to read the consensus corresponding to the provided metadata object.
418     #[allow(unused)]
consensus_by_meta(&self, cmeta: &ConsensusMeta) -> Result<InputString>419     pub(crate) fn consensus_by_meta(&self, cmeta: &ConsensusMeta) -> Result<InputString> {
420         if let Some((text, _)) =
421             self.consensus_by_sha3_digest_of_signed_part(cmeta.sha3_256_of_signed())?
422         {
423             Ok(text)
424         } else {
425             Err(Error::CacheCorruption(
426                 "couldn't find a consensus we thought we had.",
427             ))
428         }
429     }
430 
431     /// Try to read the consensus whose SHA3-256 digests is the provided
432     /// value, and its metadata.
consensus_by_sha3_digest_of_signed_part( &self, d: &[u8; 32], ) -> Result<Option<(InputString, ConsensusMeta)>>433     pub(crate) fn consensus_by_sha3_digest_of_signed_part(
434         &self,
435         d: &[u8; 32],
436     ) -> Result<Option<(InputString, ConsensusMeta)>> {
437         let digest = hex::encode(d);
438         let mut stmt = self
439             .conn
440             .prepare(FIND_CONSENSUS_AND_META_BY_DIGEST_OF_SIGNED)?;
441         let mut rows = stmt.query(params![digest])?;
442         if let Some(row) = rows.next()? {
443             let meta = cmeta_from_row(row)?;
444             let fname: String = row.get(5)?;
445             let text = self.read_blob(&fname)?;
446             Ok(Some((text, meta)))
447         } else {
448             Ok(None)
449         }
450     }
451 
452     /// Mark the consensus generated from `cmeta` as no longer pending.
mark_consensus_usable(&mut self, cmeta: &ConsensusMeta) -> Result<()>453     pub(crate) fn mark_consensus_usable(&mut self, cmeta: &ConsensusMeta) -> Result<()> {
454         let d = hex::encode(cmeta.sha3_256_of_whole());
455         let digest = format!("sha3-256-{}", d);
456 
457         let tx = self.conn.transaction()?;
458         let n = tx.execute(MARK_CONSENSUS_NON_PENDING, params![digest])?;
459         trace!("Marked {} consensuses usable", n);
460         tx.commit()?;
461 
462         Ok(())
463     }
464 
465     /// Remove the consensus generated from `cmeta`.
466     #[allow(unused)]
delete_consensus(&mut self, cmeta: &ConsensusMeta) -> Result<()>467     pub(crate) fn delete_consensus(&mut self, cmeta: &ConsensusMeta) -> Result<()> {
468         let d = hex::encode(cmeta.sha3_256_of_whole());
469         let digest = format!("sha3-256-{}", d);
470 
471         // TODO: We should probably remove the blob as well, but for now
472         // this is enough.
473         let tx = self.conn.transaction()?;
474         tx.execute(REMOVE_CONSENSUS, params![digest])?;
475         tx.commit()?;
476 
477         Ok(())
478     }
479 
480     /// Save a list of authority certificates to the cache.
store_authcerts(&mut self, certs: &[(AuthCertMeta, &str)]) -> Result<()>481     pub(crate) fn store_authcerts(&mut self, certs: &[(AuthCertMeta, &str)]) -> Result<()> {
482         let tx = self.conn.transaction()?;
483         let mut stmt = tx.prepare(INSERT_AUTHCERT)?;
484         for (meta, content) in certs {
485             let ids = meta.key_ids();
486             let id_digest = hex::encode(ids.id_fingerprint.as_bytes());
487             let sk_digest = hex::encode(ids.sk_fingerprint.as_bytes());
488             let published: OffsetDateTime = meta.published().into();
489             let expires: OffsetDateTime = meta.expires().into();
490             stmt.execute(params![id_digest, sk_digest, published, expires, content])?;
491         }
492         stmt.finalize()?;
493         tx.commit()?;
494         Ok(())
495     }
496 
497     /// Read all of the specified authority certs from the cache.
authcerts( &self, certs: &[AuthCertKeyIds], ) -> Result<HashMap<AuthCertKeyIds, String>>498     pub(crate) fn authcerts(
499         &self,
500         certs: &[AuthCertKeyIds],
501     ) -> Result<HashMap<AuthCertKeyIds, String>> {
502         let mut result = HashMap::new();
503         // XXXX Do I need to get a transaction here for performance?
504         let mut stmt = self.conn.prepare(FIND_AUTHCERT)?;
505 
506         for ids in certs {
507             let id_digest = hex::encode(ids.id_fingerprint.as_bytes());
508             let sk_digest = hex::encode(ids.sk_fingerprint.as_bytes());
509             if let Some(contents) = stmt
510                 .query_row(params![id_digest, sk_digest], |row| row.get::<_, String>(0))
511                 .optional()?
512             {
513                 result.insert(*ids, contents);
514             }
515         }
516 
517         Ok(result)
518     }
519 
520     /// Read all the microdescriptors listed in `input` from the cache.
microdescs<'a, I>(&self, input: I) -> Result<HashMap<MdDigest, String>> where I: IntoIterator<Item = &'a MdDigest>,521     pub(crate) fn microdescs<'a, I>(&self, input: I) -> Result<HashMap<MdDigest, String>>
522     where
523         I: IntoIterator<Item = &'a MdDigest>,
524     {
525         let mut result = HashMap::new();
526         let mut stmt = self.conn.prepare(FIND_MD)?;
527 
528         // XXXX Should I speed this up with a transaction, or does it not
529         // matter for queries?
530         for md_digest in input.into_iter() {
531             let h_digest = hex::encode(md_digest);
532             if let Some(contents) = stmt
533                 .query_row(params![h_digest], |row| row.get::<_, String>(0))
534                 .optional()?
535             {
536                 result.insert(*md_digest, contents);
537             }
538         }
539 
540         Ok(result)
541     }
542 
    /// Read all the router descriptors listed in `input` from the cache.
    ///
    /// Only available when the `routerdesc` feature is present.
    #[cfg(feature = "routerdesc")]
    pub(crate) fn routerdescs<'a, I>(&self, input: I) -> Result<HashMap<RdDigest, String>>
    where
        I: IntoIterator<Item = &'a RdDigest>,
    {
        let mut result = HashMap::new();
        let mut stmt = self.conn.prepare(FIND_RD)?;

        // XXXX Should I speed this up with a transaction, or does it not
        // matter for queries?
        for rd_digest in input.into_iter() {
            let h_digest = hex::encode(rd_digest);
            if let Some(contents) = stmt
                .query_row(params![h_digest], |row| row.get::<_, String>(0))
                .optional()?
            {
                result.insert(*rd_digest, contents);
            }
        }

        Ok(result)
    }
568 
569     /// Update the `last-listed` time of every microdescriptor in
570     /// `input` to `when` or later.
update_microdescs_listed<'a, I>( &mut self, input: I, when: SystemTime, ) -> Result<()> where I: IntoIterator<Item = &'a MdDigest>,571     pub(crate) fn update_microdescs_listed<'a, I>(
572         &mut self,
573         input: I,
574         when: SystemTime,
575     ) -> Result<()>
576     where
577         I: IntoIterator<Item = &'a MdDigest>,
578     {
579         let tx = self.conn.transaction()?;
580         let mut stmt = tx.prepare(UPDATE_MD_LISTED)?;
581         let when: OffsetDateTime = when.into();
582 
583         for md_digest in input.into_iter() {
584             let h_digest = hex::encode(md_digest);
585             stmt.execute(params![when, h_digest])?;
586         }
587 
588         stmt.finalize()?;
589         tx.commit()?;
590         Ok(())
591     }
592 
593     /// Store every microdescriptor in `input` into the cache, and say that
594     /// it was last listed at `when`.
store_microdescs<'a, I>(&mut self, input: I, when: SystemTime) -> Result<()> where I: IntoIterator<Item = (&'a str, &'a MdDigest)>,595     pub(crate) fn store_microdescs<'a, I>(&mut self, input: I, when: SystemTime) -> Result<()>
596     where
597         I: IntoIterator<Item = (&'a str, &'a MdDigest)>,
598     {
599         let when: OffsetDateTime = when.into();
600 
601         let tx = self.conn.transaction()?;
602         let mut stmt = tx.prepare(INSERT_MD)?;
603 
604         for (content, md_digest) in input.into_iter() {
605             let h_digest = hex::encode(md_digest);
606             stmt.execute(params![h_digest, when, content])?;
607         }
608         stmt.finalize()?;
609         tx.commit()?;
610         Ok(())
611     }
612 
613     /// Store every router descriptors in `input` into the cache.
614     #[cfg(feature = "routerdesc")]
615     #[allow(unused)]
store_routerdescs<'a, I>(&mut self, input: I) -> Result<()> where I: IntoIterator<Item = (&'a str, SystemTime, &'a RdDigest)>,616     pub(crate) fn store_routerdescs<'a, I>(&mut self, input: I) -> Result<()>
617     where
618         I: IntoIterator<Item = (&'a str, SystemTime, &'a RdDigest)>,
619     {
620         let tx = self.conn.transaction()?;
621         let mut stmt = tx.prepare(INSERT_RD)?;
622 
623         for (content, when, rd_digest) in input.into_iter() {
624             let when: OffsetDateTime = when.into();
625             let h_digest = hex::encode(rd_digest);
626             stmt.execute(params![h_digest, when, content])?;
627         }
628         stmt.finalize()?;
629         tx.commit()?;
630         Ok(())
631     }
632 }
633 
/// Handle to a blob that we have saved to disk but not yet committed to
/// the database.
struct SavedBlobHandle<'a> {
    /// Transaction we're using to add the blob to the ExtDocs table.
    tx: Transaction<'a>,
    /// Filename for the file, with respect to the blob directory.
    #[allow(unused)]
    fname: String,
    /// Declared digest string for this blob. Of the format
    /// "digesttype-hexstr".
    digeststr: String,
    /// An 'unlinker' for the blob file: deletes it on drop unless
    /// `forget` is called after a successful commit.
    unlinker: Unlinker,
}
648 
/// Handle to a file which we might have to delete.
///
/// When this handle is dropped, the file gets deleted, unless you have
/// first called [`Unlinker::forget`].
struct Unlinker {
    /// The location of the file to remove, or None if we shouldn't
    /// remove it.
    p: Option<PathBuf>,
}
658 impl Unlinker {
659     /// Make a new Unlinker for a given filename.
new<P: AsRef<Path>>(p: P) -> Self660     fn new<P: AsRef<Path>>(p: P) -> Self {
661         Unlinker {
662             p: Some(p.as_ref().to_path_buf()),
663         }
664     }
665     /// Forget about this unlinker, so that the corresponding file won't
666     /// get dropped.
forget(mut self)667     fn forget(mut self) {
668         self.p = None;
669     }
670 }
671 impl Drop for Unlinker {
drop(&mut self)672     fn drop(&mut self) {
673         if let Some(p) = self.p.take() {
674             let _ignore_err = std::fs::remove_file(p);
675         }
676     }
677 }
678 
679 /// Convert a hexadecimal sha3-256 digest from the database into an array.
digest_from_hex(s: &str) -> Result<[u8; 32]>680 fn digest_from_hex(s: &str) -> Result<[u8; 32]> {
681     hex::decode(s)?
682         .try_into()
683         .map_err(|_| Error::CacheCorruption("Invalid digest in database"))
684 }
685 
686 /// Convert a hexadecimal sha3-256 "digest string" as used in the
687 /// digest column from the database into an array.
digest_from_dstr(s: &str) -> Result<[u8; 32]>688 fn digest_from_dstr(s: &str) -> Result<[u8; 32]> {
689     if let Some(stripped) = s.strip_prefix("sha3-256-") {
690         hex::decode(stripped)?
691             .try_into()
692             .map_err(|_| Error::CacheCorruption("Invalid digest in database"))
693     } else {
694         Err(Error::CacheCorruption("Invalid digest in database"))
695     }
696 }
697 
698 /// Create a ConsensusMeta from a `Row` returned by one of
699 /// `FIND_LATEST_CONSENSUS_META` or `FIND_CONSENSUS_AND_META_BY_DIGEST`.
cmeta_from_row(row: &rusqlite::Row<'_>) -> Result<ConsensusMeta>700 fn cmeta_from_row(row: &rusqlite::Row<'_>) -> Result<ConsensusMeta> {
701     let va: OffsetDateTime = row.get(0)?;
702     let fu: OffsetDateTime = row.get(1)?;
703     let vu: OffsetDateTime = row.get(2)?;
704     let d_signed: String = row.get(3)?;
705     let d_all: String = row.get(4)?;
706     let lifetime = Lifetime::new(va.into(), fu.into(), vu.into())?;
707     Ok(ConsensusMeta::new(
708         lifetime,
709         digest_from_hex(&d_signed)?,
710         digest_from_dstr(&d_all)?,
711     ))
712 }
713 
/// Version number used for this version of the arti cache schema.
const SCHEMA_VERSION: u32 = 1;

/// Set up the tables for the arti cache schema in a sqlite database.
///
/// (This is the original, version-0 schema; `UPDATE_SCHEMA_V0_TO_V1`
/// is applied on top of it to reach `SCHEMA_VERSION`.)
const INSTALL_V0_SCHEMA: &str = "
  -- Helps us version the schema.  The schema here corresponds to a
  -- version number called 'version', and it should be readable by
  -- anybody who is compliant with versions of at least 'readable_by'.
  CREATE TABLE TorSchemaMeta (
     name TEXT NOT NULL PRIMARY KEY,
     version INTEGER NOT NULL,
     readable_by INTEGER NOT NULL
  );

  INSERT INTO TorSchemaMeta (name, version, readable_by) VALUES ( 'TorDirStorage', 0, 0 );

  -- Keeps track of external blobs on disk.
  CREATE TABLE ExtDocs (
    -- Records a digest of the file contents, in the form 'dtype-hexstr'
    digest TEXT PRIMARY KEY NOT NULL,
    -- When was this file created?
    created DATE NOT NULL,
    -- After what time will this file definitely be useless?
    expires DATE NOT NULL,
    -- What is the type of this file? Currently supported are 'con:<flavor>'.
    type TEXT NOT NULL,
    -- Filename for this file within our blob directory.
    filename TEXT NOT NULL
  );

  -- All the microdescriptors we know about.
  CREATE TABLE Microdescs (
    sha256_digest TEXT PRIMARY KEY NOT NULL,
    last_listed DATE NOT NULL,
    contents BLOB NOT NULL
  );

  -- All the authority certificates we know.
  CREATE TABLE Authcerts (
    id_digest TEXT NOT NULL,
    sk_digest TEXT NOT NULL,
    published DATE NOT NULL,
    expires DATE NOT NULL,
    contents BLOB NOT NULL,
    PRIMARY KEY (id_digest, sk_digest)
  );

  -- All the consensuses we're storing.
  CREATE TABLE Consensuses (
    valid_after DATE NOT NULL,
    fresh_until DATE NOT NULL,
    valid_until DATE NOT NULL,
    flavor TEXT NOT NULL,
    pending BOOLEAN NOT NULL,
    sha3_of_signed_part TEXT NOT NULL,
    digest TEXT NOT NULL,
    FOREIGN KEY (digest) REFERENCES ExtDocs (digest) ON DELETE CASCADE
  );
  CREATE INDEX Consensuses_vu on CONSENSUSES(valid_until);

";

/// Update the database schema from version 0 to version 1.
///
/// (Adds the RouterDescs table and bumps the stored schema version.)
const UPDATE_SCHEMA_V0_TO_V1: &str = "
  CREATE TABLE RouterDescs (
    sha1_digest TEXT PRIMARY KEY NOT NULL,
    published DATE NOT NULL,
    contents BLOB NOT NULL
  );

  UPDATE TorSchemaMeta SET version=1 WHERE version<1;
";
786 
/// Query: find the latest-expiring consensus of a given flavor with a
/// given pending status, along with the name of the blob file that
/// holds its text.
///
/// Parameters: pending (boolean), flavor (text).
const FIND_CONSENSUS_P: &str = "
  SELECT valid_after, valid_until, filename
  FROM Consensuses
  INNER JOIN ExtDocs ON ExtDocs.digest = Consensuses.digest
  WHERE pending = ? AND flavor = ?
  ORDER BY valid_until DESC
  LIMIT 1;
";
797 
/// Query: find the latest-expiring consensus of a given flavor,
/// regardless of pending status, along with the name of the blob file
/// that holds its text.
///
/// Parameter: flavor (text).
const FIND_CONSENSUS: &str = "
  SELECT valid_after, valid_until, filename
  FROM Consensuses
  INNER JOIN ExtDocs ON ExtDocs.digest = Consensuses.digest
  WHERE flavor = ?
  ORDER BY valid_until DESC
  LIMIT 1;
";
808 
/// Query: Find the metadata (lifetime, sha3 digest of the signed part,
/// and whole-document digest) for the latest-expiring non-pending
/// consensus of a given flavor.
///
/// Parameter: flavor (text).
const FIND_LATEST_CONSENSUS_META: &str = "
  SELECT valid_after, fresh_until, valid_until, sha3_of_signed_part, digest
  FROM Consensuses
  WHERE pending = 0 AND flavor = ?
  ORDER BY valid_until DESC
  LIMIT 1;
";
818 
/// Query: Look up a consensus (its metadata plus the blob filename
/// holding its text) by the hex-encoded sha3 digest of its signed part.
const FIND_CONSENSUS_AND_META_BY_DIGEST_OF_SIGNED: &str = "
  SELECT valid_after, fresh_until, valid_until, sha3_of_signed_part, Consensuses.digest, filename
  FROM Consensuses
  INNER JOIN ExtDocs on ExtDocs.digest = Consensuses.digest
  WHERE Consensuses.sha3_of_signed_part = ?
  LIMIT 1;
";
827 
/// Query: Update the consensus whose digest field is 'digest' to call it
/// no longer pending.
///
/// Parameter: digest (text; whole-document digest as stored in ExtDocs).
const MARK_CONSENSUS_NON_PENDING: &str = "
  UPDATE Consensuses
  SET pending = 0
  WHERE digest = ?;
";
835 
/// Query: Remove the consensus with a given digest field.
///
/// Note: this does not delete the matching ExtDocs row or its blob
/// file; the schema's ON DELETE CASCADE runs the other way (deleting
/// an ExtDocs row removes the consensuses that reference it).
const REMOVE_CONSENSUS: &str = "
  DELETE FROM Consensuses
  WHERE digest = ?;
";
841 
/// Query: Find the authority certificate with given key digests.
///
/// Parameters: id_digest (text), sk_digest (text).
///
/// (Spelled `Authcerts` to match the CREATE TABLE statement and the
/// other queries; SQLite treats identifiers case-insensitively, so the
/// previous `AuthCerts` spelling behaved identically.)
const FIND_AUTHCERT: &str = "
  SELECT contents FROM Authcerts WHERE id_digest = ? AND sk_digest = ?;
";
846 
/// Query: find the microdescriptor with a given hex-encoded sha256 digest.
///
/// Parameter: sha256_digest (text).
const FIND_MD: &str = "
  SELECT contents
  FROM Microdescs
  WHERE sha256_digest = ?
";
853 
/// Query: find the router descriptor with a given hex-encoded sha1 digest.
///
/// Parameter: sha1_digest (text).  Only compiled in when the
/// `routerdesc` feature is enabled.
#[cfg(feature = "routerdesc")]
const FIND_RD: &str = "
  SELECT contents
  FROM RouterDescs
  WHERE sha1_digest = ?
";
861 
/// Query: find the filename of every ExtDocs row whose expiration time
/// has passed.
///
/// Intended so callers can remove the corresponding blob files from
/// disk; the rows themselves are dropped by DROP_OLD_EXTDOCS.
///
/// (Spelled `ExtDocs`/`WHERE` to match the table's CREATE statement
/// and the keyword casing of the other queries; SQLite treats both
/// case-insensitively, so behavior is unchanged.)
const FIND_EXPIRED_EXTDOCS: &str = "
  SELECT filename FROM ExtDocs WHERE expires < datetime('now');
";
866 
/// Query: Add a new entry to ExtDocs.
///
/// Parameters, in order: digest, expires, type, filename.  The
/// `created` column is filled in automatically with the current time.
/// OR REPLACE means re-inserting an existing digest overwrites the row.
const INSERT_EXTDOC: &str = "
  INSERT OR REPLACE INTO ExtDocs ( digest, created, expires, type, filename )
  VALUES ( ?, datetime('now'), ?, ?, ? );
";
872 
/// Query: Add a new consensus.
///
/// Parameters, in order: valid_after, fresh_until, valid_until, flavor,
/// pending, sha3_of_signed_part, digest.
///
/// NOTE(review): the Consensuses table declares no PRIMARY KEY or
/// UNIQUE constraint, so the OR REPLACE clause here can never actually
/// replace a row -- confirm whether duplicate rows are intended.
const INSERT_CONSENSUS: &str = "
  INSERT OR REPLACE INTO Consensuses
    ( valid_after, fresh_until, valid_until, flavor, pending, sha3_of_signed_part, digest )
  VALUES ( ?, ?, ?, ?, ?, ?, ? );
";
879 
/// Query: Add a new AuthCert.
///
/// Parameters, in order: id_digest, sk_digest, published, expires,
/// contents.  OR REPLACE overwrites any row with the same
/// (id_digest, sk_digest) primary key.
const INSERT_AUTHCERT: &str = "
  INSERT OR REPLACE INTO Authcerts
    ( id_digest, sk_digest, published, expires, contents)
  VALUES ( ?, ?, ?, ?, ? );
";
886 
/// Query: Add a new microdescriptor.
///
/// Parameters, in order: sha256_digest, last_listed, contents.
/// OR REPLACE overwrites any row with the same digest.
const INSERT_MD: &str = "
  INSERT OR REPLACE INTO Microdescs ( sha256_digest, last_listed, contents )
  VALUES ( ?, ?, ? );
";
892 
/// Query: Add a new router descriptor.
///
/// Parameters, in order: sha1_digest, published, contents.
/// OR REPLACE overwrites any row with the same digest.  Only compiled
/// in when the `routerdesc` feature is enabled.
#[allow(unused)]
#[cfg(feature = "routerdesc")]
const INSERT_RD: &str = "
  INSERT OR REPLACE INTO RouterDescs ( sha1_digest, published, contents )
  VALUES ( ?, ?, ? );
";
900 
/// Query: Change the time when a given microdescriptor was last listed.
///
/// Parameters, in order: new last-listed time, sha256_digest.
/// The max() ensures the stored time only ever moves forward: a stale
/// update can never rewind last_listed.
const UPDATE_MD_LISTED: &str = "
  UPDATE Microdescs
  SET last_listed = max(last_listed, ?)
  WHERE sha256_digest = ?;
";
907 
/// Query: Discard every expired extdoc.
///
/// Drops only the database rows; the blob files on disk are expected
/// to be removed separately (see FIND_EXPIRED_EXTDOCS) -- TODO confirm
/// against the caller.
const DROP_OLD_EXTDOCS: &str = "
  DELETE FROM ExtDocs WHERE expires < datetime('now');
";
/// Query: Discard every router descriptor that was published more than
/// 3 months ago.
///
/// (The predicate is on `published`: RouterDescs rows carry no
/// last-listed column, unlike Microdescs.)
// TODO: Choose a more realistic time.
const DROP_OLD_ROUTERDESCS: &str = "
  DELETE FROM RouterDescs WHERE published < datetime('now','-3 months');
";
/// Query: Discard every microdescriptor that hasn't been listed for 3 months.
// TODO: Choose a more realistic time.
const DROP_OLD_MICRODESCS: &str = "
  DELETE FROM Microdescs WHERE last_listed < datetime('now','-3 months');
";
/// Query: Discard every authority certificate whose expiration time
/// has passed.
const DROP_OLD_AUTHCERTS: &str = "
  DELETE FROM Authcerts WHERE expires < datetime('now');
";
/// Query: Discard every consensus that's been expired for at least
/// two days.
///
/// (The two-day grace period keeps recently expired consensuses
/// available; the exact rationale lives with the caller.)
const DROP_OLD_CONSENSUSES: &str = "
  DELETE FROM Consensuses WHERE valid_until < datetime('now','-2 days');
";
932 
#[cfg(test)]
mod test {
    #![allow(clippy::unwrap_used)]
    use super::*;
    use hex_literal::hex;
    use tempfile::{tempdir, TempDir};
    use time::ext::NumericalDuration;

    /// Helper: build a fresh, empty SqliteStore in a new temporary
    /// directory.
    ///
    /// The TempDir is returned along with the store so the directory
    /// stays alive (and on disk) for the duration of the test.
    fn new_empty() -> Result<(TempDir, SqliteStore)> {
        let tmp_dir = tempdir().unwrap();
        let sql_path = tmp_dir.path().join("db.sql");
        let conn = rusqlite::Connection::open(&sql_path)?;
        let store = SqliteStore::from_conn(conn, &tmp_dir)?;

        Ok((tmp_dir, store))
    }

    /// Schema setup and version handling in from_conn().
    #[test]
    fn init() -> Result<()> {
        let tmp_dir = tempdir().unwrap();
        let sql_path = tmp_dir.path().join("db.sql");
        // Initial setup: everything should work.
        {
            let conn = rusqlite::Connection::open(&sql_path)?;
            let _store = SqliteStore::from_conn(conn, &tmp_dir)?;
        }
        // Second setup: shouldn't need to upgrade.
        {
            let conn = rusqlite::Connection::open(&sql_path)?;
            let _store = SqliteStore::from_conn(conn, &tmp_dir)?;
        }
        // Third setup: a schema version newer than we know, but the
        // readable_by field still permits us, so opening should succeed
        // without attempting an upgrade.
        {
            let conn = rusqlite::Connection::open(&sql_path)?;
            conn.execute_batch("UPDATE TorSchemaMeta SET version = 9002;")?;
            let _store = SqliteStore::from_conn(conn, &tmp_dir)?;
        }
        // Fourth: this says we can't read it, so we'll get an error.
        {
            let conn = rusqlite::Connection::open(&sql_path)?;
            conn.execute_batch("UPDATE TorSchemaMeta SET readable_by = 9001;")?;
            let val = SqliteStore::from_conn(conn, &tmp_dir);
            assert!(val.is_err());
        }
        Ok(())
    }

    /// blob_fname() must reject path-escaping names but accept names
    /// that merely contain dots.
    #[test]
    fn bad_blob_fnames() -> Result<()> {
        let (_tmp_dir, store) = new_empty()?;

        // Dots inside a component are harmless.
        assert!(store.blob_fname("abcd").is_ok());
        assert!(store.blob_fname("abcd..").is_ok());
        assert!(store.blob_fname("..abcd..").is_ok());
        assert!(store.blob_fname(".abcd").is_ok());

        // Anything that could escape the blob directory is rejected.
        assert!(store.blob_fname(".").is_err());
        assert!(store.blob_fname("..").is_err());
        assert!(store.blob_fname("../abcd").is_err());
        assert!(store.blob_fname("/abcd").is_err());

        Ok(())
    }

    /// Round-trip blobs through save_blob()/read_blob(), and check
    /// that expire_all() removes only the expired one.
    #[test]
    fn blobs() -> Result<()> {
        let (tmp_dir, mut store) = new_empty()?;

        let now = OffsetDateTime::now_utc();
        let one_week = 1.weeks();

        // One blob that expires in the future...
        let fname1 = store.save_blob(
            b"Hello world",
            "greeting",
            "sha1",
            &hex!("7b502c3a1f48c8609ae212cdfb639dee39673f5e"),
            now + one_week,
        )?;

        // ...and one that is already expired.
        let fname2 = store.save_blob(
            b"Goodbye, dear friends",
            "greeting",
            "sha1",
            &hex!("2149c2a7dbf5be2bb36fb3c5080d0fb14cb3355c"),
            now - one_week,
        )?;

        // Filenames are "<type>:<digesttype>-<hex digest>".
        assert_eq!(
            fname1,
            "greeting:sha1-7b502c3a1f48c8609ae212cdfb639dee39673f5e"
        );
        assert_eq!(store.blob_fname(&fname1)?, tmp_dir.path().join(&fname1));
        assert_eq!(
            &std::fs::read(store.blob_fname(&fname1)?)?[..],
            b"Hello world"
        );
        assert_eq!(
            &std::fs::read(store.blob_fname(&fname2)?)?[..],
            b"Goodbye, dear friends"
        );

        // Both blobs should be registered in the ExtDocs table.
        let n: u32 = store
            .conn
            .query_row("SELECT COUNT(filename) FROM ExtDocs", [], |row| row.get(0))?;
        assert_eq!(n, 2);

        let blob = store.read_blob(&fname2)?;
        assert_eq!(blob.as_str().unwrap(), "Goodbye, dear friends");

        // Now expire: the second file should go away.
        store.expire_all()?;
        assert_eq!(
            &std::fs::read(store.blob_fname(&fname1)?)?[..],
            b"Hello world"
        );
        assert!(std::fs::read(store.blob_fname(&fname2)?).is_err());
        let n: u32 = store
            .conn
            .query_row("SELECT COUNT(filename) FROM ExtDocs", [], |row| row.get(0))?;
        assert_eq!(n, 1);

        Ok(())
    }

    /// Consensus storage: pending/usable transitions, lookup by
    /// metadata and by digest, and deletion.
    #[test]
    fn consensus() -> Result<()> {
        use tor_netdoc::doc::netstatus;

        let (_tmp_dir, mut store) = new_empty()?;
        let now = OffsetDateTime::now_utc();
        let one_hour = 1.hours();

        // Empty store: no consensus yet.
        assert_eq!(
            store.latest_consensus_time(ConsensusFlavor::Microdesc)?,
            None
        );

        let cmeta = ConsensusMeta::new(
            netstatus::Lifetime::new(
                now.into(),
                (now + one_hour).into(),
                (now + one_hour * 2).into(),
            )
            .unwrap(),
            [0xAB; 32],
            [0xBC; 32],
        );

        // Store it as pending (not yet marked usable).
        store.store_consensus(
            &cmeta,
            ConsensusFlavor::Microdesc,
            true,
            "Pretend this is a consensus",
        )?;

        {
            // While pending, it doesn't count as the "latest" consensus...
            assert_eq!(
                store.latest_consensus_time(ConsensusFlavor::Microdesc)?,
                None
            );
            // ...but it is findable when we don't filter on pending status.
            let consensus = store
                .latest_consensus(ConsensusFlavor::Microdesc, None)?
                .unwrap();
            assert_eq!(consensus.as_str()?, "Pretend this is a consensus");
            // Asking for a non-pending consensus finds nothing.
            let consensus = store.latest_consensus(ConsensusFlavor::Microdesc, Some(false))?;
            assert!(consensus.is_none());
        }

        store.mark_consensus_usable(&cmeta)?;

        {
            // Once usable, it shows up everywhere.
            assert_eq!(
                store.latest_consensus_time(ConsensusFlavor::Microdesc)?,
                now.into()
            );
            let consensus = store
                .latest_consensus(ConsensusFlavor::Microdesc, None)?
                .unwrap();
            assert_eq!(consensus.as_str()?, "Pretend this is a consensus");
            let consensus = store
                .latest_consensus(ConsensusFlavor::Microdesc, Some(false))?
                .unwrap();
            assert_eq!(consensus.as_str()?, "Pretend this is a consensus");
        }

        {
            // Lookup by metadata, and by sha3 digest of the signed part.
            let consensus_text = store.consensus_by_meta(&cmeta)?;
            assert_eq!(consensus_text.as_str()?, "Pretend this is a consensus");

            let (is, _cmeta2) = store
                .consensus_by_sha3_digest_of_signed_part(&[0xAB; 32])?
                .unwrap();
            assert_eq!(is.as_str()?, "Pretend this is a consensus");

            // Lookups with a digest we never stored must fail.
            let cmeta3 = ConsensusMeta::new(
                netstatus::Lifetime::new(
                    now.into(),
                    (now + one_hour).into(),
                    (now + one_hour * 2).into(),
                )
                .unwrap(),
                [0x99; 32],
                [0x99; 32],
            );
            assert!(store.consensus_by_meta(&cmeta3).is_err());

            assert!(store
                .consensus_by_sha3_digest_of_signed_part(&[0x99; 32])?
                .is_none());
        }

        {
            // Deleting the consensus makes it unfindable.
            assert!(store
                .consensus_by_sha3_digest_of_signed_part(&[0xAB; 32])?
                .is_some());
            store.delete_consensus(&cmeta)?;
            assert!(store
                .consensus_by_sha3_digest_of_signed_part(&[0xAB; 32])?
                .is_none());
        }

        Ok(())
    }

    /// Authority certificates: store one, then look up two key-id
    /// pairs; only the stored one should come back.
    #[test]
    fn authcerts() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;
        let now = OffsetDateTime::now_utc();
        let one_hour = 1.hours();

        let keyids = AuthCertKeyIds {
            id_fingerprint: [3; 20].into(),
            sk_fingerprint: [4; 20].into(),
        };
        // keyids2 swaps the fingerprints, so it must NOT match keyids.
        let keyids2 = AuthCertKeyIds {
            id_fingerprint: [4; 20].into(),
            sk_fingerprint: [3; 20].into(),
        };

        let m1 = AuthCertMeta::new(keyids, now.into(), (now + one_hour * 24).into());

        store.store_authcerts(&[(m1, "Pretend this is a cert")])?;

        let certs = store.authcerts(&[keyids, keyids2])?;
        assert_eq!(certs.len(), 1);
        assert_eq!(certs.get(&keyids).unwrap(), "Pretend this is a cert");

        Ok(())
    }

    /// Microdescriptors: storage, last-listed updates, lookup, and
    /// expiration of those not listed recently.
    #[test]
    fn microdescs() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;

        let now = OffsetDateTime::now_utc();
        let one_day = 1.days();

        let d1 = [5_u8; 32];
        let d2 = [7; 32];
        let d3 = [42; 32];
        let d4 = [99; 32];

        // Store three microdescs, all last-listed 100 days ago.
        let long_ago: OffsetDateTime = now - one_day * 100;
        store.store_microdescs(
            vec![
                ("Fake micro 1", &d1),
                ("Fake micro 2", &d2),
                ("Fake micro 3", &d3),
            ],
            long_ago.into(),
        )?;

        // Bump d2's last-listed time to now.
        store.update_microdescs_listed(&[d2], now.into())?;

        // d4 was never stored; lookups return only what exists.
        let mds = store.microdescs(&[d2, d3, d4])?;
        assert_eq!(mds.len(), 2);
        assert_eq!(mds.get(&d1), None);
        assert_eq!(mds.get(&d2).unwrap(), "Fake micro 2");
        assert_eq!(mds.get(&d3).unwrap(), "Fake micro 3");
        assert_eq!(mds.get(&d4), None);

        // Now we'll expire.  that should drop everything but d2.
        store.expire_all()?;
        let mds = store.microdescs(&[d2, d3, d4])?;
        assert_eq!(mds.len(), 1);
        assert_eq!(mds.get(&d2).unwrap(), "Fake micro 2");

        Ok(())
    }

    /// Router descriptors: storage, lookup, and expiration based on
    /// published time (unlike microdescs, there is no last-listed).
    #[test]
    #[cfg(feature = "routerdesc")]
    fn routerdescs() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;

        let now = OffsetDateTime::now_utc();
        let one_day = 1.days();
        let long_ago: OffsetDateTime = now - one_day * 100;
        let recently = now - one_day;

        let d1 = [5_u8; 20];
        let d2 = [7; 20];
        let d3 = [42; 20];
        let d4 = [99; 20];

        // d2 is published recently; d1 and d3 long ago; d4 never stored.
        store.store_routerdescs(vec![
            ("Fake routerdesc 1", long_ago.into(), &d1),
            ("Fake routerdesc 2", recently.into(), &d2),
            ("Fake routerdesc 3", long_ago.into(), &d3),
        ])?;

        let rds = store.routerdescs(&[d2, d3, d4])?;
        assert_eq!(rds.len(), 2);
        assert_eq!(rds.get(&d1), None);
        assert_eq!(rds.get(&d2).unwrap(), "Fake routerdesc 2");
        assert_eq!(rds.get(&d3).unwrap(), "Fake routerdesc 3");
        assert_eq!(rds.get(&d4), None);

        // Now we'll expire.  that should drop everything but d2.
        store.expire_all()?;
        let rds = store.routerdescs(&[d2, d3, d4])?;
        assert_eq!(rds.len(), 1);
        assert_eq!(rds.get(&d2).unwrap(), "Fake routerdesc 2");

        Ok(())
    }

    /// Opening a store from a path: read-only vs read-write behavior,
    /// directory creation, locking, and upgrading to read-write.
    #[test]
    fn from_path_rw() -> Result<()> {
        let tmp = tempdir().unwrap();

        // Nothing there: can't open read-only
        let r = SqliteStore::from_path(tmp.path(), true);
        assert!(r.is_err());
        assert!(!tmp.path().join("dir_blobs").exists());

        // Opening it read-write will create the files
        {
            let mut store = SqliteStore::from_path(tmp.path(), false)?;
            assert!(tmp.path().join("dir_blobs").is_dir());
            assert!(store.lockfile.is_some());
            assert!(!store.is_readonly());
            assert!(store.upgrade_to_readwrite()?); // no-op: already read-write.
        }

        // At this point, we can successfully make a read-only connection.
        {
            let mut store2 = SqliteStore::from_path(tmp.path(), true)?;
            assert!(store2.is_readonly());

            // Nobody else is locking this, so we can upgrade.
            assert!(store2.upgrade_to_readwrite()?); // this one actually upgrades.
            assert!(!store2.is_readonly());
        }
        Ok(())
    }
}
1290