1 //! A simple message queue, responsible for listening for (CA) events, 2 //! making them available for triggered processing, such as publishing 3 //! signed material, or asking a newly added parent for resource 4 //! entitlements. 5 6 use std::{ 7 collections::{HashMap, VecDeque}, 8 fmt, 9 sync::RwLock, 10 }; 11 12 use rpki::repository::x509::Time; 13 14 use crate::{ 15 commons::{ 16 api::{Handle, ParentHandle, ResourceClassName, RevocationRequest}, 17 eventsourcing::{self, Event}, 18 }, 19 daemon::ca::{CaEvt, CaEvtDet, CertAuth}, 20 }; 21 22 //------------ QueueTask ---------------------------------------------------- 23 24 /// This type contains tasks with the details needed for triggered processing. 25 #[derive(Clone, Debug, Eq, PartialEq)] 26 #[allow(clippy::large_enum_variant)] 27 pub enum QueueTask { 28 ServerStarted, 29 30 SyncRepo { 31 ca: Handle, 32 }, 33 RescheduleSyncRepo { 34 ca: Handle, 35 due: Time, 36 }, 37 38 SyncParent { 39 ca: Handle, 40 parent: ParentHandle, 41 }, 42 RescheduleSyncParent { 43 ca: Handle, 44 parent: ParentHandle, 45 due: Time, 46 }, 47 48 ResourceClassRemoved { 49 ca: Handle, 50 parent: ParentHandle, 51 revocation_requests: HashMap<ResourceClassName, Vec<RevocationRequest>>, 52 }, 53 UnexpectedKey { 54 ca: Handle, 55 rcn: ResourceClassName, 56 revocation_request: RevocationRequest, 57 }, 58 } 59 60 impl fmt::Display for QueueTask { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result61 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 62 match self { 63 QueueTask::ServerStarted => write!(f, "Server just started"), 64 QueueTask::SyncRepo { ca } => write!(f, "synchronize repo for '{}'", ca), 65 QueueTask::RescheduleSyncRepo { ca, due } => write!( 66 f, 67 "reschedule failed synchronize repo for '{}' at: {}", 68 ca, 69 due.to_rfc3339() 70 ), 71 QueueTask::SyncParent { ca, parent } => write!(f, "synchronize CA '{}' with parent '{}'", ca, parent), 72 QueueTask::RescheduleSyncParent { ca, parent, due } => write!( 73 f, 74 "reschedule failed synchronize CA '{}' with parent '{}' for {}", 75 ca, 76 parent, 77 due.to_rfc3339() 78 ), 79 QueueTask::ResourceClassRemoved { ca, .. } => { 80 write!(f, "resource class removed for '{}' ", ca) 81 } 82 QueueTask::UnexpectedKey { ca, rcn, .. } => { 83 write!(f, "unexpected key found for '{}' resource class: '{}'", ca, rcn) 84 } 85 } 86 } 87 } 88 89 #[derive(Debug)] 90 pub struct MessageQueue { 91 q: RwLock<VecDeque<QueueTask>>, 92 } 93 94 impl Default for MessageQueue { default() -> Self95 fn default() -> Self { 96 let mut vec = VecDeque::new(); 97 vec.push_back(QueueTask::ServerStarted); 98 MessageQueue { q: RwLock::new(vec) } 99 } 100 } 101 102 impl MessageQueue { pop_all(&self) -> Vec<QueueTask>103 pub fn pop_all(&self) -> Vec<QueueTask> { 104 let mut res = vec![]; 105 let mut q = self.q.write().unwrap(); 106 while let Some(evt) = q.pop_front() { 107 res.push(evt); 108 } 109 res 110 } 111 schedule(&self, task: QueueTask)112 fn schedule(&self, task: QueueTask) { 113 let mut q = self.q.write().unwrap(); 114 q.push_back(task); 115 } 116 117 /// Schedules that a CA synchronizes with its repositories. schedule_sync_repo(&self, ca: Handle)118 pub fn schedule_sync_repo(&self, ca: Handle) { 119 self.schedule(QueueTask::SyncRepo { ca }); 120 } 121 122 /// RE-Schedules that a CA synchronizes with its repositories. This function 123 /// takes a time argument to indicate *when* the resynchronization should be 124 /// attempted. reschedule_sync_repo(&self, ca: Handle, due: Time)125 pub fn reschedule_sync_repo(&self, ca: Handle, due: Time) { 126 self.schedule(QueueTask::RescheduleSyncRepo { ca, due }); 127 } 128 schedule_sync_parent(&self, ca: Handle, parent: ParentHandle)129 pub fn schedule_sync_parent(&self, ca: Handle, parent: ParentHandle) { 130 self.schedule(QueueTask::SyncParent { ca, parent }); 131 } 132 reschedule_sync_parent(&self, ca: Handle, parent: ParentHandle, due: Time)133 pub fn reschedule_sync_parent(&self, ca: Handle, parent: ParentHandle, due: Time) { 134 self.schedule(QueueTask::RescheduleSyncParent { ca, parent, due }); 135 } 136 drop_sync_parent(&self, dropping_ca: &Handle, parent_to_drop: &ParentHandle)137 fn drop_sync_parent(&self, dropping_ca: &Handle, parent_to_drop: &ParentHandle) { 138 let mut q = self.q.write().unwrap(); 139 q.retain(|existing| match existing { 140 QueueTask::SyncParent { ca, parent } | QueueTask::RescheduleSyncParent { ca, parent, .. } => { 141 dropping_ca != ca || parent_to_drop != parent 142 } 143 _ => true, 144 }); 145 } 146 } 147 148 /// Implement listening for CertAuth Published events. 149 impl eventsourcing::PostSaveEventListener<CertAuth> for MessageQueue { listen(&self, ca: &CertAuth, events: &[CaEvt])150 fn listen(&self, ca: &CertAuth, events: &[CaEvt]) { 151 for event in events { 152 trace!("Seen CertAuth event '{}'", event); 153 154 let handle = event.handle(); 155 156 match event.details() { 157 CaEvtDet::RoasUpdated { .. } 158 | CaEvtDet::AspaObjectsUpdated { .. } 159 | CaEvtDet::ChildCertificatesUpdated { .. } 160 | CaEvtDet::ChildKeyRevoked { .. } 161 | CaEvtDet::KeyPendingToNew { .. } 162 | CaEvtDet::KeyPendingToActive { .. } 163 | CaEvtDet::KeyRollFinished { .. } => self.schedule_sync_repo(handle.clone()), 164 165 CaEvtDet::KeyRollActivated { 166 resource_class_name, .. 167 } => { 168 if let Ok(parent) = ca.parent_for_rc(resource_class_name) { 169 self.schedule_sync_parent(handle.clone(), parent.clone()); 170 } 171 self.schedule_sync_repo(handle.clone()); 172 } 173 174 CaEvtDet::ParentRemoved { parent } => { 175 self.drop_sync_parent(handle, parent); 176 self.schedule_sync_repo(handle.clone()); 177 } 178 179 CaEvtDet::ResourceClassRemoved { 180 resource_class_name, 181 parent, 182 revoke_requests, 183 } => { 184 self.schedule_sync_repo(handle.clone()); 185 186 let mut revocations_map = HashMap::new(); 187 revocations_map.insert(resource_class_name.clone(), revoke_requests.clone()); 188 189 self.schedule(QueueTask::ResourceClassRemoved { 190 ca: handle.clone(), 191 parent: parent.clone(), 192 revocation_requests: revocations_map, 193 }) 194 } 195 196 CaEvtDet::UnexpectedKeyFound { 197 resource_class_name, 198 revoke_req, 199 } => self.schedule(QueueTask::UnexpectedKey { 200 ca: handle.clone(), 201 rcn: resource_class_name.clone(), 202 revocation_request: revoke_req.clone(), 203 }), 204 205 CaEvtDet::ParentAdded { parent, .. } => { 206 self.schedule_sync_parent(handle.clone(), parent.clone()); 207 } 208 CaEvtDet::RepoUpdated { .. } => { 209 for parent in ca.parents() { 210 self.schedule_sync_parent(handle.clone(), parent.clone()); 211 } 212 } 213 CaEvtDet::CertificateRequested { 214 resource_class_name, .. 215 } => { 216 if let Ok(parent) = ca.parent_for_rc(resource_class_name) { 217 self.schedule_sync_parent(handle.clone(), parent.clone()); 218 } 219 } 220 221 _ => {} 222 } 223 } 224 } 225 } 226