1 use std::{
2     collections::{HashMap, VecDeque},
3     fmt, fs, mem,
4     path::{Path, PathBuf},
5     str::FromStr,
6     sync::{Arc, RwLock},
7 };
8 
9 use bytes::Bytes;
10 
11 use rpki::{
12     repository::{crypto::KeyIdentifier, x509::Time},
13     uri,
14 };
15 
16 use crate::{
17     commons::{
18         actor::Actor,
19         api::rrdp::{
20             CurrentObjects, Delta, DeltaElements, DeltaRef, FileRef, Notification, RrdpSession, Snapshot, SnapshotRef,
21         },
22         api::{
23             Handle, HexEncodedHash, ListReply, PublicationServerUris, PublishDelta, PublisherHandle, RepoInfo,
24             StorableRepositoryCommand,
25         },
26         crypto::{IdCert, KrillSigner, ProtocolCms, ProtocolCmsBuilder},
27         error::{Error, KrillIoError},
28         eventsourcing::{Aggregate, AggregateStore, KeyStoreKey, KeyValueStore},
29         remote::rfc8183,
30         util::file,
31         KrillResult,
32     },
33     constants::{
34         PUBSERVER_CONTENT_DIR, PUBSERVER_DFLT, PUBSERVER_DIR, REPOSITORY_DIR, REPOSITORY_RRDP_ARCHIVE_DIR,
35         REPOSITORY_RRDP_DIR, REPOSITORY_RSYNC_DIR, RRDP_FIRST_SERIAL,
36     },
37     daemon::config::{Config, RepositoryRetentionConfig},
38     pubd::{
39         publishers::Publisher, RepoAccessCmd, RepoAccessCmdDet, RepositoryAccessEvent, RepositoryAccessEventDetails,
40         RepositoryAccessIni, RepositoryAccessInitDetails,
41     },
42 };
43 
44 //------------ RepositoryContentProxy ----------------------------------------
45 
/// We can only have one (1) RepositoryContent, but it is stored
/// in a KeyValueStore. So this type provides a wrapper around this
/// so that callers don't need to worry about storage details.
#[derive(Debug)]
pub struct RepositoryContentProxy {
    /// In-memory copy of the stored content; `None` until initialized
    /// (or warmed from the store at startup).
    cache: RwLock<Option<RepositoryContent>>,
    /// Persistent key value store holding the single serialized content.
    store: RwLock<KeyValueStore>,
    /// The single, fixed key under which the content is stored.
    key: KeyStoreKey,
}
55 
impl RepositoryContentProxy {
    /// Creates a proxy backed by a disk-based KeyValueStore under the
    /// configured data directory, and warms the in-memory cache from it.
    pub fn disk(config: &Config) -> KrillResult<Self> {
        let work_dir = &config.data_dir;
        let store = KeyValueStore::disk(work_dir, PUBSERVER_CONTENT_DIR)?;
        let store = RwLock::new(store);
        let key = KeyStoreKey::simple(format!("{}.json", PUBSERVER_DFLT));
        let cache = RwLock::new(None);

        let proxy = RepositoryContentProxy { cache, store, key };
        proxy.warm_cache()?;

        Ok(proxy)
    }

    // Loads the stored content (if any) into the cache, so that reads never
    // need to touch the key value store afterwards.
    fn warm_cache(&self) -> KrillResult<()> {
        let key_store_read = self.store.read().unwrap();

        if key_store_read.has(&self.key)? {
            info!("Warming the repository content cache, this can take a minute for large repositories.");
            // unwrap is safe here: `has` just confirmed the key is present.
            let content = key_store_read.get(&self.key)?.unwrap();
            self.cache.write().unwrap().replace(content);
        }

        Ok(())
    }

    /// Initialize
    pub fn init(&self, work_dir: &Path, uris: PublicationServerUris) -> KrillResult<()> {
        if self.store.read().unwrap().has(&self.key)? {
            Err(Error::RepositoryServerAlreadyInitialized)
        } else {
            // initialize new repo content
            let repository_content = {
                let (rrdp_base_uri, rsync_jail) = uris.unpack();

                let publishers = HashMap::new();

                let session = RrdpSession::default();
                let stats = RepoStats::new(session);

                let mut repo_dir = work_dir.to_path_buf();
                repo_dir.push(REPOSITORY_DIR);

                let rrdp = RrdpServer::create(rrdp_base_uri, &repo_dir, session);
                let rsync = RsyncdStore::new(rsync_jail, &repo_dir);

                RepositoryContent::new(publishers, rrdp, rsync, stats)
            };

            // Store newly initialized repo content on disk
            let store = self.store.write().unwrap();
            store.store(&self.key, &repository_content)?;

            // Store newly initialized repo content in cache
            let mut cache = self.cache.write().unwrap();
            cache.replace(repository_content);

            Ok(())
        }
    }

    // Clear all content, so it can be re-initialized.
    // Only to be called after all publishers have been removed from the RepoAccess as well.
    pub fn clear(&self) -> KrillResult<()> {
        let store = self.store.write().unwrap();

        // Remove the on-disk RRDP and rsync content before dropping the
        // stored state itself.
        if let Ok(Some(content)) = store.get::<RepositoryContent>(&self.key) {
            content.clear();
            store.drop_key(&self.key)?;
        }

        let mut cache = self.cache.write().unwrap();
        cache.take();

        Ok(())
    }

    /// Return the repository content stats
    pub fn stats(&self) -> KrillResult<RepoStats> {
        self.read(|content| Ok(content.stats().clone()))
    }

    /// Add a publisher with an empty set of published objects.
    ///
    /// Replaces an existing publisher if it existed.
    /// This is only supposed to be called if adding the publisher
    /// to the RepositoryAccess was successful (and *that* will fail if
    /// the publisher is a duplicate). This method can only fail if
    /// there is an issue with the underlying key value store.
    pub fn add_publisher(&self, name: PublisherHandle) -> KrillResult<()> {
        self.write(|content| content.add_publisher(name))
    }

    /// Removes a publisher and its content.
    pub fn remove_publisher(
        &self,
        name: &PublisherHandle,
        jail: &uri::Rsync,
        config: &RepositoryRetentionConfig,
    ) -> KrillResult<()> {
        self.write(|content| content.remove_publisher(name, jail, config))
    }

    /// Publish an update for a publisher.
    ///
    /// Assumes that the RFC 8181 CMS has been verified, but will check that all objects
    /// are within the publisher's uri space (jail).
    pub fn publish(
        &self,
        name: &PublisherHandle,
        delta: PublishDelta,
        jail: &uri::Rsync,
        config: &RepositoryRetentionConfig,
    ) -> KrillResult<()> {
        self.write(|content| content.publish(name, delta.into(), jail, config))
    }

    /// Write all current files to disk
    pub fn write_repository(&self, config: &RepositoryRetentionConfig) -> KrillResult<()> {
        self.read(|content| content.write_repository(config))
    }

    /// Reset the RRDP session if it is initialized. Otherwise do nothing.
    pub fn session_reset(&self, config: &RepositoryRetentionConfig) -> KrillResult<()> {
        if self.cache.read().unwrap().is_some() {
            self.write(|content| content.session_reset(config))
        } else {
            // repository server was not initialized on this Krill instance. Nothing to reset.
            Ok(())
        }
    }

