1 // This Source Code Form is subject to the terms of the Mozilla Public
2 // License, v. 2.0. If a copy of the MPL was not distributed with this
3 // file, You can obtain one at https://mozilla.org/MPL/2.0/.
4 
5 use std::collections::HashMap;
6 use std::fs;
7 use std::fs::{create_dir_all, File, OpenOptions};
8 use std::io::BufRead;
9 use std::io::BufReader;
10 use std::io::Write;
11 use std::path::{Path, PathBuf};
12 use std::sync::RwLock;
13 
14 use serde::{Deserialize, Serialize};
15 use serde_json::{json, Value as JsonValue};
16 
17 use crate::coverage::record_coverage;
18 use crate::CommonMetricData;
19 use crate::Glean;
20 use crate::Result;
21 
22 /// Represents the recorded data for a single event.
23 #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
24 pub struct RecordedEvent {
25     /// The timestamp of when the event was recorded.
26     ///
27     /// This allows to order events from a single process run.
28     pub timestamp: u64,
29 
30     /// The event's category.
31     ///
32     /// This is defined by users in the metrics file.
33     pub category: String,
34 
35     /// The event's name.
36     ///
37     /// This is defined by users in the metrics file.
38     pub name: String,
39 
40     /// A map of all extra data values.
41     ///
42     /// The set of allowed extra keys is defined by users in the metrics file.
43     #[serde(skip_serializing_if = "Option::is_none")]
44     pub extra: Option<HashMap<String, String>>,
45 }
46 
47 impl RecordedEvent {
48     /// Serialize an event to JSON, adjusting its timestamp relative to a base timestamp
serialize_relative(&self, timestamp_offset: u64) -> JsonValue49     fn serialize_relative(&self, timestamp_offset: u64) -> JsonValue {
50         json!(&RecordedEvent {
51             timestamp: self.timestamp - timestamp_offset,
52             category: self.category.clone(),
53             name: self.name.clone(),
54             extra: self.extra.clone(),
55         })
56     }
57 }
58 
/// This struct handles the in-memory and on-disk storage logic for events.
///
/// So that the data survives shutting down of the application, events are stored
/// in an append-only file on disk, in addition to the store in memory. Each line
/// of this file records a single event in JSON. The on-disk timestamps are the raw
/// recorded values. They are only rebased relative to the earliest event of the store
/// when a snapshot is taken for a ping. There is one file per store.
///
/// When restarting the application, these on-disk files are checked, and if any are
/// found, they are loaded, queued for sending and flushed immediately before any
/// further events are collected. This is because the timestamps for these events
/// may have come from a previous boot of the device, and therefore will not be
/// compatible with any newly-collected events.
#[derive(Debug)]
pub struct EventDatabase {
    /// Path to directory of on-disk event files (one file per store).
    pub path: PathBuf,
    /// The in-memory list of events, keyed by store name.
    event_stores: RwLock<HashMap<String, Vec<RecordedEvent>>>,
    /// A lock to be held when doing operations on the filesystem.
    ///
    /// When both locks are needed, `event_stores` is acquired before `file_lock`
    /// to avoid a lock-order inversion.
    file_lock: RwLock<()>,
}
80 
81 impl EventDatabase {
82     /// Creates a new event database.
83     ///
84     /// # Arguments
85     ///
86     /// * `data_path` - The directory to store events in. A new directory
87     /// * `events` - will be created inside of this directory.
new(data_path: &Path) -> Result<Self>88     pub fn new(data_path: &Path) -> Result<Self> {
89         let path = data_path.join("events");
90         create_dir_all(&path)?;
91 
92         Ok(Self {
93             path,
94             event_stores: RwLock::new(HashMap::new()),
95             file_lock: RwLock::new(()),
96         })
97     }
98 
99     /// Initializes events storage after Glean is fully initialized and ready to send pings.
100     ///
101     /// This must be called once on application startup, e.g. from
102     /// [Glean.initialize], but after we are ready to send pings, since this
103     /// could potentially collect and send pings.
104     ///
105     /// If there are any events queued on disk, it loads them into memory so
106     /// that the memory and disk representations are in sync.
107     ///
108     /// Secondly, if this is the first time the application has been run since
109     /// rebooting, any pings containing events are assembled into pings and cleared
110     /// immediately, since their timestamps won't be compatible with the timestamps
111     /// we would create during this boot of the device.
112     ///
113     /// # Arguments
114     ///
115     /// * `glean` - The Glean instance.
116     ///
117     /// # Returns
118     ///
119     /// Whether at least one ping was generated.
flush_pending_events_on_startup(&self, glean: &Glean) -> bool120     pub fn flush_pending_events_on_startup(&self, glean: &Glean) -> bool {
121         match self.load_events_from_disk() {
122             Ok(_) => self.send_all_events(glean),
123             Err(err) => {
124                 log::warn!("Error loading events from disk: {}", err);
125                 false
126             }
127         }
128     }
129 
load_events_from_disk(&self) -> Result<()>130     fn load_events_from_disk(&self) -> Result<()> {
131         // NOTE: The order of locks here is important.
132         // In other code parts we might acquire the `file_lock` when we already have acquired
133         // a lock on `event_stores`.
134         // This is a potential lock-order-inversion.
135         let mut db = self.event_stores.write().unwrap(); // safe unwrap, only error case is poisoning
136         let _lock = self.file_lock.read().unwrap(); // safe unwrap, only error case is poisoning
137 
138         for entry in fs::read_dir(&self.path)? {
139             let entry = entry?;
140             if entry.file_type()?.is_file() {
141                 let store_name = entry.file_name().into_string()?;
142                 let file = BufReader::new(File::open(entry.path())?);
143                 db.insert(
144                     store_name,
145                     file.lines()
146                         .filter_map(|line| line.ok())
147                         .filter_map(|line| serde_json::from_str::<RecordedEvent>(&line).ok())
148                         .collect(),
149                 );
150             }
151         }
152         Ok(())
153     }
154 
send_all_events(&self, glean: &Glean) -> bool155     fn send_all_events(&self, glean: &Glean) -> bool {
156         let store_names = {
157             let db = self.event_stores.read().unwrap(); // safe unwrap, only error case is poisoning
158             db.keys().cloned().collect::<Vec<String>>()
159         };
160 
161         let mut ping_sent = false;
162         for store_name in store_names {
163             ping_sent |= glean.submit_ping_by_name(&store_name, Some("startup"));
164         }
165 
166         ping_sent
167     }
168 
169     /// Records an event in the desired stores.
170     ///
171     /// # Arguments
172     ///
173     /// * `glean` - The Glean instance.
174     /// * `meta` - The metadata about the event metric. Used to get the category,
175     ///   name and stores for the metric.
176     /// * `timestamp` - The timestamp of the event, in milliseconds. Must use a
177     ///   monotonically increasing timer (this value is obtained on the
178     ///   platform-specific side).
179     /// * `extra` - Extra data values, mapping strings to strings.
record( &self, glean: &Glean, meta: &CommonMetricData, timestamp: u64, extra: Option<HashMap<String, String>>, )180     pub fn record(
181         &self,
182         glean: &Glean,
183         meta: &CommonMetricData,
184         timestamp: u64,
185         extra: Option<HashMap<String, String>>,
186     ) {
187         // If upload is disabled we don't want to record.
188         if !glean.is_upload_enabled() {
189             return;
190         }
191 
192         // Create RecordedEvent object, and its JSON form for serialization
193         // on disk.
194         let event = RecordedEvent {
195             timestamp,
196             category: meta.category.to_string(),
197             name: meta.name.to_string(),
198             extra,
199         };
200         let event_json = serde_json::to_string(&event).unwrap(); // safe unwrap, event can always be serialized
201 
202         // Store the event in memory and on disk to each of the stores.
203         let mut stores_to_submit: Vec<&str> = Vec::new();
204         {
205             let mut db = self.event_stores.write().unwrap(); // safe unwrap, only error case is poisoning
206             for store_name in meta.send_in_pings.iter() {
207                 let store = db.entry(store_name.to_string()).or_insert_with(Vec::new);
208                 store.push(event.clone());
209                 self.write_event_to_disk(store_name, &event_json);
210                 if store.len() == glean.get_max_events() {
211                     stores_to_submit.push(&store_name);
212                 }
213             }
214         }
215 
216         // If any of the event stores reached maximum size, submit the pings
217         // containing those events immediately.
218         for store_name in stores_to_submit {
219             glean.submit_ping_by_name(store_name, Some("max_capacity"));
220         }
221     }
222 
223     /// Writes an event to a single store on disk.
224     ///
225     /// # Arguments
226     ///
227     /// * `store_name` - The name of the store.
228     /// * `event_json` - The event content, as a single-line JSON-encoded string.
write_event_to_disk(&self, store_name: &str, event_json: &str)229     fn write_event_to_disk(&self, store_name: &str, event_json: &str) {
230         let _lock = self.file_lock.write().unwrap(); // safe unwrap, only error case is poisoning
231         if let Err(err) = OpenOptions::new()
232             .create(true)
233             .append(true)
234             .open(self.path.join(store_name))
235             .and_then(|mut file| writeln!(file, "{}", event_json))
236         {
237             log::warn!("IO error writing event to store '{}': {}", store_name, err);
238         }
239     }
240 
241     /// Gets a snapshot of the stored event data as a JsonValue.
242     ///
243     /// # Arguments
244     ///
245     /// * `store_name` - The name of the desired store.
246     /// * `clear_store` - Whether to clear the store after snapshotting.
247     ///
248     /// # Returns
249     ///
250     /// A array of events, JSON encoded, if any. Otherwise `None`.
snapshot_as_json(&self, store_name: &str, clear_store: bool) -> Option<JsonValue>251     pub fn snapshot_as_json(&self, store_name: &str, clear_store: bool) -> Option<JsonValue> {
252         let result = {
253             let mut db = self.event_stores.write().unwrap(); // safe unwrap, only error case is poisoning
254             db.get_mut(&store_name.to_string()).and_then(|store| {
255                 if !store.is_empty() {
256                     // Timestamps may have been recorded out-of-order, so sort the events
257                     // by the timestamp.
258                     // We can't insert events in order as-we-go, because we also append
259                     // events to a file on disk, where this would be expensive. Best to
260                     // handle this in every case (whether events came from disk or memory)
261                     // in a single location.
262                     store.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
263                     let first_timestamp = store[0].timestamp;
264                     let snapshot = store
265                         .iter()
266                         .map(|e| e.serialize_relative(first_timestamp))
267                         .collect();
268                     Some(snapshot)
269                 } else {
270                     log::warn!("Unexpectly got empty event store for '{}'", store_name);
271                     None
272                 }
273             })
274         };
275 
276         if clear_store {
277             self.event_stores
278                 .write()
279                 .unwrap() // safe unwrap, only error case is poisoning
280                 .remove(&store_name.to_string());
281 
282             let _lock = self.file_lock.write().unwrap(); // safe unwrap, only error case is poisoning
283             if let Err(err) = fs::remove_file(self.path.join(store_name)) {
284                 match err.kind() {
285                     std::io::ErrorKind::NotFound => {
286                         // silently drop this error, the file was already non-existing
287                     }
288                     _ => log::warn!("Error removing events queue file '{}': {}", store_name, err),
289                 }
290             }
291         }
292 
293         result
294     }
295 
296     /// Clears all stored events, both in memory and on-disk.
clear_all(&self) -> Result<()>297     pub fn clear_all(&self) -> Result<()> {
298         // safe unwrap, only error case is poisoning
299         self.event_stores.write().unwrap().clear();
300 
301         // safe unwrap, only error case is poisoning
302         let _lock = self.file_lock.write().unwrap();
303         std::fs::remove_dir_all(&self.path)?;
304         create_dir_all(&self.path)?;
305 
306         Ok(())
307     }
308 
309     /// **Test-only API (exported for FFI purposes).**
310     ///
311     /// Returns whether there are any events currently stored for the given even
312     /// metric.
313     ///
314     /// This doesn't clear the stored value.
test_has_value<'a>(&'a self, meta: &'a CommonMetricData, store_name: &str) -> bool315     pub fn test_has_value<'a>(&'a self, meta: &'a CommonMetricData, store_name: &str) -> bool {
316         record_coverage(&meta.base_identifier());
317 
318         self.event_stores
319             .read()
320             .unwrap() // safe unwrap, only error case is poisoning
321             .get(&store_name.to_string())
322             .into_iter()
323             .flatten()
324             .any(|event| event.name == meta.name && event.category == meta.category)
325     }
326 
327     /// **Test-only API (exported for FFI purposes).**
328     ///
329     /// Gets the vector of currently stored events for the given event metric in
330     /// the given store.
331     ///
332     /// This doesn't clear the stored value.
test_get_value<'a>( &'a self, meta: &'a CommonMetricData, store_name: &str, ) -> Option<Vec<RecordedEvent>>333     pub fn test_get_value<'a>(
334         &'a self,
335         meta: &'a CommonMetricData,
336         store_name: &str,
337     ) -> Option<Vec<RecordedEvent>> {
338         record_coverage(&meta.base_identifier());
339 
340         let value: Vec<RecordedEvent> = self
341             .event_stores
342             .read()
343             .unwrap() // safe unwrap, only error case is poisoning
344             .get(&store_name.to_string())
345             .into_iter()
346             .flatten()
347             .filter(|event| event.name == meta.name && event.category == meta.category)
348             .cloned()
349             .collect();
350         if !value.is_empty() {
351             Some(value)
352         } else {
353             None
354         }
355     }
356 }
357 
#[cfg(test)]
mod test {
    use super::*;
    use crate::tests::new_glean;
    use crate::CommonMetricData;

