1 //! A simple message queue, responsible for listening for (CA) events,
2 //! making them available for triggered processing, such as publishing
3 //! signed material, or asking a newly added parent for resource
4 //! entitlements.
5 
6 use std::{
7     collections::{HashMap, VecDeque},
8     fmt,
9     sync::RwLock,
10 };
11 
12 use rpki::repository::x509::Time;
13 
14 use crate::{
15     commons::{
16         api::{Handle, ParentHandle, ResourceClassName, RevocationRequest},
17         eventsourcing::{self, Event},
18     },
19     daemon::ca::{CaEvt, CaEvtDet, CertAuth},
20 };
21 
22 //------------ QueueTask ----------------------------------------------------
23 
/// This type contains tasks with the details needed for triggered processing.
///
/// Tasks are stored in a [`MessageQueue`] and handed out in the order in
/// which they were scheduled.
#[derive(Clone, Debug, Eq, PartialEq)]
// NOTE(review): presumably silenced because the variants carrying revocation
// requests are much larger than the others -- confirm before boxing anything.
#[allow(clippy::large_enum_variant)]
pub enum QueueTask {
    /// Marker task that a freshly created (default) queue starts out with.
    ServerStarted,

    /// The CA should synchronize with its repositories.
    SyncRepo {
        /// Handle of the CA that needs to synchronize.
        ca: Handle,
    },
    /// A repository synchronization for the CA should be attempted again.
    RescheduleSyncRepo {
        /// Handle of the CA that needs to synchronize.
        ca: Handle,
        /// When the resynchronization should be attempted.
        due: Time,
    },

    /// The CA should synchronize with the given parent.
    SyncParent {
        /// Handle of the CA that needs to synchronize.
        ca: Handle,
        /// Handle of the parent to synchronize with.
        parent: ParentHandle,
    },
    /// A synchronization between the CA and the given parent should be
    /// attempted again.
    RescheduleSyncParent {
        /// Handle of the CA that needs to synchronize.
        ca: Handle,
        /// Handle of the parent to synchronize with.
        parent: ParentHandle,
        /// Due time for the rescheduled synchronization.
        due: Time,
    },

