1 /* This Source Code Form is subject to the terms of the Mozilla Public 2 * License, v. 2.0. If a copy of the MPL was not distributed with this 3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 4 5 use std::{error::Error, fmt}; 6 7 use serde::{Deserialize, Serialize}; 8 9 use super::{Guid, Payload, ServerTimestamp}; 10 11 /// A bridged Sync engine implements all the methods needed to support 12 /// Desktop Sync. 13 pub trait BridgedEngine { 14 /// The type returned for errors. 15 type Error; 16 17 /// Returns the last sync time, in milliseconds, for this engine's 18 /// collection. This is called before each sync, to determine the lower 19 /// bound for new records to fetch from the server. last_sync(&self) -> Result<i64, Self::Error>20 fn last_sync(&self) -> Result<i64, Self::Error>; 21 22 /// Sets the last sync time, in milliseconds. This is called throughout 23 /// the sync, to fast-forward the stored last sync time to match the 24 /// timestamp on the uploaded records. set_last_sync(&self, last_sync_millis: i64) -> Result<(), Self::Error>25 fn set_last_sync(&self, last_sync_millis: i64) -> Result<(), Self::Error>; 26 27 /// Returns the sync ID for this engine's collection. This is only used in 28 /// tests. sync_id(&self) -> Result<Option<String>, Self::Error>29 fn sync_id(&self) -> Result<Option<String>, Self::Error>; 30 31 /// Resets the sync ID for this engine's collection, returning the new ID. 32 /// As a side effect, implementations should reset all local Sync state, 33 /// as in `reset`. reset_sync_id(&self) -> Result<String, Self::Error>34 fn reset_sync_id(&self) -> Result<String, Self::Error>; 35 36 /// Ensures that the locally stored sync ID for this engine's collection 37 /// matches the `new_sync_id` from the server. If the two don't match, 38 /// implementations should reset all local Sync state, as in `reset`. 39 /// This method returns the assigned sync ID, which can be either the 40 /// `new_sync_id`, or a different one if the engine wants to force other 41 /// devices to reset their Sync state for this collection the next time they 42 /// sync. ensure_current_sync_id(&self, new_sync_id: &str) -> Result<String, Self::Error>43 fn ensure_current_sync_id(&self, new_sync_id: &str) -> Result<String, Self::Error>; 44 45 /// Indicates that the engine is about to start syncing. This is called 46 /// once per sync, and always before `store_incoming`. sync_started(&self) -> Result<(), Self::Error>47 fn sync_started(&self) -> Result<(), Self::Error>; 48 49 /// Stages a batch of incoming Sync records. This is called multiple 50 /// times per sync, once for each batch. Implementations can use the 51 /// signal to check if the operation was aborted, and cancel any 52 /// pending work. store_incoming(&self, incoming_cleartexts: &[IncomingEnvelope]) -> Result<(), Self::Error>53 fn store_incoming(&self, incoming_cleartexts: &[IncomingEnvelope]) -> Result<(), Self::Error>; 54 55 /// Applies all staged records, reconciling changes on both sides and 56 /// resolving conflicts. Returns a list of records to upload. apply(&self) -> Result<ApplyResults, Self::Error>57 fn apply(&self) -> Result<ApplyResults, Self::Error>; 58 59 /// Indicates that the given record IDs were uploaded successfully to the 60 /// server. This is called multiple times per sync, once for each batch 61 /// upload. set_uploaded(&self, server_modified_millis: i64, ids: &[Guid]) -> Result<(), Self::Error>62 fn set_uploaded(&self, server_modified_millis: i64, ids: &[Guid]) -> Result<(), Self::Error>; 63 64 /// Indicates that all records have been uploaded. At this point, any record 65 /// IDs marked for upload that haven't been passed to `set_uploaded`, can be 66 /// assumed to have failed: for example, because the server rejected a record 67 /// with an invalid TTL or sort index. sync_finished(&self) -> Result<(), Self::Error>68 fn sync_finished(&self) -> Result<(), Self::Error>; 69 70 /// Resets all local Sync state, including any change flags, mirrors, and 71 /// the last sync time, such that the next sync is treated as a first sync 72 /// with all new local data. Does not erase any local user data. reset(&self) -> Result<(), Self::Error>73 fn reset(&self) -> Result<(), Self::Error>; 74 75 /// Erases all local user data for this collection, and any Sync metadata. 76 /// This method is destructive, and unused for most collections. wipe(&self) -> Result<(), Self::Error>77 fn wipe(&self) -> Result<(), Self::Error>; 78 } 79 80 #[derive(Clone, Debug, Default)] 81 pub struct ApplyResults { 82 /// List of records 83 pub envelopes: Vec<OutgoingEnvelope>, 84 /// The number of incoming records whose contents were merged because they 85 /// changed on both sides. None indicates we aren't reporting this 86 /// information. 87 pub num_reconciled: Option<usize>, 88 } 89 90 impl ApplyResults { new(envelopes: Vec<OutgoingEnvelope>, num_reconciled: impl Into<Option<usize>>) -> Self91 pub fn new(envelopes: Vec<OutgoingEnvelope>, num_reconciled: impl Into<Option<usize>>) -> Self { 92 Self { 93 envelopes, 94 num_reconciled: num_reconciled.into(), 95 } 96 } 97 } 98 99 // Shorthand for engines that don't care. 100 impl From<Vec<OutgoingEnvelope>> for ApplyResults { from(envelopes: Vec<OutgoingEnvelope>) -> Self101 fn from(envelopes: Vec<OutgoingEnvelope>) -> Self { 102 Self { 103 envelopes, 104 num_reconciled: None, 105 } 106 } 107 } 108 109 /// An envelope for an incoming item, passed to `BridgedEngine::store_incoming`. 110 /// Envelopes are a halfway point between BSOs, the format used for all items on 111 /// the Sync server, and records, which are specific to each engine. 112 /// 113 /// A BSO is a JSON object with metadata fields (`id`, `modifed`, `sortindex`), 114 /// and a BSO payload that is itself a JSON string. For encrypted records, the 115 /// BSO payload has a ciphertext, which must be decrypted to yield a cleartext. 116 /// The cleartext is a JSON string (that's three levels of JSON wrapping, if 117 /// you're keeping score: the BSO itself, BSO payload, and cleartext) with the 118 /// actual record payload. 119 /// 120 /// An envelope combines the metadata fields from the BSO, and the cleartext 121 /// from the encrypted BSO payload. 122 #[derive(Clone, Debug, Deserialize)] 123 pub struct IncomingEnvelope { 124 pub id: Guid, 125 pub modified: ServerTimestamp, 126 #[serde(default)] 127 pub sortindex: Option<i32>, 128 #[serde(default)] 129 pub ttl: Option<u32>, 130 // Don't provide access to the cleartext directly. We want all callers to 131 // use `IncomingEnvelope::payload`, so that we can validate the cleartext. 132 cleartext: String, 133 } 134 135 impl IncomingEnvelope { 136 /// Parses and returns the record payload from this envelope. Returns an 137 /// error if the envelope's cleartext isn't valid JSON, or the payload is 138 /// invalid. payload(&self) -> Result<Payload, PayloadError>139 pub fn payload(&self) -> Result<Payload, PayloadError> { 140 let payload: Payload = serde_json::from_str(&self.cleartext)?; 141 if payload.id != self.id { 142 return Err(PayloadError::MismatchedId { 143 envelope: self.id.clone(), 144 payload: payload.id, 145 }); 146 } 147 // Remove auto field data from payload and replace with real data 148 Ok(payload 149 .with_auto_field("ttl", self.ttl) 150 .with_auto_field("sortindex", self.sortindex)) 151 } 152 } 153 154 /// An envelope for an outgoing item, returned from `BridgedEngine::apply`. This 155 /// is similar to `IncomingEnvelope`, but omits fields that are only set by the 156 /// server, like `modified`. 157 #[derive(Clone, Debug, Serialize)] 158 pub struct OutgoingEnvelope { 159 id: Guid, 160 cleartext: String, 161 sortindex: Option<i32>, 162 ttl: Option<u32>, 163 } 164 165 impl From<Payload> for OutgoingEnvelope { from(mut payload: Payload) -> Self166 fn from(mut payload: Payload) -> Self { 167 let id = payload.id.clone(); 168 // Remove auto field data from OutgoingEnvelope payload 169 let ttl = payload.take_auto_field("ttl"); 170 let sortindex = payload.take_auto_field("sortindex"); 171 OutgoingEnvelope { 172 id, 173 cleartext: payload.into_json_string(), 174 sortindex, 175 ttl, 176 } 177 } 178 } 179 180 /// An error that indicates a payload is invalid. 181 #[derive(Debug)] 182 pub enum PayloadError { 183 /// The payload contains invalid JSON. 184 Invalid(serde_json::Error), 185 /// The ID of the BSO in the envelope doesn't match the ID in the payload. 186 MismatchedId { envelope: Guid, payload: Guid }, 187 } 188 189 impl Error for PayloadError {} 190 191 impl From<serde_json::Error> for PayloadError { from(err: serde_json::Error) -> PayloadError192 fn from(err: serde_json::Error) -> PayloadError { 193 PayloadError::Invalid(err) 194 } 195 } 196 197 impl fmt::Display for PayloadError { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result198 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 199 match self { 200 PayloadError::Invalid(err) => err.fmt(f), 201 PayloadError::MismatchedId { envelope, payload } => write!( 202 f, 203 "ID `{}` in envelope doesn't match `{}` in payload", 204 envelope, payload 205 ), 206 } 207 } 208 } 209