    /// Create a list reply containing all current objects for a publisher
    pub fn list_reply(&self, name: &PublisherHandle) -> KrillResult<ListReply> {
        self.read(|content| content.list_reply(name))
    }

    // Get all current objects for a publisher
    pub fn current_objects(&self, name: &PublisherHandle) -> KrillResult<CurrentObjects> {
        self.read(|content| content.objects_for_publisher(name).map(|o| o.clone()))
    }

    // Execute a closure on a mutable repository content in a single write 'transaction'
    fn write<F: FnOnce(&mut RepositoryContent) -> KrillResult<()>>(&self, op: F) -> KrillResult<()> {
        // If there is any existing content, then we can assume that the cache
        // has it - because it's initialized when we read the content during
        // initialization.
        //
        // NOTE: lock order is store first, then cache — keep this consistent
        // with `init` so the two cannot deadlock against each other.
        let store = self.store.write().unwrap();
        let mut cache = self.cache.write().unwrap();

        let content: &mut RepositoryContent = cache.as_mut().ok_or(Error::RepositoryServerNotInitialized)?;

        // Apply the mutation to the cached copy first; only persist when it
        // succeeded, so a failing op leaves the store untouched.
        op(content)?;

        store.store(&self.key, content)?;
        Ok(())
    }

    // Execute a closure on a mutable repository content in a single read 'transaction'
    //
    // This function fails if the repository content is not initialized.
    fn read<A, F: FnOnce(&RepositoryContent) -> KrillResult<A>>(&self, op: F) -> KrillResult<A> {
        // Note that because the content is initialized it is implied that the cache MUST always be
        // set. I.e. it is set on initialization and updated whenever the repository content is updated.
        // So, we can safely read from the cache only.
        let cache = self.cache.read().unwrap();
        let content = cache.as_ref().ok_or(Error::RepositoryServerNotInitialized)?;
        op(content)
    }
}
226 
227 //------------ RepositoryContent -------------------------------------------
228 
/// This type manages the content of the repository. Note that access
/// to the repository is managed by an event sourced component which
/// handles RFC8181 based access, and which can enforce restrictions,
/// such as the base uri for publishers.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RepositoryContent {
    /// All currently published objects, per publisher handle.
    publishers: HashMap<PublisherHandle, CurrentObjects>,

    /// RRDP publication state: snapshot, deltas and notification file.
    rrdp: RrdpServer,
    /// Writes the current object set to disk for serving over rsync.
    rsync: RsyncdStore,

    /// Aggregated repository statistics, updated on publication events.
    stats: RepoStats,
}
242 
243 impl RepositoryContent {
new( publishers: HashMap<PublisherHandle, CurrentObjects>, rrdp: RrdpServer, rsync: RsyncdStore, stats: RepoStats, ) -> Self244     pub fn new(
245         publishers: HashMap<PublisherHandle, CurrentObjects>,
246         rrdp: RrdpServer,
247         rsync: RsyncdStore,
248         stats: RepoStats,
249     ) -> Self {
250         RepositoryContent {
251             publishers,
252             rrdp,
253             rsync,
254             stats,
255         }
256     }
257 
init(rrdp_base_uri: uri::Https, rsync_jail: uri::Rsync, session: RrdpSession, repo_base_dir: &Path) -> Self258     pub fn init(rrdp_base_uri: uri::Https, rsync_jail: uri::Rsync, session: RrdpSession, repo_base_dir: &Path) -> Self {
259         let publishers = HashMap::new();
260         let rrdp = RrdpServer::create(rrdp_base_uri, repo_base_dir, session);
261         let rsync = RsyncdStore::new(rsync_jail, repo_base_dir);
262         let stats = RepoStats::new(session);
263 
264         RepositoryContent {
265             publishers,
266             rrdp,
267             rsync,
268             stats,
269         }
270     }
271 
272     // Clears all content on disk so the repository can be re-initialized
clear(&self)273     pub fn clear(&self) {
274         self.rrdp.clear();
275         self.rsync.clear();
276     }
277 
stats(&self) -> &RepoStats278     pub fn stats(&self) -> &RepoStats {
279         &self.stats
280     }
281 }
282 
/// # Publisher Content
impl RepositoryContent {
    // Returns the current objects for a publisher, or an error if the
    // publisher is unknown.
    fn objects_for_publisher(&self, publisher: &PublisherHandle) -> KrillResult<&CurrentObjects> {
        self.publishers
            .get(publisher)
            .ok_or_else(|| Error::PublisherUnknown(publisher.clone()))
    }

    // Mutable variant of `objects_for_publisher`.
    fn objects_for_publisher_mut(&mut self, publisher: &PublisherHandle) -> KrillResult<&mut CurrentObjects> {
        self.publishers
            .get_mut(publisher)
            .ok_or_else(|| Error::PublisherUnknown(publisher.clone()))
    }

    /// Gets a list reply containing all objects for this publisher.
    pub fn list_reply(&self, publisher: &Handle) -> KrillResult<ListReply> {
        self.objects_for_publisher(publisher).map(|o| o.to_list_reply())
    }

    /// Applies a delta for one publisher and writes the updated repository
    /// to disk. Fails if the publisher is unknown, or if the delta falls
    /// outside of the publisher's jail.
    pub fn publish(
        &mut self,
        name: &PublisherHandle,
        delta: DeltaElements,
        jail: &uri::Rsync,
        config: &RepositoryRetentionConfig,
    ) -> KrillResult<()> {
        // update publisher, this will fail if the publisher tries
        // to update outside of its jail.
        let objects = self.objects_for_publisher_mut(name)?;
        // The delta is cloned because both the publisher's object set and the
        // RRDP server below need to consume it.
        objects.apply_delta(delta.clone(), jail)?;
        let publisher_stats = PublisherStats::new(objects, Time::now());

        // update the RRDP server
        self.rrdp.publish(delta, jail, config)?;

        // write repo (note rsync is based on updated rrdp snapshot)
        self.write_repository(config)?;

        // Update publisher stats - done last so the stats reference the
        // freshly written notification.
        self.stats.publish(name, publisher_stats, self.rrdp.notification());

        Ok(())
    }

    /// Starts a new RRDP session and writes the resulting state to disk.
    pub fn session_reset(&mut self, config: &RepositoryRetentionConfig) -> KrillResult<()> {
        info!(
            "Performing RRDP session reset. This ensures a consistent view for RPs in case we restarted from a backup."
        );

        self.rrdp.session_reset();
        self.stats.session_reset(self.rrdp.notification());
        self.write_repository(config)?;

        Ok(())
    }

    /// Writes RRDP files and the rsync view (derived from the current RRDP
    /// snapshot) to disk.
    pub fn write_repository(&self, config: &RepositoryRetentionConfig) -> KrillResult<()> {
        self.rrdp.write(config)?;
        self.rsync.write(self.rrdp.snapshot())
    }

    /// Adds a publisher with an empty object set, replacing any existing
    /// entry for the same handle.
    pub fn add_publisher(&mut self, name: PublisherHandle) -> KrillResult<()> {
        self.stats.new_publisher(&name);
        self.publishers.insert(name, CurrentObjects::default());
        Ok(())
    }

