1 use std::{
2     collections::HashMap,
3     fmt,
4     path::Path,
5     str::FromStr,
6     sync::{Arc, RwLock},
7 };
8 
9 use serde::{de::DeserializeOwned, Deserialize, Serialize};
10 
11 use rpki::repository::x509::Time;
12 
13 use crate::commons::eventsourcing::{
14     cmd::{Command, StoredCommandBuilder},
15     Aggregate, Event, KeyStoreKey, KeyValueError, KeyValueStore, PostSaveEventListener, StoredCommand,
16     WithStorableDetails,
17 };
18 use crate::commons::{
19     api::{CommandHistory, CommandHistoryCriteria, CommandHistoryRecord, Handle, Label},
20     error::KrillIoError,
21     util::KrillVersion,
22 };
23 
24 use super::PreSaveEventListener;
25 
/// Convenience alias: store operations fail with [`AggregateStoreError`].
pub type StoreResult<T> = Result<T, AggregateStoreError>;
27 
28 //------------ Storable ------------------------------------------------------
29 
/// Marker trait for anything that can be stored: any clonable, sized type
/// that serde can serialize and deserialize. The blanket impl below makes
/// this automatic for every qualifying type.
pub trait Storable: Clone + Serialize + DeserializeOwned + Sized + 'static {}
impl<T: Clone + Serialize + DeserializeOwned + Sized + 'static> Storable for T {}
32 
/// Bookkeeping record for a stored aggregate, persisted as 'info.json'.
///
/// This file is always saved last on state changes (see `warm_aggregate`),
/// so events or commands found beyond the versions recorded here indicate
/// an incomplete transaction.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct StoredValueInfo {
    // version of the stored snapshot ('snapshot.json')
    pub snapshot_version: u64,
    // version of the last stored event (delta)
    pub last_event: u64,
    // sequence number of the last stored command
    pub last_command: u64,
    // time of the last recorded update
    pub last_update: Time,
}
40 
41 impl Default for StoredValueInfo {
default() -> Self42     fn default() -> Self {
43         StoredValueInfo {
44             snapshot_version: 0,
45             last_event: 0,
46             last_command: 0,
47             last_update: Time::now(),
48         }
49     }
50 }
51 
52 //------------ CommandKey ----------------------------------------------------
53 
/// Identifies a stored command; its `Display` form
/// ("command--{timestamp}--{sequence}--{label}") is used as the file name
/// under which the command is stored (with a ".json" extension).
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct CommandKey {
    // monotonically increasing sequence number of the command
    pub sequence: u64,
    // time the command was processed, in unix seconds
    pub timestamp_secs: i64,
    // short label taken from the command's summary
    pub label: Label,
}
60 
61 impl CommandKey {
new(sequence: u64, time: Time, label: Label) -> Self62     pub fn new(sequence: u64, time: Time, label: Label) -> Self {
63         CommandKey {
64             sequence,
65             timestamp_secs: time.timestamp(),
66             label,
67         }
68     }
69 
for_stored<S: WithStorableDetails>(command: &StoredCommand<S>) -> CommandKey70     pub fn for_stored<S: WithStorableDetails>(command: &StoredCommand<S>) -> CommandKey {
71         CommandKey::new(command.sequence(), command.time(), command.details().summary().label)
72     }
73 
matches_crit(&self, crit: &CommandHistoryCriteria) -> bool74     pub fn matches_crit(&self, crit: &CommandHistoryCriteria) -> bool {
75         crit.matches_timestamp_secs(self.timestamp_secs)
76             && crit.matches_label(&self.label)
77             && crit.matches_sequence(self.sequence)
78     }
79 }
80 
81 impl fmt::Display for CommandKey {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result82     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
83         write!(f, "command--{}--{}--{}", self.timestamp_secs, self.sequence, self.label)
84     }
85 }
86 
87 impl FromStr for CommandKey {
88     type Err = CommandKeyError;
89 
from_str(s: &str) -> Result<Self, Self::Err>90     fn from_str(s: &str) -> Result<Self, Self::Err> {
91         let parts: Vec<&str> = s.split("--").collect();
92         if parts.len() != 4 || parts[0] != "command" {
93             Err(CommandKeyError(s.to_string()))
94         } else {
95             let timestamp_secs = i64::from_str(parts[1]).map_err(|_| CommandKeyError(s.to_string()))?;
96             let sequence = u64::from_str(parts[2]).map_err(|_| CommandKeyError(s.to_string()))?;
97             // strip .json if present on the label part
98             let label = {
99                 let end = parts[3].to_string();
100                 let last = if end.ends_with(".json") {
101                     end.len() - 5
102                 } else {
103                     end.len()
104                 };
105                 (end[0..last]).to_string()
106             };
107 
108             Ok(CommandKey {
109                 sequence,
110                 timestamp_secs,
111                 label,
112             })
113         }
114     }
115 }
116 
117 //------------ CommandKeyError -----------------------------------------------
118 
/// Error returned when a string cannot be parsed into a `CommandKey`;
/// wraps the offending input string.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CommandKeyError(String);
121 
122 impl fmt::Display for CommandKeyError {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result123     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
124         write!(f, "invalid command key: {}", self.0)
125     }
126 }
127 
128 //------------ AggregateStore ------------------------------------------------
129 
130 /// This type is responsible for managing aggregates.
/// This type is responsible for managing aggregates.
pub struct AggregateStore<A: Aggregate> {
    // underlying key-value storage (disk based)
    kv: KeyValueStore,
    // in-memory cache of the latest version of each aggregate
    cache: RwLock<HashMap<Handle, Arc<A>>>,
    // listeners called with events *before* they are stored; they can
    // still return errors and thereby fail the command
    pre_save_listeners: Vec<Arc<dyn PreSaveEventListener<A>>>,
    // listeners called with events after storing; their result is not checked
    post_save_listeners: Vec<Arc<dyn PostSaveEventListener<A>>>,
    // store-wide lock: read for lookups, write for mutations
    outer_lock: RwLock<()>,
}
138 
139 /// # Starting up
140 ///
impl<A: Aggregate> AggregateStore<A>
where
    A::Error: From<AggregateStoreError>,
{
    /// Creates an AggregateStore using a disk based KeyValueStore
    ///
    /// If the namespace directory did not exist before this call, the store
    /// is considered brand new and the current Krill version is recorded.
    pub fn disk(work_dir: &Path, name_space: &str) -> StoreResult<Self> {
        let mut path = work_dir.to_path_buf();
        path.push(name_space);
        // Check existence *before* KeyValueStore::disk runs, so that the
        // version marker is only written for genuinely new stores.
        let existed = path.exists();

        let kv = KeyValueStore::disk(work_dir, name_space)?;
        let cache = RwLock::new(HashMap::new());
        let pre_save_listeners = vec![];
        let post_save_listeners = vec![];
        let outer_lock = RwLock::new(());

        let store = AggregateStore {
            kv,
            cache,
            pre_save_listeners,
            post_save_listeners,
            outer_lock,
        };

        if !existed {
            store.set_version(&KrillVersion::current())?;
        }

        Ok(store)
    }

    /// Warms up the cache, to be used after startup. Will fail if any aggregates fail to load
    /// in which case a 'recover' operation can be tried.
    pub fn warm(&self) -> StoreResult<()> {
        for handle in self.list()? {
            self.warm_aggregate(&handle)?;
        }
        Ok(())
    }

    /// Warm the cache for a specific aggregate. If successful save the latest snapshot
    /// as well (will help in case of migrations where snapshots were dropped).
    ///
    /// In case any surplus event(s) and/or command(s) are encountered, i.e. extra entries not
    /// recorded in the 'info.json' which is always saved last on state changes - then it is
    /// assumed that an incomplete transaction took place. The surplus entries will be archived
    /// and warnings will be reported.
    pub fn warm_aggregate(&self, handle: &Handle) -> StoreResult<()> {
        let agg = self
            .get_latest(handle)
            .map_err(|e| AggregateStoreError::WarmupFailed(handle.clone(), e.to_string()))?;

        // check that last command and event are consistent with
        // the info, if not fail warmup and force recover
        let info = self.get_info(handle)?;

        // for events we can just check if the next event, after
        // the last event in the info exists
        if self.get_event::<A::Event>(handle, info.last_event + 1)?.is_some() {
            warn!(
                "Found surplus event(s) for '{}' when warming up the cache. Will archive!",
                handle
            );
            self.archive_surplus_events(handle, info.last_event + 1)?;
        }

        // Check if there are any commands with a sequence after the last
        // recorded sequence in the info.
        let mut crit = CommandHistoryCriteria::default();
        crit.set_after_sequence(info.last_command);
        let surplus_commands = self.command_keys_ascending(handle, &crit)?;
        if !surplus_commands.is_empty() {
            warn!(
                "Found surplus command(s) for '{}' when warming up the cache. Will archive!",
                handle
            );

            for command in surplus_commands {
                self.archive_surplus_command(handle, &command)?;
            }
        }

        // Save the snapshot if it does not yet match the latest state
        if info.snapshot_version != agg.version() {
            info!("Updating snapshot for '{}', to decrease future load times.", handle);
            self.store_snapshot(handle, agg.as_ref())?;
        }

        Ok(())
    }

    /// Recovers aggregates to the latest consistent saved in the keystore by verifying
    /// all commands, and the corresponding events. Use this in case the state on disk is
    /// found to be inconsistent. I.e. the `warm` function failed and Krill exited.
    ///
    /// Note Krill has an option to *always* use this recover function when it starts,
    /// but the default is that it just uses `warm` function instead. The reason for this
    /// is that `recover` can take longer, and that it could lead silent recovery without
    /// alerting to operators to underlying issues.
    pub fn recover(&self) -> StoreResult<()> {
        // Empty criteria: consider every stored command of every aggregate.
        let criteria = CommandHistoryCriteria::default();
        for handle in self.list()? {
            info!("Will recover state for '{}'", &handle);

            // Check
            // - All commands, archive bad commands
            // - All events, archive bad events
            // - Keep track of last known good command and event
            // - Archive all commands and events after
            //
            // Rebuild state up to event:
            //   - use snapshot - archive if bad
            //   - use back-up snapshot if snapshot is no good - archive if bad
            //   - start from init event if back-up snapshot is bad, or if the version exceeds last good event
            //   - process events from (back-up) snapshot up to last good event
            //
            //  If still good:
            //   - save snapshot
            //   - save info

            let mut last_good_cmd = 0;
            let mut last_good_evt = 0;
            // `last_update` is only overwritten from commands that load
            // cleanly; if none do, "now" is recorded instead.
            let mut last_update = Time::now();

            // Check all commands and associated events
            let mut all_ok = true;

            let command_keys = self.command_keys_ascending(&handle, &criteria)?;
            info!("Processing {} commands for {}", command_keys.len(), handle);
            for (counter, command_key) in command_keys.into_iter().enumerate() {
                // Progress logging only, recovery can be slow on big stores.
                if counter % 100 == 0 {
                    info!("Processed {} commands", counter);
                }

                // Once a single command or event fails to load, everything
                // after it is considered surplus and archived.
                if all_ok {
                    if let Ok(cmd) = self.get_command::<A::StorableCommandDetails>(&handle, &command_key) {
                        if let Some(events) = cmd.effect().events() {
                            for version in events {
                                if let Ok(Some(_)) = self.get_event::<A::Event>(&handle, *version) {
                                    last_good_evt = *version;
                                } else {
                                    all_ok = false;
                                }
                            }
                        }
                        last_good_cmd = cmd.sequence();
                        last_update = cmd.time();
                    } else {
                        all_ok = false;
                    }
                }
                if !all_ok {
                    warn!(
                        "Command {} was corrupt, or not all events could be loaded. Will archive surplus",
                        command_key
                    );
                    // Bad command or event encountered.. archive surplus commands
                    // note that we will clean surplus events later
                    self.archive_surplus_command(&handle, &command_key)?;
                }
            }

            self.archive_surplus_events(&handle, last_good_evt + 1)?;

            if !all_ok {
                warn!(
                    "State for '{}' can only be recovered to version: {}. Check corrupt and surplus dirs",
                    &handle, last_good_evt
                );
            }

            // Get the latest aggregate, note that this ensures that the snapshots
            // are checked, and archived if corrupt, or if they are after the last_good_evt
            let agg = self
                .get_aggregate(&handle, Some(last_good_evt))?
                .ok_or_else(|| AggregateStoreError::CouldNotRecover(handle.clone()))?;

            let snapshot_version = agg.version();

            let info = StoredValueInfo {
                last_event: last_good_evt,
                last_command: last_good_cmd,
                last_update,
                snapshot_version,
            };

            self.store_snapshot(&handle, &agg)?;

            self.cache_update(&handle, Arc::new(agg));

            // Written last: this marks the recovered state as consistent.
            self.save_info(&handle, &info)?;
        }

        Ok(())
    }

    /// Adds a listener that will receive all events before they are stored.
    pub fn add_pre_save_listener<L: PreSaveEventListener<A>>(&mut self, sync_listener: Arc<L>) {
        self.pre_save_listeners.push(sync_listener);
    }

    /// Adds a listener that will receive a reference to all events after they are stored.
    pub fn add_post_save_listener<L: PostSaveEventListener<A>>(&mut self, listener: Arc<L>) {
        self.post_save_listeners.push(listener);
    }
}
347 
348 /// # Manage Aggregates
349 ///
impl<A: Aggregate> AggregateStore<A>
where
    A::Error: From<AggregateStoreError>,
{
    /// Gets the latest version for the given aggregate. Returns
    /// an AggregateStoreError::UnknownAggregate in case the aggregate
    /// does not exist.
    pub fn get_latest(&self, handle: &Handle) -> StoreResult<Arc<A>> {
        // Read lock: lookups may run concurrently, but not while `add` or
        // `command` holds the write lock.
        let _lock = self.outer_lock.read().unwrap();
        self.get_latest_no_lock(handle)
    }

    /// Adds a new aggregate instance based on the init event.
    pub fn add(&self, init: A::InitEvent) -> StoreResult<Arc<A>> {
        let _lock = self.outer_lock.write().unwrap();

        self.store_event(&init)?;

        let handle = init.handle().clone();

        let aggregate = A::init(init).map_err(|_| AggregateStoreError::InitError(handle.clone()))?;
        self.store_snapshot(&handle, &aggregate)?;

        // Fresh bookkeeping record: no commands or events beyond init yet.
        let info = StoredValueInfo::default();
        self.save_info(&handle, &info)?;

        let arc = Arc::new(aggregate);
        self.cache_update(&handle, arc.clone());

        Ok(arc)
    }

    /// Send a command to the latest aggregate referenced by the handle in the command.
    ///
    /// This will:
    /// - Retrieve the latest aggregate for this command.
    /// - Call the A::process_command function
    /// on success:
    ///   - call pre-save listeners with events
    ///   - save command and events
    ///   - call post-save listeners with events
    ///   - return aggregate
    /// on no-op (empty event list):
    ///   - do not save anything, return aggregate
    /// on error:
    ///   - save command and error, return error
    pub fn command(&self, cmd: A::Command) -> Result<Arc<A>, A::Error> {
        debug!("Processing command {}", cmd);

        let _lock = self.outer_lock.write().unwrap();

        // Get the latest arc.
        let handle = cmd.handle().clone();

        // Bump bookkeeping optimistically; it is only persisted at the very
        // end (and not at all for no-op commands, which return early).
        let mut info = self.get_info(&handle)?;
        info.last_update = Time::now();
        info.last_command += 1;

        let mut latest = self.get_latest_no_lock(&handle)?;

        // Optimistic concurrency check: a command may pin the version of the
        // aggregate it expects to act on.
        if let Some(version) = cmd.version() {
            if version != latest.version() {
                error!(
                    "Version conflict updating '{}', expected version: {}, found: {}",
                    handle,
                    version,
                    latest.version()
                );

                return Err(A::Error::from(AggregateStoreError::ConcurrentModification(handle)));
            }
        }

        let stored_command_builder = StoredCommandBuilder::new(&cmd, latest.version(), info.last_command);

        let res = match latest.process_command(cmd) {
            Err(e) => {
                // The command failed: the failure itself is recorded so that
                // it shows up in the command history.
                let stored_command = stored_command_builder.finish_with_error(&e);
                self.store_command(stored_command)?;
                Err(e)
            }
            Ok(events) => {
                if events.is_empty() {
                    return Ok(latest); // otherwise the version info will be updated
                } else {
                    let agg = Arc::make_mut(&mut latest);

                    // Using a lock on the hashmap here to ensure that all updates happen sequentially.
                    // It would be better to get a lock only for this specific aggregate. So it may be
                    // worth rethinking the structure.
                    //
                    // That said.. saving and applying events is really quick, so this should not hurt
                    // performance much.
                    //
                    // Also note that we don't need the lock to update the inner arc in the cache. We
                    // just need it to be in scope until we are done updating.
                    let mut cache = self.cache.write().unwrap();

                    // It should be impossible to get events for the wrong aggregate, and the wrong
                    // versions, because we are doing the update here inside the outer lock, and aggregates
                    // generally do not lie about who they are.
                    //
                    // Still.. some defensive coding in case we do have some issue. Double check that the
                    // events are for this aggregate, and are a contiguous sequence of version starting with
                    // this version.
                    let version_before = agg.version();
                    let nr_events = events.len() as u64;

                    // Event numbers apply to the current version of an aggregate, so the first event
                    // here applies to the current version (before applying) and the 2nd to +1 and so
                    // on.
                    info.last_event = version_before + nr_events - 1;

                    for i in 0..nr_events {
                        let event = &events[i as usize];
                        let expected_version = version_before + i;
                        if event.version() != expected_version || event.handle() != &handle {
                            error!("Unexpected event: {}", event);
                            return Err(A::Error::from(AggregateStoreError::WrongEventForAggregate(
                                handle,
                                event.handle().clone(),
                                expected_version,
                                event.version(),
                            )));
                        }
                    }

                    // Time to start saving things.
                    let stored_command = stored_command_builder.finish_with_events(events.as_slice());

                    // If persistence fails, then complain loudly, and exit. Krill should not keep running, because this would
                    // result in discrepancies between state in memory and state on disk. Let Krill crash and an operator investigate.
                    // See issue: https://github.com/NLnetLabs/krill/issues/322
                    if let Err(e) = self.store_command(stored_command) {
                        error!("Cannot save state for '{}'. Got error: {}", handle, e);
                        error!("Will now exit Krill - please verify that the disk can be written to and is not full");
                        std::process::exit(1);
                    }

                    // Apply events, check that the aggregate can be updated, and make sure
                    // we have an updated version so we can store it.
                    for event in &events {
                        agg.apply(event.clone());
                    }

                    // Apply events to pre save listeners which may still return errors
                    for pre_save_listener in &self.pre_save_listeners {
                        pre_save_listener.as_ref().listen(agg, events.as_slice())?;
                    }

                    // Nothing broke, so it's safe to store the command, events and aggregate
                    for event in &events {
                        self.store_event(event)?;
                    }
                    info.snapshot_version = agg.version();
                    self.store_snapshot(&handle, agg)?;

                    cache.insert(handle.clone(), Arc::new(agg.clone()));

                    // Now send the events to the 'post-save' listeners.
                    for listener in &self.post_save_listeners {
                        listener.as_ref().listen(agg, events.as_slice());
                    }

                    Ok(latest)
                }
            }
        };

        // Persist the updated bookkeeping last, marking the state consistent.
        self.save_info(&handle, &info)?;

        res
    }

    /// Returns true if an instance exists for the id
    pub fn has(&self, id: &Handle) -> Result<bool, AggregateStoreError> {
        let _lock = self.outer_lock.read().unwrap();
        self.kv
            .has_scope(id.to_string())
            .map_err(AggregateStoreError::KeyStoreError)
    }

    /// Lists all known ids.
    pub fn list(&self) -> Result<Vec<Handle>, AggregateStoreError> {
        let _lock = self.outer_lock.read().unwrap();
        self.aggregates()
    }
}
538 
539 /// # Manage Commands
540 ///
541 impl<A: Aggregate> AggregateStore<A>
542 where
543     A::Error: From<AggregateStoreError>,
544 {
545     /// Find all commands that fit the criteria and return history
command_history( &self, id: &Handle, crit: CommandHistoryCriteria, ) -> Result<CommandHistory, AggregateStoreError>546     pub fn command_history(
547         &self,
548         id: &Handle,
549         crit: CommandHistoryCriteria,
550     ) -> Result<CommandHistory, AggregateStoreError> {
551         let offset = crit.offset();
552 
553         let command_keys = self.command_keys_ascending(id, &crit)?;
554 
555         let rows = match crit.rows_limit() {
556             Some(limit) => limit,
557             None => command_keys.len(),
558         };
559 
560         let mut commands: Vec<CommandHistoryRecord> = Vec::with_capacity(rows);
561         let mut skipped = 0;
562         let mut total = 0;
563 
564         for command_key in command_keys {
565             total += 1;
566             if skipped < offset {
567                 skipped += 1;
568             } else if commands.len() < rows {
569                 let key = Self::key_for_command(id, &command_key);
570                 let stored: StoredCommand<A::StorableCommandDetails> = self
571                     .kv
572                     .get(&key)?
573                     .ok_or_else(|| AggregateStoreError::CommandNotFound(id.clone(), command_key))?;
574 
575                 let stored = stored.into();
576                 commands.push(stored);
577             }
578         }
579         Ok(CommandHistory::new(offset, total, commands))
580     }
581 
582     /// Get the command for this key, if it exists
get_command<D: WithStorableDetails>( &self, id: &Handle, command_key: &CommandKey, ) -> Result<StoredCommand<D>, AggregateStoreError>583     pub fn get_command<D: WithStorableDetails>(
584         &self,
585         id: &Handle,
586         command_key: &CommandKey,
587     ) -> Result<StoredCommand<D>, AggregateStoreError> {
588         let key = Self::key_for_command(id, command_key);
589         match self.kv.get(&key) {
590             Ok(Some(cmd)) => Ok(cmd),
591             Ok(None) => Err(AggregateStoreError::CommandNotFound(id.clone(), command_key.clone())),
592             Err(e) => {
593                 error!(
594                     "Found corrupt command at: {}, will try to archive. Error was: {}",
595                     key, e
596                 );
597                 self.kv.archive_corrupt(&key)?;
598                 Err(AggregateStoreError::CommandCorrupt(id.clone(), command_key.clone()))
599             }
600         }
601     }
602 
603     /// Get the value for this key, if any exists.
get_event<V: Event>(&self, id: &Handle, version: u64) -> Result<Option<V>, AggregateStoreError>604     pub fn get_event<V: Event>(&self, id: &Handle, version: u64) -> Result<Option<V>, AggregateStoreError> {
605         let key = Self::key_for_event(id, version);
606         match self.kv.get(&key) {
607             Ok(res_opt) => Ok(res_opt),
608             Err(e) => {
609                 error!(
610                     "Found corrupt event for {}, version {}, archiving. Error: {}",
611                     id, version, e
612                 );
613                 self.kv.archive_corrupt(&key)?;
614                 Err(AggregateStoreError::EventCorrupt(id.clone(), version))
615             }
616         }
617     }
618 }
619 
620 impl<A: Aggregate> AggregateStore<A>
621 where
622     A::Error: From<AggregateStoreError>,
623 {
has_updates(&self, id: &Handle, aggregate: &A) -> StoreResult<bool>624     fn has_updates(&self, id: &Handle, aggregate: &A) -> StoreResult<bool> {
625         Ok(self.get_event::<A::Event>(id, aggregate.version())?.is_some())
626     }
627 
cache_get(&self, id: &Handle) -> Option<Arc<A>>628     fn cache_get(&self, id: &Handle) -> Option<Arc<A>> {
629         self.cache.read().unwrap().get(id).cloned()
630     }
631 
cache_remove(&self, id: &Handle)632     fn cache_remove(&self, id: &Handle) {
633         self.cache.write().unwrap().remove(id);
634     }
635 
cache_update(&self, id: &Handle, arc: Arc<A>)636     fn cache_update(&self, id: &Handle, arc: Arc<A>) {
637         self.cache.write().unwrap().insert(id.clone(), arc);
638     }
639 
get_latest_no_lock(&self, handle: &Handle) -> StoreResult<Arc<A>>640     fn get_latest_no_lock(&self, handle: &Handle) -> StoreResult<Arc<A>> {
641         trace!("Trying to load aggregate id: {}", handle);
642 
643         let info_key = Self::key_for_info(handle);
644         let limit = self
645             .kv
646             .get::<StoredValueInfo>(&info_key)
647             .map_err(|_| AggregateStoreError::InfoCorrupt(handle.clone()))?
648             .map(|info| info.last_event);
649 
650         match self.cache_get(handle) {
651             None => match self.get_aggregate(handle, limit)? {
652                 None => {
653                     error!("Could not load aggregate with id: {} from disk", handle);
654                     Err(AggregateStoreError::UnknownAggregate(handle.clone()))
655                 }
656                 Some(agg) => {
657                     let arc: Arc<A> = Arc::new(agg);
658                     self.cache_update(handle, arc.clone());
659                     trace!("Loaded aggregate id: {} from disk", handle);
660                     Ok(arc)
661                 }
662             },
663             Some(mut arc) => {
664                 if self.has_updates(handle, &arc)? {
665                     let agg = Arc::make_mut(&mut arc);
666                     self.update_aggregate(handle, agg, limit)?;
667                 }
668                 trace!("Loaded aggregate id: {} from memory", handle);
669                 Ok(arc)
670             }
671         }
672     }
673 }
674 
675 /// # Manage values in the KeyValue store
676 ///
677 impl<A: Aggregate> AggregateStore<A>
678 where
679     A::Error: From<AggregateStoreError>,
680 {
key_version() -> KeyStoreKey681     fn key_version() -> KeyStoreKey {
682         KeyStoreKey::simple("version".to_string())
683     }
684 
key_for_info(agg: &Handle) -> KeyStoreKey685     fn key_for_info(agg: &Handle) -> KeyStoreKey {
686         KeyStoreKey::scoped(agg.to_string(), "info.json".to_string())
687     }
688 
key_for_snapshot(agg: &Handle) -> KeyStoreKey689     fn key_for_snapshot(agg: &Handle) -> KeyStoreKey {
690         KeyStoreKey::scoped(agg.to_string(), "snapshot.json".to_string())
691     }
692 
key_for_backup_snapshot(agg: &Handle) -> KeyStoreKey693     fn key_for_backup_snapshot(agg: &Handle) -> KeyStoreKey {
694         KeyStoreKey::scoped(agg.to_string(), "snapshot-bk.json".to_string())
695     }
696 
key_for_new_snapshot(agg: &Handle) -> KeyStoreKey697     fn key_for_new_snapshot(agg: &Handle) -> KeyStoreKey {
698         KeyStoreKey::scoped(agg.to_string(), "snapshot-new.json".to_string())
699     }
700 
key_for_event(agg: &Handle, version: u64) -> KeyStoreKey701     fn key_for_event(agg: &Handle, version: u64) -> KeyStoreKey {
702         KeyStoreKey::scoped(agg.to_string(), format!("delta-{}.json", version))
703     }
704 
key_for_command(agg: &Handle, command: &CommandKey) -> KeyStoreKey705     fn key_for_command(agg: &Handle, command: &CommandKey) -> KeyStoreKey {
706         KeyStoreKey::scoped(agg.to_string(), format!("{}.json", command))
707     }
708 
get_version(&self) -> Result<KrillVersion, AggregateStoreError>709     pub fn get_version(&self) -> Result<KrillVersion, AggregateStoreError> {
710         match self.kv.get::<KrillVersion>(&Self::key_version())? {
711             Some(version) => Ok(version),
712             None => Ok(KrillVersion::v0_5_0_or_before()),
713         }
714     }
715 
set_version(&self, version: &KrillVersion) -> Result<(), AggregateStoreError>716     pub fn set_version(&self, version: &KrillVersion) -> Result<(), AggregateStoreError> {
717         self.kv.store(&Self::key_version(), version)?;
718         Ok(())
719     }
720 
command_keys_ascending( &self, id: &Handle, crit: &CommandHistoryCriteria, ) -> Result<Vec<CommandKey>, AggregateStoreError>721     fn command_keys_ascending(
722         &self,
723         id: &Handle,
724         crit: &CommandHistoryCriteria,
725     ) -> Result<Vec<CommandKey>, AggregateStoreError> {
726         let mut command_keys = vec![];
727 
728         for key in self.kv.keys(Some(id.to_string()), "command--")? {
729             match CommandKey::from_str(key.name()) {
730                 Ok(command_key) => {
731                     if command_key.matches_crit(crit) {
732                         command_keys.push(command_key);
733                     }
734                 }
735                 Err(_) => {
736                     warn!("Found strange command-like key in disk key-value store: {}", key.name());
737                 }
738             }
739         }
740 
741         command_keys.sort_by(|a, b| a.sequence.cmp(&b.sequence));
742 
743         Ok(command_keys)
744     }
745 
746     /// Private, should be called through `list` which takes care of locking.
aggregates(&self) -> Result<Vec<Handle>, AggregateStoreError>747     fn aggregates(&self) -> Result<Vec<Handle>, AggregateStoreError> {
748         let mut res = vec![];
749 
750         for scope in self.kv.scopes()? {
751             if let Ok(handle) = Handle::from_str(&scope) {
752                 res.push(handle)
753             }
754         }
755 
756         Ok(res)
757     }
758 
759     /// Clean surplus events
archive_surplus_events(&self, id: &Handle, from: u64) -> Result<(), AggregateStoreError>760     fn archive_surplus_events(&self, id: &Handle, from: u64) -> Result<(), AggregateStoreError> {
761         for key in self.kv.keys(Some(id.to_string()), "delta-")? {
762             let name = key.name();
763             if name.starts_with("delta-") && name.ends_with(".json") {
764                 let start = 6;
765                 let end = name.len() - 5;
766                 if end > start {
767                     if let Ok(v) = u64::from_str(&name[start..end]) {
768                         if v >= from {
769                             let key = Self::key_for_event(id, v);
770                             warn!("Archiving surplus event for '{}': {}", id, key);
771                             self.kv
772                                 .archive_surplus(&key)
773                                 .map_err(AggregateStoreError::KeyStoreError)?
774                         }
775                     }
776                 }
777             }
778         }
779         Ok(())
780     }
781 
782     /// Archive a surplus value for a key
archive_surplus_command(&self, id: &Handle, key: &CommandKey) -> Result<(), AggregateStoreError>783     fn archive_surplus_command(&self, id: &Handle, key: &CommandKey) -> Result<(), AggregateStoreError> {
784         let key = Self::key_for_command(id, key);
785         warn!("Archiving surplus command for '{}': {}", id, key);
786         self.kv
787             .archive_surplus(&key)
788             .map_err(AggregateStoreError::KeyStoreError)
789     }
790 
791     /// MUST check if the event already exists and return an error if it does.
store_event<V: Event>(&self, event: &V) -> Result<(), AggregateStoreError>792     fn store_event<V: Event>(&self, event: &V) -> Result<(), AggregateStoreError> {
793         let id = event.handle();
794         let version = event.version();
795         let key = Self::key_for_event(id, version);
796         self.kv.store_new(&key, event)?;
797         Ok(())
798     }
799 
store_command<S: WithStorableDetails>(&self, command: StoredCommand<S>) -> Result<(), AggregateStoreError>800     fn store_command<S: WithStorableDetails>(&self, command: StoredCommand<S>) -> Result<(), AggregateStoreError> {
801         let id = command.handle();
802 
803         let command_key = CommandKey::for_stored(&command);
804         let key = Self::key_for_command(id, &command_key);
805 
806         self.kv.store_new(&key, &command)?;
807         Ok(())
808     }
809 
    /// Get the latest aggregate
    /// limit to the event nr, i.e. the resulting aggregate version will be limit + 1
    ///
    /// Resolution order: current snapshot, then backup snapshot, then the
    /// init event (event 0). Corrupt snapshots are archived as corrupt;
    /// snapshots past the requested limit are archived as surplus. The
    /// chosen base is then brought up to date via `update_aggregate`.
    /// Returns `Ok(None)` when no snapshot and no init event exist.
    fn get_aggregate(&self, id: &Handle, limit: Option<u64>) -> Result<Option<A>, AggregateStoreError> {
        // 1) Try to get a snapshot.
        // 2) If that fails, or if it exceeds the limit, try the backup
        // 3) If that fails, try to get the init event.
        //
        // Then replay all newer events that can be found up to the version (or latest if version is None)
        trace!("Getting aggregate for '{}'", id);

        let mut aggregate_opt: Option<A> = None;

        let snapshot_key = Self::key_for_snapshot(id);

        match self.kv.get::<A>(&snapshot_key) {
            Err(e) => {
                // snapshot file was present and corrupt
                error!(
                    "Could not parse snapshot for '{}', archiving as corrupt. Error was: {}",
                    id, e
                );
                self.kv.archive_corrupt(&snapshot_key)?;
            }
            Ok(Some(agg)) => {
                // snapshot present and okay
                trace!("Found snapshot for '{}'", id);
                if let Some(limit) = limit {
                    // An aggregate at version v has applied events up to
                    // v - 1, so the snapshot is usable when limit >= v - 1.
                    if limit >= agg.version() - 1 {
                        aggregate_opt = Some(agg)
                    } else {
                        trace!("Discarding snapshot after limit '{}'", id);
                        self.kv.archive_surplus(&snapshot_key)?;
                    }
                } else {
                    debug!("Found valid snapshot for '{}'", id);
                    aggregate_opt = Some(agg)
                }
            }
            // No snapshot stored yet — fall through to the backup.
            Ok(None) => {}
        }

        if aggregate_opt.is_none() {
            warn!("No snapshot found for '{}' will try backup snapshot", id);
            let backup_snapshot_key = Self::key_for_backup_snapshot(id);
            // Same logic as above, applied to the backup snapshot key.
            match self.kv.get::<A>(&backup_snapshot_key) {
                Err(e) => {
                    // backup snapshot present and corrupt
                    error!(
                        "Could not parse backup snapshot for '{}', archiving as corrupt. Error: {}",
                        id, e
                    );
                    self.kv.archive_corrupt(&backup_snapshot_key)?;
                }
                Ok(Some(agg)) => {
                    trace!("Found backup snapshot for '{}'", id);
                    if let Some(limit) = limit {
                        if limit >= agg.version() - 1 {
                            aggregate_opt = Some(agg)
                        } else {
                            trace!("Discarding backup snapshot after limit '{}'", id);
                            self.kv.archive_surplus(&backup_snapshot_key)?;
                        }
                    } else {
                        debug!("Found valid backup snapshot for '{}'", id);
                        aggregate_opt = Some(agg)
                    }
                }
                Ok(None) => {}
            }
        }

        if aggregate_opt.is_none() {
            warn!("No snapshots found for '{}' will try from initialization event.", id);
            // Event 0 is the init event; rebuild the aggregate from scratch.
            let init_key = Self::key_for_event(id, 0);
            aggregate_opt = match self.kv.get::<A::InitEvent>(&init_key)? {
                Some(e) => {
                    trace!("Rebuilding aggregate {} from init event", id);
                    Some(A::init(e).map_err(|_| AggregateStoreError::InitError(id.clone()))?)
                }
                None => None,
            }
        }

        match aggregate_opt {
            None => Ok(None),
            Some(mut aggregate) => {
                // Replay any events newer than the recovered base state.
                self.update_aggregate(id, &mut aggregate, limit)?;
                Ok(Some(aggregate))
            }
        }
    }
901 
    /// Brings `aggregate` up to date by replaying stored events up to and
    /// including event `limit` (the resulting aggregate version is limit + 1).
    ///
    /// The limit is taken from, in order of preference: the explicit `limit`
    /// argument, the stored info for this aggregate, or the number of stored
    /// delta files. Returns `ReplayError` when the aggregate is already past
    /// the limit, or when an event is missing or out of step during replay.
    fn update_aggregate(&self, id: &Handle, aggregate: &mut A, limit: Option<u64>) -> Result<(), AggregateStoreError> {
        let start = aggregate.version();
        let limit = if let Some(limit) = limit {
            debug!("Will attempt to update '{}' using explicit limit", id);
            limit
        } else if let Ok(info) = self.get_info(id) {
            debug!("Will attempt to update '{}' using limit from info", id);
            info.last_event
        } else {
            // No info available — fall back to counting delta files.
            let nr_events = self.kv.keys(Some(id.to_string()), "delta-")?.len();
            if nr_events < 1 {
                return Err(AggregateStoreError::InfoMissing(id.clone()));
            } else {
                // Events are numbered from 0, so with n delta files the
                // highest event number is n - 1.
                let limit = (nr_events - 1) as u64;
                debug!("Will attempt to update '{}' from limit based on nr of events", id,);
                limit
            }
        };

        if limit == aggregate.version() - 1 {
            // already at version, done
            // note that an event has the version of the aggregate it *affects*. So delta 10 results in version 11.
            debug!("Snapshot for '{}' is up to date", id);
            return Ok(());
        }

        debug!(
            "Will attempt to update '{}' from version: {} to: {}",
            id,
            start,
            limit + 1
        );

        // The aggregate is already past the requested limit; events cannot
        // be un-applied, so this is a replay error.
        if start > limit {
            return Err(AggregateStoreError::ReplayError(id.clone(), limit, start));
        }

        // Replay events start..=limit; each must exist and match the
        // aggregate's current version exactly.
        for version in start..limit + 1 {
            if let Some(e) = self.get_event(id, version)? {
                if aggregate.version() != version {
                    error!("Trying to apply event to wrong version of aggregate in replay");
                    return Err(AggregateStoreError::ReplayError(id.clone(), limit, version));
                }
                aggregate.apply(e);
                debug!("Applied event nr {} to aggregate {}", version, id);
            } else {
                return Err(AggregateStoreError::ReplayError(id.clone(), limit, version));
            }
        }

        Ok(())
    }
954 
955     /// Saves the latest snapshot - overwrites any previous snapshot.
store_snapshot<V: Aggregate>(&self, id: &Handle, aggregate: &V) -> Result<(), AggregateStoreError>956     fn store_snapshot<V: Aggregate>(&self, id: &Handle, aggregate: &V) -> Result<(), AggregateStoreError> {
957         let snapshot_new = Self::key_for_new_snapshot(id);
958         let snapshot_current = Self::key_for_snapshot(id);
959         let snapshot_backup = Self::key_for_backup_snapshot(id);
960 
961         self.kv.store(&snapshot_new, aggregate)?;
962 
963         if self.kv.has(&snapshot_backup)? {
964             self.kv.drop_key(&snapshot_backup)?;
965         }
966         if self.kv.has(&snapshot_current)? {
967             self.kv.move_key(&snapshot_current, &snapshot_backup)?;
968         }
969         self.kv.move_key(&snapshot_new, &snapshot_current)?;
970 
971         Ok(())
972     }
973 
974     /// Drop an aggregate, completely. Handle with care!
drop_aggregate(&self, id: &Handle) -> Result<(), AggregateStoreError>975     pub fn drop_aggregate(&self, id: &Handle) -> Result<(), AggregateStoreError> {
976         self.cache_remove(id);
977         self.kv.drop_scope(id.as_str())?;
978         Ok(())
979     }
980 
get_info(&self, id: &Handle) -> Result<StoredValueInfo, AggregateStoreError>981     fn get_info(&self, id: &Handle) -> Result<StoredValueInfo, AggregateStoreError> {
982         let key = Self::key_for_info(id);
983         let info = self
984             .kv
985             .get(&key)
986             .map_err(|_| AggregateStoreError::InfoCorrupt(id.clone()))?;
987         info.ok_or_else(|| AggregateStoreError::InfoMissing(id.clone()))
988     }
989 
save_info(&self, id: &Handle, info: &StoredValueInfo) -> Result<(), AggregateStoreError>990     fn save_info(&self, id: &Handle, info: &StoredValueInfo) -> Result<(), AggregateStoreError> {
991         let key = Self::key_for_info(id);
992         self.kv.store(&key, info).map_err(AggregateStoreError::KeyStoreError)
993     }
994 }
995 
996 //------------ AggregateStoreError -------------------------------------------
997 
/// This type defines possible Errors for the AggregateStore
#[derive(Debug)]
pub enum AggregateStoreError {
    /// Underlying I/O failure.
    IoError(KrillIoError),
    /// Failure in the key-value store backing this aggregate store.
    KeyStoreError(KeyValueError),
    /// The aggregate store was used before being initialized.
    NotInitialized,
    /// No aggregate exists for the given handle.
    UnknownAggregate(Handle),
    /// An init event exists for the handle but could not be applied.
    InitError(Handle),
    /// Event replay failed: (handle, target version, version where replay failed).
    ReplayError(Handle, u64, u64),
    /// No stored value info found for the handle.
    InfoMissing(Handle),
    /// Stored value info for the handle could not be parsed.
    InfoCorrupt(Handle),
    /// An event did not match the aggregate:
    /// (expected handle, found handle, expected version, found version).
    WrongEventForAggregate(Handle, Handle, u64, u64),
    /// A concurrent modification was attempted for the handle.
    ConcurrentModification(Handle),
    /// The aggregate has no command with the given sequence: (handle, sequence).
    UnknownCommand(Handle, u64),
    /// History offset exceeds the total number of commands: (offset, total).
    CommandOffsetTooLarge(u64, u64),
    /// State could not be rebuilt at startup: (handle, error message).
    WarmupFailed(Handle, String),
    /// Recovery was aborted for the handle; restore from backup.
    CouldNotRecover(Handle),
    /// Archiving commands/events failed: (handle, error message).
    CouldNotArchive(Handle, String),
    /// A stored command was present but could not be parsed.
    CommandCorrupt(Handle, CommandKey),
    /// A stored command could not be found.
    CommandNotFound(Handle, CommandKey),
    /// A stored event could not be parsed: (handle, event version).
    EventCorrupt(Handle, u64),
}
1020 
1021 impl fmt::Display for AggregateStoreError {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result1022     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1023         match self {
1024             AggregateStoreError::IoError(e) => e.fmt(f),
1025             AggregateStoreError::KeyStoreError(e) => write!(f, "KeyStore Error: {}", e),
1026             AggregateStoreError::NotInitialized => write!(f, "This aggregate store is not initialized"),
1027             AggregateStoreError::UnknownAggregate(handle) => write!(f, "unknown entity: {}", handle),
1028             AggregateStoreError::InitError(handle) => {
1029                 write!(f, "Init event exists for '{}', but cannot be applied", handle)
1030             }
1031             AggregateStoreError::ReplayError(handle, target_version, fail_version) => write!(
1032                 f,
1033                 "Cannot reconstruct '{}' to version '{}', failed at version {}",
1034                 handle, target_version, fail_version
1035             ),
1036             AggregateStoreError::InfoMissing(handle) => write!(f, "Missing stored value info for '{}'", handle),
1037             AggregateStoreError::InfoCorrupt(handle) => write!(f, "Corrupt stored value info for '{}'", handle),
1038             AggregateStoreError::WrongEventForAggregate(expected, found, expected_v, found_v) => {
1039                 write!(
1040                     f,
1041                     "event not applicable to entity. Expected: {} {}, found: {} {}",
1042                     expected, expected_v, found, found_v
1043                 )
1044             }
1045             AggregateStoreError::ConcurrentModification(handle) => {
1046                 write!(f, "concurrent modification attempt for entity: '{}'", handle)
1047             }
1048             AggregateStoreError::UnknownCommand(handle, seq) => write!(
1049                 f,
1050                 "Aggregate '{}' does not have command with sequence '{}'",
1051                 handle, seq
1052             ),
1053             AggregateStoreError::CommandOffsetTooLarge(offset, total) => {
1054                 write!(f, "Offset '{}' exceeds total '{}'", offset, total)
1055             }
1056             AggregateStoreError::WarmupFailed(handle, e) => {
1057                 write!(f, "Could not rebuild state for '{}': {}", handle, e)
1058             }
1059             AggregateStoreError::CouldNotRecover(handle) => write!(
1060                 f,
1061                 "Could not recover state for '{}', aborting recover. Use backup!!",
1062                 handle
1063             ),
1064             AggregateStoreError::CouldNotArchive(handle, e) => write!(
1065                 f,
1066                 "Could not archive commands and events for '{}'. Error: {}",
1067                 handle, e
1068             ),
1069             AggregateStoreError::CommandCorrupt(handle, key) => {
1070                 write!(f, "StoredCommand '{}' for '{}' was corrupt", handle, key)
1071             }
1072             AggregateStoreError::CommandNotFound(handle, key) => {
1073                 write!(f, "StoredCommand '{}' for '{}' cannot be found", handle, key)
1074             }
1075             AggregateStoreError::EventCorrupt(handle, version) => {
1076                 write!(f, "Stored event '{}' for '{}' was corrupt", handle, version)
1077             }
1078         }
1079     }
1080 }
1081 
1082 impl From<KeyValueError> for AggregateStoreError {
from(e: KeyValueError) -> Self1083     fn from(e: KeyValueError) -> Self {
1084         AggregateStoreError::KeyStoreError(e)
1085     }
1086 }
1087 
1088 //------------ Tests ---------------------------------------------------------
1089 
#[cfg(test)]
mod tests {

    use super::*;

    #[test]
    fn command_key_to_from_str() {
        // Round-trip: parsing a command key string and formatting it back
        // must reproduce the original string.
        let key_str = "command--1576389600--87--cmd-ca-publish";
        let parsed = CommandKey::from_str(key_str).unwrap();
        assert_eq!(key_str, &parsed.to_string());

        // A trailing ".json" extension is accepted by the parser and yields
        // the same key as the bare form.
        let key_with_dot_json_str = "command--1576389600--87--cmd-ca-publish.json";
        let parsed_with_ext = CommandKey::from_str(key_with_dot_json_str).unwrap();
        assert_eq!(parsed, parsed_with_ext);
    }
}
1107