1 // Copyright 2017 The xi-editor Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 //! Architecture for synchronizing a CRDT with the ledger. Separated into a
16 //! module so that it is easier to add other sync stores later.
17 
18 use std::io::Write;
19 use std::sync::mpsc::{Receiver, RecvError, Sender};
20 
21 use log;
22 
23 use apps_ledger_services_public::*;
24 use fidl::{self, Future, Promise};
25 use fuchsia::read_entire_vmo;
26 use magenta::{Channel, ChannelOpts, HandleBase};
27 use serde_json;
28 
29 use super::ledger::{self, ledger_crash_callback};
30 use tabs::{BufferContainerRef, BufferIdentifier};
31 use xi_rope::engine::Engine;
32 
33 // TODO switch these to bincode
state_to_buf(state: &Engine) -> Vec<u8>34 fn state_to_buf(state: &Engine) -> Vec<u8> {
35     serde_json::to_vec(state).unwrap()
36 }
37 
buf_to_state(buf: &[u8]) -> Result<Engine, serde_json::Error>38 fn buf_to_state(buf: &[u8]) -> Result<Engine, serde_json::Error> {
39     serde_json::from_slice(buf)
40 }
41 
42 /// Stores state needed by the container to perform synchronization.
43 pub struct SyncStore {
44     page: Page_Proxy,
45     key: Vec<u8>,
46     updates: Sender<SyncMsg>,
47     transaction_pending: bool,
48     buffer: BufferIdentifier,
49 }
50 
51 impl SyncStore {
52     /// - `page` is a reference to the Ledger page to store data under.
53     /// - `key` is the key the `Syncable` managed by this `SyncStore` will be stored under.
54     ///    This example only supports storing things under a single key per page.
55     /// - `updates` is a channel to a `SyncUpdater` that will handle events.
56     ///
57     /// Returns a sync store and schedules the loading of initial
58     /// state and subscribes to state updates for this document.
new( mut page: Page_Proxy, key: Vec<u8>, updates: Sender<SyncMsg>, buffer: BufferIdentifier, ) -> SyncStore59     pub fn new(
60         mut page: Page_Proxy,
61         key: Vec<u8>,
62         updates: Sender<SyncMsg>,
63         buffer: BufferIdentifier,
64     ) -> SyncStore {
65         let (s1, s2) = Channel::create(ChannelOpts::Normal).unwrap();
66         let watcher_client = PageWatcher_Client::from_handle(s1.into_handle());
67         let watcher_client_ptr =
68             ::fidl::InterfacePtr { inner: watcher_client, version: PageWatcher_Metadata::VERSION };
69 
70         let watcher = PageWatcherServer { updates: updates.clone(), buffer: buffer.clone() };
71         let _ = fidl::Server::new(watcher, s2).spawn();
72 
73         let (mut snap, snap_request) = PageSnapshot_new_pair();
74         page.get_snapshot(snap_request, Some(key.clone()), Some(watcher_client_ptr))
75             .with(ledger_crash_callback);
76 
77         let initial_state_chan = updates.clone();
78         let initial_buffer = buffer.clone();
79         snap.get(key.clone()).with(move |raw_res| {
80             match raw_res.map(|res| ledger::value_result(res)) {
81                 Ok(Ok(Some(buf))) => {
82                     initial_state_chan
83                         .send(SyncMsg::NewState {
84                             buffer: initial_buffer,
85                             new_buf: buf,
86                             done: None,
87                         })
88                         .unwrap();
89                 }
90                 Ok(Ok(None)) => (), // No initial state saved yet
91                 Err(err) => error!("FIDL failed on initial response: {:?}", err),
92                 Ok(Err(err)) => error!("Ledger failed to retrieve key: {:?}", err),
93             }
94         });
95 
96         SyncStore { page, key, updates, buffer, transaction_pending: false }
97     }
98 
99     /// Called whenever this app changed its own state and would like to
100     /// persist the changes to the ledger. Changes can't be committed
101     /// immediately since we have to wait for PageWatcher changes that may not
102     /// have arrived yet.
state_changed(&mut self)103     pub fn state_changed(&mut self) {
104         if !self.transaction_pending {
105             self.transaction_pending = true;
106             let ready_future = self.page.start_transaction();
107             let done_chan = self.updates.clone();
108             let buffer = self.buffer.clone();
109             ready_future.with(move |res| match res {
110                 Ok(ledger::OK) => {
111                     done_chan.send(SyncMsg::TransactionReady { buffer }).unwrap();
112                 }
113                 Ok(err_status) => error!("Ledger failed to start transaction: {:?}", err_status),
114                 Err(err) => error!("FIDL failed on starting transaction: {:?}", err),
115             });
116         }
117     }
118 
119     /// Should be called in SyncContainer::transaction_ready to persist the current state.
commit_transaction(&mut self, state: &Engine)120     pub fn commit_transaction(&mut self, state: &Engine) {
121         assert!(self.transaction_pending, "must call state_changed (and wait) before commit");
122         self.page.put(self.key.clone(), state_to_buf(state)).with(ledger_crash_callback);
123         self.page.commit().with(ledger_crash_callback);
124         self.transaction_pending = false;
125     }
126 }
127 
128 /// All the different asynchronous events the updater thread needs to listen for and act on
129 pub enum SyncMsg {
130     NewState {
131         buffer: BufferIdentifier,
132         new_buf: Vec<u8>,
133         done: Option<Promise<Option<PageSnapshot_Server>, fidl::Error>>,
134     },
135     TransactionReady {
136         buffer: BufferIdentifier,
137     },
138     /// Shut down the updater thread
139     Stop,
140 }
141 
142 /// We want to be able to register to receive events from inside the
143 /// `SyncStore`/`SyncContainer` but from there we don't have access to the
144 /// Mutex that holds the container, so we give channel Senders to all the
145 /// futures so that they can all trigger events in one place that does have
146 /// the right reference.
147 ///
148 /// Additionally, the individual `Editor`s aren't wrapped in a `Mutex` so we
149 /// have to hold a `BufferContainerRef` and use `BufferIdentifier`s with one
150 /// `SyncUpdater` for all buffers.
151 pub struct SyncUpdater<W: Write> {
152     container_ref: BufferContainerRef<W>,
153     chan: Receiver<SyncMsg>,
154 }
155 
156 impl<W: Write + Send + 'static> SyncUpdater<W> {
new(container_ref: BufferContainerRef<W>, chan: Receiver<SyncMsg>) -> SyncUpdater<W>157     pub fn new(container_ref: BufferContainerRef<W>, chan: Receiver<SyncMsg>) -> SyncUpdater<W> {
158         SyncUpdater { container_ref, chan }
159     }
160 
161     /// Run this in a thread, it will return when it encounters an error
162     /// reading the channel or when the `Stop` message is recieved.
work(&self) -> Result<(), RecvError>163     pub fn work(&self) -> Result<(), RecvError> {
164         loop {
165             let msg = self.chan.recv()?;
166             match msg {
167                 SyncMsg::Stop => return Ok(()),
168                 SyncMsg::TransactionReady { buffer } => {
169                     let mut container = self.container_ref.lock();
170                     // if the buffer was closed, hopefully the page connection was as well, which I hope aborts transactions
171                     if let Some(mut editor) = container.editor_for_buffer_mut(&buffer) {
172                         editor.transaction_ready();
173                     }
174                 }
175                 SyncMsg::NewState { new_buf, done, buffer } => {
176                     let mut container = self.container_ref.lock();
177                     match (container.editor_for_buffer_mut(&buffer), buf_to_state(&new_buf)) {
178                         (Some(mut editor), Ok(new_state)) => {
179                             editor.merge_new_state(new_state);
180                             if let Some(promise) = done {
181                                 promise.set_ok(None);
182                             }
183                         }
184                         (None, _) => (), // buffer was closed
185                         (_, Err(err)) => error!("Ledger was set to invalid state: {:?}", err),
186                     }
187                 }
188             }
189         }
190     }
191 }
192 
193 struct PageWatcherServer {
194     updates: Sender<SyncMsg>,
195     buffer: BufferIdentifier,
196 }
197 
198 impl PageWatcher for PageWatcherServer {
on_change( &mut self, page_change: PageChange, result_state: ResultState, ) -> Future<Option<PageSnapshot_Server>, fidl::Error>199     fn on_change(
200         &mut self,
201         page_change: PageChange,
202         result_state: ResultState,
203     ) -> Future<Option<PageSnapshot_Server>, fidl::Error> {
204         let (future, done) = Future::make_promise();
205 
206         let value_opt = page_change.changes.get(0).and_then(|c| c.value.as_ref());
207         if let (ledger::RESULT_COMPLETED, Some(value_vmo)) = (result_state, value_opt) {
208             let new_buf = read_entire_vmo(value_vmo).expect("failed to read key Vmo");
209             self.updates
210                 .send(SyncMsg::NewState { buffer: self.buffer.clone(), new_buf, done: Some(done) })
211                 .unwrap();
212         } else {
213             error!("Xi state corrupted, should have one key but has multiple.");
214             // I don't think this should be a FIDL-level error, so set okay
215             done.set_ok(None);
216         }
217 
218         future
219     }
220 }
221 
222 impl PageWatcher_Stub for PageWatcherServer {
223     // Use default dispatching, but we could override it here.
224 }
225 impl_fidl_stub!(PageWatcherServer: PageWatcher_Stub);
226 
227 // ============= Conflict resolution
228 
start_conflict_resolver_factory(ledger: &mut Ledger_Proxy, key: Vec<u8>)229 pub fn start_conflict_resolver_factory(ledger: &mut Ledger_Proxy, key: Vec<u8>) {
230     let (s1, s2) = Channel::create(ChannelOpts::Normal).unwrap();
231     let resolver_client = ConflictResolverFactory_Client::from_handle(s1.into_handle());
232     let resolver_client_ptr = ::fidl::InterfacePtr {
233         inner: resolver_client,
234         version: ConflictResolverFactory_Metadata::VERSION,
235     };
236 
237     let _ = fidl::Server::new(ConflictResolverFactoryServer { key }, s2).spawn();
238 
239     ledger.set_conflict_resolver_factory(Some(resolver_client_ptr)).with(ledger_crash_callback);
240 }
241 
/// Factory that creates a `ConflictResolverServer` for each resolver request.
struct ConflictResolverFactoryServer {
    /// Key handed to every resolver this factory spawns.
    key: Vec<u8>,
}
245 
246 impl ConflictResolverFactory for ConflictResolverFactoryServer {
get_policy(&mut self, _page_id: Vec<u8>) -> Future<MergePolicy, ::fidl::Error>247     fn get_policy(&mut self, _page_id: Vec<u8>) -> Future<MergePolicy, ::fidl::Error> {
248         Future::done(Ok(MergePolicy_Custom))
249     }
250 
251     /// Our resolvers are the same for every page
new_conflict_resolver(&mut self, _page_id: Vec<u8>, resolver: ConflictResolver_Server)252     fn new_conflict_resolver(&mut self, _page_id: Vec<u8>, resolver: ConflictResolver_Server) {
253         let _ = fidl::Server::new(
254             ConflictResolverServer { key: self.key.clone() },
255             resolver.into_channel(),
256         )
257         .spawn();
258     }
259 }
260 
261 impl ConflictResolverFactory_Stub for ConflictResolverFactoryServer {
262     // Use default dispatching, but we could override it here.
263 }
264 impl_fidl_stub!(ConflictResolverFactoryServer: ConflictResolverFactory_Stub);
265 
state_from_snapshot<F>( snapshot: ::fidl::InterfacePtr<PageSnapshot_Client>, key: Vec<u8>, done: F, ) where F: Send + FnOnce(Result<Option<Engine>, ()>) + 'static,266 fn state_from_snapshot<F>(
267     snapshot: ::fidl::InterfacePtr<PageSnapshot_Client>,
268     key: Vec<u8>,
269     done: F,
270 ) where
271     F: Send + FnOnce(Result<Option<Engine>, ()>) + 'static,
272 {
273     assert_eq!(PageSnapshot_Metadata::VERSION, snapshot.version);
274     let mut snapshot_proxy = PageSnapshot_new_Proxy(snapshot.inner);
275     // TODO get a reference when too big
276     snapshot_proxy.get(key).with(move |raw_res| {
277         let state = match raw_res.map(|res| ledger::value_result(res)) {
278             // the .ok() has the behavior of acting like invalid state is empty
279             // and thus deleting invalid state and overwriting it with good state
280             Ok(Ok(Some(buf))) => Ok(buf_to_state(&buf).ok()),
281             Ok(Ok(None)) => {
282                 info!("No state in conflicting page");
283                 Ok(None)
284             }
285             Err(err) => {
286                 warn!("FIDL failed on initial response: {:?}", err);
287                 Err(())
288             }
289             Ok(Err(err)) => {
290                 warn!("Ledger failed to retrieve key: {:?}", err);
291                 Err(())
292             }
293         };
294         done(state);
295     });
296 }
297 
/// Resolver that merges the engines stored on both sides of a conflict.
struct ConflictResolverServer {
    /// Key read from both snapshots and written in the merged result.
    key: Vec<u8>,
}
301 
302 impl ConflictResolver for ConflictResolverServer {
resolve( &mut self, left: ::fidl::InterfacePtr<PageSnapshot_Client>, right: ::fidl::InterfacePtr<PageSnapshot_Client>, _common_version: Option<::fidl::InterfacePtr<PageSnapshot_Client>>, result_provider: ::fidl::InterfacePtr<MergeResultProvider_Client>, )303     fn resolve(
304         &mut self,
305         left: ::fidl::InterfacePtr<PageSnapshot_Client>,
306         right: ::fidl::InterfacePtr<PageSnapshot_Client>,
307         _common_version: Option<::fidl::InterfacePtr<PageSnapshot_Client>>,
308         result_provider: ::fidl::InterfacePtr<MergeResultProvider_Client>,
309     ) {
310         // TODO in the futures-rs future, do this in parallel with Future combinators
311         let key2 = self.key.clone();
312         state_from_snapshot(left, self.key.clone(), move |e1_opt| {
313             let key3 = key2.clone();
314             state_from_snapshot(right, key2, move |e2_opt| {
315                 let result_opt = match (e1_opt, e2_opt) {
316                     (Ok(Some(mut e1)), Ok(Some(e2))) => {
317                         e1.merge(&e2);
318                         Some(e1)
319                     }
320                     // one engine didn't exist yet, I'm not sure if Ledger actually generates a conflict in this case
321                     (Ok(Some(e)), Ok(None)) | (Ok(None), Ok(Some(e))) => Some(e),
322                     // failed to get one of the engines, we can't do the merge properly
323                     (Err(()), _) | (_, Err(())) => None,
324                     // if state is invalid or missing on both sides, can't merge
325                     (Ok(None), Ok(None)) => None,
326                 };
327                 if let Some(out_state) = result_opt {
328                     let buf = state_to_buf(&out_state);
329                     // TODO use a reference here when buf is too big
330                     let new_value = Some(Box::new(BytesOrReference::Bytes(buf)));
331                     let merged = MergedValue {
332                         key: key3,
333                         source: ValueSource_New,
334                         new_value,
335                         priority: Priority_Eager,
336                     };
337                     assert_eq!(MergeResultProvider_Metadata::VERSION, result_provider.version);
338                     let mut result_provider_proxy =
339                         MergeResultProvider_new_Proxy(result_provider.inner);
340                     result_provider_proxy.merge(vec![merged]);
341                     result_provider_proxy.done().with(ledger_crash_callback);
342                 }
343             });
344         });
345     }
346 }
347 
348 impl ConflictResolver_Stub for ConflictResolverServer {
349     // Use default dispatching, but we could override it here.
350 }
351 impl_fidl_stub!(ConflictResolverServer: ConflictResolver_Stub);
352