// Copyright 2017 The xi-editor Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Architecture for synchronizing a CRDT with the ledger. Separated into a
//! module so that it is easier to add other sync stores later.
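//!
//! A rough sketch of how these pieces are intended to be wired together; the
//! ledger, page proxy, key, container reference, and buffer identifier below
//! are illustrative, not part of this module:
//!
//! ```ignore
//! let (tx, rx) = std::sync::mpsc::channel();
//!
//! // One updater thread applies incoming changes to all buffers.
//! let updater = SyncUpdater::new(container_ref, rx);
//! std::thread::spawn(move || updater.work());
//!
//! // Each buffer gets its own SyncStore, writing to its own Ledger page.
//! let store = SyncStore::new(page_proxy, key.clone(), tx.clone(), buffer_id);
//!
//! // Concurrent edits to the same key are merged rather than clobbered.
//! start_conflict_resolver_factory(&mut ledger, key);
//! ```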

use std::io::Write;
use std::sync::mpsc::{Receiver, RecvError, Sender};

use log;

use apps_ledger_services_public::*;
use fidl::{self, Future, Promise};
use fuchsia::read_entire_vmo;
use magenta::{Channel, ChannelOpts, HandleBase};
use serde_json;

use super::ledger::{self, ledger_crash_callback};
use tabs::{BufferContainerRef, BufferIdentifier};
use xi_rope::engine::Engine;

// TODO switch these to bincode
fn state_to_buf(state: &Engine) -> Vec<u8> {
    serde_json::to_vec(state).unwrap()
}

fn buf_to_state(buf: &[u8]) -> Result<Engine, serde_json::Error> {
    serde_json::from_slice(buf)
}

/// Stores state needed by the container to perform synchronization.
pub struct SyncStore {
    page: Page_Proxy,
    key: Vec<u8>,
    updates: Sender<SyncMsg>,
    transaction_pending: bool,
    buffer: BufferIdentifier,
}

impl SyncStore {
    /// - `page` is a reference to the Ledger page to store data under.
    /// - `key` is the key the `Syncable` managed by this `SyncStore` will be stored under.
    ///   This example only supports storing things under a single key per page.
    /// - `updates` is a channel to a `SyncUpdater` that will handle events.
    ///
    /// Returns a sync store, schedules loading of the initial state, and
    /// subscribes to state updates for this document.
    pub fn new(
        mut page: Page_Proxy,
        key: Vec<u8>,
        updates: Sender<SyncMsg>,
        buffer: BufferIdentifier,
    ) -> SyncStore {
        let (s1, s2) = Channel::create(ChannelOpts::Normal).unwrap();
        let watcher_client = PageWatcher_Client::from_handle(s1.into_handle());
        let watcher_client_ptr =
            ::fidl::InterfacePtr { inner: watcher_client, version: PageWatcher_Metadata::VERSION };

        let watcher = PageWatcherServer { updates: updates.clone(), buffer: buffer.clone() };
        let _ = fidl::Server::new(watcher, s2).spawn();

        let (mut snap, snap_request) = PageSnapshot_new_pair();
        page.get_snapshot(snap_request, Some(key.clone()), Some(watcher_client_ptr))
            .with(ledger_crash_callback);

        let initial_state_chan = updates.clone();
        let initial_buffer = buffer.clone();
        snap.get(key.clone()).with(move |raw_res| {
            match raw_res.map(|res| ledger::value_result(res)) {
                Ok(Ok(Some(buf))) => {
                    initial_state_chan
                        .send(SyncMsg::NewState {
                            buffer: initial_buffer,
                            new_buf: buf,
                            done: None,
                        })
                        .unwrap();
                }
                Ok(Ok(None)) => (), // No initial state saved yet
                Err(err) => error!("FIDL failed on initial response: {:?}", err),
                Ok(Err(err)) => error!("Ledger failed to retrieve key: {:?}", err),
            }
        });

        SyncStore { page, key, updates, buffer, transaction_pending: false }
    }

    /// Called whenever this app has changed its own state and would like to
    /// persist the changes to the ledger. Changes can't be committed
    /// immediately since we have to wait for `PageWatcher` changes that may
    /// not have arrived yet.
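    ///
    /// A sketch of the intended call sequence (the receiver and variable
    /// names are illustrative):
    ///
    /// ```ignore
    /// sync_store.state_changed(); // asks the Ledger to start a transaction
    /// // ... later the SyncUpdater receives SyncMsg::TransactionReady and calls
    /// // editor.transaction_ready(), which in turn should call:
    /// sync_store.commit_transaction(&engine);
    /// ```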
    pub fn state_changed(&mut self) {
        if !self.transaction_pending {
            self.transaction_pending = true;
            let ready_future = self.page.start_transaction();
            let done_chan = self.updates.clone();
            let buffer = self.buffer.clone();
            ready_future.with(move |res| match res {
                Ok(ledger::OK) => {
                    done_chan.send(SyncMsg::TransactionReady { buffer }).unwrap();
                }
                Ok(err_status) => error!("Ledger failed to start transaction: {:?}", err_status),
                Err(err) => error!("FIDL failed on starting transaction: {:?}", err),
            });
        }
    }

    /// Should be called in `SyncContainer::transaction_ready` to persist the current state.
    pub fn commit_transaction(&mut self, state: &Engine) {
        assert!(self.transaction_pending, "must call state_changed (and wait) before commit");
        self.page.put(self.key.clone(), state_to_buf(state)).with(ledger_crash_callback);
        self.page.commit().with(ledger_crash_callback);
        self.transaction_pending = false;
    }
}

/// All the different asynchronous events the updater thread needs to listen for and act on
pub enum SyncMsg {
    NewState {
        buffer: BufferIdentifier,
        new_buf: Vec<u8>,
        done: Option<Promise<Option<PageSnapshot_Server>, fidl::Error>>,
    },
    TransactionReady {
        buffer: BufferIdentifier,
    },
    /// Shut down the updater thread
    Stop,
}

/// We want to be able to register to receive events from inside the
/// `SyncStore`/`SyncContainer`, but from there we don't have access to the
/// `Mutex` that holds the container. So we give channel `Sender`s to all the
/// futures, letting them trigger events in one place that does have the
/// right reference.
///
/// Additionally, the individual `Editor`s aren't wrapped in a `Mutex`, so we
/// have to hold a `BufferContainerRef` and use `BufferIdentifier`s with one
/// `SyncUpdater` for all buffers.
pub struct SyncUpdater<W: Write> {
    container_ref: BufferContainerRef<W>,
    chan: Receiver<SyncMsg>,
}

impl<W: Write + Send + 'static> SyncUpdater<W> {
    pub fn new(container_ref: BufferContainerRef<W>, chan: Receiver<SyncMsg>) -> SyncUpdater<W> {
        SyncUpdater { container_ref, chan }
    }

    /// Run this on its own thread; it will return when it encounters an error
    /// reading the channel or when the `Stop` message is received.
    pub fn work(&self) -> Result<(), RecvError> {
        loop {
            let msg = self.chan.recv()?;
            match msg {
                SyncMsg::Stop => return Ok(()),
                SyncMsg::TransactionReady { buffer } => {
                    let mut container = self.container_ref.lock();
                    // If the buffer was closed, the page connection should have been
                    // closed as well, which should abort the pending transaction.
                    if let Some(mut editor) = container.editor_for_buffer_mut(&buffer) {
                        editor.transaction_ready();
                    }
                }
                SyncMsg::NewState { new_buf, done, buffer } => {
                    let mut container = self.container_ref.lock();
                    match (container.editor_for_buffer_mut(&buffer), buf_to_state(&new_buf)) {
                        (Some(mut editor), Ok(new_state)) => {
                            editor.merge_new_state(new_state);
                            if let Some(promise) = done {
                                promise.set_ok(None);
                            }
                        }
                        (None, _) => (), // buffer was closed
                        (_, Err(err)) => error!("Ledger was set to invalid state: {:?}", err),
                    }
                }
            }
        }
    }
}

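/// Receives change notifications for a Ledger page and forwards the new value
/// to the `SyncUpdater` as a `SyncMsg::NewState` message.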
struct PageWatcherServer {
    updates: Sender<SyncMsg>,
    buffer: BufferIdentifier,
}

impl PageWatcher for PageWatcherServer {
    fn on_change(
        &mut self,
        page_change: PageChange,
        result_state: ResultState,
    ) -> Future<Option<PageSnapshot_Server>, fidl::Error> {
        let (future, done) = Future::make_promise();

        let value_opt = page_change.changes.get(0).and_then(|c| c.value.as_ref());
        if let (ledger::RESULT_COMPLETED, Some(value_vmo)) = (result_state, value_opt) {
            let new_buf = read_entire_vmo(value_vmo).expect("failed to read key Vmo");
            self.updates
                .send(SyncMsg::NewState { buffer: self.buffer.clone(), new_buf, done: Some(done) })
                .unwrap();
        } else {
            error!("Xi state corrupted, should have one key but has multiple.");
            // I don't think this should be a FIDL-level error, so set okay
            done.set_ok(None);
        }

        future
    }
}

impl PageWatcher_Stub for PageWatcherServer {
    // Use default dispatching, but we could override it here.
}
impl_fidl_stub!(PageWatcherServer: PageWatcher_Stub);

// ============= Conflict resolution

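/// Registers a `ConflictResolverFactory` with the Ledger so that concurrent
/// modifications of the document are resolved by merging the two `Engine`
/// states (see `ConflictResolverServer::resolve`) rather than by the Ledger's
/// default policy.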
pub fn start_conflict_resolver_factory(ledger: &mut Ledger_Proxy, key: Vec<u8>) {
    let (s1, s2) = Channel::create(ChannelOpts::Normal).unwrap();
    let resolver_client = ConflictResolverFactory_Client::from_handle(s1.into_handle());
    let resolver_client_ptr = ::fidl::InterfacePtr {
        inner: resolver_client,
        version: ConflictResolverFactory_Metadata::VERSION,
    };

    let _ = fidl::Server::new(ConflictResolverFactoryServer { key }, s2).spawn();

    ledger.set_conflict_resolver_factory(Some(resolver_client_ptr)).with(ledger_crash_callback);
}

struct ConflictResolverFactoryServer {
    key: Vec<u8>,
}

impl ConflictResolverFactory for ConflictResolverFactoryServer {
    fn get_policy(&mut self, _page_id: Vec<u8>) -> Future<MergePolicy, ::fidl::Error> {
        Future::done(Ok(MergePolicy_Custom))
    }

    /// Our resolvers are the same for every page
    fn new_conflict_resolver(&mut self, _page_id: Vec<u8>, resolver: ConflictResolver_Server) {
        let _ = fidl::Server::new(
            ConflictResolverServer { key: self.key.clone() },
            resolver.into_channel(),
        )
        .spawn();
    }
}

impl ConflictResolverFactory_Stub for ConflictResolverFactoryServer {
    // Use default dispatching, but we could override it here.
}
impl_fidl_stub!(ConflictResolverFactoryServer: ConflictResolverFactory_Stub);

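/// Fetches the value stored under `key` in `snapshot`, attempts to deserialize
/// it into an `Engine`, and passes the result to `done`. `Ok(None)` means
/// there was no state (or no valid state) stored under the key.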
fn state_from_snapshot<F>(
    snapshot: ::fidl::InterfacePtr<PageSnapshot_Client>,
    key: Vec<u8>,
    done: F,
) where
    F: Send + FnOnce(Result<Option<Engine>, ()>) + 'static,
{
    assert_eq!(PageSnapshot_Metadata::VERSION, snapshot.version);
    let mut snapshot_proxy = PageSnapshot_new_Proxy(snapshot.inner);
    // TODO get a reference when too big
    snapshot_proxy.get(key).with(move |raw_res| {
        let state = match raw_res.map(|res| ledger::value_result(res)) {
            // `.ok()` treats invalid state as if it were empty, so invalid
            // state gets deleted and overwritten with good state
            Ok(Ok(Some(buf))) => Ok(buf_to_state(&buf).ok()),
            Ok(Ok(None)) => {
                info!("No state in conflicting page");
                Ok(None)
            }
            Err(err) => {
                warn!("FIDL failed on initial response: {:?}", err);
                Err(())
            }
            Ok(Err(err)) => {
                warn!("Ledger failed to retrieve key: {:?}", err);
                Err(())
            }
        };
        done(state);
    });
}

struct ConflictResolverServer {
    key: Vec<u8>,
}

impl ConflictResolver for ConflictResolverServer {
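    /// Reads the conflicting `Engine` states from the left and right
    /// snapshots, merges them with `Engine::merge`, and hands the merged
    /// state back to the Ledger through the `MergeResultProvider`.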
    fn resolve(
        &mut self,
        left: ::fidl::InterfacePtr<PageSnapshot_Client>,
        right: ::fidl::InterfacePtr<PageSnapshot_Client>,
        _common_version: Option<::fidl::InterfacePtr<PageSnapshot_Client>>,
        result_provider: ::fidl::InterfacePtr<MergeResultProvider_Client>,
    ) {
        // TODO in the futures-rs future, do this in parallel with Future combinators
        let key2 = self.key.clone();
        state_from_snapshot(left, self.key.clone(), move |e1_opt| {
            let key3 = key2.clone();
            state_from_snapshot(right, key2, move |e2_opt| {
                let result_opt = match (e1_opt, e2_opt) {
                    (Ok(Some(mut e1)), Ok(Some(e2))) => {
                        e1.merge(&e2);
                        Some(e1)
                    }
                    // one engine didn't exist yet, I'm not sure if Ledger
                    // actually generates a conflict in this case
                    (Ok(Some(e)), Ok(None)) | (Ok(None), Ok(Some(e))) => Some(e),
                    // failed to get one of the engines, we can't do the merge properly
                    (Err(()), _) | (_, Err(())) => None,
                    // if state is invalid or missing on both sides, can't merge
                    (Ok(None), Ok(None)) => None,
                };
                if let Some(out_state) = result_opt {
                    let buf = state_to_buf(&out_state);
                    // TODO use a reference here when buf is too big
                    let new_value = Some(Box::new(BytesOrReference::Bytes(buf)));
                    let merged = MergedValue {
                        key: key3,
                        source: ValueSource_New,
                        new_value,
                        priority: Priority_Eager,
                    };
                    assert_eq!(MergeResultProvider_Metadata::VERSION, result_provider.version);
                    let mut result_provider_proxy =
                        MergeResultProvider_new_Proxy(result_provider.inner);
                    result_provider_proxy.merge(vec![merged]);
                    result_provider_proxy.done().with(ledger_crash_callback);
                }
            });
        });
    }
}

impl ConflictResolver_Stub for ConflictResolverServer {
    // Use default dispatching, but we could override it here.
}
impl_fidl_stub!(ConflictResolverServer: ConflictResolver_Stub);