    /// A resource class was removed from the CA.
    ResourceClassRemoved {
        /// Handle of the CA that lost the resource class.
        ca: Handle,
        /// Handle of the parent the resource class was held under.
        parent: ParentHandle,
        /// Revocation requests, keyed by the name of the resource class they
        /// belong to.
        revocation_requests: HashMap<ResourceClassName, Vec<RevocationRequest>>,
    },
    /// An unexpected key was found in a resource class of the CA.
    UnexpectedKey {
        /// Handle of the CA in which the key was found.
        ca: Handle,
        /// Name of the resource class in which the key was found.
        rcn: ResourceClassName,
        /// Revocation request for the unexpected key.
        revocation_request: RevocationRequest,
    },
}
59 
60 impl fmt::Display for QueueTask {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result61     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
62         match self {
63             QueueTask::ServerStarted => write!(f, "Server just started"),
64             QueueTask::SyncRepo { ca } => write!(f, "synchronize repo for '{}'", ca),
65             QueueTask::RescheduleSyncRepo { ca, due } => write!(
66                 f,
67                 "reschedule failed synchronize repo for '{}' at: {}",
68                 ca,
69                 due.to_rfc3339()
70             ),
71             QueueTask::SyncParent { ca, parent } => write!(f, "synchronize CA '{}' with parent '{}'", ca, parent),
72             QueueTask::RescheduleSyncParent { ca, parent, due } => write!(
73                 f,
74                 "reschedule failed synchronize CA '{}' with parent '{}' for {}",
75                 ca,
76                 parent,
77                 due.to_rfc3339()
78             ),
79             QueueTask::ResourceClassRemoved { ca, .. } => {
80                 write!(f, "resource class removed for '{}' ", ca)
81             }
82             QueueTask::UnexpectedKey { ca, rcn, .. } => {
83                 write!(f, "unexpected key found for '{}' resource class: '{}'", ca, rcn)
84             }
85         }
86     }
87 }
88 
/// A queue of [`QueueTask`]s that can be filled and emptied through a shared
/// reference.
///
/// Tasks are appended at the back and taken from the front, so they come out
/// in the order in which they were scheduled.
#[derive(Debug)]
pub struct MessageQueue {
    /// The pending tasks; the lock allows mutation through `&self`.
    q: RwLock<VecDeque<QueueTask>>,
}
93 
94 impl Default for MessageQueue {
default() -> Self95     fn default() -> Self {
96         let mut vec = VecDeque::new();
97         vec.push_back(QueueTask::ServerStarted);
98         MessageQueue { q: RwLock::new(vec) }
99     }
100 }
101 
102 impl MessageQueue {
pop_all(&self) -> Vec<QueueTask>103     pub fn pop_all(&self) -> Vec<QueueTask> {
104         let mut res = vec![];
105         let mut q = self.q.write().unwrap();
106         while let Some(evt) = q.pop_front() {
107             res.push(evt);
108         }
109         res
110     }
111 
schedule(&self, task: QueueTask)112     fn schedule(&self, task: QueueTask) {
113         let mut q = self.q.write().unwrap();
114         q.push_back(task);
115     }
116 
117     /// Schedules that a CA synchronizes with its repositories.
schedule_sync_repo(&self, ca: Handle)118     pub fn schedule_sync_repo(&self, ca: Handle) {
119         self.schedule(QueueTask::SyncRepo { ca });
120     }
121 
122     /// RE-Schedules that a CA synchronizes with its repositories. This function
123     /// takes a time argument to indicate *when* the resynchronization should be
124     /// attempted.
reschedule_sync_repo(&self, ca: Handle, due: Time)125     pub fn reschedule_sync_repo(&self, ca: Handle, due: Time) {
126         self.schedule(QueueTask::RescheduleSyncRepo { ca, due });
127     }
128 
schedule_sync_parent(&self, ca: Handle, parent: ParentHandle)129     pub fn schedule_sync_parent(&self, ca: Handle, parent: ParentHandle) {
130         self.schedule(QueueTask::SyncParent { ca, parent });
131     }
132 
reschedule_sync_parent(&self, ca: Handle, parent: ParentHandle, due: Time)133     pub fn reschedule_sync_parent(&self, ca: Handle, parent: ParentHandle, due: Time) {
134         self.schedule(QueueTask::RescheduleSyncParent { ca, parent, due });
135     }
136 
drop_sync_parent(&self, dropping_ca: &Handle, parent_to_drop: &ParentHandle)137     fn drop_sync_parent(&self, dropping_ca: &Handle, parent_to_drop: &ParentHandle) {
138         let mut q = self.q.write().unwrap();
139         q.retain(|existing| match existing {
140             QueueTask::SyncParent { ca, parent } | QueueTask::RescheduleSyncParent { ca, parent, .. } => {
141                 dropping_ca != ca || parent_to_drop != parent
142             }
143             _ => true,
144         });
145     }
146 }
147 
148 /// Implement listening for CertAuth Published events.
149 impl eventsourcing::PostSaveEventListener<CertAuth> for MessageQueue {
listen(&self, ca: &CertAuth, events: &[CaEvt])150     fn listen(&self, ca: &CertAuth, events: &[CaEvt]) {
151         for event in events {
152             trace!("Seen CertAuth event '{}'", event);
153 
154             let handle = event.handle();
155 
156             match event.details() {
157                 CaEvtDet::RoasUpdated { .. }
158                 | CaEvtDet::AspaObjectsUpdated { .. }
159                 | CaEvtDet::ChildCertificatesUpdated { .. }
160                 | CaEvtDet::ChildKeyRevoked { .. }
161                 | CaEvtDet::KeyPendingToNew { .. }
162                 | CaEvtDet::KeyPendingToActive { .. }
163                 | CaEvtDet::KeyRollFinished { .. } => self.schedule_sync_repo(handle.clone()),
164 
165                 CaEvtDet::KeyRollActivated {
166                     resource_class_name, ..
167                 } => {
168                     if let Ok(parent) = ca.parent_for_rc(resource_class_name) {
169                         self.schedule_sync_parent(handle.clone(), parent.clone());
170                     }
171                     self.schedule_sync_repo(handle.clone());
172                 }
173 
174                 CaEvtDet::ParentRemoved { parent } => {
175                     self.drop_sync_parent(handle, parent);
176                     self.schedule_sync_repo(handle.clone());
177                 }
178 
179                 CaEvtDet::ResourceClassRemoved {
180                     resource_class_name,
181                     parent,
182                     revoke_requests,
183                 } => {
184                     self.schedule_sync_repo(handle.clone());
185 
186                     let mut revocations_map = HashMap::new();
187                     revocations_map.insert(resource_class_name.clone(), revoke_requests.clone());
188 
189                     self.schedule(QueueTask::ResourceClassRemoved {
190                         ca: handle.clone(),
191                         parent: parent.clone(),
192                         revocation_requests: revocations_map,
193                     })
194                 }
195 
196                 CaEvtDet::UnexpectedKeyFound {
197                     resource_class_name,
198                     revoke_req,
199                 } => self.schedule(QueueTask::UnexpectedKey {
200                     ca: handle.clone(),
201                     rcn: resource_class_name.clone(),
202                     revocation_request: revoke_req.clone(),
203                 }),
204 
205                 CaEvtDet::ParentAdded { parent, .. } => {
206                     self.schedule_sync_parent(handle.clone(), parent.clone());
207                 }
208                 CaEvtDet::RepoUpdated { .. } => {
209                     for parent in ca.parents() {
210                         self.schedule_sync_parent(handle.clone(), parent.clone());
211                     }
212                 }
213                 CaEvtDet::CertificateRequested {
214                     resource_class_name, ..
215                 } => {
216                     if let Ok(parent) = ca.parent_for_rc(resource_class_name) {
217                         self.schedule_sync_parent(handle.clone(), parent.clone());
218                     }
219                 }
220 
221                 _ => {}
222             }
223         }
224     }
225 }
226