1 use std::{ 2 collections::HashMap, 3 fmt, 4 path::Path, 5 str::FromStr, 6 sync::{Arc, RwLock}, 7 }; 8 9 use serde::{de::DeserializeOwned, Deserialize, Serialize}; 10 11 use rpki::repository::x509::Time; 12 13 use crate::commons::eventsourcing::{ 14 cmd::{Command, StoredCommandBuilder}, 15 Aggregate, Event, KeyStoreKey, KeyValueError, KeyValueStore, PostSaveEventListener, StoredCommand, 16 WithStorableDetails, 17 }; 18 use crate::commons::{ 19 api::{CommandHistory, CommandHistoryCriteria, CommandHistoryRecord, Handle, Label}, 20 error::KrillIoError, 21 util::KrillVersion, 22 }; 23 24 use super::PreSaveEventListener; 25 26 pub type StoreResult<T> = Result<T, AggregateStoreError>; 27 28 //------------ Storable ------------------------------------------------------ 29 30 pub trait Storable: Clone + Serialize + DeserializeOwned + Sized + 'static {} 31 impl<T: Clone + Serialize + DeserializeOwned + Sized + 'static> Storable for T {} 32 33 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] 34 pub struct StoredValueInfo { 35 pub snapshot_version: u64, 36 pub last_event: u64, 37 pub last_command: u64, 38 pub last_update: Time, 39 } 40 41 impl Default for StoredValueInfo { default() -> Self42 fn default() -> Self { 43 StoredValueInfo { 44 snapshot_version: 0, 45 last_event: 0, 46 last_command: 0, 47 last_update: Time::now(), 48 } 49 } 50 } 51 52 //------------ CommandKey ---------------------------------------------------- 53 54 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] 55 pub struct CommandKey { 56 pub sequence: u64, 57 pub timestamp_secs: i64, 58 pub label: Label, 59 } 60 61 impl CommandKey { new(sequence: u64, time: Time, label: Label) -> Self62 pub fn new(sequence: u64, time: Time, label: Label) -> Self { 63 CommandKey { 64 sequence, 65 timestamp_secs: time.timestamp(), 66 label, 67 } 68 } 69 for_stored<S: WithStorableDetails>(command: &StoredCommand<S>) -> CommandKey70 pub fn for_stored<S: WithStorableDetails>(command: 
&StoredCommand<S>) -> CommandKey { 71 CommandKey::new(command.sequence(), command.time(), command.details().summary().label) 72 } 73 matches_crit(&self, crit: &CommandHistoryCriteria) -> bool74 pub fn matches_crit(&self, crit: &CommandHistoryCriteria) -> bool { 75 crit.matches_timestamp_secs(self.timestamp_secs) 76 && crit.matches_label(&self.label) 77 && crit.matches_sequence(self.sequence) 78 } 79 } 80 81 impl fmt::Display for CommandKey { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result82 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 83 write!(f, "command--{}--{}--{}", self.timestamp_secs, self.sequence, self.label) 84 } 85 } 86 87 impl FromStr for CommandKey { 88 type Err = CommandKeyError; 89 from_str(s: &str) -> Result<Self, Self::Err>90 fn from_str(s: &str) -> Result<Self, Self::Err> { 91 let parts: Vec<&str> = s.split("--").collect(); 92 if parts.len() != 4 || parts[0] != "command" { 93 Err(CommandKeyError(s.to_string())) 94 } else { 95 let timestamp_secs = i64::from_str(parts[1]).map_err(|_| CommandKeyError(s.to_string()))?; 96 let sequence = u64::from_str(parts[2]).map_err(|_| CommandKeyError(s.to_string()))?; 97 // strip .json if present on the label part 98 let label = { 99 let end = parts[3].to_string(); 100 let last = if end.ends_with(".json") { 101 end.len() - 5 102 } else { 103 end.len() 104 }; 105 (end[0..last]).to_string() 106 }; 107 108 Ok(CommandKey { 109 sequence, 110 timestamp_secs, 111 label, 112 }) 113 } 114 } 115 } 116 117 //------------ CommandKeyError ----------------------------------------------- 118 119 #[derive(Clone, Debug, Eq, PartialEq)] 120 pub struct CommandKeyError(String); 121 122 impl fmt::Display for CommandKeyError { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result123 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 124 write!(f, "invalid command key: {}", self.0) 125 } 126 } 127 128 //------------ AggregateStore ------------------------------------------------ 129 130 /// This type is responsible for 
/// This type is responsible for managing aggregates.
pub struct AggregateStore<A: Aggregate> {
    // Persistent key-value storage backing the store.
    kv: KeyValueStore,
    // In-memory cache of the latest known state per aggregate handle.
    cache: RwLock<HashMap<Handle, Arc<A>>>,
    // Listeners called with new events *before* they are persisted (may veto by erroring).
    pre_save_listeners: Vec<Arc<dyn PreSaveEventListener<A>>>,
    // Listeners called with new events *after* they are persisted (infallible).
    post_save_listeners: Vec<Arc<dyn PostSaveEventListener<A>>>,
    // Store-wide lock: taken for read on lookups and for write on mutations,
    // so all updates through this store happen sequentially.
    outer_lock: RwLock<()>,
}

/// # Starting up
///
impl<A: Aggregate> AggregateStore<A>
where
    A::Error: From<AggregateStoreError>,
{
    /// Creates an AggregateStore using a disk based KeyValueStore
    pub fn disk(work_dir: &Path, name_space: &str) -> StoreResult<Self> {
        let mut path = work_dir.to_path_buf();
        path.push(name_space);
        // Remember whether the namespace directory existed before the
        // KeyValueStore is created, so we only stamp a version on new stores.
        let existed = path.exists();

        let kv = KeyValueStore::disk(work_dir, name_space)?;
        let cache = RwLock::new(HashMap::new());
        let pre_save_listeners = vec![];
        let post_save_listeners = vec![];
        let outer_lock = RwLock::new(());

        let store = AggregateStore {
            kv,
            cache,
            pre_save_listeners,
            post_save_listeners,
            outer_lock,
        };

        // Brand new store: record the current Krill version. An existing
        // store keeps whatever version it was written with.
        if !existed {
            store.set_version(&KrillVersion::current())?;
        }

        Ok(store)
    }

    /// Warms up the cache, to be used after startup. Will fail if any aggregates fail to load
    /// in which case a 'recover' operation can be tried.
    pub fn warm(&self) -> StoreResult<()> {
        for handle in self.list()? {
            self.warm_aggregate(&handle)?;
        }
        Ok(())
    }

    /// Warm the cache for a specific aggregate. If successful save the latest snapshot
    /// as well (will help in case of migrations where snapshots were dropped).
    ///
    /// In case any surplus event(s) and/or command(s) are encountered, i.e. extra entries not
    /// recorded in the 'info.json' which is always saved last on state changes - then it is
    /// assumed that an incomplete transaction took place. The surplus entries will be archived
    /// and warnings will be reported.
    pub fn warm_aggregate(&self, handle: &Handle) -> StoreResult<()> {
        let agg = self
            .get_latest(handle)
            .map_err(|e| AggregateStoreError::WarmupFailed(handle.clone(), e.to_string()))?;

        // check that last command and event are consistent with
        // the info, if not fail warmup and force recover
        let info = self.get_info(handle)?;

        // for events we can just check if the next event, after
        // the last event in the info exists
        if self.get_event::<A::Event>(handle, info.last_event + 1)?.is_some() {
            warn!(
                "Found surplus event(s) for '{}' when warming up the cache. Will archive!",
                handle
            );
            self.archive_surplus_events(handle, info.last_event + 1)?;
        }

        // Check if there are any commands with a sequence after the last
        // recorded sequence in the info.
        let mut crit = CommandHistoryCriteria::default();
        crit.set_after_sequence(info.last_command);
        let surplus_commands = self.command_keys_ascending(handle, &crit)?;
        if !surplus_commands.is_empty() {
            warn!(
                "Found surplus command(s) for '{}' when warming up the cache. Will archive!",
                handle
            );

            for command in surplus_commands {
                self.archive_surplus_command(handle, &command)?;
            }
        }

        // Save the snapshot if it does not yet match the latest state
        if info.snapshot_version != agg.version() {
            info!("Updating snapshot for '{}', to decrease future load times.", handle);
            self.store_snapshot(handle, agg.as_ref())?;
        }

        Ok(())
    }
    /// Recovers aggregates to the latest consistent saved in the keystore by verifying
    /// all commands, and the corresponding events. Use this in case the state on disk is
    /// found to be inconsistent. I.e. the `warm` function failed and Krill exited.
    ///
    /// Note Krill has an option to *always* use this recover function when it starts,
    /// but the default is that it just uses `warm` function instead. The reason for this
    /// is that `recover` can take longer, and that it could lead silent recovery without
    /// alerting to operators to underlying issues.
    pub fn recover(&self) -> StoreResult<()> {
        let criteria = CommandHistoryCriteria::default();
        for handle in self.list()? {
            info!("Will recover state for '{}'", &handle);

            // Check
            // - All commands, archive bad commands
            // - All events, archive bad events
            // - Keep track of last known good command and event
            // - Archive all commands and events after
            //
            // Rebuild state up to event:
            // - use snapshot - archive if bad
            // - use back-up snapshot if snapshot is no good - archive if bad
            // - start from init event if back-up snapshot is bad, or if the version exceeds last good event
            // - process events from (back-up) snapshot up to last good event
            //
            // If still good:
            // - save snapshot
            // - save info

            let mut last_good_cmd = 0;
            let mut last_good_evt = 0;
            let mut last_update = Time::now();

            // Check all commands and associated events
            let mut all_ok = true;

            let command_keys = self.command_keys_ascending(&handle, &criteria)?;
            info!("Processing {} commands for {}", command_keys.len(), handle);
            for (counter, command_key) in command_keys.into_iter().enumerate() {
                // Progress reporting for potentially long recoveries.
                if counter % 100 == 0 {
                    info!("Processed {} commands", counter);
                }

                if all_ok {
                    if let Ok(cmd) = self.get_command::<A::StorableCommandDetails>(&handle, &command_key) {
                        // Verify that every event this command produced can be loaded.
                        if let Some(events) = cmd.effect().events() {
                            for version in events {
                                if let Ok(Some(_)) = self.get_event::<A::Event>(&handle, *version) {
                                    last_good_evt = *version;
                                } else {
                                    all_ok = false;
                                }
                            }
                        }
                        last_good_cmd = cmd.sequence();
                        last_update = cmd.time();
                    } else {
                        all_ok = false;
                    }
                }
                if !all_ok {
                    warn!(
                        "Command {} was corrupt, or not all events could be loaded. Will archive surplus",
                        command_key
                    );
                    // Bad command or event encountered.. archive surplus commands
                    // note that we will clean surplus events later
                    self.archive_surplus_command(&handle, &command_key)?;
                }
            }

            // Archive any events past the last verified good one.
            self.archive_surplus_events(&handle, last_good_evt + 1)?;

            if !all_ok {
                warn!(
                    "State for '{}' can only be recovered to version: {}. Check corrupt and surplus dirs",
                    &handle, last_good_evt
                );
            }

            // Get the latest aggregate, note that this ensures that the snapshots
            // are checked, and archived if corrupt, or if they are after the last_good_evt
            let agg = self
                .get_aggregate(&handle, Some(last_good_evt))?
                .ok_or_else(|| AggregateStoreError::CouldNotRecover(handle.clone()))?;

            let snapshot_version = agg.version();

            let info = StoredValueInfo {
                last_event: last_good_evt,
                last_command: last_good_cmd,
                last_update,
                snapshot_version,
            };

            self.store_snapshot(&handle, &agg)?;

            self.cache_update(&handle, Arc::new(agg));

            // Info is saved last, marking the recovered state as consistent.
            self.save_info(&handle, &info)?;
        }

        Ok(())
    }

    /// Adds a listener that will receive all events before they are stored.
    pub fn add_pre_save_listener<L: PreSaveEventListener<A>>(&mut self, sync_listener: Arc<L>) {
        self.pre_save_listeners.push(sync_listener);
    }

    /// Adds a listener that will receive a reference to all events after they are stored.
    pub fn add_post_save_listener<L: PostSaveEventListener<A>>(&mut self, listener: Arc<L>) {
        self.post_save_listeners.push(listener);
    }
}

/// # Manage Aggregates
///
impl<A: Aggregate> AggregateStore<A>
where
    A::Error: From<AggregateStoreError>,
{
    /// Gets the latest version for the given aggregate. Returns
    /// an AggregateStoreError::UnknownAggregate in case the aggregate
    /// does not exist.
    pub fn get_latest(&self, handle: &Handle) -> StoreResult<Arc<A>> {
        let _lock = self.outer_lock.read().unwrap();
        self.get_latest_no_lock(handle)
    }

    /// Adds a new aggregate instance based on the init event.
    pub fn add(&self, init: A::InitEvent) -> StoreResult<Arc<A>> {
        let _lock = self.outer_lock.write().unwrap();

        // The init event is stored as event 0 for the new aggregate.
        self.store_event(&init)?;

        let handle = init.handle().clone();

        let aggregate = A::init(init).map_err(|_| AggregateStoreError::InitError(handle.clone()))?;
        self.store_snapshot(&handle, &aggregate)?;

        let info = StoredValueInfo::default();
        self.save_info(&handle, &info)?;

        let arc = Arc::new(aggregate);
        self.cache_update(&handle, arc.clone());

        Ok(arc)
    }
    /// Send a command to the latest aggregate referenced by the handle in the command.
    ///
    /// This will:
    /// - Retrieve the latest aggregate for this command.
    /// - Call the A::process_command function
    /// on success:
    /// - call pre-save listeners with events
    /// - save command and events
    /// - call post-save listeners with events
    /// - return aggregate
    /// on no-op (empty event list):
    /// - do not save anything, return aggregate
    /// on error:
    /// - save command and error, return error
    pub fn command(&self, cmd: A::Command) -> Result<Arc<A>, A::Error> {
        debug!("Processing command {}", cmd);

        let _lock = self.outer_lock.write().unwrap();

        // Get the latest arc.
        let handle = cmd.handle().clone();

        let mut info = self.get_info(&handle)?;
        info.last_update = Time::now();
        info.last_command += 1;

        let mut latest = self.get_latest_no_lock(&handle)?;

        // Optimistic concurrency check: if the command pins a version it must
        // match the current aggregate version.
        if let Some(version) = cmd.version() {
            if version != latest.version() {
                error!(
                    "Version conflict updating '{}', expected version: {}, found: {}",
                    handle,
                    version,
                    latest.version()
                );

                return Err(A::Error::from(AggregateStoreError::ConcurrentModification(handle)));
            }
        }

        let stored_command_builder = StoredCommandBuilder::new(&cmd, latest.version(), info.last_command);

        let res = match latest.process_command(cmd) {
            Err(e) => {
                // Failed commands are stored too, with their error, for history.
                let stored_command = stored_command_builder.finish_with_error(&e);
                self.store_command(stored_command)?;
                Err(e)
            }
            Ok(events) => {
                if events.is_empty() {
                    return Ok(latest); // otherwise the version info will be updated
                } else {
                    let agg = Arc::make_mut(&mut latest);

                    // Using a lock on the hashmap here to ensure that all updates happen sequentially.
                    // It would be better to get a lock only for this specific aggregate. So it may be
                    // worth rethinking the structure.
                    //
                    // That said.. saving and applying events is really quick, so this should not hurt
                    // performance much.
                    //
                    // Also note that we don't need the lock to update the inner arc in the cache. We
                    // just need it to be in scope until we are done updating.
                    let mut cache = self.cache.write().unwrap();

                    // It should be impossible to get events for the wrong aggregate, and the wrong
                    // versions, because we are doing the update here inside the outer lock, and aggregates
                    // generally do not lie about who they are.
                    //
                    // Still.. some defensive coding in case we do have some issue. Double check that the
                    // events are for this aggregate, and are a contiguous sequence of version starting with
                    // this version.
                    let version_before = agg.version();
                    let nr_events = events.len() as u64;

                    // Event numbers apply to the current version of an aggregate, so the first event
                    // here applies to the current version (before applying) and the 2nd to +1 and so
                    // on.
                    info.last_event = version_before + nr_events - 1;

                    for i in 0..nr_events {
                        let event = &events[i as usize];
                        let expected_version = version_before + i;
                        if event.version() != expected_version || event.handle() != &handle {
                            error!("Unexpected event: {}", event);
                            return Err(A::Error::from(AggregateStoreError::WrongEventForAggregate(
                                handle,
                                event.handle().clone(),
                                expected_version,
                                event.version(),
                            )));
                        }
                    }

                    // Time to start saving things.
                    let stored_command = stored_command_builder.finish_with_events(events.as_slice());

                    // If persistence fails, then complain loudly, and exit. Krill should not keep running, because this would
                    // result in discrepancies between state in memory and state on disk. Let Krill crash and an operator investigate.
                    // See issue: https://github.com/NLnetLabs/krill/issues/322
                    if let Err(e) = self.store_command(stored_command) {
                        error!("Cannot save state for '{}'. Got error: {}", handle, e);
                        error!("Will now exit Krill - please verify that the disk can be written to and is not full");
                        std::process::exit(1);
                    }

                    // Apply events, check that the aggregate can be updated, and make sure
                    // we have an updated version so we can store it.
                    for event in &events {
                        agg.apply(event.clone());
                    }

                    // Apply events to pre save listeners which may still return errors
                    for pre_save_listener in &self.pre_save_listeners {
                        pre_save_listener.as_ref().listen(agg, events.as_slice())?;
                    }

                    // Nothing broke, so it's safe to store the command, events and aggregate
                    for event in &events {
                        self.store_event(event)?;
                    }
                    info.snapshot_version = agg.version();
                    self.store_snapshot(&handle, agg)?;

                    cache.insert(handle.clone(), Arc::new(agg.clone()));

                    // Now send the events to the 'post-save' listeners.
                    for listener in &self.post_save_listeners {
                        listener.as_ref().listen(agg, events.as_slice());
                    }

                    Ok(latest)
                }
            }
        };

        // Info is saved last (both on success and stored error), marking the
        // on-disk state as consistent.
        self.save_info(&handle, &info)?;

        res
    }

    /// Returns true if an instance exists for the id
    pub fn has(&self, id: &Handle) -> Result<bool, AggregateStoreError> {
        let _lock = self.outer_lock.read().unwrap();
        self.kv
            .has_scope(id.to_string())
            .map_err(AggregateStoreError::KeyStoreError)
    }

    /// Lists all known ids.
    pub fn list(&self) -> Result<Vec<Handle>, AggregateStoreError> {
        let _lock = self.outer_lock.read().unwrap();
        self.aggregates()
    }
}

/// # Manage Commands
///
impl<A: Aggregate> AggregateStore<A>
where
    A::Error: From<AggregateStoreError>,
{
    /// Find all commands that fit the criteria and return history
    pub fn command_history(
        &self,
        id: &Handle,
        crit: CommandHistoryCriteria,
    ) -> Result<CommandHistory, AggregateStoreError> {
        let offset = crit.offset();

        let command_keys = self.command_keys_ascending(id, &crit)?;

        // No explicit row limit means: return everything that matched.
        let rows = match crit.rows_limit() {
            Some(limit) => limit,
            None => command_keys.len(),
        };

        let mut commands: Vec<CommandHistoryRecord> = Vec::with_capacity(rows);
        let mut skipped = 0;
        let mut total = 0;

        // Manual offset/limit paging; `total` still counts every match so
        // callers can paginate.
        for command_key in command_keys {
            total += 1;
            if skipped < offset {
                skipped += 1;
            } else if commands.len() < rows {
                let key = Self::key_for_command(id, &command_key);
                let stored: StoredCommand<A::StorableCommandDetails> = self
                    .kv
                    .get(&key)?
                    .ok_or_else(|| AggregateStoreError::CommandNotFound(id.clone(), command_key))?;

                let stored = stored.into();
                commands.push(stored);
            }
        }
        Ok(CommandHistory::new(offset, total, commands))
    }

    /// Get the command for this key, if it exists
    pub fn get_command<D: WithStorableDetails>(
        &self,
        id: &Handle,
        command_key: &CommandKey,
    ) -> Result<StoredCommand<D>, AggregateStoreError> {
        let key = Self::key_for_command(id, command_key);
        match self.kv.get(&key) {
            Ok(Some(cmd)) => Ok(cmd),
            Ok(None) => Err(AggregateStoreError::CommandNotFound(id.clone(), command_key.clone())),
            Err(e) => {
                // Unparseable command value: move it aside so it cannot
                // poison future reads, then report it as corrupt.
                error!(
                    "Found corrupt command at: {}, will try to archive. Error was: {}",
                    key, e
                );
                self.kv.archive_corrupt(&key)?;
                Err(AggregateStoreError::CommandCorrupt(id.clone(), command_key.clone()))
            }
        }
    }

    /// Get the value for this key, if any exists.
    pub fn get_event<V: Event>(&self, id: &Handle, version: u64) -> Result<Option<V>, AggregateStoreError> {
        let key = Self::key_for_event(id, version);
        match self.kv.get(&key) {
            Ok(res_opt) => Ok(res_opt),
            Err(e) => {
                // Unparseable event value: archive as corrupt, same as commands.
                error!(
                    "Found corrupt event for {}, version {}, archiving. Error: {}",
                    id, version, e
                );
                self.kv.archive_corrupt(&key)?;
                Err(AggregateStoreError::EventCorrupt(id.clone(), version))
            }
        }
    }
}

impl<A: Aggregate> AggregateStore<A>
where
    A::Error: From<AggregateStoreError>,
{
    // True if an event exists for the aggregate's current version, i.e. the
    // cached state is behind what is on disk.
    fn has_updates(&self, id: &Handle, aggregate: &A) -> StoreResult<bool> {
        Ok(self.get_event::<A::Event>(id, aggregate.version())?.is_some())
    }

    fn cache_get(&self, id: &Handle) -> Option<Arc<A>> {
        self.cache.read().unwrap().get(id).cloned()
    }

    fn cache_remove(&self, id: &Handle) {
        self.cache.write().unwrap().remove(id);
    }

    fn cache_update(&self, id: &Handle, arc: Arc<A>) {
        self.cache.write().unwrap().insert(id.clone(), arc);
    }

    // Loads the latest state for `handle`: from cache if fresh, otherwise
    // from disk (snapshot + event replay up to the info's last_event).
    // Caller must hold `outer_lock`.
    fn get_latest_no_lock(&self, handle: &Handle) -> StoreResult<Arc<A>> {
        trace!("Trying to load aggregate id: {}", handle);

        // The info's last_event bounds the replay, so a partially written
        // transaction does not leak into the state served here.
        let info_key = Self::key_for_info(handle);
        let limit = self
            .kv
            .get::<StoredValueInfo>(&info_key)
            .map_err(|_| AggregateStoreError::InfoCorrupt(handle.clone()))?
            .map(|info| info.last_event);

        match self.cache_get(handle) {
            None => match self.get_aggregate(handle, limit)? {
                None => {
                    error!("Could not load aggregate with id: {} from disk", handle);
                    Err(AggregateStoreError::UnknownAggregate(handle.clone()))
                }
                Some(agg) => {
                    let arc: Arc<A> = Arc::new(agg);
                    self.cache_update(handle, arc.clone());
                    trace!("Loaded aggregate id: {} from disk", handle);
                    Ok(arc)
                }
            },
            Some(mut arc) => {
                if self.has_updates(handle, &arc)? {
                    let agg = Arc::make_mut(&mut arc);
                    self.update_aggregate(handle, agg, limit)?;
                }
                trace!("Loaded aggregate id: {} from memory", handle);
                Ok(arc)
            }
        }
    }
}
{ 665 let agg = Arc::make_mut(&mut arc); 666 self.update_aggregate(handle, agg, limit)?; 667 } 668 trace!("Loaded aggregate id: {} from memory", handle); 669 Ok(arc) 670 } 671 } 672 } 673 } 674 675 /// # Manage values in the KeyValue store 676 /// 677 impl<A: Aggregate> AggregateStore<A> 678 where 679 A::Error: From<AggregateStoreError>, 680 { key_version() -> KeyStoreKey681 fn key_version() -> KeyStoreKey { 682 KeyStoreKey::simple("version".to_string()) 683 } 684 key_for_info(agg: &Handle) -> KeyStoreKey685 fn key_for_info(agg: &Handle) -> KeyStoreKey { 686 KeyStoreKey::scoped(agg.to_string(), "info.json".to_string()) 687 } 688 key_for_snapshot(agg: &Handle) -> KeyStoreKey689 fn key_for_snapshot(agg: &Handle) -> KeyStoreKey { 690 KeyStoreKey::scoped(agg.to_string(), "snapshot.json".to_string()) 691 } 692 key_for_backup_snapshot(agg: &Handle) -> KeyStoreKey693 fn key_for_backup_snapshot(agg: &Handle) -> KeyStoreKey { 694 KeyStoreKey::scoped(agg.to_string(), "snapshot-bk.json".to_string()) 695 } 696 key_for_new_snapshot(agg: &Handle) -> KeyStoreKey697 fn key_for_new_snapshot(agg: &Handle) -> KeyStoreKey { 698 KeyStoreKey::scoped(agg.to_string(), "snapshot-new.json".to_string()) 699 } 700 key_for_event(agg: &Handle, version: u64) -> KeyStoreKey701 fn key_for_event(agg: &Handle, version: u64) -> KeyStoreKey { 702 KeyStoreKey::scoped(agg.to_string(), format!("delta-{}.json", version)) 703 } 704 key_for_command(agg: &Handle, command: &CommandKey) -> KeyStoreKey705 fn key_for_command(agg: &Handle, command: &CommandKey) -> KeyStoreKey { 706 KeyStoreKey::scoped(agg.to_string(), format!("{}.json", command)) 707 } 708 get_version(&self) -> Result<KrillVersion, AggregateStoreError>709 pub fn get_version(&self) -> Result<KrillVersion, AggregateStoreError> { 710 match self.kv.get::<KrillVersion>(&Self::key_version())? 
{ 711 Some(version) => Ok(version), 712 None => Ok(KrillVersion::v0_5_0_or_before()), 713 } 714 } 715 set_version(&self, version: &KrillVersion) -> Result<(), AggregateStoreError>716 pub fn set_version(&self, version: &KrillVersion) -> Result<(), AggregateStoreError> { 717 self.kv.store(&Self::key_version(), version)?; 718 Ok(()) 719 } 720 command_keys_ascending( &self, id: &Handle, crit: &CommandHistoryCriteria, ) -> Result<Vec<CommandKey>, AggregateStoreError>721 fn command_keys_ascending( 722 &self, 723 id: &Handle, 724 crit: &CommandHistoryCriteria, 725 ) -> Result<Vec<CommandKey>, AggregateStoreError> { 726 let mut command_keys = vec![]; 727 728 for key in self.kv.keys(Some(id.to_string()), "command--")? { 729 match CommandKey::from_str(key.name()) { 730 Ok(command_key) => { 731 if command_key.matches_crit(crit) { 732 command_keys.push(command_key); 733 } 734 } 735 Err(_) => { 736 warn!("Found strange command-like key in disk key-value store: {}", key.name()); 737 } 738 } 739 } 740 741 command_keys.sort_by(|a, b| a.sequence.cmp(&b.sequence)); 742 743 Ok(command_keys) 744 } 745 746 /// Private, should be called through `list` which takes care of locking. aggregates(&self) -> Result<Vec<Handle>, AggregateStoreError>747 fn aggregates(&self) -> Result<Vec<Handle>, AggregateStoreError> { 748 let mut res = vec![]; 749 750 for scope in self.kv.scopes()? { 751 if let Ok(handle) = Handle::from_str(&scope) { 752 res.push(handle) 753 } 754 } 755 756 Ok(res) 757 } 758 759 /// Clean surplus events archive_surplus_events(&self, id: &Handle, from: u64) -> Result<(), AggregateStoreError>760 fn archive_surplus_events(&self, id: &Handle, from: u64) -> Result<(), AggregateStoreError> { 761 for key in self.kv.keys(Some(id.to_string()), "delta-")? 
{ 762 let name = key.name(); 763 if name.starts_with("delta-") && name.ends_with(".json") { 764 let start = 6; 765 let end = name.len() - 5; 766 if end > start { 767 if let Ok(v) = u64::from_str(&name[start..end]) { 768 if v >= from { 769 let key = Self::key_for_event(id, v); 770 warn!("Archiving surplus event for '{}': {}", id, key); 771 self.kv 772 .archive_surplus(&key) 773 .map_err(AggregateStoreError::KeyStoreError)? 774 } 775 } 776 } 777 } 778 } 779 Ok(()) 780 } 781 782 /// Archive a surplus value for a key archive_surplus_command(&self, id: &Handle, key: &CommandKey) -> Result<(), AggregateStoreError>783 fn archive_surplus_command(&self, id: &Handle, key: &CommandKey) -> Result<(), AggregateStoreError> { 784 let key = Self::key_for_command(id, key); 785 warn!("Archiving surplus command for '{}': {}", id, key); 786 self.kv 787 .archive_surplus(&key) 788 .map_err(AggregateStoreError::KeyStoreError) 789 } 790 791 /// MUST check if the event already exists and return an error if it does. store_event<V: Event>(&self, event: &V) -> Result<(), AggregateStoreError>792 fn store_event<V: Event>(&self, event: &V) -> Result<(), AggregateStoreError> { 793 let id = event.handle(); 794 let version = event.version(); 795 let key = Self::key_for_event(id, version); 796 self.kv.store_new(&key, event)?; 797 Ok(()) 798 } 799 store_command<S: WithStorableDetails>(&self, command: StoredCommand<S>) -> Result<(), AggregateStoreError>800 fn store_command<S: WithStorableDetails>(&self, command: StoredCommand<S>) -> Result<(), AggregateStoreError> { 801 let id = command.handle(); 802 803 let command_key = CommandKey::for_stored(&command); 804 let key = Self::key_for_command(id, &command_key); 805 806 self.kv.store_new(&key, &command)?; 807 Ok(()) 808 } 809 810 /// Get the latest aggregate 811 /// limit to the event nr, i.e. 
    /// Get the latest aggregate
    /// limit to the event nr, i.e. the resulting aggregate version will be limit + 1
    fn get_aggregate(&self, id: &Handle, limit: Option<u64>) -> Result<Option<A>, AggregateStoreError> {
        // 1) Try to get a snapshot.
        // 2) If that fails, or if it exceeds the limit, try the backup
        // 3) If that fails, try to get the init event.
        //
        // Then replay all newer events that can be found up to the version (or latest if version is None)
        trace!("Getting aggregate for '{}'", id);

        let mut aggregate_opt: Option<A> = None;

        let snapshot_key = Self::key_for_snapshot(id);

        match self.kv.get::<A>(&snapshot_key) {
            Err(e) => {
                // snapshot file was present and corrupt
                error!(
                    "Could not parse snapshot for '{}', archiving as corrupt. Error was: {}",
                    id, e
                );
                self.kv.archive_corrupt(&snapshot_key)?;
            }
            Ok(Some(agg)) => {
                // snapshot present and okay
                trace!("Found snapshot for '{}'", id);
                if let Some(limit) = limit {
                    // The snapshot at version v was produced by event v - 1,
                    // so it is usable only if limit covers that event.
                    if limit >= agg.version() - 1 {
                        aggregate_opt = Some(agg)
                    } else {
                        trace!("Discarding snapshot after limit '{}'", id);
                        self.kv.archive_surplus(&snapshot_key)?;
                    }
                } else {
                    debug!("Found valid snapshot for '{}'", id);
                    aggregate_opt = Some(agg)
                }
            }
            Ok(None) => {}
        }

        if aggregate_opt.is_none() {
            warn!("No snapshot found for '{}' will try backup snapshot", id);
            let backup_snapshot_key = Self::key_for_backup_snapshot(id);
            match self.kv.get::<A>(&backup_snapshot_key) {
                Err(e) => {
                    // backup snapshot present and corrupt
                    error!(
                        "Could not parse backup snapshot for '{}', archiving as corrupt. Error: {}",
                        id, e
                    );
                    self.kv.archive_corrupt(&backup_snapshot_key)?;
                }
                Ok(Some(agg)) => {
                    trace!("Found backup snapshot for '{}'", id);
                    if let Some(limit) = limit {
                        if limit >= agg.version() - 1 {
                            aggregate_opt = Some(agg)
                        } else {
                            trace!("Discarding backup snapshot after limit '{}'", id);
                            self.kv.archive_surplus(&backup_snapshot_key)?;
                        }
                    } else {
                        debug!("Found valid backup snapshot for '{}'", id);
                        aggregate_opt = Some(agg)
                    }
                }
                Ok(None) => {}
            }
        }

        if aggregate_opt.is_none() {
            warn!("No snapshots found for '{}' will try from initialization event.", id);
            // Last resort: rebuild the whole state from event 0 (the init event).
            let init_key = Self::key_for_event(id, 0);
            aggregate_opt = match self.kv.get::<A::InitEvent>(&init_key)? {
                Some(e) => {
                    trace!("Rebuilding aggregate {} from init event", id);
                    Some(A::init(e).map_err(|_| AggregateStoreError::InitError(id.clone()))?)
                }
                None => None,
            }
        }

        match aggregate_opt {
            None => Ok(None),
            Some(mut aggregate) => {
                // Replay events from the starting point found above up to the limit.
                self.update_aggregate(id, &mut aggregate, limit)?;
                Ok(Some(aggregate))
            }
        }
    }

    // Replays stored events onto `aggregate`, from its current version up to
    // and including event `limit`. If no limit is given it is taken from the
    // info, falling back to counting the 'delta-' keys on disk.
    fn update_aggregate(&self, id: &Handle, aggregate: &mut A, limit: Option<u64>) -> Result<(), AggregateStoreError> {
        let start = aggregate.version();
        let limit = if let Some(limit) = limit {
            debug!("Will attempt to update '{}' using explicit limit", id);
            limit
        } else if let Ok(info) = self.get_info(id) {
            debug!("Will attempt to update '{}' using limit from info", id);
            info.last_event
        } else {
            let nr_events = self.kv.keys(Some(id.to_string()), "delta-")?.len();
            if nr_events < 1 {
                return Err(AggregateStoreError::InfoMissing(id.clone()));
            } else {
                // Events are numbered from 0, so the highest event number is count - 1.
                let limit = (nr_events - 1) as u64;
                debug!("Will attempt to update '{}' from limit based on nr of events", id,);
                limit
            }
        };

        if limit == aggregate.version() - 1 {
            // already at version, done
            // note that an event has the version of the aggregate it *affects*. So delta 10 results in version 11.
            debug!("Snapshot for '{}' is up to date", id);
            return Ok(());
        }

        debug!(
            "Will attempt to update '{}' from version: {} to: {}",
            id,
            start,
            limit + 1
        );

        // The aggregate is already past the limit: there is nothing valid to
        // replay, the state on disk must be repaired first.
        if start > limit {
            return Err(AggregateStoreError::ReplayError(id.clone(), limit, start));
        }

        for version in start..limit + 1 {
            if let Some(e) = self.get_event(id, version)? {
                if aggregate.version() != version {
                    error!("Trying to apply event to wrong version of aggregate in replay");
                    return Err(AggregateStoreError::ReplayError(id.clone(), limit, version));
                }
                aggregate.apply(e);
                debug!("Applied event nr {} to aggregate {}", version, id);
            } else {
                // Gap in the event sequence: replay cannot continue.
                return Err(AggregateStoreError::ReplayError(id.clone(), limit, version));
            }
        }

        Ok(())
    }

    /// Saves the latest snapshot - overwrites any previous snapshot.
    fn store_snapshot<V: Aggregate>(&self, id: &Handle, aggregate: &V) -> Result<(), AggregateStoreError> {
        let snapshot_new = Self::key_for_new_snapshot(id);
        let snapshot_current = Self::key_for_snapshot(id);
        let snapshot_backup = Self::key_for_backup_snapshot(id);

        // Write-then-rotate: write to 'snapshot-new', move the current
        // snapshot to the backup slot, then move 'snapshot-new' into place.
        // This keeps at least one readable snapshot at every step.
        self.kv.store(&snapshot_new, aggregate)?;

        if self.kv.has(&snapshot_backup)? {
            self.kv.drop_key(&snapshot_backup)?;
        }
        if self.kv.has(&snapshot_current)? {
            self.kv.move_key(&snapshot_current, &snapshot_backup)?;
        }
        self.kv.move_key(&snapshot_new, &snapshot_current)?;

        Ok(())
    }

    /// Drop an aggregate, completely. Handle with care!
