1 // This Source Code Form is subject to the terms of the Mozilla Public 2 // License, v. 2.0. If a copy of the MPL was not distributed with this 3 // file, You can obtain one at https://mozilla.org/MPL/2.0/. 4 5 use std::collections::HashMap; 6 use std::fs; 7 use std::fs::{create_dir_all, File, OpenOptions}; 8 use std::io::BufRead; 9 use std::io::BufReader; 10 use std::io::Write; 11 use std::path::{Path, PathBuf}; 12 use std::sync::RwLock; 13 14 use serde::{Deserialize, Serialize}; 15 use serde_json::{json, Value as JsonValue}; 16 17 use crate::coverage::record_coverage; 18 use crate::CommonMetricData; 19 use crate::Glean; 20 use crate::Result; 21 22 /// Represents the recorded data for a single event. 23 #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] 24 pub struct RecordedEvent { 25 /// The timestamp of when the event was recorded. 26 /// 27 /// This allows to order events from a single process run. 28 pub timestamp: u64, 29 30 /// The event's category. 31 /// 32 /// This is defined by users in the metrics file. 33 pub category: String, 34 35 /// The event's name. 36 /// 37 /// This is defined by users in the metrics file. 38 pub name: String, 39 40 /// A map of all extra data values. 41 /// 42 /// The set of allowed extra keys is defined by users in the metrics file. 43 #[serde(skip_serializing_if = "Option::is_none")] 44 pub extra: Option<HashMap<String, String>>, 45 } 46 47 impl RecordedEvent { 48 /// Serialize an event to JSON, adjusting its timestamp relative to a base timestamp serialize_relative(&self, timestamp_offset: u64) -> JsonValue49 fn serialize_relative(&self, timestamp_offset: u64) -> JsonValue { 50 json!(&RecordedEvent { 51 timestamp: self.timestamp - timestamp_offset, 52 category: self.category.clone(), 53 name: self.name.clone(), 54 extra: self.extra.clone(), 55 }) 56 } 57 } 58 59 /// This struct handles the in-memory and on-disk storage logic for events. 60 /// 61 /// So that the data survives shutting down of the application, events are stored 62 /// in an append-only file on disk, in addition to the store in memory. Each line 63 /// of this file records a single event in JSON, exactly as it will be sent in the 64 /// ping. There is one file per store. 65 /// 66 /// When restarting the application, these on-disk files are checked, and if any are 67 /// found, they are loaded, queued for sending and flushed immediately before any 68 /// further events are collected. This is because the timestamps for these events 69 /// may have come from a previous boot of the device, and therefore will not be 70 /// compatible with any newly-collected events. 71 #[derive(Debug)] 72 pub struct EventDatabase { 73 /// Path to directory of on-disk event files 74 pub path: PathBuf, 75 /// The in-memory list of events 76 event_stores: RwLock<HashMap<String, Vec<RecordedEvent>>>, 77 /// A lock to be held when doing operations on the filesystem 78 file_lock: RwLock<()>, 79 } 80 81 impl EventDatabase { 82 /// Creates a new event database. 83 /// 84 /// # Arguments 85 /// 86 /// * `data_path` - The directory to store events in. A new directory 87 /// * `events` - will be created inside of this directory. new(data_path: &Path) -> Result<Self>88 pub fn new(data_path: &Path) -> Result<Self> { 89 let path = data_path.join("events"); 90 create_dir_all(&path)?; 91 92 Ok(Self { 93 path, 94 event_stores: RwLock::new(HashMap::new()), 95 file_lock: RwLock::new(()), 96 }) 97 } 98 99 /// Initializes events storage after Glean is fully initialized and ready to send pings. 100 /// 101 /// This must be called once on application startup, e.g. from 102 /// [Glean.initialize], but after we are ready to send pings, since this 103 /// could potentially collect and send pings. 104 /// 105 /// If there are any events queued on disk, it loads them into memory so 106 /// that the memory and disk representations are in sync. 107 /// 108 /// Secondly, if this is the first time the application has been run since 109 /// rebooting, any pings containing events are assembled into pings and cleared 110 /// immediately, since their timestamps won't be compatible with the timestamps 111 /// we would create during this boot of the device. 112 /// 113 /// # Arguments 114 /// 115 /// * `glean` - The Glean instance. 116 /// 117 /// # Returns 118 /// 119 /// Whether at least one ping was generated. flush_pending_events_on_startup(&self, glean: &Glean) -> bool120 pub fn flush_pending_events_on_startup(&self, glean: &Glean) -> bool { 121 match self.load_events_from_disk() { 122 Ok(_) => self.send_all_events(glean), 123 Err(err) => { 124 log::warn!("Error loading events from disk: {}", err); 125 false 126 } 127 } 128 } 129 load_events_from_disk(&self) -> Result<()>130 fn load_events_from_disk(&self) -> Result<()> { 131 // NOTE: The order of locks here is important. 132 // In other code parts we might acquire the `file_lock` when we already have acquired 133 // a lock on `event_stores`. 134 // This is a potential lock-order-inversion. 135 let mut db = self.event_stores.write().unwrap(); // safe unwrap, only error case is poisoning 136 let _lock = self.file_lock.read().unwrap(); // safe unwrap, only error case is poisoning 137 138 for entry in fs::read_dir(&self.path)? { 139 let entry = entry?; 140 if entry.file_type()?.is_file() { 141 let store_name = entry.file_name().into_string()?; 142 let file = BufReader::new(File::open(entry.path())?); 143 db.insert( 144 store_name, 145 file.lines() 146 .filter_map(|line| line.ok()) 147 .filter_map(|line| serde_json::from_str::<RecordedEvent>(&line).ok()) 148 .collect(), 149 ); 150 } 151 } 152 Ok(()) 153 } 154 send_all_events(&self, glean: &Glean) -> bool155 fn send_all_events(&self, glean: &Glean) -> bool { 156 let store_names = { 157 let db = self.event_stores.read().unwrap(); // safe unwrap, only error case is poisoning 158 db.keys().cloned().collect::<Vec<String>>() 159 }; 160 161 let mut ping_sent = false; 162 for store_name in store_names { 163 ping_sent |= glean.submit_ping_by_name(&store_name, Some("startup")); 164 } 165 166 ping_sent 167 } 168 169 /// Records an event in the desired stores. 170 /// 171 /// # Arguments 172 /// 173 /// * `glean` - The Glean instance. 174 /// * `meta` - The metadata about the event metric. Used to get the category, 175 /// name and stores for the metric. 176 /// * `timestamp` - The timestamp of the event, in milliseconds. Must use a 177 /// monotonically increasing timer (this value is obtained on the 178 /// platform-specific side). 179 /// * `extra` - Extra data values, mapping strings to strings. record( &self, glean: &Glean, meta: &CommonMetricData, timestamp: u64, extra: Option<HashMap<String, String>>, )180 pub fn record( 181 &self, 182 glean: &Glean, 183 meta: &CommonMetricData, 184 timestamp: u64, 185 extra: Option<HashMap<String, String>>, 186 ) { 187 // If upload is disabled we don't want to record. 188 if !glean.is_upload_enabled() { 189 return; 190 } 191 192 // Create RecordedEvent object, and its JSON form for serialization 193 // on disk. 194 let event = RecordedEvent { 195 timestamp, 196 category: meta.category.to_string(), 197 name: meta.name.to_string(), 198 extra, 199 }; 200 let event_json = serde_json::to_string(&event).unwrap(); // safe unwrap, event can always be serialized 201 202 // Store the event in memory and on disk to each of the stores. 203 let mut stores_to_submit: Vec<&str> = Vec::new(); 204 { 205 let mut db = self.event_stores.write().unwrap(); // safe unwrap, only error case is poisoning 206 for store_name in meta.send_in_pings.iter() { 207 let store = db.entry(store_name.to_string()).or_insert_with(Vec::new); 208 store.push(event.clone()); 209 self.write_event_to_disk(store_name, &event_json); 210 if store.len() == glean.get_max_events() { 211 stores_to_submit.push(&store_name); 212 } 213 } 214 } 215 216 // If any of the event stores reached maximum size, submit the pings 217 // containing those events immediately. 218 for store_name in stores_to_submit { 219 glean.submit_ping_by_name(store_name, Some("max_capacity")); 220 } 221 } 222 223 /// Writes an event to a single store on disk. 224 /// 225 /// # Arguments 226 /// 227 /// * `store_name` - The name of the store. 228 /// * `event_json` - The event content, as a single-line JSON-encoded string. write_event_to_disk(&self, store_name: &str, event_json: &str)229 fn write_event_to_disk(&self, store_name: &str, event_json: &str) { 230 let _lock = self.file_lock.write().unwrap(); // safe unwrap, only error case is poisoning 231 if let Err(err) = OpenOptions::new() 232 .create(true) 233 .append(true) 234 .open(self.path.join(store_name)) 235 .and_then(|mut file| writeln!(file, "{}", event_json)) 236 { 237 log::warn!("IO error writing event to store '{}': {}", store_name, err); 238 } 239 } 240 241 /// Gets a snapshot of the stored event data as a JsonValue. 242 /// 243 /// # Arguments 244 /// 245 /// * `store_name` - The name of the desired store. 246 /// * `clear_store` - Whether to clear the store after snapshotting. 247 /// 248 /// # Returns 249 /// 250 /// A array of events, JSON encoded, if any. Otherwise `None`. snapshot_as_json(&self, store_name: &str, clear_store: bool) -> Option<JsonValue>251 pub fn snapshot_as_json(&self, store_name: &str, clear_store: bool) -> Option<JsonValue> { 252 let result = { 253 let mut db = self.event_stores.write().unwrap(); // safe unwrap, only error case is poisoning 254 db.get_mut(&store_name.to_string()).and_then(|store| { 255 if !store.is_empty() { 256 // Timestamps may have been recorded out-of-order, so sort the events 257 // by the timestamp. 258 // We can't insert events in order as-we-go, because we also append 259 // events to a file on disk, where this would be expensive. Best to 260 // handle this in every case (whether events came from disk or memory) 261 // in a single location. 262 store.sort_by(|a, b| a.timestamp.cmp(&b.timestamp)); 263 let first_timestamp = store[0].timestamp; 264 let snapshot = store 265 .iter() 266 .map(|e| e.serialize_relative(first_timestamp)) 267 .collect(); 268 Some(snapshot) 269 } else { 270 log::warn!("Unexpectly got empty event store for '{}'", store_name); 271 None 272 } 273 }) 274 }; 275 276 if clear_store { 277 self.event_stores 278 .write() 279 .unwrap() // safe unwrap, only error case is poisoning 280 .remove(&store_name.to_string()); 281 282 let _lock = self.file_lock.write().unwrap(); // safe unwrap, only error case is poisoning 283 if let Err(err) = fs::remove_file(self.path.join(store_name)) { 284 match err.kind() { 285 std::io::ErrorKind::NotFound => { 286 // silently drop this error, the file was already non-existing 287 } 288 _ => log::warn!("Error removing events queue file '{}': {}", store_name, err), 289 } 290 } 291 } 292 293 result 294 } 295 296 /// Clears all stored events, both in memory and on-disk. clear_all(&self) -> Result<()>297 pub fn clear_all(&self) -> Result<()> { 298 // safe unwrap, only error case is poisoning 299 self.event_stores.write().unwrap().clear(); 300 301 // safe unwrap, only error case is poisoning 302 let _lock = self.file_lock.write().unwrap(); 303 std::fs::remove_dir_all(&self.path)?; 304 create_dir_all(&self.path)?; 305 306 Ok(()) 307 } 308 309 /// **Test-only API (exported for FFI purposes).** 310 /// 311 /// Returns whether there are any events currently stored for the given even 312 /// metric. 313 /// 314 /// This doesn't clear the stored value. test_has_value<'a>(&'a self, meta: &'a CommonMetricData, store_name: &str) -> bool315 pub fn test_has_value<'a>(&'a self, meta: &'a CommonMetricData, store_name: &str) -> bool { 316 record_coverage(&meta.base_identifier()); 317 318 self.event_stores 319 .read() 320 .unwrap() // safe unwrap, only error case is poisoning 321 .get(&store_name.to_string()) 322 .into_iter() 323 .flatten() 324 .any(|event| event.name == meta.name && event.category == meta.category) 325 } 326 327 /// **Test-only API (exported for FFI purposes).** 328 /// 329 /// Gets the vector of currently stored events for the given event metric in 330 /// the given store. 331 /// 332 /// This doesn't clear the stored value. test_get_value<'a>( &'a self, meta: &'a CommonMetricData, store_name: &str, ) -> Option<Vec<RecordedEvent>>333 pub fn test_get_value<'a>( 334 &'a self, 335 meta: &'a CommonMetricData, 336 store_name: &str, 337 ) -> Option<Vec<RecordedEvent>> { 338 record_coverage(&meta.base_identifier()); 339 340 let value: Vec<RecordedEvent> = self 341 .event_stores 342 .read() 343 .unwrap() // safe unwrap, only error case is poisoning 344 .get(&store_name.to_string()) 345 .into_iter() 346 .flatten() 347 .filter(|event| event.name == meta.name && event.category == meta.category) 348 .cloned() 349 .collect(); 350 if !value.is_empty() { 351 Some(value) 352 } else { 353 None 354 } 355 } 356 } 357 358 #[cfg(test)] 359 mod test { 360 use super::*; 361 use crate::tests::new_glean; 362 use crate::CommonMetricData; 363 364 #[test] handle_truncated_events_on_disk()365 fn handle_truncated_events_on_disk() { 366 let t = tempfile::tempdir().unwrap(); 367 368 { 369 let db = EventDatabase::new(&t.path()).unwrap(); 370 db.write_event_to_disk("events", "{\"timestamp\": 500"); 371 db.write_event_to_disk("events", "{\"timestamp\""); 372 db.write_event_to_disk( 373 "events", 374 "{\"timestamp\": 501, \"category\": \"ui\", \"name\": \"click\"}", 375 ); 376 } 377 378 { 379 let db = EventDatabase::new(&t.path()).unwrap(); 380 db.load_events_from_disk().unwrap(); 381 let events = &db.event_stores.read().unwrap()["events"]; 382 assert_eq!(1, events.len()); 383 } 384 } 385 386 #[test] stable_serialization()387 fn stable_serialization() { 388 let event_empty = RecordedEvent { 389 timestamp: 2, 390 category: "cat".to_string(), 391 name: "name".to_string(), 392 extra: None, 393 }; 394 395 let mut data = HashMap::new(); 396 data.insert("a key".to_string(), "a value".to_string()); 397 let event_data = RecordedEvent { 398 timestamp: 2, 399 category: "cat".to_string(), 400 name: "name".to_string(), 401 extra: Some(data), 402 }; 403 404 let event_empty_json = ::serde_json::to_string_pretty(&event_empty).unwrap(); 405 let event_data_json = ::serde_json::to_string_pretty(&event_data).unwrap(); 406 407 assert_eq!( 408 event_empty, 409 serde_json::from_str(&event_empty_json).unwrap() 410 ); 411 assert_eq!(event_data, serde_json::from_str(&event_data_json).unwrap()); 412 } 413 414 #[test] deserialize_existing_data()415 fn deserialize_existing_data() { 416 let event_empty_json = r#" 417 { 418 "timestamp": 2, 419 "category": "cat", 420 "name": "name" 421 } 422 "#; 423 424 let event_data_json = r#" 425 { 426 "timestamp": 2, 427 "category": "cat", 428 "name": "name", 429 "extra": { 430 "a key": "a value" 431 } 432 } 433 "#; 434 435 let event_empty = RecordedEvent { 436 timestamp: 2, 437 category: "cat".to_string(), 438 name: "name".to_string(), 439 extra: None, 440 }; 441 442 let mut data = HashMap::new(); 443 data.insert("a key".to_string(), "a value".to_string()); 444 let event_data = RecordedEvent { 445 timestamp: 2, 446 category: "cat".to_string(), 447 name: "name".to_string(), 448 extra: Some(data), 449 }; 450 451 assert_eq!( 452 event_empty, 453 serde_json::from_str(&event_empty_json).unwrap() 454 ); 455 assert_eq!(event_data, serde_json::from_str(&event_data_json).unwrap()); 456 } 457 458 #[test] doesnt_record_when_upload_is_disabled()459 fn doesnt_record_when_upload_is_disabled() { 460 let (mut glean, dir) = new_glean(None); 461 let db = EventDatabase::new(dir.path()).unwrap(); 462 463 let test_storage = "test-storage"; 464 let test_category = "category"; 465 let test_name = "name"; 466 let test_timestamp = 2; 467 let test_meta = CommonMetricData::new(test_category, test_name, test_storage); 468 let event_data = RecordedEvent { 469 timestamp: test_timestamp, 470 category: test_category.to_string(), 471 name: test_name.to_string(), 472 extra: None, 473 }; 474 475 // Upload is not yet disabled, 476 // so let's check that everything is getting recorded as expected. 477 db.record(&glean, &test_meta, 2, None); 478 { 479 let event_stores = db.event_stores.read().unwrap(); 480 assert_eq!(&event_data, &event_stores.get(test_storage).unwrap()[0]); 481 assert_eq!(event_stores.get(test_storage).unwrap().len(), 1); 482 } 483 484 glean.set_upload_enabled(false); 485 486 // Now that upload is disabled, let's check nothing is recorded. 487 db.record(&glean, &test_meta, 2, None); 488 { 489 let event_stores = db.event_stores.read().unwrap(); 490 assert_eq!(event_stores.get(test_storage).unwrap().len(), 1); 491 } 492 } 493 } 494