1 //! Apply a change.
2 use crate::change::{Atom, Change, EdgeMap, NewVertex};
3 use crate::changestore::ChangeStore;
4 use crate::missing_context::*;
5 use crate::pristine::*;
6 use crate::record::InodeUpdate;
7 use crate::{HashMap, HashSet};
8 use thiserror::Error;
9 pub(crate) mod edge;
10 pub(crate) use edge::*;
11 mod vertex;
12 pub(crate) use vertex::*;
13 
14 #[derive(Debug, Error)]
15 pub enum ApplyError<ChangestoreError: std::error::Error, TxnError: std::error::Error + 'static> {
16     #[error("Changestore error: {0}")]
17     Changestore(ChangestoreError),
18     #[error("Local change error: {err}")]
19     LocalChange {
20         #[from]
21         err: LocalApplyError<TxnError>,
22     },
23 }
24 
25 #[derive(Debug, Error)]
26 pub enum LocalApplyError<TxnError: std::error::Error + 'static> {
27     #[error("Dependency missing: {:?}", hash)]
28     DependencyMissing { hash: crate::pristine::Hash },
29     #[error("Change already on channel: {:?}", hash)]
30     ChangeAlreadyOnChannel { hash: crate::pristine::Hash },
31     #[error("Transaction error: {0}")]
32     Txn(TxnError),
33     #[error("Block error: {:?}", block)]
34     Block { block: Position<ChangeId> },
35     #[error("Invalid change")]
36     InvalidChange,
37 }
38 
39 impl<TxnError: std::error::Error> LocalApplyError<TxnError> {
from_missing(err: MissingError<TxnError>) -> Self40     fn from_missing(err: MissingError<TxnError>) -> Self {
41         match err {
42             MissingError::Txn(e) => LocalApplyError::Txn(e),
43             MissingError::Block(e) => e.into(),
44             MissingError::Inconsistent(_) => LocalApplyError::InvalidChange,
45         }
46     }
47 }
48 
49 impl<T: std::error::Error> From<crate::pristine::InconsistentChange<T>> for LocalApplyError<T> {
from(err: crate::pristine::InconsistentChange<T>) -> Self50     fn from(err: crate::pristine::InconsistentChange<T>) -> Self {
51         match err {
52             InconsistentChange::Txn(e) => LocalApplyError::Txn(e),
53             _ => LocalApplyError::InvalidChange,
54         }
55     }
56 }
57 
58 impl<T: std::error::Error> From<crate::pristine::TxnErr<T>> for LocalApplyError<T> {
from(err: crate::pristine::TxnErr<T>) -> Self59     fn from(err: crate::pristine::TxnErr<T>) -> Self {
60         LocalApplyError::Txn(err.0)
61     }
62 }
63 
64 impl<C: std::error::Error, T: std::error::Error> From<crate::pristine::TxnErr<T>>
65     for ApplyError<C, T>
66 {
from(err: crate::pristine::TxnErr<T>) -> Self67     fn from(err: crate::pristine::TxnErr<T>) -> Self {
68         LocalApplyError::Txn(err.0).into()
69     }
70 }
71 
72 impl<T: std::error::Error> From<crate::pristine::BlockError<T>> for LocalApplyError<T> {
from(err: crate::pristine::BlockError<T>) -> Self73     fn from(err: crate::pristine::BlockError<T>) -> Self {
74         match err {
75             BlockError::Txn(e) => LocalApplyError::Txn(e),
76             BlockError::Block { block } => LocalApplyError::Block { block },
77         }
78     }
79 }
80 
81 /// Apply a change to a channel. This function does not update the
82 /// inodes/tree tables, i.e. the correspondence between the pristine
83 /// and the working copy. Therefore, this function must be used only
84 /// on remote changes, or on "bare" repositories.
apply_change_ws<T: MutTxnT, P: ChangeStore>( changes: &P, txn: &mut T, channel: &mut T::Channel, hash: &Hash, workspace: &mut Workspace, ) -> Result<(u64, Merkle), ApplyError<P::Error, T::GraphError>>85 pub fn apply_change_ws<T: MutTxnT, P: ChangeStore>(
86     changes: &P,
87     txn: &mut T,
88     channel: &mut T::Channel,
89     hash: &Hash,
90     workspace: &mut Workspace,
91 ) -> Result<(u64, Merkle), ApplyError<P::Error, T::GraphError>> {
92     debug!("apply_change {:?}", hash.to_base32());
93     workspace.clear();
94     let change = changes.get_change(&hash).map_err(ApplyError::Changestore)?;
95 
96     for hash in change.dependencies.iter() {
97         if let Hash::None = hash {
98             continue;
99         }
100         if let Some(int) = txn.get_internal(&hash.into())? {
101             if txn.get_changeset(txn.changes(&channel), int)?.is_some() {
102                 continue;
103             }
104         }
105         return Err((LocalApplyError::DependencyMissing { hash: *hash }).into());
106     }
107 
108     let internal = if let Some(&p) = txn.get_internal(&hash.into())? {
109         p
110     } else {
111         let internal: ChangeId = make_changeid(txn, &hash)?;
112         register_change(txn, &internal, hash, &change)?;
113         internal
114     };
115     debug!("internal = {:?}", internal);
116     Ok(apply_change_to_channel(
117         txn, channel, internal, &hash, &change, workspace,
118     )?)
119 }
120 
apply_change_rec_ws<T: TxnT + MutTxnT, P: ChangeStore>( changes: &P, txn: &mut T, channel: &mut T::Channel, hash: &Hash, workspace: &mut Workspace, deps_only: bool, ) -> Result<(), ApplyError<P::Error, T::GraphError>>121 pub fn apply_change_rec_ws<T: TxnT + MutTxnT, P: ChangeStore>(
122     changes: &P,
123     txn: &mut T,
124     channel: &mut T::Channel,
125     hash: &Hash,
126     workspace: &mut Workspace,
127     deps_only: bool,
128 ) -> Result<(), ApplyError<P::Error, T::GraphError>> {
129     debug!("apply_change {:?}", hash.to_base32());
130     workspace.clear();
131     let mut dep_stack = vec![(*hash, true, !deps_only)];
132     let mut visited = HashSet::default();
133     while let Some((hash, first, actually_apply)) = dep_stack.pop() {
134         let change = changes.get_change(&hash).map_err(ApplyError::Changestore)?;
135         let shash: SerializedHash = (&hash).into();
136         if first {
137             if !visited.insert(hash) {
138                 continue;
139             }
140             if let Some(change_id) = txn.get_internal(&shash)? {
141                 if txn
142                     .get_changeset(txn.changes(&channel), change_id)?
143                     .is_some()
144                 {
145                     continue;
146                 }
147             }
148 
149             dep_stack.push((hash, false, actually_apply));
150             for &hash in change.dependencies.iter() {
151                 if let Hash::None = hash {
152                     continue;
153                 }
154                 dep_stack.push((hash, true, true))
155             }
156         } else if actually_apply {
157             let applied = if let Some(int) = txn.get_internal(&shash)? {
158                 txn.get_changeset(txn.changes(&channel), int)?.is_some()
159             } else {
160                 false
161             };
162             if !applied {
163                 let internal = if let Some(&p) = txn.get_internal(&shash)? {
164                     p
165                 } else {
166                     let internal: ChangeId = make_changeid(txn, &hash)?;
167                     register_change(txn, &internal, &hash, &change)?;
168                     internal
169                 };
170                 debug!("internal = {:?}", internal);
171                 workspace.clear();
172                 apply_change_to_channel(txn, channel, internal, &hash, &change, workspace)?;
173             }
174         }
175     }
176     Ok(())
177 }
178 
179 /// Same as [apply_change_ws], but allocates its own workspace.
apply_change<T: MutTxnT, P: ChangeStore>( changes: &P, txn: &mut T, channel: &mut T::Channel, hash: &Hash, ) -> Result<(u64, Merkle), ApplyError<P::Error, T::GraphError>>180 pub fn apply_change<T: MutTxnT, P: ChangeStore>(
181     changes: &P,
182     txn: &mut T,
183     channel: &mut T::Channel,
184     hash: &Hash,
185 ) -> Result<(u64, Merkle), ApplyError<P::Error, T::GraphError>> {
186     apply_change_ws(changes, txn, channel, hash, &mut Workspace::new())
187 }
188 
189 /// Same as [apply_change], but with a wrapped `txn` and `channel`.
apply_change_arc<T: MutTxnT, P: ChangeStore>( changes: &P, txn: &ArcTxn<T>, channel: &ChannelRef<T>, hash: &Hash, ) -> Result<(u64, Merkle), ApplyError<P::Error, T::GraphError>>190 pub fn apply_change_arc<T: MutTxnT, P: ChangeStore>(
191     changes: &P,
192     txn: &ArcTxn<T>,
193     channel: &ChannelRef<T>,
194     hash: &Hash,
195 ) -> Result<(u64, Merkle), ApplyError<P::Error, T::GraphError>> {
196     apply_change_ws(
197         changes,
198         &mut *txn.write(),
199         &mut *channel.write(),
200         hash,
201         &mut Workspace::new(),
202     )
203 }
204 
205 /// Same as [apply_change_ws], but allocates its own workspace.
apply_change_rec<T: MutTxnT, P: ChangeStore>( changes: &P, txn: &mut T, channel: &mut T::Channel, hash: &Hash, deps_only: bool, ) -> Result<(), ApplyError<P::Error, T::GraphError>>206 pub fn apply_change_rec<T: MutTxnT, P: ChangeStore>(
207     changes: &P,
208     txn: &mut T,
209     channel: &mut T::Channel,
210     hash: &Hash,
211     deps_only: bool,
212 ) -> Result<(), ApplyError<P::Error, T::GraphError>> {
213     apply_change_rec_ws(
214         changes,
215         txn,
216         channel,
217         hash,
218         &mut Workspace::new(),
219         deps_only,
220     )
221 }
222 
apply_change_to_channel<T: ChannelMutTxnT>( txn: &mut T, channel: &mut T::Channel, change_id: ChangeId, hash: &Hash, change: &Change, ws: &mut Workspace, ) -> Result<(u64, Merkle), LocalApplyError<T::GraphError>>223 fn apply_change_to_channel<T: ChannelMutTxnT>(
224     txn: &mut T,
225     channel: &mut T::Channel,
226     change_id: ChangeId,
227     hash: &Hash,
228     change: &Change,
229     ws: &mut Workspace,
230 ) -> Result<(u64, Merkle), LocalApplyError<T::GraphError>> {
231     ws.assert_empty();
232     let n = txn.apply_counter(channel);
233     debug!("apply_change_to_channel {:?} {:?}", change_id, hash);
234     let merkle =
235         if let Some(m) = txn.put_changes(channel, change_id, txn.apply_counter(channel), hash)? {
236             m
237         } else {
238             return Err(LocalApplyError::ChangeAlreadyOnChannel { hash: *hash });
239         };
240     debug!("apply change to channel");
241     let now = std::time::Instant::now();
242     for change_ in change.changes.iter() {
243         debug!("Applying {:?} (1)", change_);
244         for change_ in change_.iter() {
245             match *change_ {
246                 Atom::NewVertex(ref n) => {
247                     put_newvertex(txn, T::graph_mut(channel), change, ws, change_id, n)?
248                 }
249                 Atom::EdgeMap(ref n) => {
250                     for edge in n.edges.iter() {
251                         if !edge.flag.contains(EdgeFlags::DELETED) {
252                             put_newedge(
253                                 txn,
254                                 T::graph_mut(channel),
255                                 ws,
256                                 change_id,
257                                 n.inode,
258                                 edge,
259                                 |_, _| true,
260                                 |h| change.knows(h),
261                             )?;
262                         }
263                     }
264                 }
265             }
266         }
267     }
268     for change_ in change.changes.iter() {
269         debug!("Applying {:?} (2)", change_);
270         for change_ in change_.iter() {
271             if let Atom::EdgeMap(ref n) = *change_ {
272                 for edge in n.edges.iter() {
273                     if edge.flag.contains(EdgeFlags::DELETED) {
274                         put_newedge(
275                             txn,
276                             T::graph_mut(channel),
277                             ws,
278                             change_id,
279                             n.inode,
280                             edge,
281                             |_, _| true,
282                             |h| change.knows(h),
283                         )?;
284                     }
285                 }
286             }
287         }
288     }
289     crate::TIMERS.lock().unwrap().apply += now.elapsed();
290 
291     clean_obsolete_pseudo_edges(txn, T::graph_mut(channel), ws, change_id)?;
292 
293     info!("repairing missing contexts");
294     repair_missing_contexts(txn, T::graph_mut(channel), ws, change_id, change)?;
295     detect_folder_conflict_resolutions(
296         txn,
297         T::graph_mut(channel),
298         &mut ws.missing_context,
299         change_id,
300         change,
301     )
302     .map_err(LocalApplyError::from_missing)?;
303 
304     repair_cyclic_paths(txn, T::graph_mut(channel), ws)?;
305     info!("done applying change");
306     Ok((n, merkle))
307 }
308 
309 /// Apply a change created locally: serialize it, compute its hash, and
310 /// apply it. This function also registers changes in the filesystem
311 /// introduced by the change (file additions, deletions and moves), to
312 /// synchronise the pristine and the working copy after the
313 /// application.
apply_local_change_ws< T: ChannelMutTxnT + DepsMutTxnT<DepsError = <T as GraphTxnT>::GraphError> + TreeMutTxnT<TreeError = <T as GraphTxnT>::GraphError>, >( txn: &mut T, channel: &ChannelRef<T>, change: &Change, hash: &Hash, inode_updates: &HashMap<usize, InodeUpdate>, workspace: &mut Workspace, ) -> Result<(u64, Merkle), LocalApplyError<T::GraphError>>314 pub fn apply_local_change_ws<
315     T: ChannelMutTxnT
316         + DepsMutTxnT<DepsError = <T as GraphTxnT>::GraphError>
317         + TreeMutTxnT<TreeError = <T as GraphTxnT>::GraphError>,
318 >(
319     txn: &mut T,
320     channel: &ChannelRef<T>,
321     change: &Change,
322     hash: &Hash,
323     inode_updates: &HashMap<usize, InodeUpdate>,
324     workspace: &mut Workspace,
325 ) -> Result<(u64, Merkle), LocalApplyError<T::GraphError>> {
326     let mut channel = channel.write();
327     let internal: ChangeId = make_changeid(txn, hash)?;
328     debug!("make_changeid {:?} {:?}", hash, internal);
329 
330     for hash in change.dependencies.iter() {
331         if let Hash::None = hash {
332             continue;
333         }
334         if let Some(int) = txn.get_internal(&hash.into())? {
335             if txn.get_changeset(txn.changes(&channel), int)?.is_some() {
336                 continue;
337             }
338         }
339         return Err((LocalApplyError::DependencyMissing { hash: *hash }).into());
340     }
341 
342     register_change(txn, &internal, hash, &change)?;
343     let n = apply_change_to_channel(txn, &mut channel, internal, &hash, &change, workspace)?;
344     for (_, update) in inode_updates.iter() {
345         info!("updating {:?}", update);
346         update_inode(txn, &channel, internal, update)?;
347     }
348     Ok(n)
349 }
350 
351 /// Same as [apply_local_change_ws], but allocates its own workspace.
apply_local_change< T: ChannelMutTxnT + DepsMutTxnT<DepsError = <T as GraphTxnT>::GraphError> + TreeMutTxnT<TreeError = <T as GraphTxnT>::GraphError>, >( txn: &mut T, channel: &ChannelRef<T>, change: &Change, hash: &Hash, inode_updates: &HashMap<usize, InodeUpdate>, ) -> Result<(u64, Merkle), LocalApplyError<T::GraphError>>352 pub fn apply_local_change<
353     T: ChannelMutTxnT
354         + DepsMutTxnT<DepsError = <T as GraphTxnT>::GraphError>
355         + TreeMutTxnT<TreeError = <T as GraphTxnT>::GraphError>,
356 >(
357     txn: &mut T,
358     channel: &ChannelRef<T>,
359     change: &Change,
360     hash: &Hash,
361     inode_updates: &HashMap<usize, InodeUpdate>,
362 ) -> Result<(u64, Merkle), LocalApplyError<T::GraphError>> {
363     apply_local_change_ws(
364         txn,
365         channel,
366         change,
367         hash,
368         inode_updates,
369         &mut Workspace::new(),
370     )
371 }
372 
update_inode<T: ChannelTxnT + TreeMutTxnT<TreeError = <T as GraphTxnT>::GraphError>>( txn: &mut T, channel: &T::Channel, internal: ChangeId, update: &InodeUpdate, ) -> Result<(), LocalApplyError<T::TreeError>>373 fn update_inode<T: ChannelTxnT + TreeMutTxnT<TreeError = <T as GraphTxnT>::GraphError>>(
374     txn: &mut T,
375     channel: &T::Channel,
376     internal: ChangeId,
377     update: &InodeUpdate,
378 ) -> Result<(), LocalApplyError<T::TreeError>> {
379     debug!("update_inode {:?}", update);
380     match *update {
381         InodeUpdate::Add { inode, pos, .. } => {
382             let vertex = Position {
383                 change: internal,
384                 pos,
385             };
386             if txn
387                 .get_graph(txn.graph(channel), &vertex.inode_vertex(), None)?
388                 .is_some()
389             {
390                 debug!("Adding inodes: {:?} {:?}", inode, vertex);
391                 put_inodes_with_rev(txn, &inode, &vertex)?;
392             } else {
393                 debug!("Not adding inodes: {:?} {:?}", inode, vertex);
394             }
395         }
396         InodeUpdate::Deleted { inode } => {
397             if let Some(parent) = txn.get_revtree(&inode, None)?.map(|x| x.to_owned()) {
398                 del_tree_with_rev(txn, &parent, &inode)?;
399             }
400             // Delete the directory, if it's there.
401             txn.del_tree(&OwnedPathId::inode(inode), Some(&inode))?;
402             if let Some(&vertex) = txn.get_inodes(&inode, None)? {
403                 del_inodes_with_rev(txn, &inode, &vertex)?;
404             }
405         }
406     }
407     Ok(())
408 }
409 
410 #[derive(Default)]
411 pub struct Workspace {
412     parents: HashSet<Vertex<ChangeId>>,
413     children: HashSet<Vertex<ChangeId>>,
414     pseudo: Vec<(Vertex<ChangeId>, SerializedEdge, Position<Option<Hash>>)>,
415     deleted_by: HashSet<ChangeId>,
416     up_context: Vec<Vertex<ChangeId>>,
417     down_context: Vec<Vertex<ChangeId>>,
418     pub(crate) missing_context: crate::missing_context::Workspace,
419     rooted: HashMap<Vertex<ChangeId>, bool>,
420     adjbuf: Vec<SerializedEdge>,
421 }
422 
423 impl Workspace {
new() -> Self424     pub fn new() -> Self {
425         Self::default()
426     }
clear(&mut self)427     fn clear(&mut self) {
428         self.children.clear();
429         self.parents.clear();
430         self.pseudo.clear();
431         self.deleted_by.clear();
432         self.up_context.clear();
433         self.down_context.clear();
434         self.missing_context.clear();
435         self.rooted.clear();
436         self.adjbuf.clear();
437     }
assert_empty(&self)438     fn assert_empty(&self) {
439         assert!(self.children.is_empty());
440         assert!(self.parents.is_empty());
441         assert!(self.pseudo.is_empty());
442         assert!(self.deleted_by.is_empty());
443         assert!(self.up_context.is_empty());
444         assert!(self.down_context.is_empty());
445         self.missing_context.assert_empty();
446         assert!(self.rooted.is_empty());
447         assert!(self.adjbuf.is_empty());
448     }
449 }
450 
clean_obsolete_pseudo_edges<T: GraphMutTxnT>( txn: &mut T, channel: &mut T::Graph, ws: &mut Workspace, change_id: ChangeId, ) -> Result<(), LocalApplyError<T::GraphError>>451 pub(crate) fn clean_obsolete_pseudo_edges<T: GraphMutTxnT>(
452     txn: &mut T,
453     channel: &mut T::Graph,
454     ws: &mut Workspace,
455     change_id: ChangeId,
456 ) -> Result<(), LocalApplyError<T::GraphError>> {
457     for (next_vertex, p, inode) in ws.pseudo.drain(..) {
458         let (a, b) = if p.flag().is_parent() {
459             if let Ok(&dest) = txn.find_block_end(channel, p.dest()) {
460                 (dest, next_vertex)
461             } else {
462                 continue;
463             }
464         } else if let Ok(&dest) = txn.find_block(channel, p.dest()) {
465             (next_vertex, dest)
466         } else {
467             continue;
468         };
469         let a_is_alive = is_alive(txn, channel, &a)?;
470         let b_is_alive = is_alive(txn, channel, &b)?;
471         if a_is_alive && b_is_alive {
472             continue;
473         }
474         debug!(
475             "Deleting {:?} {:?} {:?} {:?}",
476             a,
477             b,
478             p.introduced_by(),
479             p.flag()
480         );
481         del_graph_with_rev(
482             txn,
483             channel,
484             p.flag() - EdgeFlags::PARENT,
485             a,
486             b,
487             p.introduced_by(),
488         )?;
489         if a_is_alive {
490             debug!("repair down");
491             debug_assert!(!b_is_alive);
492             crate::missing_context::repair_missing_down_context(
493                 txn,
494                 channel,
495                 &mut ws.missing_context,
496                 inode,
497                 b,
498                 &[a],
499             )
500             .map_err(LocalApplyError::from_missing)?
501         } else if b_is_alive && !p.flag().is_folder() {
502             debug!("repair up");
503             crate::missing_context::repair_missing_up_context(
504                 txn,
505                 channel,
506                 &mut ws.missing_context,
507                 change_id,
508                 inode,
509                 a,
510                 &[b],
511             )
512             .map_err(LocalApplyError::from_missing)?
513         }
514     }
515     Ok(())
516 }
517 
repair_missing_contexts<T: GraphMutTxnT>( txn: &mut T, channel: &mut T::Graph, ws: &mut Workspace, change_id: ChangeId, change: &Change, ) -> Result<(), LocalApplyError<T::GraphError>>518 fn repair_missing_contexts<T: GraphMutTxnT>(
519     txn: &mut T,
520     channel: &mut T::Graph,
521     ws: &mut Workspace,
522     change_id: ChangeId,
523     change: &Change,
524 ) -> Result<(), LocalApplyError<T::GraphError>> {
525     let now = std::time::Instant::now();
526     crate::missing_context::repair_parents_of_deleted(txn, channel, &mut ws.missing_context)
527         .map_err(LocalApplyError::from_missing)?;
528     for atom in change.changes.iter().flat_map(|r| r.iter()) {
529         match atom {
530             Atom::NewVertex(ref n) if !n.flag.is_folder() => {
531                 let vertex = Vertex {
532                     change: change_id,
533                     start: n.start,
534                     end: n.end,
535                 };
536                 repair_new_vertex_context_up(txn, channel, ws, change_id, n, vertex)?;
537                 repair_new_vertex_context_down(txn, channel, ws, change_id, n, vertex)?;
538             }
539             Atom::NewVertex(_) => {}
540             Atom::EdgeMap(ref n) => {
541                 repair_edge_context(txn, channel, ws, change_id, change, n)?;
542             }
543         }
544     }
545     crate::missing_context::delete_pseudo_edges(txn, channel, &mut ws.missing_context)
546         .map_err(LocalApplyError::from_missing)?;
547     crate::TIMERS.lock().unwrap().repair_context += now.elapsed();
548     Ok(())
549 }
550 
repair_new_vertex_context_up<T: GraphMutTxnT>( txn: &mut T, channel: &mut T::Graph, ws: &mut Workspace, change_id: ChangeId, n: &NewVertex<Option<Hash>>, vertex: Vertex<ChangeId>, ) -> Result<(), LocalApplyError<T::GraphError>>551 fn repair_new_vertex_context_up<T: GraphMutTxnT>(
552     txn: &mut T,
553     channel: &mut T::Graph,
554     ws: &mut Workspace,
555     change_id: ChangeId,
556     n: &NewVertex<Option<Hash>>,
557     vertex: Vertex<ChangeId>,
558 ) -> Result<(), LocalApplyError<T::GraphError>> {
559     for up in n.up_context.iter() {
560         let up = *txn.find_block_end(channel, internal_pos(txn, &up, change_id)?)?;
561         if !is_alive(txn, channel, &up)? {
562             debug!("repairing missing up context {:?} {:?}", up, vertex);
563             repair_missing_up_context(
564                 txn,
565                 channel,
566                 &mut ws.missing_context,
567                 change_id,
568                 n.inode,
569                 up,
570                 &[vertex],
571             )
572             .map_err(LocalApplyError::from_missing)?
573         }
574     }
575     Ok(())
576 }
577 
repair_new_vertex_context_down<T: GraphMutTxnT>( txn: &mut T, channel: &mut T::Graph, ws: &mut Workspace, change_id: ChangeId, n: &NewVertex<Option<Hash>>, vertex: Vertex<ChangeId>, ) -> Result<(), LocalApplyError<T::GraphError>>578 fn repair_new_vertex_context_down<T: GraphMutTxnT>(
579     txn: &mut T,
580     channel: &mut T::Graph,
581     ws: &mut Workspace,
582     change_id: ChangeId,
583     n: &NewVertex<Option<Hash>>,
584     vertex: Vertex<ChangeId>,
585 ) -> Result<(), LocalApplyError<T::GraphError>> {
586     debug!("repairing missing context for {:?}", vertex);
587     if n.flag.contains(EdgeFlags::FOLDER) {
588         return Ok(());
589     }
590     'outer: for down in n.down_context.iter() {
591         let down = *txn.find_block(channel, internal_pos(txn, &down, change_id)?)?;
592         for e in iter_adjacent(
593             txn,
594             channel,
595             down,
596             EdgeFlags::PARENT,
597             EdgeFlags::all() - EdgeFlags::DELETED,
598         )? {
599             let e = e?;
600             if e.introduced_by() != change_id {
601                 continue 'outer;
602             }
603         }
604         debug!("repairing missing down context {:?} {:?}", down, vertex);
605         repair_missing_down_context(
606             txn,
607             channel,
608             &mut ws.missing_context,
609             n.inode,
610             down,
611             &[vertex],
612         )
613         .map_err(LocalApplyError::from_missing)?
614     }
615     Ok(())
616 }
617 
repair_edge_context<T: GraphMutTxnT>( txn: &mut T, channel: &mut T::Graph, ws: &mut Workspace, change_id: ChangeId, change: &Change, n: &EdgeMap<Option<Hash>>, ) -> Result<(), LocalApplyError<T::GraphError>>618 fn repair_edge_context<T: GraphMutTxnT>(
619     txn: &mut T,
620     channel: &mut T::Graph,
621     ws: &mut Workspace,
622     change_id: ChangeId,
623     change: &Change,
624     n: &EdgeMap<Option<Hash>>,
625 ) -> Result<(), LocalApplyError<T::GraphError>> {
626     for e in n.edges.iter() {
627         assert!(!e.flag.contains(EdgeFlags::PARENT));
628         if e.flag.contains(EdgeFlags::DELETED) {
629             trace!("repairing context deleted {:?}", e);
630             repair_context_deleted(
631                 txn,
632                 channel,
633                 &mut ws.missing_context,
634                 n.inode,
635                 change_id,
636                 |h| change.knows(&h),
637                 e,
638             )
639             .map_err(LocalApplyError::from_missing)?
640         } else {
641             trace!("repairing context nondeleted {:?}", e);
642             repair_context_nondeleted(txn, channel, &mut ws.missing_context, n.inode, change_id, e)
643                 .map_err(LocalApplyError::from_missing)?
644         }
645     }
646     Ok(())
647 }
648 
repair_cyclic_paths<T: GraphMutTxnT>( txn: &mut T, channel: &mut T::Graph, ws: &mut Workspace, ) -> Result<(), LocalApplyError<T::GraphError>>649 pub(crate) fn repair_cyclic_paths<T: GraphMutTxnT>(
650     txn: &mut T,
651     channel: &mut T::Graph,
652     ws: &mut Workspace,
653 ) -> Result<(), LocalApplyError<T::GraphError>> {
654     let now = std::time::Instant::now();
655     let mut files = std::mem::replace(&mut ws.missing_context.files, HashSet::default());
656     for file in files.drain() {
657         if file.is_empty() {
658             if !is_rooted(txn, channel, file, ws)? {
659                 repair_edge(txn, channel, file, ws)?
660             }
661         } else {
662             let f0 = EdgeFlags::FOLDER;
663             let f1 = EdgeFlags::FOLDER | EdgeFlags::BLOCK | EdgeFlags::PSEUDO;
664             let mut iter = iter_adjacent(txn, channel, file, f0, f1)?;
665             if let Some(ee) = iter.next() {
666                 let ee = ee?;
667                 let dest = ee.dest().inode_vertex();
668                 if !is_rooted(txn, channel, dest, ws)? {
669                     repair_edge(txn, channel, dest, ws)?
670                 }
671             }
672         }
673     }
674     ws.missing_context.files = files;
675     crate::TIMERS.lock().unwrap().check_cyclic_paths += now.elapsed();
676     Ok(())
677 }
678 
repair_edge<T: GraphMutTxnT>( txn: &mut T, channel: &mut T::Graph, to0: Vertex<ChangeId>, ws: &mut Workspace, ) -> Result<(), LocalApplyError<T::GraphError>>679 fn repair_edge<T: GraphMutTxnT>(
680     txn: &mut T,
681     channel: &mut T::Graph,
682     to0: Vertex<ChangeId>,
683     ws: &mut Workspace,
684 ) -> Result<(), LocalApplyError<T::GraphError>> {
685     debug!("repair_edge {:?}", to0);
686     let mut stack = vec![(to0, true, true, true)];
687     ws.parents.clear();
688     while let Some((current, _, al, anc_al)) = stack.pop() {
689         if !ws.parents.insert(current) {
690             continue;
691         }
692         debug!("repair_cyclic {:?}", current);
693         if current != to0 {
694             stack.push((current, true, al, anc_al));
695         }
696         if current.is_root() {
697             debug!("root");
698             break;
699         }
700         if let Some(&true) = ws.rooted.get(&current) {
701             debug!("rooted");
702             break;
703         }
704         let f = EdgeFlags::PARENT | EdgeFlags::FOLDER;
705         let len = stack.len();
706         for parent in iter_adjacent(txn, channel, current, f, EdgeFlags::all())? {
707             let parent = parent?;
708             if parent.flag().is_parent() {
709                 let anc = txn.find_block_end(channel, parent.dest())?;
710                 debug!("is_rooted, parent = {:?}", parent);
711                 let al = if let Some(e) = iter_adjacent(
712                     txn,
713                     channel,
714                     *anc,
715                     f,
716                     f | EdgeFlags::BLOCK | EdgeFlags::PSEUDO,
717                 )?
718                 .next()
719                 {
720                     e?;
721                     true
722                 } else {
723                     false
724                 };
725                 debug!("al = {:?}, flag = {:?}", al, parent.flag());
726                 stack.push((*anc, false, parent.flag().is_deleted(), al));
727             }
728         }
729         if stack.len() == len {
730             stack.pop();
731         } else {
732             (&mut stack[len..]).sort_unstable_by(|a, b| a.3.cmp(&b.3))
733         }
734     }
735     let mut current = to0;
736     for (next, on_path, del, _) in stack {
737         if on_path {
738             if del {
739                 put_graph_with_rev(
740                     txn,
741                     channel,
742                     EdgeFlags::FOLDER | EdgeFlags::PSEUDO,
743                     next,
744                     current,
745                     ChangeId::ROOT,
746                 )?;
747             }
748             current = next
749         }
750     }
751     ws.parents.clear();
752     Ok(())
753 }
754 
is_rooted<T: GraphTxnT>( txn: &T, channel: &T::Graph, v: Vertex<ChangeId>, ws: &mut Workspace, ) -> Result<bool, LocalApplyError<T::GraphError>>755 fn is_rooted<T: GraphTxnT>(
756     txn: &T,
757     channel: &T::Graph,
758     v: Vertex<ChangeId>,
759     ws: &mut Workspace,
760 ) -> Result<bool, LocalApplyError<T::GraphError>> {
761     let mut alive = false;
762     assert!(v.is_empty());
763     for e in iter_adjacent(txn, channel, v, EdgeFlags::empty(), EdgeFlags::all())? {
764         let e = e?;
765         if e.flag().contains(EdgeFlags::PARENT) {
766             if e.flag() & (EdgeFlags::FOLDER | EdgeFlags::DELETED) == EdgeFlags::FOLDER {
767                 alive = true;
768                 break;
769             }
770         } else if !e.flag().is_deleted() {
771             alive = true;
772             break;
773         }
774     }
775     if !alive {
776         debug!("is_rooted, not alive");
777         return Ok(true);
778     }
779     // Recycling ws.up_context and ws.parents as a stack and a
780     // "visited" hashset, respectively.
781     let stack = &mut ws.up_context;
782     stack.clear();
783     stack.push(v);
784     let visited = &mut ws.parents;
785     visited.clear();
786 
787     while let Some(to) = stack.pop() {
788         debug!("is_rooted, pop = {:?}", to);
789         if to.is_root() {
790             stack.clear();
791             for v in visited.drain() {
792                 ws.rooted.insert(v, true);
793             }
794             return Ok(true);
795         }
796         if !visited.insert(to) {
797             continue;
798         }
799         if let Some(&rooted) = ws.rooted.get(&to) {
800             if rooted {
801                 for v in visited.drain() {
802                     ws.rooted.insert(v, true);
803                 }
804                 return Ok(true);
805             } else {
806                 continue;
807             }
808         }
809         let f = EdgeFlags::PARENT | EdgeFlags::FOLDER;
810         for parent in iter_adjacent(
811             txn,
812             channel,
813             to,
814             f,
815             f | EdgeFlags::PSEUDO | EdgeFlags::BLOCK,
816         )? {
817             let parent = parent?;
818             debug!("is_rooted, parent = {:?}", parent);
819             stack.push(*txn.find_block_end(channel, parent.dest())?)
820         }
821     }
822     for v in visited.drain() {
823         ws.rooted.insert(v, false);
824     }
825     Ok(false)
826 }
827