1 /* This Source Code Form is subject to the terms of the Mozilla Public
2  * License, v. 2.0. If a copy of the MPL was not distributed with this
3  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4 
5 //! Manage recording sync telemetry. Assumes some external telemetry
6 //! library/code which manages submitting.
7 
8 use std::collections::HashMap;
9 use std::time;
10 
11 use serde::{ser, Serialize, Serializer};
12 
// Shared by the test modules below: serializes `v` to a JSON value and
// asserts that it equals `expected`.
#[cfg(test)]
fn assert_json<T>(v: &T, expected: serde_json::Value)
where
    T: serde::Serialize + ?Sized,
{
    let actual = serde_json::to_value(&v).expect("should get a value");
    assert_eq!(actual, expected);
}
24 
/// What we record for 'when' and 'took' in a telemetry record.
///
/// This is only the serialized form; the values are produced by
/// `Stopwatch::finished()`.
#[derive(Debug, Serialize)]
struct WhenTook {
    // When the stopwatch was started: whole seconds since the Unix epoch,
    // stored as a float.
    when: f64,
    // How long the stopwatch ran, in milliseconds. Left out of the output
    // when `crate::skip_if_default` says so (the tests show a `took` of 0
    // being dropped).
    #[serde(skip_serializing_if = "crate::skip_if_default")]
    took: u64,
}
32 
/// What we track while recording 'when' and 'took'. It serializes as a
/// `WhenTook`, except when `.finished()` hasn't been called, in which case
/// serialization fails with an error.
#[derive(Debug)]
enum Stopwatch {
    // Still running: the wall-clock start time (the source of 'when') and the
    // instant used to measure elapsed time (the source of 'took').
    Started(time::SystemTime, time::Instant),
    // Stopped: holds the final values that will be serialized.
    Finished(WhenTook),
}
40 
41 impl Default for Stopwatch {
default() -> Self42     fn default() -> Self {
43         Stopwatch::new()
44     }
45 }
46 
47 impl Stopwatch {
new() -> Self48     fn new() -> Self {
49         Stopwatch::Started(time::SystemTime::now(), time::Instant::now())
50     }
51 
52     // For tests we don't want real timestamps because we test against literals.
53     #[cfg(test)]
finished(&self) -> Self54     fn finished(&self) -> Self {
55         Stopwatch::Finished(WhenTook { when: 0.0, took: 0 })
56     }
57 
58     #[cfg(not(test))]
finished(&self) -> Self59     fn finished(&self) -> Self {
60         match self {
61             Stopwatch::Started(st, si) => {
62                 let std = st.duration_since(time::UNIX_EPOCH).unwrap_or_default();
63                 let when = std.as_secs() as f64; // we don't want sub-sec accuracy. Do we need to write a float?
64 
65                 let sid = si.elapsed();
66                 let took = sid.as_secs() * 1000 + (u64::from(sid.subsec_nanos()) / 1_000_000);
67                 Stopwatch::Finished(WhenTook { when, took })
68             }
69             _ => {
70                 unreachable!("can't finish twice");
71             }
72         }
73     }
74 }
75 
76 impl Serialize for Stopwatch {
serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error> where S: Serializer,77     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
78     where
79         S: Serializer,
80     {
81         match self {
82             Stopwatch::Started(_, _) => Err(ser::Error::custom("StopWatch has not been finished")),
83             Stopwatch::Finished(c) => c.serialize(serializer),
84         }
85     }
86 }
87 
#[cfg(test)]
mod stopwatch_tests {
    use super::*;

    // The stopwatch is used flattened into its parent, so wrap it the same
    // way here: the output should carry 'when' and 'took' keys directly and
    // no 'sw' key.
    #[derive(Debug, Serialize)]
    struct WT {
        #[serde(flatten)]
        sw: Stopwatch,
    }

    #[test]
    fn test_not_finished() {
        let unfinished = WT {
            sw: Stopwatch::new(),
        };
        let result = serde_json::to_string(&unfinished);
        result.expect_err("unfinished stopwatch should fail");
    }

    #[test]
    fn test() {
        // A non-zero 'took' is written next to 'when'.
        let with_took = WT {
            sw: Stopwatch::Finished(WhenTook { when: 1.0, took: 1 }),
        };
        assert_json(&with_took, serde_json::json!({"when": 1.0, "took": 1}));

        // A zero 'took' is left out entirely.
        let zero_took = WT {
            sw: Stopwatch::Finished(WhenTook { when: 1.0, took: 0 }),
        };
        assert_json(&zero_took, serde_json::json!({"when": 1.0}));
    }
}
124 
/// A generic "Event" - suitable for all kinds of pings (although this module
/// only cares about the sync ping)
///
/// Events are built with `Event::new` and the chained `value` / `extra`
/// methods, which assert a length limit for each piece of data.
#[derive(Debug, Serialize)]
pub struct Event {
    // We use static str references as we expect values to be literals.
    object: &'static str,

    method: &'static str,

    // Maybe "value" should be a string?
    // Left out of the serialized form when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    value: Option<&'static str>,

    // we expect the keys to be literals but values are real strings.
    // Left out of the serialized form when no extras have been added.
    #[serde(skip_serializing_if = "Option::is_none")]
    extra: Option<HashMap<&'static str, String>>,
}
142 
143 impl Event {
new(object: &'static str, method: &'static str) -> Self144     pub fn new(object: &'static str, method: &'static str) -> Self {
145         assert!(object.len() <= 20);
146         assert!(method.len() <= 20);
147         Self {
148             object,
149             method,
150             value: None,
151             extra: None,
152         }
153     }
154 
value(mut self, v: &'static str) -> Self155     pub fn value(mut self, v: &'static str) -> Self {
156         assert!(v.len() <= 80);
157         self.value = Some(v);
158         self
159     }
160 
extra(mut self, key: &'static str, val: String) -> Self161     pub fn extra(mut self, key: &'static str, val: String) -> Self {
162         assert!(key.len() <= 15);
163         assert!(val.len() <= 85);
164         match self.extra {
165             None => self.extra = Some(HashMap::new()),
166             Some(ref e) => assert!(e.len() < 10),
167         }
168         self.extra.as_mut().unwrap().insert(key, val);
169         self
170     }
171 }
172 
#[cfg(test)]
mod test_events {
    use super::*;

    // The object name is over the 20-byte limit.
    #[test]
    #[should_panic]
    fn test_invalid_length_ctor() {
        Event::new("A very long object value", "Method");
    }

    // The extra key is over the 15-byte limit.
    #[test]
    #[should_panic]
    fn test_invalid_length_extra_key() {
        let event = Event::new("O", "M");
        event.extra("A very long key value", "v".to_string());
    }

    // The extra value is over the 85-byte limit.
    #[test]
    #[should_panic]
    fn test_invalid_length_extra_val() {
        let too_long = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ
                abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
        let event = Event::new("O", "M");
        event.extra("k", too_long.to_string());
    }

    // Eleven distinct single-letter keys: the last insert must panic.
    #[test]
    #[should_panic]
    fn test_too_many_extras() {
        let letters = "abcdefghijk";
        let mut event = Event::new("Object", "Method");
        for pos in 0..letters.len() {
            let key = &letters[pos..=pos];
            event = event.extra(key, "v".to_string());
        }
    }

    #[test]
    fn test_json() {
        let with_value = Event::new("Object", "Method").value("Value");
        assert_json(
            &with_value,
            serde_json::json!({"object": "Object", "method": "Method", "value": "Value"}),
        );

        let with_extra = Event::new("Object", "Method").extra("one", "one".to_string());
        assert_json(
            &with_extra,
            serde_json::json!({"object": "Object",
             "method": "Method",
             "extra": {"one": "one"}
            }),
        )
    }
}
223 
/// A Sync failure.
///
/// Serialized as an object whose "name" key holds the variant's renamed tag,
/// with the variant's fields (if any) alongside it - for example
/// `{"name": "httperror", "code": 500}`.
#[derive(Debug, Serialize)]
#[serde(tag = "name")]
pub enum SyncFailure {
    // Serialized as "shutdownerror"; carries no extra data.
    #[serde(rename = "shutdownerror")]
    Shutdown,

    // Serialized as "othererror" with a free-form `error` string.
    #[serde(rename = "othererror")]
    Other { error: String },

    // Serialized as "unexpectederror" with a free-form `error` string.
    #[serde(rename = "unexpectederror")]
    Unexpected { error: String },

    // Serialized as "autherror". `from` presumably names where the auth
    // failure came from (the tests use "FxA") - confirm with callers.
    #[serde(rename = "autherror")]
    Auth { from: &'static str },

    // Serialized as "httperror". `code` is presumably the HTTP status code
    // (the tests use 500).
    #[serde(rename = "httperror")]
    Http { code: u16 },
}
243 
#[cfg(test)]
mod test {
    use super::*;

    // Every variant serializes with its renamed tag under "name", plus its
    // own fields.
    #[test]
    fn reprs() {
        let shutdown = SyncFailure::Shutdown;
        assert_json(&shutdown, serde_json::json!({"name": "shutdownerror"}));

        let other = SyncFailure::Other {
            error: "dunno".to_string(),
        };
        assert_json(
            &other,
            serde_json::json!({"name": "othererror", "error": "dunno"}),
        );

        let unexpected = SyncFailure::Unexpected {
            error: "dunno".to_string(),
        };
        assert_json(
            &unexpected,
            serde_json::json!({"name": "unexpectederror", "error": "dunno"}),
        );

        let auth = SyncFailure::Auth { from: "FxA" };
        assert_json(
            &auth,
            serde_json::json!({"name": "autherror", "from": "FxA"}),
        );

        let http = SyncFailure::Http { code: 500 };
        assert_json(
            &http,
            serde_json::json!({"name": "httperror", "code": 500}),
        );
    }
}
280 
/// Incoming record for an engine's sync
///
/// Four independent counters, each bumped through the method of the same
/// name. A counter is left out of the serialized form when
/// `crate::skip_if_default` says so (the tests show untouched counters being
/// dropped).
#[derive(Debug, Default, Serialize)]
pub struct EngineIncoming {
    #[serde(skip_serializing_if = "crate::skip_if_default")]
    applied: u32,

    #[serde(skip_serializing_if = "crate::skip_if_default")]
    failed: u32,

    // Serialized under the camel-case key "newFailed".
    #[serde(rename = "newFailed")]
    #[serde(skip_serializing_if = "crate::skip_if_default")]
    new_failed: u32,

    #[serde(skip_serializing_if = "crate::skip_if_default")]
    reconciled: u32,
}
297 
298 impl EngineIncoming {
new() -> Self299     pub fn new() -> Self {
300         Self {
301             ..Default::default()
302         }
303     }
304 
305     // A helper used via skip_serializing_if
is_empty(inc: &Option<Self>) -> bool306     fn is_empty(inc: &Option<Self>) -> bool {
307         match inc {
308             Some(a) => a.applied == 0 && a.failed == 0 && a.new_failed == 0 && a.reconciled == 0,
309             None => true,
310         }
311     }
312 
313     /// Increment the value of `applied` by `n`.
314     #[inline]
applied(&mut self, n: u32)315     pub fn applied(&mut self, n: u32) {
316         self.applied += n;
317     }
318 
319     /// Increment the value of `failed` by `n`.
320     #[inline]
failed(&mut self, n: u32)321     pub fn failed(&mut self, n: u32) {
322         self.failed += n;
323     }
324 
325     /// Increment the value of `new_failed` by `n`.
326     #[inline]
new_failed(&mut self, n: u32)327     pub fn new_failed(&mut self, n: u32) {
328         self.new_failed += n;
329     }
330 
331     /// Increment the value of `reconciled` by `n`.
332     #[inline]
reconciled(&mut self, n: u32)333     pub fn reconciled(&mut self, n: u32) {
334         self.reconciled += n;
335     }
336 
337     /// Get the value of `applied`. Mostly useful for testing.
338     #[inline]
get_applied(&self) -> u32339     pub fn get_applied(&self) -> u32 {
340         self.applied
341     }
342 
343     /// Get the value of `failed`. Mostly useful for testing.
344     #[inline]
get_failed(&self) -> u32345     pub fn get_failed(&self) -> u32 {
346         self.failed
347     }
348 
349     /// Get the value of `new_failed`. Mostly useful for testing.
350     #[inline]
get_new_failed(&self) -> u32351     pub fn get_new_failed(&self) -> u32 {
352         self.new_failed
353     }
354 
355     /// Get the value of `reconciled`. Mostly useful for testing.
356     #[inline]
get_reconciled(&self) -> u32357     pub fn get_reconciled(&self) -> u32 {
358         self.reconciled
359     }
360 }
361 
/// Outgoing record for an engine's sync
///
/// Two counters, each bumped through the method of the same name. A counter
/// is left out of the serialized form when `crate::skip_if_default` says so
/// (the tests show an untouched `failed` being dropped).
#[derive(Debug, Default, Serialize)]
pub struct EngineOutgoing {
    #[serde(skip_serializing_if = "crate::skip_if_default")]
    sent: usize,

    #[serde(skip_serializing_if = "crate::skip_if_default")]
    failed: usize,
}
371 
372 impl EngineOutgoing {
new() -> Self373     pub fn new() -> Self {
374         EngineOutgoing {
375             ..Default::default()
376         }
377     }
378 
379     #[inline]
sent(&mut self, n: usize)380     pub fn sent(&mut self, n: usize) {
381         self.sent += n;
382     }
383 
384     #[inline]
failed(&mut self, n: usize)385     pub fn failed(&mut self, n: usize) {
386         self.failed += n;
387     }
388 }
389 
/// One engine's sync.
///
/// `Engine::new` starts the stopwatch; serializing fails until the stopwatch
/// has been finished.
#[derive(Debug, Serialize)]
pub struct Engine {
    name: String,

    // Flattened, so 'when' and 'took' appear directly on the engine object.
    #[serde(flatten)]
    when_took: Stopwatch,

    // Left out when unset or when every counter in it is zero.
    #[serde(skip_serializing_if = "EngineIncoming::is_empty")]
    incoming: Option<EngineIncoming>,

    #[serde(skip_serializing_if = "Vec::is_empty")]
    outgoing: Vec<EngineOutgoing>, // one for each batch posted.

    // Serialized as "failureReason"; left out when no failure was recorded.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(rename = "failureReason")]
    failure: Option<SyncFailure>,

    // Left out when no validation result was recorded.
    #[serde(skip_serializing_if = "Option::is_none")]
    validation: Option<Validation>,
}
411 
412 impl Engine {
new(name: impl Into<String>) -> Self413     pub fn new(name: impl Into<String>) -> Self {
414         Self {
415             name: name.into(),
416             when_took: Stopwatch::new(),
417             incoming: None,
418             outgoing: Vec::new(),
419             failure: None,
420             validation: None,
421         }
422     }
423 
incoming(&mut self, inc: EngineIncoming)424     pub fn incoming(&mut self, inc: EngineIncoming) {
425         assert!(self.incoming.is_none());
426         self.incoming = Some(inc);
427     }
428 
outgoing(&mut self, out: EngineOutgoing)429     pub fn outgoing(&mut self, out: EngineOutgoing) {
430         self.outgoing.push(out);
431     }
432 
failure(&mut self, err: impl Into<SyncFailure>)433     pub fn failure(&mut self, err: impl Into<SyncFailure>) {
434         // Currently we take the first error, under the assumption that the
435         // first is the most important and all others stem from that.
436         let failure = err.into();
437         if self.failure.is_none() {
438             self.failure = Some(failure);
439         } else {
440             log::warn!(
441                 "engine already has recorded a failure of {:?} - ignoring {:?}",
442                 &self.failure,
443                 &failure
444             );
445         }
446     }
447 
validation(&mut self, v: Validation)448     pub fn validation(&mut self, v: Validation) {
449         assert!(self.validation.is_none());
450         self.validation = Some(v);
451     }
452 
finished(&mut self)453     fn finished(&mut self) {
454         self.when_took = self.when_took.finished();
455     }
456 }
457 
/// The result of validating an engine's data, attached to an `Engine` via
/// `Engine::validation`.
#[derive(Debug, Default, Serialize)]
pub struct Validation {
    // Always serialized; set through `Validation::with_version`.
    version: u32,

    // Problems added through `Validation::problem`; left out when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    problems: Vec<Problem>,

    // Serialized as "failureReason"; left out when unset. Nothing in this
    // impl sets it.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(rename = "failureReason")]
    failure: Option<SyncFailure>,
}
469 
470 impl Validation {
with_version(version: u32) -> Validation471     pub fn with_version(version: u32) -> Validation {
472         Validation {
473             version,
474             ..Validation::default()
475         }
476     }
477 
problem(&mut self, name: &'static str, count: usize) -> &mut Self478     pub fn problem(&mut self, name: &'static str, count: usize) -> &mut Self {
479         if count > 0 {
480             self.problems.push(Problem { name, count });
481         }
482         self
483     }
484 }
485 
/// One named validation problem and how many times it was seen.
#[derive(Debug, Default, Serialize)]
pub struct Problem {
    name: &'static str,
    // Left out of the serialized form when `crate::skip_if_default` says so.
    // `Validation::problem` never records a count of zero.
    #[serde(skip_serializing_if = "crate::skip_if_default")]
    count: usize,
}
492 
#[cfg(test)]
mod engine_tests {
    use super::*;

    // With nothing recorded, only the name and 'when' are serialized.
    #[test]
    fn test_engine() {
        let mut engine = Engine::new("test_engine");
        engine.finished();
        let expected = serde_json::json!({"name": "test_engine", "when": 0.0});
        assert_json(&engine, expected);
    }

    // An engine whose stopwatch is still running cannot be serialized.
    #[test]
    fn test_engine_not_finished() {
        let engine = Engine::new("test_engine");
        let result = serde_json::to_value(&engine);
        result.expect_err("unfinished stopwatch should fail");
    }

    #[test]
    fn test_incoming() {
        let mut incoming = EngineIncoming::new();
        incoming.applied(1);
        incoming.failed(2);

        let mut engine = Engine::new("TestEngine");
        engine.incoming(incoming);
        engine.finished();

        assert_json(
            &engine,
            serde_json::json!({"name": "TestEngine", "when": 0.0, "incoming": {"applied": 1, "failed": 2}}),
        );
    }

    #[test]
    fn test_outgoing() {
        let mut outgoing = EngineOutgoing::new();
        outgoing.sent(2);
        outgoing.failed(1);

        let mut engine = Engine::new("TestEngine");
        engine.outgoing(outgoing);
        engine.finished();

        assert_json(
            &engine,
            serde_json::json!({"name": "TestEngine", "when": 0.0, "outgoing": [{"sent": 2, "failed": 1}]}),
        );
    }

    #[test]
    fn test_failure() {
        let mut engine = Engine::new("TestEngine");
        engine.failure(SyncFailure::Http { code: 500 });
        engine.finished();

        assert_json(
            &engine,
            serde_json::json!({"name": "TestEngine",
             "when": 0.0,
             "failureReason": {"name": "httperror", "code": 500}
            }),
        );
    }

    // Checks the recorded fields directly rather than through the JSON.
    #[test]
    fn test_raw() {
        let mut engine = Engine::new("TestEngine");

        let mut incoming = EngineIncoming::new();
        incoming.applied(10);
        engine.incoming(incoming);

        let mut outgoing = EngineOutgoing::new();
        outgoing.sent(1);
        engine.outgoing(outgoing);

        engine.failure(SyncFailure::Http { code: 500 });
        engine.finished();

        assert_eq!(engine.outgoing.len(), 1);
        assert_eq!(engine.incoming.as_ref().unwrap().applied, 10);
        assert_eq!(engine.outgoing[0].sent, 1);
        assert!(engine.failure.is_some());
        serde_json::to_string(&engine).expect("should get json");
    }
}
571 
/// A single sync. May have many engines, may have its own failure.
///
/// The stopwatch starts when the value is created (via `Default`);
/// serializing fails until `finished()` has been called.
#[derive(Debug, Serialize, Default)]
pub struct SyncTelemetry {
    // Flattened, so 'when' and 'took' appear directly on the sync object.
    #[serde(flatten)]
    when_took: Stopwatch,

    // Engines in the order they were added; left out when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    engines: Vec<Engine>,

    // Serialized as "failureReason"; left out when no failure was recorded.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(rename = "failureReason")]
    failure: Option<SyncFailure>,
}
585 
586 impl SyncTelemetry {
new() -> Self587     pub fn new() -> Self {
588         Default::default()
589     }
590 
engine(&mut self, mut e: Engine)591     pub fn engine(&mut self, mut e: Engine) {
592         e.finished();
593         self.engines.push(e);
594     }
595 
failure(&mut self, failure: SyncFailure)596     pub fn failure(&mut self, failure: SyncFailure) {
597         assert!(self.failure.is_none());
598         self.failure = Some(failure);
599     }
600 
601     // Note that unlike other 'finished' methods, this isn't private - someone
602     // needs to explicitly call this before handling the json payload to
603     // whatever ends up submitting it.
finished(&mut self)604     pub fn finished(&mut self) {
605         self.when_took = self.when_took.finished();
606     }
607 }
608 
#[cfg(test)]
mod sync_tests {
    use super::*;

    // One engine carrying both incoming counts and a failure.
    #[test]
    fn test_accum() {
        let mut sync = SyncTelemetry::new();

        let mut incoming = EngineIncoming::new();
        incoming.applied(10);

        let mut engine = Engine::new("test_engine");
        engine.incoming(incoming);
        engine.failure(SyncFailure::Http { code: 500 });
        engine.finished();

        sync.engine(engine);
        sync.finished();

        let expected = serde_json::json!({
            "when": 0.0,
            "engines": [{
                "name":"test_engine",
                "when":0.0,
                "incoming": {
                    "applied": 10
                },
                "failureReason": {
                    "name": "httperror",
                    "code": 500
                }
            }]
        });
        assert_json(&sync, expected);
    }

    // Two engines plus a failure on the sync itself.
    #[test]
    fn test_multi_engine() {
        let mut first_incoming = EngineIncoming::new();
        first_incoming.applied(1);
        let mut first = Engine::new("test_engine");
        first.incoming(first_incoming);

        let mut second_incoming = EngineIncoming::new();
        second_incoming.failed(1);
        let mut second = Engine::new("test_engine_2");
        second.incoming(second_incoming);
        let mut second_outgoing = EngineOutgoing::new();
        second_outgoing.sent(1);
        second.outgoing(second_outgoing);

        let mut sync = SyncTelemetry::new();
        sync.engine(first);
        sync.engine(second);
        sync.failure(SyncFailure::Http { code: 500 });
        sync.finished();

        let expected = serde_json::json!({
            "when": 0.0,
            "engines": [{
                "name": "test_engine",
                "when": 0.0,
                "incoming": {
                    "applied": 1
                }
            },{
                "name": "test_engine_2",
                "when": 0.0,
                "incoming": {
                    "failed": 1
                },
                "outgoing": [{
                    "sent": 1
                }]
            }],
            "failureReason": {
                "name": "httperror",
                "code": 500
            }
        });
        assert_json(&sync, expected);
    }
}
692 
/// The Sync ping payload, as documented at
/// https://firefox-source-docs.mozilla.org/toolkit/components/telemetry/telemetry/data/sync-ping.html.
/// May have many syncs, may have many events. However, due to the architecture
/// of apps which use these components, this payload is almost certainly not
/// suitable for submitting directly. For example, we will always return a
/// payload with exactly 1 sync, and it will not know certain other fields
/// in the payload, such as the *hashed* FxA device ID (see
/// https://searchfox.org/mozilla-central/rev/c3ebaf6de2d481c262c04bb9657eaf76bf47e2ac/services/sync/modules/browserid_identity.js#185
/// for an example of how the device ID is constructed). The intention is that
/// consumers of this will use this to create a "real" payload - eg, accumulating
/// until some threshold number of syncs is reached, and contributing
/// additional data which only the consumer knows.
#[derive(Debug, Serialize, Default)]
pub struct SyncTelemetryPing {
    // Set to 1 by `SyncTelemetryPing::new`.
    version: u32,

    // Has no skip attribute, so the key is always written, even when unset.
    uid: Option<String>,

    // Events in the order they were added; left out when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    events: Vec<Event>,

    // Syncs in the order they were added; left out when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    syncs: Vec<SyncTelemetry>,
}
717 
718 impl SyncTelemetryPing {
new() -> Self719     pub fn new() -> Self {
720         Self {
721             version: 1,
722             ..Default::default()
723         }
724     }
725 
uid(&mut self, uid: String)726     pub fn uid(&mut self, uid: String) {
727         if let Some(ref existing) = self.uid {
728             if *existing != uid {
729                 log::warn!("existing uid ${} being replaced by {}", existing, uid);
730             }
731         }
732         self.uid = Some(uid);
733     }
734 
sync(&mut self, mut s: SyncTelemetry)735     pub fn sync(&mut self, mut s: SyncTelemetry) {
736         s.finished();
737         self.syncs.push(s);
738     }
739 
event(&mut self, e: Event)740     pub fn event(&mut self, e: Event) {
741         self.events.push(e);
742     }
743 }
744 
// Generates the FFI conversion for `SyncTelemetryPing`. Going by the macro's
// name, the ping crosses the FFI boundary as JSON - see the `ffi_support`
// documentation to confirm the details.
ffi_support::implement_into_ffi_by_json!(SyncTelemetryPing);
746 
#[cfg(test)]
mod ping_tests {
    use super::*;

    // A ping with a uid, one sync holding one engine, and one event.
    #[test]
    fn test_ping() {
        let mut sync = SyncTelemetry::new();
        sync.engine(Engine::new("test"));

        let mut ping = SyncTelemetryPing::new();
        ping.uid("user-id".into());
        ping.sync(sync);
        ping.event(Event::new("foo", "bar"));

        let expected = serde_json::json!({
            "events": [{
                "method": "bar", "object": "foo"
            }],
            "syncs": [{
                "engines": [{
                    "name": "test", "when": 0.0
                }],
                "when": 0.0
            }],
            "uid": "user-id",
            "version": 1
        });
        assert_json(&ping, expected);
    }
}
778