//! Parsing and processing of RRDP responses.
//!
//! This module provides the scaffolding for client-side processing of the
//! RPKI Repository Delta Protocol (RRDP) as defined in [RFC 8182].
//!
//! Processing is done in two parts. The RRDP notification file is parsed into
//! a value of type [`NotificationFile`]. Processing of snapshot and delta
//! files is done incrementally via the [`ProcessSnapshot`] and
//! [`ProcessDelta`] traits since these files can become rather big.
//!
//! The module does not provide an HTTP client. Rather, it relies on the
//! `std::io::Read` trait for processing. As such, it is also not compatible
//! with async processing.
//!
//! A note on terminology: to avoid confusion, the term ‘file’ refers to the
//! RRDP data itself, i.e., the notification, snapshot, and delta files. The
//! repository’s content synchronized using RRDP also consists of a set of
//! files, which we will refer to as ‘objects.’
//!
//! [RFC 8182]: https://tools.ietf.org/html/rfc8182

#![cfg(feature = "rrdp")]

use std::{error, fmt, hash, io, str};
use std::io::Read;
use std::convert::TryFrom;
use std::convert::TryInto;
use std::ops::Deref;
use bytes::Bytes;
use log::info;
use ring::digest;
use uuid::Uuid;
use crate::{uri, xml};
use crate::xml::decode::{Content, Error as XmlError, Reader, Name};

#[cfg(feature = "serde")] use std::str::FromStr;
#[cfg(feature = "serde")] use serde::{
    Deserialize, Deserializer, Serialize, Serializer
};


//------------ NotificationFile ----------------------------------------------