    #[test]
    fn handle_truncated_events_on_disk() {
        let t = tempfile::tempdir().unwrap();

        // Write a store file with two truncated (unparseable) lines followed by
        // one valid event.
        {
            let db = EventDatabase::new(t.path()).unwrap();
            db.write_event_to_disk("events", "{\"timestamp\": 500");
            db.write_event_to_disk("events", "{\"timestamp\"");
            db.write_event_to_disk(
                "events",
                "{\"timestamp\": 501, \"category\": \"ui\", \"name\": \"click\"}",
            );
        }

        // Loading must skip the broken lines and keep only the valid event.
        {
            let db = EventDatabase::new(t.path()).unwrap();
            db.load_events_from_disk().unwrap();
            let events = &db.event_stores.read().unwrap()["events"];
            assert_eq!(1, events.len());
            assert_eq!(501, events[0].timestamp);
        }
    }

    #[test]
    fn stable_serialization() {
        let event_empty = RecordedEvent {
            timestamp: 2,
            category: "cat".to_string(),
            name: "name".to_string(),
            extra: None,
        };

        let mut data = HashMap::new();
        data.insert("a key".to_string(), "a value".to_string());
        let event_data = RecordedEvent {
            timestamp: 2,
            category: "cat".to_string(),
            name: "name".to_string(),
            extra: Some(data),
        };

        let event_empty_json = serde_json::to_string_pretty(&event_empty).unwrap();
        let event_data_json = serde_json::to_string_pretty(&event_data).unwrap();

        assert_eq!(
            event_empty,
            serde_json::from_str(&event_empty_json).unwrap()
        );
        assert_eq!(event_data, serde_json::from_str(&event_data_json).unwrap());
    }

