1 /* This Source Code Form is subject to the terms of the Mozilla Public
2  * License, v. 2.0. If a copy of the MPL was not distributed with this
3  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4 
5 use std::{error::Error, fmt};
6 
7 use serde::{Deserialize, Serialize};
8 
9 use super::{Guid, Payload, ServerTimestamp};
10 
11 /// A bridged Sync engine implements all the methods needed to support
12 /// Desktop Sync.
13 pub trait BridgedEngine {
14     /// The type returned for errors.
15     type Error;
16 
17     /// Returns the last sync time, in milliseconds, for this engine's
18     /// collection. This is called before each sync, to determine the lower
19     /// bound for new records to fetch from the server.
last_sync(&self) -> Result<i64, Self::Error>20     fn last_sync(&self) -> Result<i64, Self::Error>;
21 
22     /// Sets the last sync time, in milliseconds. This is called throughout
23     /// the sync, to fast-forward the stored last sync time to match the
24     /// timestamp on the uploaded records.
set_last_sync(&self, last_sync_millis: i64) -> Result<(), Self::Error>25     fn set_last_sync(&self, last_sync_millis: i64) -> Result<(), Self::Error>;
26 
27     /// Returns the sync ID for this engine's collection. This is only used in
28     /// tests.
sync_id(&self) -> Result<Option<String>, Self::Error>29     fn sync_id(&self) -> Result<Option<String>, Self::Error>;
30 
31     /// Resets the sync ID for this engine's collection, returning the new ID.
32     /// As a side effect, implementations should reset all local Sync state,
33     /// as in `reset`.
reset_sync_id(&self) -> Result<String, Self::Error>34     fn reset_sync_id(&self) -> Result<String, Self::Error>;
35 
36     /// Ensures that the locally stored sync ID for this engine's collection
37     /// matches the `new_sync_id` from the server. If the two don't match,
38     /// implementations should reset all local Sync state, as in `reset`.
39     /// This method returns the assigned sync ID, which can be either the
40     /// `new_sync_id`, or a different one if the engine wants to force other
41     /// devices to reset their Sync state for this collection the next time they
42     /// sync.
ensure_current_sync_id(&self, new_sync_id: &str) -> Result<String, Self::Error>43     fn ensure_current_sync_id(&self, new_sync_id: &str) -> Result<String, Self::Error>;
44 
45     /// Indicates that the engine is about to start syncing. This is called
46     /// once per sync, and always before `store_incoming`.
sync_started(&self) -> Result<(), Self::Error>47     fn sync_started(&self) -> Result<(), Self::Error>;
48 
49     /// Stages a batch of incoming Sync records. This is called multiple
50     /// times per sync, once for each batch. Implementations can use the
51     /// signal to check if the operation was aborted, and cancel any
52     /// pending work.
store_incoming(&self, incoming_cleartexts: &[IncomingEnvelope]) -> Result<(), Self::Error>53     fn store_incoming(&self, incoming_cleartexts: &[IncomingEnvelope]) -> Result<(), Self::Error>;
54 
55     /// Applies all staged records, reconciling changes on both sides and
56     /// resolving conflicts. Returns a list of records to upload.
apply(&self) -> Result<ApplyResults, Self::Error>57     fn apply(&self) -> Result<ApplyResults, Self::Error>;
58 
59     /// Indicates that the given record IDs were uploaded successfully to the
60     /// server. This is called multiple times per sync, once for each batch
61     /// upload.
set_uploaded(&self, server_modified_millis: i64, ids: &[Guid]) -> Result<(), Self::Error>62     fn set_uploaded(&self, server_modified_millis: i64, ids: &[Guid]) -> Result<(), Self::Error>;
63 
64     /// Indicates that all records have been uploaded. At this point, any record
65     /// IDs marked for upload that haven't been passed to `set_uploaded`, can be
66     /// assumed to have failed: for example, because the server rejected a record
67     /// with an invalid TTL or sort index.
sync_finished(&self) -> Result<(), Self::Error>68     fn sync_finished(&self) -> Result<(), Self::Error>;
69 
70     /// Resets all local Sync state, including any change flags, mirrors, and
71     /// the last sync time, such that the next sync is treated as a first sync
72     /// with all new local data. Does not erase any local user data.
reset(&self) -> Result<(), Self::Error>73     fn reset(&self) -> Result<(), Self::Error>;
74 
75     /// Erases all local user data for this collection, and any Sync metadata.
76     /// This method is destructive, and unused for most collections.
wipe(&self) -> Result<(), Self::Error>77     fn wipe(&self) -> Result<(), Self::Error>;
78 }
79 
80 #[derive(Clone, Debug, Default)]
81 pub struct ApplyResults {
82     /// List of records
83     pub envelopes: Vec<OutgoingEnvelope>,
84     /// The number of incoming records whose contents were merged because they
85     /// changed on both sides. None indicates we aren't reporting this
86     /// information.
87     pub num_reconciled: Option<usize>,
88 }
89 
90 impl ApplyResults {
new(envelopes: Vec<OutgoingEnvelope>, num_reconciled: impl Into<Option<usize>>) -> Self91     pub fn new(envelopes: Vec<OutgoingEnvelope>, num_reconciled: impl Into<Option<usize>>) -> Self {
92         Self {
93             envelopes,
94             num_reconciled: num_reconciled.into(),
95         }
96     }
97 }
98 
99 // Shorthand for engines that don't care.
100 impl From<Vec<OutgoingEnvelope>> for ApplyResults {
from(envelopes: Vec<OutgoingEnvelope>) -> Self101     fn from(envelopes: Vec<OutgoingEnvelope>) -> Self {
102         Self {
103             envelopes,
104             num_reconciled: None,
105         }
106     }
107 }
108 
109 /// An envelope for an incoming item, passed to `BridgedEngine::store_incoming`.
110 /// Envelopes are a halfway point between BSOs, the format used for all items on
111 /// the Sync server, and records, which are specific to each engine.
112 ///
113 /// A BSO is a JSON object with metadata fields (`id`, `modifed`, `sortindex`),
114 /// and a BSO payload that is itself a JSON string. For encrypted records, the
115 /// BSO payload has a ciphertext, which must be decrypted to yield a cleartext.
116 /// The cleartext is a JSON string (that's three levels of JSON wrapping, if
117 /// you're keeping score: the BSO itself, BSO payload, and cleartext) with the
118 /// actual record payload.
119 ///
120 /// An envelope combines the metadata fields from the BSO, and the cleartext
121 /// from the encrypted BSO payload.
122 #[derive(Clone, Debug, Deserialize)]
123 pub struct IncomingEnvelope {
124     pub id: Guid,
125     pub modified: ServerTimestamp,
126     #[serde(default)]
127     pub sortindex: Option<i32>,
128     #[serde(default)]
129     pub ttl: Option<u32>,
130     // Don't provide access to the cleartext directly. We want all callers to
131     // use `IncomingEnvelope::payload`, so that we can validate the cleartext.
132     cleartext: String,
133 }
134 
135 impl IncomingEnvelope {
136     /// Parses and returns the record payload from this envelope. Returns an
137     /// error if the envelope's cleartext isn't valid JSON, or the payload is
138     /// invalid.
payload(&self) -> Result<Payload, PayloadError>139     pub fn payload(&self) -> Result<Payload, PayloadError> {
140         let payload: Payload = serde_json::from_str(&self.cleartext)?;
141         if payload.id != self.id {
142             return Err(PayloadError::MismatchedId {
143                 envelope: self.id.clone(),
144                 payload: payload.id,
145             });
146         }
147         // Remove auto field data from payload and replace with real data
148         Ok(payload
149             .with_auto_field("ttl", self.ttl)
150             .with_auto_field("sortindex", self.sortindex))
151     }
152 }
153 
154 /// An envelope for an outgoing item, returned from `BridgedEngine::apply`. This
155 /// is similar to `IncomingEnvelope`, but omits fields that are only set by the
156 /// server, like `modified`.
157 #[derive(Clone, Debug, Serialize)]
158 pub struct OutgoingEnvelope {
159     id: Guid,
160     cleartext: String,
161     sortindex: Option<i32>,
162     ttl: Option<u32>,
163 }
164 
165 impl From<Payload> for OutgoingEnvelope {
from(mut payload: Payload) -> Self166     fn from(mut payload: Payload) -> Self {
167         let id = payload.id.clone();
168         // Remove auto field data from OutgoingEnvelope payload
169         let ttl = payload.take_auto_field("ttl");
170         let sortindex = payload.take_auto_field("sortindex");
171         OutgoingEnvelope {
172             id,
173             cleartext: payload.into_json_string(),
174             sortindex,
175             ttl,
176         }
177     }
178 }
179 
180 /// An error that indicates a payload is invalid.
181 #[derive(Debug)]
182 pub enum PayloadError {
183     /// The payload contains invalid JSON.
184     Invalid(serde_json::Error),
185     /// The ID of the BSO in the envelope doesn't match the ID in the payload.
186     MismatchedId { envelope: Guid, payload: Guid },
187 }
188 
189 impl Error for PayloadError {}
190 
191 impl From<serde_json::Error> for PayloadError {
from(err: serde_json::Error) -> PayloadError192     fn from(err: serde_json::Error) -> PayloadError {
193         PayloadError::Invalid(err)
194     }
195 }
196 
197 impl fmt::Display for PayloadError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result198     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
199         match self {
200             PayloadError::Invalid(err) => err.fmt(f),
201             PayloadError::MismatchedId { envelope, payload } => write!(
202                 f,
203                 "ID `{}` in envelope doesn't match `{}` in payload",
204                 envelope, payload
205             ),
206         }
207     }
208 }
209