use std::{collections::HashMap, ops::Deref, sync::Arc};

use api::{Publish, Update, Withdraw};
use futures::future::join_all;
use tokio::sync::Mutex;

use bytes::Bytes;
use chrono::Duration;

use rpki::{repository::crypto::KeyIdentifier, uri};

use crate::{
    commons::{
        actor::Actor,
        api::{
            self, AddChildRequest, AspaCustomer, AspaDefinitionList, AspaDefinitionUpdates, AspaProvidersUpdate,
            Base64, CaCommandDetails, CaCommandResult, CertAuthList, CertAuthSummary, ChildCaInfo, ChildHandle,
            CommandHistory, CommandHistoryCriteria, Entitlements, Handle, IssuanceRequest, IssuanceResponse, ListReply,
            ParentCaContact, ParentCaReq, ParentHandle, PublishDelta, RcvdCert, RepositoryContact, ResourceClassName,
            ResourceSet, RevocationRequest, RevocationResponse, RtaName, StoredEffect, UpdateChildRequest,
        },
        api::{rrdp::PublishElement, Timestamp},
        crypto::{IdCert, KrillSigner, ProtocolCms, ProtocolCmsBuilder},
        error::Error,
        eventsourcing::{Aggregate, AggregateStore, Command, CommandKey},
        remote::cmslogger::CmsLogger,
        remote::{rfc6492, rfc8181, rfc8183},
        util::httpclient,
        KrillResult,
    },
    constants::{CASERVER_DIR, REQUEUE_DELAY_SECONDS, STATUS_DIR},
    daemon::{
        auth::common::permissions::Permission,
        ca::{
            self, ta_handle, CaObjectsStore, CaStatus, CertAuth, Cmd, CmdDet, DeprecatedRepository, IniDet,
            ResourceTaggedAttestation, RouteAuthorizationUpdates, RtaContentRequest, RtaPrepareRequest, StatusStore,
        },
        config::Config,
        mq::MessageQueue,
    },
    pubd::RepositoryManager,
};

//------------ CaLocks ------------------------------------------------------

pub struct CaLockMap(HashMap<Handle, tokio::sync::RwLock<()>>);

impl CaLockMap {
    fn create_ca_lock(&mut self, ca: &Handle) {
        self.0.insert(ca.clone(), tokio::sync::RwLock::new(()));
    }

    fn has_ca(&self, ca: &Handle) -> bool {
        self.0.contains_key(ca)
    }

    fn drop_ca_lock(&mut self, ca: &Handle) {
        self.0.remove(ca);
    }
}

impl Default for CaLockMap {
    fn default() -> Self {
        CaLockMap(HashMap::new())
    }
}

pub struct CaLock<'a> {
    map: tokio::sync::RwLockReadGuard<'a, CaLockMap>,
    ca: Handle,
}

impl CaLock<'_> {
    async fn read(&self) -> tokio::sync::RwLockReadGuard<'_, ()> {
        self.map.0.get(&self.ca).unwrap().read().await
    }

    async fn write(&self) -> tokio::sync::RwLockWriteGuard<'_, ()> {
        self.map.0.get(&self.ca).unwrap().write().await
    }
}

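/// Map of per-CA locks, so that access to a single CA can be synchronized
/// without blocking operations on other CAs.
///
/// Illustrative usage (a sketch, not part of the public API; `handle` is
/// assumed to be a valid `Handle`):
/// ```ignore
/// let locks = CaLocks::default();
/// let ca_lock = locks.ca(&handle).await; // gets, or creates, the entry
/// let _guard = ca_lock.write().await;    // exclusive access to this CA
/// ```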
pub struct CaLocks {
    locks: tokio::sync::RwLock<CaLockMap>,
}

impl Default for CaLocks {
    fn default() -> Self {
        CaLocks {
            locks: tokio::sync::RwLock::new(CaLockMap::default()),
        }
    }
}

impl CaLocks {
    async fn ca(&self, ca: &Handle) -> CaLock<'_> {
        // Fast path: return the lock for this CA if it already exists.
        {
            let map = self.locks.read().await;
            if map.has_ca(ca) {
                return CaLock { map, ca: ca.clone() };
            }
        }

        // Otherwise create the entry under the outer write lock, release it,
        // and then re-acquire a read lock to hand out the CaLock.
        {
            let mut lock = self.locks.write().await;
            lock.create_ca_lock(ca);
        }

        let map = self.locks.read().await;
        CaLock { map, ca: ca.clone() }
    }

    async fn drop_ca(&self, ca: &Handle) {
        let mut map = self.locks.write().await;
        map.drop_ca_lock(ca);
    }
}

//------------ CaManager -----------------------------------------------------

#[derive(Clone)]
pub struct CaManager {
    ca_store: Arc<AggregateStore<CertAuth>>,
    ca_objects_store: Arc<CaObjectsStore>,
    status_store: Arc<Mutex<StatusStore>>,
    locks: Arc<CaLocks>,
    config: Arc<Config>,
    signer: Arc<KrillSigner>,
}

impl CaManager {
    /// Builds a new CaManager. Will return an error if the CA store cannot be initialized.
    pub async fn build(config: Arc<Config>, mq: Arc<MessageQueue>, signer: Arc<KrillSigner>) -> KrillResult<Self> {
        // Create the AggregateStore for the event-sourced `CertAuth` structures that handle
        // most CA functions.
        let mut ca_store = AggregateStore::<CertAuth>::disk(&config.data_dir, CASERVER_DIR)?;

        if config.always_recover_data {
            // If the user chose to 'always recover data' then do so.
            // This is slow, but it will ensure that all commands and events are accounted for,
            // and that there are no incomplete changes where some but not all files for a change
            // were written to disk.
            ca_store.recover()?;
        } else if let Err(e) = ca_store.warm() {
            // Otherwise we just try to 'warm' the cache. This serves two purposes:
            // 1. it ensures that all `CertAuth` structs are available in memory
            // 2. it ensures that there are no apparent data issues
            //
            // If there are issues, then complain and try to recover.
            error!(
                "Could not warm up cache, data seems corrupt. Will try to recover! Error was: {}",
                e
            );
            ca_store.recover()?;
        }

        // Create the `CaObjectsStore` that is responsible for maintaining CA objects: the `CaObjects`
        // for a CA gets copies of all ROAs and delegated certificates from the `CertAuth` and is responsible
        // for manifest and CRL generation.
        let ca_objects_store = Arc::new(CaObjectsStore::disk(config.clone(), signer.clone())?);

        // Register the `CaObjectsStore` as a pre-save listener to the 'ca_store' so that it can update
        // its ROAs and delegated certificates and/or generate manifests and CRLs when relevant changes
        // occur in a `CertAuth`.
        ca_store.add_pre_save_listener(ca_objects_store.clone());

        // Register the `MessageQueue` as a post-save listener to 'ca_store' so that relevant changes in
        // a `CertAuth` can trigger follow-up actions. Most importantly: synchronize with a parent CA or
        // the RPKI repository.
        ca_store.add_post_save_listener(mq);

        // Create the status store which will maintain the last known connection status between each CA
        // and its parent(s) and repository.
        let status_store = StatusStore::new(&config.data_dir, STATUS_DIR)?;

        // Create the per-CA lock structure so that we can guarantee safe access to each CA, while allowing
        // multiple CAs in a single Krill instance to interact: e.g. a child can talk to its parent and they
        // are locked individually.
        let locks = Arc::new(CaLocks::default());

        Ok(CaManager {
            ca_store: Arc::new(ca_store),
            ca_objects_store,
            status_store: Arc::new(Mutex::new(status_store)),
            locks,
            config,
            signer,
        })
    }

    pub fn testbed_enabled(&self) -> bool {
        self.config.testbed().is_some()
    }

    /// Gets the TrustAnchor, if present. Returns an error if the TA is uninitialized.
    pub async fn get_trust_anchor(&self) -> KrillResult<Arc<CertAuth>> {
        let ta_handle = ca::ta_handle();
        let lock = self.locks.ca(&ta_handle).await;
        let _ = lock.read().await;
        self.ca_store.get_latest(&ta_handle).map_err(Error::AggregateStoreError)
    }

    /// Initializes an embedded trust anchor with all resources.
    pub async fn init_ta(
        &self,
        ta_aia: uri::Rsync,
        ta_uris: Vec<uri::Https>,
        repo_manager: &Arc<RepositoryManager>,
        actor: &Actor,
    ) -> KrillResult<()> {
        let ta_handle = ca::ta_handle();
        let lock = self.locks.ca(&ta_handle).await;
        let _ = lock.write().await;
        if self.ca_store.has(&ta_handle)? {
            Err(Error::TaAlreadyInitialized)
        } else {
            // init normal CA
            let init = IniDet::init(&ta_handle, self.signer.deref())?;
            self.ca_store.add(init)?;

            // add to repo
            let ta = self.get_trust_anchor().await?;
            let pub_req = ta.publisher_request();
            repo_manager.create_publisher(pub_req, actor)?;
            let repository_response = repo_manager.repository_response(&ta_handle)?;
            let contact = RepositoryContact::new(repository_response);

            let upd_repo_cmd = CmdDet::update_repo(&ta_handle, contact, self.signer.clone(), actor);
            self.ca_store.command(upd_repo_cmd)?;

            // make trust anchor
            let make_ta_cmd =
                CmdDet::make_trust_anchor(&ta_handle, ta_uris, Some(ta_aia.clone()), self.signer.clone(), actor);
            let ta = self.ca_store.command(make_ta_cmd)?;

            // receive the self-signed cert (now as child of self)
            let ta_cert = ta.parent(&ta_handle).unwrap().to_ta_cert();
            let rcvd_cert = RcvdCert::new(ta_cert.clone(), ta_aia, ResourceSet::all_resources());

            let rcv_cert = CmdDet::upd_received_cert(
                &ta_handle,
                ResourceClassName::default(),
                rcvd_cert,
                self.config.clone(),
                self.signer.clone(),
                actor,
            );
            self.ca_store.command(rcv_cert)?;

            Ok(())
        }
    }

    /// Send a command to a CA.
    async fn send_command(&self, cmd: Cmd) -> KrillResult<Arc<CertAuth>> {
        let lock = self.locks.ca(cmd.handle()).await;
        let _ = lock.write().await;
        self.ca_store.command(cmd)
    }

    /// Republish the embedded TA and CAs if needed, i.e. if they are close
    /// to their next update time.
    pub async fn republish_all(&self) -> KrillResult<Vec<Handle>> {
        self.ca_objects_store.reissue_all()
    }
}

/// # CA instances and identity
///
impl CaManager {
    /// Initializes a CA without a repository, parents, or children.
    pub fn init_ca(&self, handle: &Handle) -> KrillResult<()> {
        if handle == &ta_handle() || handle.as_str() == "version" {
            Err(Error::TaNameReserved)
        } else if self.ca_store.has(handle)? {
            Err(Error::CaDuplicate(handle.clone()))
        } else {
            // Initialize the CA in self.ca_store, but note that there is no need to create
            // a new CA entry in self.ca_objects_store or self.status_store, because they will
            // generate empty default entries if needed.
            let init = IniDet::init(handle, self.signer.deref())?;
            self.ca_store.add(init)?;
            Ok(())
        }
    }

    /// Updates the self-signed ID certificate for a CA. Use this with care, as
    /// RFC 8183 only covers the initial ID exchanges in the form of XML files;
    /// it does not cover updating identity certificates and keys. Krill supports
    /// generating a new ID key pair and certificate, and has functions to update
    /// this for a parent, a child, a repo and a publisher, but other implementations
    /// may not support updating identities after initialization.
    pub async fn ca_update_id(&self, handle: Handle, actor: &Actor) -> KrillResult<()> {
        let cmd = CmdDet::update_id(&handle, self.signer.clone(), actor);
        self.send_command(cmd).await?;
        Ok(())
    }

    /// Get the CAs that the given actor is permitted to see.
    pub fn ca_list(&self, actor: &Actor) -> KrillResult<CertAuthList> {
        Ok(CertAuthList::new(
            self.ca_store
                .list()?
                .into_iter()
                .filter(|handle| matches!(actor.is_allowed(Permission::CA_READ, handle.clone()), Ok(true)))
                .map(CertAuthSummary::new)
                .collect(),
        ))
    }

    /// Gets a CA by the given handle, returns an `Err(Error::CaUnknown)` if it
    /// does not exist.
    pub async fn get_ca(&self, handle: &Handle) -> KrillResult<Arc<CertAuth>> {
        let lock = self.locks.ca(handle).await;
        let _ = lock.read().await;
        self.ca_store
            .get_latest(handle)
            .map_err(|_| Error::CaUnknown(handle.clone()))
    }

    /// Checks whether a CA by the given handle exists.
    pub fn has_ca(&self, handle: &Handle) -> KrillResult<bool> {
        self.ca_store.has(handle).map_err(Error::AggregateStoreError)
    }

    /// Gets the current CA status.
    pub async fn get_ca_status(&self, ca: &Handle) -> KrillResult<Arc<CaStatus>> {
        if self.has_ca(ca)? {
            self.status_store.lock().await.get_ca_status(ca).await
        } else {
            Err(Error::CaUnknown(ca.clone()))
        }
    }

    /// Delete a CA. Let it do best-effort revocation requests and withdraw
    /// all its objects first. Note that any children of this CA will be left
    /// orphaned, and they will only learn of this sad fact when they choose
    /// to call home.
    pub async fn delete_ca(&self, ca_handle: &Handle, actor: &Actor) -> KrillResult<()> {
        warn!("Deleting CA '{}' as requested by: {}", ca_handle, actor);

        let ca = self.get_ca(ca_handle).await?;

        // Request revocations from all parents - best effort
        info!(
            "Will try to request revocations from all parents of CA '{}' before removing it.",
            ca_handle
        );
        for parent in ca.parents() {
            if let Err(e) = self.ca_parent_revoke(ca_handle, parent).await {
                warn!(
                    "Removing CA '{}', but could not send revoke requests to parent '{}': {}",
                    ca_handle, parent, e
                );
            }
        }

        // Clean all repos - again best effort
        info!(
            "Will try to clean up all repositories for CA '{}' before removing it.",
            ca_handle
        );
        let mut repos: Vec<RepositoryContact> = self
            .ca_repo_elements(ca_handle)
            .await?
            .into_iter()
            .map(|(contact, _)| contact)
            .collect();

        for deprecated in self.ca_deprecated_repos(ca_handle)? {
            repos.push(deprecated.into());
        }

        for repo_contact in repos {
            if self.ca_repo_sync(ca_handle, &repo_contact, vec![]).await.is_err() {
                info!(
                    "Could not clean up deprecated repository. This is fine - objects there are no longer referenced."
                );
            }
        }

        self.ca_store.drop_aggregate(ca_handle)?;
        self.status_store.lock().await.remove_ca(ca_handle).await?;

        self.locks.drop_ca(ca_handle).await;

        Ok(())
    }

    /// Re-synchronize the CAs and CaStatus:
    ///
    /// - remove any surplus CA status entries
    /// - create missing CA status entries
    /// - check children for existing CAs:
    ///    - remove surplus from status
    ///    - add missing
    pub async fn resync_ca_statuses(&self) -> KrillResult<()> {
        let cas = self.ca_store.list()?;

        let mut ca_statuses = self.status_store.lock().await.cas().await?;

        // loop over existing CAs and get their status
        for ca_handle in cas {
            let ca = self.get_ca(&ca_handle).await?;
            let status = match ca_statuses.remove(&ca_handle) {
                Some(status) => status,
                None => {
                    // Getting a missing status will ensure that a new empty status is generated.
                    self.status_store.lock().await.get_ca_status(&ca_handle).await?
                }
            };

            let mut status_children = status.children().clone();

            // add default status for missing children
            for child in ca.children() {
                if status_children.remove(child).is_none() {
                    self.status_store
                        .lock()
                        .await
                        .set_child_default_if_missing(&ca_handle, child)
                        .await?;
                }
            }

            // remove surplus children status
            for surplus_child in status_children.keys() {
                self.status_store
                    .lock()
                    .await
                    .remove_child(&ca_handle, surplus_child)
                    .await?;
            }
        }

        // remove the status for any left-over CAs with status
        for surplus_ca in ca_statuses.keys() {
            info!("Removing the cached status for a removed CA: {}", surplus_ca);
            self.status_store.lock().await.remove_ca(surplus_ca).await?;
        }

        Ok(())
    }
}

/// # CA History
///
impl CaManager {
    /// Gets the history for a CA.
    pub async fn ca_history(&self, handle: &Handle, crit: CommandHistoryCriteria) -> KrillResult<CommandHistory> {
        let ca_lock = self.locks.ca(handle).await;
        let _lock = ca_lock.read().await;
        Ok(self.ca_store.command_history(handle, crit)?)
    }

    /// Shows the details for a CA command.
    pub fn ca_command_details(&self, handle: &Handle, command: CommandKey) -> KrillResult<CaCommandDetails> {
        let command = self.ca_store.get_command(handle, &command)?;

        let effect = command.effect().clone();
        match effect {
            StoredEffect::Error { msg } => Ok(CaCommandDetails::new(command, CaCommandResult::error(msg))),
            StoredEffect::Success { events } => {
                let mut stored_events = vec![];
                for version in events {
                    let evt = self.ca_store.get_event(handle, version)?.ok_or_else(|| {
                        Error::Custom(format!("Cannot find event: {} in history for CA: {}", version, handle))
                    })?;
                    stored_events.push(evt);
                }

                Ok(CaCommandDetails::new(command, CaCommandResult::events(stored_events)))
            }
        }
    }
}

/// # CAs as parents
///
impl CaManager {
    /// Adds a child under a CA. The 'service_uri' is used here so that
    /// the appropriate `ParentCaContact` can be returned. If the `AddChildRequest`
    /// contains resources not held by this CA, then an `Error::CaChildExtraResources`
    /// is returned.
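    ///
    /// Illustrative call (a sketch; `req` is an `AddChildRequest` built elsewhere,
    /// e.g. from a child's RFC 8183 child request):
    /// ```ignore
    /// let contact = ca_manager.ca_add_child(&ca, req, &service_uri, &actor).await?;
    /// // `contact` wraps the RFC 8183 Parent Response to hand back to the child.
    /// ```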
    pub async fn ca_add_child(
        &self,
        ca: &Handle,
        req: AddChildRequest,
        service_uri: &uri::Https,
        actor: &Actor,
    ) -> KrillResult<ParentCaContact> {
        info!("CA '{}' process add child request: {}", &ca, &req);
        let (child_handle, child_res, id_cert) = req.unpack();

        let add_child = CmdDet::child_add(ca, child_handle.clone(), id_cert, child_res, actor);
        self.send_command(add_child).await?;

        self.ca_parent_contact(ca, child_handle, service_uri).await
    }

    /// Show details for a child under the CA.
    pub async fn ca_show_child(&self, ca: &Handle, child: &ChildHandle) -> KrillResult<ChildCaInfo> {
        trace!("Finding details for child: {} under parent: {}", child, ca);
        let ca = self.get_ca(ca).await?;
        ca.get_child(child).map(|details| details.clone().into())
    }

    /// Show a contact for a child.
    pub async fn ca_parent_contact(
        &self,
        ca_handle: &Handle,
        child_handle: ChildHandle,
        service_uri: &uri::Https,
    ) -> KrillResult<ParentCaContact> {
        let response = self.ca_parent_response(ca_handle, child_handle, service_uri).await?;
        Ok(ParentCaContact::for_rfc6492(response))
    }

    /// Gets an RFC 8183 Parent Response for the child.
    pub async fn ca_parent_response(
        &self,
        ca: &Handle,
        child_handle: ChildHandle,
        service_uri: &uri::Https,
    ) -> KrillResult<rfc8183::ParentResponse> {
        let ca = self.get_ca(ca).await?;
        let service_uri = format!("{}rfc6492/{}", service_uri, ca.handle());
        let service_uri = uri::Https::from_string(service_uri).unwrap();
        let service_uri = rfc8183::ServiceUri::Https(service_uri);

        Ok(rfc8183::ParentResponse::new(
            None,
            ca.id_cert().clone(),
            ca.handle().clone(),
            child_handle,
            service_uri,
        ))
    }

    /// Update a child under this CA. The submitted `UpdateChildRequest` can contain a
    /// new `IdCert`, or `ResourceSet`, or both. When resources are updated, the existing
    /// resource entitlements are replaced by the new value - i.e. this is not a delta
    /// and it affects all Internet Number Resource (INR) types (IPv4, IPv6, ASN). Setting
    /// resource entitlements beyond the resources held by the parent CA will return
    /// an `Error::CaChildExtraResources`.
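    ///
    /// For example (a sketch), explicitly suspending a child:
    /// ```ignore
    /// ca_manager.ca_child_update(&ca, child, UpdateChildRequest::suspend(), &actor).await?;
    /// ```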
    pub async fn ca_child_update(
        &self,
        ca: &Handle,
        child: ChildHandle,
        req: UpdateChildRequest,
        actor: &Actor,
    ) -> KrillResult<()> {
        let (id_opt, resources_opt, suspend_opt) = req.unpack();

        if let Some(id) = id_opt {
            self.send_command(CmdDet::child_update_id(ca, child.clone(), id, actor))
                .await?;
        }
        if let Some(resources) = resources_opt {
            self.send_command(CmdDet::child_update_resources(ca, child.clone(), resources, actor))
                .await?;
        }
        if let Some(suspend) = suspend_opt {
            if suspend {
                self.send_command(CmdDet::child_suspend_inactive(ca, child, actor))
                    .await?;
            } else {
                self.send_command(CmdDet::child_unsuspend(ca, child, actor)).await?;
            }
        }
        Ok(())
    }

    /// Removes a child from this CA. This will also ensure that certificates issued to the child
    /// are revoked and withdrawn.
    pub async fn ca_child_remove(&self, ca: &Handle, child: ChildHandle, actor: &Actor) -> KrillResult<()> {
        self.status_store.lock().await.remove_child(ca, &child).await?;
        self.send_command(CmdDet::child_remove(ca, child, actor)).await?;

        Ok(())
    }

    /// Processes an RFC 6492 request sent to this CA:
    /// - parses the message bytes
    /// - validates the request
    /// - processes the child request
    /// - signs a response and returns the bytes
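    ///
    /// Sketch of a caller (e.g. the HTTPS endpoint that receives a child's POST;
    /// names here are illustrative):
    /// ```ignore
    /// let reply = ca_manager.rfc6492(&ca_handle, body_bytes, user_agent, &actor).await?;
    /// // `reply` is a signed RFC 6492 CMS response, ready to be sent back.
    /// ```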
    pub async fn rfc6492(
        &self,
        ca_handle: &Handle,
        msg_bytes: Bytes,
        user_agent: Option<String>,
        actor: &Actor,
    ) -> KrillResult<Bytes> {
        let ca = self.get_ca(ca_handle).await?;

        let msg = match ProtocolCms::decode(msg_bytes.as_ref(), false) {
            Ok(msg) => msg,
            Err(e) => {
                let msg = format!(
                    "Could not decode RFC 6492 message for: {}, msg: {}, err: {}",
                    ca_handle,
                    Base64::from_content(msg_bytes.as_ref()),
                    e
                );
                return Err(Error::custom(msg));
            }
        };

        let content = ca.verify_rfc6492(msg)?;

        let (child_handle, recipient, content) = content.unpack();

        // If the child was suspended because it was inactive, then we can now conclude
        // that it has become active again. So unsuspend it first, before processing the
        // request further.
        let child_ca = ca.get_child(&child_handle)?;
        if child_ca.is_suspended() {
            info!(
                "Child '{}' under CA '{}' became active again, will unsuspend it.",
                child_handle, ca_handle
            );
            let req = UpdateChildRequest::unsuspend();
            self.ca_child_update(ca_handle, child_handle.clone(), req, actor)
                .await?;
        }

        let cms_logger = CmsLogger::for_rfc6492_rcvd(self.config.rfc6492_log_dir.as_ref(), &recipient, &child_handle);

        let (res, should_log_cms) = match content {
            rfc6492::Content::Qry(rfc6492::Qry::Revoke(req)) => {
                let res = self.revoke(ca_handle, child_handle.clone(), req, actor).await?;
                let msg = rfc6492::Message::revoke_response(child_handle.clone(), recipient, res);
                (self.wrap_rfc6492_response(ca_handle, msg).await, true)
            }
            rfc6492::Content::Qry(rfc6492::Qry::List) => {
                let entitlements = self.list(ca_handle, &child_handle).await?;
                let msg = rfc6492::Message::list_response(child_handle.clone(), recipient, entitlements);
                (self.wrap_rfc6492_response(ca_handle, msg).await, false)
            }
            rfc6492::Content::Qry(rfc6492::Qry::Issue(req)) => {
                let res = self.issue(ca_handle, &child_handle, req, actor).await?;
                let msg = rfc6492::Message::issue_response(child_handle.clone(), recipient, res);
                (self.wrap_rfc6492_response(ca_handle, msg).await, true)
            }
            _ => (Err(Error::custom("Unsupported RFC 6492 message")), true),
        };

        // Log CMS messages if needed, and if enabled by config (this is a no-op if it isn't)
        match &res {
            Ok(reply_bytes) => {
                if should_log_cms {
                    cms_logger.received(&msg_bytes)?;
                    cms_logger.reply(reply_bytes)?;
                }
            }
            Err(e) => {
                cms_logger.received(&msg_bytes)?;
                cms_logger.err(e)?;
            }
        }

        // Set child status
        match &res {
            Ok(_) => {
                self.status_store
                    .lock()
                    .await
                    .set_child_success(ca.handle(), &child_handle, user_agent)
                    .await?;
            }
            Err(e) => {
                self.status_store
                    .lock()
                    .await
                    .set_child_failure(ca.handle(), &child_handle, user_agent, e)
                    .await?;
            }
        }

        res
    }

    async fn wrap_rfc6492_response(&self, handle: &Handle, msg: rfc6492::Message) -> KrillResult<Bytes> {
        trace!("RFC 6492 response wrapping for {}", handle);
        self.get_ca(handle)
            .await?
            .sign_rfc6492_response(msg, self.signer.deref())
    }

    /// List the entitlements for a child: section 3.3.2 of RFC 6492.
    async fn list(&self, ca: &Handle, child: &Handle) -> KrillResult<Entitlements> {
        let ca = self.get_ca(ca).await?;
        Ok(ca.list(child, &self.config.issuance_timing)?)
    }

    /// Issue a Certificate in response to an RFC 6492 Certificate Issuance request sent by a child.
    ///
    /// See sections 3.4.1 and 3.4.2 of RFC 6492: https://tools.ietf.org/html/rfc6492#section-3.4.1
    async fn issue(
        &self,
        ca: &Handle,
        child: &ChildHandle,
        issue_req: IssuanceRequest,
        actor: &Actor,
    ) -> KrillResult<IssuanceResponse> {
        let class_name = issue_req.class_name();
        let pub_key = issue_req.csr().public_key();

        let cmd = CmdDet::child_certify(
            ca,
            child.clone(),
            issue_req.clone(),
            self.config.clone(),
            self.signer.clone(),
            actor,
        );

        let ca = self.send_command(cmd).await?;

        // The updated CA will now include the newly issued certificate.
        let response = ca.issuance_response(child, class_name, pub_key, &self.config.issuance_timing)?;

        Ok(response)
    }

    /// Process an RFC 6492 revocation request sent by a child.
    ///
    /// See sections 3.5.1 and 3.5.2 of RFC 6492: https://tools.ietf.org/html/rfc6492#section-3.5.1
    async fn revoke(
        &self,
        ca_handle: &Handle,
        child: ChildHandle,
        revoke_request: RevocationRequest,
        actor: &Actor,
    ) -> KrillResult<RevocationResponse> {
        let res = (&revoke_request).into(); // response provided that no errors are returned earlier

        let cmd = CmdDet::child_revoke_key(ca_handle, child, revoke_request, actor);
        self.send_command(cmd).await?;

        Ok(res)
    }
}

/// # CAs as children
///
impl CaManager {
    /// Adds a new parent, or updates an existing parent of a CA. Adding a parent triggers
    /// the CA to connect to it in order to learn its resource entitlements, set up the
    /// resource class(es) under it, and request certificate(s).
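    ///
    /// Illustrative call (a sketch; `parent_req` is a `ParentCaReq` built from an
    /// RFC 8183 parent response obtained elsewhere):
    /// ```ignore
    /// ca_manager.ca_parent_add_or_update(ca_handle, parent_req, &actor).await?;
    /// ```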
    pub async fn ca_parent_add_or_update(
        &self,
        handle: Handle,
        parent_req: ParentCaReq,
        actor: &Actor,
    ) -> KrillResult<()> {
        let ca = self.get_ca(&handle).await?;

        let (parent, contact) = parent_req.unpack();

        let cmd = if !ca.parent_known(&parent) {
            CmdDet::add_parent(&handle, parent, contact, actor)
        } else {
            CmdDet::update_parent(&handle, parent, contact, actor)
        };

        self.send_command(cmd).await?;
        Ok(())
    }

    /// Removes a parent from a CA. This triggers best-effort revocation requests for
    /// existing keys under this parent. Any resource classes under the parent will be
    /// removed and all relevant content will be withdrawn from the repository.
    pub async fn ca_parent_remove(&self, handle: Handle, parent: ParentHandle, actor: &Actor) -> KrillResult<()> {
        // best effort, request revocations for any remaining keys under this parent.
        if let Err(e) = self.ca_parent_revoke(&handle, &parent).await {
            warn!(
                "Removing parent '{}' from CA '{}', but could not send revoke requests: {}",
                parent, handle, e
            );
        }

        self.status_store.lock().await.remove_parent(&handle, &parent).await?;

        let upd = CmdDet::remove_parent(&handle, parent, actor);
        self.send_command(upd).await?;
        Ok(())
    }

    /// Send revocation requests for a parent of a CA when the parent is removed.
    pub async fn ca_parent_revoke(&self, handle: &Handle, parent: &ParentHandle) -> KrillResult<()> {
        let ca = self.get_ca(handle).await?;
        let revoke_requests = ca.revoke_under_parent(parent, &self.signer)?;
        self.send_revoke_requests(handle, parent, revoke_requests).await?;
        Ok(())
    }

    /// Refresh all CAs:
    /// - process all CAs in parallel
    /// - process all parents for CAs in parallel
    ///    - send pending requests if present, or
    ///    - ask parent for updates and process if present
    ///
    /// Note: this function can be called manually through the API, but is normally
    ///       triggered in the background, every 10 minutes by default, or as configured
    ///       by 'ca_refresh' in the configuration.
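    ///
    /// Sketch of a manual trigger (normally this runs in the background;
    /// `started` is the server start time recorded elsewhere):
    /// ```ignore
    /// ca_manager.cas_refresh_all(started, &actor).await;
    /// ```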
    pub async fn cas_refresh_all(&self, started: Timestamp, actor: &Actor) {
        if let Ok(cas) = self.ca_store.list() {
            let mut updates = vec![];

            for ca_handle in cas {
                updates.push(self.cas_refresh_single(ca_handle, started, actor));
            }

            join_all(updates).await;
        }
    }

    /// Refresh a single CA with its parents, and possibly suspend inactive children.
    pub async fn cas_refresh_single(&self, ca_handle: Handle, started: Timestamp, actor: &Actor) {
        self.ca_sync_parents(&ca_handle, actor).await;
        self.ca_suspend_inactive_children(&ca_handle, started, actor).await;
    }

    /// Suspend inactive child CAs, if so configured.
    async fn ca_suspend_inactive_children(&self, ca_handle: &Handle, started: Timestamp, actor: &Actor) {
        // Set the threshold only if it was configured AND this server has been up for
        // longer than the configured number of seconds. Otherwise we risk that *all*
        // children without prior recorded status are suspended on upgrade, or that
        // *all* children are suspended if the server had been down for more than the
        // threshold duration.
        let threshold_seconds = self
            .config
            .suspend_child_after_inactive_seconds()
            .filter(|secs| started < Timestamp::now_minus_seconds(*secs));
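        // Example (illustrative numbers): with a configured threshold of 14400
        // seconds (4 hours) and a server started 2 hours ago, the filter above
        // yields None, so no children are suspended in this run.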

        // suspend inactive children, if so configured
        if let Some(threshold_seconds) = threshold_seconds {
            if let Ok(ca_status) = self.get_ca_status(ca_handle).await {
                let connections = ca_status.get_children_connection_stats();

                for child in connections.suspension_candidates(threshold_seconds) {
                    let threshold_string = if threshold_seconds >= 3600 {
                        format!("{} hours", threshold_seconds / 3600)
                    } else {
                        format!("{} seconds", threshold_seconds)
                    };

                    info!(
                        "Child '{}' under CA '{}' was inactive for more than {}. Will suspend it.",
                        child, ca_handle, threshold_string
                    );
                    if let Err(e) = self
                        .status_store
                        .lock()
                        .await
                        .set_child_suspended(ca_handle, &child)
                        .await
                    {
                        panic!("System level error encountered while updating ca status: {}", e);
                    }

                    let req = UpdateChildRequest::suspend();
                    if let Err(e) = self.ca_child_update(ca_handle, child, req, actor).await {
                        error!("Could not suspend inactive child, error: {}", e);
                    }
                }
            }
        }
    }

    /// Synchronizes a CA with its parents - up to the configured batch size.
    /// Remaining parents will be done in a future run.
    async fn ca_sync_parents(&self, ca_handle: &Handle, actor: &Actor) {
        let mut updates = vec![];

        if let Ok(ca) = self.get_ca(ca_handle).await {
            // get updates from parents
            {
                if ca.nr_parents() <= self.config.ca_refresh_parents_batch_size {
                    // Nr of parents is below batch size, so just process all of them
                    for parent in ca.parents() {
                        updates.push(self.ca_sync_parent_infallible(ca_handle.clone(), parent.clone(), actor.clone()));
                    }
                } else {
                    // more parents than the batch size exist, so get candidates based on
                    // the known parent statuses for this CA.
                    match self.status_store.lock().await.get_ca_status(ca_handle).await {
                        Err(e) => {
                            panic!("System level error encountered while updating ca status: {}", e);
                        }
                        Ok(status) => {
                            for parent in status
                                .parents()
                                .sync_candidates(ca.parents().collect(), self.config.ca_refresh_parents_batch_size)
                            {
                                updates.push(self.ca_sync_parent_infallible(
                                    ca_handle.clone(),
                                    parent.clone(),
                                    actor.clone(),
                                ));
                            }
                        }
                    };
                }
            }
            join_all(updates).await;
        }
    }

    /// Synchronizes a CA with a parent, logging failures.
    async fn ca_sync_parent_infallible(&self, ca: Handle, parent: ParentHandle, actor: Actor) {
        if let Err(e) = self.ca_sync_parent(&ca, &parent, &actor).await {
            error!(
                "Failed to synchronize CA '{}' with parent '{}'. Error was: {}",
                ca, parent, e
            );
        }
    }

    /// Synchronizes a CA with one of its parents:
    ///   - send pending requests if present; otherwise
    ///   - get and process updated entitlements
    ///
    /// Note: if new request events are generated as a result of processing updated entitlements,
    ///       then they will trigger this synchronization again so that the pending requests
    ///       can be sent.
    pub async fn ca_sync_parent(&self, handle: &Handle, parent: &ParentHandle, actor: &Actor) -> KrillResult<()> {
        let ca = self.get_ca(handle).await?;

        if ca.has_pending_requests(parent) {
            self.send_requests(handle, parent, actor).await
        } else {
            self.get_updates_from_parent(handle, parent, actor).await
        }
    }

    /// Try to get updates from a specific parent of a CA.
    async fn get_updates_from_parent(&self, handle: &Handle, parent: &ParentHandle, actor: &Actor) -> KrillResult<()> {
        if handle != &ta_handle() {
            let ca = self.get_ca(handle).await?;

            if ca.repository_contact().is_ok() {
                let parent_contact = ca.parent(parent)?;
                let entitlements = self
                    .get_entitlements_from_contact(handle, parent, parent_contact, true)
                    .await?;

                self.update_entitlements(handle, parent.clone(), entitlements, actor)
                    .await?;
            }
        }
        Ok(())
    }

    /// Sends requests to a specific parent for the CA matching handle.
    async fn send_requests(&self, handle: &Handle, parent: &ParentHandle, actor: &Actor) -> KrillResult<()> {
        self.send_revoke_requests_handle_responses(handle, parent, actor)
            .await?;
        self.send_cert_requests_handle_responses(handle, parent, actor).await
    }

    async fn send_revoke_requests_handle_responses(
        &self,
        handle: &Handle,
        parent: &ParentHandle,
        actor: &Actor,
    ) -> KrillResult<()> {
        let child = self.get_ca(handle).await?;
        let requests = child.revoke_requests(parent);

        let revoke_responses = self.send_revoke_requests(handle, parent, requests).await?;

        for (rcn, responses) in revoke_responses.into_iter() {
            for response in responses.into_iter() {
                let cmd = CmdDet::key_roll_finish(handle, rcn.clone(), response, actor);
                self.send_command(cmd).await?;
            }
        }

        Ok(())
    }

    pub async fn send_revoke_requests(
        &self,
        handle: &Handle,
        parent: &ParentHandle,
        revoke_requests: HashMap<ResourceClassName, Vec<RevocationRequest>>,
    ) -> KrillResult<HashMap<ResourceClassName, Vec<RevocationResponse>>> {
        let child = self.get_ca(handle).await?;
        match child.parent(parent)? {
            ParentCaContact::Ta(_) => Err(Error::TaNotAllowed),

            ParentCaContact::Rfc6492(parent_res) => {
                let parent_uri = parent_res.service_uri();

                let next_run_seconds = self.config.ca_refresh_seconds as i64;

                match self
                    .send_revoke_requests_rfc6492(revoke_requests, &child.id_key(), parent_res)
                    .await
                {
                    Err(e) => {
                        self.status_store
                            .lock()
                            .await
                            .set_parent_failure(handle, parent, parent_uri, &e, next_run_seconds)
                            .await?;
                        Err(e)
                    }
                    Ok(res) => {
                        self.status_store
                            .lock()
                            .await
                            .set_parent_last_updated(handle, parent, parent_uri, next_run_seconds)
                            .await?;
                        Ok(res)
                    }
                }
            }
        }
    }

    pub async fn send_revoke_unexpected_key(
        &self,
        handle: &Handle,
        rcn: ResourceClassName,
        revocation: RevocationRequest,
    ) -> KrillResult<HashMap<ResourceClassName, Vec<RevocationResponse>>> {
        let child = self.ca_store.get_latest(handle)?;
        let parent = child.parent_for_rc(&rcn)?;
        let mut requests = HashMap::new();
        requests.insert(rcn, vec![revocation]);

        self.send_revoke_requests(handle, parent, requests).await
    }

    async fn send_revoke_requests_rfc6492(
        &self,
        revoke_requests: HashMap<ResourceClassName, Vec<RevocationRequest>>,
        signing_key: &KeyIdentifier,
        parent_res: &rfc8183::ParentResponse,
    ) -> KrillResult<HashMap<ResourceClassName, Vec<RevocationResponse>>> {
        let mut revoke_map = HashMap::new();

        for (rcn, revoke_requests) in revoke_requests.into_iter() {
            let mut revocations = vec![];
            for req in revoke_requests.into_iter() {
                let sender = parent_res.child_handle().clone();
                let recipient = parent_res.parent_handle().clone();
                let cms_logger = CmsLogger::for_rfc6492_sent(self.config.rfc6492_log_dir.as_ref(), &sender, &recipient);

                let revoke = rfc6492::Message::revoke(sender, recipient, req.clone());

                let response = self
                    .send_rfc6492_and_validate_response(signing_key, parent_res, revoke.into_bytes(), Some(&cms_logger))
                    .await?;

                match response {
                    rfc6492::Res::Revoke(revoke_response) => revocations.push(revoke_response),
                    rfc6492::Res::NotPerformed(e) => {
                        // If we get one of the following responses:
                        //    1301         revoke - no such resource class
                        //    1302         revoke - no such key
                        //
                        // Then we can consider this revocation redundant from the parent side, so just add it
                        // as revoked to this CA and move on. While this may be unexpected, it is unlikely to
                        // be a problem. If we kept insisting that the parent revokes a key they already
                        // revoked, then we could end up in a stuck loop.
                        //
                        // More importantly, we should re-sync things if we get 12** errors to certificate sign
                        // requests, but that is done in another function.
                        if e.status() == 1301 || e.status() == 1302 {
                            let revoke_response = (&req).into();
                            revocations.push(revoke_response)
                        } else {
                            return Err(Error::Rfc6492NotPerformed(e));
                        }
                    }
                    rfc6492::Res::List(_) => return Err(Error::custom("Got a List response to revoke request??")),
                    rfc6492::Res::Issue(_) => return Err(Error::custom("Issue response to revoke request??")),
                }
            }

            revoke_map.insert(rcn, revocations);
        }

        Ok(revoke_map)
    }
1091 
send_cert_requests_handle_responses( &self, handle: &Handle, parent: &ParentHandle, actor: &Actor, ) -> KrillResult<()>1092     async fn send_cert_requests_handle_responses(
1093         &self,
1094         handle: &Handle,
1095         parent: &ParentHandle,
1096         actor: &Actor,
1097     ) -> KrillResult<()> {
1098         let child = self.get_ca(handle).await?;
1099         let requests = child.cert_requests(parent);
1100         let signing_key = child.id_key();
1101         let parent_res = child.parent(parent)?.parent_response().ok_or(Error::TaNotAllowed)?;
1102 
1103         let sender = parent_res.child_handle();
1104         let recipient = parent_res.parent_handle();
1105         let cms_logger = Some(CmsLogger::for_rfc6492_sent(
1106             self.config.rfc6492_log_dir.as_ref(),
1107             sender,
1108             recipient,
1109         ));
1110 
1111         // We may need to do work for multiple resource class and there may therefore be
1112         // multiple errors. We want to keep track of those, rather than bailing out on the
1113         // first error, because an issue in one resource class does not necessarily mean
1114         // that there should be an issue in the the others.
1115         //
1116         // Of course for most CAs there will only be one resource class under a parent,
1117         // but we need to be prepared to deal with N classes.
1118         let mut errors = vec![];
1119 
1120         for (rcn, requests) in requests.into_iter() {
1121             // We could have multiple requests in a single resource class (multiple keys during rollover)
1122             for req in requests {
1123                 let msg = rfc6492::Message::issue(sender.clone(), recipient.clone(), req).into_bytes();
1124 
1125                 match self
1126                     .send_rfc6492_and_validate_response(&signing_key, parent_res, msg, cms_logger.as_ref())
1127                     .await
1128                 {
1129                     Err(e) => {
1130                         // If any of the requests for an RC results in an error, then
1131                         // record the error and break the loop. We will sync again.
1132                         errors.push(Error::CaParentSyncError(
1133                             handle.clone(),
1134                             parent.clone(),
1135                             rcn.clone(),
1136                             e.to_string(),
1137                         ));
1138                         break;
1139                     }
1140                     Ok(response) => {
1141                         match response {
1142                             rfc6492::Res::Issue(issuance) => {
1143                                 // Update the received certificate.
1144                                 //
1145                                 // In a typical exchange we will only have one key under an RC under a
1146                                 // parent. During a key roll there may be multiple keys and requests. It
1147                                 // is still fine to update the received certificate for key "A" even if we
1148                                 // would get an error for the request for key "B". The reason is such an
1149                                 // *unlikely* failure would still trigger an appropriate response at
1150                                 // the resource class level in the next loop iteration below.
1151                                 let (_, _, _, issued) = issuance.unwrap();
1152                                 if let Err(e) = self
1153                                     .send_command(CmdDet::upd_received_cert(
1154                                         handle,
1155                                         rcn.clone(),
1156                                         RcvdCert::from(issued),
1157                                         self.config.clone(),
1158                                         self.signer.clone(),
1159                                         actor,
1160                                     ))
1161                                     .await
1162                                 {
                                    // Note that sending the command to update a received certificate
                                    // cannot fail unless there are bigger issues, like this being the wrong
                                    // response for this resource class. This would be extremely odd because
                                    // we only just asked the resource class which request to send. Still, in
                                    // order to handle this as gracefully as we can, we should just drop
                                    // this resource class and report an error. If there are still resource
                                    // entitlements under the parent for this resource class, then a new class
                                    // will be automatically created when we synchronize the entitlements again.

                                    let reason = format!("received certificate cannot be added, error: {}", e);

                                    self.send_command(CmdDet::drop_resource_class(
                                        handle,
                                        rcn.clone(),
                                        reason.clone(),
                                        self.signer.clone(),
                                        actor,
                                    ))
                                    .await?;

                                    // Push the error for reporting. This will also trigger that the CA
                                    // syncs with its parent again - and then it will just find revocation
                                    // requests for this RC, which are sent on a best-effort basis.
                                    errors.push(Error::CaParentSyncError(
                                        handle.clone(),
                                        parent.clone(),
                                        rcn.clone(),
                                        reason,
                                    ));
                                    break;
                                }
                            }
                            rfc6492::Res::NotPerformed(not_performed) => {
                                match not_performed.status() {
                                    1201 | 1202 => {
                                        // Okay, so it looks like the parent *just* told the CA that it was entitled
                                        // to certain resources in a resource class, and now in response to the
                                        // certificate sign request it says the resource class is gone (1201), or
                                        // there are no resources in it (1202). This can happen as a result of a
                                        // race condition if the child CA asked for the entitlements just moments
                                        // before the parent removed them.

                                        let reason = "parent removed entitlement to resource class".to_string();

                                        self.send_command(CmdDet::drop_resource_class(
                                            handle,
                                            rcn.clone(),
                                            reason.clone(),
                                            self.signer.clone(),
                                            actor,
                                        ))
                                        .await?;

                                        // Push the error for reporting. This will also trigger that the CA
                                        // syncs with its parent again - and then it will just find revocation
                                        // requests for this RC, which are sent on a best-effort basis.
                                        errors.push(Error::CaParentSyncError(
                                            handle.clone(),
                                            parent.clone(),
                                            rcn.clone(),
                                            reason,
                                        ));
                                        break;
                                    }
                                    1204 => {
                                        // The parent says that the CA is re-using a key across RCs. Krill CAs never
                                        // re-use keys - so this is extremely unlikely. Still, there seems to be a
                                        // disagreement, and in this case the parent has the last word. Recovering by
                                        // dropping all keys in the RC and making a new pending key should be possible,
                                        // but it's complicated with regard to corner cases: e.g. what if we were in
                                        // the middle of a key roll?
                                        //
                                        // So, the most straightforward way to deal with this is by dropping this current
                                        // RC altogether. Then the CA will find its resource entitlements in a future
                                        // synchronization with the parent and just create a new RC - and issue all
                                        // eligible certificates and ROAs under it.

                                        let reason = "parent claims we are re-using keys".to_string();
                                        self.send_command(CmdDet::drop_resource_class(
                                            handle,
                                            rcn.clone(),
                                            reason.clone(),
                                            self.signer.clone(),
                                            actor,
                                        ))
                                        .await?;

                                        // Push the error for reporting. This will also trigger that the CA
                                        // syncs with its parent again - and then it will just find revocation
                                        // requests for this RC, which are sent on a best-effort basis.
                                        errors.push(Error::CaParentSyncError(
                                            handle.clone(),
                                            parent.clone(),
                                            rcn.clone(),
                                            reason,
                                        ));
                                        break;
                                    }
                                    _ => {
                                        // Other not-performed responses can be due to temporary issues at the
                                        // parent (e.g. it had an internal error of some kind), or because of
                                        // protocol version mismatches and such (in future maybe?).
                                        //
                                        // In any event we cannot take any action to recover, so just report
                                        // them and let the scheduler try to sync with the parent again.
                                        let issue = format!(
                                            "parent returned not performed response to certificate request: {}",
                                            not_performed
                                        );
                                        errors.push(Error::CaParentSyncError(
                                            handle.clone(),
                                            parent.clone(),
                                            rcn.clone(),
                                            issue,
                                        ));
                                        break;
                                    }
                                }
                            }
                            rfc6492::Res::List(_) => {
                                // A list response to a certificate sign request??
                                let issue = "parent returned a list response to a certificate request".to_string();
                                errors.push(Error::CaParentSyncError(
                                    handle.clone(),
                                    parent.clone(),
                                    rcn.clone(),
                                    issue,
                                ));
                                break;
                            }
                            rfc6492::Res::Revoke(_) => {
                                // A revoke response to a certificate sign request??
                                let issue = "parent returned a revoke response to a certificate request".to_string();
                                errors.push(Error::CaParentSyncError(
                                    handle.clone(),
                                    parent.clone(),
                                    rcn.clone(),
                                    issue,
                                ));
                                break;
                            }
                        }
                    }
                }
            }
        }

        let uri = parent_res.service_uri();
        if errors.is_empty() {
            self.status_store
                .lock()
                .await
                .set_parent_last_updated(handle, parent, uri, self.config.ca_refresh_seconds as i64)
                .await?;

            Ok(())
        } else {
            let e = if errors.len() == 1 {
                errors.pop().unwrap()
            } else {
                Error::Multiple(errors)
            };

            self.status_store
                .lock()
                .await
                .set_parent_failure(handle, parent, uri, &e, REQUEUE_DELAY_SECONDS)
                .await?;

            Err(e)
        }
    }

    /// Updates the CA resource classes, if entitlements are different from
    /// what the CA currently has under this parent. Returns [`Ok(true)`] in
    /// case there were any updates, implying that there will be open requests
    /// for the parent CA.
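    ///
    /// A minimal sketch of how the returned flag might be used by a caller in
    /// this impl (the surrounding bindings are assumptions, not shown here):
    ///
    /// ```ignore
    /// if self.update_entitlements(&handle, parent.clone(), entitlements, actor).await? {
    ///     // The CA version was bumped: there are now open certificate
    ///     // requests that still need to be sent to this parent.
    /// }
    /// ```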
    async fn update_entitlements(
        &self,
        handle: &Handle,
        parent: ParentHandle,
        entitlements: Entitlements,
        actor: &Actor,
    ) -> KrillResult<bool> {
        let current_version = self.get_ca(handle).await?.version();

        let update_entitlements_command =
            CmdDet::update_entitlements(handle, parent, entitlements, self.signer.clone(), actor);

        let new_version = self.send_command(update_entitlements_command).await?.version();

        Ok(new_version > current_version)
    }

    pub async fn get_entitlements_from_contact(
        &self,
        ca: &Handle,
        parent: &ParentHandle,
        contact: &ParentCaContact,
        existing_parent: bool,
    ) -> KrillResult<api::Entitlements> {
        match contact {
            ParentCaContact::Ta(_) => Err(Error::TaNotAllowed),
            ParentCaContact::Rfc6492(res) => {
                let result = self.get_entitlements_rfc6492(ca, res).await;
                let uri = res.service_uri();
                let next_run_seconds = self.config.ca_refresh_seconds as i64;

                match &result {
                    Err(error) => {
                        if existing_parent {
                            // Only update the status store with errors for existing parents;
                            // otherwise we would end up with status entries for a new parent
                            // that was rejected because of this error.
                            self.status_store
                                .lock()
                                .await
                                .set_parent_failure(ca, parent, uri, error, next_run_seconds)
                                .await?;
                        }
                    }
                    Ok(entitlements) => {
                        self.status_store
                            .lock()
                            .await
                            .set_parent_entitlements(ca, parent, uri, entitlements, next_run_seconds)
                            .await?;
                    }
                }
                result
            }
        }
    }

    async fn get_entitlements_rfc6492(
        &self,
        handle: &Handle,
        parent_res: &rfc8183::ParentResponse,
    ) -> KrillResult<api::Entitlements> {
        let child = self.ca_store.get_latest(handle)?;

        // create a list request
        let sender = parent_res.child_handle().clone();
        let recipient = parent_res.parent_handle().clone();

        let list = rfc6492::Message::list(sender, recipient);

        let response = self
            .send_rfc6492_and_validate_response(&child.id_key(), parent_res, list.into_bytes(), None)
            .await?;

        match response {
            rfc6492::Res::NotPerformed(np) => Err(Error::Custom(format!("Not performed: {}", np))),
            rfc6492::Res::List(ent) => Ok(ent),
            _ => Err(Error::custom("Got unexpected response to list query")),
        }
    }

    async fn send_rfc6492_and_validate_response(
        &self,
        signing_key: &KeyIdentifier,
        parent_res: &rfc8183::ParentResponse,
        msg: Bytes,
        cms_logger: Option<&CmsLogger>,
    ) -> KrillResult<rfc6492::Res> {
        let response = self
            .send_protocol_msg_and_validate(
                signing_key,
                parent_res.service_uri(),
                parent_res.id_cert(),
                rfc6492::CONTENT_TYPE,
                msg,
                cms_logger,
            )
            .await?;

        rfc6492::Message::from_signed_message(&response)
            .map_err(Error::custom)?
            .into_reply()
            .map_err(Error::custom)
    }
}

/// # Publishing
///
impl CaManager {
    /// Synchronize all CAs with their repositories. Meant to be called by the
    /// background scheduler. This will log issues, but will not fail on errors
    /// with individual CAs - because otherwise this would prevent other CAs from
    /// syncing. Note, however, that the repository status is tracked per CA and
    /// can be monitored.
    ///
    /// This function can still fail on internal errors, e.g. I/O issues when saving state
    /// changes to the repo status structure.
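    ///
    /// A usage sketch (the `ca_manager` and `actor` bindings are assumptions):
    ///
    /// ```ignore
    /// // Failures for individual CAs are logged rather than returned.
    /// ca_manager.cas_repo_sync_all(&actor).await;
    /// ```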
    pub async fn cas_repo_sync_all(&self, actor: &Actor) {
        match self.ca_list(actor) {
            Ok(ca_list) => {
                for ca in ca_list.cas() {
                    let ca_handle = ca.handle();
                    if let Err(e) = self.cas_repo_sync_single(ca_handle).await {
                        error!(
                            "Could not synchronize CA '{}' with its repository/-ies. Error: {}",
                            ca_handle, e
                        );
                    }
                }
            }
            Err(e) => error!("Could not get CA list! {}", e),
        }
    }

    /// Synchronize a CA with its repositories.
    ///
    /// Note that typically a CA will have only one active repository, but in
    /// case there are multiple during a migration, this function will ensure
    /// that they are all synchronized.
    ///
    /// If the CA has deprecated repositories, then a clean-up will be
    /// attempted: the CA will try to withdraw all objects from the deprecated
    /// repository. If this clean-up fails, then the number of clean-up attempts
    /// for the repository in question is incremented, and this function will
    /// fail. After 5 failed attempts the old repository is assumed to be
    /// unreachable and will be dropped - i.e. the CA will no longer try to
    /// clean up objects.
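    ///
    /// A hedged usage sketch (assumes `ca_manager` and a `Handle` parsed elsewhere):
    ///
    /// ```ignore
    /// if let Err(e) = ca_manager.cas_repo_sync_single(&ca_handle).await {
    ///     // A failed clean-up of a deprecated repository also ends up here,
    ///     // until the 5-attempt limit causes that repository to be dropped.
    ///     warn!("Could not sync CA '{}': {}", ca_handle, e);
    /// }
    /// ```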
    pub async fn cas_repo_sync_single(&self, ca_handle: &Handle) -> KrillResult<()> {
        // Note that this is a no-op for new CAs which do not yet have any repository configured.
        for (repo_contact, ca_elements) in self.ca_repo_elements(ca_handle).await? {
            self.ca_repo_sync(ca_handle, &repo_contact, ca_elements).await?;
        }

        // Clean-up of old repos
        for deprecated in self.ca_deprecated_repos(ca_handle)? {
            info!(
                "Will try to clean up deprecated repository '{}' for CA '{}'",
                deprecated.contact(),
                ca_handle
            );

            if let Err(e) = self.ca_repo_sync(ca_handle, deprecated.contact(), vec![]).await {
                warn!("Could not clean up deprecated repository: {}", e);

                if deprecated.clean_attempts() < 5 {
                    self.ca_deprecated_repo_increment_clean_attempts(ca_handle, deprecated.contact())?;
                    return Err(e);
                }
            }

            self.ca_deprecated_repo_remove(ca_handle, deprecated.contact())?;
        }

        Ok(())
    }

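    /// Synchronize a CA with a single repository: ask the repository what it
    /// currently holds for this CA (an RFC 8181 list query), diff that against
    /// the objects the CA wants to see published, and send a single delta with
    /// the resulting publishes, updates and withdraws.
    ///
    /// A sketch of the classification rule used below, with plain strings
    /// standing in for URIs, hashes and content (illustration only):
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    ///
    /// let remote: HashMap<&str, &str> = HashMap::from([("a", "h1"), ("b", "h2")]);
    /// let mut desired: HashMap<&str, &str> = HashMap::from([("a", "h1-new"), ("c", "h3")]);
    ///
    /// for (uri, hash) in remote {
    ///     match desired.remove(uri) {
    ///         Some(new) if new != hash => { /* update: on both sides, content changed */ }
    ///         Some(_) => { /* unchanged: nothing to send */ }
    ///         None => { /* withdraw: only the repository still has it */ }
    ///     }
    /// }
    /// // Anything left in `desired` was never published: publish.
    /// ```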
    async fn ca_repo_sync(
        &self,
        ca_handle: &Handle,
        repo_contact: &RepositoryContact,
        publish_elements: Vec<PublishElement>,
    ) -> KrillResult<()> {
        let list_reply = self.send_rfc8181_list(ca_handle, repo_contact.response()).await?;

        #[allow(clippy::mutable_key_type)]
        let delta = {
            let elements: HashMap<_, _> = list_reply.into_elements().into_iter().map(|el| el.unpack()).collect();

            let mut all_objects: HashMap<_, _> = publish_elements.into_iter().map(|el| el.unpack()).collect();

            let mut withdraws = vec![];
            let mut updates = vec![];
            for (uri, hash) in elements.into_iter() {
                match all_objects.remove(&uri) {
                    Some(base64) => {
                        if base64.to_encoded_hash() != hash {
                            updates.push(Update::new(None, uri, base64, hash))
                        }
                    }
                    None => withdraws.push(Withdraw::new(None, uri, hash)),
                }
            }
            let publishes = all_objects
                .into_iter()
                .map(|(uri, base64)| Publish::new(None, uri, base64))
                .collect();

            PublishDelta::new(publishes, updates, withdraws)
        };

        self.send_rfc8181_delta(ca_handle, repo_contact.response(), delta)
            .await?;

        Ok(())
    }

    /// Get the current objects for a CA for each repository that it's using.
    ///
    /// Notes:
    /// - typically a CA will use only one repository, but during migrations there may be multiple.
    /// - these objects may not have been published (yet) - check `ca_repo_status`.
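    ///
    /// A usage sketch (assuming `ca_manager` and `handle` bindings):
    ///
    /// ```ignore
    /// for (repo_contact, elements) in ca_manager.ca_repo_elements(&handle).await? {
    ///     // `elements` are the objects destined for this repository; whether
    ///     // they have actually been published there is tracked separately.
    /// }
    /// ```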
    pub async fn ca_repo_elements(&self, ca: &Handle) -> KrillResult<HashMap<RepositoryContact, Vec<PublishElement>>> {
        Ok(self.ca_objects_store.ca_objects(ca)?.repo_elements_map())
    }

    /// Get deprecated repositories so that they can be cleaned.
    pub fn ca_deprecated_repos(&self, ca: &Handle) -> KrillResult<Vec<DeprecatedRepository>> {
        Ok(self.ca_objects_store.ca_objects(ca)?.deprecated_repos().clone())
    }

    /// Remove a deprecated repo
    pub fn ca_deprecated_repo_remove(&self, ca: &Handle, to_remove: &RepositoryContact) -> KrillResult<()> {
        self.ca_objects_store.with_ca_objects(ca, |objects| {
            objects.deprecated_repo_remove(to_remove);
            Ok(())
        })
    }

    /// Increase the clean attempt counter for a deprecated repository
    pub fn ca_deprecated_repo_increment_clean_attempts(
        &self,
        ca: &Handle,
        contact: &RepositoryContact,
    ) -> KrillResult<()> {
        self.ca_objects_store.with_ca_objects(ca, |objects| {
            objects.deprecated_repo_inc_clean_attempts(contact);
            Ok(())
        })
    }

    /// Update repository where a CA publishes.
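    ///
    /// A hedged sketch; `new_contact` is assumed to come from a parsed RFC 8183
    /// repository response:
    ///
    /// ```ignore
    /// // With `check_repo` set to true, the repository must answer a list
    /// // query before the change is committed to the CA.
    /// ca_manager.update_repo(handle, new_contact, true, &actor).await?;
    /// ```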
    pub async fn update_repo(
        &self,
        handle: Handle,
        new_contact: RepositoryContact,
        check_repo: bool,
        actor: &Actor,
    ) -> KrillResult<()> {
        if check_repo {
            // First verify that this repository can be reached and responds to a list request.
            self.send_rfc8181_list(&handle, new_contact.response())
                .await
                .map_err(|e| Error::CaRepoIssue(handle.clone(), e.to_string()))?;
        }
        let cmd = CmdDet::update_repo(&handle, new_contact, self.signer.clone(), actor);
        self.send_command(cmd).await?;
        Ok(())
    }

    async fn send_rfc8181_list(
        &self,
        ca_handle: &Handle,
        repository: &rfc8183::RepositoryResponse,
    ) -> KrillResult<ListReply> {
        let uri = repository.service_uri();

        let reply = match self
            .send_rfc8181_and_validate_response(ca_handle, repository, rfc8181::Message::list_query().into_bytes())
            .await
        {
            Err(e) => {
                self.status_store
                    .lock()
                    .await
                    .set_status_repo_failure(ca_handle, uri.clone(), &e)
                    .await?;
                return Err(e);
            }
            Ok(reply) => reply,
        };

        let next_update = self
            .ca_objects_store
            .ca_objects(ca_handle)?
            .closest_next_update()
            .unwrap_or_else(|| Timestamp::now_plus_hours(self.config.republish_hours()));

        match reply {
            rfc8181::ReplyMessage::ListReply(list_reply) => {
                self.status_store
                    .lock()
                    .await
                    .set_status_repo_success(ca_handle, uri.clone(), next_update)
                    .await?;
                Ok(list_reply)
            }
            rfc8181::ReplyMessage::SuccessReply => {
                let err = Error::custom("Got success reply to list query?!");
                self.status_store
                    .lock()
                    .await
                    .set_status_repo_failure(ca_handle, uri.clone(), &err)
                    .await?;
                Err(err)
            }
            rfc8181::ReplyMessage::ErrorReply(e) => {
                let err = Error::Custom(format!("Got error reply: {}", e));
                self.status_store
                    .lock()
                    .await
                    .set_status_repo_failure(ca_handle, uri.clone(), &err)
                    .await?;
                Err(err)
            }
        }
    }

    pub async fn send_rfc8181_delta(
        &self,
        ca_handle: &Handle,
        repository: &rfc8183::RepositoryResponse,
        delta: PublishDelta,
    ) -> KrillResult<()> {
        let message = rfc8181::Message::publish_delta_query(delta);
        let uri = repository.service_uri();

        let reply = match self
            .send_rfc8181_and_validate_response(ca_handle, repository, message.into_bytes())
            .await
        {
            Ok(reply) => reply,
            Err(e) => {
                self.status_store
                    .lock()
                    .await
                    .set_status_repo_failure(ca_handle, uri.clone(), &e)
                    .await?;
                return Err(e);
            }
        };

        match reply {
            rfc8181::ReplyMessage::SuccessReply => {
                // Get all the currently published elements in ALL repositories.
                // TODO: reflect the status for each repository in the API / UI?
                // We probably should, though it should be extremely rare and
                // short-lived to have more than one repository.
                let ca_objects = self.ca_objects_store.ca_objects(ca_handle)?;
                let published = ca_objects.all_publish_elements();
                let next_update = ca_objects
                    .closest_next_update()
                    .unwrap_or_else(|| Timestamp::now_plus_hours(self.config.republish_hours()));

                self.status_store
                    .lock()
                    .await
                    .set_status_repo_published(ca_handle, uri.clone(), published, next_update)
                    .await?;
                Ok(())
            }
            rfc8181::ReplyMessage::ErrorReply(e) => {
                let err = Error::Custom(format!("Got error reply: {}", e));
                self.status_store
                    .lock()
                    .await
                    .set_status_repo_failure(ca_handle, uri.clone(), &err)
                    .await?;
                Err(err)
            }
            rfc8181::ReplyMessage::ListReply(_) => {
                let err = Error::custom("Got list reply to delta query?!");
                self.status_store
                    .lock()
                    .await
                    .set_status_repo_failure(ca_handle, uri.clone(), &err)
                    .await?;
                Err(err)
            }
        }
    }

    async fn send_rfc8181_and_validate_response(
        &self,
        ca_handle: &Handle,
        repository: &rfc8183::RepositoryResponse,
        msg: Bytes,
    ) -> KrillResult<rfc8181::ReplyMessage> {
        let ca = self.get_ca(ca_handle).await?;

        let cms_logger = CmsLogger::for_rfc8181_sent(self.config.rfc8181_log_dir.as_ref(), ca_handle);

        let response = self
            .send_protocol_msg_and_validate(
                &ca.id_key(),
                repository.service_uri(),
                repository.id_cert(),
                rfc8181::CONTENT_TYPE,
                msg,
                Some(&cms_logger),
            )
            .await?;

        rfc8181::Message::from_signed_message(&response)
            .map_err(Error::custom)?
            .into_reply()
            .map_err(Error::custom)
    }
}

/// # Support sending RFC 6492 and 8181 'protocol' messages, and verifying responses.
///
impl CaManager {
    async fn send_protocol_msg_and_validate(
        &self,
        signing_key: &KeyIdentifier,
        service_uri: &rfc8183::ServiceUri,
        service_id: &IdCert,
        content_type: &str,
        msg: Bytes,
        cms_logger: Option<&CmsLogger>,
    ) -> KrillResult<ProtocolCms> {
        let signed_msg = ProtocolCmsBuilder::create(signing_key, self.signer.deref(), msg)
            .map_err(Error::signer)?
            .as_bytes();

        let uri = service_uri.to_string();

        let timeout = self.config.post_protocol_msg_timeout_seconds;

        let res = httpclient::post_binary_with_full_ua(&uri, &signed_msg, content_type, timeout)
            .await
            .map_err(Error::HttpClientError)?;

        if let Some(logger) = cms_logger {
            logger.sent(&signed_msg)?;
            logger.reply(&res)?;
        }

        // unpack and validate response
        let msg = match ProtocolCms::decode(res.as_ref(), false).map_err(Error::custom) {
            Ok(msg) => msg,
            Err(e) => {
                error!("Could not parse protocol response");
                return Err(e);
            }
        };

        if let Err(e) = msg.validate(service_id) {
            error!("Could not validate protocol response: {}", base64::encode(res.as_ref()));
            return Err(Error::custom(e));
        }

        Ok(msg)
    }
}

/// # Autonomous System Provider Authorization functions
///
impl CaManager {
    /// Show current ASPA definitions for this CA.
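    ///
    /// A usage sketch (bindings assumed):
    ///
    /// ```ignore
    /// let definitions = ca_manager.ca_aspas_definitions_show(handle.clone()).await?;
    /// // `definitions` is an AspaDefinitionList for this CA.
    /// ```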
    pub async fn ca_aspas_definitions_show(&self, ca: Handle) -> KrillResult<AspaDefinitionList> {
        let ca = self.get_ca(&ca).await?;
        Ok(ca.aspas_definitions_show())
    }

    /// Add a new ASPA definition for this CA and the customer ASN in the update.
    pub async fn ca_aspas_definitions_update(
        &self,
        ca: Handle,
        updates: AspaDefinitionUpdates,
        actor: &Actor,
    ) -> KrillResult<()> {
        self.send_command(CmdDet::aspas_definitions_update(
            &ca,
            updates,
            self.config.clone(),
            self.signer.clone(),
            actor,
        ))
        .await?;
        Ok(())
    }

    /// Update the ASPA definition for this CA and the customer ASN in the update.
    pub async fn ca_aspas_update_aspa(
        &self,
        ca: Handle,
        customer: AspaCustomer,
        update: AspaProvidersUpdate,
        actor: &Actor,
    ) -> KrillResult<()> {
        self.send_command(CmdDet::aspas_update_aspa(
            &ca,
            customer,
            update,
            self.config.clone(),
            self.signer.clone(),
            actor,
        ))
        .await?;
        Ok(())
    }
}

/// # Route Authorization functions
///
impl CaManager {
    /// Update the routes authorized by a CA. This will trigger that ROAs
    /// are made in the resource classes that contain the prefixes. If the
    /// update is rejected, e.g. because the CA does not have the necessary
    /// prefixes, then an `Error::RoaDeltaError` will be returned.
    /// If the update is successful, new manifest(s) and CRL(s) will be created,
    /// and resynchronization between the CA and its repository will be triggered.
    /// Finally, note that ROAs may be issued on a per-prefix basis, or aggregated
    /// by ASN, based on the defaults or values configured.
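    ///
    /// A hedged sketch; `updates` is assumed to have been built or parsed
    /// elsewhere (e.g. from an API request body):
    ///
    /// ```ignore
    /// if let Err(e) = ca_manager.ca_routes_update(handle, updates, &actor).await {
    ///     // An `Error::RoaDeltaError` here means the update was rejected,
    ///     // e.g. a prefix outside of the CA's current resources.
    ///     return Err(e);
    /// }
    /// ```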
    pub async fn ca_routes_update(
        &self,
        ca: Handle,
        updates: RouteAuthorizationUpdates,
        actor: &Actor,
    ) -> KrillResult<()> {
        self.send_command(CmdDet::route_authorizations_update(
            &ca,
            updates,
            self.config.clone(),
            self.signer.clone(),
            actor,
        ))
        .await?;
        Ok(())
    }

    /// Re-issue about-to-expire objects in all CAs. This is a no-op in case
    /// ROAs do not need re-issuance. If new objects are created they will also
    /// be published (the event will trigger that the MFT and CRL are also
    /// re-issued, and that the CA in question synchronizes with its repository).
    ///
    /// Note: this does not re-issue delegated CA certificates, because child
    /// CAs are expected to notice their eligibility for extended validity and
    /// request updated certificates themselves.
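    ///
    /// A sketch of the scheduler-style invocation (bindings assumed):
    ///
    /// ```ignore
    /// // No-op for CAs whose ROAs and ASPA objects are not close to expiry.
    /// ca_manager.renew_objects_all(&actor).await?;
    /// ```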
    pub async fn renew_objects_all(&self, actor: &Actor) -> KrillResult<()> {
        for ca in self.ca_store.list()? {
            let cmd = Cmd::new(
                &ca,
                None,
                CmdDet::RouteAuthorizationsRenew(self.config.clone(), self.signer.clone()),
                actor,
            );
            self.send_command(cmd).await?;

            let cmd = Cmd::new(
                &ca,
                None,
                CmdDet::AspasRenew(self.config.clone(), self.signer.clone()),
                actor,
            );
            self.send_command(cmd).await?;
        }
        Ok(())
    }

    /// Force the re-issuance of all ROAs in all CAs. This function was added
    /// because we needed to re-issue ROAs in Krill 0.9.3 to ensure that a short
    /// subject CN is used for the EE certificate: i.e. the SKI rather than the
    /// full public key. But there may also be other cases in the future where
    /// forcing the re-issuance of ROAs is useful.
    pub async fn force_renew_roas_all(&self, actor: &Actor) -> KrillResult<()> {
        for ca in self.ca_store.list()? {
            let cmd = Cmd::new(
                &ca,
                None,
                CmdDet::RouteAuthorizationsForceRenew(self.config.clone(), self.signer.clone()),
                actor,
            );
            self.send_command(cmd).await?;
        }
        Ok(())
    }
}

/// # Resource Tagged Attestation functions
///
impl CaManager {
    /// Sign a one-off single-signed RTA
    pub async fn rta_sign(
        &self,
        ca: Handle,
        name: RtaName,
        request: RtaContentRequest,
        actor: &Actor,
    ) -> KrillResult<()> {
        let cmd = CmdDet::rta_sign(&ca, name, request, self.signer.clone(), actor);
        self.send_command(cmd).await?;
        Ok(())
    }

    /// Prepare a multi-signed RTA
    pub async fn rta_multi_prep(
        &self,
        ca: &Handle,
        name: RtaName,
        request: RtaPrepareRequest,
        actor: &Actor,
    ) -> KrillResult<()> {
        let cmd = CmdDet::rta_multi_prep(ca, name, request, self.signer.clone(), actor);
        self.send_command(cmd).await?;
        Ok(())
    }

    /// Co-sign an existing RTA
    pub async fn rta_multi_cosign(
        &self,
        ca: Handle,
        name: RtaName,
        rta: ResourceTaggedAttestation,
        actor: &Actor,
    ) -> KrillResult<()> {
        let cmd = CmdDet::rta_multi_sign(&ca, name, rta, self.signer.clone(), actor);
        self.send_command(cmd).await?;
        Ok(())
    }
}

/// # CA Key Roll functions
///
impl CaManager {
    /// Initiate an RFC 6489 key roll for all active keys in a CA older than the specified duration.
    pub async fn ca_keyroll_init(&self, handle: Handle, max_age: Duration, actor: &Actor) -> KrillResult<()> {
        let init_key_roll = CmdDet::key_roll_init(&handle, max_age, self.signer.clone(), actor);
        self.send_command(init_key_roll).await?;
        Ok(())
    }

    /// Activate a new key, as part of the key roll process (RFC 6489). Only new keys that
    /// have an age equal to or greater than the staging period are promoted. The RFC mandates
    /// a staging period of 24 hours, but we may use a shorter period for testing and/or emergency
    /// manual key rolls.
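    ///
    /// A sketch of a full manual key roll (bindings assumed; a zero max age
    /// rolls all current keys immediately):
    ///
    /// ```ignore
    /// ca_manager.ca_keyroll_init(handle.clone(), Duration::hours(0), &actor).await?;
    /// // ... wait out the staging period (RFC 6489 mandates 24 hours) ...
    /// ca_manager.ca_keyroll_activate(handle, Duration::hours(24), &actor).await?;
    /// ```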
    pub async fn ca_keyroll_activate(&self, handle: Handle, staging: Duration, actor: &Actor) -> KrillResult<()> {
        let activate_cmd = CmdDet::key_roll_activate(&handle, staging, self.config.clone(), self.signer.clone(), actor);
        self.send_command(activate_cmd).await?;
        Ok(())
    }
}