    #[test]
    fn deserialize_existing_data() {
        let event_empty_json = r#"
{
  "timestamp": 2,
  "category": "cat",
  "name": "name"
}
            "#;

        let event_data_json = r#"
{
  "timestamp": 2,
  "category": "cat",
  "name": "name",
  "extra": {
    "a key": "a value"
  }
}
        "#;

        let event_empty = RecordedEvent {
            timestamp: 2,
            category: "cat".to_string(),
            name: "name".to_string(),
            extra: None,
        };

        let mut data = HashMap::new();
        data.insert("a key".to_string(), "a value".to_string());
        let event_data = RecordedEvent {
            timestamp: 2,
            category: "cat".to_string(),
            name: "name".to_string(),
            extra: Some(data),
        };

        assert_eq!(event_empty, serde_json::from_str(event_empty_json).unwrap());
        assert_eq!(event_data, serde_json::from_str(event_data_json).unwrap());
    }

    #[test]
    fn doesnt_record_when_upload_is_disabled() {
        let (mut glean, dir) = new_glean(None);
        let db = EventDatabase::new(dir.path()).unwrap();

        let test_storage = "test-storage";
        let test_category = "category";
        let test_name = "name";
        let test_timestamp = 2;
        let test_meta = CommonMetricData::new(test_category, test_name, test_storage);
        let event_data = RecordedEvent {
            timestamp: test_timestamp,
            category: test_category.to_string(),
            name: test_name.to_string(),
            extra: None,
        };

        // Upload is not yet disabled,
        // so let's check that everything is getting recorded as expected.
        db.record(&glean, &test_meta, test_timestamp, None);
        {
            let event_stores = db.event_stores.read().unwrap();
            assert_eq!(&event_data, &event_stores.get(test_storage).unwrap()[0]);
            assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
        }

        glean.set_upload_enabled(false);

        // Now that upload is disabled, let's check nothing is recorded.
        db.record(&glean, &test_meta, test_timestamp, None);
        {
            let event_stores = db.event_stores.read().unwrap();
            assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
        }
    }
}
494