    /// Removes the content for a publisher. This function will return
    /// ok if there is no content to remove - it is idempotent in that
    /// sense. However, if there are I/O errors removing the content then
    /// this function will fail.
    pub fn remove_publisher(
        &mut self,
        name: &PublisherHandle,
        jail: &uri::Rsync,
        config: &RepositoryRetentionConfig,
    ) -> KrillResult<()> {
        if let Ok(objects) = self.objects_for_publisher(name) {
            // Publish a delta that withdraws every object the publisher has.
            let withdraws = objects.elements().iter().map(|e| e.as_withdraw()).collect();
            let delta = DeltaElements::new(vec![], vec![], withdraws);

            self.rrdp.publish(delta, jail, config)?;
            self.stats.remove_publisher(name, self.rrdp.notification());

            self.write_repository(config)
        } else {
            // nothing to remove
            Ok(())
        }
    }
}
374 
375 //------------ RsyncdStore ---------------------------------------------------
376 
/// This type is responsible for publishing files on disk in a structure so
/// that an rsyncd can be set up to serve this (RPKI) data. Note that the
/// rsync host name and module are part of the path, so make sure that the
/// rsyncd modules and paths are setup properly for each supported rsync
/// base uri used.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RsyncdStore {
    /// The rsync base URI ('jail') all published objects must fall under.
    base_uri: uri::Rsync,
    /// Directory under which the 'current' (and transient tmp/old) dirs live.
    rsync_dir: PathBuf,
    /// Serializes writes to disk. Not persisted; recreated on deserialization.
    #[serde(skip_serializing, skip_deserializing, default = "RsyncdStore::new_lock")]
    lock: Arc<RwLock<()>>,
}
389 
390 /// # Construct
391 ///
392 impl RsyncdStore {
new_lock() -> Arc<RwLock<()>>393     pub fn new_lock() -> Arc<RwLock<()>> {
394         Arc::new(RwLock::new(()))
395     }
396 
new(base_uri: uri::Rsync, repo_dir: &Path) -> Self397     pub fn new(base_uri: uri::Rsync, repo_dir: &Path) -> Self {
398         let mut rsync_dir = repo_dir.to_path_buf();
399         rsync_dir.push(REPOSITORY_RSYNC_DIR);
400         let lock = Self::new_lock();
401         RsyncdStore {
402             base_uri,
403             rsync_dir,
404             lock,
405         }
406     }
407 }
408 
409 /// # Publishing
410 ///
411 impl RsyncdStore {
412     /// Write all the files to disk for rsync to a tmp-dir, then switch
413     /// things over in an effort to minimize the chance of people getting
414     /// inconsistent syncs..
write(&self, snapshot: &Snapshot) -> KrillResult<()>415     pub fn write(&self, snapshot: &Snapshot) -> KrillResult<()> {
416         let _lock = self
417             .lock
418             .write()
419             .map_err(|_| Error::custom("Could not get write lock for rsync repo"))?;
420 
421         let mut new_dir = self.rsync_dir.clone();
422         new_dir.push(&format!("tmp-{}", snapshot.serial()));
423         fs::create_dir_all(&new_dir).map_err(|e| {
424             KrillIoError::new(
425                 format!(
426                     "Could not create dir(s) '{}' for publishing rsync",
427                     new_dir.to_string_lossy()
428                 ),
429                 e,
430             )
431         })?;
432 
433         let elements = snapshot.elements();
434 
435         for publish in elements {
436             let rel = publish
437                 .uri()
438                 .relative_to(&self.base_uri)
439                 .ok_or_else(|| Error::publishing_outside_jail(publish.uri(), &self.base_uri))?;
440 
441             let mut path = new_dir.clone();
442             path.push(rel);
443 
444             file::save(&publish.base64().to_bytes(), &path)?;
445         }
446 
447         let mut current_dir = self.rsync_dir.clone();
448         current_dir.push("current");
449 
450         let mut old_dir = self.rsync_dir.clone();
451         old_dir.push("old");
452 
453         if current_dir.exists() {
454             fs::rename(&current_dir, &old_dir).map_err(|e| {
455                 KrillIoError::new(
456                     format!(
457                         "Could not rename current rsync dir from '{}' to '{}' while publishing",
458                         current_dir.to_string_lossy(),
459                         old_dir.to_string_lossy()
460                     ),
461                     e,
462                 )
463             })?;
464         }
465 
466         fs::rename(&new_dir, &current_dir).map_err(|e| {
467             KrillIoError::new(
468                 format!(
469                     "Could not rename new rsync dir from '{}' to '{}' while publishing",
470                     new_dir.to_string_lossy(),
471                     current_dir.to_string_lossy()
472                 ),
473                 e,
474             )
475         })?;
476 
477         if old_dir.exists() {
478             fs::remove_dir_all(&old_dir).map_err(|e| {
479                 KrillIoError::new(
480                     format!(
481                         "Could not remove up old rsync dir '{}' while publishing",
482                         old_dir.to_string_lossy()
483                     ),
484                     e,
485                 )
486             })?;
487         }
488 
489         Ok(())
490     }
491 
clear(&self)492     fn clear(&self) {
493         let _ = fs::remove_dir_all(&self.rsync_dir);
494     }
495 }
496 
/// The RRDP server used by a Repository instance
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RrdpServer {
    /// The base URI for notification, snapshot and delta files.
    rrdp_base_uri: uri::Https,

    /// The base directory where notification, snapshot and deltas will be
    /// published.
    rrdp_base_dir: PathBuf,
    /// Directory for archived RRDP files — presumably used by retention
    /// handling; confirm against the cleanup code.
    rrdp_archive_dir: PathBuf,

    /// Current RRDP session id.
    session: RrdpSession,
    /// Current serial within the session; incremented on each publish.
    serial: u64,
    /// Content of the current notification file.
    notification: Notification,

    /// Recently superseded notifications, newest first. Kept so the files
    /// they reference are not removed while RPs may still fetch them.
    #[serde(skip_serializing_if = "VecDeque::is_empty", default = "VecDeque::new")]
    old_notifications: VecDeque<Notification>,

    /// The current snapshot holding all current objects.
    snapshot: Snapshot,
    /// Deltas still referenced from the notification, newest first.
    deltas: VecDeque<Delta>,
}
518 
519 impl RrdpServer {
520     #[allow(clippy::too_many_arguments)]
new( rrdp_base_uri: uri::Https, rrdp_base_dir: PathBuf, rrdp_archive_dir: PathBuf, session: RrdpSession, serial: u64, notification: Notification, old_notifications: VecDeque<Notification>, snapshot: Snapshot, deltas: VecDeque<Delta>, ) -> Self521     pub fn new(
522         rrdp_base_uri: uri::Https,
523         rrdp_base_dir: PathBuf,
524         rrdp_archive_dir: PathBuf,
525         session: RrdpSession,
526         serial: u64,
527         notification: Notification,
528         old_notifications: VecDeque<Notification>,
529         snapshot: Snapshot,
530         deltas: VecDeque<Delta>,
531     ) -> Self {
532         RrdpServer {
533             rrdp_base_uri,
534             rrdp_base_dir,
535             rrdp_archive_dir,
536             session,
537             serial,
538             notification,
539             old_notifications,
540             snapshot,
541             deltas,
542         }
543     }
544 
create(rrdp_base_uri: uri::Https, repo_dir: &Path, session: RrdpSession) -> Self545     pub fn create(rrdp_base_uri: uri::Https, repo_dir: &Path, session: RrdpSession) -> Self {
546         let mut rrdp_base_dir = repo_dir.to_path_buf();
547         rrdp_base_dir.push(REPOSITORY_RRDP_DIR);
548 
549         let mut rrdp_archive_dir = repo_dir.to_path_buf();
550         rrdp_archive_dir.push(REPOSITORY_RRDP_ARCHIVE_DIR);
551 
552         let snapshot = Snapshot::create(session);
553 
554         let serial = RRDP_FIRST_SERIAL;
555         let snapshot_uri = snapshot.uri(&rrdp_base_uri);
556         let snapshot_path = snapshot.path(&rrdp_base_dir);
557         let snapshot_hash = HexEncodedHash::from_content(snapshot.xml().as_slice());
558 
559         let snapshot_ref = SnapshotRef::new(snapshot_uri, snapshot_path, snapshot_hash);
560 
561         let notification = Notification::create(session, snapshot_ref);
562 
563         RrdpServer {
564             rrdp_base_uri,
565             rrdp_base_dir,
566             rrdp_archive_dir,
567             session,
568             serial,
569             notification,
570             snapshot,
571             old_notifications: VecDeque::new(),
572             deltas: VecDeque::new(),
573         }
574     }
575 
clear(&self)576     fn clear(&self) {
577         let _ = fs::remove_dir_all(&self.rrdp_base_dir);
578         let _ = fs::remove_dir_all(&self.rrdp_archive_dir);
579     }
580 
snapshot(&self) -> &Snapshot581     fn snapshot(&self) -> &Snapshot {
582         &self.snapshot
583     }
584 
notification(&self) -> &Notification585     pub fn notification(&self) -> &Notification {
586         &self.notification
587     }
588 
589     /// Performs a session reset of the RRDP server. Useful if the serial needs
590     /// to be rolled, or in case the RRDP server needed to recover to a previous
591     /// state.
session_reset(&mut self)592     fn session_reset(&mut self) {
593         let session = RrdpSession::random();
594         let snapshot = self.snapshot.session_reset(session);
595 
596         let snapshot_uri = snapshot.uri(&self.rrdp_base_uri);
597         let snapshot_path = snapshot.path(&self.rrdp_base_dir);
598         let snapshot_hash = HexEncodedHash::from_content(snapshot.xml().as_slice());
599 
600         let snapshot_ref = SnapshotRef::new(snapshot_uri, snapshot_path, snapshot_hash);
601 
602         let notification = Notification::create(session, snapshot_ref);
603 
604         self.serial = notification.serial();
605         self.session = notification.session();
606         self.notification = notification;
607         self.old_notifications.clear();
608         self.snapshot = snapshot;
609         self.deltas = VecDeque::new();
610     }
611 
612     /// Updates the RRDP server with the elements. Note that this assumes that
613     /// the delta has already been checked against the jail and current
614     /// objects of the publisher.
publish( &mut self, elements: DeltaElements, jail: &uri::Rsync, config: &RepositoryRetentionConfig, ) -> KrillResult<()>615     fn publish(
616         &mut self,
617         elements: DeltaElements,
618         jail: &uri::Rsync,
619         config: &RepositoryRetentionConfig,
620     ) -> KrillResult<()> {
621         if elements.is_empty() {
622             Ok(())
623         } else {
624             // Update the snapshot, this can fail if the delta is illegal.
625             self.snapshot.apply_delta(elements.clone(), jail)?;
626             self.serial += 1;
627 
628             self.update_deltas(elements, config);
629             self.update_notification(config);
630 
631             Ok(())
632         }
633     }
634 
    // Push the delta and truncate excessive deltas:
    //  - never keep more than the size of the snapshot
    //  - always keep 'retention_delta_files_min_nr' files
    //  - always keep 'retention_delta_files_min_seconds' files
    //  - beyond this:
    //     - never keep more than 'retention_delta_files_max_nr'
    //     - never keep older than 'retention_delta_files_max_seconds'
    //     - keep the others
    fn update_deltas(&mut self, elements: DeltaElements, config: &RepositoryRetentionConfig) {
        // Newest delta goes to the front; `self.serial` was already bumped
        // by the caller.
        self.deltas.push_front(Delta::new(self.session, self.serial, elements));
        let mut keep = 0;
        let mut size = 0;
        let snapshot_size = self.snapshot.size();

        let min_nr = config.retention_delta_files_min_nr;
        let min_secs = config.retention_delta_files_min_seconds;
        let max_nr = config.retention_delta_files_max_nr;
        let max_secs = config.retention_delta_files_max_seconds;

        // Walk newest-to-oldest, deciding how many deltas to keep. The order
        // of the branches matters: the cumulative size cap wins over the
        // minimums, and the minimums win over the maximums.
        for delta in &self.deltas {
            size += delta.elements().size();

            if size > snapshot_size {
                // never keep more than the size of the snapshot
                break;
            } else if keep < min_nr || delta.younger_than_seconds(min_secs) {
                // always keep 'retention_delta_files_min_nr' files
                // always keep 'retention_delta_files_min_seconds' file
                keep += 1
            } else if keep == max_nr || delta.older_than_seconds(max_secs) {
                // never keep more than 'retention_delta_files_max_nr'
                // never keep older than 'retention_delta_files_max_seconds'
                break;
            } else {
                // keep the remainder
                keep += 1;
            }
        }
        // Drop everything past the cut-off point.
        self.deltas.truncate(keep);
    }
675 
    // Update the notification to include the current snapshot and
    // deltas. Remove old notifications exceeding the retention time,
    // so that we can delete old snapshots and deltas which are no longer
    // relevant.
    fn update_notification(&mut self, config: &RepositoryRetentionConfig) {
        // Reference the current snapshot by uri, path on disk, and hash of
        // its XML content.
        let snapshot_ref = {
            let snapshot_uri = self.snapshot.uri(&self.rrdp_base_uri);
            let snapshot_path = self.snapshot.path(&self.rrdp_base_dir);
            let snapshot_xml = self.snapshot.xml();
            let snapshot_hash = HexEncodedHash::from_content(snapshot_xml.as_slice());
            SnapshotRef::new(snapshot_uri, snapshot_path, snapshot_hash)
        };

        // Build a reference (serial, uri, path, hash) for each retained delta.
        let delta_refs = self
            .deltas
            .iter()
            .map(|delta| {
                let serial = delta.serial();
                let xml = delta.xml();
                let hash = HexEncodedHash::from_content(xml.as_slice());

                let delta_uri = delta.uri(&self.rrdp_base_uri);
                let delta_path = delta.path(&self.rrdp_base_dir);
                let file_ref = FileRef::new(delta_uri, delta_path, hash);
                DeltaRef::new(serial, file_ref)
            })
            .collect();

        let mut notification = Notification::new(self.session, self.serial, snapshot_ref, delta_refs);

        // Swap the new notification in; the outgoing one is retained in
        // `old_notifications` so files it references are not cleaned up yet.
        mem::swap(&mut self.notification, &mut notification);
        // NOTE(review): `replace` presumably stamps the outgoing notification
        // with the moment it was superseded — confirm in Notification::replace.
        notification.replace(self.notification.time());
        self.old_notifications.push_front(notification);

        // Drop retained notifications older than the retention window.
        self.old_notifications
            .retain(|n| !n.older_than_seconds(config.retention_old_notification_files_seconds));
    }
713 
714     /// Write the (missing) RRDP files to disk, and remove the ones
715     /// no longer referenced in the notification file.
write(&self, config: &RepositoryRetentionConfig) -> Result<(), Error>716     fn write(&self, config: &RepositoryRetentionConfig) -> Result<(), Error> {
717         // write snapshot if it's not there
718         let snapshot_path = self.snapshot.path(&self.rrdp_base_dir);
719         if !snapshot_path.exists() {
720             self.snapshot.write_xml(&snapshot_path)?;
721         }
722 
723         // write deltas if they are not there
724         for delta in &self.deltas {
725             let path = delta.path(&self.rrdp_base_dir);
726             if !path.exists() {
727                 // assume that if the delta exists, it is correct
728                 delta.write_xml(&path)?;
729             }
730         }
731 
732         // write notification file
733         let notification_path_new = self.notification_path_new();
734         let notification_path = self.notification_path();
735         self.notification.write_xml(&notification_path_new)?;
736         fs::rename(&notification_path_new, &notification_path).map_err(|e| {
737             KrillIoError::new(
738                 format!(
739                     "Could not rename notification file from '{}' to '{}'",
740                     notification_path_new.to_string_lossy(),
741                     notification_path.to_string_lossy()
742                 ),
743                 e,
744             )
745         })?;
746 
747         // clean up under the base dir:
748         // - old session dirs
749         for entry in fs::read_dir(&self.rrdp_base_dir).map_err(|e| {
750             KrillIoError::new(
751                 format!(
752                     "Could not read RRDP base directory '{}'",
753                     self.rrdp_base_dir.to_string_lossy()
754                 ),
755                 e,
756             )
757         })? {
758             let entry = entry.map_err(|e| {
759                 KrillIoError::new(
760                     format!(
761                         "Could not read entry in RRDP base directory '{}'",
762                         self.rrdp_base_dir.to_string_lossy()
763                     ),
764                     e,
765                 )
766             })?;
767             if self.session.to_string() == entry.file_name().to_string_lossy() {
768                 continue;
769             } else {
770                 let path = entry.path();
771                 if path.is_dir() {
772                     let _best_effort_rm = fs::remove_dir_all(path);
773                 }
774             }
775         }
776 
777         // clean up under the current session
778         let mut session_dir = self.rrdp_base_dir.clone();
779         session_dir.push(self.session.to_string());
780 
781         for entry in fs::read_dir(&session_dir).map_err(|e| {
782             KrillIoError::new(
783                 format!(
784                     "Could not read RRDP session directory '{}'",
785                     session_dir.to_string_lossy()
786                 ),
787                 e,
788             )
789         })? {
790             let entry = entry.map_err(|e| {
791                 KrillIoError::new(
792                     format!(
793                         "Could not read entry in RRDP session directory '{}'",
794                         session_dir.to_string_lossy()
795                     ),
796                     e,
797                 )
798             })?;
799             let path = entry.path();
800 
801             // remove any dir or file that is:
802             // - not a number
803             // - a number that is higher than the current serial
804             // - a number that is lower than the last delta (if set)
805             if let Ok(serial) = u64::from_str(entry.file_name().to_string_lossy().as_ref()) {
806                 // Skip the current serial
807                 if serial == self.serial {
808                     continue;
809                 // Clean up old serial dirs once deltas are out of scope
810                 } else if !self.notification.includes_delta(serial)
811                     && !self.old_notifications.iter().any(|n| n.includes_delta(serial))
812                 {
813                     if config.retention_archive {
814                         // If archiving is enabled, then move these directories under the archive base
815 
816                         let mut dest = self.rrdp_archive_dir.clone();
817                         dest.push(self.session.to_string());
818                         dest.push(format!("{}", serial));
819 
820                         info!("Archiving RRDP serial '{}' to '{}", serial, dest.to_string_lossy());
821                         let _ = fs::create_dir_all(&dest);
822                         let _ = fs::rename(path, dest);
823                     } else if path.is_dir() {
824                         let _best_effort_rm = fs::remove_dir_all(path);
825                     } else {
826                         let _best_effort_rm = fs::remove_file(path);
827                     }
828                 // We still need this old serial dir for the delta, but may not need the snapshot
829                 // in it unless archiving is enabled.. in that case leave them and move them when
830                 // the complete serial dir goes out of scope above.
831                 } else if !config.retention_archive
832                     && !self
833                         .old_notifications
834                         .iter()
835                         .any(|old_notification| old_notification.includes_snapshot(serial))
836                 {
837                     // see if the there is a snapshot file in this serial dir and if so do a best
838                     // effort removal.
839                     if let Ok(Some(snapshot_file_to_remove)) = Self::session_dir_snapshot(&session_dir, serial) {
840                         // snapshot files are stored under their own unique random dir, e.g:
841                         // <session_dir>/<serial>/<random>/snapshot.xml
842                         //
843                         // So also remove the otherwise empty parent directory.
844                         if let Some(snapshot_parent_dir) = snapshot_file_to_remove.parent() {
845                             let _ = fs::remove_dir_all(snapshot_parent_dir);
846                         }
847                     }
848                 } else {
849                     // we still need this
850                 }
851             } else {
852                 // clean up dirs or files under the base dir which are not sessions
853                 warn!(
854                     "Found unexpected file or dir in RRDP repository - will try to remove: {}",
855                     path.to_string_lossy()
856                 );
857                 if path.is_dir() {
858                     let _best_effort_rm = fs::remove_dir_all(path);
859                 } else {
860                     let _best_effort_rm = fs::remove_file(path);
861                 }
862             }
863         }
864 
865         Ok(())
866     }
867 }
868 
869 /// rrdp paths and uris
870 ///
871 impl RrdpServer {
notification_uri(&self) -> uri::Https872     pub fn notification_uri(&self) -> uri::Https {
873         self.rrdp_base_uri.join(b"notification.xml").unwrap()
874     }
875 
notification_path_new(&self) -> PathBuf876     fn notification_path_new(&self) -> PathBuf {
877         let mut path = self.rrdp_base_dir.clone();
878         path.push("new-notification.xml");
879         path
880     }
881 
notification_path(&self) -> PathBuf882     fn notification_path(&self) -> PathBuf {
883         let mut path = self.rrdp_base_dir.clone();
884         path.push("notification.xml");
885         path
886     }
887 
session_dir_snapshot(session_path: &Path, serial: u64) -> KrillResult<Option<PathBuf>>888     pub fn session_dir_snapshot(session_path: &Path, serial: u64) -> KrillResult<Option<PathBuf>> {
889         Self::find_in_serial_dir(session_path, serial, "snapshot.xml")
890     }
891 
892     /// Expects files (like delta.xml or snapshot.xml) under dir structure like:
893     /// <session_path>/<serial>/<some random>/<filename>
find_in_serial_dir(session_path: &Path, serial: u64, filename: &str) -> KrillResult<Option<PathBuf>>894     pub fn find_in_serial_dir(session_path: &Path, serial: u64, filename: &str) -> KrillResult<Option<PathBuf>> {
895         let serial_dir = session_path.join(serial.to_string());
896         if let Ok(randoms) = fs::read_dir(&serial_dir) {
897             for entry in randoms {
898                 let entry = entry.map_err(|e| {
899                     Error::io_error_with_context(
900                         format!(
901                             "Could not open directory entry under RRDP directory {}",
902                             serial_dir.to_string_lossy()
903                         ),
904                         e,
905                     )
906                 })?;
907                 if let Ok(files) = fs::read_dir(entry.path()) {
908                     for file in files {
909                         let file = file.map_err(|e| {
910                             Error::io_error_with_context(
911                                 format!(
912                                     "Could not open directory entry under RRDP directory {}",
913                                     entry.path().to_string_lossy()
914                                 ),
915                                 e,
916                             )
917                         })?;
918                         if file.file_name().to_string_lossy() == filename {
919                             return Ok(Some(file.path()));
920                         }
921                     }
922                 }
923             }
924         }
925         Ok(None)
926     }
927 }
928 
/// We can only have one (1) RepositoryAccess, but it is an event-sourced
/// typed which is stored in an AggregateStore which could theoretically
/// serve multiple. So, we use RepositoryAccessProxy as a wrapper around
/// this so that callers don't need to worry about storage details.
pub struct RepositoryAccessProxy {
    // Event-sourced store holding the single RepositoryAccess aggregate.
    store: AggregateStore<RepositoryAccess>,
    // Fixed handle (PUBSERVER_DFLT) under which that aggregate is stored.
    key: Handle,
}
937 
938 impl RepositoryAccessProxy {
disk(config: &Config) -> KrillResult<Self>939     pub fn disk(config: &Config) -> KrillResult<Self> {
940         let store = AggregateStore::<RepositoryAccess>::disk(&config.data_dir, PUBSERVER_DIR)?;
941         let key = Handle::from_str(PUBSERVER_DFLT).unwrap();
942 
943         if store.has(&key)? {
944             if config.always_recover_data {
945                 store.recover()?;
946             } else if let Err(e) = store.warm() {
947                 error!(
948                     "Could not warm up cache, storage seems corrupt, will try to recover!! Error was: {}",
949                     e
950                 );
951                 store.recover()?;
952             }
953         }
954 
955         Ok(RepositoryAccessProxy { store, key })
956     }
957 
initialized(&self) -> KrillResult<bool>958     pub fn initialized(&self) -> KrillResult<bool> {
959         self.store.has(&self.key).map_err(Error::AggregateStoreError)
960     }
961 
init(&self, uris: PublicationServerUris, signer: &KrillSigner) -> KrillResult<()>962     pub fn init(&self, uris: PublicationServerUris, signer: &KrillSigner) -> KrillResult<()> {
963         if self.initialized()? {
964             Err(Error::RepositoryServerAlreadyInitialized)
965         } else {
966             let (rrdp_base_uri, rsync_jail) = uris.unpack();
967 
968             let ini = RepositoryAccessInitDetails::init(&self.key, rsync_jail, rrdp_base_uri, signer)?;
969 
970             self.store.add(ini)?;
971 
972             Ok(())
973         }
974     }
975 
clear(&self) -> KrillResult<()>976     pub fn clear(&self) -> KrillResult<()> {
977         if !self.initialized()? {
978             Err(Error::RepositoryServerNotInitialized)
979         } else if !self.publishers()?.is_empty() {
980             Err(Error::RepositoryServerHasPublishers)
981         } else {
982             self.store.drop_aggregate(&self.key)?;
983             Ok(())
984         }
985     }
986 
read(&self) -> KrillResult<Arc<RepositoryAccess>>987     fn read(&self) -> KrillResult<Arc<RepositoryAccess>> {
988         if !self.initialized()? {
989             Err(Error::RepositoryServerNotInitialized)
990         } else {
991             self.store
992                 .get_latest(&self.key)
993                 .map_err(|e| Error::custom(format!("Publication Server data issue: {}", e)))
994         }
995     }
996 
publishers(&self) -> KrillResult<Vec<PublisherHandle>>997     pub fn publishers(&self) -> KrillResult<Vec<PublisherHandle>> {
998         Ok(self.read()?.publishers())
999     }
1000 
get_publisher(&self, name: &PublisherHandle) -> KrillResult<Publisher>1001     pub fn get_publisher(&self, name: &PublisherHandle) -> KrillResult<Publisher> {
1002         self.read()?.get_publisher(name).map(|p| p.clone())
1003     }
1004 
add_publisher(&self, req: rfc8183::PublisherRequest, actor: &Actor) -> KrillResult<()>1005     pub fn add_publisher(&self, req: rfc8183::PublisherRequest, actor: &Actor) -> KrillResult<()> {
1006         let base_uri = self.read()?.base_uri_for(req.publisher_handle())?; // will verify that server was initialized
1007         let cmd = RepoAccessCmdDet::add_publisher(&self.key, req, base_uri, actor);
1008         self.store.command(cmd)?;
1009         Ok(())
1010     }
1011 
remove_publisher(&self, name: PublisherHandle, actor: &Actor) -> KrillResult<()>1012     pub fn remove_publisher(&self, name: PublisherHandle, actor: &Actor) -> KrillResult<()> {
1013         if !self.initialized()? {
1014             Err(Error::RepositoryServerNotInitialized)
1015         } else {
1016             let cmd = RepoAccessCmdDet::remove_publisher(&self.key, name, actor);
1017             self.store.command(cmd)?;
1018             Ok(())
1019         }
1020     }
1021 
1022     /// Returns the repository URI information for a publisher.
repo_info_for(&self, name: &PublisherHandle) -> KrillResult<RepoInfo>1023     pub fn repo_info_for(&self, name: &PublisherHandle) -> KrillResult<RepoInfo> {
1024         self.read()?.repo_info_for(name)
1025     }
1026 
1027     /// Returns the RFC8183 Repository Response for the publisher
repository_response( &self, rfc8181_uri: uri::Https, publisher: &PublisherHandle, ) -> KrillResult<rfc8183::RepositoryResponse>1028     pub fn repository_response(
1029         &self,
1030         rfc8181_uri: uri::Https,
1031         publisher: &PublisherHandle,
1032     ) -> KrillResult<rfc8183::RepositoryResponse> {
1033         self.read()?.repository_response(rfc8181_uri, publisher)
1034     }
1035 
1036     /// Parse submitted bytes by a Publisher as an RFC8181 ProtocolCms object, and validates it.
validate(&self, publisher: &PublisherHandle, msg: Bytes) -> KrillResult<ProtocolCms>1037     pub fn validate(&self, publisher: &PublisherHandle, msg: Bytes) -> KrillResult<ProtocolCms> {
1038         let publisher = self.get_publisher(publisher)?;
1039         let msg = ProtocolCms::decode(msg, false).map_err(|e| Error::Rfc8181Decode(e.to_string()))?;
1040         msg.validate(publisher.id_cert()).map_err(Error::Rfc8181Validation)?;
1041         Ok(msg)
1042     }
1043 
1044     /// Creates and signs an RFC8181 CMS response.
respond(&self, message: Bytes, signer: &KrillSigner) -> KrillResult<Bytes>1045     pub fn respond(&self, message: Bytes, signer: &KrillSigner) -> KrillResult<Bytes> {
1046         let key_id = self.read()?.key_id();
1047         let response_builder = ProtocolCmsBuilder::create(&key_id, signer, message).map_err(Error::signer)?;
1048         Ok(response_builder.as_bytes())
1049     }
1050 }
1051 
1052 //------------ RepositoryAccess --------------------------------------------
1053 
/// An RFC8183 Repository server, capable of handling Publishers (both embedded, and
/// remote RFC8183), and publishing to RRDP and disk, and signing responses.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RepositoryAccess {
    // Event sourcing support
    handle: Handle,
    version: u64,

    // Identity certificate used to sign RFC 8181 responses (see `key_id`).
    id_cert: IdCert,
    // All current publishers, by handle.
    publishers: HashMap<PublisherHandle, Publisher>,

    // Base URIs: per-publisher rsync URIs are derived from `rsync_base`,
    // and the RRDP notification URI from `rrdp_base`.
    rsync_base: uri::Rsync,
    rrdp_base: uri::Https,
}
1068 
1069 impl RepositoryAccess {
key_id(&self) -> KeyIdentifier1070     pub fn key_id(&self) -> KeyIdentifier {
1071         self.id_cert.subject_public_key_info().key_identifier()
1072     }
1073 }
1074 
1075 /// # Event Sourcing support
1076 ///
1077 impl Aggregate for RepositoryAccess {
1078     type Command = RepoAccessCmd;
1079     type StorableCommandDetails = StorableRepositoryCommand;
1080     type Event = RepositoryAccessEvent;
1081     type InitEvent = RepositoryAccessIni;
1082     type Error = Error;
1083 
init(event: Self::InitEvent) -> Result<Self, Self::Error>1084     fn init(event: Self::InitEvent) -> Result<Self, Self::Error> {
1085         let (handle, _version, details) = event.unpack();
1086         let (id_cert, rrdp_base, rsync_base) = details.unpack();
1087 
1088         Ok(RepositoryAccess {
1089             handle,
1090             version: 1,
1091             id_cert,
1092             publishers: HashMap::new(),
1093             rsync_base,
1094             rrdp_base,
1095         })
1096     }
1097 
version(&self) -> u641098     fn version(&self) -> u64 {
1099         self.version
1100     }
1101 
apply(&mut self, event: Self::Event)1102     fn apply(&mut self, event: Self::Event) {
1103         self.version += 1;
1104         match event.into_details() {
1105             RepositoryAccessEventDetails::PublisherAdded { name, publisher } => {
1106                 self.publishers.insert(name, publisher);
1107             }
1108             RepositoryAccessEventDetails::PublisherRemoved { name } => {
1109                 self.publishers.remove(&name);
1110             }
1111         }
1112     }
1113 
process_command(&self, command: Self::Command) -> Result<Vec<Self::Event>, Self::Error>1114     fn process_command(&self, command: Self::Command) -> Result<Vec<Self::Event>, Self::Error> {
1115         info!(
1116             "Sending command to publisher '{}', version: {}: {}",
1117             self.handle, self.version, command
1118         );
1119 
1120         match command.into_details() {
1121             RepoAccessCmdDet::AddPublisher { request, base_uri } => self.add_publisher(request, base_uri),
1122             RepoAccessCmdDet::RemovePublisher { name } => self.remove_publisher(name),
1123         }
1124     }
1125 }
1126 
1127 /// # Manage publishers
1128 ///
1129 impl RepositoryAccess {
1130     /// Adds a publisher with access to the repository
add_publisher( &self, publisher_request: rfc8183::PublisherRequest, base_uri: uri::Rsync, ) -> Result<Vec<RepositoryAccessEvent>, Error>1131     fn add_publisher(
1132         &self,
1133         publisher_request: rfc8183::PublisherRequest,
1134         base_uri: uri::Rsync,
1135     ) -> Result<Vec<RepositoryAccessEvent>, Error> {
1136         let (_tag, name, id_cert) = publisher_request.unpack();
1137 
1138         if self.publishers.contains_key(&name) {
1139             Err(Error::PublisherDuplicate(name))
1140         } else {
1141             let publisher = Publisher::new(id_cert, base_uri);
1142 
1143             Ok(vec![RepositoryAccessEventDetails::publisher_added(
1144                 &self.handle,
1145                 self.version,
1146                 name,
1147                 publisher,
1148             )])
1149         }
1150     }
1151 
1152     /// Removes a publisher and all its content
remove_publisher(&self, publisher_handle: PublisherHandle) -> Result<Vec<RepositoryAccessEvent>, Error>1153     fn remove_publisher(&self, publisher_handle: PublisherHandle) -> Result<Vec<RepositoryAccessEvent>, Error> {
1154         if !self.has_publisher(&publisher_handle) {
1155             Err(Error::PublisherUnknown(publisher_handle))
1156         } else {
1157             Ok(vec![RepositoryAccessEventDetails::publisher_removed(
1158                 &self.handle,
1159                 self.version,
1160                 publisher_handle,
1161             )])
1162         }
1163     }
1164 
notification_uri(&self) -> uri::Https1165     fn notification_uri(&self) -> uri::Https {
1166         self.rrdp_base.join(b"notification.xml").unwrap()
1167     }
1168 
base_uri_for(&self, name: &PublisherHandle) -> KrillResult<uri::Rsync>1169     fn base_uri_for(&self, name: &PublisherHandle) -> KrillResult<uri::Rsync> {
1170         uri::Rsync::from_str(&format!("{}{}/", self.rsync_base, name))
1171             .map_err(|_| Error::Custom(format!("Cannot derive base uri for {}", name)))
1172     }
1173 
1174     /// Returns the repository URI information for a publisher.
repo_info_for(&self, name: &PublisherHandle) -> KrillResult<RepoInfo>1175     pub fn repo_info_for(&self, name: &PublisherHandle) -> KrillResult<RepoInfo> {
1176         let rsync_base = self.base_uri_for(name)?;
1177         Ok(RepoInfo::new(rsync_base, self.notification_uri()))
1178     }
1179 
repository_response( &self, rfc8181_uri: uri::Https, publisher_handle: &PublisherHandle, ) -> Result<rfc8183::RepositoryResponse, Error>1180     pub fn repository_response(
1181         &self,
1182         rfc8181_uri: uri::Https,
1183         publisher_handle: &PublisherHandle,
1184     ) -> Result<rfc8183::RepositoryResponse, Error> {
1185         let publisher = self.get_publisher(publisher_handle)?;
1186         let rsync_base = publisher.base_uri();
1187         let service_uri = rfc8183::ServiceUri::Https(rfc8181_uri);
1188 
1189         let repo_info = RepoInfo::new(rsync_base.clone(), self.notification_uri());
1190 
1191         Ok(rfc8183::RepositoryResponse::new(
1192             None,
1193             publisher_handle.clone(),
1194             self.id_cert.clone(),
1195             service_uri,
1196             repo_info,
1197         ))
1198     }
1199 
get_publisher(&self, publisher_handle: &PublisherHandle) -> Result<&Publisher, Error>1200     pub fn get_publisher(&self, publisher_handle: &PublisherHandle) -> Result<&Publisher, Error> {
1201         self.publishers
1202             .get(publisher_handle)
1203             .ok_or_else(|| Error::PublisherUnknown(publisher_handle.clone()))
1204     }
1205 
has_publisher(&self, name: &PublisherHandle) -> bool1206     pub fn has_publisher(&self, name: &PublisherHandle) -> bool {
1207         self.publishers.contains_key(name)
1208     }
1209 
publishers(&self) -> Vec<PublisherHandle>1210     pub fn publishers(&self) -> Vec<PublisherHandle> {
1211         self.publishers.keys().cloned().collect()
1212     }
1213 }
1214 
1215 //------------ RepoStats -----------------------------------------------------
1216 
// Statistics for the repository as a whole, plus per-publisher stats.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct RepoStats {
    // Per-publisher statistics, by handle.
    publishers: HashMap<PublisherHandle, PublisherStats>,
    // Current RRDP session and serial.
    session: RrdpSession,
    serial: u64,
    // Time of the last update, if any (taken from the notification file).
    last_update: Option<Time>,
}
1224 
1225 impl Default for RepoStats {
default() -> Self1226     fn default() -> Self {
1227         RepoStats {
1228             publishers: HashMap::new(),
1229             session: RrdpSession::default(),
1230             serial: 0,
1231             last_update: None,
1232         }
1233     }
1234 }
1235 
1236 impl RepoStats {
new(session: RrdpSession) -> Self1237     pub fn new(session: RrdpSession) -> Self {
1238         RepoStats {
1239             publishers: HashMap::new(),
1240             session,
1241             serial: 0,
1242             last_update: None,
1243         }
1244     }
1245 
publish( &mut self, publisher: &PublisherHandle, publisher_stats: PublisherStats, notification: &Notification, )1246     pub fn publish(
1247         &mut self,
1248         publisher: &PublisherHandle,
1249         publisher_stats: PublisherStats,
1250         notification: &Notification,
1251     ) {
1252         self.publishers.insert(publisher.clone(), publisher_stats);
1253         self.serial = notification.serial();
1254         self.last_update = Some(notification.time());
1255     }
1256 
session_reset(&mut self, notification: &Notification)1257     pub fn session_reset(&mut self, notification: &Notification) {
1258         self.session = notification.session();
1259         self.serial = notification.serial();
1260         self.last_update = Some(notification.time())
1261     }
1262 
new_publisher(&mut self, publisher: &PublisherHandle)1263     pub fn new_publisher(&mut self, publisher: &PublisherHandle) {
1264         self.publishers.insert(publisher.clone(), PublisherStats::default());
1265     }
1266 
remove_publisher(&mut self, publisher: &PublisherHandle, notification: &Notification)1267     pub fn remove_publisher(&mut self, publisher: &PublisherHandle, notification: &Notification) {
1268         self.publishers.remove(publisher);
1269         self.serial = notification.serial();
1270         self.last_update = Some(notification.time())
1271     }
1272 
get_publishers(&self) -> &HashMap<PublisherHandle, PublisherStats>1273     pub fn get_publishers(&self) -> &HashMap<PublisherHandle, PublisherStats> {
1274         &self.publishers
1275     }
1276 
stale_publishers(&self, seconds: i64) -> Vec<PublisherHandle>1277     pub fn stale_publishers(&self, seconds: i64) -> Vec<PublisherHandle> {
1278         let mut res = vec![];
1279         for (publisher, stats) in self.publishers.iter() {
1280             if let Some(update_time) = stats.last_update {
1281                 if Time::now().timestamp() - update_time.timestamp() >= seconds {
1282                     res.push(publisher.clone())
1283                 }
1284             } else {
1285                 res.push(publisher.clone())
1286             }
1287         }
1288         res
1289     }
1290 
last_update(&self) -> Option<Time>1291     pub fn last_update(&self) -> Option<Time> {
1292         self.last_update
1293     }
1294 
serial(&self) -> u641295     pub fn serial(&self) -> u64 {
1296         self.serial
1297     }
1298 
session(&self) -> RrdpSession1299     pub fn session(&self) -> RrdpSession {
1300         self.session
1301     }
1302 }
1303 
1304 impl fmt::Display for RepoStats {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result1305     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1306         if let Some(update) = self.last_update() {
1307             writeln!(f, "RRDP updated: {}", update.to_rfc3339())?;
1308         }
1309         writeln!(f, "RRDP session: {}", self.session())?;
1310         writeln!(f, "RRDP serial:  {}", self.serial())?;
1311         writeln!(f)?;
1312         writeln!(f, "Publisher, Objects, Size, Last Updated")?;
1313         for (publisher, stats) in self.get_publishers() {
1314             let update_str = match stats.last_update() {
1315                 None => "never".to_string(),
1316                 Some(update) => update.to_rfc3339(),
1317             };
1318             writeln!(
1319                 f,
1320                 "{}, {}, {}, {}",
1321                 publisher,
1322                 stats.objects(),
1323                 stats.size(),
1324                 update_str
1325             )?;
1326         }
1327 
1328         Ok(())
1329     }
1330 }
1331 
// Statistics for a single publisher.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct PublisherStats {
    // Number of current objects, and their combined size, as reported
    // by `CurrentObjects::len` / `CurrentObjects::size`.
    objects: usize,
    size: usize,
    // When the publisher last published, if ever.
    last_update: Option<Time>,
}
1338 
1339 impl PublisherStats {
new(current_objects: &CurrentObjects, last_update: Time) -> Self1340     pub fn new(current_objects: &CurrentObjects, last_update: Time) -> Self {
1341         let objects = current_objects.len();
1342         let size = current_objects.size();
1343         PublisherStats {
1344             objects,
1345             size,
1346             last_update: Some(last_update),
1347         }
1348     }
1349 
objects(&self) -> usize1350     pub fn objects(&self) -> usize {
1351         self.objects
1352     }
1353 
size(&self) -> usize1354     pub fn size(&self) -> usize {
1355         self.size
1356     }
1357 
last_update(&self) -> Option<Time>1358     pub fn last_update(&self) -> Option<Time> {
1359         self.last_update
1360     }
1361 }
1362 
1363 impl From<&CurrentObjects> for PublisherStats {
from(objects: &CurrentObjects) -> Self1364     fn from(objects: &CurrentObjects) -> Self {
1365         PublisherStats {
1366             objects: objects.len(),
1367             size: objects.size(),
1368             last_update: None,
1369         }
1370     }
1371 }
1372 
1373 impl Default for PublisherStats {
default() -> Self1374     fn default() -> Self {
1375         PublisherStats {
1376             objects: 0,
1377             size: 0,
1378             last_update: None,
1379         }
1380     }
1381 }
1382