1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5 //! Manage recording sync telemetry. Assumes some external telemetry
6 //! library/code which manages submitting.
7
8 use std::collections::HashMap;
9 use std::time;
10
11 use serde::{ser, Serialize, Serializer};
12
13 // A test helper, used by the many test modules below.
14 #[cfg(test)]
assert_json<T: ?Sized>(v: &T, expected: serde_json::Value) where T: serde::Serialize,15 fn assert_json<T: ?Sized>(v: &T, expected: serde_json::Value)
16 where
17 T: serde::Serialize,
18 {
19 assert_eq!(
20 serde_json::to_value(&v).expect("should get a value"),
21 expected
22 );
23 }
24
25 /// What we record for 'when' and 'took' in a telemetry record.
26 #[derive(Debug, Serialize)]
27 struct WhenTook {
28 when: f64,
29 #[serde(skip_serializing_if = "crate::skip_if_default")]
30 took: u64,
31 }
32
33 /// What we track while recording 'when' and 'took. It serializes as a WhenTook,
34 /// except when .finished() hasn't been called, in which case it panics.
35 #[derive(Debug)]
36 enum Stopwatch {
37 Started(time::SystemTime, time::Instant),
38 Finished(WhenTook),
39 }
40
41 impl Default for Stopwatch {
default() -> Self42 fn default() -> Self {
43 Stopwatch::new()
44 }
45 }
46
47 impl Stopwatch {
new() -> Self48 fn new() -> Self {
49 Stopwatch::Started(time::SystemTime::now(), time::Instant::now())
50 }
51
52 // For tests we don't want real timestamps because we test against literals.
53 #[cfg(test)]
finished(&self) -> Self54 fn finished(&self) -> Self {
55 Stopwatch::Finished(WhenTook { when: 0.0, took: 0 })
56 }
57
58 #[cfg(not(test))]
finished(&self) -> Self59 fn finished(&self) -> Self {
60 match self {
61 Stopwatch::Started(st, si) => {
62 let std = st.duration_since(time::UNIX_EPOCH).unwrap_or_default();
63 let when = std.as_secs() as f64; // we don't want sub-sec accuracy. Do we need to write a float?
64
65 let sid = si.elapsed();
66 let took = sid.as_secs() * 1000 + (u64::from(sid.subsec_nanos()) / 1_000_000);
67 Stopwatch::Finished(WhenTook { when, took })
68 }
69 _ => {
70 unreachable!("can't finish twice");
71 }
72 }
73 }
74 }
75
76 impl Serialize for Stopwatch {
serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error> where S: Serializer,77 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
78 where
79 S: Serializer,
80 {
81 match self {
82 Stopwatch::Started(_, _) => Err(ser::Error::custom("StopWatch has not been finished")),
83 Stopwatch::Finished(c) => c.serialize(serializer),
84 }
85 }
86 }
87
88 #[cfg(test)]
89 mod stopwatch_tests {
90 use super::*;
91
92 // A wrapper struct because we flatten - this struct should serialize with
93 // 'when' and 'took' keys (but with no 'sw'.)
94 #[derive(Debug, Serialize)]
95 struct WT {
96 #[serde(flatten)]
97 sw: Stopwatch,
98 }
99
100 #[test]
test_not_finished()101 fn test_not_finished() {
102 let wt = WT {
103 sw: Stopwatch::new(),
104 };
105 serde_json::to_string(&wt).expect_err("unfinished stopwatch should fail");
106 }
107
108 #[test]
test()109 fn test() {
110 assert_json(
111 &WT {
112 sw: Stopwatch::Finished(WhenTook { when: 1.0, took: 1 }),
113 },
114 serde_json::json!({"when": 1.0, "took": 1}),
115 );
116 assert_json(
117 &WT {
118 sw: Stopwatch::Finished(WhenTook { when: 1.0, took: 0 }),
119 },
120 serde_json::json!({"when": 1.0}),
121 );
122 }
123 }
124
125 /// A generic "Event" - suitable for all kinds of pings (although this module
126 /// only cares about the sync ping)
127 #[derive(Debug, Serialize)]
128 pub struct Event {
129 // We use static str references as we expect values to be literals.
130 object: &'static str,
131
132 method: &'static str,
133
134 // Maybe "value" should be a string?
135 #[serde(skip_serializing_if = "Option::is_none")]
136 value: Option<&'static str>,
137
138 // we expect the keys to be literals but values are real strings.
139 #[serde(skip_serializing_if = "Option::is_none")]
140 extra: Option<HashMap<&'static str, String>>,
141 }
142
143 impl Event {
new(object: &'static str, method: &'static str) -> Self144 pub fn new(object: &'static str, method: &'static str) -> Self {
145 assert!(object.len() <= 20);
146 assert!(method.len() <= 20);
147 Self {
148 object,
149 method,
150 value: None,
151 extra: None,
152 }
153 }
154
value(mut self, v: &'static str) -> Self155 pub fn value(mut self, v: &'static str) -> Self {
156 assert!(v.len() <= 80);
157 self.value = Some(v);
158 self
159 }
160
extra(mut self, key: &'static str, val: String) -> Self161 pub fn extra(mut self, key: &'static str, val: String) -> Self {
162 assert!(key.len() <= 15);
163 assert!(val.len() <= 85);
164 match self.extra {
165 None => self.extra = Some(HashMap::new()),
166 Some(ref e) => assert!(e.len() < 10),
167 }
168 self.extra.as_mut().unwrap().insert(key, val);
169 self
170 }
171 }
172
173 #[cfg(test)]
174 mod test_events {
175 use super::*;
176
177 #[test]
178 #[should_panic]
test_invalid_length_ctor()179 fn test_invalid_length_ctor() {
180 Event::new("A very long object value", "Method");
181 }
182
183 #[test]
184 #[should_panic]
test_invalid_length_extra_key()185 fn test_invalid_length_extra_key() {
186 Event::new("O", "M").extra("A very long key value", "v".to_string());
187 }
188
189 #[test]
190 #[should_panic]
test_invalid_length_extra_val()191 fn test_invalid_length_extra_val() {
192 let l = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ
193 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
194 Event::new("O", "M").extra("k", l.to_string());
195 }
196
197 #[test]
198 #[should_panic]
test_too_many_extras()199 fn test_too_many_extras() {
200 let l = "abcdefghijk";
201 let mut e = Event::new("Object", "Method");
202 for i in 0..l.len() {
203 e = e.extra(&l[i..=i], "v".to_string());
204 }
205 }
206
207 #[test]
test_json()208 fn test_json() {
209 assert_json(
210 &Event::new("Object", "Method").value("Value"),
211 serde_json::json!({"object": "Object", "method": "Method", "value": "Value"}),
212 );
213
214 assert_json(
215 &Event::new("Object", "Method").extra("one", "one".to_string()),
216 serde_json::json!({"object": "Object",
217 "method": "Method",
218 "extra": {"one": "one"}
219 }),
220 )
221 }
222 }
223
224 /// A Sync failure.
225 #[derive(Debug, Serialize)]
226 #[serde(tag = "name")]
227 pub enum SyncFailure {
228 #[serde(rename = "shutdownerror")]
229 Shutdown,
230
231 #[serde(rename = "othererror")]
232 Other { error: String },
233
234 #[serde(rename = "unexpectederror")]
235 Unexpected { error: String },
236
237 #[serde(rename = "autherror")]
238 Auth { from: &'static str },
239
240 #[serde(rename = "httperror")]
241 Http { code: u16 },
242 }
243
244 #[cfg(test)]
245 mod test {
246 use super::*;
247
248 #[test]
reprs()249 fn reprs() {
250 assert_json(
251 &SyncFailure::Shutdown,
252 serde_json::json!({"name": "shutdownerror"}),
253 );
254
255 assert_json(
256 &SyncFailure::Other {
257 error: "dunno".to_string(),
258 },
259 serde_json::json!({"name": "othererror", "error": "dunno"}),
260 );
261
262 assert_json(
263 &SyncFailure::Unexpected {
264 error: "dunno".to_string(),
265 },
266 serde_json::json!({"name": "unexpectederror", "error": "dunno"}),
267 );
268
269 assert_json(
270 &SyncFailure::Auth { from: "FxA" },
271 serde_json::json!({"name": "autherror", "from": "FxA"}),
272 );
273
274 assert_json(
275 &SyncFailure::Http { code: 500 },
276 serde_json::json!({"name": "httperror", "code": 500}),
277 );
278 }
279 }
280
281 /// Incoming record for an engine's sync
282 #[derive(Debug, Default, Serialize)]
283 pub struct EngineIncoming {
284 #[serde(skip_serializing_if = "crate::skip_if_default")]
285 applied: u32,
286
287 #[serde(skip_serializing_if = "crate::skip_if_default")]
288 failed: u32,
289
290 #[serde(rename = "newFailed")]
291 #[serde(skip_serializing_if = "crate::skip_if_default")]
292 new_failed: u32,
293
294 #[serde(skip_serializing_if = "crate::skip_if_default")]
295 reconciled: u32,
296 }
297
298 impl EngineIncoming {
new() -> Self299 pub fn new() -> Self {
300 Self {
301 ..Default::default()
302 }
303 }
304
305 // A helper used via skip_serializing_if
is_empty(inc: &Option<Self>) -> bool306 fn is_empty(inc: &Option<Self>) -> bool {
307 match inc {
308 Some(a) => a.applied == 0 && a.failed == 0 && a.new_failed == 0 && a.reconciled == 0,
309 None => true,
310 }
311 }
312
313 /// Increment the value of `applied` by `n`.
314 #[inline]
applied(&mut self, n: u32)315 pub fn applied(&mut self, n: u32) {
316 self.applied += n;
317 }
318
319 /// Increment the value of `failed` by `n`.
320 #[inline]
failed(&mut self, n: u32)321 pub fn failed(&mut self, n: u32) {
322 self.failed += n;
323 }
324
325 /// Increment the value of `new_failed` by `n`.
326 #[inline]
new_failed(&mut self, n: u32)327 pub fn new_failed(&mut self, n: u32) {
328 self.new_failed += n;
329 }
330
331 /// Increment the value of `reconciled` by `n`.
332 #[inline]
reconciled(&mut self, n: u32)333 pub fn reconciled(&mut self, n: u32) {
334 self.reconciled += n;
335 }
336
337 /// Get the value of `applied`. Mostly useful for testing.
338 #[inline]
get_applied(&self) -> u32339 pub fn get_applied(&self) -> u32 {
340 self.applied
341 }
342
343 /// Get the value of `failed`. Mostly useful for testing.
344 #[inline]
get_failed(&self) -> u32345 pub fn get_failed(&self) -> u32 {
346 self.failed
347 }
348
349 /// Get the value of `new_failed`. Mostly useful for testing.
350 #[inline]
get_new_failed(&self) -> u32351 pub fn get_new_failed(&self) -> u32 {
352 self.new_failed
353 }
354
355 /// Get the value of `reconciled`. Mostly useful for testing.
356 #[inline]
get_reconciled(&self) -> u32357 pub fn get_reconciled(&self) -> u32 {
358 self.reconciled
359 }
360 }
361
362 /// Outgoing record for an engine's sync
363 #[derive(Debug, Default, Serialize)]
364 pub struct EngineOutgoing {
365 #[serde(skip_serializing_if = "crate::skip_if_default")]
366 sent: usize,
367
368 #[serde(skip_serializing_if = "crate::skip_if_default")]
369 failed: usize,
370 }
371
372 impl EngineOutgoing {
new() -> Self373 pub fn new() -> Self {
374 EngineOutgoing {
375 ..Default::default()
376 }
377 }
378
379 #[inline]
sent(&mut self, n: usize)380 pub fn sent(&mut self, n: usize) {
381 self.sent += n;
382 }
383
384 #[inline]
failed(&mut self, n: usize)385 pub fn failed(&mut self, n: usize) {
386 self.failed += n;
387 }
388 }
389
390 /// One engine's sync.
391 #[derive(Debug, Serialize)]
392 pub struct Engine {
393 name: String,
394
395 #[serde(flatten)]
396 when_took: Stopwatch,
397
398 #[serde(skip_serializing_if = "EngineIncoming::is_empty")]
399 incoming: Option<EngineIncoming>,
400
401 #[serde(skip_serializing_if = "Vec::is_empty")]
402 outgoing: Vec<EngineOutgoing>, // one for each batch posted.
403
404 #[serde(skip_serializing_if = "Option::is_none")]
405 #[serde(rename = "failureReason")]
406 failure: Option<SyncFailure>,
407
408 #[serde(skip_serializing_if = "Option::is_none")]
409 validation: Option<Validation>,
410 }
411
412 impl Engine {
new(name: impl Into<String>) -> Self413 pub fn new(name: impl Into<String>) -> Self {
414 Self {
415 name: name.into(),
416 when_took: Stopwatch::new(),
417 incoming: None,
418 outgoing: Vec::new(),
419 failure: None,
420 validation: None,
421 }
422 }
423
incoming(&mut self, inc: EngineIncoming)424 pub fn incoming(&mut self, inc: EngineIncoming) {
425 assert!(self.incoming.is_none());
426 self.incoming = Some(inc);
427 }
428
outgoing(&mut self, out: EngineOutgoing)429 pub fn outgoing(&mut self, out: EngineOutgoing) {
430 self.outgoing.push(out);
431 }
432
failure(&mut self, err: impl Into<SyncFailure>)433 pub fn failure(&mut self, err: impl Into<SyncFailure>) {
434 // Currently we take the first error, under the assumption that the
435 // first is the most important and all others stem from that.
436 let failure = err.into();
437 if self.failure.is_none() {
438 self.failure = Some(failure);
439 } else {
440 log::warn!(
441 "engine already has recorded a failure of {:?} - ignoring {:?}",
442 &self.failure,
443 &failure
444 );
445 }
446 }
447
validation(&mut self, v: Validation)448 pub fn validation(&mut self, v: Validation) {
449 assert!(self.validation.is_none());
450 self.validation = Some(v);
451 }
452
finished(&mut self)453 fn finished(&mut self) {
454 self.when_took = self.when_took.finished();
455 }
456 }
457
458 #[derive(Debug, Default, Serialize)]
459 pub struct Validation {
460 version: u32,
461
462 #[serde(skip_serializing_if = "Vec::is_empty")]
463 problems: Vec<Problem>,
464
465 #[serde(skip_serializing_if = "Option::is_none")]
466 #[serde(rename = "failureReason")]
467 failure: Option<SyncFailure>,
468 }
469
470 impl Validation {
with_version(version: u32) -> Validation471 pub fn with_version(version: u32) -> Validation {
472 Validation {
473 version,
474 ..Validation::default()
475 }
476 }
477
problem(&mut self, name: &'static str, count: usize) -> &mut Self478 pub fn problem(&mut self, name: &'static str, count: usize) -> &mut Self {
479 if count > 0 {
480 self.problems.push(Problem { name, count });
481 }
482 self
483 }
484 }
485
486 #[derive(Debug, Default, Serialize)]
487 pub struct Problem {
488 name: &'static str,
489 #[serde(skip_serializing_if = "crate::skip_if_default")]
490 count: usize,
491 }
492
493 #[cfg(test)]
494 mod engine_tests {
495 use super::*;
496
497 #[test]
test_engine()498 fn test_engine() {
499 let mut e = Engine::new("test_engine");
500 e.finished();
501 assert_json(&e, serde_json::json!({"name": "test_engine", "when": 0.0}));
502 }
503
504 #[test]
test_engine_not_finished()505 fn test_engine_not_finished() {
506 let e = Engine::new("test_engine");
507 serde_json::to_value(&e).expect_err("unfinished stopwatch should fail");
508 }
509
510 #[test]
test_incoming()511 fn test_incoming() {
512 let mut i = EngineIncoming::new();
513 i.applied(1);
514 i.failed(2);
515 let mut e = Engine::new("TestEngine");
516 e.incoming(i);
517 e.finished();
518 assert_json(
519 &e,
520 serde_json::json!({"name": "TestEngine", "when": 0.0, "incoming": {"applied": 1, "failed": 2}}),
521 );
522 }
523
524 #[test]
test_outgoing()525 fn test_outgoing() {
526 let mut o = EngineOutgoing::new();
527 o.sent(2);
528 o.failed(1);
529 let mut e = Engine::new("TestEngine");
530 e.outgoing(o);
531 e.finished();
532 assert_json(
533 &e,
534 serde_json::json!({"name": "TestEngine", "when": 0.0, "outgoing": [{"sent": 2, "failed": 1}]}),
535 );
536 }
537
538 #[test]
test_failure()539 fn test_failure() {
540 let mut e = Engine::new("TestEngine");
541 e.failure(SyncFailure::Http { code: 500 });
542 e.finished();
543 assert_json(
544 &e,
545 serde_json::json!({"name": "TestEngine",
546 "when": 0.0,
547 "failureReason": {"name": "httperror", "code": 500}
548 }),
549 );
550 }
551
552 #[test]
test_raw()553 fn test_raw() {
554 let mut e = Engine::new("TestEngine");
555 let mut inc = EngineIncoming::new();
556 inc.applied(10);
557 e.incoming(inc);
558 let mut out = EngineOutgoing::new();
559 out.sent(1);
560 e.outgoing(out);
561 e.failure(SyncFailure::Http { code: 500 });
562 e.finished();
563
564 assert_eq!(e.outgoing.len(), 1);
565 assert_eq!(e.incoming.as_ref().unwrap().applied, 10);
566 assert_eq!(e.outgoing[0].sent, 1);
567 assert!(e.failure.is_some());
568 serde_json::to_string(&e).expect("should get json");
569 }
570 }
571
572 /// A single sync. May have many engines, may have its own failure.
573 #[derive(Debug, Serialize, Default)]
574 pub struct SyncTelemetry {
575 #[serde(flatten)]
576 when_took: Stopwatch,
577
578 #[serde(skip_serializing_if = "Vec::is_empty")]
579 engines: Vec<Engine>,
580
581 #[serde(skip_serializing_if = "Option::is_none")]
582 #[serde(rename = "failureReason")]
583 failure: Option<SyncFailure>,
584 }
585
586 impl SyncTelemetry {
new() -> Self587 pub fn new() -> Self {
588 Default::default()
589 }
590
engine(&mut self, mut e: Engine)591 pub fn engine(&mut self, mut e: Engine) {
592 e.finished();
593 self.engines.push(e);
594 }
595
failure(&mut self, failure: SyncFailure)596 pub fn failure(&mut self, failure: SyncFailure) {
597 assert!(self.failure.is_none());
598 self.failure = Some(failure);
599 }
600
601 // Note that unlike other 'finished' methods, this isn't private - someone
602 // needs to explicitly call this before handling the json payload to
603 // whatever ends up submitting it.
finished(&mut self)604 pub fn finished(&mut self) {
605 self.when_took = self.when_took.finished();
606 }
607 }
608
609 #[cfg(test)]
610 mod sync_tests {
611 use super::*;
612
613 #[test]
test_accum()614 fn test_accum() {
615 let mut s = SyncTelemetry::new();
616 let mut inc = EngineIncoming::new();
617 inc.applied(10);
618 let mut e = Engine::new("test_engine");
619 e.incoming(inc);
620 e.failure(SyncFailure::Http { code: 500 });
621 e.finished();
622 s.engine(e);
623 s.finished();
624
625 assert_json(
626 &s,
627 serde_json::json!({
628 "when": 0.0,
629 "engines": [{
630 "name":"test_engine",
631 "when":0.0,
632 "incoming": {
633 "applied": 10
634 },
635 "failureReason": {
636 "name": "httperror",
637 "code": 500
638 }
639 }]
640 }),
641 );
642 }
643
644 #[test]
test_multi_engine()645 fn test_multi_engine() {
646 let mut inc_e1 = EngineIncoming::new();
647 inc_e1.applied(1);
648 let mut e1 = Engine::new("test_engine");
649 e1.incoming(inc_e1);
650
651 let mut inc_e2 = EngineIncoming::new();
652 inc_e2.failed(1);
653 let mut e2 = Engine::new("test_engine_2");
654 e2.incoming(inc_e2);
655 let mut out_e2 = EngineOutgoing::new();
656 out_e2.sent(1);
657 e2.outgoing(out_e2);
658
659 let mut s = SyncTelemetry::new();
660 s.engine(e1);
661 s.engine(e2);
662 s.failure(SyncFailure::Http { code: 500 });
663 s.finished();
664 assert_json(
665 &s,
666 serde_json::json!({
667 "when": 0.0,
668 "engines": [{
669 "name": "test_engine",
670 "when": 0.0,
671 "incoming": {
672 "applied": 1
673 }
674 },{
675 "name": "test_engine_2",
676 "when": 0.0,
677 "incoming": {
678 "failed": 1
679 },
680 "outgoing": [{
681 "sent": 1
682 }]
683 }],
684 "failureReason": {
685 "name": "httperror",
686 "code": 500
687 }
688 }),
689 );
690 }
691 }
692
693 /// The Sync ping payload, as documented at
694 /// https://firefox-source-docs.mozilla.org/toolkit/components/telemetry/telemetry/data/sync-ping.html.
695 /// May have many syncs, may have many events. However, due to the architecture
696 /// of apps which use these components, this payload is almost certainly not
697 /// suitable for submitting directly. For example, we will always return a
698 /// payload with exactly 1 sync, and it will not know certain other fields
699 /// in the payload, such as the *hashed* FxA device ID (see
700 /// https://searchfox.org/mozilla-central/rev/c3ebaf6de2d481c262c04bb9657eaf76bf47e2ac/services/sync/modules/browserid_identity.js#185
701 /// for an example of how the device ID is constructed). The intention is that
702 /// consumers of this will use this to create a "real" payload - eg, accumulating
703 /// until some threshold number of syncs is reached, and contributing
704 /// additional data which only the consumer knows.
705 #[derive(Debug, Serialize, Default)]
706 pub struct SyncTelemetryPing {
707 version: u32,
708
709 uid: Option<String>,
710
711 #[serde(skip_serializing_if = "Vec::is_empty")]
712 events: Vec<Event>,
713
714 #[serde(skip_serializing_if = "Vec::is_empty")]
715 syncs: Vec<SyncTelemetry>,
716 }
717
718 impl SyncTelemetryPing {
new() -> Self719 pub fn new() -> Self {
720 Self {
721 version: 1,
722 ..Default::default()
723 }
724 }
725
uid(&mut self, uid: String)726 pub fn uid(&mut self, uid: String) {
727 if let Some(ref existing) = self.uid {
728 if *existing != uid {
729 log::warn!("existing uid ${} being replaced by {}", existing, uid);
730 }
731 }
732 self.uid = Some(uid);
733 }
734
sync(&mut self, mut s: SyncTelemetry)735 pub fn sync(&mut self, mut s: SyncTelemetry) {
736 s.finished();
737 self.syncs.push(s);
738 }
739
event(&mut self, e: Event)740 pub fn event(&mut self, e: Event) {
741 self.events.push(e);
742 }
743 }
744
745 ffi_support::implement_into_ffi_by_json!(SyncTelemetryPing);
746
747 #[cfg(test)]
748 mod ping_tests {
749 use super::*;
750 #[test]
test_ping()751 fn test_ping() {
752 let engine = Engine::new("test");
753 let mut s = SyncTelemetry::new();
754 s.engine(engine);
755 let mut p = SyncTelemetryPing::new();
756 p.uid("user-id".into());
757 p.sync(s);
758 let event = Event::new("foo", "bar");
759 p.event(event);
760 assert_json(
761 &p,
762 serde_json::json!({
763 "events": [{
764 "method": "bar", "object": "foo"
765 }],
766 "syncs": [{
767 "engines": [{
768 "name": "test", "when": 0.0
769 }],
770 "when": 0.0
771 }],
772 "uid": "user-id",
773 "version": 1
774 }),
775 );
776 }
777 }
778