1 use std::{ 2 collections::{HashMap, VecDeque}, 3 fmt, fs, mem, 4 path::{Path, PathBuf}, 5 str::FromStr, 6 sync::{Arc, RwLock}, 7 }; 8 9 use bytes::Bytes; 10 11 use rpki::{ 12 repository::{crypto::KeyIdentifier, x509::Time}, 13 uri, 14 }; 15 16 use crate::{ 17 commons::{ 18 actor::Actor, 19 api::rrdp::{ 20 CurrentObjects, Delta, DeltaElements, DeltaRef, FileRef, Notification, RrdpSession, Snapshot, SnapshotRef, 21 }, 22 api::{ 23 Handle, HexEncodedHash, ListReply, PublicationServerUris, PublishDelta, PublisherHandle, RepoInfo, 24 StorableRepositoryCommand, 25 }, 26 crypto::{IdCert, KrillSigner, ProtocolCms, ProtocolCmsBuilder}, 27 error::{Error, KrillIoError}, 28 eventsourcing::{Aggregate, AggregateStore, KeyStoreKey, KeyValueStore}, 29 remote::rfc8183, 30 util::file, 31 KrillResult, 32 }, 33 constants::{ 34 PUBSERVER_CONTENT_DIR, PUBSERVER_DFLT, PUBSERVER_DIR, REPOSITORY_DIR, REPOSITORY_RRDP_ARCHIVE_DIR, 35 REPOSITORY_RRDP_DIR, REPOSITORY_RSYNC_DIR, RRDP_FIRST_SERIAL, 36 }, 37 daemon::config::{Config, RepositoryRetentionConfig}, 38 pubd::{ 39 publishers::Publisher, RepoAccessCmd, RepoAccessCmdDet, RepositoryAccessEvent, RepositoryAccessEventDetails, 40 RepositoryAccessIni, RepositoryAccessInitDetails, 41 }, 42 }; 43 44 //------------ RepositoryContentProxy ---------------------------------------- 45 46 /// We can only have one (1) RepositoryContent, but it is stored 47 /// in a KeyValueStore. So this type provides a wrapper around this 48 /// so that callers don't need to worry about storage details. 49 #[derive(Debug)] 50 pub struct RepositoryContentProxy { 51 cache: RwLock<Option<RepositoryContent>>, 52 store: RwLock<KeyValueStore>, 53 key: KeyStoreKey, 54 } 55 56 impl RepositoryContentProxy { disk(config: &Config) -> KrillResult<Self>57 pub fn disk(config: &Config) -> KrillResult<Self> { 58 let work_dir = &config.data_dir; 59 let store = KeyValueStore::disk(work_dir, PUBSERVER_CONTENT_DIR)?; 60 let store = RwLock::new(store); 61 let key = KeyStoreKey::simple(format!("{}.json", PUBSERVER_DFLT)); 62 let cache = RwLock::new(None); 63 64 let proxy = RepositoryContentProxy { cache, store, key }; 65 proxy.warm_cache()?; 66 67 Ok(proxy) 68 } 69 warm_cache(&self) -> KrillResult<()>70 fn warm_cache(&self) -> KrillResult<()> { 71 let key_store_read = self.store.read().unwrap(); 72 73 if key_store_read.has(&self.key)? { 74 info!("Warming the repository content cache, this can take a minute for large repositories."); 75 let content = key_store_read.get(&self.key)?.unwrap(); 76 self.cache.write().unwrap().replace(content); 77 } 78 79 Ok(()) 80 } 81 82 /// Initialize init(&self, work_dir: &Path, uris: PublicationServerUris) -> KrillResult<()>83 pub fn init(&self, work_dir: &Path, uris: PublicationServerUris) -> KrillResult<()> { 84 if self.store.read().unwrap().has(&self.key)? { 85 Err(Error::RepositoryServerAlreadyInitialized) 86 } else { 87 // initialize new repo content 88 let repository_content = { 89 let (rrdp_base_uri, rsync_jail) = uris.unpack(); 90 91 let publishers = HashMap::new(); 92 93 let session = RrdpSession::default(); 94 let stats = RepoStats::new(session); 95 96 let mut repo_dir = work_dir.to_path_buf(); 97 repo_dir.push(REPOSITORY_DIR); 98 99 let rrdp = RrdpServer::create(rrdp_base_uri, &repo_dir, session); 100 let rsync = RsyncdStore::new(rsync_jail, &repo_dir); 101 102 RepositoryContent::new(publishers, rrdp, rsync, stats) 103 }; 104 105 // Store newly initialized repo content on disk 106 let store = self.store.write().unwrap(); 107 store.store(&self.key, &repository_content)?; 108 109 // Store newly initialized repo content in cache 110 let mut cache = self.cache.write().unwrap(); 111 cache.replace(repository_content); 112 113 Ok(()) 114 } 115 } 116 117 // Clear all content, so it can be re-initialized. 118 // Only to be called after all publishers have been removed from the RepoAccess as well. clear(&self) -> KrillResult<()>119 pub fn clear(&self) -> KrillResult<()> { 120 let store = self.store.write().unwrap(); 121 122 if let Ok(Some(content)) = store.get::<RepositoryContent>(&self.key) { 123 content.clear(); 124 store.drop_key(&self.key)?; 125 } 126 127 let mut cache = self.cache.write().unwrap(); 128 cache.take(); 129 130 Ok(()) 131 } 132 133 /// Return the repository content stats stats(&self) -> KrillResult<RepoStats>134 pub fn stats(&self) -> KrillResult<RepoStats> { 135 self.read(|content| Ok(content.stats().clone())) 136 } 137 138 /// Add a publisher with an empty set of published objects. 139 /// 140 /// Replaces an existing publisher if it existed. 141 /// This is only supposed to be called if adding the publisher 142 /// to the RepositoryAccess was successful (and *that* will fail if 143 /// the publisher is a duplicate). This method can only fail if 144 /// there is an issue with the underlying key value store. add_publisher(&self, name: PublisherHandle) -> KrillResult<()>145 pub fn add_publisher(&self, name: PublisherHandle) -> KrillResult<()> { 146 self.write(|content| content.add_publisher(name)) 147 } 148 149 /// Removes a publisher and its content. remove_publisher( &self, name: &PublisherHandle, jail: &uri::Rsync, config: &RepositoryRetentionConfig, ) -> KrillResult<()>150 pub fn remove_publisher( 151 &self, 152 name: &PublisherHandle, 153 jail: &uri::Rsync, 154 config: &RepositoryRetentionConfig, 155 ) -> KrillResult<()> { 156 self.write(|content| content.remove_publisher(name, jail, config)) 157 } 158 159 /// Publish an update for a publisher. 160 /// 161 /// Assumes that the RFC 8181 CMS has been verified, but will check that all objects 162 /// are within the publisher's uri space (jail). publish( &self, name: &PublisherHandle, delta: PublishDelta, jail: &uri::Rsync, config: &RepositoryRetentionConfig, ) -> KrillResult<()>163 pub fn publish( 164 &self, 165 name: &PublisherHandle, 166 delta: PublishDelta, 167 jail: &uri::Rsync, 168 config: &RepositoryRetentionConfig, 169 ) -> KrillResult<()> { 170 self.write(|content| content.publish(name, delta.into(), jail, config)) 171 } 172 173 /// Write all current files to disk write_repository(&self, config: &RepositoryRetentionConfig) -> KrillResult<()>174 pub fn write_repository(&self, config: &RepositoryRetentionConfig) -> KrillResult<()> { 175 self.read(|content| content.write_repository(config)) 176 } 177 178 /// Reset the RRDP session if it is initialized. Otherwise do nothing. session_reset(&self, config: &RepositoryRetentionConfig) -> KrillResult<()>179 pub fn session_reset(&self, config: &RepositoryRetentionConfig) -> KrillResult<()> { 180 if self.cache.read().unwrap().is_some() { 181 self.write(|content| content.session_reset(config)) 182 } else { 183 // repository server was not initialized on this Krill instance. Nothing to reset. 184 Ok(()) 185 } 186 } 187 188 /// Create a list reply containing all current objects for a publisher list_reply(&self, name: &PublisherHandle) -> KrillResult<ListReply>189 pub fn list_reply(&self, name: &PublisherHandle) -> KrillResult<ListReply> { 190 self.read(|content| content.list_reply(name)) 191 } 192 193 // Get all current objects for a publisher current_objects(&self, name: &PublisherHandle) -> KrillResult<CurrentObjects>194 pub fn current_objects(&self, name: &PublisherHandle) -> KrillResult<CurrentObjects> { 195 self.read(|content| content.objects_for_publisher(name).map(|o| o.clone())) 196 } 197 198 // Execute a closure on a mutable repository content in a single write 'transaction' write<F: FnOnce(&mut RepositoryContent) -> KrillResult<()>>(&self, op: F) -> KrillResult<()>199 fn write<F: FnOnce(&mut RepositoryContent) -> KrillResult<()>>(&self, op: F) -> KrillResult<()> { 200 // If there is any existing content, then we can assume that the cache 201 // has it - because it's initialized when we read the content during 202 // initialization. 203 let store = self.store.write().unwrap(); 204 let mut cache = self.cache.write().unwrap(); 205 206 let content: &mut RepositoryContent = cache.as_mut().ok_or(Error::RepositoryServerNotInitialized)?; 207 208 op(content)?; 209 210 store.store(&self.key, content)?; 211 Ok(()) 212 } 213 214 // Execute a closure on a mutable repository content in a single read 'transaction' 215 // 216 // This function fails if the repository content is not initialized. read<A, F: FnOnce(&RepositoryContent) -> KrillResult<A>>(&self, op: F) -> KrillResult<A>217 fn read<A, F: FnOnce(&RepositoryContent) -> KrillResult<A>>(&self, op: F) -> KrillResult<A> { 218 // Note that because the content is initialized it is implied that the cache MUST always be 219 // set. I.e. it is set on initialization and updated whenever the repository content is updated. 220 // So, we can safely read from the cache only. 221 let cache = self.cache.read().unwrap(); 222 let content = cache.as_ref().ok_or(Error::RepositoryServerNotInitialized)?; 223 op(content) 224 } 225 } 226 227 //------------ RepositoryContent ------------------------------------------- 228 229 /// This type manages the content of the repository. Note that access 230 /// to the repository is managed by an event sourced component which 231 /// handles RFC8181 based access, and which can enforce restrictions, 232 /// such as the base uri for publishers. 233 #[derive(Clone, Debug, Deserialize, Serialize)] 234 pub struct RepositoryContent { 235 publishers: HashMap<PublisherHandle, CurrentObjects>, 236 237 rrdp: RrdpServer, 238 rsync: RsyncdStore, 239 240 stats: RepoStats, 241 } 242 243 impl RepositoryContent { new( publishers: HashMap<PublisherHandle, CurrentObjects>, rrdp: RrdpServer, rsync: RsyncdStore, stats: RepoStats, ) -> Self244 pub fn new( 245 publishers: HashMap<PublisherHandle, CurrentObjects>, 246 rrdp: RrdpServer, 247 rsync: RsyncdStore, 248 stats: RepoStats, 249 ) -> Self { 250 RepositoryContent { 251 publishers, 252 rrdp, 253 rsync, 254 stats, 255 } 256 } 257 init(rrdp_base_uri: uri::Https, rsync_jail: uri::Rsync, session: RrdpSession, repo_base_dir: &Path) -> Self258 pub fn init(rrdp_base_uri: uri::Https, rsync_jail: uri::Rsync, session: RrdpSession, repo_base_dir: &Path) -> Self { 259 let publishers = HashMap::new(); 260 let rrdp = RrdpServer::create(rrdp_base_uri, repo_base_dir, session); 261 let rsync = RsyncdStore::new(rsync_jail, repo_base_dir); 262 let stats = RepoStats::new(session); 263 264 RepositoryContent { 265 publishers, 266 rrdp, 267 rsync, 268 stats, 269 } 270 } 271 272 // Clears all content on disk so the repository can be re-initialized clear(&self)273 pub fn clear(&self) { 274 self.rrdp.clear(); 275 self.rsync.clear(); 276 } 277 stats(&self) -> &RepoStats278 pub fn stats(&self) -> &RepoStats { 279 &self.stats 280 } 281 } 282 283 /// # Publisher Content 284 impl RepositoryContent { objects_for_publisher(&self, publisher: &PublisherHandle) -> KrillResult<&CurrentObjects>285 fn objects_for_publisher(&self, publisher: &PublisherHandle) -> KrillResult<&CurrentObjects> { 286 self.publishers 287 .get(publisher) 288 .ok_or_else(|| Error::PublisherUnknown(publisher.clone())) 289 } 290 objects_for_publisher_mut(&mut self, publisher: &PublisherHandle) -> KrillResult<&mut CurrentObjects>291 fn objects_for_publisher_mut(&mut self, publisher: &PublisherHandle) -> KrillResult<&mut CurrentObjects> { 292 self.publishers 293 .get_mut(publisher) 294 .ok_or_else(|| Error::PublisherUnknown(publisher.clone())) 295 } 296 297 /// Gets a list reply containing all objects for this publisher. list_reply(&self, publisher: &Handle) -> KrillResult<ListReply>298 pub fn list_reply(&self, publisher: &Handle) -> KrillResult<ListReply> { 299 self.objects_for_publisher(publisher).map(|o| o.to_list_reply()) 300 } 301 publish( &mut self, name: &PublisherHandle, delta: DeltaElements, jail: &uri::Rsync, config: &RepositoryRetentionConfig, ) -> KrillResult<()>302 pub fn publish( 303 &mut self, 304 name: &PublisherHandle, 305 delta: DeltaElements, 306 jail: &uri::Rsync, 307 config: &RepositoryRetentionConfig, 308 ) -> KrillResult<()> { 309 // update publisher, this will fail if the publisher tries 310 // to update outside of its jail. 311 let objects = self.objects_for_publisher_mut(name)?; 312 objects.apply_delta(delta.clone(), jail)?; 313 let publisher_stats = PublisherStats::new(objects, Time::now()); 314 315 // update the RRDP server 316 self.rrdp.publish(delta, jail, config)?; 317 318 // write repo (note rsync is based on updated rrdp snapshot) 319 self.write_repository(config)?; 320 321 // Update publisher stats 322 self.stats.publish(name, publisher_stats, self.rrdp.notification()); 323 324 Ok(()) 325 } 326 session_reset(&mut self, config: &RepositoryRetentionConfig) -> KrillResult<()>327 pub fn session_reset(&mut self, config: &RepositoryRetentionConfig) -> KrillResult<()> { 328 info!( 329 "Performing RRDP session reset. This ensures a consistent view for RPs in case we restarted from a backup." 330 ); 331 332 self.rrdp.session_reset(); 333 self.stats.session_reset(self.rrdp.notification()); 334 self.write_repository(config)?; 335 336 Ok(()) 337 } 338 write_repository(&self, config: &RepositoryRetentionConfig) -> KrillResult<()>339 pub fn write_repository(&self, config: &RepositoryRetentionConfig) -> KrillResult<()> { 340 self.rrdp.write(config)?; 341 self.rsync.write(self.rrdp.snapshot()) 342 } 343 add_publisher(&mut self, name: PublisherHandle) -> KrillResult<()>344 pub fn add_publisher(&mut self, name: PublisherHandle) -> KrillResult<()> { 345 self.stats.new_publisher(&name); 346 self.publishers.insert(name, CurrentObjects::default()); 347 Ok(()) 348 } 349 350 /// Removes the content for a publisher. This function will return 351 /// ok if there is no content to remove - it is idempotent in that 352 /// sense. However, if there are I/O errors removing the content then 353 /// this function will fail. remove_publisher( &mut self, name: &PublisherHandle, jail: &uri::Rsync, config: &RepositoryRetentionConfig, ) -> KrillResult<()>354 pub fn remove_publisher( 355 &mut self, 356 name: &PublisherHandle, 357 jail: &uri::Rsync, 358 config: &RepositoryRetentionConfig, 359 ) -> KrillResult<()> { 360 if let Ok(objects) = self.objects_for_publisher(name) { 361 let withdraws = objects.elements().iter().map(|e| e.as_withdraw()).collect(); 362 let delta = DeltaElements::new(vec![], vec![], withdraws); 363 364 self.rrdp.publish(delta, jail, config)?; 365 self.stats.remove_publisher(name, self.rrdp.notification()); 366 367 self.write_repository(config) 368 } else { 369 // nothing to remove 370 Ok(()) 371 } 372 } 373 } 374 375 //------------ RsyncdStore --------------------------------------------------- 376 377 /// This type is responsible for publishing files on disk in a structure so 378 /// that an rsyncd can be set up to serve this (RPKI) data. Note that the 379 /// rsync host name and module are part of the path, so make sure that the 380 /// rsyncd modules and paths are setup properly for each supported rsync 381 /// base uri used. 382 #[derive(Clone, Debug, Deserialize, Serialize)] 383 pub struct RsyncdStore { 384 base_uri: uri::Rsync, 385 rsync_dir: PathBuf, 386 #[serde(skip_serializing, skip_deserializing, default = "RsyncdStore::new_lock")] 387 lock: Arc<RwLock<()>>, 388 } 389 390 /// # Construct 391 /// 392 impl RsyncdStore { new_lock() -> Arc<RwLock<()>>393 pub fn new_lock() -> Arc<RwLock<()>> { 394 Arc::new(RwLock::new(())) 395 } 396 new(base_uri: uri::Rsync, repo_dir: &Path) -> Self397 pub fn new(base_uri: uri::Rsync, repo_dir: &Path) -> Self { 398 let mut rsync_dir = repo_dir.to_path_buf(); 399 rsync_dir.push(REPOSITORY_RSYNC_DIR); 400 let lock = Self::new_lock(); 401 RsyncdStore { 402 base_uri, 403 rsync_dir, 404 lock, 405 } 406 } 407 } 408 409 /// # Publishing 410 /// 411 impl RsyncdStore { 412 /// Write all the files to disk for rsync to a tmp-dir, then switch 413 /// things over in an effort to minimize the chance of people getting 414 /// inconsistent syncs.. write(&self, snapshot: &Snapshot) -> KrillResult<()>415 pub fn write(&self, snapshot: &Snapshot) -> KrillResult<()> { 416 let _lock = self 417 .lock 418 .write() 419 .map_err(|_| Error::custom("Could not get write lock for rsync repo"))?; 420 421 let mut new_dir = self.rsync_dir.clone(); 422 new_dir.push(&format!("tmp-{}", snapshot.serial())); 423 fs::create_dir_all(&new_dir).map_err(|e| { 424 KrillIoError::new( 425 format!( 426 "Could not create dir(s) '{}' for publishing rsync", 427 new_dir.to_string_lossy() 428 ), 429 e, 430 ) 431 })?; 432 433 let elements = snapshot.elements(); 434 435 for publish in elements { 436 let rel = publish 437 .uri() 438 .relative_to(&self.base_uri) 439 .ok_or_else(|| Error::publishing_outside_jail(publish.uri(), &self.base_uri))?; 440 441 let mut path = new_dir.clone(); 442 path.push(rel); 443 444 file::save(&publish.base64().to_bytes(), &path)?; 445 } 446 447 let mut current_dir = self.rsync_dir.clone(); 448 current_dir.push("current"); 449 450 let mut old_dir = self.rsync_dir.clone(); 451 old_dir.push("old"); 452 453 if current_dir.exists() { 454 fs::rename(¤t_dir, &old_dir).map_err(|e| { 455 KrillIoError::new( 456 format!( 457 "Could not rename current rsync dir from '{}' to '{}' while publishing", 458 current_dir.to_string_lossy(), 459 old_dir.to_string_lossy() 460 ), 461 e, 462 ) 463 })?; 464 } 465 466 fs::rename(&new_dir, ¤t_dir).map_err(|e| { 467 KrillIoError::new( 468 format!( 469 "Could not rename new rsync dir from '{}' to '{}' while publishing", 470 new_dir.to_string_lossy(), 471 current_dir.to_string_lossy() 472 ), 473 e, 474 ) 475 })?; 476 477 if old_dir.exists() { 478 fs::remove_dir_all(&old_dir).map_err(|e| { 479 KrillIoError::new( 480 format!( 481 "Could not remove up old rsync dir '{}' while publishing", 482 old_dir.to_string_lossy() 483 ), 484 e, 485 ) 486 })?; 487 } 488 489 Ok(()) 490 } 491 clear(&self)492 fn clear(&self) { 493 let _ = fs::remove_dir_all(&self.rsync_dir); 494 } 495 } 496 497 /// The RRDP server used by a Repository instance 498 #[derive(Clone, Debug, Deserialize, Serialize)] 499 pub struct RrdpServer { 500 /// The base URI for notification, snapshot and delta files. 501 rrdp_base_uri: uri::Https, 502 503 /// The base directory where notification, snapshot and deltas will be 504 /// published. 505 rrdp_base_dir: PathBuf, 506 rrdp_archive_dir: PathBuf, 507 508 session: RrdpSession, 509 serial: u64, 510 notification: Notification, 511 512 #[serde(skip_serializing_if = "VecDeque::is_empty", default = "VecDeque::new")] 513 old_notifications: VecDeque<Notification>, 514 515 snapshot: Snapshot, 516 deltas: VecDeque<Delta>, 517 } 518 519 impl RrdpServer { 520 #[allow(clippy::too_many_arguments)] new( rrdp_base_uri: uri::Https, rrdp_base_dir: PathBuf, rrdp_archive_dir: PathBuf, session: RrdpSession, serial: u64, notification: Notification, old_notifications: VecDeque<Notification>, snapshot: Snapshot, deltas: VecDeque<Delta>, ) -> Self521 pub fn new( 522 rrdp_base_uri: uri::Https, 523 rrdp_base_dir: PathBuf, 524 rrdp_archive_dir: PathBuf, 525 session: RrdpSession, 526 serial: u64, 527 notification: Notification, 528 old_notifications: VecDeque<Notification>, 529 snapshot: Snapshot, 530 deltas: VecDeque<Delta>, 531 ) -> Self { 532 RrdpServer { 533 rrdp_base_uri, 534 rrdp_base_dir, 535 rrdp_archive_dir, 536 session, 537 serial, 538 notification, 539 old_notifications, 540 snapshot, 541 deltas, 542 } 543 } 544 create(rrdp_base_uri: uri::Https, repo_dir: &Path, session: RrdpSession) -> Self545 pub fn create(rrdp_base_uri: uri::Https, repo_dir: &Path, session: RrdpSession) -> Self { 546 let mut rrdp_base_dir = repo_dir.to_path_buf(); 547 rrdp_base_dir.push(REPOSITORY_RRDP_DIR); 548 549 let mut rrdp_archive_dir = repo_dir.to_path_buf(); 550 rrdp_archive_dir.push(REPOSITORY_RRDP_ARCHIVE_DIR); 551 552 let snapshot = Snapshot::create(session); 553 554 let serial = RRDP_FIRST_SERIAL; 555 let snapshot_uri = snapshot.uri(&rrdp_base_uri); 556 let snapshot_path = snapshot.path(&rrdp_base_dir); 557 let snapshot_hash = HexEncodedHash::from_content(snapshot.xml().as_slice()); 558 559 let snapshot_ref = SnapshotRef::new(snapshot_uri, snapshot_path, snapshot_hash); 560 561 let notification = Notification::create(session, snapshot_ref); 562 563 RrdpServer { 564 rrdp_base_uri, 565 rrdp_base_dir, 566 rrdp_archive_dir, 567 session, 568 serial, 569 notification, 570 snapshot, 571 old_notifications: VecDeque::new(), 572 deltas: VecDeque::new(), 573 } 574 } 575 clear(&self)576 fn clear(&self) { 577 let _ = fs::remove_dir_all(&self.rrdp_base_dir); 578 let _ = fs::remove_dir_all(&self.rrdp_archive_dir); 579 } 580 snapshot(&self) -> &Snapshot581 fn snapshot(&self) -> &Snapshot { 582 &self.snapshot 583 } 584 notification(&self) -> &Notification585 pub fn notification(&self) -> &Notification { 586 &self.notification 587 } 588 589 /// Performs a session reset of the RRDP server. Useful if the serial needs 590 /// to be rolled, or in case the RRDP server needed to recover to a previous 591 /// state. session_reset(&mut self)592 fn session_reset(&mut self) { 593 let session = RrdpSession::random(); 594 let snapshot = self.snapshot.session_reset(session); 595 596 let snapshot_uri = snapshot.uri(&self.rrdp_base_uri); 597 let snapshot_path = snapshot.path(&self.rrdp_base_dir); 598 let snapshot_hash = HexEncodedHash::from_content(snapshot.xml().as_slice()); 599 600 let snapshot_ref = SnapshotRef::new(snapshot_uri, snapshot_path, snapshot_hash); 601 602 let notification = Notification::create(session, snapshot_ref); 603 604 self.serial = notification.serial(); 605 self.session = notification.session(); 606 self.notification = notification; 607 self.old_notifications.clear(); 608 self.snapshot = snapshot; 609 self.deltas = VecDeque::new(); 610 } 611 612 /// Updates the RRDP server with the elements. Note that this assumes that 613 /// the delta has already been checked against the jail and current 614 /// objects of the publisher. publish( &mut self, elements: DeltaElements, jail: &uri::Rsync, config: &RepositoryRetentionConfig, ) -> KrillResult<()>615 fn publish( 616 &mut self, 617 elements: DeltaElements, 618 jail: &uri::Rsync, 619 config: &RepositoryRetentionConfig, 620 ) -> KrillResult<()> { 621 if elements.is_empty() { 622 Ok(()) 623 } else { 624 // Update the snapshot, this can fail if the delta is illegal. 625 self.snapshot.apply_delta(elements.clone(), jail)?; 626 self.serial += 1; 627 628 self.update_deltas(elements, config); 629 self.update_notification(config); 630 631 Ok(()) 632 } 633 } 634 635 // Push the delta and truncate excessive deltas: 636 // - never keep more than the size of the snapshot 637 // - always keep 'retention_delta_files_min_nr' files 638 // - always keep 'retention_delta_files_min_seconds' files 639 // - beyond this: 640 // - never keep more than 'retention_delta_files_max_nr' 641 // - never keep older than 'retention_delta_files_max_seconds' 642 // - keep the others update_deltas(&mut self, elements: DeltaElements, config: &RepositoryRetentionConfig)643 fn update_deltas(&mut self, elements: DeltaElements, config: &RepositoryRetentionConfig) { 644 self.deltas.push_front(Delta::new(self.session, self.serial, elements)); 645 let mut keep = 0; 646 let mut size = 0; 647 let snapshot_size = self.snapshot.size(); 648 649 let min_nr = config.retention_delta_files_min_nr; 650 let min_secs = config.retention_delta_files_min_seconds; 651 let max_nr = config.retention_delta_files_max_nr; 652 let max_secs = config.retention_delta_files_max_seconds; 653 654 for delta in &self.deltas { 655 size += delta.elements().size(); 656 657 if size > snapshot_size { 658 // never keep more than the size of the snapshot 659 break; 660 } else if keep < min_nr || delta.younger_than_seconds(min_secs) { 661 // always keep 'retention_delta_files_min_nr' files 662 // always keep 'retention_delta_files_min_seconds' file 663 keep += 1 664 } else if keep == max_nr || delta.older_than_seconds(max_secs) { 665 // never keep more than 'retention_delta_files_max_nr' 666 // never keep older than 'retention_delta_files_max_seconds' 667 break; 668 } else { 669 // keep the remainder 670 keep += 1; 671 } 672 } 673 self.deltas.truncate(keep); 674 } 675 676 // Update the notification to include the current snapshot and 677 // deltas. Remove old notifications exceeding the retention time, 678 // so that we can delete old snapshots and deltas which are no longer 679 // relevant. update_notification(&mut self, config: &RepositoryRetentionConfig)680 fn update_notification(&mut self, config: &RepositoryRetentionConfig) { 681 let snapshot_ref = { 682 let snapshot_uri = self.snapshot.uri(&self.rrdp_base_uri); 683 let snapshot_path = self.snapshot.path(&self.rrdp_base_dir); 684 let snapshot_xml = self.snapshot.xml(); 685 let snapshot_hash = HexEncodedHash::from_content(snapshot_xml.as_slice()); 686 SnapshotRef::new(snapshot_uri, snapshot_path, snapshot_hash) 687 }; 688 689 let delta_refs = self 690 .deltas 691 .iter() 692 .map(|delta| { 693 let serial = delta.serial(); 694 let xml = delta.xml(); 695 let hash = HexEncodedHash::from_content(xml.as_slice()); 696 697 let delta_uri = delta.uri(&self.rrdp_base_uri); 698 let delta_path = delta.path(&self.rrdp_base_dir); 699 let file_ref = FileRef::new(delta_uri, delta_path, hash); 700 DeltaRef::new(serial, file_ref) 701 }) 702 .collect(); 703 704 let mut notification = Notification::new(self.session, self.serial, snapshot_ref, delta_refs); 705 706 mem::swap(&mut self.notification, &mut notification); 707 notification.replace(self.notification.time()); 708 self.old_notifications.push_front(notification); 709 710 self.old_notifications 711 .retain(|n| !n.older_than_seconds(config.retention_old_notification_files_seconds)); 712 } 713 714 /// Write the (missing) RRDP files to disk, and remove the ones 715 /// no longer referenced in the notification file. write(&self, config: &RepositoryRetentionConfig) -> Result<(), Error>716 fn write(&self, config: &RepositoryRetentionConfig) -> Result<(), Error> { 717 // write snapshot if it's not there 718 let snapshot_path = self.snapshot.path(&self.rrdp_base_dir); 719 if !snapshot_path.exists() { 720 self.snapshot.write_xml(&snapshot_path)?; 721 } 722 723 // write deltas if they are not there 724 for delta in &self.deltas { 725 let path = delta.path(&self.rrdp_base_dir); 726 if !path.exists() { 727 // assume that if the delta exists, it is correct 728 delta.write_xml(&path)?; 729 } 730 } 731 732 // write notification file 733 let notification_path_new = self.notification_path_new(); 734 let notification_path = self.notification_path(); 735 self.notification.write_xml(¬ification_path_new)?; 736 fs::rename(¬ification_path_new, ¬ification_path).map_err(|e| { 737 KrillIoError::new( 738 format!( 739 "Could not rename notification file from '{}' to '{}'", 740 notification_path_new.to_string_lossy(), 741 notification_path.to_string_lossy() 742 ), 743 e, 744 ) 745 })?; 746 747 // clean up under the base dir: 748 // - old session dirs 749 for entry in fs::read_dir(&self.rrdp_base_dir).map_err(|e| { 750 KrillIoError::new( 751 format!( 752 "Could not read RRDP base directory '{}'", 753 self.rrdp_base_dir.to_string_lossy() 754 ), 755 e, 756 ) 757 })? { 758 let entry = entry.map_err(|e| { 759 KrillIoError::new( 760 format!( 761 "Could not read entry in RRDP base directory '{}'", 762 self.rrdp_base_dir.to_string_lossy() 763 ), 764 e, 765 ) 766 })?; 767 if self.session.to_string() == entry.file_name().to_string_lossy() { 768 continue; 769 } else { 770 let path = entry.path(); 771 if path.is_dir() { 772 let _best_effort_rm = fs::remove_dir_all(path); 773 } 774 } 775 } 776 777 // clean up under the current session 778 let mut session_dir = self.rrdp_base_dir.clone(); 779 session_dir.push(self.session.to_string()); 780 781 for entry in fs::read_dir(&session_dir).map_err(|e| { 782 KrillIoError::new( 783 format!( 784 "Could not read RRDP session directory '{}'", 785 session_dir.to_string_lossy() 786 ), 787 e, 788 ) 789 })? { 790 let entry = entry.map_err(|e| { 791 KrillIoError::new( 792 format!( 793 "Could not read entry in RRDP session directory '{}'", 794 session_dir.to_string_lossy() 795 ), 796 e, 797 ) 798 })?; 799 let path = entry.path(); 800 801 // remove any dir or file that is: 802 // - not a number 803 // - a number that is higher than the current serial 804 // - a number that is lower than the last delta (if set) 805 if let Ok(serial) = u64::from_str(entry.file_name().to_string_lossy().as_ref()) { 806 // Skip the current serial 807 if serial == self.serial { 808 continue; 809 // Clean up old serial dirs once deltas are out of scope 810 } else if !self.notification.includes_delta(serial) 811 && !self.old_notifications.iter().any(|n| n.includes_delta(serial)) 812 { 813 if config.retention_archive { 814 // If archiving is enabled, then move these directories under the archive base 815 816 let mut dest = self.rrdp_archive_dir.clone(); 817 dest.push(self.session.to_string()); 818 dest.push(format!("{}", serial)); 819 820 info!("Archiving RRDP serial '{}' to '{}", serial, dest.to_string_lossy()); 821 let _ = fs::create_dir_all(&dest); 822 let _ = fs::rename(path, dest); 823 } else if path.is_dir() { 824 let _best_effort_rm = fs::remove_dir_all(path); 825 } else { 826 let _best_effort_rm = fs::remove_file(path); 827 } 828 // We still need this old serial dir for the delta, but may not need the snapshot 829 // in it unless archiving is enabled.. in that case leave them and move them when 830 // the complete serial dir goes out of scope above. 831 } else if !config.retention_archive 832 && !self 833 .old_notifications 834 .iter() 835 .any(|old_notification| old_notification.includes_snapshot(serial)) 836 { 837 // see if the there is a snapshot file in this serial dir and if so do a best 838 // effort removal. 839 if let Ok(Some(snapshot_file_to_remove)) = Self::session_dir_snapshot(&session_dir, serial) { 840 // snapshot files are stored under their own unique random dir, e.g: 841 // <session_dir>/<serial>/<random>/snapshot.xml 842 // 843 // So also remove the otherwise empty parent directory. 844 if let Some(snapshot_parent_dir) = snapshot_file_to_remove.parent() { 845 let _ = fs::remove_dir_all(snapshot_parent_dir); 846 } 847 } 848 } else { 849 // we still need this 850 } 851 } else { 852 // clean up dirs or files under the base dir which are not sessions 853 warn!( 854 "Found unexpected file or dir in RRDP repository - will try to remove: {}", 855 path.to_string_lossy() 856 ); 857 if path.is_dir() { 858 let _best_effort_rm = fs::remove_dir_all(path); 859 } else { 860 let _best_effort_rm = fs::remove_file(path); 861 } 862 } 863 } 864 865 Ok(()) 866 } 867 } 868 869 /// rrdp paths and uris 870 /// 871 impl RrdpServer { notification_uri(&self) -> uri::Https872 pub fn notification_uri(&self) -> uri::Https { 873 self.rrdp_base_uri.join(b"notification.xml").unwrap() 874 } 875 notification_path_new(&self) -> PathBuf876 fn notification_path_new(&self) -> PathBuf { 877 let mut path = self.rrdp_base_dir.clone(); 878 path.push("new-notification.xml"); 879 path 880 } 881 notification_path(&self) -> PathBuf882 fn notification_path(&self) -> PathBuf { 883 let mut path = self.rrdp_base_dir.clone(); 884 path.push("notification.xml"); 885 path 886 } 887 session_dir_snapshot(session_path: &Path, serial: u64) -> KrillResult<Option<PathBuf>>888 pub fn session_dir_snapshot(session_path: &Path, serial: u64) -> KrillResult<Option<PathBuf>> { 889 Self::find_in_serial_dir(session_path, serial, "snapshot.xml") 890 } 891 892 /// Expects files (like delta.xml or snapshot.xml) under dir structure like: 893 /// <session_path>/<serial>/<some random>/<filename> find_in_serial_dir(session_path: &Path, serial: u64, filename: &str) -> KrillResult<Option<PathBuf>>894 pub fn find_in_serial_dir(session_path: &Path, serial: u64, filename: &str) -> KrillResult<Option<PathBuf>> { 895 let serial_dir = session_path.join(serial.to_string()); 896 if let Ok(randoms) = fs::read_dir(&serial_dir) { 897 for entry in randoms { 898 let entry = entry.map_err(|e| { 899 Error::io_error_with_context( 900 format!( 901 "Could not open directory entry under RRDP directory {}", 902 serial_dir.to_string_lossy() 903 ), 904 e, 905 ) 906 })?; 907 if let Ok(files) = fs::read_dir(entry.path()) { 908 for file in files { 909 let file = file.map_err(|e| { 910 Error::io_error_with_context( 911 format!( 912 "Could not open directory entry under RRDP directory {}", 913 entry.path().to_string_lossy() 914 ), 915 e, 916 ) 917 })?; 918 if file.file_name().to_string_lossy() == filename { 919 return Ok(Some(file.path())); 920 } 921 } 922 } 923 } 924 } 925 Ok(None) 926 } 927 } 928 929 /// We can only have one (1) RepositoryAccess, but it is an event-sourced 930 /// typed which is stored in an AggregateStore which could theoretically 931 /// serve multiple. So, we use RepositoryAccessProxy as a wrapper around 932 /// this so that callers don't need to worry about storage details. 933 pub struct RepositoryAccessProxy { 934 store: AggregateStore<RepositoryAccess>, 935 key: Handle, 936 } 937 938 impl RepositoryAccessProxy { disk(config: &Config) -> KrillResult<Self>939 pub fn disk(config: &Config) -> KrillResult<Self> { 940 let store = AggregateStore::<RepositoryAccess>::disk(&config.data_dir, PUBSERVER_DIR)?; 941 let key = Handle::from_str(PUBSERVER_DFLT).unwrap(); 942 943 if store.has(&key)? { 944 if config.always_recover_data { 945 store.recover()?; 946 } else if let Err(e) = store.warm() { 947 error!( 948 "Could not warm up cache, storage seems corrupt, will try to recover!! Error was: {}", 949 e 950 ); 951 store.recover()?; 952 } 953 } 954 955 Ok(RepositoryAccessProxy { store, key }) 956 } 957 initialized(&self) -> KrillResult<bool>958 pub fn initialized(&self) -> KrillResult<bool> { 959 self.store.has(&self.key).map_err(Error::AggregateStoreError) 960 } 961 init(&self, uris: PublicationServerUris, signer: &KrillSigner) -> KrillResult<()>962 pub fn init(&self, uris: PublicationServerUris, signer: &KrillSigner) -> KrillResult<()> { 963 if self.initialized()? { 964 Err(Error::RepositoryServerAlreadyInitialized) 965 } else { 966 let (rrdp_base_uri, rsync_jail) = uris.unpack(); 967 968 let ini = RepositoryAccessInitDetails::init(&self.key, rsync_jail, rrdp_base_uri, signer)?; 969 970 self.store.add(ini)?; 971 972 Ok(()) 973 } 974 } 975 clear(&self) -> KrillResult<()>976 pub fn clear(&self) -> KrillResult<()> { 977 if !self.initialized()? { 978 Err(Error::RepositoryServerNotInitialized) 979 } else if !self.publishers()?.is_empty() { 980 Err(Error::RepositoryServerHasPublishers) 981 } else { 982 self.store.drop_aggregate(&self.key)?; 983 Ok(()) 984 } 985 } 986 read(&self) -> KrillResult<Arc<RepositoryAccess>>987 fn read(&self) -> KrillResult<Arc<RepositoryAccess>> { 988 if !self.initialized()? { 989 Err(Error::RepositoryServerNotInitialized) 990 } else { 991 self.store 992 .get_latest(&self.key) 993 .map_err(|e| Error::custom(format!("Publication Server data issue: {}", e))) 994 } 995 } 996 publishers(&self) -> KrillResult<Vec<PublisherHandle>>997 pub fn publishers(&self) -> KrillResult<Vec<PublisherHandle>> { 998 Ok(self.read()?.publishers()) 999 } 1000 get_publisher(&self, name: &PublisherHandle) -> KrillResult<Publisher>1001 pub fn get_publisher(&self, name: &PublisherHandle) -> KrillResult<Publisher> { 1002 self.read()?.get_publisher(name).map(|p| p.clone()) 1003 } 1004 add_publisher(&self, req: rfc8183::PublisherRequest, actor: &Actor) -> KrillResult<()>1005 pub fn add_publisher(&self, req: rfc8183::PublisherRequest, actor: &Actor) -> KrillResult<()> { 1006 let base_uri = self.read()?.base_uri_for(req.publisher_handle())?; // will verify that server was initialized 1007 let cmd = RepoAccessCmdDet::add_publisher(&self.key, req, base_uri, actor); 1008 self.store.command(cmd)?; 1009 Ok(()) 1010 } 1011 remove_publisher(&self, name: PublisherHandle, actor: &Actor) -> KrillResult<()>1012 pub fn remove_publisher(&self, name: PublisherHandle, actor: &Actor) -> KrillResult<()> { 1013 if !self.initialized()? { 1014 Err(Error::RepositoryServerNotInitialized) 1015 } else { 1016 let cmd = RepoAccessCmdDet::remove_publisher(&self.key, name, actor); 1017 self.store.command(cmd)?; 1018 Ok(()) 1019 } 1020 } 1021 1022 /// Returns the repository URI information for a publisher. repo_info_for(&self, name: &PublisherHandle) -> KrillResult<RepoInfo>1023 pub fn repo_info_for(&self, name: &PublisherHandle) -> KrillResult<RepoInfo> { 1024 self.read()?.repo_info_for(name) 1025 } 1026 1027 /// Returns the RFC8183 Repository Response for the publisher repository_response( &self, rfc8181_uri: uri::Https, publisher: &PublisherHandle, ) -> KrillResult<rfc8183::RepositoryResponse>1028 pub fn repository_response( 1029 &self, 1030 rfc8181_uri: uri::Https, 1031 publisher: &PublisherHandle, 1032 ) -> KrillResult<rfc8183::RepositoryResponse> { 1033 self.read()?.repository_response(rfc8181_uri, publisher) 1034 } 1035 1036 /// Parse submitted bytes by a Publisher as an RFC8181 ProtocolCms object, and validates it. validate(&self, publisher: &PublisherHandle, msg: Bytes) -> KrillResult<ProtocolCms>1037 pub fn validate(&self, publisher: &PublisherHandle, msg: Bytes) -> KrillResult<ProtocolCms> { 1038 let publisher = self.get_publisher(publisher)?; 1039 let msg = ProtocolCms::decode(msg, false).map_err(|e| Error::Rfc8181Decode(e.to_string()))?; 1040 msg.validate(publisher.id_cert()).map_err(Error::Rfc8181Validation)?; 1041 Ok(msg) 1042 } 1043 1044 /// Creates and signs an RFC8181 CMS response. respond(&self, message: Bytes, signer: &KrillSigner) -> KrillResult<Bytes>1045 pub fn respond(&self, message: Bytes, signer: &KrillSigner) -> KrillResult<Bytes> { 1046 let key_id = self.read()?.key_id(); 1047 let response_builder = ProtocolCmsBuilder::create(&key_id, signer, message).map_err(Error::signer)?; 1048 Ok(response_builder.as_bytes()) 1049 } 1050 } 1051 1052 //------------ RepositoryAccess -------------------------------------------- 1053 1054 /// An RFC8183 Repository server, capable of handling Publishers (both embedded, and 1055 /// remote RFC8183), and publishing to RRDP and disk, and signing responses. 1056 #[derive(Clone, Debug, Deserialize, Serialize)] 1057 pub struct RepositoryAccess { 1058 // Event sourcing support 1059 handle: Handle, 1060 version: u64, 1061 1062 id_cert: IdCert, 1063 publishers: HashMap<PublisherHandle, Publisher>, 1064 1065 rsync_base: uri::Rsync, 1066 rrdp_base: uri::Https, 1067 } 1068 1069 impl RepositoryAccess { key_id(&self) -> KeyIdentifier1070 pub fn key_id(&self) -> KeyIdentifier { 1071 self.id_cert.subject_public_key_info().key_identifier() 1072 } 1073 } 1074 1075 /// # Event Sourcing support 1076 /// 1077 impl Aggregate for RepositoryAccess { 1078 type Command = RepoAccessCmd; 1079 type StorableCommandDetails = StorableRepositoryCommand; 1080 type Event = RepositoryAccessEvent; 1081 type InitEvent = RepositoryAccessIni; 1082 type Error = Error; 1083 init(event: Self::InitEvent) -> Result<Self, Self::Error>1084 fn init(event: Self::InitEvent) -> Result<Self, Self::Error> { 1085 let (handle, _version, details) = event.unpack(); 1086 let (id_cert, rrdp_base, rsync_base) = details.unpack(); 1087 1088 Ok(RepositoryAccess { 1089 handle, 1090 version: 1, 1091 id_cert, 1092 publishers: HashMap::new(), 1093 rsync_base, 1094 rrdp_base, 1095 }) 1096 } 1097 version(&self) -> u641098 fn version(&self) -> u64 { 1099 self.version 1100 } 1101 apply(&mut self, event: Self::Event)1102 fn apply(&mut self, event: Self::Event) { 1103 self.version += 1; 1104 match event.into_details() { 1105 RepositoryAccessEventDetails::PublisherAdded { name, publisher } => { 1106 self.publishers.insert(name, publisher); 1107 } 1108 RepositoryAccessEventDetails::PublisherRemoved { name } => { 1109 self.publishers.remove(&name); 1110 } 1111 } 1112 } 1113 process_command(&self, command: Self::Command) -> Result<Vec<Self::Event>, Self::Error>1114 fn process_command(&self, command: Self::Command) -> Result<Vec<Self::Event>, Self::Error> { 1115 info!( 1116 "Sending command to publisher '{}', version: {}: {}", 1117 self.handle, self.version, command 1118 ); 1119 1120 match command.into_details() { 1121 RepoAccessCmdDet::AddPublisher { request, base_uri } => self.add_publisher(request, base_uri), 1122 RepoAccessCmdDet::RemovePublisher { name } => self.remove_publisher(name), 1123 } 1124 } 1125 } 1126 1127 /// # Manage publishers 1128 /// 1129 impl RepositoryAccess { 1130 /// Adds a publisher with access to the repository add_publisher( &self, publisher_request: rfc8183::PublisherRequest, base_uri: uri::Rsync, ) -> Result<Vec<RepositoryAccessEvent>, Error>1131 fn add_publisher( 1132 &self, 1133 publisher_request: rfc8183::PublisherRequest, 1134 base_uri: uri::Rsync, 1135 ) -> Result<Vec<RepositoryAccessEvent>, Error> { 1136 let (_tag, name, id_cert) = publisher_request.unpack(); 1137 1138 if self.publishers.contains_key(&name) { 1139 Err(Error::PublisherDuplicate(name)) 1140 } else { 1141 let publisher = Publisher::new(id_cert, base_uri); 1142 1143 Ok(vec![RepositoryAccessEventDetails::publisher_added( 1144 &self.handle, 1145 self.version, 1146 name, 1147 publisher, 1148 )]) 1149 } 1150 } 1151 1152 /// Removes a publisher and all its content remove_publisher(&self, publisher_handle: PublisherHandle) -> Result<Vec<RepositoryAccessEvent>, Error>1153 fn remove_publisher(&self, publisher_handle: PublisherHandle) -> Result<Vec<RepositoryAccessEvent>, Error> { 1154 if !self.has_publisher(&publisher_handle) { 1155 Err(Error::PublisherUnknown(publisher_handle)) 1156 } else { 1157 Ok(vec![RepositoryAccessEventDetails::publisher_removed( 1158 &self.handle, 1159 self.version, 1160 publisher_handle, 1161 )]) 1162 } 1163 } 1164 notification_uri(&self) -> uri::Https1165 fn notification_uri(&self) -> uri::Https { 1166 self.rrdp_base.join(b"notification.xml").unwrap() 1167 } 1168 base_uri_for(&self, name: &PublisherHandle) -> KrillResult<uri::Rsync>1169 fn base_uri_for(&self, name: &PublisherHandle) -> KrillResult<uri::Rsync> { 1170 uri::Rsync::from_str(&format!("{}{}/", self.rsync_base, name)) 1171 .map_err(|_| Error::Custom(format!("Cannot derive base uri for {}", name))) 1172 } 1173 1174 /// Returns the repository URI information for a publisher. repo_info_for(&self, name: &PublisherHandle) -> KrillResult<RepoInfo>1175 pub fn repo_info_for(&self, name: &PublisherHandle) -> KrillResult<RepoInfo> { 1176 let rsync_base = self.base_uri_for(name)?; 1177 Ok(RepoInfo::new(rsync_base, self.notification_uri())) 1178 } 1179 repository_response( &self, rfc8181_uri: uri::Https, publisher_handle: &PublisherHandle, ) -> Result<rfc8183::RepositoryResponse, Error>1180 pub fn repository_response( 1181 &self, 1182 rfc8181_uri: uri::Https, 1183 publisher_handle: &PublisherHandle, 1184 ) -> Result<rfc8183::RepositoryResponse, Error> { 1185 let publisher = self.get_publisher(publisher_handle)?; 1186 let rsync_base = publisher.base_uri(); 1187 let service_uri = rfc8183::ServiceUri::Https(rfc8181_uri); 1188 1189 let repo_info = RepoInfo::new(rsync_base.clone(), self.notification_uri()); 1190 1191 Ok(rfc8183::RepositoryResponse::new( 1192 None, 1193 publisher_handle.clone(), 1194 self.id_cert.clone(), 1195 service_uri, 1196 repo_info, 1197 )) 1198 } 1199 get_publisher(&self, publisher_handle: &PublisherHandle) -> Result<&Publisher, Error>1200 pub fn get_publisher(&self, publisher_handle: &PublisherHandle) -> Result<&Publisher, Error> { 1201 self.publishers 1202 .get(publisher_handle) 1203 .ok_or_else(|| Error::PublisherUnknown(publisher_handle.clone())) 1204 } 1205 has_publisher(&self, name: &PublisherHandle) -> bool1206 pub fn has_publisher(&self, name: &PublisherHandle) -> bool { 1207 self.publishers.contains_key(name) 1208 } 1209 publishers(&self) -> Vec<PublisherHandle>1210 pub fn publishers(&self) -> Vec<PublisherHandle> { 1211 self.publishers.keys().cloned().collect() 1212 } 1213 } 1214 1215 //------------ RepoStats ----------------------------------------------------- 1216 1217 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] 1218 pub struct RepoStats { 1219 publishers: HashMap<PublisherHandle, PublisherStats>, 1220 session: RrdpSession, 1221 serial: u64, 1222 last_update: Option<Time>, 1223 } 1224 1225 impl Default for RepoStats { default() -> Self1226 fn default() -> Self { 1227 RepoStats { 1228 publishers: HashMap::new(), 1229 session: RrdpSession::default(), 1230 serial: 0, 1231 last_update: None, 1232 } 1233 } 1234 } 1235 1236 impl RepoStats { new(session: RrdpSession) -> Self1237 pub fn new(session: RrdpSession) -> Self { 1238 RepoStats { 1239 publishers: HashMap::new(), 1240 session, 1241 serial: 0, 1242 last_update: None, 1243 } 1244 } 1245 publish( &mut self, publisher: &PublisherHandle, publisher_stats: PublisherStats, notification: &Notification, )1246 pub fn publish( 1247 &mut self, 1248 publisher: &PublisherHandle, 1249 publisher_stats: PublisherStats, 1250 notification: &Notification, 1251 ) { 1252 self.publishers.insert(publisher.clone(), publisher_stats); 1253 self.serial = notification.serial(); 1254 self.last_update = Some(notification.time()); 1255 } 1256 session_reset(&mut self, notification: &Notification)1257 pub fn session_reset(&mut self, notification: &Notification) { 1258 self.session = notification.session(); 1259 self.serial = notification.serial(); 1260 self.last_update = Some(notification.time()) 1261 } 1262 new_publisher(&mut self, publisher: &PublisherHandle)1263 pub fn new_publisher(&mut self, publisher: &PublisherHandle) { 1264 self.publishers.insert(publisher.clone(), PublisherStats::default()); 1265 } 1266 remove_publisher(&mut self, publisher: &PublisherHandle, notification: &Notification)1267 pub fn remove_publisher(&mut self, publisher: &PublisherHandle, notification: &Notification) { 1268 self.publishers.remove(publisher); 1269 self.serial = notification.serial(); 1270 self.last_update = Some(notification.time()) 1271 } 1272 get_publishers(&self) -> &HashMap<PublisherHandle, PublisherStats>1273 pub fn get_publishers(&self) -> &HashMap<PublisherHandle, PublisherStats> { 1274 &self.publishers 1275 } 1276 stale_publishers(&self, seconds: i64) -> Vec<PublisherHandle>1277 pub fn stale_publishers(&self, seconds: i64) -> Vec<PublisherHandle> { 1278 let mut res = vec![]; 1279 for (publisher, stats) in self.publishers.iter() { 1280 if let Some(update_time) = stats.last_update { 1281 if Time::now().timestamp() - update_time.timestamp() >= seconds { 1282 res.push(publisher.clone()) 1283 } 1284 } else { 1285 res.push(publisher.clone()) 1286 } 1287 } 1288 res 1289 } 1290 last_update(&self) -> Option<Time>1291 pub fn last_update(&self) -> Option<Time> { 1292 self.last_update 1293 } 1294 serial(&self) -> u641295 pub fn serial(&self) -> u64 { 1296 self.serial 1297 } 1298 session(&self) -> RrdpSession1299 pub fn session(&self) -> RrdpSession { 1300 self.session 1301 } 1302 } 1303 1304 impl fmt::Display for RepoStats { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result1305 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 1306 if let Some(update) = self.last_update() { 1307 writeln!(f, "RRDP updated: {}", update.to_rfc3339())?; 1308 } 1309 writeln!(f, "RRDP session: {}", self.session())?; 1310 writeln!(f, "RRDP serial: {}", self.serial())?; 1311 writeln!(f)?; 1312 writeln!(f, "Publisher, Objects, Size, Last Updated")?; 1313 for (publisher, stats) in self.get_publishers() { 1314 let update_str = match stats.last_update() { 1315 None => "never".to_string(), 1316 Some(update) => update.to_rfc3339(), 1317 }; 1318 writeln!( 1319 f, 1320 "{}, {}, {}, {}", 1321 publisher, 1322 stats.objects(), 1323 stats.size(), 1324 update_str 1325 )?; 1326 } 1327 1328 Ok(()) 1329 } 1330 } 1331 1332 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] 1333 pub struct PublisherStats { 1334 objects: usize, 1335 size: usize, 1336 last_update: Option<Time>, 1337 } 1338 1339 impl PublisherStats { new(current_objects: &CurrentObjects, last_update: Time) -> Self1340 pub fn new(current_objects: &CurrentObjects, last_update: Time) -> Self { 1341 let objects = current_objects.len(); 1342 let size = current_objects.size(); 1343 PublisherStats { 1344 objects, 1345 size, 1346 last_update: Some(last_update), 1347 } 1348 } 1349 objects(&self) -> usize1350 pub fn objects(&self) -> usize { 1351 self.objects 1352 } 1353 size(&self) -> usize1354 pub fn size(&self) -> usize { 1355 self.size 1356 } 1357 last_update(&self) -> Option<Time>1358 pub fn last_update(&self) -> Option<Time> { 1359 self.last_update 1360 } 1361 } 1362 1363 impl From<&CurrentObjects> for PublisherStats { from(objects: &CurrentObjects) -> Self1364 fn from(objects: &CurrentObjects) -> Self { 1365 PublisherStats { 1366 objects: objects.len(), 1367 size: objects.size(), 1368 last_update: None, 1369 } 1370 } 1371 } 1372 1373 impl Default for PublisherStats { default() -> Self1374 fn default() -> Self { 1375 PublisherStats { 1376 objects: 0, 1377 size: 0, 1378 last_update: None, 1379 } 1380 } 1381 } 1382