1 //! Data objects used in the (RRDP) repository. I.e. the publish, update, and 2 //! withdraw elements, as well as the notification, snapshot and delta file 3 //! definitions. 4 use std::{ 5 fmt, 6 path::PathBuf, 7 {collections::HashMap, path::Path}, 8 }; 9 10 use bytes::Bytes; 11 use chrono::Duration; 12 use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; 13 use uuid::Uuid; 14 15 use rpki::{repository::x509::Time, uri}; 16 17 use crate::{ 18 commons::{ 19 api::{publication, Base64, HexEncodedHash}, 20 error::KrillIoError, 21 util::{file, xml::XmlWriter}, 22 }, 23 constants::RRDP_FIRST_SERIAL, 24 }; 25 26 const VERSION: &str = "1"; 27 const NS: &str = "http://www.ripe.net/rpki/rrdp"; 28 29 //------------ RrdpSession --------------------------------------------------- 30 #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] 31 pub struct RrdpSession(Uuid); 32 33 impl Default for RrdpSession { default() -> Self34 fn default() -> Self { 35 RrdpSession(Uuid::new_v4()) 36 } 37 } 38 39 impl RrdpSession { random() -> Self40 pub fn random() -> Self { 41 Self::default() 42 } 43 } 44 45 impl AsRef<Uuid> for RrdpSession { as_ref(&self) -> &Uuid46 fn as_ref(&self) -> &Uuid { 47 &self.0 48 } 49 } 50 51 impl Serialize for RrdpSession { serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: Serializer,52 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> 53 where 54 S: Serializer, 55 { 56 self.to_string().serialize(serializer) 57 } 58 } 59 60 impl<'de> Deserialize<'de> for RrdpSession { deserialize<D>(deserializer: D) -> Result<Self, D::Error> where D: Deserializer<'de>,61 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> 62 where 63 D: Deserializer<'de>, 64 { 65 let string = String::deserialize(deserializer)?; 66 let uuid = Uuid::parse_str(&string).map_err(de::Error::custom)?; 67 68 Ok(RrdpSession(uuid)) 69 } 70 } 71 72 impl fmt::Display for RrdpSession { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result73 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 74 write!(f, "{}", self.0.to_hyphenated()) 75 } 76 } 77 78 //------------ PublishElement ------------------------------------------------ 79 80 /// The publishes as used in the RRDP protocol. 81 /// 82 /// Note that the difference with the publication protocol is the absence of 83 /// the tag. 84 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] 85 pub struct PublishElement { 86 base64: Base64, 87 uri: uri::Rsync, 88 } 89 90 impl PublishElement { new(base64: Base64, uri: uri::Rsync) -> Self91 pub fn new(base64: Base64, uri: uri::Rsync) -> Self { 92 PublishElement { base64, uri } 93 } 94 base64(&self) -> &Base6495 pub fn base64(&self) -> &Base64 { 96 &self.base64 97 } uri(&self) -> &uri::Rsync98 pub fn uri(&self) -> &uri::Rsync { 99 &self.uri 100 } size(&self) -> usize101 pub fn size(&self) -> usize { 102 self.base64.size() 103 } 104 as_withdraw(&self) -> WithdrawElement105 pub fn as_withdraw(&self) -> WithdrawElement { 106 WithdrawElement { 107 uri: self.uri.clone(), 108 hash: self.base64.to_encoded_hash(), 109 } 110 } 111 unpack(self) -> (uri::Rsync, Base64)112 pub fn unpack(self) -> (uri::Rsync, Base64) { 113 (self.uri, self.base64) 114 } 115 } 116 117 impl From<publication::Publish> for PublishElement { from(p: publication::Publish) -> Self118 fn from(p: publication::Publish) -> Self { 119 let (_tag, uri, base64) = p.unpack(); 120 PublishElement { base64, uri } 121 } 122 } 123 124 //------------ UpdateElement ------------------------------------------------- 125 126 /// The updates as used in the RRDP protocol. 127 /// 128 /// Note that the difference with the publication protocol is the absence of 129 /// the tag. 130 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] 131 pub struct UpdateElement { 132 uri: uri::Rsync, 133 hash: HexEncodedHash, 134 base64: Base64, 135 } 136 137 impl UpdateElement { uri(&self) -> &uri::Rsync138 pub fn uri(&self) -> &uri::Rsync { 139 &self.uri 140 } hash(&self) -> &HexEncodedHash141 pub fn hash(&self) -> &HexEncodedHash { 142 &self.hash 143 } base64(&self) -> &Base64144 pub fn base64(&self) -> &Base64 { 145 &self.base64 146 } size(&self) -> usize147 pub fn size(&self) -> usize { 148 self.base64.size() 149 } 150 } 151 152 impl From<publication::Update> for UpdateElement { from(u: publication::Update) -> Self153 fn from(u: publication::Update) -> Self { 154 let (_tag, uri, base64, hash) = u.unwrap(); 155 UpdateElement { uri, hash, base64 } 156 } 157 } 158 159 impl From<UpdateElement> for PublishElement { from(el: UpdateElement) -> Self160 fn from(el: UpdateElement) -> Self { 161 PublishElement { 162 uri: el.uri, 163 base64: el.base64, 164 } 165 } 166 } 167 168 //------------ WithdrawElement ----------------------------------------------- 169 170 /// The withdraws as used in the RRDP protocol. 171 /// 172 /// Note that the difference with the publication protocol is the absence of 173 /// the tag. 174 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] 175 pub struct WithdrawElement { 176 uri: uri::Rsync, 177 hash: HexEncodedHash, 178 } 179 180 impl WithdrawElement { uri(&self) -> &uri::Rsync181 pub fn uri(&self) -> &uri::Rsync { 182 &self.uri 183 } hash(&self) -> &HexEncodedHash184 pub fn hash(&self) -> &HexEncodedHash { 185 &self.hash 186 } 187 } 188 189 impl From<publication::Withdraw> for WithdrawElement { from(w: publication::Withdraw) -> Self190 fn from(w: publication::Withdraw) -> Self { 191 let (_tag, uri, hash) = w.unwrap(); 192 WithdrawElement { uri, hash } 193 } 194 } 195 196 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] 197 pub struct Notification { 198 session: RrdpSession, 199 serial: u64, 200 time: Time, 201 #[serde(skip_serializing_if = "Option::is_none")] 202 replaced: Option<Time>, 203 snapshot: SnapshotRef, 204 deltas: Vec<DeltaRef>, 205 last_delta: Option<u64>, 206 } 207 208 impl Notification { new(session: RrdpSession, serial: u64, snapshot: SnapshotRef, deltas: Vec<DeltaRef>) -> Self209 pub fn new(session: RrdpSession, serial: u64, snapshot: SnapshotRef, deltas: Vec<DeltaRef>) -> Self { 210 let last_delta = Self::find_last_delta(&deltas); 211 Notification { 212 session, 213 serial, 214 time: Time::now(), 215 replaced: None, 216 snapshot, 217 deltas, 218 last_delta, 219 } 220 } 221 time(&self) -> Time222 pub fn time(&self) -> Time { 223 self.time 224 } 225 226 #[deprecated] // use 'older_than_seconds' replaced_after(&self, timestamp: i64) -> bool227 pub fn replaced_after(&self, timestamp: i64) -> bool { 228 if let Some(replaced) = self.replaced { 229 replaced.timestamp() > timestamp 230 } else { 231 false 232 } 233 } 234 older_than_seconds(&self, seconds: i64) -> bool235 pub fn older_than_seconds(&self, seconds: i64) -> bool { 236 match self.replaced { 237 Some(time) => { 238 let then = Time::now() - Duration::seconds(seconds); 239 time < then 240 } 241 None => false, 242 } 243 } 244 replace(&mut self, time: Time)245 pub fn replace(&mut self, time: Time) { 246 self.replaced = Some(time); 247 } 248 serial(&self) -> u64249 pub fn serial(&self) -> u64 { 250 self.serial 251 } 252 session(&self) -> RrdpSession253 pub fn session(&self) -> RrdpSession { 254 self.session 255 } 256 last_delta(&self) -> Option<u64>257 pub fn last_delta(&self) -> Option<u64> { 258 self.last_delta 259 } 260 includes_delta(&self, delta: u64) -> bool261 pub fn includes_delta(&self, delta: u64) -> bool { 262 if let Some(last) = self.last_delta { 263 last <= delta 264 } else { 265 false 266 } 267 } 268 includes_snapshot(&self, version: u64) -> bool269 pub fn includes_snapshot(&self, version: u64) -> bool { 270 self.serial == version 271 } 272 find_last_delta(deltas: &[DeltaRef]) -> Option<u64>273 fn find_last_delta(deltas: &[DeltaRef]) -> Option<u64> { 274 if deltas.is_empty() { 275 None 276 } else { 277 let mut serial = deltas[0].serial; 278 for d in deltas { 279 if d.serial < serial { 280 serial = d.serial 281 } 282 } 283 284 Some(serial) 285 } 286 } 287 } 288 289 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] 290 pub struct NotificationUpdate { 291 time: Time, 292 session: Option<RrdpSession>, 293 snapshot: SnapshotRef, 294 delta: DeltaRef, 295 last_delta: u64, 296 } 297 298 impl NotificationUpdate { new( time: Time, session: Option<RrdpSession>, snapshot: SnapshotRef, delta: DeltaRef, last_delta: u64, ) -> Self299 pub fn new( 300 time: Time, 301 session: Option<RrdpSession>, 302 snapshot: SnapshotRef, 303 delta: DeltaRef, 304 last_delta: u64, 305 ) -> Self { 306 NotificationUpdate { 307 time, 308 session, 309 snapshot, 310 delta, 311 last_delta, 312 } 313 } 314 } 315 316 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] 317 pub struct NotificationCreate { 318 session: RrdpSession, 319 snapshot: SnapshotRef, 320 } 321 322 impl NotificationUpdate { unwrap(self) -> (Time, Option<RrdpSession>, SnapshotRef, DeltaRef, u64)323 pub fn unwrap(self) -> (Time, Option<RrdpSession>, SnapshotRef, DeltaRef, u64) { 324 (self.time, self.session, self.snapshot, self.delta, self.last_delta) 325 } 326 } 327 328 impl Notification { create(session: RrdpSession, snapshot: SnapshotRef) -> Self329 pub fn create(session: RrdpSession, snapshot: SnapshotRef) -> Self { 330 Notification::new(session, RRDP_FIRST_SERIAL, snapshot, vec![]) 331 } 332 write_xml(&self, path: &Path) -> Result<(), KrillIoError>333 pub fn write_xml(&self, path: &Path) -> Result<(), KrillIoError> { 334 trace!("Writing notification file: {}", path.to_string_lossy()); 335 let mut file = file::create_file_with_path(path)?; 336 337 XmlWriter::encode_to_file(&mut file, |w| { 338 let a = [ 339 ("xmlns", NS), 340 ("version", VERSION), 341 ("session_id", &format!("{}", self.session)), 342 ("serial", &format!("{}", self.serial)), 343 ]; 344 345 w.put_element("notification", Some(&a), |w| { 346 { 347 // snapshot ref 348 let uri = self.snapshot.uri.to_string(); 349 let a = [("uri", uri.as_str()), ("hash", self.snapshot.hash.as_ref())]; 350 w.put_element("snapshot", Some(&a), |w| w.empty())?; 351 } 352 353 { 354 // delta refs 355 for delta in &self.deltas { 356 let serial = format!("{}", delta.serial); 357 let uri = delta.file_ref.uri.to_string(); 358 let a = [ 359 ("serial", serial.as_ref()), 360 ("uri", uri.as_str()), 361 ("hash", delta.file_ref.hash.as_ref()), 362 ]; 363 w.put_element("delta", Some(&a), |w| w.empty())?; 364 } 365 } 366 367 Ok(()) 368 }) 369 }) 370 .map_err(|e| KrillIoError::new(format!("Could not write XML to: {}", path.to_string_lossy()), e)) 371 } 372 } 373 374 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] 375 pub struct FileRef { 376 uri: uri::Https, 377 path: PathBuf, 378 hash: HexEncodedHash, 379 } 380 381 impl FileRef { new(uri: uri::Https, path: PathBuf, hash: HexEncodedHash) -> Self382 pub fn new(uri: uri::Https, path: PathBuf, hash: HexEncodedHash) -> Self { 383 FileRef { uri, path, hash } 384 } uri(&self) -> &uri::Https385 pub fn uri(&self) -> &uri::Https { 386 &self.uri 387 } path(&self) -> &PathBuf388 pub fn path(&self) -> &PathBuf { 389 &self.path 390 } hash(&self) -> &HexEncodedHash391 pub fn hash(&self) -> &HexEncodedHash { 392 &self.hash 393 } 394 } 395 396 pub type SnapshotRef = FileRef; 397 398 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] 399 pub struct DeltaRef { 400 serial: u64, 401 file_ref: FileRef, 402 } 403 404 impl DeltaRef { new(serial: u64, file_ref: FileRef) -> Self405 pub fn new(serial: u64, file_ref: FileRef) -> Self { 406 DeltaRef { serial, file_ref } 407 } 408 serial(&self) -> u64409 pub fn serial(&self) -> u64 { 410 self.serial 411 } 412 } 413 414 impl AsRef<FileRef> for DeltaRef { as_ref(&self) -> &FileRef415 fn as_ref(&self) -> &FileRef { 416 &self.file_ref 417 } 418 } 419 420 //------------ CurrentObjects ------------------------------------------------ 421 422 /// Defines a current set of published elements. 423 /// 424 // Note this is mapped internally for speedy access, by hash, rather than uri 425 // for two reasons: 426 // a) URIs in RPKI may change in future 427 // b) The publish element as it appears in an RFC8182 snapshot.xml includes 428 // the uri and the base64, but not the hash. So keeping the actual elements 429 // around means we can be more efficient in producing that output. 430 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] 431 pub struct CurrentObjects(HashMap<HexEncodedHash, PublishElement>); 432 433 impl Default for CurrentObjects { default() -> Self434 fn default() -> Self { 435 CurrentObjects(HashMap::new()) 436 } 437 } 438 439 impl CurrentObjects { new(map: HashMap<HexEncodedHash, PublishElement>) -> Self440 pub fn new(map: HashMap<HexEncodedHash, PublishElement>) -> Self { 441 CurrentObjects(map) 442 } 443 elements(&self) -> Vec<&PublishElement>444 pub fn elements(&self) -> Vec<&PublishElement> { 445 let mut res = vec![]; 446 for el in self.0.values() { 447 res.push(el) 448 } 449 res 450 } 451 into_elements(self) -> Vec<PublishElement>452 pub fn into_elements(self) -> Vec<PublishElement> { 453 self.0.into_iter().map(|(_, e)| e).collect() 454 } 455 has_match(&self, hash: &HexEncodedHash, uri: &uri::Rsync) -> bool456 fn has_match(&self, hash: &HexEncodedHash, uri: &uri::Rsync) -> bool { 457 match self.0.get(hash) { 458 Some(el) => el.uri() == uri, 459 None => false, 460 } 461 } 462 verify_delta(&self, delta: &DeltaElements, jail: &uri::Rsync) -> Result<(), PublicationDeltaError>463 fn verify_delta(&self, delta: &DeltaElements, jail: &uri::Rsync) -> Result<(), PublicationDeltaError> { 464 for p in delta.publishes() { 465 if !jail.is_parent_of(p.uri()) { 466 return Err(PublicationDeltaError::outside(jail, p.uri())); 467 } 468 let hash = p.base64().to_encoded_hash(); 469 if self.0.contains_key(&hash) { 470 return Err(PublicationDeltaError::present(p.uri())); 471 } 472 } 473 474 for u in delta.updates() { 475 if !jail.is_parent_of(u.uri()) { 476 return Err(PublicationDeltaError::outside(jail, u.uri())); 477 } 478 if !self.has_match(u.hash(), u.uri()) { 479 return Err(PublicationDeltaError::no_match(u.uri())); 480 } 481 } 482 483 for w in delta.withdraws() { 484 if !jail.is_parent_of(w.uri()) { 485 return Err(PublicationDeltaError::outside(jail, w.uri())); 486 } 487 if !self.has_match(w.hash(), w.uri()) { 488 return Err(PublicationDeltaError::no_match(w.uri())); 489 } 490 } 491 492 Ok(()) 493 } 494 495 /// Applies a delta to CurrentObjects. This will verify that the 496 /// delta is legal with regards to existing objects, and the jail 497 /// specified for the publisher. apply_delta(&mut self, delta: DeltaElements, jail: &uri::Rsync) -> Result<(), PublicationDeltaError>498 pub fn apply_delta(&mut self, delta: DeltaElements, jail: &uri::Rsync) -> Result<(), PublicationDeltaError> { 499 self.verify_delta(&delta, jail)?; 500 501 let (publishes, updates, withdraws) = delta.unpack(); 502 503 for p in publishes { 504 let hash = p.base64().to_encoded_hash(); 505 self.0.insert(hash, p); 506 } 507 508 for u in updates { 509 self.0.remove(u.hash()); 510 let p: PublishElement = u.into(); 511 let hash = p.base64().to_encoded_hash(); 512 self.0.insert(hash, p); 513 } 514 515 for w in withdraws { 516 self.0.remove(w.hash()); 517 } 518 519 Ok(()) 520 } 521 len(&self) -> usize522 pub fn len(&self) -> usize { 523 self.0.len() 524 } 525 size(&self) -> usize526 pub fn size(&self) -> usize { 527 self.0.values().fold(0, |tot, el| tot + el.size()) 528 } 529 is_empty(&self) -> bool530 pub fn is_empty(&self) -> bool { 531 self.0.is_empty() 532 } 533 to_list_reply(&self) -> publication::ListReply534 pub fn to_list_reply(&self) -> publication::ListReply { 535 let elements = self 536 .0 537 .iter() 538 .map(|el| { 539 let hash = el.0.clone(); 540 let uri = el.1.uri().clone(); 541 publication::ListElement::new(uri, hash) 542 }) 543 .collect(); 544 545 publication::ListReply::new(elements) 546 } 547 } 548 549 //------------ PublicationDeltaError --------------------------------------------- 550 551 /// Issues with relation to verifying deltas. 552 #[derive(Clone, Debug)] 553 pub enum PublicationDeltaError { 554 UriOutsideJail(uri::Rsync, uri::Rsync), 555 ObjectAlreadyPresent(uri::Rsync), 556 NoObjectForHashAndOrUri(uri::Rsync), 557 } 558 559 impl fmt::Display for PublicationDeltaError { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result560 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 561 match self { 562 PublicationDeltaError::UriOutsideJail(uri, jail) => { 563 write!(f, "Publishing ({}) outside of jail URI ({}) is not allowed.", uri, jail) 564 } 565 PublicationDeltaError::ObjectAlreadyPresent(uri) => { 566 write!(f, "File already exists for uri (use update!): {}", uri) 567 } 568 PublicationDeltaError::NoObjectForHashAndOrUri(uri) => { 569 write!(f, "File does not match hash at uri: {}", uri) 570 } 571 } 572 } 573 } 574 575 impl PublicationDeltaError { outside(jail: &uri::Rsync, uri: &uri::Rsync) -> Self576 fn outside(jail: &uri::Rsync, uri: &uri::Rsync) -> Self { 577 PublicationDeltaError::UriOutsideJail(uri.clone(), jail.clone()) 578 } 579 present(uri: &uri::Rsync) -> Self580 fn present(uri: &uri::Rsync) -> Self { 581 PublicationDeltaError::ObjectAlreadyPresent(uri.clone()) 582 } 583 no_match(uri: &uri::Rsync) -> Self584 fn no_match(uri: &uri::Rsync) -> Self { 585 PublicationDeltaError::NoObjectForHashAndOrUri(uri.clone()) 586 } 587 } 588 589 //------------ RrdpFileRandom ------------------------------------------------ 590 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] 591 pub struct RrdpFileRandom(String); 592 593 impl Default for RrdpFileRandom { default() -> Self594 fn default() -> Self { 595 let mut bytes = [0; 8]; 596 openssl::rand::rand_bytes(&mut bytes).unwrap(); 597 let s = hex::encode(bytes); 598 RrdpFileRandom(s) 599 } 600 } 601 602 //------------ Snapshot ------------------------------------------------------ 603 604 /// A structure to contain the RRDP snapshot data. 605 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] 606 pub struct Snapshot { 607 session: RrdpSession, 608 serial: u64, 609 610 // By using the default (i.e. random) for deserializing where this is absent, we do not 611 // need to migrate data when upgrading to this version where we introduce this new field. 612 // Note that this will result in new random values for existing Snapshot files, but 613 // because we now also perform a session reset on start up (see issue #533) it will 614 // not matter that this value does not map to the previous path of where this snapshot 615 // was stored, as it will be promptly replaced and forgotten. 616 #[serde(default)] 617 random: RrdpFileRandom, 618 619 current_objects: CurrentObjects, 620 } 621 622 impl Snapshot { new(session: RrdpSession, serial: u64, current_objects: CurrentObjects) -> Self623 pub fn new(session: RrdpSession, serial: u64, current_objects: CurrentObjects) -> Self { 624 Snapshot { 625 session, 626 serial, 627 random: RrdpFileRandom::default(), 628 current_objects, 629 } 630 } 631 unpack(self) -> (RrdpSession, u64, CurrentObjects)632 pub fn unpack(self) -> (RrdpSession, u64, CurrentObjects) { 633 (self.session, self.serial, self.current_objects) 634 } 635 create(session: RrdpSession) -> Self636 pub fn create(session: RrdpSession) -> Self { 637 let current_objects = CurrentObjects::default(); 638 Snapshot { 639 session, 640 serial: RRDP_FIRST_SERIAL, 641 random: RrdpFileRandom::default(), 642 current_objects, 643 } 644 } 645 session_reset(&self, session: RrdpSession) -> Self646 pub fn session_reset(&self, session: RrdpSession) -> Self { 647 Snapshot { 648 session, 649 serial: RRDP_FIRST_SERIAL, 650 random: RrdpFileRandom::default(), 651 current_objects: self.current_objects.clone(), 652 } 653 } 654 elements(&self) -> Vec<&PublishElement>655 pub fn elements(&self) -> Vec<&PublishElement> { 656 self.current_objects.elements() 657 } 658 serial(&self) -> u64659 pub fn serial(&self) -> u64 { 660 self.serial 661 } 662 apply_delta(&mut self, elements: DeltaElements, jail: &uri::Rsync) -> Result<(), PublicationDeltaError>663 pub fn apply_delta(&mut self, elements: DeltaElements, jail: &uri::Rsync) -> Result<(), PublicationDeltaError> { 664 self.serial += 1; 665 self.random = RrdpFileRandom::default(); 666 self.current_objects.apply_delta(elements, jail) 667 } 668 size(&self) -> usize669 pub fn size(&self) -> usize { 670 self.current_objects.elements().iter().fold(0, |sum, p| sum + p.size()) 671 } 672 rel_path(&self) -> String673 fn rel_path(&self) -> String { 674 format!("{}/{}/{}/snapshot.xml", self.session, self.serial, self.random.0) 675 } 676 uri(&self, rrdp_base_uri: &uri::Https) -> uri::Https677 pub fn uri(&self, rrdp_base_uri: &uri::Https) -> uri::Https { 678 rrdp_base_uri.join(self.rel_path().as_ref()).unwrap() 679 } 680 path(&self, base_path: &Path) -> PathBuf681 pub fn path(&self, base_path: &Path) -> PathBuf { 682 base_path.join(self.rel_path()) 683 } 684 write_xml(&self, path: &Path) -> Result<(), KrillIoError>685 pub fn write_xml(&self, path: &Path) -> Result<(), KrillIoError> { 686 trace!("Writing snapshot file: {}", path.to_string_lossy()); 687 let vec = self.xml(); 688 let bytes = Bytes::from(vec); 689 690 file::save(&bytes, path) 691 } 692 xml(&self) -> Vec<u8>693 pub fn xml(&self) -> Vec<u8> { 694 XmlWriter::encode_vec(|w| { 695 let a = [ 696 ("xmlns", NS), 697 ("version", VERSION), 698 ("session_id", &format!("{}", self.session)), 699 ("serial", &format!("{}", self.serial)), 700 ]; 701 702 w.put_element("snapshot", Some(&a), |w| { 703 for el in self.current_objects.elements() { 704 let uri = el.uri.to_string(); 705 let atr = [("uri", uri.as_ref())]; 706 w.put_element("publish", Some(&atr), |w| w.put_text(el.base64.as_ref())) 707 .unwrap(); 708 } 709 Ok(()) 710 }) 711 }) 712 } 713 } 714 715 //------------ DeltaElements ------------------------------------------------- 716 717 /// Defines the elements for an RRDP delta. 718 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] 719 pub struct DeltaElements { 720 publishes: Vec<PublishElement>, 721 updates: Vec<UpdateElement>, 722 withdraws: Vec<WithdrawElement>, 723 } 724 725 impl DeltaElements { new(publishes: Vec<PublishElement>, updates: Vec<UpdateElement>, withdraws: Vec<WithdrawElement>) -> Self726 pub fn new(publishes: Vec<PublishElement>, updates: Vec<UpdateElement>, withdraws: Vec<WithdrawElement>) -> Self { 727 DeltaElements { 728 publishes, 729 updates, 730 withdraws, 731 } 732 } 733 unpack(self) -> (Vec<PublishElement>, Vec<UpdateElement>, Vec<WithdrawElement>)734 pub fn unpack(self) -> (Vec<PublishElement>, Vec<UpdateElement>, Vec<WithdrawElement>) { 735 (self.publishes, self.updates, self.withdraws) 736 } 737 len(&self) -> usize738 pub fn len(&self) -> usize { 739 self.publishes.len() + self.updates.len() + self.withdraws.len() 740 } 741 size(&self) -> usize742 pub fn size(&self) -> usize { 743 let sum_publishes = self.publishes.iter().fold(0, |sum, p| sum + p.size()); 744 let sum_updates = self.updates.iter().fold(0, |sum, u| sum + u.size()); 745 746 sum_publishes + sum_updates 747 } 748 is_empty(&self) -> bool749 pub fn is_empty(&self) -> bool { 750 self.len() == 0 751 } 752 publishes(&self) -> &Vec<PublishElement>753 pub fn publishes(&self) -> &Vec<PublishElement> { 754 &self.publishes 755 } 756 updates(&self) -> &Vec<UpdateElement>757 pub fn updates(&self) -> &Vec<UpdateElement> { 758 &self.updates 759 } 760 withdraws(&self) -> &Vec<WithdrawElement>761 pub fn withdraws(&self) -> &Vec<WithdrawElement> { 762 &self.withdraws 763 } 764 } 765 766 impl From<publication::PublishDelta> for DeltaElements { from(d: publication::PublishDelta) -> Self767 fn from(d: publication::PublishDelta) -> Self { 768 let (publishers, updates, withdraws) = d.unwrap(); 769 770 let publishes = publishers.into_iter().map(PublishElement::from).collect(); 771 let updates = updates.into_iter().map(UpdateElement::from).collect(); 772 let withdraws = withdraws.into_iter().map(WithdrawElement::from).collect(); 773 774 DeltaElements { 775 publishes, 776 updates, 777 withdraws, 778 } 779 } 780 } 781 782 //------------ Delta --------------------------------------------------------- 783 784 /// Defines an RRDP delta. 785 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] 786 pub struct Delta { 787 session: RrdpSession, 788 serial: u64, 789 790 // By using the default (i.e. random) for deserializing where this is absent, we do not 791 // need to migrate data when upgrading to this version where we introduce this new field. 792 // Note that this will result in new random values for existing Snapshot files, but 793 // because we now also perform a session reset on start up (see issue #533) it will 794 // not matter that this value does not map to the previous path of where this snapshot 795 // was stored, as it will be promptly replaced and forgotten. 796 #[serde(default)] 797 random: RrdpFileRandom, 798 799 time: Time, 800 elements: DeltaElements, 801 } 802 803 impl Delta { new(session: RrdpSession, serial: u64, elements: DeltaElements) -> Self804 pub fn new(session: RrdpSession, serial: u64, elements: DeltaElements) -> Self { 805 Delta { 806 session, 807 time: Time::now(), 808 random: RrdpFileRandom::default(), 809 serial, 810 elements, 811 } 812 } 813 session(&self) -> RrdpSession814 pub fn session(&self) -> RrdpSession { 815 self.session 816 } serial(&self) -> u64817 pub fn serial(&self) -> u64 { 818 self.serial 819 } time(&self) -> &Time820 pub fn time(&self) -> &Time { 821 &self.time 822 } 823 older_than_seconds(&self, seconds: i64) -> bool824 pub fn older_than_seconds(&self, seconds: i64) -> bool { 825 let then = Time::now() - Duration::seconds(seconds); 826 self.time < then 827 } 828 younger_than_seconds(&self, seconds: i64) -> bool829 pub fn younger_than_seconds(&self, seconds: i64) -> bool { 830 let then = Time::now() - Duration::seconds(seconds); 831 self.time > then 832 } 833 elements(&self) -> &DeltaElements834 pub fn elements(&self) -> &DeltaElements { 835 &self.elements 836 } 837 838 /// Total number of elements 839 /// 840 /// This is a cheap approximation of the size of the delta that can help 841 /// in determining the choice of how many deltas to include in a 842 /// notification file. len(&self) -> usize843 pub fn len(&self) -> usize { 844 self.elements.len() 845 } 846 is_empty(&self) -> bool847 pub fn is_empty(&self) -> bool { 848 self.elements.is_empty() 849 } 850 unwrap(self) -> (RrdpSession, u64, DeltaElements)851 pub fn unwrap(self) -> (RrdpSession, u64, DeltaElements) { 852 (self.session, self.serial, self.elements) 853 } 854 rel_path(&self) -> String855 fn rel_path(&self) -> String { 856 format!("{}/{}/{}/delta.xml", self.session, self.serial, self.random.0) 857 } 858 uri(&self, rrdp_base_uri: &uri::Https) -> uri::Https859 pub fn uri(&self, rrdp_base_uri: &uri::Https) -> uri::Https { 860 rrdp_base_uri.join(self.rel_path().as_ref()).unwrap() 861 } 862 path(&self, base_path: &Path) -> PathBuf863 pub fn path(&self, base_path: &Path) -> PathBuf { 864 base_path.join(self.rel_path()) 865 } 866 write_xml(&self, path: &Path) -> Result<(), KrillIoError>867 pub fn write_xml(&self, path: &Path) -> Result<(), KrillIoError> { 868 trace!("Writing delta file: {}", path.to_string_lossy()); 869 let vec = self.xml(); 870 let bytes = Bytes::from(vec); 871 file::save(&bytes, path)?; 872 873 Ok(()) 874 } 875 xml(&self) -> Vec<u8>876 pub fn xml(&self) -> Vec<u8> { 877 XmlWriter::encode_vec(|w| { 878 let a = [ 879 ("xmlns", NS), 880 ("version", VERSION), 881 ("session_id", &format!("{}", self.session)), 882 ("serial", &format!("{}", self.serial)), 883 ]; 884 885 w.put_element("delta", Some(&a), |w| { 886 for el in &self.elements.publishes { 887 let uri = el.uri.to_string(); 888 let atr = [("uri", uri.as_ref())]; 889 w.put_element("publish", Some(&atr), |w| w.put_text(el.base64.as_ref()))?; 890 } 891 892 for el in &self.elements.updates { 893 let uri = el.uri.to_string(); 894 let atr = [("uri", uri.as_ref()), ("hash", el.hash.as_ref())]; 895 w.put_element("publish", Some(&atr), |w| w.put_text(el.base64.as_ref()))?; 896 } 897 898 for el in &self.elements.withdraws { 899 let uri = el.uri.to_string(); 900 let atr = [("uri", uri.as_ref()), ("hash", el.hash.as_ref())]; 901 w.put_element("withdraw", Some(&atr), |w| w.empty())?; 902 } 903 904 Ok(()) 905 }) 906 }) 907 } 908 } 909