1 //! Parsing and processing of RRDP responses.
2 //!
3 //! This module provides the scaffolding for client-side processing of the
4 //! RPKI Repository Delta Protocol (RRDP) as defined in [RFC 8182].
5 //!
6 //! Processing is done in two parts. The RRDP notification file is parsed into
7 //! a value of type [`NotificationFile`]. Processing of snapshot and delta
8 //! files is done incrementally via the [`ProcessSnapshot`] and
9 //! [`ProcessDelta`] traits since these files can become rather big.
10 //!
11 //! The module does not provide an HTTP client. Rather, it relies on the
12 //! `std::io::Read` trait for processing. As such, it is also not compatible
13 //! with async processing.
14 //!
15 //! A note on terminology: to avoid confusion, the term ‘file’ refers to the
16 //! RRDP data itself, i.e., the notification, snapshot, and delta files. The
17 //! repository’s content synchronized using RRDP also consists of a set of
18 //! files, which we will refer to as ‘objects.’
19 //!
20 //! [RFC 8182]: https://tools.ietf.org/html/rfc8182
21 
22 #![cfg(feature = "rrdp")]
23 
24 use std::{error, fmt, hash, io, str};
25 use std::io::Read;
26 use std::convert::TryFrom;
27 use std::convert::TryInto;
28 use std::ops::Deref;
29 use bytes::Bytes;
30 use log::info;
31 use ring::digest;
32 use uuid::Uuid;
33 use crate::{uri, xml};
34 use crate::xml::decode::{Content, Error as XmlError, Reader, Name};
35 
36 #[cfg(feature = "serde")] use std::str::FromStr;
37 #[cfg(feature = "serde")] use serde::{
38     Deserialize, Deserializer, Serialize, Serializer
39 };
40 
41 
42 //------------ NotificationFile ----------------------------------------------
43 
44 /// The RRDP Update Notification File.
45 ///
46 /// This type represents the decoded content of the RRDP Update Notification
47 /// File. It can be read from a reader via the [`parse`][Self::parse]
48 /// function. All elements are accessible as attributes.
49 #[derive(Clone, Debug, Eq, PartialEq)]
50 pub struct NotificationFile {
51     /// The identifier of the current session of the server.
52     session_id: Uuid,
53 
54     /// The serial number of the most recent update.
55     serial: u64,
56 
57     /// Information about the most recent snapshot.
58     snapshot: SnapshotInfo,
59 
60     /// The list of available delta updates.
61     deltas: Vec<DeltaInfo>,
62 }
63 
64 /// # Data Access
65 ///
66 impl NotificationFile {
67     /// Creates a new notification file from the given components.
new( session_id: Uuid, serial: u64, snapshot: UriAndHash, deltas: Vec<DeltaInfo>, ) -> Self68     pub fn new(
69         session_id: Uuid,
70         serial: u64,
71         snapshot: UriAndHash,
72         deltas: Vec<DeltaInfo>,
73     ) -> Self {
74         NotificationFile { session_id, serial, snapshot, deltas }
75     }
76 
77     /// Returns the identifier of the current session of the server.
78     ///
79     /// Delta updates can only be used if the session ID of the last processed
80     /// update matches this value.
session_id(&self) -> Uuid81     pub fn session_id(&self) -> Uuid {
82         self.session_id
83     }
84 
85     /// Returns the serial number of the most recent update.
86     ///
87     /// Serial numbers increase by one between each update.
serial(&self) -> u6488     pub fn serial(&self) -> u64 {
89         self.serial
90     }
91 
92     /// Returns information about the most recent snapshot.
93     ///
94     /// The snapshot contains a complete set of all data published via the
95     /// repository. It can be processed using the [`ProcessSnapshot`] trait.
snapshot(&self) -> &SnapshotInfo96     pub fn snapshot(&self) -> &SnapshotInfo {
97         &self.snapshot
98     }
99 
100     /// Returns the list of available delta updates.
101     ///
102     /// Deltas can be processed using the [`ProcessDelta`] trait.
deltas(&self) -> &[DeltaInfo]103     pub fn deltas(&self) -> &[DeltaInfo] {
104         &self.deltas
105     }
106 
107     /// Sorts the deltas by increasing serial numbers.
108     ///
109     /// In other words, the delta with the smallest serial number will
110     /// appear at the beginning of the sequence.
sort_deltas(&mut self)111     pub fn sort_deltas(&mut self) {
112         self.deltas.sort_by_key(|delta| delta.serial());
113     }
114 
115     /// Sorts the deltas by decreasing serial numbers.
116     ///
117     /// In other words, the delta with the largest serial number will
118     /// appear at the beginning of the sequence.
reverse_sort_deltas(&mut self)119     pub fn reverse_sort_deltas(&mut self) {
120         self.deltas.sort_by(|a,b| b.serial.cmp(&a.serial));
121     }
122 
123     /// Sorts, verifies, and optionally limits the list of deltas.
124     ///
125     /// Sorts the deltas by increasing serial number. If `limit` is given,
126     /// it then retains at most that many of the newest deltas.
127     ///
128     /// Returns whether there are no gaps in the retained deltas.
sort_and_verify_deltas(&mut self, limit: Option<usize>) -> bool129     pub fn sort_and_verify_deltas(&mut self, limit: Option<usize>) -> bool {
130         if !self.deltas.is_empty() {
131             self.sort_deltas();
132 
133             if let Some(limit) = limit {
134                 if limit < self.deltas.len() {
135                     let offset = self.deltas.len() - limit;
136                     self.deltas.drain(..offset);
137                 }
138             }
139 
140             let mut last_seen = self.deltas[0].serial();
141             for delta in &self.deltas[1..] {
142                 if last_seen + 1 != delta.serial() {
143                     return false;
144                 } else {
145                     last_seen = delta.serial()
146                 }
147             }
148         }
149 
150         true
151     }
152 
153 }
154 
155 /// # XML support
156 ///
157 impl NotificationFile {
158     /// Parses the notification file from its XML representation.
parse<R: io::BufRead>(reader: R) -> Result<Self, XmlError>159     pub fn parse<R: io::BufRead>(reader: R) -> Result<Self, XmlError> {
160         let mut reader = Reader::new(reader);
161 
162         let mut session_id = None;
163         let mut serial = None;
164         let mut outer = reader.start(|element| {
165             if element.name() != NOTIFICATION {
166                 return Err(XmlError::Malformed)
167             }
168 
169             element.attributes(|name, value| match name {
170                 b"version" => {
171                     if value.ascii_into::<u8>()? != 1 {
172                         return Err(XmlError::Malformed)
173                     }
174                     Ok(())
175                 }
176                 b"session_id" => {
177                     session_id = Some(value.ascii_into()?);
178                     Ok(())
179                 }
180                 b"serial" => {
181                     serial = Some(value.ascii_into()?);
182                     Ok(())
183                 }
184                 _ => Err(XmlError::Malformed)
185             })
186         })?;
187 
188         let mut snapshot = None;
189 
190         let mut deltas = vec![];
191 
192         while let Some(mut content) = outer.take_opt_element(&mut reader,
193                                                              |element| {
194             match element.name() {
195                 SNAPSHOT => {
196                     if snapshot.is_some() {
197                         return Err(XmlError::Malformed)
198                     }
199                     let mut uri = None;
200                     let mut hash = None;
201                     element.attributes(|name, value| match name {
202                         b"uri" => {
203                             uri = Some(value.ascii_into()?);
204                             Ok(())
205                         }
206                         b"hash" => {
207                             hash = Some(value.ascii_into()?);
208                             Ok(())
209                         }
210                         _ => Err(XmlError::Malformed)
211                     })?;
212                     match (uri, hash) {
213                         (Some(uri), Some(hash)) => {
214                             snapshot = Some(UriAndHash::new(uri, hash));
215                             Ok(())
216                         }
217                         _ => Err(XmlError::Malformed)
218                     }
219                 }
220                 DELTA => {
221                     let mut serial = None;
222                     let mut uri = None;
223                     let mut hash = None;
224                     element.attributes(|name, value| match name {
225                         b"serial" => {
226                             serial = Some(value.ascii_into()?);
227                             Ok(())
228                         }
229                         b"uri" => {
230                             uri = Some(value.ascii_into()?);
231                             Ok(())
232                         }
233                         b"hash" => {
234                             hash = Some(value.ascii_into()?);
235                             Ok(())
236                         }
237                         _ => Err(XmlError::Malformed)
238                     })?;
239                     match (serial, uri, hash) {
240                         (Some(serial), Some(uri), Some(hash)) => {
241                             deltas.push(DeltaInfo::new(serial, uri, hash));
242                             Ok(())
243                         }
244                         _ => Err(XmlError::Malformed)
245                     }
246                 }
247                 _ => Err(XmlError::Malformed)
248             }
249         })? {
250             content.take_end(&mut reader)?;
251         }
252 
253         outer.take_end(&mut reader)?;
254         reader.end()?;
255 
256         match (session_id, serial, snapshot) {
257             (Some(session_id), Some(serial), Some(snapshot)) => {
258                 Ok(NotificationFile { session_id, serial, snapshot, deltas })
259             }
260             _ => Err(XmlError::Malformed)
261         }
262     }
263 
264     /// Writes the notification file as RFC 8182 XML.
write_xml( &self, writer: &mut impl io::Write ) -> Result<(), io::Error>265     pub fn write_xml(
266         &self, writer: &mut impl io::Write
267     ) -> Result<(), io::Error> {
268         let mut writer = xml::encode::Writer::new(writer);
269         writer.element(NOTIFICATION.into_unqualified())?
270             .attr("xmlns", NS)?
271             .attr("version", "1")?
272             .attr("session_id", &self.session_id)?
273             .attr("serial", &self.serial)?
274             .content(|content| {
275                 // add snapshot
276                 content.element(SNAPSHOT.into_unqualified())?
277                     .attr("uri", self.snapshot.uri())?
278                     .attr("hash", &self.snapshot.hash())?
279                 ;
280 
281                 // add deltas
282                 for delta in self.deltas() {
283                     content.element(DELTA.into_unqualified())?
284                         .attr("serial", &delta.serial())?
285                         .attr("uri", delta.uri())?
286                         .attr("hash", &delta.hash())?
287                     ;
288                 }
289 
290                 Ok(())
291             })?;
292         writer.done()
293     }
294 }
295 
296 
297 //------------ PublishElement ------------------------------------------------
298 
299 /// Am RPKI object to be published for the first time.
300 ///
301 /// This type defines an RRDP publish element as found in RRDP Snapshots and
302 /// Deltas. See [`UpdateElement`] for the related element that replaces a
303 /// previous element for the same uri.
304 #[derive(Clone, Debug, Eq, PartialEq)]
305 pub struct PublishElement {
306     /// The URI of the object to be published.
307     uri: uri::Rsync,
308 
309     /// The content of the object to be published.
310     ///
311     /// This is the raw content. It is _not_ Base64 encoded.
312     data: Bytes,
313 }
314 
315 impl PublishElement {
316     /// Creates a new publish element from the object URI and content.
317     ///
318     /// The content provided via `data` is the raw content and must not yet
319     /// be Base64 encoded.
new( uri: uri::Rsync, data: Bytes, ) -> Self320     pub fn new(
321         uri: uri::Rsync,
322         data: Bytes,
323     ) -> Self {
324         PublishElement { uri, data }
325     }
326 
327     /// Returns the published object’s URI.
uri(&self) -> &uri::Rsync328     pub fn uri(&self) -> &uri::Rsync {
329         &self.uri
330     }
331 
332     /// Returns the published object’s content.
data(&self) -> &Bytes333     pub fn data(&self) -> &Bytes {
334         &self.data
335     }
336 
337     /// Converts `self` into the object’s URI and content.
unpack(self) -> (uri::Rsync, Bytes)338     pub fn unpack(self) -> (uri::Rsync, Bytes) {
339         (self.uri, self.data)
340     }
341 
342     /// Writes the publish element’s XML.
write_xml( &self, content: &mut xml::encode::Content<impl io::Write> ) -> Result<(), io::Error>343     fn write_xml(
344         &self,
345         content: &mut xml::encode::Content<impl io::Write>
346     ) -> Result<(), io::Error> {
347         content.element(PUBLISH.into_unqualified())?
348             .attr("uri", &self.uri)?
349             .content(|content| {
350                 content.base64(self.data.as_ref())
351             })?;
352         Ok(())
353     }
354 }
355 
356 
357 //------------ UpdateElement -------------------------------------------------
358 
359 /// An RPKI object to be updated with new content.
360 ///
361 /// This type defines an RRDP update element as found in RRDP deltas. It is
362 /// like a [`PublishElement`] except that it replaces an existing object for
363 /// a URI.
364 #[derive(Clone, Debug, Eq, PartialEq)]
365 pub struct UpdateElement {
366     /// The URI of the object to be updated.
367     uri: uri::Rsync,
368 
369     /// The SHA-256 hash of the previous content of the object.
370     hash: Hash,
371 
372     /// The new content of the object.
373     ///
374     /// This is the raw content. It is _not_ Base64 encoded.
375     data: Bytes,
376 }
377 
378 impl UpdateElement {
379     /// Creates a new update element from its components.
new(uri: uri::Rsync, hash: Hash, data: Bytes) -> Self380     pub fn new(uri: uri::Rsync, hash: Hash, data: Bytes) -> Self {
381         UpdateElement { uri, hash, data }
382     }
383 
384     /// Returns the URI of the object to update.
uri(&self) -> &uri::Rsync385     pub fn uri(&self) -> &uri::Rsync {
386         &self.uri
387     }
388 
389     /// Returns the hash of the previous content.
hash(&self) -> &Hash390     pub fn hash(&self) -> &Hash {
391         &self.hash
392     }
393 
394     /// Returns the new content of the object.
data(&self) -> &Bytes395     pub fn data(&self) -> &Bytes {
396         &self.data
397     }
398 
399     /// Unpacks the update element into its components.
unpack(self) -> (uri::Rsync, Hash, Bytes)400     pub fn unpack(self) -> (uri::Rsync, Hash, Bytes) {
401         (self.uri, self.hash, self.data)
402     }
403 }
404 
405 impl UpdateElement {
406     /// Writes the update element’s XML.
write_xml( &self, content: &mut xml::encode::Content<impl io::Write> ) -> Result<(), io::Error>407     fn write_xml(
408         &self,
409         content: &mut xml::encode::Content<impl io::Write>
410     ) -> Result<(), io::Error> {
411         content.element(PUBLISH.into_unqualified())?
412             .attr("uri", &self.uri)?
413             .attr("hash", &self.hash)?
414             .content(|content| {
415                 content.base64(self.data.as_ref())
416             })?;
417         Ok(())
418     }
419 }
420 
421 
422 //------------ WithdrawElement -----------------------------------------------
423 
424 /// An RPKI object is to be delete.
425 ///
426 /// This type defines an RRDP update element as found in RRDP deltas.  It is
427 /// like a [`PublishElement`] except that it removes an existing object.
428 #[derive(Clone, Debug, Eq, PartialEq)]
429 pub struct WithdrawElement {
430     /// The URI of the object to be deleted.
431     uri: uri::Rsync,
432 
433     /// The SHA-256 hash of the content of the object to be deleted.
434     hash: Hash,
435 }
436 
437 impl WithdrawElement {
438     /// Creates a new withdraw element from a URI and content hash.
new(uri: uri::Rsync, hash: Hash) -> Self439     pub fn new(uri: uri::Rsync, hash: Hash) -> Self {
440         WithdrawElement { uri, hash }
441     }
442 
443     /// Returns the URI of the object to be deleted.
uri(&self) -> &uri::Rsync444     pub fn uri(&self) -> &uri::Rsync {
445         &self.uri
446     }
447 
448     /// Returns the hash over the content of the object to be deleted.
hash(&self) -> &Hash449     pub fn hash(&self) -> &Hash {
450         &self.hash
451     }
452 
453     /// Converts the withdraw element into its URI and hash.
unpack(self) -> (uri::Rsync, Hash)454     pub fn unpack(self) -> (uri::Rsync, Hash) {
455         (self.uri, self.hash)
456     }
457 }
458 
459 impl WithdrawElement {
460     /// Writes the withdraw element’s XML.
write_xml( &self, content: &mut xml::encode::Content<impl io::Write> ) -> Result<(), io::Error>461     fn write_xml(
462         &self,
463         content: &mut xml::encode::Content<impl io::Write>
464     ) -> Result<(), io::Error> {
465         content.element(WITHDRAW.into_unqualified())?
466             .attr("uri", &self.uri)?
467             .attr("hash", &self.hash)?;
468         Ok(())
469     }
470 }
471 
472 
473 //------------ DeltaElement --------------------------------------------------
474 
475 /// A single element of a RRDP delta.
476 #[derive(Clone, Debug, Eq, PartialEq)]
477 pub enum DeltaElement {
478     /// The element publishes a new object.
479     Publish(PublishElement),
480 
481     /// The element updates an existing object.
482     Update(UpdateElement),
483 
484     /// The element deletes an existing object.
485     Withdraw(WithdrawElement)
486 }
487 
488 impl DeltaElement {
489     /// Writes the element’s XML.
write_xml( &self, content: &mut xml::encode::Content<impl io::Write> ) -> Result<(), io::Error>490     fn write_xml(
491         &self,
492         content: &mut xml::encode::Content<impl io::Write>
493     ) -> Result<(), io::Error> {
494         match self {
495             DeltaElement::Publish(p) => p.write_xml(content),
496             DeltaElement::Update(u) => u.write_xml(content),
497             DeltaElement::Withdraw(w) => w.write_xml(content)
498         }
499     }
500 }
501 
502 ///--- From
503 
504 impl From<PublishElement> for DeltaElement {
from(src: PublishElement) -> Self505     fn from(src: PublishElement) -> Self {
506         DeltaElement::Publish(src)
507     }
508 }
509 
510 impl From<UpdateElement> for DeltaElement {
from(src: UpdateElement) -> Self511     fn from(src: UpdateElement) -> Self {
512         DeltaElement::Update(src)
513     }
514 }
515 
516 impl From<WithdrawElement> for DeltaElement {
from(src: WithdrawElement) -> Self517     fn from(src: WithdrawElement) -> Self {
518         DeltaElement::Withdraw(src)
519     }
520 }
521 
522 
523 //------------ Snapshot ------------------------------------------------------
524 
525 /// An RRDP snapshot.
526 ///
527 /// This type represents an owned RRDP snapshot containing the RRDP session
528 /// ID, serial number and all published elements.
529 #[derive(Clone, Debug, Eq, PartialEq)]
530 pub struct Snapshot {
531     /// The RRDP session of this snapshot.
532     session_id: Uuid,
533 
534     /// The serial number of the update of this snapshot.
535     serial: u64,
536 
537     /// The objects published through this snapshot.
538     elements: Vec<PublishElement>,
539 }
540 
541 impl Snapshot {
542     /// Creates a new snapshot from its components.
new( session_id: Uuid, serial: u64, elements: Vec<PublishElement>, ) -> Self543     pub fn new(
544         session_id: Uuid,
545         serial: u64,
546         elements: Vec<PublishElement>,
547     ) -> Self {
548         Snapshot { session_id, serial, elements }
549     }
550 
551     /// Returns the session ID of this snapshot.
session_id(&self) -> Uuid552     pub fn session_id(&self) -> Uuid {
553         self.session_id
554     }
555 
556     /// Returns the serial number of the update represented by this snapshot.
serial(&self) -> u64557     pub fn serial(&self) -> u64 {
558         self.serial
559     }
560 
561     /// Returns the list of objects published by the snapshot.
elements(&self) -> &[PublishElement]562     pub fn elements(&self) -> &[PublishElement] {
563         &self.elements
564     }
565 
566     /// Converts the snapshots into its elements.
into_elements(self) -> Vec<PublishElement>567     pub fn into_elements(self) -> Vec<PublishElement> {
568         self.elements
569     }
570 }
571 
572 /// # XML Support
573 ///
574 impl Snapshot {
575     /// Parses the snapshot from its XML representation.
parse<R: io::BufRead>( reader: R ) -> Result<Self, ProcessError>576     pub fn parse<R: io::BufRead>(
577         reader: R
578     ) -> Result<Self, ProcessError> {
579         let mut builder = SnapshotBuilder {
580             session_id: None,
581             serial: None,
582             elements: vec![]
583         };
584 
585         builder.process(reader)?;
586         builder.try_into()
587     }
588 
589     /// Writes the snapshot’s XML representation.
write_xml( &self, writer: &mut impl io::Write ) -> Result<(), io::Error>590     pub fn write_xml(
591         &self, writer: &mut impl io::Write
592     ) -> Result<(), io::Error> {
593         let mut writer = xml::encode::Writer::new(writer);
594         writer.element(SNAPSHOT.into_unqualified())?
595             .attr("xmlns", NS)?
596             .attr("version", "1")?
597             .attr("session_id", &self.session_id)?
598             .attr("serial", &self.serial)?
599             .content(|content| {
600                 for el in &self.elements {
601                     el.write_xml(content)?;
602                 }
603                 Ok(())
604             })?;
605         writer.done()
606     }
607 }
608 
609 
610 //------------ SnapshotBuilder -----------------------------------------------
611 
612 struct SnapshotBuilder {
613     session_id: Option<Uuid>,
614     serial: Option<u64>,
615     elements: Vec<PublishElement>,
616 }
617 
618 impl ProcessSnapshot for SnapshotBuilder {
619     type Err = ProcessError;
620 
meta(&mut self, session_id: Uuid, serial: u64) -> Result<(), Self::Err>621     fn meta(&mut self, session_id: Uuid, serial: u64) -> Result<(), Self::Err> {
622         self.session_id = Some(session_id);
623         self.serial = Some(serial);
624         Ok(())
625     }
626 
publish(&mut self, uri: uri::Rsync, data: &mut ObjectReader) -> Result<(), Self::Err>627     fn publish(&mut self, uri: uri::Rsync, data: &mut ObjectReader) -> Result<(), Self::Err> {
628         let mut buf = Vec::new();
629         data.read_to_end(&mut buf)?;
630         let data = Bytes::from(buf);
631         let element = PublishElement { uri, data };
632         self.elements.push(element);
633         Ok(())
634     }
635 }
636 
637 impl TryFrom<SnapshotBuilder> for Snapshot {
638     type Error = ProcessError;
639 
try_from(builder: SnapshotBuilder) -> Result<Self, Self::Error>640     fn try_from(builder: SnapshotBuilder) -> Result<Self, Self::Error> {
641         let session_id = builder.session_id.ok_or(
642             ProcessError::Xml(XmlError::Malformed)
643         )?;
644 
645         let serial = builder.serial.ok_or(
646             ProcessError::Xml(XmlError::Malformed)
647         )?;
648 
649         Ok(Snapshot { session_id, serial, elements: builder.elements })
650     }
651 }
652 
653 
654 //------------ ProcessSnapshot -----------------------------------------------
655 
656 /// A type that can process an RRDP snapshot.
657 ///
658 /// The trait contains two required methods: [`meta`][Self::meta] is called
659 /// once at the beginning of the snapshot and gives the processor a chance to
660 /// check if the session ID and serial number are as expected. Then,
661 /// [`publish`][Self::publish] is called for each published object.
662 /// The processor can abort at any time by returning an error.
663 ///
664 /// The provided method [`process`][Self::process] drives the actual
665 /// processing and should thus be called when using a type that implements
666 /// this trait.
667 pub trait ProcessSnapshot {
668     /// The error type returned by the processor.
669     type Err: From<ProcessError>;
670 
671     /// Processes the snapshot meta data.
672     ///
673     /// The method is called before any other method and is passed the
674     /// session ID and serial number encountered in the outermost tag of the
675     /// snapshot file’s XML. If they don’t match the expected values, the
676     /// processor should abort processing by returning an error.
meta( &mut self, session_id: Uuid, serial: u64, ) -> Result<(), Self::Err>677     fn meta(
678         &mut self,
679         session_id: Uuid,
680         serial: u64,
681     ) -> Result<(), Self::Err>;
682 
683     /// Processes a published object.
684     ///
685     /// The object is identified by the provided rsync URI. The object’s data
686     /// is provided via a reader.
publish( &mut self, uri: uri::Rsync, data: &mut ObjectReader, ) -> Result<(), Self::Err>687     fn publish(
688         &mut self,
689         uri: uri::Rsync,
690         data: &mut ObjectReader,
691     ) -> Result<(), Self::Err>;
692 
693     /// Processes a snapshot file.
694     ///
695     /// The file’s content is read from `reader`. The two required methods
696     /// are called as appropriate. If the reader fails, parsing fails, or the
697     /// methods return an error, processing is aborted and an error is
698     /// returned.
process<R: io::BufRead>( &mut self, reader: R ) -> Result<(), Self::Err>699     fn process<R: io::BufRead>(
700         &mut self,
701         reader: R
702     ) -> Result<(), Self::Err> {
703         let mut reader = Reader::new(reader);
704 
705         let mut session_id = None;
706         let mut serial = None;
707         let mut outer = reader.start(|element| {
708             if element.name() != SNAPSHOT {
709                 info!("Bad outer: not snapshot, but {:?}", element.name());
710                 return Err(XmlError::Malformed)
711             }
712             element.attributes(|name, value| match name {
713                 b"version" => {
714                     if value.ascii_into::<u8>()? != 1 {
715                         info!("Bad version");
716                         return Err(XmlError::Malformed)
717                     }
718                     Ok(())
719                 }
720                 b"session_id" => {
721                     session_id = Some(value.ascii_into()?);
722                     Ok(())
723                 }
724                 b"serial" => {
725                     serial = Some(value.ascii_into()?);
726                     Ok(())
727                 }
728                 _ => {
729                     info!("Bad attribute on snapshot.");
730                     Err(XmlError::Malformed)
731                 }
732             })
733         }).map_err(Into::into)?;
734 
735         match (session_id, serial) {
736             (Some(session_id), Some(serial)) => {
737                 self.meta(session_id, serial)?;
738             }
739             _ => {
740                 info!("Missing session or serial");
741                 return Err(ProcessError::malformed().into())
742             }
743         }
744 
745         loop {
746             let mut uri = None;
747             let inner = outer.take_opt_element(&mut reader, |element| {
748                 if element.name() != PUBLISH {
749                 info!("Bad inner: not publish");
750                     return Err(ProcessError::malformed())
751                 }
752                 element.attributes(|name, value| match name {
753                     b"uri" => {
754                         uri = Some(value.ascii_into()?);
755                         Ok(())
756                     }
757                     _ => {
758                         info!("Bad attribute on publish.");
759                         Err(ProcessError::malformed())
760                     }
761                 })
762             })?;
763             let mut inner = match inner {
764                 Some(inner) => inner,
765                 None => break
766             };
767             let uri = match uri {
768                 Some(uri) => uri,
769                 None => return Err(ProcessError::malformed().into())
770             };
771             ObjectReader::process(&mut inner, &mut reader, |reader| {
772                 self.publish(uri, reader)
773             })?;
774         }
775 
776         outer.take_end(&mut reader).map_err(Into::into)?;
777         reader.end().map_err(Into::into)?;
778         Ok(())
779     }
780 }
781 
782 
783 //------------ Delta ---------------------------------------------------------
784 
785 /// An RRDP delta.
786 ///
787 /// This type represents an owned RRDP snapshot containing the RRDP session
788 /// ID, serial number and all its elements.
789 #[derive(Clone, Debug, Eq, PartialEq)]
790 pub struct Delta {
791     /// The RRDP session ID of the delta.
792     session_id: Uuid,
793 
794     /// The serial number of this delta.
795     serial: u64,
796 
797     /// The objects changed by this delta.
798     elements: Vec<DeltaElement>
799 }
800 
801 /// # Data Access
802 ///
803 impl Delta {
804     /// Creates a new delta from session ID, serial number, and elements.
new( session_id: Uuid, serial: u64, elements: Vec<DeltaElement> ) -> Self805     pub fn new(
806         session_id: Uuid,
807         serial: u64,
808         elements: Vec<DeltaElement>
809     ) -> Self {
810         Delta { session_id, serial, elements }
811     }
812 
813     /// Returns the session ID of the RRDP session this delta is part of.
session_id(&self) -> Uuid814     pub fn session_id(&self) -> Uuid {
815         self.session_id
816     }
817 
818     /// Returns the serial number of this delta.
819     ///
820     /// The serial number is identical to that of the snapshot this delta
821     /// updates _to._
serial(&self) -> u64822     pub fn serial(&self) -> u64 {
823         self.serial
824     }
825 
826     /// The list of objects changed by this delta.
elements(&self) -> &[DeltaElement]827     pub fn elements(&self) -> &[DeltaElement] {
828         &self.elements
829     }
830 
831     /// Converts the delta into its elements.
into_elements(self) -> Vec<DeltaElement>832     pub fn into_elements(self) -> Vec<DeltaElement> {
833         self.elements
834     }
835 }
836 
837 /// # Decoding and Encoding XML
838 ///
839 impl Delta {
840     /// Parses the delta from its XML representation.
parse<R: io::BufRead>( reader: R ) -> Result<Self, ProcessError>841     pub fn parse<R: io::BufRead>(
842         reader: R
843     ) -> Result<Self, ProcessError> {
844         let mut builder = DeltaBuilder {
845             session_id: None,
846             serial: None,
847             elements: vec![]
848         };
849 
850         builder.process(reader)?;
851         builder.try_into()
852     }
853 
854     /// Write the delta’s XML representation.
write_xml( &self, writer: &mut impl io::Write ) -> Result<(), io::Error>855     pub fn write_xml(
856         &self, writer: &mut impl io::Write
857     ) -> Result<(), io::Error> {
858         let mut writer = xml::encode::Writer::new(writer);
859         writer.element(DELTA.into_unqualified())?
860             .attr("xmlns", NS)?
861             .attr("version", "1")?
862             .attr("session_id", &self.session_id)?
863             .attr("serial", &self.serial)?
864             .content(|content| {
865                 for el in &self.elements {
866                     el.write_xml(content)?;
867                 }
868                 Ok(())
869             })?;
870         writer.done()
871     }
872 }
873 
874 
875 //------------ DeltaBuilder --------------------------------------------------
876 
877 struct DeltaBuilder {
878     session_id: Option<Uuid>,
879     serial: Option<u64>,
880     elements: Vec<DeltaElement>
881 }
882 
883 impl ProcessDelta for DeltaBuilder {
884     type Err = ProcessError;
885 
meta( &mut self, session_id: Uuid, serial: u64, ) -> Result<(), Self::Err>886     fn meta(
887         &mut self,
888         session_id: Uuid,
889         serial: u64,
890     ) -> Result<(), Self::Err> {
891         self.session_id = Some(session_id);
892         self.serial = Some(serial);
893         Ok(())
894     }
895 
publish( &mut self, uri: uri::Rsync, hash_opt: Option<Hash>, data: &mut ObjectReader, ) -> Result<(), Self::Err>896     fn publish(
897         &mut self,
898         uri: uri::Rsync,
899         hash_opt: Option<Hash>,
900         data: &mut ObjectReader,
901     ) -> Result<(), Self::Err> {
902         let mut buf = Vec::new();
903         data.read_to_end(&mut buf)?;
904         let data = Bytes::from(buf);
905         match hash_opt {
906             Some(hash) => {
907                 let update = UpdateElement { uri, hash, data};
908                 self.elements.push(DeltaElement::Update(update));
909             },
910             None => {
911                 let publish = PublishElement { uri, data};
912                 self.elements.push(DeltaElement::Publish(publish));
913             }
914         }
915         Ok(())
916     }
917 
withdraw( &mut self, uri: uri::Rsync, hash: Hash, ) -> Result<(), Self::Err>918     fn withdraw(
919         &mut self,
920         uri: uri::Rsync,
921         hash: Hash,
922     ) -> Result<(), Self::Err> {
923         let withdraw = WithdrawElement { uri, hash };
924         self.elements.push(DeltaElement::Withdraw(withdraw));
925         Ok(())
926     }
927 }
928 
929 impl TryFrom<DeltaBuilder> for Delta {
930     type Error = ProcessError;
931 
try_from(builder: DeltaBuilder) -> Result<Self, Self::Error>932     fn try_from(builder: DeltaBuilder) -> Result<Self, Self::Error> {
933         let session_id = builder.session_id.ok_or(
934             ProcessError::Xml(XmlError::Malformed)
935         )?;
936 
937         let serial = builder.serial.ok_or(
938             ProcessError::Xml(XmlError::Malformed)
939         )?;
940 
941         Ok(Delta { session_id, serial, elements: builder.elements })
942 
943     }
944 }
945 
946 //------------ ProcessDelta --------------------------------------------------
947 
948 /// A type that can process an RRDP delta.
949 ///
950 /// The trait contains three required methods: [`meta`][Self::meta] is called
951 /// once at the beginning of the snapshot and gives the processor a chance to
952 /// check if the session ID and serial number are as expected. Then,
953 /// [`publish`][Self::publish] is called for each newly published or
954 /// updated object and [`withdraw`][Self::withdraw] is called for each
955 /// deleted object. The processor can abort at any time by returning an error.
956 ///
957 /// The provided method [`process`][Self::process] drives the actual
958 /// processing and should thus be called when using a type that implements
959 /// this trait.
960 pub trait ProcessDelta {
961     /// The error type returned by the processor.
962     type Err: From<ProcessError>;
963 
964     /// Processes the delta meta data.
965     ///
966     /// The method is called before any other method and is passed the
967     /// session ID and serial number encountered in the outermost tag of the
968     /// delta file’s XML. If they don’t match the expected values, the
969     /// processor should abort processing by returning an error.
meta( &mut self, session_id: Uuid, serial: u64, ) -> Result<(), Self::Err>970     fn meta(
971         &mut self,
972         session_id: Uuid,
973         serial: u64,
974     ) -> Result<(), Self::Err>;
975 
976     /// Processes a published object.
977     ///
978     /// The object is identified by the rsync URI provided in `uri`. If the
979     /// object is updated, the hash over the previous content of the object
980     /// is given in `hash`. If the object is newly published, `hash` will be
981     /// `None`. The (new) content of the object is provided via the reader in
982     /// `data`.
publish( &mut self, uri: uri::Rsync, hash: Option<Hash>, data: &mut ObjectReader, ) -> Result<(), Self::Err>983     fn publish(
984         &mut self,
985         uri: uri::Rsync,
986         hash: Option<Hash>,
987         data: &mut ObjectReader,
988     ) -> Result<(), Self::Err>;
989 
990     /// Processes a withdrawn object.
991     ///
992     /// The object is identified by the rsync URI provided in `uri`. The hash
993     /// over the expected content of the object to be deleted is given in
994     /// `hash`.
withdraw( &mut self, uri: uri::Rsync, hash: Hash, ) -> Result<(), Self::Err>995     fn withdraw(
996         &mut self,
997         uri: uri::Rsync,
998         hash: Hash,
999     ) -> Result<(), Self::Err>;
1000 
1001 
1002     /// Processes a delta file.
1003     ///
1004     /// The file’s content is taken from `reader`. The content is parsed and
1005     /// the three required methods are called as required.
1006     ///
1007     /// If the reader fails, parsing fails, or the methods return an error,
1008     /// processing is aborted and an error is returned.
process<R: io::BufRead>( &mut self, reader: R ) -> Result<(), Self::Err>1009     fn process<R: io::BufRead>(
1010         &mut self,
1011         reader: R
1012     ) -> Result<(), Self::Err> {
1013         let mut reader = Reader::new(reader);
1014 
1015         let mut session_id = None;
1016         let mut serial = None;
1017         let mut outer = reader.start(|element| {
1018             if element.name() != DELTA {
1019                 return Err(ProcessError::malformed())
1020             }
1021             element.attributes(|name, value| match name {
1022                 b"version" => {
1023                     if value.ascii_into::<u8>()? != 1 {
1024                         return Err(ProcessError::malformed())
1025                     }
1026                     Ok(())
1027                 }
1028                 b"session_id" => {
1029                     session_id = Some(value.ascii_into()?);
1030                     Ok(())
1031                 }
1032                 b"serial" => {
1033                     serial = Some(value.ascii_into()?);
1034                     Ok(())
1035                 }
1036                 _ => Err(ProcessError::malformed())
1037             })
1038         })?;
1039 
1040         match (session_id, serial) {
1041             (Some(session_id), Some(serial)) => {
1042                 self.meta(session_id, serial)?;
1043             }
1044             _ => return Err(ProcessError::malformed().into())
1045         }
1046 
1047         loop {
1048             let mut action = None;
1049             let mut uri = None;
1050             let mut hash = None;
1051             let inner = outer.take_opt_element(&mut reader, |element| {
1052                 match element.name() {
1053                     PUBLISH => action = Some(Action::Publish),
1054                     WITHDRAW => action = Some(Action::Withdraw),
1055                     _ => return Err(ProcessError::malformed()),
1056                 };
1057                 element.attributes(|name, value| match name {
1058                     b"uri" => {
1059                         uri = Some(value.ascii_into()?);
1060                         Ok(())
1061                     }
1062                     b"hash" => {
1063                         hash = Some(value.ascii_into()?);
1064                         Ok(())
1065                     }
1066                     _ => Err(ProcessError::malformed())
1067                 })
1068             })?;
1069             let mut inner = match inner {
1070                 Some(inner) => inner,
1071                 None => break
1072             };
1073             let uri = match uri {
1074                 Some(uri) => uri,
1075                 None => return Err(ProcessError::malformed().into())
1076             };
1077             match action.unwrap() { // Or we'd have exited already.
1078                 Action::Publish => {
1079                     ObjectReader::process(
1080                         &mut inner, &mut reader,
1081                         |reader| self.publish(uri, hash, reader)
1082                     )?;
1083                 }
1084                 Action::Withdraw => {
1085                     let hash = match hash {
1086                         Some(hash) => hash,
1087                         None => return Err(ProcessError::malformed().into())
1088                     };
1089                     self.withdraw(uri, hash)?;
1090                     inner.take_end(&mut reader).map_err(Into::into)?;
1091                 }
1092             }
1093         }
1094         outer.take_end(&mut reader).map_err(Into::into)?;
1095         reader.end().map_err(Into::into)?;
1096         Ok(())
1097     }
1098 
1099 }
1100 
1101 
1102 //------------ SnapshotInfo --------------------------------------------------
1103 
1104 /// The URI and HASH of the current snapshot for a [`NotificationFile`].
1105 pub type SnapshotInfo = UriAndHash;
1106 
1107 
1108 //------------ DeltaInfo -----------------------------------------------------
1109 
1110 /// The serial, URI and HASH of a delta in a [`NotificationFile`].
1111 #[derive(Clone, Debug, Eq, PartialEq)]
1112 pub struct DeltaInfo {
1113     serial: u64,
1114     uri_and_hash: UriAndHash
1115 }
1116 
1117 impl DeltaInfo {
1118     /// Creates a new info from its components.
new(serial: u64, uri: uri::Https, hash: Hash) -> Self1119     pub fn new(serial: u64, uri: uri::Https, hash: Hash) -> Self {
1120         DeltaInfo {
1121             serial,
1122             uri_and_hash: UriAndHash::new(uri, hash)
1123         }
1124     }
1125 
1126     /// Returns the serial number of this delta.
serial(&self) -> u641127     pub fn serial(&self) -> u64 {
1128         self.serial
1129     }
1130 }
1131 
1132 impl Deref for DeltaInfo {
1133     type Target = UriAndHash;
1134 
deref(&self) -> &Self::Target1135     fn deref(&self) -> &Self::Target {
1136         &self.uri_and_hash
1137     }
1138 }
1139 
1140 //------------ UriAndHash ----------------------------------------------------
1141 
1142 /// The URI of an RRDP file and a SHA-256 hash over its content.
1143 ///
1144 /// In order to detect accidental or malicious modifications of the data
1145 /// all references to RRDP files are given with a SHA-256 hash over the
1146 /// expected content of that file, allowing a client to verify they got the
1147 /// right file.
1148 #[derive(Clone, Debug, Eq, PartialEq)]
1149 pub struct UriAndHash {
1150     /// The URI of the RRDP file.
1151     uri: uri::Https,
1152 
1153     /// The expected SHA-256 hash over the file’s content.
1154     hash: Hash,
1155 }
1156 
1157 impl UriAndHash {
1158     /// Creates a new URI-and-hash pair.
new(uri: uri::Https, hash: Hash) -> Self1159     pub fn new(uri: uri::Https, hash: Hash) -> Self {
1160         UriAndHash { uri, hash }
1161     }
1162 
1163     /// Returns a reference to the URI.
uri(&self) -> &uri::Https1164     pub fn uri(&self) -> &uri::Https {
1165         &self.uri
1166     }
1167 
1168     /// Returns the expected SHA-256 hash.
hash(&self) -> Hash1169     pub fn hash(&self) -> Hash {
1170         self.hash
1171     }
1172 
1173     /// Converts the pair into just the URI.
into_uri(self) -> uri::Https1174     pub fn into_uri(self) -> uri::Https {
1175         self.uri
1176     }
1177 
1178     /// Converts `self` into a pair of URI and hash.
into_pair(self) -> (uri::Https, Hash)1179     pub fn into_pair(self) -> (uri::Https, Hash) {
1180         (self.uri, self.hash)
1181     }
1182 }
1183 
1184 
1185 //------------ Hash ----------------------------------------------------------
1186 
1187 /// A hash over RRDP data.
1188 ///
1189 /// This hash is used both for verifying the correctness of RRDP files as well
1190 /// as update or deletion of the right objects.
1191 ///
1192 /// RRDP exclusively uses SHA-256 and provides no means of choosing a different
1193 /// algorithm. Consequently, this type is a wrapper around a 32 byte array
1194 /// holding SHA-256 output.
1195 #[derive(Clone, Copy, Eq, hash::Hash, PartialEq)]
1196 #[repr(transparent)] // ensure that size_of::<Hash>() == 32.
1197 pub struct Hash([u8; 32]);
1198 
1199 impl Hash {
1200     /// Returns a reference to the octets as a slice.
as_slice(&self) -> &[u8]1201     pub fn as_slice(&self) -> &[u8] {
1202         self.0.as_ref()
1203     }
1204 
1205     /// Returns a new Hash from the provided data
from_data(data: &[u8]) -> Self1206     pub fn from_data(data: &[u8]) -> Self {
1207         let digest = digest::digest(&digest::SHA256, data);
1208         Self::try_from(digest.as_ref()).unwrap()
1209     }
1210 
1211     /// Returns whether this hash matches the provided data
matches(&self, data: &[u8]) -> bool1212     pub fn matches(&self, data: &[u8]) -> bool {
1213         let data_hash = Self::from_data(data);
1214         *self == data_hash
1215     }
1216 }
1217 
1218 
1219 //--- From, TryFrom, and FromStr
1220 
1221 impl From<[u8;32]> for Hash {
from(value: [u8;32]) -> Hash1222     fn from(value: [u8;32]) -> Hash {
1223         Hash(value)
1224     }
1225 }
1226 
1227 impl From<Hash> for [u8; 32] {
from(src: Hash) -> Self1228     fn from(src: Hash) -> Self {
1229         src.0
1230     }
1231 }
1232 
1233 impl<'a> TryFrom<&'a [u8]> for Hash {
1234     type Error = std::array::TryFromSliceError;
1235 
try_from(src: &'a [u8]) -> Result<Self, Self::Error>1236     fn try_from(src: &'a [u8]) -> Result<Self, Self::Error> {
1237         TryFrom::try_from(src).map(Hash)
1238     }
1239 }
1240 
1241 impl TryFrom<digest::Digest> for Hash {
1242     type Error = AlgorithmError;
1243 
try_from(digest: digest::Digest) -> Result<Self, Self::Error>1244     fn try_from(digest: digest::Digest) -> Result<Self, Self::Error> {
1245         // XXX This doesn’t properly check the algorithm.
1246         TryFrom::try_from(
1247             digest.as_ref()
1248         ).map(Hash).map_err(|_| AlgorithmError(()))
1249     }
1250 }
1251 
1252 impl str::FromStr for Hash {
1253     type Err = ParseHashError;
1254 
1255     /// Parses a string into a hash.
1256     ///
1257     /// The string must consist of exactly 64 hexadecimal digits and nothing
1258     /// else.
from_str(s: &str) -> Result<Self, Self::Err>1259     fn from_str(s: &str) -> Result<Self, Self::Err> {
1260         if s.len() != 64 {
1261             return Err(ParseHashError::BAD_LENGTH)
1262         }
1263         let mut res = [0u8; 32];
1264         let mut s = s.chars();
1265         for octet in &mut res {
1266             let first = s.next().ok_or(
1267                 ParseHashError::BAD_LENGTH
1268             )?.to_digit(16).ok_or(
1269                 ParseHashError::BAD_CHARS
1270             )?;
1271             let second = s.next().ok_or(
1272                 ParseHashError::BAD_LENGTH
1273             )?.to_digit(16).ok_or(
1274                 ParseHashError::BAD_CHARS
1275             )?;
1276             *octet = (first << 4 | second) as u8;
1277         }
1278         Ok(Hash(res))
1279     }
1280 }
1281 
1282 
1283 //--- AsRef
1284 
1285 impl AsRef<[u8]> for Hash {
as_ref(&self) -> &[u8]1286     fn as_ref(&self) -> &[u8] {
1287         self.0.as_ref()
1288     }
1289 }
1290 
1291 
1292 //--- PartialEq
1293 //
1294 // PartialEq<Self> and Eq are derived.
1295 
1296 impl PartialEq<digest::Digest> for Hash {
eq(&self, other: &digest::Digest) -> bool1297     fn eq(&self, other: &digest::Digest) -> bool {
1298         // XXX This doesn’t properly check the algorithm.
1299         self.0.as_ref() == other.as_ref()
1300     }
1301 }
1302 
1303 
1304 //--- Display and Debug
1305 
1306 impl fmt::Display for Hash {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result1307     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1308         for &ch in self.as_slice() {
1309             write!(f, "{:02x}", ch)?;
1310         }
1311         Ok(())
1312     }
1313 }
1314 
1315 impl fmt::Debug for Hash {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result1316     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1317         write!(f, "Hash({})", self)
1318     }
1319 }
1320 
1321 //--- Serialize and Deserialize
1322 
1323 #[cfg(feature = "serde")]
1324 impl Serialize for Hash {
serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: Serializer1325     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1326     where S: Serializer {
1327         self.to_string().serialize(serializer)
1328     }
1329 }
1330 
1331 #[cfg(feature = "serde")]
1332 impl<'de> Deserialize<'de> for Hash {
deserialize<D>(deserializer: D) -> Result<Self, D::Error> where D: Deserializer<'de>1333     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1334     where D: Deserializer<'de> {
1335         let hex_str = String::deserialize(deserializer)?;
1336         Hash::from_str(&hex_str).map_err(serde::de::Error::custom)
1337     }
1338 }
1339 
1340 
1341 //------------ Action --------------------------------------------------------
1342 
1343 /// The choice of actions in a delta file.
1344 enum Action {
1345     /// An object is to be inserted or updated.
1346     ///
1347     /// The object is to be updated, if a hash is given. Otherwise it is
1348     /// to be inserted.
1349     Publish,
1350 
1351     /// An object is to be deleted.
1352     Withdraw,
1353 }
1354 
1355 
1356 //------------ ObjectReader --------------------------------------------------
1357 
1358 /// A reader providing the content of an object.
1359 ///
1360 /// The content is included in base64 encoding in the RRDP’s XML. This reader
1361 /// provides access to the decoded data via the standard `Read` trait.
1362 pub struct ObjectReader<'a>(
1363     /// The base64 encoded data.
1364     base64::read::DecoderReader<'a, &'a [u8]>
1365 );
1366 
1367 impl<'a> ObjectReader<'a> {
1368     /// Processes an element with optional XML PCDATA as object content.
1369     ///
1370     /// An object reader is created and passed to the closure `op` for
1371     /// actual processing.
1372     ///
1373     /// This method expects the next XML event to either be text or the end
1374     /// of an element. It will process both.
process<R, T, E, F> ( content: &mut Content, reader: &mut Reader<R>, op: F ) -> Result<T, E> where R: io::BufRead, E: From<ProcessError>, F: FnOnce(&mut ObjectReader) -> Result<T, E>1375     fn process<R, T, E, F> (
1376         content: &mut Content,
1377         reader: &mut Reader<R>,
1378         op: F
1379     ) -> Result<T, E>
1380     where
1381         R: io::BufRead,
1382         E: From<ProcessError>,
1383         F: FnOnce(&mut ObjectReader) -> Result<T, E>
1384     {
1385         // XXX This could probably do with a bit of optimization.
1386         let data_b64 = content.take_opt_final_text(reader, |text| {
1387             // The text is supposed to be xsd:base64Binary which only allows
1388             // the base64 characters plus whitespace.
1389             Ok(text.to_ascii()?.as_bytes().iter().filter_map(|b| {
1390                     if b.is_ascii_whitespace() { None }
1391                     else { Some(*b) }
1392             }).collect::<Vec<_>>())
1393         })?.unwrap_or_else(Vec::new);
1394         let mut data_b64 = data_b64.as_slice();
1395         op(
1396             &mut ObjectReader(base64::read::DecoderReader::new(
1397                 &mut data_b64, base64::STANDARD
1398             ))
1399         )
1400     }
1401 }
1402 
1403 impl<'a> io::Read for ObjectReader<'a> {
read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error>1404     fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
1405         self.0.read(buf)
1406     }
1407 }
1408 
1409 
1410 //------------ XML Names -----------------------------------------------------
1411 
1412 const NS: &[u8] = b"http://www.ripe.net/rpki/rrdp";
1413 const NOTIFICATION: Name = Name::qualified(NS, b"notification");
1414 const SNAPSHOT: Name = Name::qualified(NS, b"snapshot");
1415 const DELTA: Name = Name::qualified(NS, b"delta");
1416 const PUBLISH: Name = Name::qualified(NS, b"publish");
1417 const WITHDRAW: Name = Name::qualified(NS, b"withdraw");
1418 
1419 
1420 //============ Errors ========================================================
1421 
1422 //------------ AlgorithmError ------------------------------------------------
1423 
1424 /// A digest was of the wrong algorithm.
1425 #[derive(Clone, Copy, Debug)]
1426 pub struct AlgorithmError(());
1427 
1428 impl fmt::Display for AlgorithmError {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result1429     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1430         f.write_str("algorithm mismatch")
1431     }
1432 }
1433 
1434 impl error::Error for AlgorithmError { }
1435 
1436 
1437 //------------ ParseHashError ------------------------------------------------
1438 
1439 /// An error happened while parsing a hash.
1440 #[derive(Clone, Copy, Debug)]
1441 pub struct ParseHashError(&'static str);
1442 
1443 impl ParseHashError {
1444     /// The error when the hash value was of the wrong length.
1445     const BAD_LENGTH: Self = ParseHashError("invalid length");
1446 
1447     /// The error when the hash value contained illegal characters.
1448     const BAD_CHARS: Self = ParseHashError("invalid characters");
1449 }
1450 
1451 impl fmt::Display for ParseHashError {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result1452     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1453         f.write_str(self.0)
1454     }
1455 }
1456 
1457 impl error::Error for ParseHashError { }
1458 
1459 
1460 //------------ ProcessError --------------------------------------------------
1461 
1462 /// An error occurred while processing RRDP data.
1463 #[derive(Debug)]
1464 pub enum ProcessError {
1465     /// An IO error happened.
1466     Io(io::Error),
1467 
1468     /// The XML was not correctly formed.
1469     Xml(XmlError),
1470 }
1471 
1472 impl ProcessError {
1473     /// Creates an error when the XML was malformed.
malformed() -> Self1474     fn malformed() -> Self {
1475         ProcessError::Xml(XmlError::Malformed)
1476     }
1477 }
1478 
1479 impl From<io::Error> for ProcessError {
from(err: io::Error) -> Self1480     fn from(err: io::Error) -> Self {
1481         ProcessError::Io(err)
1482     }
1483 }
1484 
1485 impl From<XmlError> for ProcessError {
from(err: XmlError) -> Self1486     fn from(err: XmlError) -> Self {
1487         ProcessError::Xml(err)
1488     }
1489 }
1490 
1491 impl fmt::Display for ProcessError {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result1492     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1493         match self {
1494             ProcessError::Io(ref inner) => inner.fmt(f),
1495             ProcessError::Xml(ref inner) => inner.fmt(f),
1496         }
1497     }
1498 }
1499 
1500 impl error::Error for ProcessError { }
1501 
1502 
1503 //============ Tests =========================================================
1504 
1505 #[cfg(test)]
1506 mod test {
1507     use std::str::from_utf8_unchecked;
1508 
1509     use super::*;
1510 
1511     pub struct Test;
1512 
1513     impl ProcessSnapshot for Test {
1514         type Err = ProcessError;
1515 
meta( &mut self, _session_id: Uuid, _serial: u64, ) -> Result<(), Self::Err>1516         fn meta(
1517             &mut self,
1518             _session_id: Uuid,
1519             _serial: u64,
1520         ) -> Result<(), Self::Err> {
1521             Ok(())
1522         }
1523 
publish( &mut self, _uri: uri::Rsync, _data: &mut ObjectReader, ) -> Result<(), Self::Err>1524         fn publish(
1525             &mut self,
1526             _uri: uri::Rsync,
1527             _data: &mut ObjectReader,
1528         ) -> Result<(), Self::Err> {
1529             Ok(())
1530         }
1531     }
1532 
1533     impl ProcessDelta for Test {
1534         type Err = ProcessError;
1535 
meta( &mut self, _session_id: Uuid, _serial: u64, ) -> Result<(), Self::Err>1536         fn meta(
1537             &mut self,
1538             _session_id: Uuid,
1539             _serial: u64,
1540         ) -> Result<(), Self::Err> {
1541             Ok(())
1542         }
1543 
publish( &mut self, _uri: uri::Rsync, _hash: Option<Hash>, _data: &mut ObjectReader, ) -> Result<(), Self::Err>1544         fn publish(
1545             &mut self,
1546             _uri: uri::Rsync,
1547             _hash: Option<Hash>,
1548             _data: &mut ObjectReader,
1549         ) -> Result<(), Self::Err> {
1550             Ok(())
1551         }
1552 
withdraw( &mut self, _uri: uri::Rsync, _hash: Hash, ) -> Result<(), Self::Err>1553         fn withdraw(
1554             &mut self,
1555             _uri: uri::Rsync,
1556             _hash: Hash,
1557         ) -> Result<(), Self::Err> {
1558             Ok(())
1559         }
1560     }
1561 
1562     #[test]
ripe_notification()1563     fn ripe_notification() {
1564         NotificationFile::parse(
1565             include_bytes!("../test-data/ripe-notification.xml").as_ref()
1566         ).unwrap();
1567     }
1568 
1569     #[test]
lolz_notification()1570     fn lolz_notification() {
1571         assert!(
1572             NotificationFile::parse(
1573                 include_bytes!("../test-data/lolz-notification.xml").as_ref()
1574             ).is_err()
1575         );
1576     }
1577 
1578     #[test]
gaps_notification()1579     fn gaps_notification() {
1580         let mut notification_without_gaps =  NotificationFile::parse(
1581             include_bytes!("../test-data/ripe-notification.xml").as_ref()
1582         ).unwrap();
1583         assert!(notification_without_gaps.sort_and_verify_deltas(None));
1584 
1585         let mut notification_with_gaps =  NotificationFile::parse(
1586             include_bytes!("../test-data/ripe-notification-with-gaps.xml").as_ref()
1587         ).unwrap();
1588         assert!(!notification_with_gaps.sort_and_verify_deltas(None));
1589     }
1590 
1591     #[test]
limit_notification_deltas()1592     fn limit_notification_deltas() {
1593         let mut notification_without_gaps =  NotificationFile::parse(
1594             include_bytes!("../test-data/ripe-notification.xml").as_ref()
1595         ).unwrap();
1596         assert!(notification_without_gaps.sort_and_verify_deltas(Some(2)));
1597 
1598         assert_eq!(2, notification_without_gaps.deltas().len());
1599         assert_eq!(notification_without_gaps.deltas().first().unwrap().serial(), notification_without_gaps.serial() - 1);
1600         assert_eq!(notification_without_gaps.deltas().last().unwrap().serial(), notification_without_gaps.serial());
1601     }
1602 
1603 
1604     #[test]
unsorted_notification()1605     fn unsorted_notification() {
1606         let mut from_sorted = NotificationFile::parse(
1607             include_bytes!("../test-data/ripe-notification.xml").as_ref()
1608         ).unwrap();
1609 
1610         let mut from_unsorted = NotificationFile::parse(
1611             include_bytes!("../test-data/ripe-notification-unsorted.xml").as_ref()
1612         ).unwrap();
1613 
1614         assert_ne!(from_sorted, from_unsorted);
1615 
1616         from_unsorted.reverse_sort_deltas();
1617         assert_eq!(from_sorted, from_unsorted);
1618 
1619         from_unsorted.sort_deltas();
1620         assert_ne!(from_sorted, from_unsorted);
1621 
1622         from_sorted.sort_deltas();
1623         assert_eq!(from_sorted, from_unsorted);
1624     }
1625 
1626     #[test]
ripe_snapshot()1627     fn ripe_snapshot() {
1628         <Test as ProcessSnapshot>::process(
1629             &mut Test,
1630             include_bytes!("../test-data/ripe-snapshot.xml").as_ref()
1631         ).unwrap();
1632     }
1633 
1634     #[test]
ripe_delta()1635     fn ripe_delta() {
1636         <Test as ProcessDelta>::process(
1637             &mut Test,
1638             include_bytes!("../test-data/ripe-delta.xml").as_ref()
1639         ).unwrap();
1640     }
1641 
1642     #[test]
hash_to_hash()1643     fn hash_to_hash() {
1644         use std::str::FromStr;
1645 
1646         let string = "this is a test";
1647         let sha256 = "2e99758548972a8e8822ad47fa1017ff72f06f3ff6a016851f45c398732bc50c";
1648         let hash = Hash::from_str(sha256).unwrap();
1649         let hash_from_data = Hash::from_data(string.as_bytes());
1650         assert_eq!(hash, hash_from_data);
1651         assert!(hash.matches(string.as_bytes()));
1652     }
1653 
1654     #[test]
notification_from_to_xml()1655     fn notification_from_to_xml() {
1656         let notification = NotificationFile::parse(
1657             include_bytes!("../test-data/ripe-notification.xml").as_ref()
1658         ).unwrap();
1659 
1660         let mut vec = vec![];
1661         notification.write_xml(&mut vec).unwrap();
1662 
1663         let xml = unsafe {
1664             from_utf8_unchecked(vec.as_ref())
1665         };
1666 
1667         let notification_parsed = NotificationFile::parse(xml.as_bytes()).unwrap();
1668 
1669         assert_eq!(notification, notification_parsed);
1670     }
1671 
1672     #[test]
snapshot_from_to_xml()1673     fn snapshot_from_to_xml() {
1674         let data = include_bytes!("../test-data/ripe-snapshot.xml");
1675         let snapshot = Snapshot::parse(data.as_ref()).unwrap();
1676 
1677         let mut vec = vec![];
1678         snapshot.write_xml(&mut vec).unwrap();
1679 
1680         let xml = unsafe {
1681             from_utf8_unchecked(vec.as_ref())
1682         };
1683 
1684         let snapshot_parsed = Snapshot::parse(xml.as_bytes()).unwrap();
1685 
1686         assert_eq!(snapshot, snapshot_parsed);
1687     }
1688 
1689     #[test]
delta_from_to_xml()1690     fn delta_from_to_xml() {
1691         let data = include_bytes!("../test-data/ripe-delta.xml");
1692         let delta = Delta::parse(data.as_ref()).unwrap();
1693 
1694         let mut vec = vec![];
1695         delta.write_xml(&mut vec).unwrap();
1696 
1697         let xml = unsafe {
1698             from_utf8_unchecked(vec.as_ref())
1699         };
1700 
1701         let delta_parsed = Delta::parse(xml.as_bytes()).unwrap();
1702 
1703         assert_eq!(delta, delta_parsed);
1704     }
1705 
1706     #[test]
snapshot_content()1707     fn snapshot_content() {
1708         const CONTENT: &[u8] = b"foo bar\n";
1709         let snapshot = br#"
1710             <snapshot version="1"
1711                 session_id="a2d845c4-5b91-4015-a2b7-988c03ce232a"
1712                 serial="1742"
1713                 xmlns="http://www.ripe.net/rpki/rrdp"
1714             >
1715                 <publish
1716                     uri="rsync://example.com/some/path"
1717                 >
1718                     Zm9vIGJhcgo=
1719                 </publish>
1720                 <publish
1721                     uri="rsync://example.com/some/other"
1722                 />
1723                 <publish
1724                     uri="rsync://example.com/some/third"
1725                 ></publish>
1726             </snapshot>
1727         "#;
1728 
1729         let snapshot = Snapshot::parse(&mut snapshot.as_ref()).unwrap();
1730         assert_eq!(snapshot.elements.len(), 3);
1731         assert_eq!(
1732             snapshot.elements[0],
1733             PublishElement::new(
1734                 uri::Rsync::from_str("rsync://example.com/some/path").unwrap(),
1735                 Bytes::copy_from_slice(CONTENT)
1736             )
1737         );
1738         assert_eq!(
1739             snapshot.elements[1],
1740             PublishElement::new(
1741                 uri::Rsync::from_str("rsync://example.com/some/other").unwrap(),
1742                 Bytes::new()
1743             )
1744         );
1745         assert_eq!(
1746             snapshot.elements[2],
1747             PublishElement::new(
1748                 uri::Rsync::from_str("rsync://example.com/some/third").unwrap(),
1749                 Bytes::new()
1750             )
1751         );
1752     }
1753 }
1754