drop_aggregate(&self, id: &Handle) -> Result<(), AggregateStoreError>975 pub fn drop_aggregate(&self, id: &Handle) -> Result<(), AggregateStoreError> { 976 self.cache_remove(id); 977 self.kv.drop_scope(id.as_str())?; 978 Ok(()) 979 } 980 get_info(&self, id: &Handle) -> Result<StoredValueInfo, AggregateStoreError>981 fn get_info(&self, id: &Handle) -> Result<StoredValueInfo, AggregateStoreError> { 982 let key = Self::key_for_info(id); 983 let info = self 984 .kv 985 .get(&key) 986 .map_err(|_| AggregateStoreError::InfoCorrupt(id.clone()))?; 987 info.ok_or_else(|| AggregateStoreError::InfoMissing(id.clone())) 988 } 989 save_info(&self, id: &Handle, info: &StoredValueInfo) -> Result<(), AggregateStoreError>990 fn save_info(&self, id: &Handle, info: &StoredValueInfo) -> Result<(), AggregateStoreError> { 991 let key = Self::key_for_info(id); 992 self.kv.store(&key, info).map_err(AggregateStoreError::KeyStoreError) 993 } 994 } 995 996 //------------ AggregateStoreError ------------------------------------------- 997 998 /// This type defines possible Errors for the AggregateStore 999 #[derive(Debug)] 1000 pub enum AggregateStoreError { 1001 IoError(KrillIoError), 1002 KeyStoreError(KeyValueError), 1003 NotInitialized, 1004 UnknownAggregate(Handle), 1005 InitError(Handle), 1006 ReplayError(Handle, u64, u64), 1007 InfoMissing(Handle), 1008 InfoCorrupt(Handle), 1009 WrongEventForAggregate(Handle, Handle, u64, u64), 1010 ConcurrentModification(Handle), 1011 UnknownCommand(Handle, u64), 1012 CommandOffsetTooLarge(u64, u64), 1013 WarmupFailed(Handle, String), 1014 CouldNotRecover(Handle), 1015 CouldNotArchive(Handle, String), 1016 CommandCorrupt(Handle, CommandKey), 1017 CommandNotFound(Handle, CommandKey), 1018 EventCorrupt(Handle, u64), 1019 } 1020 1021 impl fmt::Display for AggregateStoreError { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result1022 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 1023 match self { 1024 AggregateStoreError::IoError(e) => e.fmt(f), 1025 
AggregateStoreError::KeyStoreError(e) => write!(f, "KeyStore Error: {}", e), 1026 AggregateStoreError::NotInitialized => write!(f, "This aggregate store is not initialized"), 1027 AggregateStoreError::UnknownAggregate(handle) => write!(f, "unknown entity: {}", handle), 1028 AggregateStoreError::InitError(handle) => { 1029 write!(f, "Init event exists for '{}', but cannot be applied", handle) 1030 } 1031 AggregateStoreError::ReplayError(handle, target_version, fail_version) => write!( 1032 f, 1033 "Cannot reconstruct '{}' to version '{}', failed at version {}", 1034 handle, target_version, fail_version 1035 ), 1036 AggregateStoreError::InfoMissing(handle) => write!(f, "Missing stored value info for '{}'", handle), 1037 AggregateStoreError::InfoCorrupt(handle) => write!(f, "Corrupt stored value info for '{}'", handle), 1038 AggregateStoreError::WrongEventForAggregate(expected, found, expected_v, found_v) => { 1039 write!( 1040 f, 1041 "event not applicable to entity. Expected: {} {}, found: {} {}", 1042 expected, expected_v, found, found_v 1043 ) 1044 } 1045 AggregateStoreError::ConcurrentModification(handle) => { 1046 write!(f, "concurrent modification attempt for entity: '{}'", handle) 1047 } 1048 AggregateStoreError::UnknownCommand(handle, seq) => write!( 1049 f, 1050 "Aggregate '{}' does not have command with sequence '{}'", 1051 handle, seq 1052 ), 1053 AggregateStoreError::CommandOffsetTooLarge(offset, total) => { 1054 write!(f, "Offset '{}' exceeds total '{}'", offset, total) 1055 } 1056 AggregateStoreError::WarmupFailed(handle, e) => { 1057 write!(f, "Could not rebuild state for '{}': {}", handle, e) 1058 } 1059 AggregateStoreError::CouldNotRecover(handle) => write!( 1060 f, 1061 "Could not recover state for '{}', aborting recover. Use backup!!", 1062 handle 1063 ), 1064 AggregateStoreError::CouldNotArchive(handle, e) => write!( 1065 f, 1066 "Could not archive commands and events for '{}'. 
Error: {}", 1067 handle, e 1068 ), 1069 AggregateStoreError::CommandCorrupt(handle, key) => { 1070 write!(f, "StoredCommand '{}' for '{}' was corrupt", handle, key) 1071 } 1072 AggregateStoreError::CommandNotFound(handle, key) => { 1073 write!(f, "StoredCommand '{}' for '{}' cannot be found", handle, key) 1074 } 1075 AggregateStoreError::EventCorrupt(handle, version) => { 1076 write!(f, "Stored event '{}' for '{}' was corrupt", handle, version) 1077 } 1078 } 1079 } 1080 } 1081 1082 impl From<KeyValueError> for AggregateStoreError { from(e: KeyValueError) -> Self1083 fn from(e: KeyValueError) -> Self { 1084 AggregateStoreError::KeyStoreError(e) 1085 } 1086 } 1087 1088 //------------ Tests --------------------------------------------------------- 1089 1090 #[cfg(test)] 1091 mod tests { 1092 1093 use super::*; 1094 1095 #[test] command_key_to_from_str()1096 fn command_key_to_from_str() { 1097 let key_str = "command--1576389600--87--cmd-ca-publish"; 1098 let key = CommandKey::from_str(key_str).unwrap(); 1099 assert_eq!(key_str, &key.to_string()); 1100 1101 let key_with_dot_json_str = "command--1576389600--87--cmd-ca-publish.json"; 1102 let key_with_dot_json = CommandKey::from_str(key_with_dot_json_str).unwrap(); 1103 1104 assert_eq!(key, key_with_dot_json); 1105 } 1106 } 1107