1 // This Source Code Form is subject to the terms of the Mozilla Public
2 // License, v. 2.0. If a copy of the MPL was not distributed with this
3 // file, You can obtain one at https://mozilla.org/MPL/2.0/.
4 
5 //! Ping collection, assembly & submission.
6 
7 use std::fs::{create_dir_all, File};
8 use std::io::Write;
9 use std::path::{Path, PathBuf};
10 
11 use log::info;
12 use serde_json::{json, Value as JsonValue};
13 
14 use crate::common_metric_data::{CommonMetricData, Lifetime};
15 use crate::metrics::{CounterMetric, DatetimeMetric, Metric, MetricType, PingType, TimeUnit};
16 use crate::storage::StorageManager;
17 use crate::upload::HeaderMap;
18 use crate::util::{get_iso_time_string, local_now_with_offset};
19 use crate::{
20     Glean, Result, DELETION_REQUEST_PINGS_DIRECTORY, INTERNAL_STORAGE, PENDING_PINGS_DIRECTORY,
21 };
22 
23 /// Holds everything you need to store or send a ping.
24 pub struct Ping<'a> {
25     /// The unique document id.
26     pub doc_id: &'a str,
27     /// The ping's name.
28     pub name: &'a str,
29     /// The path on the server to use when uplaoding this ping.
30     pub url_path: &'a str,
31     /// The payload, including `*_info` fields.
32     pub content: JsonValue,
33     /// The headers to upload with the payload.
34     pub headers: HeaderMap,
35 }
36 
/// Collect a ping's data, assemble it into its full payload and store it on disk.
///
/// This is a fieldless unit struct: it carries no state of its own. The
/// per-ping sequence numbers and start times it manages are kept in Glean's
/// internal storage (see `get_ping_seq` and `get_start_end_times`).
pub struct PingMaker;
39 
merge(a: &mut JsonValue, b: &JsonValue)40 fn merge(a: &mut JsonValue, b: &JsonValue) {
41     match (a, b) {
42         (&mut JsonValue::Object(ref mut a), &JsonValue::Object(ref b)) => {
43             for (k, v) in b {
44                 merge(a.entry(k.clone()).or_insert(JsonValue::Null), v);
45             }
46         }
47         (a, b) => {
48             *a = b.clone();
49         }
50     }
51 }
52 
53 impl Default for PingMaker {
default() -> Self54     fn default() -> Self {
55         Self::new()
56     }
57 }
58 
59 impl PingMaker {
60     /// Creates a new [`PingMaker`].
new() -> Self61     pub fn new() -> Self {
62         Self
63     }
64 
65     /// Gets, and then increments, the sequence number for a given ping.
66     ///
67     /// This is crate-internal exclusively for enabling the migration tests.
get_ping_seq(&self, glean: &Glean, storage_name: &str) -> usize68     pub(super) fn get_ping_seq(&self, glean: &Glean, storage_name: &str) -> usize {
69         // Sequence numbers are stored as a counter under a name that includes the storage name
70         let seq = CounterMetric::new(CommonMetricData {
71             name: format!("{}#sequence", storage_name),
72             // We don't need a category, the name is already unique
73             category: "".into(),
74             send_in_pings: vec![INTERNAL_STORAGE.into()],
75             lifetime: Lifetime::User,
76             ..Default::default()
77         });
78 
79         let current_seq = match StorageManager.snapshot_metric(
80             glean.storage(),
81             INTERNAL_STORAGE,
82             &seq.meta().identifier(glean),
83             seq.meta().lifetime,
84         ) {
85             Some(Metric::Counter(i)) => i,
86             _ => 0,
87         };
88 
89         // Increase to next sequence id
90         seq.add(glean, 1);
91 
92         current_seq as usize
93     }
94 
95     /// Gets the formatted start and end times for this ping and update for the next ping.
get_start_end_times(&self, glean: &Glean, storage_name: &str) -> (String, String)96     fn get_start_end_times(&self, glean: &Glean, storage_name: &str) -> (String, String) {
97         let time_unit = TimeUnit::Minute;
98 
99         let start_time = DatetimeMetric::new(
100             CommonMetricData {
101                 name: format!("{}#start", storage_name),
102                 category: "".into(),
103                 send_in_pings: vec![INTERNAL_STORAGE.into()],
104                 lifetime: Lifetime::User,
105                 ..Default::default()
106             },
107             time_unit,
108         );
109 
110         // "start_time" is the time the ping was generated the last time.
111         // If not available, we use the date the Glean object was initialized.
112         let start_time_data = start_time
113             .get_value(glean, INTERNAL_STORAGE)
114             .unwrap_or_else(|| glean.start_time());
115         let end_time_data = local_now_with_offset();
116 
117         // Update the start time with the current time.
118         start_time.set(glean, Some(end_time_data));
119 
120         // Format the times.
121         let start_time_data = get_iso_time_string(start_time_data, time_unit);
122         let end_time_data = get_iso_time_string(end_time_data, time_unit);
123         (start_time_data, end_time_data)
124     }
125 
get_ping_info(&self, glean: &Glean, storage_name: &str, reason: Option<&str>) -> JsonValue126     fn get_ping_info(&self, glean: &Glean, storage_name: &str, reason: Option<&str>) -> JsonValue {
127         let (start_time, end_time) = self.get_start_end_times(glean, storage_name);
128         let mut map = json!({
129             "seq": self.get_ping_seq(glean, storage_name),
130             "start_time": start_time,
131             "end_time": end_time,
132         });
133 
134         if let Some(reason) = reason {
135             map.as_object_mut()
136                 .unwrap() // safe unwrap, we created the object above
137                 .insert("reason".to_string(), JsonValue::String(reason.to_string()));
138         };
139 
140         // Get the experiment data, if available.
141         if let Some(experiment_data) =
142             StorageManager.snapshot_experiments_as_json(glean.storage(), INTERNAL_STORAGE)
143         {
144             map.as_object_mut()
145                 .unwrap() // safe unwrap, we created the object above
146                 .insert("experiments".to_string(), experiment_data);
147         };
148 
149         map
150     }
151 
get_client_info(&self, glean: &Glean, include_client_id: bool) -> JsonValue152     fn get_client_info(&self, glean: &Glean, include_client_id: bool) -> JsonValue {
153         // Add the "telemetry_sdk_build", which is the glean-core version.
154         let mut map = json!({
155             "telemetry_sdk_build": crate::GLEAN_VERSION,
156         });
157 
158         // Flatten the whole thing.
159         if let Some(client_info) =
160             StorageManager.snapshot_as_json(glean.storage(), "glean_client_info", true)
161         {
162             let client_info_obj = client_info.as_object().unwrap(); // safe unwrap, snapshot always returns an object.
163             for (_key, value) in client_info_obj {
164                 merge(&mut map, value);
165             }
166         } else {
167             log::warn!("Empty client info data.");
168         }
169 
170         if !include_client_id {
171             // safe unwrap, we created the object above
172             map.as_object_mut().unwrap().remove("client_id");
173         }
174 
175         json!(map)
176     }
177 
178     /// Build the headers to be persisted and sent with a ping.
179     ///
180     /// Currently the only headers we persist are `X-Debug-ID` and `X-Source-Tags`.
181     ///
182     /// # Arguments
183     ///
184     /// * `glean` - the [`Glean`] instance to collect headers from.
185     ///
186     /// # Returns
187     ///
188     /// A map of header names to header values.
189     /// Might be empty if there are no extra headers to send.
190     /// ```
get_headers(&self, glean: &Glean) -> HeaderMap191     fn get_headers(&self, glean: &Glean) -> HeaderMap {
192         let mut headers_map = HeaderMap::new();
193 
194         if let Some(debug_view_tag) = glean.debug_view_tag() {
195             headers_map.insert("X-Debug-ID".to_string(), debug_view_tag.to_string());
196         }
197 
198         if let Some(source_tags) = glean.source_tags() {
199             headers_map.insert("X-Source-Tags".to_string(), source_tags.join(","));
200         }
201 
202         headers_map
203     }
204 
205     /// Collects a snapshot for the given ping from storage and attach required meta information.
206     ///
207     /// # Arguments
208     ///
209     /// * `glean` - the [`Glean`] instance to collect data from.
210     /// * `ping` - the ping to collect for.
211     /// * `reason` - an optional reason code to include in the ping.
212     /// * `doc_id` - the ping's unique document identifier.
213     /// * `url_path` - the path on the server to upload this ping to.
214     ///
215     /// # Returns
216     ///
217     /// A fully assembled representation of the ping payload and associated metadata.
218     /// If there is no data stored for the ping, `None` is returned.
collect<'a>( &self, glean: &Glean, ping: &'a PingType, reason: Option<&str>, doc_id: &'a str, url_path: &'a str, ) -> Option<Ping<'a>>219     pub fn collect<'a>(
220         &self,
221         glean: &Glean,
222         ping: &'a PingType,
223         reason: Option<&str>,
224         doc_id: &'a str,
225         url_path: &'a str,
226     ) -> Option<Ping<'a>> {
227         info!("Collecting {}", ping.name);
228 
229         let metrics_data = StorageManager.snapshot_as_json(glean.storage(), &ping.name, true);
230         let events_data = glean.event_storage().snapshot_as_json(&ping.name, true);
231 
232         let is_empty = metrics_data.is_none() && events_data.is_none();
233         if !ping.send_if_empty && is_empty {
234             info!("Storage for {} empty. Bailing out.", ping.name);
235             return None;
236         } else if is_empty {
237             info!("Storage for {} empty. Ping will still be sent.", ping.name);
238         }
239 
240         let ping_info = self.get_ping_info(glean, &ping.name, reason);
241         let client_info = self.get_client_info(glean, ping.include_client_id);
242 
243         let mut json = json!({
244             "ping_info": ping_info,
245             "client_info": client_info
246         });
247         let json_obj = json.as_object_mut()?;
248         if let Some(metrics_data) = metrics_data {
249             json_obj.insert("metrics".to_string(), metrics_data);
250         }
251         if let Some(events_data) = events_data {
252             json_obj.insert("events".to_string(), events_data);
253         }
254 
255         Some(Ping {
256             content: json,
257             name: &ping.name,
258             doc_id,
259             url_path,
260             headers: self.get_headers(glean),
261         })
262     }
263 
264     /// Collects a snapshot for the given ping from storage and attach required meta information.
265     ///
266     /// # Arguments
267     ///
268     /// * `glean` - the [`Glean`] instance to collect data from.
269     /// * `ping` - the ping to collect for.
270     /// * `reason` - an optional reason code to include in the ping.
271     ///
272     /// # Returns
273     ///
274     /// A fully assembled ping payload in a string encoded as JSON.
275     /// If there is no data stored for the ping, `None` is returned.
collect_string( &self, glean: &Glean, ping: &PingType, reason: Option<&str>, ) -> Option<String>276     pub fn collect_string(
277         &self,
278         glean: &Glean,
279         ping: &PingType,
280         reason: Option<&str>,
281     ) -> Option<String> {
282         self.collect(glean, ping, reason, "", "")
283             .map(|ping| ::serde_json::to_string_pretty(&ping.content).unwrap())
284     }
285 
286     /// Gets the path to a directory for ping storage.
287     ///
288     /// The directory will be created inside the `data_path`.
289     /// The `pings` directory (and its parents) is created if it does not exist.
get_pings_dir(&self, data_path: &Path, ping_type: Option<&str>) -> std::io::Result<PathBuf>290     fn get_pings_dir(&self, data_path: &Path, ping_type: Option<&str>) -> std::io::Result<PathBuf> {
291         // Use a special directory for deletion-request pings
292         let pings_dir = match ping_type {
293             Some(ping_type) if ping_type == "deletion-request" => {
294                 data_path.join(DELETION_REQUEST_PINGS_DIRECTORY)
295             }
296             _ => data_path.join(PENDING_PINGS_DIRECTORY),
297         };
298 
299         create_dir_all(&pings_dir)?;
300         Ok(pings_dir)
301     }
302 
303     /// Gets path to a directory for temporary storage.
304     ///
305     /// The directory will be created inside the `data_path`.
306     /// The `tmp` directory (and its parents) is created if it does not exist.
get_tmp_dir(&self, data_path: &Path) -> std::io::Result<PathBuf>307     fn get_tmp_dir(&self, data_path: &Path) -> std::io::Result<PathBuf> {
308         let pings_dir = data_path.join("tmp");
309         create_dir_all(&pings_dir)?;
310         Ok(pings_dir)
311     }
312 
313     /// Stores a ping to disk in the pings directory.
store_ping(&self, data_path: &Path, ping: &Ping) -> std::io::Result<()>314     pub fn store_ping(&self, data_path: &Path, ping: &Ping) -> std::io::Result<()> {
315         let pings_dir = self.get_pings_dir(data_path, Some(ping.name))?;
316         let temp_dir = self.get_tmp_dir(data_path)?;
317 
318         // Write to a temporary location and then move when done,
319         // for transactional writes.
320         let temp_ping_path = temp_dir.join(ping.doc_id);
321         let ping_path = pings_dir.join(ping.doc_id);
322 
323         log::debug!(
324             "Storing ping '{}' at '{}'",
325             ping.doc_id,
326             ping_path.display()
327         );
328 
329         {
330             let mut file = File::create(&temp_ping_path)?;
331             file.write_all(ping.url_path.as_bytes())?;
332             file.write_all(b"\n")?;
333             file.write_all(::serde_json::to_string(&ping.content)?.as_bytes())?;
334             if !ping.headers.is_empty() {
335                 file.write_all(b"\n{\"headers\":")?;
336                 file.write_all(::serde_json::to_string(&ping.headers)?.as_bytes())?;
337                 file.write_all(b"}")?;
338             }
339         }
340 
341         if let Err(e) = std::fs::rename(&temp_ping_path, &ping_path) {
342             log::warn!(
343                 "Unable to move '{}' to '{}",
344                 temp_ping_path.display(),
345                 ping_path.display()
346             );
347             return Err(e);
348         }
349 
350         Ok(())
351     }
352 
353     /// Clears any pending pings in the queue.
clear_pending_pings(&self, data_path: &Path) -> Result<()>354     pub fn clear_pending_pings(&self, data_path: &Path) -> Result<()> {
355         let pings_dir = self.get_pings_dir(data_path, None)?;
356 
357         std::fs::remove_dir_all(&pings_dir)?;
358         create_dir_all(&pings_dir)?;
359 
360         log::debug!("All pending pings deleted");
361 
362         Ok(())
363     }
364 }
365 
#[cfg(test)]
mod test {
    use super::*;
    use crate::tests::new_glean;

    /// Returns the next sequence number of the `custom` ping, advancing it.
    fn next_custom_seq(maker: &PingMaker, glean: &Glean) -> usize {
        maker.get_ping_seq(glean, "custom")
    }

    #[test]
    fn sequence_numbers_should_be_reset_when_toggling_uploading() {
        let (mut glean, _) = new_glean(None);
        let maker = PingMaker::new();

        // With upload enabled the sequence starts at 0 and advances on each call.
        assert_eq!(0, next_custom_seq(&maker, &glean));
        assert_eq!(1, next_custom_seq(&maker, &glean));

        // Disabling upload resets it, and it does not advance while disabled.
        glean.set_upload_enabled(false);
        assert_eq!(0, next_custom_seq(&maker, &glean));
        assert_eq!(0, next_custom_seq(&maker, &glean));

        // Re-enabling upload starts counting from 0 again.
        glean.set_upload_enabled(true);
        assert_eq!(0, next_custom_seq(&maker, &glean));
        assert_eq!(1, next_custom_seq(&maker, &glean));
    }
}
388