1 //! Support parsing announcements in RIS Dumps 2 //! 3 //! http://www.ris.ripe.net/dumps/riswhoisdump.IPv4.gz 4 5 use std::{ 6 fmt, 7 io::{BufRead, Read}, 8 num::ParseIntError, 9 str::FromStr, 10 }; 11 12 use bytes::Bytes; 13 use libflate::gzip::Decoder; 14 15 use crate::commons::{ 16 api::{AsNumber, AuthorizationFmtError, TypedPrefix}, 17 bgp::Announcement, 18 error::KrillIoError, 19 }; 20 21 pub struct RisDumpLoader { 22 bgp_risdumps_v4_uri: String, 23 bgp_risdumps_v6_uri: String, 24 } 25 26 impl RisDumpLoader { new(bgp_risdumps_v4_uri: &str, bgp_risdumps_v6_uri: &str) -> Self27 pub fn new(bgp_risdumps_v4_uri: &str, bgp_risdumps_v6_uri: &str) -> Self { 28 RisDumpLoader { 29 bgp_risdumps_v4_uri: bgp_risdumps_v4_uri.to_string(), 30 bgp_risdumps_v6_uri: bgp_risdumps_v6_uri.to_string(), 31 } 32 } 33 download_updates(&self) -> Result<Vec<Announcement>, RisDumpError>34 pub async fn download_updates(&self) -> Result<Vec<Announcement>, RisDumpError> { 35 let v4_bytes: Bytes = reqwest::get(&self.bgp_risdumps_v4_uri).await?.bytes().await?; 36 37 let v4_bytes = Self::gunzip(v4_bytes)?; 38 39 let mut res = Self::parse_dump(v4_bytes.as_slice())?; 40 41 let v6_bytes: Bytes = reqwest::get(&self.bgp_risdumps_v6_uri).await?.bytes().await?; 42 43 let v6_bytes = Self::gunzip(v6_bytes)?; 44 45 res.append(&mut Self::parse_dump(v6_bytes.as_slice())?); 46 47 Ok(res) 48 } 49 gunzip(bytes: Bytes) -> Result<Vec<u8>, RisDumpError>50 fn gunzip(bytes: Bytes) -> Result<Vec<u8>, RisDumpError> { 51 let mut gunzipped: Vec<u8> = vec![]; 52 let mut decoder = Decoder::new(bytes.as_ref()) 53 .map_err(|e| RisDumpError::UnzipError(format!("Could not unzip dump file: {}", e)))?; 54 55 decoder 56 .read_to_end(&mut gunzipped) 57 .map_err(|e| RisDumpError::UnzipError(format!("Could not unzip dump file: {}", e)))?; 58 59 Ok(gunzipped) 60 } 61 parse_dump(bytes: &[u8]) -> Result<Vec<Announcement>, RisDumpError>62 fn parse_dump(bytes: &[u8]) -> Result<Vec<Announcement>, RisDumpError> { 63 let mut res = vec![]; 64 for lines_res in bytes.lines() { 65 let line = lines_res.map_err(RisDumpError::parse_error)?; 66 if line.is_empty() || line.starts_with('%') { 67 continue; 68 } 69 70 let mut values = line.split_whitespace(); 71 72 let asn_str = values.next().ok_or(RisDumpError::MissingColumn)?; 73 let prefix_str = values.next().ok_or(RisDumpError::MissingColumn)?; 74 let peers = values.next().ok_or(RisDumpError::MissingColumn)?; 75 76 if u32::from_str(peers)? <= 5 { 77 continue; 78 } 79 80 if asn_str.contains('{') { 81 continue; // assets not supported (not important here either) 82 } 83 84 let asn = AsNumber::from_str(asn_str)?; 85 let prefix = TypedPrefix::from_str(prefix_str)?; 86 87 let ann = Announcement::new(asn, prefix); 88 res.push(ann); 89 } 90 Ok(res) 91 } 92 } 93 94 //------------ Error -------------------------------------------------------- 95 96 #[derive(Debug)] 97 pub enum RisDumpError { 98 ReqwestError(reqwest::Error), 99 MissingColumn, 100 ParseError(String), 101 IoError(KrillIoError), 102 UnzipError(String), 103 } 104 105 impl fmt::Display for RisDumpError { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result106 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 107 match self { 108 RisDumpError::ReqwestError(e) => write!(f, "Cannot get uri: {}", e), 109 RisDumpError::MissingColumn => write!(f, "Missing column in announcements input"), 110 RisDumpError::ParseError(s) => write!(f, "Error parsing announcements: {}", s), 111 RisDumpError::IoError(e) => write!(f, "IO error: {}", e), 112 RisDumpError::UnzipError(s) => write!(f, "Error unzipping: {}", s), 113 } 114 } 115 } 116 117 impl RisDumpError { parse_error(e: impl fmt::Display) -> Self118 fn parse_error(e: impl fmt::Display) -> Self { 119 RisDumpError::ParseError(format!("{}", e)) 120 } 121 } 122 123 impl From<AuthorizationFmtError> for RisDumpError { from(e: AuthorizationFmtError) -> Self124 fn from(e: AuthorizationFmtError) -> Self { 125 Self::parse_error(e) 126 } 127 } 128 129 impl From<ParseIntError> for RisDumpError { from(e: ParseIntError) -> Self130 fn from(e: ParseIntError) -> Self { 131 RisDumpError::parse_error(e) 132 } 133 } 134 135 impl From<reqwest::Error> for RisDumpError { from(e: reqwest::Error) -> RisDumpError136 fn from(e: reqwest::Error) -> RisDumpError { 137 RisDumpError::ReqwestError(e) 138 } 139 } 140 141 impl From<KrillIoError> for RisDumpError { from(e: KrillIoError) -> Self142 fn from(e: KrillIoError) -> Self { 143 RisDumpError::IoError(e) 144 } 145 } 146 147 //------------ Tests -------------------------------------------------------- 148 149 #[cfg(test)] 150 mod tests { 151 use super::*; 152 153 #[tokio::test] 154 #[ignore] download_bgp_ris_dumps()155 async fn download_bgp_ris_dumps() { 156 let bgp_ris_dump_v4_uri = "http://www.ris.ripe.net/dumps/riswhoisdump.IPv4.gz"; 157 let bgp_ris_dump_v6_uri = "http://www.ris.ripe.net/dumps/riswhoisdump.IPv6.gz"; 158 159 let loader = RisDumpLoader::new(bgp_ris_dump_v4_uri, bgp_ris_dump_v6_uri); 160 let announcements = loader.download_updates().await.unwrap(); 161 162 assert!(!announcements.is_empty()) 163 } 164 } 165