1 //! Data objects used in the (RRDP) repository. I.e. the publish, update, and
2 //! withdraw elements, as well as the notification, snapshot and delta file
3 //! definitions.
4 use std::{
5     fmt,
6     path::PathBuf,
7     {collections::HashMap, path::Path},
8 };
9 
10 use bytes::Bytes;
11 use chrono::Duration;
12 use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
13 use uuid::Uuid;
14 
15 use rpki::{repository::x509::Time, uri};
16 
17 use crate::{
18     commons::{
19         api::{publication, Base64, HexEncodedHash},
20         error::KrillIoError,
21         util::{file, xml::XmlWriter},
22     },
23     constants::RRDP_FIRST_SERIAL,
24 };
25 
26 const VERSION: &str = "1";
27 const NS: &str = "http://www.ripe.net/rpki/rrdp";
28 
29 //------------ RrdpSession ---------------------------------------------------
30 #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
31 pub struct RrdpSession(Uuid);
32 
33 impl Default for RrdpSession {
default() -> Self34     fn default() -> Self {
35         RrdpSession(Uuid::new_v4())
36     }
37 }
38 
39 impl RrdpSession {
random() -> Self40     pub fn random() -> Self {
41         Self::default()
42     }
43 }
44 
45 impl AsRef<Uuid> for RrdpSession {
as_ref(&self) -> &Uuid46     fn as_ref(&self) -> &Uuid {
47         &self.0
48     }
49 }
50 
51 impl Serialize for RrdpSession {
serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: Serializer,52     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
53     where
54         S: Serializer,
55     {
56         self.to_string().serialize(serializer)
57     }
58 }
59 
60 impl<'de> Deserialize<'de> for RrdpSession {
deserialize<D>(deserializer: D) -> Result<Self, D::Error> where D: Deserializer<'de>,61     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
62     where
63         D: Deserializer<'de>,
64     {
65         let string = String::deserialize(deserializer)?;
66         let uuid = Uuid::parse_str(&string).map_err(de::Error::custom)?;
67 
68         Ok(RrdpSession(uuid))
69     }
70 }
71 
72 impl fmt::Display for RrdpSession {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result73     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
74         write!(f, "{}", self.0.to_hyphenated())
75     }
76 }
77 
78 //------------ PublishElement ------------------------------------------------
79 
80 /// The publishes as used in the RRDP protocol.
81 ///
82 /// Note that the difference with the publication protocol is the absence of
83 /// the tag.
84 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
85 pub struct PublishElement {
86     base64: Base64,
87     uri: uri::Rsync,
88 }
89 
90 impl PublishElement {
new(base64: Base64, uri: uri::Rsync) -> Self91     pub fn new(base64: Base64, uri: uri::Rsync) -> Self {
92         PublishElement { base64, uri }
93     }
94 
base64(&self) -> &Base6495     pub fn base64(&self) -> &Base64 {
96         &self.base64
97     }
uri(&self) -> &uri::Rsync98     pub fn uri(&self) -> &uri::Rsync {
99         &self.uri
100     }
size(&self) -> usize101     pub fn size(&self) -> usize {
102         self.base64.size()
103     }
104 
as_withdraw(&self) -> WithdrawElement105     pub fn as_withdraw(&self) -> WithdrawElement {
106         WithdrawElement {
107             uri: self.uri.clone(),
108             hash: self.base64.to_encoded_hash(),
109         }
110     }
111 
unpack(self) -> (uri::Rsync, Base64)112     pub fn unpack(self) -> (uri::Rsync, Base64) {
113         (self.uri, self.base64)
114     }
115 }
116 
117 impl From<publication::Publish> for PublishElement {
from(p: publication::Publish) -> Self118     fn from(p: publication::Publish) -> Self {
119         let (_tag, uri, base64) = p.unpack();
120         PublishElement { base64, uri }
121     }
122 }
123 
124 //------------ UpdateElement -------------------------------------------------
125 
126 /// The updates as used in the RRDP protocol.
127 ///
128 /// Note that the difference with the publication protocol is the absence of
129 /// the tag.
130 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
131 pub struct UpdateElement {
132     uri: uri::Rsync,
133     hash: HexEncodedHash,
134     base64: Base64,
135 }
136 
137 impl UpdateElement {
uri(&self) -> &uri::Rsync138     pub fn uri(&self) -> &uri::Rsync {
139         &self.uri
140     }
hash(&self) -> &HexEncodedHash141     pub fn hash(&self) -> &HexEncodedHash {
142         &self.hash
143     }
base64(&self) -> &Base64144     pub fn base64(&self) -> &Base64 {
145         &self.base64
146     }
size(&self) -> usize147     pub fn size(&self) -> usize {
148         self.base64.size()
149     }
150 }
151 
152 impl From<publication::Update> for UpdateElement {
from(u: publication::Update) -> Self153     fn from(u: publication::Update) -> Self {
154         let (_tag, uri, base64, hash) = u.unwrap();
155         UpdateElement { uri, hash, base64 }
156     }
157 }
158 
159 impl From<UpdateElement> for PublishElement {
from(el: UpdateElement) -> Self160     fn from(el: UpdateElement) -> Self {
161         PublishElement {
162             uri: el.uri,
163             base64: el.base64,
164         }
165     }
166 }
167 
168 //------------ WithdrawElement -----------------------------------------------
169 
170 /// The withdraws as used in the RRDP protocol.
171 ///
172 /// Note that the difference with the publication protocol is the absence of
173 /// the tag.
174 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
175 pub struct WithdrawElement {
176     uri: uri::Rsync,
177     hash: HexEncodedHash,
178 }
179 
180 impl WithdrawElement {
uri(&self) -> &uri::Rsync181     pub fn uri(&self) -> &uri::Rsync {
182         &self.uri
183     }
hash(&self) -> &HexEncodedHash184     pub fn hash(&self) -> &HexEncodedHash {
185         &self.hash
186     }
187 }
188 
189 impl From<publication::Withdraw> for WithdrawElement {
from(w: publication::Withdraw) -> Self190     fn from(w: publication::Withdraw) -> Self {
191         let (_tag, uri, hash) = w.unwrap();
192         WithdrawElement { uri, hash }
193     }
194 }
195 
196 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
197 pub struct Notification {
198     session: RrdpSession,
199     serial: u64,
200     time: Time,
201     #[serde(skip_serializing_if = "Option::is_none")]
202     replaced: Option<Time>,
203     snapshot: SnapshotRef,
204     deltas: Vec<DeltaRef>,
205     last_delta: Option<u64>,
206 }
207 
208 impl Notification {
new(session: RrdpSession, serial: u64, snapshot: SnapshotRef, deltas: Vec<DeltaRef>) -> Self209     pub fn new(session: RrdpSession, serial: u64, snapshot: SnapshotRef, deltas: Vec<DeltaRef>) -> Self {
210         let last_delta = Self::find_last_delta(&deltas);
211         Notification {
212             session,
213             serial,
214             time: Time::now(),
215             replaced: None,
216             snapshot,
217             deltas,
218             last_delta,
219         }
220     }
221 
time(&self) -> Time222     pub fn time(&self) -> Time {
223         self.time
224     }
225 
226     #[deprecated] // use 'older_than_seconds'
replaced_after(&self, timestamp: i64) -> bool227     pub fn replaced_after(&self, timestamp: i64) -> bool {
228         if let Some(replaced) = self.replaced {
229             replaced.timestamp() > timestamp
230         } else {
231             false
232         }
233     }
234 
older_than_seconds(&self, seconds: i64) -> bool235     pub fn older_than_seconds(&self, seconds: i64) -> bool {
236         match self.replaced {
237             Some(time) => {
238                 let then = Time::now() - Duration::seconds(seconds);
239                 time < then
240             }
241             None => false,
242         }
243     }
244 
replace(&mut self, time: Time)245     pub fn replace(&mut self, time: Time) {
246         self.replaced = Some(time);
247     }
248 
serial(&self) -> u64249     pub fn serial(&self) -> u64 {
250         self.serial
251     }
252 
session(&self) -> RrdpSession253     pub fn session(&self) -> RrdpSession {
254         self.session
255     }
256 
last_delta(&self) -> Option<u64>257     pub fn last_delta(&self) -> Option<u64> {
258         self.last_delta
259     }
260 
includes_delta(&self, delta: u64) -> bool261     pub fn includes_delta(&self, delta: u64) -> bool {
262         if let Some(last) = self.last_delta {
263             last <= delta
264         } else {
265             false
266         }
267     }
268 
includes_snapshot(&self, version: u64) -> bool269     pub fn includes_snapshot(&self, version: u64) -> bool {
270         self.serial == version
271     }
272 
find_last_delta(deltas: &[DeltaRef]) -> Option<u64>273     fn find_last_delta(deltas: &[DeltaRef]) -> Option<u64> {
274         if deltas.is_empty() {
275             None
276         } else {
277             let mut serial = deltas[0].serial;
278             for d in deltas {
279                 if d.serial < serial {
280                     serial = d.serial
281                 }
282             }
283 
284             Some(serial)
285         }
286     }
287 }
288 
289 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
290 pub struct NotificationUpdate {
291     time: Time,
292     session: Option<RrdpSession>,
293     snapshot: SnapshotRef,
294     delta: DeltaRef,
295     last_delta: u64,
296 }
297 
298 impl NotificationUpdate {
new( time: Time, session: Option<RrdpSession>, snapshot: SnapshotRef, delta: DeltaRef, last_delta: u64, ) -> Self299     pub fn new(
300         time: Time,
301         session: Option<RrdpSession>,
302         snapshot: SnapshotRef,
303         delta: DeltaRef,
304         last_delta: u64,
305     ) -> Self {
306         NotificationUpdate {
307             time,
308             session,
309             snapshot,
310             delta,
311             last_delta,
312         }
313     }
314 }
315 
316 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
317 pub struct NotificationCreate {
318     session: RrdpSession,
319     snapshot: SnapshotRef,
320 }
321 
322 impl NotificationUpdate {
unwrap(self) -> (Time, Option<RrdpSession>, SnapshotRef, DeltaRef, u64)323     pub fn unwrap(self) -> (Time, Option<RrdpSession>, SnapshotRef, DeltaRef, u64) {
324         (self.time, self.session, self.snapshot, self.delta, self.last_delta)
325     }
326 }
327 
328 impl Notification {
create(session: RrdpSession, snapshot: SnapshotRef) -> Self329     pub fn create(session: RrdpSession, snapshot: SnapshotRef) -> Self {
330         Notification::new(session, RRDP_FIRST_SERIAL, snapshot, vec![])
331     }
332 
write_xml(&self, path: &Path) -> Result<(), KrillIoError>333     pub fn write_xml(&self, path: &Path) -> Result<(), KrillIoError> {
334         trace!("Writing notification file: {}", path.to_string_lossy());
335         let mut file = file::create_file_with_path(path)?;
336 
337         XmlWriter::encode_to_file(&mut file, |w| {
338             let a = [
339                 ("xmlns", NS),
340                 ("version", VERSION),
341                 ("session_id", &format!("{}", self.session)),
342                 ("serial", &format!("{}", self.serial)),
343             ];
344 
345             w.put_element("notification", Some(&a), |w| {
346                 {
347                     // snapshot ref
348                     let uri = self.snapshot.uri.to_string();
349                     let a = [("uri", uri.as_str()), ("hash", self.snapshot.hash.as_ref())];
350                     w.put_element("snapshot", Some(&a), |w| w.empty())?;
351                 }
352 
353                 {
354                     // delta refs
355                     for delta in &self.deltas {
356                         let serial = format!("{}", delta.serial);
357                         let uri = delta.file_ref.uri.to_string();
358                         let a = [
359                             ("serial", serial.as_ref()),
360                             ("uri", uri.as_str()),
361                             ("hash", delta.file_ref.hash.as_ref()),
362                         ];
363                         w.put_element("delta", Some(&a), |w| w.empty())?;
364                     }
365                 }
366 
367                 Ok(())
368             })
369         })
370         .map_err(|e| KrillIoError::new(format!("Could not write XML to: {}", path.to_string_lossy()), e))
371     }
372 }
373 
374 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
375 pub struct FileRef {
376     uri: uri::Https,
377     path: PathBuf,
378     hash: HexEncodedHash,
379 }
380 
381 impl FileRef {
new(uri: uri::Https, path: PathBuf, hash: HexEncodedHash) -> Self382     pub fn new(uri: uri::Https, path: PathBuf, hash: HexEncodedHash) -> Self {
383         FileRef { uri, path, hash }
384     }
uri(&self) -> &uri::Https385     pub fn uri(&self) -> &uri::Https {
386         &self.uri
387     }
path(&self) -> &PathBuf388     pub fn path(&self) -> &PathBuf {
389         &self.path
390     }
hash(&self) -> &HexEncodedHash391     pub fn hash(&self) -> &HexEncodedHash {
392         &self.hash
393     }
394 }
395 
396 pub type SnapshotRef = FileRef;
397 
398 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
399 pub struct DeltaRef {
400     serial: u64,
401     file_ref: FileRef,
402 }
403 
404 impl DeltaRef {
new(serial: u64, file_ref: FileRef) -> Self405     pub fn new(serial: u64, file_ref: FileRef) -> Self {
406         DeltaRef { serial, file_ref }
407     }
408 
serial(&self) -> u64409     pub fn serial(&self) -> u64 {
410         self.serial
411     }
412 }
413 
414 impl AsRef<FileRef> for DeltaRef {
as_ref(&self) -> &FileRef415     fn as_ref(&self) -> &FileRef {
416         &self.file_ref
417     }
418 }
419 
420 //------------ CurrentObjects ------------------------------------------------
421 
422 /// Defines a current set of published elements.
423 ///
424 // Note this is mapped internally for speedy access, by hash, rather than uri
425 // for two reasons:
426 // a) URIs in RPKI may change in future
427 // b) The publish element as it appears in an RFC8182 snapshot.xml includes
428 // the uri and the base64, but not the hash. So keeping the actual elements
429 // around means we can be more efficient in producing that output.
430 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
431 pub struct CurrentObjects(HashMap<HexEncodedHash, PublishElement>);
432 
433 impl Default for CurrentObjects {
default() -> Self434     fn default() -> Self {
435         CurrentObjects(HashMap::new())
436     }
437 }
438 
439 impl CurrentObjects {
new(map: HashMap<HexEncodedHash, PublishElement>) -> Self440     pub fn new(map: HashMap<HexEncodedHash, PublishElement>) -> Self {
441         CurrentObjects(map)
442     }
443 
elements(&self) -> Vec<&PublishElement>444     pub fn elements(&self) -> Vec<&PublishElement> {
445         let mut res = vec![];
446         for el in self.0.values() {
447             res.push(el)
448         }
449         res
450     }
451 
into_elements(self) -> Vec<PublishElement>452     pub fn into_elements(self) -> Vec<PublishElement> {
453         self.0.into_iter().map(|(_, e)| e).collect()
454     }
455 
has_match(&self, hash: &HexEncodedHash, uri: &uri::Rsync) -> bool456     fn has_match(&self, hash: &HexEncodedHash, uri: &uri::Rsync) -> bool {
457         match self.0.get(hash) {
458             Some(el) => el.uri() == uri,
459             None => false,
460         }
461     }
462 
verify_delta(&self, delta: &DeltaElements, jail: &uri::Rsync) -> Result<(), PublicationDeltaError>463     fn verify_delta(&self, delta: &DeltaElements, jail: &uri::Rsync) -> Result<(), PublicationDeltaError> {
464         for p in delta.publishes() {
465             if !jail.is_parent_of(p.uri()) {
466                 return Err(PublicationDeltaError::outside(jail, p.uri()));
467             }
468             let hash = p.base64().to_encoded_hash();
469             if self.0.contains_key(&hash) {
470                 return Err(PublicationDeltaError::present(p.uri()));
471             }
472         }
473 
474         for u in delta.updates() {
475             if !jail.is_parent_of(u.uri()) {
476                 return Err(PublicationDeltaError::outside(jail, u.uri()));
477             }
478             if !self.has_match(u.hash(), u.uri()) {
479                 return Err(PublicationDeltaError::no_match(u.uri()));
480             }
481         }
482 
483         for w in delta.withdraws() {
484             if !jail.is_parent_of(w.uri()) {
485                 return Err(PublicationDeltaError::outside(jail, w.uri()));
486             }
487             if !self.has_match(w.hash(), w.uri()) {
488                 return Err(PublicationDeltaError::no_match(w.uri()));
489             }
490         }
491 
492         Ok(())
493     }
494 
495     /// Applies a delta to CurrentObjects. This will verify that the
496     /// delta is legal with regards to existing objects, and the jail
497     /// specified for the publisher.
apply_delta(&mut self, delta: DeltaElements, jail: &uri::Rsync) -> Result<(), PublicationDeltaError>498     pub fn apply_delta(&mut self, delta: DeltaElements, jail: &uri::Rsync) -> Result<(), PublicationDeltaError> {
499         self.verify_delta(&delta, jail)?;
500 
501         let (publishes, updates, withdraws) = delta.unpack();
502 
503         for p in publishes {
504             let hash = p.base64().to_encoded_hash();
505             self.0.insert(hash, p);
506         }
507 
508         for u in updates {
509             self.0.remove(u.hash());
510             let p: PublishElement = u.into();
511             let hash = p.base64().to_encoded_hash();
512             self.0.insert(hash, p);
513         }
514 
515         for w in withdraws {
516             self.0.remove(w.hash());
517         }
518 
519         Ok(())
520     }
521 
len(&self) -> usize522     pub fn len(&self) -> usize {
523         self.0.len()
524     }
525 
size(&self) -> usize526     pub fn size(&self) -> usize {
527         self.0.values().fold(0, |tot, el| tot + el.size())
528     }
529 
is_empty(&self) -> bool530     pub fn is_empty(&self) -> bool {
531         self.0.is_empty()
532     }
533 
to_list_reply(&self) -> publication::ListReply534     pub fn to_list_reply(&self) -> publication::ListReply {
535         let elements = self
536             .0
537             .iter()
538             .map(|el| {
539                 let hash = el.0.clone();
540                 let uri = el.1.uri().clone();
541                 publication::ListElement::new(uri, hash)
542             })
543             .collect();
544 
545         publication::ListReply::new(elements)
546     }
547 }
548 
549 //------------ PublicationDeltaError ---------------------------------------------
550 
551 /// Issues with relation to verifying deltas.
552 #[derive(Clone, Debug)]
553 pub enum PublicationDeltaError {
554     UriOutsideJail(uri::Rsync, uri::Rsync),
555     ObjectAlreadyPresent(uri::Rsync),
556     NoObjectForHashAndOrUri(uri::Rsync),
557 }
558 
559 impl fmt::Display for PublicationDeltaError {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result560     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
561         match self {
562             PublicationDeltaError::UriOutsideJail(uri, jail) => {
563                 write!(f, "Publishing ({}) outside of jail URI ({}) is not allowed.", uri, jail)
564             }
565             PublicationDeltaError::ObjectAlreadyPresent(uri) => {
566                 write!(f, "File already exists for uri (use update!): {}", uri)
567             }
568             PublicationDeltaError::NoObjectForHashAndOrUri(uri) => {
569                 write!(f, "File does not match hash at uri: {}", uri)
570             }
571         }
572     }
573 }
574 
575 impl PublicationDeltaError {
outside(jail: &uri::Rsync, uri: &uri::Rsync) -> Self576     fn outside(jail: &uri::Rsync, uri: &uri::Rsync) -> Self {
577         PublicationDeltaError::UriOutsideJail(uri.clone(), jail.clone())
578     }
579 
present(uri: &uri::Rsync) -> Self580     fn present(uri: &uri::Rsync) -> Self {
581         PublicationDeltaError::ObjectAlreadyPresent(uri.clone())
582     }
583 
no_match(uri: &uri::Rsync) -> Self584     fn no_match(uri: &uri::Rsync) -> Self {
585         PublicationDeltaError::NoObjectForHashAndOrUri(uri.clone())
586     }
587 }
588 
589 //------------ RrdpFileRandom ------------------------------------------------
590 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
591 pub struct RrdpFileRandom(String);
592 
593 impl Default for RrdpFileRandom {
default() -> Self594     fn default() -> Self {
595         let mut bytes = [0; 8];
596         openssl::rand::rand_bytes(&mut bytes).unwrap();
597         let s = hex::encode(bytes);
598         RrdpFileRandom(s)
599     }
600 }
601 
602 //------------ Snapshot ------------------------------------------------------
603 
604 /// A structure to contain the RRDP snapshot data.
605 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
606 pub struct Snapshot {
607     session: RrdpSession,
608     serial: u64,
609 
610     // By using the default (i.e. random) for deserializing where this is absent, we do not
611     // need to migrate data when upgrading to this version where we introduce this new field.
612     // Note that this will result in new random values for existing Snapshot files, but
613     // because we now also perform a session reset on start up (see issue #533) it will
614     // not matter that this value does not map to the previous path of where this snapshot
615     // was stored, as it will be promptly replaced and forgotten.
616     #[serde(default)]
617     random: RrdpFileRandom,
618 
619     current_objects: CurrentObjects,
620 }
621 
622 impl Snapshot {
new(session: RrdpSession, serial: u64, current_objects: CurrentObjects) -> Self623     pub fn new(session: RrdpSession, serial: u64, current_objects: CurrentObjects) -> Self {
624         Snapshot {
625             session,
626             serial,
627             random: RrdpFileRandom::default(),
628             current_objects,
629         }
630     }
631 
unpack(self) -> (RrdpSession, u64, CurrentObjects)632     pub fn unpack(self) -> (RrdpSession, u64, CurrentObjects) {
633         (self.session, self.serial, self.current_objects)
634     }
635 
create(session: RrdpSession) -> Self636     pub fn create(session: RrdpSession) -> Self {
637         let current_objects = CurrentObjects::default();
638         Snapshot {
639             session,
640             serial: RRDP_FIRST_SERIAL,
641             random: RrdpFileRandom::default(),
642             current_objects,
643         }
644     }
645 
session_reset(&self, session: RrdpSession) -> Self646     pub fn session_reset(&self, session: RrdpSession) -> Self {
647         Snapshot {
648             session,
649             serial: RRDP_FIRST_SERIAL,
650             random: RrdpFileRandom::default(),
651             current_objects: self.current_objects.clone(),
652         }
653     }
654 
elements(&self) -> Vec<&PublishElement>655     pub fn elements(&self) -> Vec<&PublishElement> {
656         self.current_objects.elements()
657     }
658 
serial(&self) -> u64659     pub fn serial(&self) -> u64 {
660         self.serial
661     }
662 
apply_delta(&mut self, elements: DeltaElements, jail: &uri::Rsync) -> Result<(), PublicationDeltaError>663     pub fn apply_delta(&mut self, elements: DeltaElements, jail: &uri::Rsync) -> Result<(), PublicationDeltaError> {
664         self.serial += 1;
665         self.random = RrdpFileRandom::default();
666         self.current_objects.apply_delta(elements, jail)
667     }
668 
size(&self) -> usize669     pub fn size(&self) -> usize {
670         self.current_objects.elements().iter().fold(0, |sum, p| sum + p.size())
671     }
672 
rel_path(&self) -> String673     fn rel_path(&self) -> String {
674         format!("{}/{}/{}/snapshot.xml", self.session, self.serial, self.random.0)
675     }
676 
uri(&self, rrdp_base_uri: &uri::Https) -> uri::Https677     pub fn uri(&self, rrdp_base_uri: &uri::Https) -> uri::Https {
678         rrdp_base_uri.join(self.rel_path().as_ref()).unwrap()
679     }
680 
path(&self, base_path: &Path) -> PathBuf681     pub fn path(&self, base_path: &Path) -> PathBuf {
682         base_path.join(self.rel_path())
683     }
684 
write_xml(&self, path: &Path) -> Result<(), KrillIoError>685     pub fn write_xml(&self, path: &Path) -> Result<(), KrillIoError> {
686         trace!("Writing snapshot file: {}", path.to_string_lossy());
687         let vec = self.xml();
688         let bytes = Bytes::from(vec);
689 
690         file::save(&bytes, path)
691     }
692 
xml(&self) -> Vec<u8>693     pub fn xml(&self) -> Vec<u8> {
694         XmlWriter::encode_vec(|w| {
695             let a = [
696                 ("xmlns", NS),
697                 ("version", VERSION),
698                 ("session_id", &format!("{}", self.session)),
699                 ("serial", &format!("{}", self.serial)),
700             ];
701 
702             w.put_element("snapshot", Some(&a), |w| {
703                 for el in self.current_objects.elements() {
704                     let uri = el.uri.to_string();
705                     let atr = [("uri", uri.as_ref())];
706                     w.put_element("publish", Some(&atr), |w| w.put_text(el.base64.as_ref()))
707                         .unwrap();
708                 }
709                 Ok(())
710             })
711         })
712     }
713 }
714 
715 //------------ DeltaElements -------------------------------------------------
716 
717 /// Defines the elements for an RRDP delta.
718 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
719 pub struct DeltaElements {
720     publishes: Vec<PublishElement>,
721     updates: Vec<UpdateElement>,
722     withdraws: Vec<WithdrawElement>,
723 }
724 
725 impl DeltaElements {
new(publishes: Vec<PublishElement>, updates: Vec<UpdateElement>, withdraws: Vec<WithdrawElement>) -> Self726     pub fn new(publishes: Vec<PublishElement>, updates: Vec<UpdateElement>, withdraws: Vec<WithdrawElement>) -> Self {
727         DeltaElements {
728             publishes,
729             updates,
730             withdraws,
731         }
732     }
733 
unpack(self) -> (Vec<PublishElement>, Vec<UpdateElement>, Vec<WithdrawElement>)734     pub fn unpack(self) -> (Vec<PublishElement>, Vec<UpdateElement>, Vec<WithdrawElement>) {
735         (self.publishes, self.updates, self.withdraws)
736     }
737 
len(&self) -> usize738     pub fn len(&self) -> usize {
739         self.publishes.len() + self.updates.len() + self.withdraws.len()
740     }
741 
size(&self) -> usize742     pub fn size(&self) -> usize {
743         let sum_publishes = self.publishes.iter().fold(0, |sum, p| sum + p.size());
744         let sum_updates = self.updates.iter().fold(0, |sum, u| sum + u.size());
745 
746         sum_publishes + sum_updates
747     }
748 
is_empty(&self) -> bool749     pub fn is_empty(&self) -> bool {
750         self.len() == 0
751     }
752 
publishes(&self) -> &Vec<PublishElement>753     pub fn publishes(&self) -> &Vec<PublishElement> {
754         &self.publishes
755     }
756 
updates(&self) -> &Vec<UpdateElement>757     pub fn updates(&self) -> &Vec<UpdateElement> {
758         &self.updates
759     }
760 
withdraws(&self) -> &Vec<WithdrawElement>761     pub fn withdraws(&self) -> &Vec<WithdrawElement> {
762         &self.withdraws
763     }
764 }
765 
766 impl From<publication::PublishDelta> for DeltaElements {
from(d: publication::PublishDelta) -> Self767     fn from(d: publication::PublishDelta) -> Self {
768         let (publishers, updates, withdraws) = d.unwrap();
769 
770         let publishes = publishers.into_iter().map(PublishElement::from).collect();
771         let updates = updates.into_iter().map(UpdateElement::from).collect();
772         let withdraws = withdraws.into_iter().map(WithdrawElement::from).collect();
773 
774         DeltaElements {
775             publishes,
776             updates,
777             withdraws,
778         }
779     }
780 }
781 
782 //------------ Delta ---------------------------------------------------------
783 
784 /// Defines an RRDP delta.
785 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
786 pub struct Delta {
787     session: RrdpSession,
788     serial: u64,
789 
790     // By using the default (i.e. random) for deserializing where this is absent, we do not
791     // need to migrate data when upgrading to this version where we introduce this new field.
792     // Note that this will result in new random values for existing Snapshot files, but
793     // because we now also perform a session reset on start up (see issue #533) it will
794     // not matter that this value does not map to the previous path of where this snapshot
795     // was stored, as it will be promptly replaced and forgotten.
796     #[serde(default)]
797     random: RrdpFileRandom,
798 
799     time: Time,
800     elements: DeltaElements,
801 }
802 
803 impl Delta {
new(session: RrdpSession, serial: u64, elements: DeltaElements) -> Self804     pub fn new(session: RrdpSession, serial: u64, elements: DeltaElements) -> Self {
805         Delta {
806             session,
807             time: Time::now(),
808             random: RrdpFileRandom::default(),
809             serial,
810             elements,
811         }
812     }
813 
session(&self) -> RrdpSession814     pub fn session(&self) -> RrdpSession {
815         self.session
816     }
serial(&self) -> u64817     pub fn serial(&self) -> u64 {
818         self.serial
819     }
time(&self) -> &Time820     pub fn time(&self) -> &Time {
821         &self.time
822     }
823 
older_than_seconds(&self, seconds: i64) -> bool824     pub fn older_than_seconds(&self, seconds: i64) -> bool {
825         let then = Time::now() - Duration::seconds(seconds);
826         self.time < then
827     }
828 
younger_than_seconds(&self, seconds: i64) -> bool829     pub fn younger_than_seconds(&self, seconds: i64) -> bool {
830         let then = Time::now() - Duration::seconds(seconds);
831         self.time > then
832     }
833 
elements(&self) -> &DeltaElements834     pub fn elements(&self) -> &DeltaElements {
835         &self.elements
836     }
837 
838     /// Total number of elements
839     ///
840     /// This is a cheap approximation of the size of the delta that can help
841     /// in determining the choice of how many deltas to include in a
842     /// notification file.
len(&self) -> usize843     pub fn len(&self) -> usize {
844         self.elements.len()
845     }
846 
is_empty(&self) -> bool847     pub fn is_empty(&self) -> bool {
848         self.elements.is_empty()
849     }
850 
unwrap(self) -> (RrdpSession, u64, DeltaElements)851     pub fn unwrap(self) -> (RrdpSession, u64, DeltaElements) {
852         (self.session, self.serial, self.elements)
853     }
854 
rel_path(&self) -> String855     fn rel_path(&self) -> String {
856         format!("{}/{}/{}/delta.xml", self.session, self.serial, self.random.0)
857     }
858 
uri(&self, rrdp_base_uri: &uri::Https) -> uri::Https859     pub fn uri(&self, rrdp_base_uri: &uri::Https) -> uri::Https {
860         rrdp_base_uri.join(self.rel_path().as_ref()).unwrap()
861     }
862 
path(&self, base_path: &Path) -> PathBuf863     pub fn path(&self, base_path: &Path) -> PathBuf {
864         base_path.join(self.rel_path())
865     }
866 
write_xml(&self, path: &Path) -> Result<(), KrillIoError>867     pub fn write_xml(&self, path: &Path) -> Result<(), KrillIoError> {
868         trace!("Writing delta file: {}", path.to_string_lossy());
869         let vec = self.xml();
870         let bytes = Bytes::from(vec);
871         file::save(&bytes, path)?;
872 
873         Ok(())
874     }
875 
xml(&self) -> Vec<u8>876     pub fn xml(&self) -> Vec<u8> {
877         XmlWriter::encode_vec(|w| {
878             let a = [
879                 ("xmlns", NS),
880                 ("version", VERSION),
881                 ("session_id", &format!("{}", self.session)),
882                 ("serial", &format!("{}", self.serial)),
883             ];
884 
885             w.put_element("delta", Some(&a), |w| {
886                 for el in &self.elements.publishes {
887                     let uri = el.uri.to_string();
888                     let atr = [("uri", uri.as_ref())];
889                     w.put_element("publish", Some(&atr), |w| w.put_text(el.base64.as_ref()))?;
890                 }
891 
892                 for el in &self.elements.updates {
893                     let uri = el.uri.to_string();
894                     let atr = [("uri", uri.as_ref()), ("hash", el.hash.as_ref())];
895                     w.put_element("publish", Some(&atr), |w| w.put_text(el.base64.as_ref()))?;
896                 }
897 
898                 for el in &self.elements.withdraws {
899                     let uri = el.uri.to_string();
900                     let atr = [("uri", uri.as_ref()), ("hash", el.hash.as_ref())];
901                     w.put_element("withdraw", Some(&atr), |w| w.empty())?;
902                 }
903 
904                 Ok(())
905             })
906         })
907     }
908 }
909