/// The RRDP Update Notification File.
///
/// This type represents the decoded content of the RRDP Update Notification
/// File. It can be read from a reader via the [`parse`][Self::parse]
/// function. All elements are accessible as attributes.
49 #[derive(Clone, Debug, Eq, PartialEq)] 50 pub struct NotificationFile { 51 /// The identifier of the current session of the server. 52 session_id: Uuid, 53 54 /// The serial number of the most recent update. 55 serial: u64, 56 57 /// Information about the most recent snapshot. 58 snapshot: SnapshotInfo, 59 60 /// The list of available delta updates. 61 deltas: Vec<DeltaInfo>, 62 } 63 64 /// # Data Access 65 /// 66 impl NotificationFile { 67 /// Creates a new notification file from the given components. new( session_id: Uuid, serial: u64, snapshot: UriAndHash, deltas: Vec<DeltaInfo>, ) -> Self68 pub fn new( 69 session_id: Uuid, 70 serial: u64, 71 snapshot: UriAndHash, 72 deltas: Vec<DeltaInfo>, 73 ) -> Self { 74 NotificationFile { session_id, serial, snapshot, deltas } 75 } 76 77 /// Returns the identifier of the current session of the server. 78 /// 79 /// Delta updates can only be used if the session ID of the last processed 80 /// update matches this value. session_id(&self) -> Uuid81 pub fn session_id(&self) -> Uuid { 82 self.session_id 83 } 84 85 /// Returns the serial number of the most recent update. 86 /// 87 /// Serial numbers increase by one between each update. serial(&self) -> u6488 pub fn serial(&self) -> u64 { 89 self.serial 90 } 91 92 /// Returns information about the most recent snapshot. 93 /// 94 /// The snapshot contains a complete set of all data published via the 95 /// repository. It can be processed using the [`ProcessSnapshot`] trait. snapshot(&self) -> &SnapshotInfo96 pub fn snapshot(&self) -> &SnapshotInfo { 97 &self.snapshot 98 } 99 100 /// Returns the list of available delta updates. 101 /// 102 /// Deltas can be processed using the [`ProcessDelta`] trait. deltas(&self) -> &[DeltaInfo]103 pub fn deltas(&self) -> &[DeltaInfo] { 104 &self.deltas 105 } 106 107 /// Sorts the deltas by increasing serial numbers. 108 /// 109 /// In other words, the delta with the smallest serial number will 110 /// appear at the beginning of the sequence. 
sort_deltas(&mut self)111 pub fn sort_deltas(&mut self) { 112 self.deltas.sort_by_key(|delta| delta.serial()); 113 } 114 115 /// Sorts the deltas by decreasing serial numbers. 116 /// 117 /// In other words, the delta with the largest serial number will 118 /// appear at the beginning of the sequence. reverse_sort_deltas(&mut self)119 pub fn reverse_sort_deltas(&mut self) { 120 self.deltas.sort_by(|a,b| b.serial.cmp(&a.serial)); 121 } 122 123 /// Sorts, verifies, and optionally limits the list of deltas. 124 /// 125 /// Sorts the deltas by increasing serial number. If `limit` is given, 126 /// it then retains at most that many of the newest deltas. 127 /// 128 /// Returns whether there are no gaps in the retained deltas. sort_and_verify_deltas(&mut self, limit: Option<usize>) -> bool129 pub fn sort_and_verify_deltas(&mut self, limit: Option<usize>) -> bool { 130 if !self.deltas.is_empty() { 131 self.sort_deltas(); 132 133 if let Some(limit) = limit { 134 if limit < self.deltas.len() { 135 let offset = self.deltas.len() - limit; 136 self.deltas.drain(..offset); 137 } 138 } 139 140 let mut last_seen = self.deltas[0].serial(); 141 for delta in &self.deltas[1..] { 142 if last_seen + 1 != delta.serial() { 143 return false; 144 } else { 145 last_seen = delta.serial() 146 } 147 } 148 } 149 150 true 151 } 152 153 } 154 155 /// # XML support 156 /// 157 impl NotificationFile { 158 /// Parses the notification file from its XML representation. parse<R: io::BufRead>(reader: R) -> Result<Self, XmlError>159 pub fn parse<R: io::BufRead>(reader: R) -> Result<Self, XmlError> { 160 let mut reader = Reader::new(reader); 161 162 let mut session_id = None; 163 let mut serial = None; 164 let mut outer = reader.start(|element| { 165 if element.name() != NOTIFICATION { 166 return Err(XmlError::Malformed) 167 } 168 169 element.attributes(|name, value| match name { 170 b"version" => { 171 if value.ascii_into::<u8>()? 
!= 1 { 172 return Err(XmlError::Malformed) 173 } 174 Ok(()) 175 } 176 b"session_id" => { 177 session_id = Some(value.ascii_into()?); 178 Ok(()) 179 } 180 b"serial" => { 181 serial = Some(value.ascii_into()?); 182 Ok(()) 183 } 184 _ => Err(XmlError::Malformed) 185 }) 186 })?; 187 188 let mut snapshot = None; 189 190 let mut deltas = vec![]; 191 192 while let Some(mut content) = outer.take_opt_element(&mut reader, 193 |element| { 194 match element.name() { 195 SNAPSHOT => { 196 if snapshot.is_some() { 197 return Err(XmlError::Malformed) 198 } 199 let mut uri = None; 200 let mut hash = None; 201 element.attributes(|name, value| match name { 202 b"uri" => { 203 uri = Some(value.ascii_into()?); 204 Ok(()) 205 } 206 b"hash" => { 207 hash = Some(value.ascii_into()?); 208 Ok(()) 209 } 210 _ => Err(XmlError::Malformed) 211 })?; 212 match (uri, hash) { 213 (Some(uri), Some(hash)) => { 214 snapshot = Some(UriAndHash::new(uri, hash)); 215 Ok(()) 216 } 217 _ => Err(XmlError::Malformed) 218 } 219 } 220 DELTA => { 221 let mut serial = None; 222 let mut uri = None; 223 let mut hash = None; 224 element.attributes(|name, value| match name { 225 b"serial" => { 226 serial = Some(value.ascii_into()?); 227 Ok(()) 228 } 229 b"uri" => { 230 uri = Some(value.ascii_into()?); 231 Ok(()) 232 } 233 b"hash" => { 234 hash = Some(value.ascii_into()?); 235 Ok(()) 236 } 237 _ => Err(XmlError::Malformed) 238 })?; 239 match (serial, uri, hash) { 240 (Some(serial), Some(uri), Some(hash)) => { 241 deltas.push(DeltaInfo::new(serial, uri, hash)); 242 Ok(()) 243 } 244 _ => Err(XmlError::Malformed) 245 } 246 } 247 _ => Err(XmlError::Malformed) 248 } 249 })? 
{ 250 content.take_end(&mut reader)?; 251 } 252 253 outer.take_end(&mut reader)?; 254 reader.end()?; 255 256 match (session_id, serial, snapshot) { 257 (Some(session_id), Some(serial), Some(snapshot)) => { 258 Ok(NotificationFile { session_id, serial, snapshot, deltas }) 259 } 260 _ => Err(XmlError::Malformed) 261 } 262 } 263 264 /// Writes the notification file as RFC 8182 XML. write_xml( &self, writer: &mut impl io::Write ) -> Result<(), io::Error>265 pub fn write_xml( 266 &self, writer: &mut impl io::Write 267 ) -> Result<(), io::Error> { 268 let mut writer = xml::encode::Writer::new(writer); 269 writer.element(NOTIFICATION.into_unqualified())? 270 .attr("xmlns", NS)? 271 .attr("version", "1")? 272 .attr("session_id", &self.session_id)? 273 .attr("serial", &self.serial)? 274 .content(|content| { 275 // add snapshot 276 content.element(SNAPSHOT.into_unqualified())? 277 .attr("uri", self.snapshot.uri())? 278 .attr("hash", &self.snapshot.hash())? 279 ; 280 281 // add deltas 282 for delta in self.deltas() { 283 content.element(DELTA.into_unqualified())? 284 .attr("serial", &delta.serial())? 285 .attr("uri", delta.uri())? 286 .attr("hash", &delta.hash())? 287 ; 288 } 289 290 Ok(()) 291 })?; 292 writer.done() 293 } 294 } 295 296 297 //------------ PublishElement ------------------------------------------------ 298 299 /// Am RPKI object to be published for the first time. 300 /// 301 /// This type defines an RRDP publish element as found in RRDP Snapshots and 302 /// Deltas. See [`UpdateElement`] for the related element that replaces a 303 /// previous element for the same uri. 304 #[derive(Clone, Debug, Eq, PartialEq)] 305 pub struct PublishElement { 306 /// The URI of the object to be published. 307 uri: uri::Rsync, 308 309 /// The content of the object to be published. 310 /// 311 /// This is the raw content. It is _not_ Base64 encoded. 312 data: Bytes, 313 } 314 315 impl PublishElement { 316 /// Creates a new publish element from the object URI and content. 
317 /// 318 /// The content provided via `data` is the raw content and must not yet 319 /// be Base64 encoded. new( uri: uri::Rsync, data: Bytes, ) -> Self320 pub fn new( 321 uri: uri::Rsync, 322 data: Bytes, 323 ) -> Self { 324 PublishElement { uri, data } 325 } 326 327 /// Returns the published object’s URI. uri(&self) -> &uri::Rsync328 pub fn uri(&self) -> &uri::Rsync { 329 &self.uri 330 } 331 332 /// Returns the published object’s content. data(&self) -> &Bytes333 pub fn data(&self) -> &Bytes { 334 &self.data 335 } 336 337 /// Converts `self` into the object’s URI and content. unpack(self) -> (uri::Rsync, Bytes)338 pub fn unpack(self) -> (uri::Rsync, Bytes) { 339 (self.uri, self.data) 340 } 341 342 /// Writes the publish element’s XML. write_xml( &self, content: &mut xml::encode::Content<impl io::Write> ) -> Result<(), io::Error>343 fn write_xml( 344 &self, 345 content: &mut xml::encode::Content<impl io::Write> 346 ) -> Result<(), io::Error> { 347 content.element(PUBLISH.into_unqualified())? 348 .attr("uri", &self.uri)? 349 .content(|content| { 350 content.base64(self.data.as_ref()) 351 })?; 352 Ok(()) 353 } 354 } 355 356 357 //------------ UpdateElement ------------------------------------------------- 358 359 /// An RPKI object to be updated with new content. 360 /// 361 /// This type defines an RRDP update element as found in RRDP deltas. It is 362 /// like a [`PublishElement`] except that it replaces an existing object for 363 /// a URI. 364 #[derive(Clone, Debug, Eq, PartialEq)] 365 pub struct UpdateElement { 366 /// The URI of the object to be updated. 367 uri: uri::Rsync, 368 369 /// The SHA-256 hash of the previous content of the object. 370 hash: Hash, 371 372 /// The new content of the object. 373 /// 374 /// This is the raw content. It is _not_ Base64 encoded. 375 data: Bytes, 376 } 377 378 impl UpdateElement { 379 /// Creates a new update element from its components. 
new(uri: uri::Rsync, hash: Hash, data: Bytes) -> Self380 pub fn new(uri: uri::Rsync, hash: Hash, data: Bytes) -> Self { 381 UpdateElement { uri, hash, data } 382 } 383 384 /// Returns the URI of the object to update. uri(&self) -> &uri::Rsync385 pub fn uri(&self) -> &uri::Rsync { 386 &self.uri 387 } 388 389 /// Returns the hash of the previous content. hash(&self) -> &Hash390 pub fn hash(&self) -> &Hash { 391 &self.hash 392 } 393 394 /// Returns the new content of the object. data(&self) -> &Bytes395 pub fn data(&self) -> &Bytes { 396 &self.data 397 } 398 399 /// Unpacks the update element into its components. unpack(self) -> (uri::Rsync, Hash, Bytes)400 pub fn unpack(self) -> (uri::Rsync, Hash, Bytes) { 401 (self.uri, self.hash, self.data) 402 } 403 } 404 405 impl UpdateElement { 406 /// Writes the update element’s XML. write_xml( &self, content: &mut xml::encode::Content<impl io::Write> ) -> Result<(), io::Error>407 fn write_xml( 408 &self, 409 content: &mut xml::encode::Content<impl io::Write> 410 ) -> Result<(), io::Error> { 411 content.element(PUBLISH.into_unqualified())? 412 .attr("uri", &self.uri)? 413 .attr("hash", &self.hash)? 414 .content(|content| { 415 content.base64(self.data.as_ref()) 416 })?; 417 Ok(()) 418 } 419 } 420 421 422 //------------ WithdrawElement ----------------------------------------------- 423 424 /// An RPKI object is to be delete. 425 /// 426 /// This type defines an RRDP update element as found in RRDP deltas. It is 427 /// like a [`PublishElement`] except that it removes an existing object. 428 #[derive(Clone, Debug, Eq, PartialEq)] 429 pub struct WithdrawElement { 430 /// The URI of the object to be deleted. 431 uri: uri::Rsync, 432 433 /// The SHA-256 hash of the content of the object to be deleted. 434 hash: Hash, 435 } 436 437 impl WithdrawElement { 438 /// Creates a new withdraw element from a URI and content hash. 
new(uri: uri::Rsync, hash: Hash) -> Self439 pub fn new(uri: uri::Rsync, hash: Hash) -> Self { 440 WithdrawElement { uri, hash } 441 } 442 443 /// Returns the URI of the object to be deleted. uri(&self) -> &uri::Rsync444 pub fn uri(&self) -> &uri::Rsync { 445 &self.uri 446 } 447 448 /// Returns the hash over the content of the object to be deleted. hash(&self) -> &Hash449 pub fn hash(&self) -> &Hash { 450 &self.hash 451 } 452 453 /// Converts the withdraw element into its URI and hash. unpack(self) -> (uri::Rsync, Hash)454 pub fn unpack(self) -> (uri::Rsync, Hash) { 455 (self.uri, self.hash) 456 } 457 } 458 459 impl WithdrawElement { 460 /// Writes the withdraw element’s XML. write_xml( &self, content: &mut xml::encode::Content<impl io::Write> ) -> Result<(), io::Error>461 fn write_xml( 462 &self, 463 content: &mut xml::encode::Content<impl io::Write> 464 ) -> Result<(), io::Error> { 465 content.element(WITHDRAW.into_unqualified())? 466 .attr("uri", &self.uri)? 467 .attr("hash", &self.hash)?; 468 Ok(()) 469 } 470 } 471 472 473 //------------ DeltaElement -------------------------------------------------- 474 475 /// A single element of a RRDP delta. 476 #[derive(Clone, Debug, Eq, PartialEq)] 477 pub enum DeltaElement { 478 /// The element publishes a new object. 479 Publish(PublishElement), 480 481 /// The element updates an existing object. 482 Update(UpdateElement), 483 484 /// The element deletes an existing object. 485 Withdraw(WithdrawElement) 486 } 487 488 impl DeltaElement { 489 /// Writes the element’s XML. 
write_xml( &self, content: &mut xml::encode::Content<impl io::Write> ) -> Result<(), io::Error>490 fn write_xml( 491 &self, 492 content: &mut xml::encode::Content<impl io::Write> 493 ) -> Result<(), io::Error> { 494 match self { 495 DeltaElement::Publish(p) => p.write_xml(content), 496 DeltaElement::Update(u) => u.write_xml(content), 497 DeltaElement::Withdraw(w) => w.write_xml(content) 498 } 499 } 500 } 501 502 ///--- From 503 504 impl From<PublishElement> for DeltaElement { from(src: PublishElement) -> Self505 fn from(src: PublishElement) -> Self { 506 DeltaElement::Publish(src) 507 } 508 } 509 510 impl From<UpdateElement> for DeltaElement { from(src: UpdateElement) -> Self511 fn from(src: UpdateElement) -> Self { 512 DeltaElement::Update(src) 513 } 514 } 515 516 impl From<WithdrawElement> for DeltaElement { from(src: WithdrawElement) -> Self517 fn from(src: WithdrawElement) -> Self { 518 DeltaElement::Withdraw(src) 519 } 520 } 521 522 523 //------------ Snapshot ------------------------------------------------------ 524 525 /// An RRDP snapshot. 526 /// 527 /// This type represents an owned RRDP snapshot containing the RRDP session 528 /// ID, serial number and all published elements. 529 #[derive(Clone, Debug, Eq, PartialEq)] 530 pub struct Snapshot { 531 /// The RRDP session of this snapshot. 532 session_id: Uuid, 533 534 /// The serial number of the update of this snapshot. 535 serial: u64, 536 537 /// The objects published through this snapshot. 538 elements: Vec<PublishElement>, 539 } 540 541 impl Snapshot { 542 /// Creates a new snapshot from its components. new( session_id: Uuid, serial: u64, elements: Vec<PublishElement>, ) -> Self543 pub fn new( 544 session_id: Uuid, 545 serial: u64, 546 elements: Vec<PublishElement>, 547 ) -> Self { 548 Snapshot { session_id, serial, elements } 549 } 550 551 /// Returns the session ID of this snapshot. 
session_id(&self) -> Uuid552 pub fn session_id(&self) -> Uuid { 553 self.session_id 554 } 555 556 /// Returns the serial number of the update represented by this snapshot. serial(&self) -> u64557 pub fn serial(&self) -> u64 { 558 self.serial 559 } 560 561 /// Returns the list of objects published by the snapshot. elements(&self) -> &[PublishElement]562 pub fn elements(&self) -> &[PublishElement] { 563 &self.elements 564 } 565 566 /// Converts the snapshots into its elements. into_elements(self) -> Vec<PublishElement>567 pub fn into_elements(self) -> Vec<PublishElement> { 568 self.elements 569 } 570 } 571 572 /// # XML Support 573 /// 574 impl Snapshot { 575 /// Parses the snapshot from its XML representation. parse<R: io::BufRead>( reader: R ) -> Result<Self, ProcessError>576 pub fn parse<R: io::BufRead>( 577 reader: R 578 ) -> Result<Self, ProcessError> { 579 let mut builder = SnapshotBuilder { 580 session_id: None, 581 serial: None, 582 elements: vec![] 583 }; 584 585 builder.process(reader)?; 586 builder.try_into() 587 } 588 589 /// Writes the snapshot’s XML representation. write_xml( &self, writer: &mut impl io::Write ) -> Result<(), io::Error>590 pub fn write_xml( 591 &self, writer: &mut impl io::Write 592 ) -> Result<(), io::Error> { 593 let mut writer = xml::encode::Writer::new(writer); 594 writer.element(SNAPSHOT.into_unqualified())? 595 .attr("xmlns", NS)? 596 .attr("version", "1")? 597 .attr("session_id", &self.session_id)? 598 .attr("serial", &self.serial)? 
599 .content(|content| { 600 for el in &self.elements { 601 el.write_xml(content)?; 602 } 603 Ok(()) 604 })?; 605 writer.done() 606 } 607 } 608 609 610 //------------ SnapshotBuilder ----------------------------------------------- 611 612 struct SnapshotBuilder { 613 session_id: Option<Uuid>, 614 serial: Option<u64>, 615 elements: Vec<PublishElement>, 616 } 617 618 impl ProcessSnapshot for SnapshotBuilder { 619 type Err = ProcessError; 620 meta(&mut self, session_id: Uuid, serial: u64) -> Result<(), Self::Err>621 fn meta(&mut self, session_id: Uuid, serial: u64) -> Result<(), Self::Err> { 622 self.session_id = Some(session_id); 623 self.serial = Some(serial); 624 Ok(()) 625 } 626 publish(&mut self, uri: uri::Rsync, data: &mut ObjectReader) -> Result<(), Self::Err>627 fn publish(&mut self, uri: uri::Rsync, data: &mut ObjectReader) -> Result<(), Self::Err> { 628 let mut buf = Vec::new(); 629 data.read_to_end(&mut buf)?; 630 let data = Bytes::from(buf); 631 let element = PublishElement { uri, data }; 632 self.elements.push(element); 633 Ok(()) 634 } 635 } 636 637 impl TryFrom<SnapshotBuilder> for Snapshot { 638 type Error = ProcessError; 639 try_from(builder: SnapshotBuilder) -> Result<Self, Self::Error>640 fn try_from(builder: SnapshotBuilder) -> Result<Self, Self::Error> { 641 let session_id = builder.session_id.ok_or( 642 ProcessError::Xml(XmlError::Malformed) 643 )?; 644 645 let serial = builder.serial.ok_or( 646 ProcessError::Xml(XmlError::Malformed) 647 )?; 648 649 Ok(Snapshot { session_id, serial, elements: builder.elements }) 650 } 651 } 652 653 654 //------------ ProcessSnapshot ----------------------------------------------- 655 656 /// A type that can process an RRDP snapshot. 657 /// 658 /// The trait contains two required methods: [`meta`][Self::meta] is called 659 /// once at the beginning of the snapshot and gives the processor a chance to 660 /// check if the session ID and serial number are as expected. 
Then, 661 /// [`publish`][Self::publish] is called for each published object. 662 /// The processor can abort at any time by returning an error. 663 /// 664 /// The provided method [`process`][Self::process] drives the actual 665 /// processing and should thus be called when using a type that implements 666 /// this trait. 667 pub trait ProcessSnapshot { 668 /// The error type returned by the processor. 669 type Err: From<ProcessError>; 670 671 /// Processes the snapshot meta data. 672 /// 673 /// The method is called before any other method and is passed the 674 /// session ID and serial number encountered in the outermost tag of the 675 /// snapshot file’s XML. If they don’t match the expected values, the 676 /// processor should abort processing by returning an error. meta( &mut self, session_id: Uuid, serial: u64, ) -> Result<(), Self::Err>677 fn meta( 678 &mut self, 679 session_id: Uuid, 680 serial: u64, 681 ) -> Result<(), Self::Err>; 682 683 /// Processes a published object. 684 /// 685 /// The object is identified by the provided rsync URI. The object’s data 686 /// is provided via a reader. publish( &mut self, uri: uri::Rsync, data: &mut ObjectReader, ) -> Result<(), Self::Err>687 fn publish( 688 &mut self, 689 uri: uri::Rsync, 690 data: &mut ObjectReader, 691 ) -> Result<(), Self::Err>; 692 693 /// Processes a snapshot file. 694 /// 695 /// The file’s content is read from `reader`. The two required methods 696 /// are called as appropriate. If the reader fails, parsing fails, or the 697 /// methods return an error, processing is aborted and an error is 698 /// returned. 
process<R: io::BufRead>( &mut self, reader: R ) -> Result<(), Self::Err>699 fn process<R: io::BufRead>( 700 &mut self, 701 reader: R 702 ) -> Result<(), Self::Err> { 703 let mut reader = Reader::new(reader); 704 705 let mut session_id = None; 706 let mut serial = None; 707 let mut outer = reader.start(|element| { 708 if element.name() != SNAPSHOT { 709 info!("Bad outer: not snapshot, but {:?}", element.name()); 710 return Err(XmlError::Malformed) 711 } 712 element.attributes(|name, value| match name { 713 b"version" => { 714 if value.ascii_into::<u8>()? != 1 { 715 info!("Bad version"); 716 return Err(XmlError::Malformed) 717 } 718 Ok(()) 719 } 720 b"session_id" => { 721 session_id = Some(value.ascii_into()?); 722 Ok(()) 723 } 724 b"serial" => { 725 serial = Some(value.ascii_into()?); 726 Ok(()) 727 } 728 _ => { 729 info!("Bad attribute on snapshot."); 730 Err(XmlError::Malformed) 731 } 732 }) 733 }).map_err(Into::into)?; 734 735 match (session_id, serial) { 736 (Some(session_id), Some(serial)) => { 737 self.meta(session_id, serial)?; 738 } 739 _ => { 740 info!("Missing session or serial"); 741 return Err(ProcessError::malformed().into()) 742 } 743 } 744 745 loop { 746 let mut uri = None; 747 let inner = outer.take_opt_element(&mut reader, |element| { 748 if element.name() != PUBLISH { 749 info!("Bad inner: not publish"); 750 return Err(ProcessError::malformed()) 751 } 752 element.attributes(|name, value| match name { 753 b"uri" => { 754 uri = Some(value.ascii_into()?); 755 Ok(()) 756 } 757 _ => { 758 info!("Bad attribute on publish."); 759 Err(ProcessError::malformed()) 760 } 761 }) 762 })?; 763 let mut inner = match inner { 764 Some(inner) => inner, 765 None => break 766 }; 767 let uri = match uri { 768 Some(uri) => uri, 769 None => return Err(ProcessError::malformed().into()) 770 }; 771 ObjectReader::process(&mut inner, &mut reader, |reader| { 772 self.publish(uri, reader) 773 })?; 774 } 775 776 outer.take_end(&mut reader).map_err(Into::into)?; 777 
reader.end().map_err(Into::into)?; 778 Ok(()) 779 } 780 } 781 782 783 //------------ Delta --------------------------------------------------------- 784 785 /// An RRDP delta. 786 /// 787 /// This type represents an owned RRDP snapshot containing the RRDP session 788 /// ID, serial number and all its elements. 789 #[derive(Clone, Debug, Eq, PartialEq)] 790 pub struct Delta { 791 /// The RRDP session ID of the delta. 792 session_id: Uuid, 793 794 /// The serial number of this delta. 795 serial: u64, 796 797 /// The objects changed by this delta. 798 elements: Vec<DeltaElement> 799 } 800 801 /// # Data Access 802 /// 803 impl Delta { 804 /// Creates a new delta from session ID, serial number, and elements. new( session_id: Uuid, serial: u64, elements: Vec<DeltaElement> ) -> Self805 pub fn new( 806 session_id: Uuid, 807 serial: u64, 808 elements: Vec<DeltaElement> 809 ) -> Self { 810 Delta { session_id, serial, elements } 811 } 812 813 /// Returns the session ID of the RRDP session this delta is part of. session_id(&self) -> Uuid814 pub fn session_id(&self) -> Uuid { 815 self.session_id 816 } 817 818 /// Returns the serial number of this delta. 819 /// 820 /// The serial number is identical to that of the snapshot this delta 821 /// updates _to._ serial(&self) -> u64822 pub fn serial(&self) -> u64 { 823 self.serial 824 } 825 826 /// The list of objects changed by this delta. elements(&self) -> &[DeltaElement]827 pub fn elements(&self) -> &[DeltaElement] { 828 &self.elements 829 } 830 831 /// Converts the delta into its elements. into_elements(self) -> Vec<DeltaElement>832 pub fn into_elements(self) -> Vec<DeltaElement> { 833 self.elements 834 } 835 } 836 837 /// # Decoding and Encoding XML 838 /// 839 impl Delta { 840 /// Parses the delta from its XML representation. 
parse<R: io::BufRead>( reader: R ) -> Result<Self, ProcessError>841 pub fn parse<R: io::BufRead>( 842 reader: R 843 ) -> Result<Self, ProcessError> { 844 let mut builder = DeltaBuilder { 845 session_id: None, 846 serial: None, 847 elements: vec![] 848 }; 849 850 builder.process(reader)?; 851 builder.try_into() 852 } 853 854 /// Write the delta’s XML representation. write_xml( &self, writer: &mut impl io::Write ) -> Result<(), io::Error>855 pub fn write_xml( 856 &self, writer: &mut impl io::Write 857 ) -> Result<(), io::Error> { 858 let mut writer = xml::encode::Writer::new(writer); 859 writer.element(DELTA.into_unqualified())? 860 .attr("xmlns", NS)? 861 .attr("version", "1")? 862 .attr("session_id", &self.session_id)? 863 .attr("serial", &self.serial)? 864 .content(|content| { 865 for el in &self.elements { 866 el.write_xml(content)?; 867 } 868 Ok(()) 869 })?; 870 writer.done() 871 } 872 } 873 874 875 //------------ DeltaBuilder -------------------------------------------------- 876 877 struct DeltaBuilder { 878 session_id: Option<Uuid>, 879 serial: Option<u64>, 880 elements: Vec<DeltaElement> 881 } 882 883 impl ProcessDelta for DeltaBuilder { 884 type Err = ProcessError; 885 meta( &mut self, session_id: Uuid, serial: u64, ) -> Result<(), Self::Err>886 fn meta( 887 &mut self, 888 session_id: Uuid, 889 serial: u64, 890 ) -> Result<(), Self::Err> { 891 self.session_id = Some(session_id); 892 self.serial = Some(serial); 893 Ok(()) 894 } 895 publish( &mut self, uri: uri::Rsync, hash_opt: Option<Hash>, data: &mut ObjectReader, ) -> Result<(), Self::Err>896 fn publish( 897 &mut self, 898 uri: uri::Rsync, 899 hash_opt: Option<Hash>, 900 data: &mut ObjectReader, 901 ) -> Result<(), Self::Err> { 902 let mut buf = Vec::new(); 903 data.read_to_end(&mut buf)?; 904 let data = Bytes::from(buf); 905 match hash_opt { 906 Some(hash) => { 907 let update = UpdateElement { uri, hash, data}; 908 self.elements.push(DeltaElement::Update(update)); 909 }, 910 None => { 911 let publish = 
PublishElement { uri, data}; 912 self.elements.push(DeltaElement::Publish(publish)); 913 } 914 } 915 Ok(()) 916 } 917 withdraw( &mut self, uri: uri::Rsync, hash: Hash, ) -> Result<(), Self::Err>918 fn withdraw( 919 &mut self, 920 uri: uri::Rsync, 921 hash: Hash, 922 ) -> Result<(), Self::Err> { 923 let withdraw = WithdrawElement { uri, hash }; 924 self.elements.push(DeltaElement::Withdraw(withdraw)); 925 Ok(()) 926 } 927 } 928 929 impl TryFrom<DeltaBuilder> for Delta { 930 type Error = ProcessError; 931 try_from(builder: DeltaBuilder) -> Result<Self, Self::Error>932 fn try_from(builder: DeltaBuilder) -> Result<Self, Self::Error> { 933 let session_id = builder.session_id.ok_or( 934 ProcessError::Xml(XmlError::Malformed) 935 )?; 936 937 let serial = builder.serial.ok_or( 938 ProcessError::Xml(XmlError::Malformed) 939 )?; 940 941 Ok(Delta { session_id, serial, elements: builder.elements }) 942 943 } 944 } 945 946 //------------ ProcessDelta -------------------------------------------------- 947 948 /// A type that can process an RRDP delta. 949 /// 950 /// The trait contains three required methods: [`meta`][Self::meta] is called 951 /// once at the beginning of the snapshot and gives the processor a chance to 952 /// check if the session ID and serial number are as expected. Then, 953 /// [`publish`][Self::publish] is called for each newly published or 954 /// updated object and [`withdraw`][Self::withdraw] is called for each 955 /// deleted object. The processor can abort at any time by returning an error. 956 /// 957 /// The provided method [`process`][Self::process] drives the actual 958 /// processing and should thus be called when using a type that implements 959 /// this trait. 960 pub trait ProcessDelta { 961 /// The error type returned by the processor. 962 type Err: From<ProcessError>; 963 964 /// Processes the delta meta data. 
965 /// 966 /// The method is called before any other method and is passed the 967 /// session ID and serial number encountered in the outermost tag of the 968 /// delta file’s XML. If they don’t match the expected values, the 969 /// processor should abort processing by returning an error. meta( &mut self, session_id: Uuid, serial: u64, ) -> Result<(), Self::Err>970 fn meta( 971 &mut self, 972 session_id: Uuid, 973 serial: u64, 974 ) -> Result<(), Self::Err>; 975 976 /// Processes a published object. 977 /// 978 /// The object is identified by the rsync URI provided in `uri`. If the 979 /// object is updated, the hash over the previous content of the object 980 /// is given in `hash`. If the object is newly published, `hash` will be 981 /// `None`. The (new) content of the object is provided via the reader in 982 /// `data`. publish( &mut self, uri: uri::Rsync, hash: Option<Hash>, data: &mut ObjectReader, ) -> Result<(), Self::Err>983 fn publish( 984 &mut self, 985 uri: uri::Rsync, 986 hash: Option<Hash>, 987 data: &mut ObjectReader, 988 ) -> Result<(), Self::Err>; 989 990 /// Processes a withdrawn object. 991 /// 992 /// The object is identified by the rsync URI provided in `uri`. The hash 993 /// over the expected content of the object to be deleted is given in 994 /// `hash`. withdraw( &mut self, uri: uri::Rsync, hash: Hash, ) -> Result<(), Self::Err>995 fn withdraw( 996 &mut self, 997 uri: uri::Rsync, 998 hash: Hash, 999 ) -> Result<(), Self::Err>; 1000 1001 1002 /// Processes a delta file. 1003 /// 1004 /// The file’s content is taken from `reader`. The content is parsed and 1005 /// the three required methods are called as required. 1006 /// 1007 /// If the reader fails, parsing fails, or the methods return an error, 1008 /// processing is aborted and an error is returned. 
process<R: io::BufRead>( &mut self, reader: R ) -> Result<(), Self::Err>1009 fn process<R: io::BufRead>( 1010 &mut self, 1011 reader: R 1012 ) -> Result<(), Self::Err> { 1013 let mut reader = Reader::new(reader); 1014 1015 let mut session_id = None; 1016 let mut serial = None; 1017 let mut outer = reader.start(|element| { 1018 if element.name() != DELTA { 1019 return Err(ProcessError::malformed()) 1020 } 1021 element.attributes(|name, value| match name { 1022 b"version" => { 1023 if value.ascii_into::<u8>()? != 1 { 1024 return Err(ProcessError::malformed()) 1025 } 1026 Ok(()) 1027 } 1028 b"session_id" => { 1029 session_id = Some(value.ascii_into()?); 1030 Ok(()) 1031 } 1032 b"serial" => { 1033 serial = Some(value.ascii_into()?); 1034 Ok(()) 1035 } 1036 _ => Err(ProcessError::malformed()) 1037 }) 1038 })?; 1039 1040 match (session_id, serial) { 1041 (Some(session_id), Some(serial)) => { 1042 self.meta(session_id, serial)?; 1043 } 1044 _ => return Err(ProcessError::malformed().into()) 1045 } 1046 1047 loop { 1048 let mut action = None; 1049 let mut uri = None; 1050 let mut hash = None; 1051 let inner = outer.take_opt_element(&mut reader, |element| { 1052 match element.name() { 1053 PUBLISH => action = Some(Action::Publish), 1054 WITHDRAW => action = Some(Action::Withdraw), 1055 _ => return Err(ProcessError::malformed()), 1056 }; 1057 element.attributes(|name, value| match name { 1058 b"uri" => { 1059 uri = Some(value.ascii_into()?); 1060 Ok(()) 1061 } 1062 b"hash" => { 1063 hash = Some(value.ascii_into()?); 1064 Ok(()) 1065 } 1066 _ => Err(ProcessError::malformed()) 1067 }) 1068 })?; 1069 let mut inner = match inner { 1070 Some(inner) => inner, 1071 None => break 1072 }; 1073 let uri = match uri { 1074 Some(uri) => uri, 1075 None => return Err(ProcessError::malformed().into()) 1076 }; 1077 match action.unwrap() { // Or we'd have exited already. 
1078 Action::Publish => { 1079 ObjectReader::process( 1080 &mut inner, &mut reader, 1081 |reader| self.publish(uri, hash, reader) 1082 )?; 1083 } 1084 Action::Withdraw => { 1085 let hash = match hash { 1086 Some(hash) => hash, 1087 None => return Err(ProcessError::malformed().into()) 1088 }; 1089 self.withdraw(uri, hash)?; 1090 inner.take_end(&mut reader).map_err(Into::into)?; 1091 } 1092 } 1093 } 1094 outer.take_end(&mut reader).map_err(Into::into)?; 1095 reader.end().map_err(Into::into)?; 1096 Ok(()) 1097 } 1098 1099 } 1100 1101 1102 //------------ SnapshotInfo -------------------------------------------------- 1103 1104 /// The URI and HASH of the current snapshot for a [`NotificationFile`]. 1105 pub type SnapshotInfo = UriAndHash; 1106 1107 1108 //------------ DeltaInfo ----------------------------------------------------- 1109 1110 /// The serial, URI and HASH of a delta in a [`NotificationFile`]. 1111 #[derive(Clone, Debug, Eq, PartialEq)] 1112 pub struct DeltaInfo { 1113 serial: u64, 1114 uri_and_hash: UriAndHash 1115 } 1116 1117 impl DeltaInfo { 1118 /// Creates a new info from its components. new(serial: u64, uri: uri::Https, hash: Hash) -> Self1119 pub fn new(serial: u64, uri: uri::Https, hash: Hash) -> Self { 1120 DeltaInfo { 1121 serial, 1122 uri_and_hash: UriAndHash::new(uri, hash) 1123 } 1124 } 1125 1126 /// Returns the serial number of this delta. serial(&self) -> u641127 pub fn serial(&self) -> u64 { 1128 self.serial 1129 } 1130 } 1131 1132 impl Deref for DeltaInfo { 1133 type Target = UriAndHash; 1134 deref(&self) -> &Self::Target1135 fn deref(&self) -> &Self::Target { 1136 &self.uri_and_hash 1137 } 1138 } 1139 1140 //------------ UriAndHash ---------------------------------------------------- 1141 1142 /// The URI of an RRDP file and a SHA-256 hash over its content. 
1143 /// 1144 /// In order to detect accidental or malicious modifications of the data 1145 /// all references to RRDP files are given with a SHA-256 hash over the 1146 /// expected content of that file, allowing a client to verify they got the 1147 /// right file. 1148 #[derive(Clone, Debug, Eq, PartialEq)] 1149 pub struct UriAndHash { 1150 /// The URI of the RRDP file. 1151 uri: uri::Https, 1152 1153 /// The expected SHA-256 hash over the file’s content. 1154 hash: Hash, 1155 } 1156 1157 impl UriAndHash { 1158 /// Creates a new URI-and-hash pair. new(uri: uri::Https, hash: Hash) -> Self1159 pub fn new(uri: uri::Https, hash: Hash) -> Self { 1160 UriAndHash { uri, hash } 1161 } 1162 1163 /// Returns a reference to the URI. uri(&self) -> &uri::Https1164 pub fn uri(&self) -> &uri::Https { 1165 &self.uri 1166 } 1167 1168 /// Returns the expected SHA-256 hash. hash(&self) -> Hash1169 pub fn hash(&self) -> Hash { 1170 self.hash 1171 } 1172 1173 /// Converts the pair into just the URI. into_uri(self) -> uri::Https1174 pub fn into_uri(self) -> uri::Https { 1175 self.uri 1176 } 1177 1178 /// Converts `self` into a pair of URI and hash. into_pair(self) -> (uri::Https, Hash)1179 pub fn into_pair(self) -> (uri::Https, Hash) { 1180 (self.uri, self.hash) 1181 } 1182 } 1183 1184 1185 //------------ Hash ---------------------------------------------------------- 1186 1187 /// A hash over RRDP data. 1188 /// 1189 /// This hash is used both for verifying the correctness of RRDP files as well 1190 /// as update or deletion of the right objects. 1191 /// 1192 /// RRDP exclusively uses SHA-256 and provides no means of choosing a different 1193 /// algorithm. Consequently, this type is a wrapper around a 32 byte array 1194 /// holding SHA-256 output. 1195 #[derive(Clone, Copy, Eq, hash::Hash, PartialEq)] 1196 #[repr(transparent)] // ensure that size_of::<Hash>() == 32. 1197 pub struct Hash([u8; 32]); 1198 1199 impl Hash { 1200 /// Returns a reference to the octets as a slice. 
as_slice(&self) -> &[u8]1201 pub fn as_slice(&self) -> &[u8] { 1202 self.0.as_ref() 1203 } 1204 1205 /// Returns a new Hash from the provided data from_data(data: &[u8]) -> Self1206 pub fn from_data(data: &[u8]) -> Self { 1207 let digest = digest::digest(&digest::SHA256, data); 1208 Self::try_from(digest.as_ref()).unwrap() 1209 } 1210 1211 /// Returns whether this hash matches the provided data matches(&self, data: &[u8]) -> bool1212 pub fn matches(&self, data: &[u8]) -> bool { 1213 let data_hash = Self::from_data(data); 1214 *self == data_hash 1215 } 1216 } 1217 1218 1219 //--- From, TryFrom, and FromStr 1220 1221 impl From<[u8;32]> for Hash { from(value: [u8;32]) -> Hash1222 fn from(value: [u8;32]) -> Hash { 1223 Hash(value) 1224 } 1225 } 1226 1227 impl From<Hash> for [u8; 32] { from(src: Hash) -> Self1228 fn from(src: Hash) -> Self { 1229 src.0 1230 } 1231 } 1232 1233 impl<'a> TryFrom<&'a [u8]> for Hash { 1234 type Error = std::array::TryFromSliceError; 1235 try_from(src: &'a [u8]) -> Result<Self, Self::Error>1236 fn try_from(src: &'a [u8]) -> Result<Self, Self::Error> { 1237 TryFrom::try_from(src).map(Hash) 1238 } 1239 } 1240 1241 impl TryFrom<digest::Digest> for Hash { 1242 type Error = AlgorithmError; 1243 try_from(digest: digest::Digest) -> Result<Self, Self::Error>1244 fn try_from(digest: digest::Digest) -> Result<Self, Self::Error> { 1245 // XXX This doesn’t properly check the algorithm. 1246 TryFrom::try_from( 1247 digest.as_ref() 1248 ).map(Hash).map_err(|_| AlgorithmError(())) 1249 } 1250 } 1251 1252 impl str::FromStr for Hash { 1253 type Err = ParseHashError; 1254 1255 /// Parses a string into a hash. 1256 /// 1257 /// The string must consist of exactly 64 hexadecimal digits and nothing 1258 /// else. 
from_str(s: &str) -> Result<Self, Self::Err>1259 fn from_str(s: &str) -> Result<Self, Self::Err> { 1260 if s.len() != 64 { 1261 return Err(ParseHashError::BAD_LENGTH) 1262 } 1263 let mut res = [0u8; 32]; 1264 let mut s = s.chars(); 1265 for octet in &mut res { 1266 let first = s.next().ok_or( 1267 ParseHashError::BAD_LENGTH 1268 )?.to_digit(16).ok_or( 1269 ParseHashError::BAD_CHARS 1270 )?; 1271 let second = s.next().ok_or( 1272 ParseHashError::BAD_LENGTH 1273 )?.to_digit(16).ok_or( 1274 ParseHashError::BAD_CHARS 1275 )?; 1276 *octet = (first << 4 | second) as u8; 1277 } 1278 Ok(Hash(res)) 1279 } 1280 } 1281 1282 1283 //--- AsRef 1284 1285 impl AsRef<[u8]> for Hash { as_ref(&self) -> &[u8]1286 fn as_ref(&self) -> &[u8] { 1287 self.0.as_ref() 1288 } 1289 } 1290 1291 1292 //--- PartialEq 1293 // 1294 // PartialEq<Self> and Eq are derived. 1295 1296 impl PartialEq<digest::Digest> for Hash { eq(&self, other: &digest::Digest) -> bool1297 fn eq(&self, other: &digest::Digest) -> bool { 1298 // XXX This doesn’t properly check the algorithm. 
1299 self.0.as_ref() == other.as_ref() 1300 } 1301 } 1302 1303 1304 //--- Display and Debug 1305 1306 impl fmt::Display for Hash { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result1307 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 1308 for &ch in self.as_slice() { 1309 write!(f, "{:02x}", ch)?; 1310 } 1311 Ok(()) 1312 } 1313 } 1314 1315 impl fmt::Debug for Hash { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result1316 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 1317 write!(f, "Hash({})", self) 1318 } 1319 } 1320 1321 //--- Serialize and Deserialize 1322 1323 #[cfg(feature = "serde")] 1324 impl Serialize for Hash { serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: Serializer1325 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> 1326 where S: Serializer { 1327 self.to_string().serialize(serializer) 1328 } 1329 } 1330 1331 #[cfg(feature = "serde")] 1332 impl<'de> Deserialize<'de> for Hash { deserialize<D>(deserializer: D) -> Result<Self, D::Error> where D: Deserializer<'de>1333 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> 1334 where D: Deserializer<'de> { 1335 let hex_str = String::deserialize(deserializer)?; 1336 Hash::from_str(&hex_str).map_err(serde::de::Error::custom) 1337 } 1338 } 1339 1340 1341 //------------ Action -------------------------------------------------------- 1342 1343 /// The choice of actions in a delta file. 1344 enum Action { 1345 /// An object is to be inserted or updated. 1346 /// 1347 /// The object is to be updated, if a hash is given. Otherwise it is 1348 /// to be inserted. 1349 Publish, 1350 1351 /// An object is to be deleted. 1352 Withdraw, 1353 } 1354 1355 1356 //------------ ObjectReader -------------------------------------------------- 1357 1358 /// A reader providing the content of an object. 1359 /// 1360 /// The content is included in base64 encoding in the RRDP’s XML. This reader 1361 /// provides access to the decoded data via the standard `Read` trait. 
1362 pub struct ObjectReader<'a>( 1363 /// The base64 encoded data. 1364 base64::read::DecoderReader<'a, &'a [u8]> 1365 ); 1366 1367 impl<'a> ObjectReader<'a> { 1368 /// Processes an element with optional XML PCDATA as object content. 1369 /// 1370 /// An object reader is created and passed to the closure `op` for 1371 /// actual processing. 1372 /// 1373 /// This method expects the next XML event to either be text or the end 1374 /// of an element. It will process both. process<R, T, E, F> ( content: &mut Content, reader: &mut Reader<R>, op: F ) -> Result<T, E> where R: io::BufRead, E: From<ProcessError>, F: FnOnce(&mut ObjectReader) -> Result<T, E>1375 fn process<R, T, E, F> ( 1376 content: &mut Content, 1377 reader: &mut Reader<R>, 1378 op: F 1379 ) -> Result<T, E> 1380 where 1381 R: io::BufRead, 1382 E: From<ProcessError>, 1383 F: FnOnce(&mut ObjectReader) -> Result<T, E> 1384 { 1385 // XXX This could probably do with a bit of optimization. 1386 let data_b64 = content.take_opt_final_text(reader, |text| { 1387 // The text is supposed to be xsd:base64Binary which only allows 1388 // the base64 characters plus whitespace. 
1389 Ok(text.to_ascii()?.as_bytes().iter().filter_map(|b| { 1390 if b.is_ascii_whitespace() { None } 1391 else { Some(*b) } 1392 }).collect::<Vec<_>>()) 1393 })?.unwrap_or_else(Vec::new); 1394 let mut data_b64 = data_b64.as_slice(); 1395 op( 1396 &mut ObjectReader(base64::read::DecoderReader::new( 1397 &mut data_b64, base64::STANDARD 1398 )) 1399 ) 1400 } 1401 } 1402 1403 impl<'a> io::Read for ObjectReader<'a> { read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error>1404 fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> { 1405 self.0.read(buf) 1406 } 1407 } 1408 1409 1410 //------------ XML Names ----------------------------------------------------- 1411 1412 const NS: &[u8] = b"http://www.ripe.net/rpki/rrdp"; 1413 const NOTIFICATION: Name = Name::qualified(NS, b"notification"); 1414 const SNAPSHOT: Name = Name::qualified(NS, b"snapshot"); 1415 const DELTA: Name = Name::qualified(NS, b"delta"); 1416 const PUBLISH: Name = Name::qualified(NS, b"publish"); 1417 const WITHDRAW: Name = Name::qualified(NS, b"withdraw"); 1418 1419 1420 //============ Errors ======================================================== 1421 1422 //------------ AlgorithmError ------------------------------------------------ 1423 1424 /// A digest was of the wrong algorithm. 1425 #[derive(Clone, Copy, Debug)] 1426 pub struct AlgorithmError(()); 1427 1428 impl fmt::Display for AlgorithmError { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result1429 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 1430 f.write_str("algorithm mismatch") 1431 } 1432 } 1433 1434 impl error::Error for AlgorithmError { } 1435 1436 1437 //------------ ParseHashError ------------------------------------------------ 1438 1439 /// An error happened while parsing a hash. 1440 #[derive(Clone, Copy, Debug)] 1441 pub struct ParseHashError(&'static str); 1442 1443 impl ParseHashError { 1444 /// The error when the hash value was of the wrong length. 
1445 const BAD_LENGTH: Self = ParseHashError("invalid length"); 1446 1447 /// The error when the hash value contained illegal characters. 1448 const BAD_CHARS: Self = ParseHashError("invalid characters"); 1449 } 1450 1451 impl fmt::Display for ParseHashError { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result1452 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 1453 f.write_str(self.0) 1454 } 1455 } 1456 1457 impl error::Error for ParseHashError { } 1458 1459 1460 //------------ ProcessError -------------------------------------------------- 1461 1462 /// An error occurred while processing RRDP data. 1463 #[derive(Debug)] 1464 pub enum ProcessError { 1465 /// An IO error happened. 1466 Io(io::Error), 1467 1468 /// The XML was not correctly formed. 1469 Xml(XmlError), 1470 } 1471 1472 impl ProcessError { 1473 /// Creates an error when the XML was malformed. malformed() -> Self1474 fn malformed() -> Self { 1475 ProcessError::Xml(XmlError::Malformed) 1476 } 1477 } 1478 1479 impl From<io::Error> for ProcessError { from(err: io::Error) -> Self1480 fn from(err: io::Error) -> Self { 1481 ProcessError::Io(err) 1482 } 1483 } 1484 1485 impl From<XmlError> for ProcessError { from(err: XmlError) -> Self1486 fn from(err: XmlError) -> Self { 1487 ProcessError::Xml(err) 1488 } 1489 } 1490 1491 impl fmt::Display for ProcessError { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result1492 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 1493 match self { 1494 ProcessError::Io(ref inner) => inner.fmt(f), 1495 ProcessError::Xml(ref inner) => inner.fmt(f), 1496 } 1497 } 1498 } 1499 1500 impl error::Error for ProcessError { } 1501 1502 1503 //============ Tests ========================================================= 1504 1505 #[cfg(test)] 1506 mod test { 1507 use std::str::from_utf8_unchecked; 1508 1509 use super::*; 1510 1511 pub struct Test; 1512 1513 impl ProcessSnapshot for Test { 1514 type Err = ProcessError; 1515 meta( &mut self, _session_id: Uuid, _serial: u64, ) -> 
Result<(), Self::Err>1516 fn meta( 1517 &mut self, 1518 _session_id: Uuid, 1519 _serial: u64, 1520 ) -> Result<(), Self::Err> { 1521 Ok(()) 1522 } 1523 publish( &mut self, _uri: uri::Rsync, _data: &mut ObjectReader, ) -> Result<(), Self::Err>1524 fn publish( 1525 &mut self, 1526 _uri: uri::Rsync, 1527 _data: &mut ObjectReader, 1528 ) -> Result<(), Self::Err> { 1529 Ok(()) 1530 } 1531 } 1532 1533 impl ProcessDelta for Test { 1534 type Err = ProcessError; 1535 meta( &mut self, _session_id: Uuid, _serial: u64, ) -> Result<(), Self::Err>1536 fn meta( 1537 &mut self, 1538 _session_id: Uuid, 1539 _serial: u64, 1540 ) -> Result<(), Self::Err> { 1541 Ok(()) 1542 } 1543 publish( &mut self, _uri: uri::Rsync, _hash: Option<Hash>, _data: &mut ObjectReader, ) -> Result<(), Self::Err>1544 fn publish( 1545 &mut self, 1546 _uri: uri::Rsync, 1547 _hash: Option<Hash>, 1548 _data: &mut ObjectReader, 1549 ) -> Result<(), Self::Err> { 1550 Ok(()) 1551 } 1552 withdraw( &mut self, _uri: uri::Rsync, _hash: Hash, ) -> Result<(), Self::Err>1553 fn withdraw( 1554 &mut self, 1555 _uri: uri::Rsync, 1556 _hash: Hash, 1557 ) -> Result<(), Self::Err> { 1558 Ok(()) 1559 } 1560 } 1561 1562 #[test] ripe_notification()1563 fn ripe_notification() { 1564 NotificationFile::parse( 1565 include_bytes!("../test-data/ripe-notification.xml").as_ref() 1566 ).unwrap(); 1567 } 1568 1569 #[test] lolz_notification()1570 fn lolz_notification() { 1571 assert!( 1572 NotificationFile::parse( 1573 include_bytes!("../test-data/lolz-notification.xml").as_ref() 1574 ).is_err() 1575 ); 1576 } 1577 1578 #[test] gaps_notification()1579 fn gaps_notification() { 1580 let mut notification_without_gaps = NotificationFile::parse( 1581 include_bytes!("../test-data/ripe-notification.xml").as_ref() 1582 ).unwrap(); 1583 assert!(notification_without_gaps.sort_and_verify_deltas(None)); 1584 1585 let mut notification_with_gaps = NotificationFile::parse( 1586 include_bytes!("../test-data/ripe-notification-with-gaps.xml").as_ref() 1587 
).unwrap(); 1588 assert!(!notification_with_gaps.sort_and_verify_deltas(None)); 1589 } 1590 1591 #[test] limit_notification_deltas()1592 fn limit_notification_deltas() { 1593 let mut notification_without_gaps = NotificationFile::parse( 1594 include_bytes!("../test-data/ripe-notification.xml").as_ref() 1595 ).unwrap(); 1596 assert!(notification_without_gaps.sort_and_verify_deltas(Some(2))); 1597 1598 assert_eq!(2, notification_without_gaps.deltas().len()); 1599 assert_eq!(notification_without_gaps.deltas().first().unwrap().serial(), notification_without_gaps.serial() - 1); 1600 assert_eq!(notification_without_gaps.deltas().last().unwrap().serial(), notification_without_gaps.serial()); 1601 } 1602 1603 1604 #[test] unsorted_notification()1605 fn unsorted_notification() { 1606 let mut from_sorted = NotificationFile::parse( 1607 include_bytes!("../test-data/ripe-notification.xml").as_ref() 1608 ).unwrap(); 1609 1610 let mut from_unsorted = NotificationFile::parse( 1611 include_bytes!("../test-data/ripe-notification-unsorted.xml").as_ref() 1612 ).unwrap(); 1613 1614 assert_ne!(from_sorted, from_unsorted); 1615 1616 from_unsorted.reverse_sort_deltas(); 1617 assert_eq!(from_sorted, from_unsorted); 1618 1619 from_unsorted.sort_deltas(); 1620 assert_ne!(from_sorted, from_unsorted); 1621 1622 from_sorted.sort_deltas(); 1623 assert_eq!(from_sorted, from_unsorted); 1624 } 1625 1626 #[test] ripe_snapshot()1627 fn ripe_snapshot() { 1628 <Test as ProcessSnapshot>::process( 1629 &mut Test, 1630 include_bytes!("../test-data/ripe-snapshot.xml").as_ref() 1631 ).unwrap(); 1632 } 1633 1634 #[test] ripe_delta()1635 fn ripe_delta() { 1636 <Test as ProcessDelta>::process( 1637 &mut Test, 1638 include_bytes!("../test-data/ripe-delta.xml").as_ref() 1639 ).unwrap(); 1640 } 1641 1642 #[test] hash_to_hash()1643 fn hash_to_hash() { 1644 use std::str::FromStr; 1645 1646 let string = "this is a test"; 1647 let sha256 = "2e99758548972a8e8822ad47fa1017ff72f06f3ff6a016851f45c398732bc50c"; 1648 let 
hash = Hash::from_str(sha256).unwrap(); 1649 let hash_from_data = Hash::from_data(string.as_bytes()); 1650 assert_eq!(hash, hash_from_data); 1651 assert!(hash.matches(string.as_bytes())); 1652 } 1653 1654 #[test] notification_from_to_xml()1655 fn notification_from_to_xml() { 1656 let notification = NotificationFile::parse( 1657 include_bytes!("../test-data/ripe-notification.xml").as_ref() 1658 ).unwrap(); 1659 1660 let mut vec = vec![]; 1661 notification.write_xml(&mut vec).unwrap(); 1662 1663 let xml = unsafe { 1664 from_utf8_unchecked(vec.as_ref()) 1665 }; 1666 1667 let notification_parsed = NotificationFile::parse(xml.as_bytes()).unwrap(); 1668 1669 assert_eq!(notification, notification_parsed); 1670 } 1671 1672 #[test] snapshot_from_to_xml()1673 fn snapshot_from_to_xml() { 1674 let data = include_bytes!("../test-data/ripe-snapshot.xml"); 1675 let snapshot = Snapshot::parse(data.as_ref()).unwrap(); 1676 1677 let mut vec = vec![]; 1678 snapshot.write_xml(&mut vec).unwrap(); 1679 1680 let xml = unsafe { 1681 from_utf8_unchecked(vec.as_ref()) 1682 }; 1683 1684 let snapshot_parsed = Snapshot::parse(xml.as_bytes()).unwrap(); 1685 1686 assert_eq!(snapshot, snapshot_parsed); 1687 } 1688 1689 #[test] delta_from_to_xml()1690 fn delta_from_to_xml() { 1691 let data = include_bytes!("../test-data/ripe-delta.xml"); 1692 let delta = Delta::parse(data.as_ref()).unwrap(); 1693 1694 let mut vec = vec![]; 1695 delta.write_xml(&mut vec).unwrap(); 1696 1697 let xml = unsafe { 1698 from_utf8_unchecked(vec.as_ref()) 1699 }; 1700 1701 let delta_parsed = Delta::parse(xml.as_bytes()).unwrap(); 1702 1703 assert_eq!(delta, delta_parsed); 1704 } 1705 1706 #[test] snapshot_content()1707 fn snapshot_content() { 1708 const CONTENT: &[u8] = b"foo bar\n"; 1709 let snapshot = br#" 1710 <snapshot version="1" 1711 session_id="a2d845c4-5b91-4015-a2b7-988c03ce232a" 1712 serial="1742" 1713 xmlns="http://www.ripe.net/rpki/rrdp" 1714 > 1715 <publish 1716 uri="rsync://example.com/some/path" 1717 > 1718 
Zm9vIGJhcgo= 1719 </publish> 1720 <publish 1721 uri="rsync://example.com/some/other" 1722 /> 1723 <publish 1724 uri="rsync://example.com/some/third" 1725 ></publish> 1726 </snapshot> 1727 "#; 1728 1729 let snapshot = Snapshot::parse(&mut snapshot.as_ref()).unwrap(); 1730 assert_eq!(snapshot.elements.len(), 3); 1731 assert_eq!( 1732 snapshot.elements[0], 1733 PublishElement::new( 1734 uri::Rsync::from_str("rsync://example.com/some/path").unwrap(), 1735 Bytes::copy_from_slice(CONTENT) 1736 ) 1737 ); 1738 assert_eq!( 1739 snapshot.elements[1], 1740 PublishElement::new( 1741 uri::Rsync::from_str("rsync://example.com/some/other").unwrap(), 1742 Bytes::new() 1743 ) 1744 ); 1745 assert_eq!( 1746 snapshot.elements[2], 1747 PublishElement::new( 1748 uri::Rsync::from_str("rsync://example.com/some/third").unwrap(), 1749 Bytes::new() 1750 ) 1751 ); 1752 } 1753 } 1754