1 //! Apply a change.
2 use crate::change::{Atom, Change, EdgeMap, NewVertex};
3 use crate::changestore::ChangeStore;
4 use crate::missing_context::*;
5 use crate::pristine::*;
6 use crate::record::InodeUpdate;
7 use crate::{HashMap, HashSet};
8 use thiserror::Error;
9 pub(crate) mod edge;
10 pub(crate) use edge::*;
11 mod vertex;
12 pub(crate) use vertex::*;
13
14 #[derive(Debug, Error)]
15 pub enum ApplyError<ChangestoreError: std::error::Error, TxnError: std::error::Error + 'static> {
16 #[error("Changestore error: {0}")]
17 Changestore(ChangestoreError),
18 #[error("Local change error: {err}")]
19 LocalChange {
20 #[from]
21 err: LocalApplyError<TxnError>,
22 },
23 }
24
25 #[derive(Debug, Error)]
26 pub enum LocalApplyError<TxnError: std::error::Error + 'static> {
27 #[error("Dependency missing: {:?}", hash)]
28 DependencyMissing { hash: crate::pristine::Hash },
29 #[error("Change already on channel: {:?}", hash)]
30 ChangeAlreadyOnChannel { hash: crate::pristine::Hash },
31 #[error("Transaction error: {0}")]
32 Txn(TxnError),
33 #[error("Block error: {:?}", block)]
34 Block { block: Position<ChangeId> },
35 #[error("Invalid change")]
36 InvalidChange,
37 }
38
39 impl<TxnError: std::error::Error> LocalApplyError<TxnError> {
from_missing(err: MissingError<TxnError>) -> Self40 fn from_missing(err: MissingError<TxnError>) -> Self {
41 match err {
42 MissingError::Txn(e) => LocalApplyError::Txn(e),
43 MissingError::Block(e) => e.into(),
44 MissingError::Inconsistent(_) => LocalApplyError::InvalidChange,
45 }
46 }
47 }
48
49 impl<T: std::error::Error> From<crate::pristine::InconsistentChange<T>> for LocalApplyError<T> {
from(err: crate::pristine::InconsistentChange<T>) -> Self50 fn from(err: crate::pristine::InconsistentChange<T>) -> Self {
51 match err {
52 InconsistentChange::Txn(e) => LocalApplyError::Txn(e),
53 _ => LocalApplyError::InvalidChange,
54 }
55 }
56 }
57
58 impl<T: std::error::Error> From<crate::pristine::TxnErr<T>> for LocalApplyError<T> {
from(err: crate::pristine::TxnErr<T>) -> Self59 fn from(err: crate::pristine::TxnErr<T>) -> Self {
60 LocalApplyError::Txn(err.0)
61 }
62 }
63
64 impl<C: std::error::Error, T: std::error::Error> From<crate::pristine::TxnErr<T>>
65 for ApplyError<C, T>
66 {
from(err: crate::pristine::TxnErr<T>) -> Self67 fn from(err: crate::pristine::TxnErr<T>) -> Self {
68 LocalApplyError::Txn(err.0).into()
69 }
70 }
71
72 impl<T: std::error::Error> From<crate::pristine::BlockError<T>> for LocalApplyError<T> {
from(err: crate::pristine::BlockError<T>) -> Self73 fn from(err: crate::pristine::BlockError<T>) -> Self {
74 match err {
75 BlockError::Txn(e) => LocalApplyError::Txn(e),
76 BlockError::Block { block } => LocalApplyError::Block { block },
77 }
78 }
79 }
80
81 /// Apply a change to a channel. This function does not update the
82 /// inodes/tree tables, i.e. the correspondence between the pristine
83 /// and the working copy. Therefore, this function must be used only
84 /// on remote changes, or on "bare" repositories.
apply_change_ws<T: MutTxnT, P: ChangeStore>( changes: &P, txn: &mut T, channel: &mut T::Channel, hash: &Hash, workspace: &mut Workspace, ) -> Result<(u64, Merkle), ApplyError<P::Error, T::GraphError>>85 pub fn apply_change_ws<T: MutTxnT, P: ChangeStore>(
86 changes: &P,
87 txn: &mut T,
88 channel: &mut T::Channel,
89 hash: &Hash,
90 workspace: &mut Workspace,
91 ) -> Result<(u64, Merkle), ApplyError<P::Error, T::GraphError>> {
92 debug!("apply_change {:?}", hash.to_base32());
93 workspace.clear();
94 let change = changes.get_change(&hash).map_err(ApplyError::Changestore)?;
95
96 for hash in change.dependencies.iter() {
97 if let Hash::None = hash {
98 continue;
99 }
100 if let Some(int) = txn.get_internal(&hash.into())? {
101 if txn.get_changeset(txn.changes(&channel), int)?.is_some() {
102 continue;
103 }
104 }
105 return Err((LocalApplyError::DependencyMissing { hash: *hash }).into());
106 }
107
108 let internal = if let Some(&p) = txn.get_internal(&hash.into())? {
109 p
110 } else {
111 let internal: ChangeId = make_changeid(txn, &hash)?;
112 register_change(txn, &internal, hash, &change)?;
113 internal
114 };
115 debug!("internal = {:?}", internal);
116 Ok(apply_change_to_channel(
117 txn, channel, internal, &hash, &change, workspace,
118 )?)
119 }
120
apply_change_rec_ws<T: TxnT + MutTxnT, P: ChangeStore>( changes: &P, txn: &mut T, channel: &mut T::Channel, hash: &Hash, workspace: &mut Workspace, deps_only: bool, ) -> Result<(), ApplyError<P::Error, T::GraphError>>121 pub fn apply_change_rec_ws<T: TxnT + MutTxnT, P: ChangeStore>(
122 changes: &P,
123 txn: &mut T,
124 channel: &mut T::Channel,
125 hash: &Hash,
126 workspace: &mut Workspace,
127 deps_only: bool,
128 ) -> Result<(), ApplyError<P::Error, T::GraphError>> {
129 debug!("apply_change {:?}", hash.to_base32());
130 workspace.clear();
131 let mut dep_stack = vec![(*hash, true, !deps_only)];
132 let mut visited = HashSet::default();
133 while let Some((hash, first, actually_apply)) = dep_stack.pop() {
134 let change = changes.get_change(&hash).map_err(ApplyError::Changestore)?;
135 let shash: SerializedHash = (&hash).into();
136 if first {
137 if !visited.insert(hash) {
138 continue;
139 }
140 if let Some(change_id) = txn.get_internal(&shash)? {
141 if txn
142 .get_changeset(txn.changes(&channel), change_id)?
143 .is_some()
144 {
145 continue;
146 }
147 }
148
149 dep_stack.push((hash, false, actually_apply));
150 for &hash in change.dependencies.iter() {
151 if let Hash::None = hash {
152 continue;
153 }
154 dep_stack.push((hash, true, true))
155 }
156 } else if actually_apply {
157 let applied = if let Some(int) = txn.get_internal(&shash)? {
158 txn.get_changeset(txn.changes(&channel), int)?.is_some()
159 } else {
160 false
161 };
162 if !applied {
163 let internal = if let Some(&p) = txn.get_internal(&shash)? {
164 p
165 } else {
166 let internal: ChangeId = make_changeid(txn, &hash)?;
167 register_change(txn, &internal, &hash, &change)?;
168 internal
169 };
170 debug!("internal = {:?}", internal);
171 workspace.clear();
172 apply_change_to_channel(txn, channel, internal, &hash, &change, workspace)?;
173 }
174 }
175 }
176 Ok(())
177 }
178
179 /// Same as [apply_change_ws], but allocates its own workspace.
apply_change<T: MutTxnT, P: ChangeStore>( changes: &P, txn: &mut T, channel: &mut T::Channel, hash: &Hash, ) -> Result<(u64, Merkle), ApplyError<P::Error, T::GraphError>>180 pub fn apply_change<T: MutTxnT, P: ChangeStore>(
181 changes: &P,
182 txn: &mut T,
183 channel: &mut T::Channel,
184 hash: &Hash,
185 ) -> Result<(u64, Merkle), ApplyError<P::Error, T::GraphError>> {
186 apply_change_ws(changes, txn, channel, hash, &mut Workspace::new())
187 }
188
189 /// Same as [apply_change], but with a wrapped `txn` and `channel`.
apply_change_arc<T: MutTxnT, P: ChangeStore>( changes: &P, txn: &ArcTxn<T>, channel: &ChannelRef<T>, hash: &Hash, ) -> Result<(u64, Merkle), ApplyError<P::Error, T::GraphError>>190 pub fn apply_change_arc<T: MutTxnT, P: ChangeStore>(
191 changes: &P,
192 txn: &ArcTxn<T>,
193 channel: &ChannelRef<T>,
194 hash: &Hash,
195 ) -> Result<(u64, Merkle), ApplyError<P::Error, T::GraphError>> {
196 apply_change_ws(
197 changes,
198 &mut *txn.write(),
199 &mut *channel.write(),
200 hash,
201 &mut Workspace::new(),
202 )
203 }
204
205 /// Same as [apply_change_ws], but allocates its own workspace.
apply_change_rec<T: MutTxnT, P: ChangeStore>( changes: &P, txn: &mut T, channel: &mut T::Channel, hash: &Hash, deps_only: bool, ) -> Result<(), ApplyError<P::Error, T::GraphError>>206 pub fn apply_change_rec<T: MutTxnT, P: ChangeStore>(
207 changes: &P,
208 txn: &mut T,
209 channel: &mut T::Channel,
210 hash: &Hash,
211 deps_only: bool,
212 ) -> Result<(), ApplyError<P::Error, T::GraphError>> {
213 apply_change_rec_ws(
214 changes,
215 txn,
216 channel,
217 hash,
218 &mut Workspace::new(),
219 deps_only,
220 )
221 }
222
apply_change_to_channel<T: ChannelMutTxnT>( txn: &mut T, channel: &mut T::Channel, change_id: ChangeId, hash: &Hash, change: &Change, ws: &mut Workspace, ) -> Result<(u64, Merkle), LocalApplyError<T::GraphError>>223 fn apply_change_to_channel<T: ChannelMutTxnT>(
224 txn: &mut T,
225 channel: &mut T::Channel,
226 change_id: ChangeId,
227 hash: &Hash,
228 change: &Change,
229 ws: &mut Workspace,
230 ) -> Result<(u64, Merkle), LocalApplyError<T::GraphError>> {
231 ws.assert_empty();
232 let n = txn.apply_counter(channel);
233 debug!("apply_change_to_channel {:?} {:?}", change_id, hash);
234 let merkle =
235 if let Some(m) = txn.put_changes(channel, change_id, txn.apply_counter(channel), hash)? {
236 m
237 } else {
238 return Err(LocalApplyError::ChangeAlreadyOnChannel { hash: *hash });
239 };
240 debug!("apply change to channel");
241 let now = std::time::Instant::now();
242 for change_ in change.changes.iter() {
243 debug!("Applying {:?} (1)", change_);
244 for change_ in change_.iter() {
245 match *change_ {
246 Atom::NewVertex(ref n) => {
247 put_newvertex(txn, T::graph_mut(channel), change, ws, change_id, n)?
248 }
249 Atom::EdgeMap(ref n) => {
250 for edge in n.edges.iter() {
251 if !edge.flag.contains(EdgeFlags::DELETED) {
252 put_newedge(
253 txn,
254 T::graph_mut(channel),
255 ws,
256 change_id,
257 n.inode,
258 edge,
259 |_, _| true,
260 |h| change.knows(h),
261 )?;
262 }
263 }
264 }
265 }
266 }
267 }
268 for change_ in change.changes.iter() {
269 debug!("Applying {:?} (2)", change_);
270 for change_ in change_.iter() {
271 if let Atom::EdgeMap(ref n) = *change_ {
272 for edge in n.edges.iter() {
273 if edge.flag.contains(EdgeFlags::DELETED) {
274 put_newedge(
275 txn,
276 T::graph_mut(channel),
277 ws,
278 change_id,
279 n.inode,
280 edge,
281 |_, _| true,
282 |h| change.knows(h),
283 )?;
284 }
285 }
286 }
287 }
288 }
289 crate::TIMERS.lock().unwrap().apply += now.elapsed();
290
291 clean_obsolete_pseudo_edges(txn, T::graph_mut(channel), ws, change_id)?;
292
293 info!("repairing missing contexts");
294 repair_missing_contexts(txn, T::graph_mut(channel), ws, change_id, change)?;
295 detect_folder_conflict_resolutions(
296 txn,
297 T::graph_mut(channel),
298 &mut ws.missing_context,
299 change_id,
300 change,
301 )
302 .map_err(LocalApplyError::from_missing)?;
303
304 repair_cyclic_paths(txn, T::graph_mut(channel), ws)?;
305 info!("done applying change");
306 Ok((n, merkle))
307 }
308
309 /// Apply a change created locally: serialize it, compute its hash, and
310 /// apply it. This function also registers changes in the filesystem
311 /// introduced by the change (file additions, deletions and moves), to
312 /// synchronise the pristine and the working copy after the
313 /// application.
apply_local_change_ws< T: ChannelMutTxnT + DepsMutTxnT<DepsError = <T as GraphTxnT>::GraphError> + TreeMutTxnT<TreeError = <T as GraphTxnT>::GraphError>, >( txn: &mut T, channel: &ChannelRef<T>, change: &Change, hash: &Hash, inode_updates: &HashMap<usize, InodeUpdate>, workspace: &mut Workspace, ) -> Result<(u64, Merkle), LocalApplyError<T::GraphError>>314 pub fn apply_local_change_ws<
315 T: ChannelMutTxnT
316 + DepsMutTxnT<DepsError = <T as GraphTxnT>::GraphError>
317 + TreeMutTxnT<TreeError = <T as GraphTxnT>::GraphError>,
318 >(
319 txn: &mut T,
320 channel: &ChannelRef<T>,
321 change: &Change,
322 hash: &Hash,
323 inode_updates: &HashMap<usize, InodeUpdate>,
324 workspace: &mut Workspace,
325 ) -> Result<(u64, Merkle), LocalApplyError<T::GraphError>> {
326 let mut channel = channel.write();
327 let internal: ChangeId = make_changeid(txn, hash)?;
328 debug!("make_changeid {:?} {:?}", hash, internal);
329
330 for hash in change.dependencies.iter() {
331 if let Hash::None = hash {
332 continue;
333 }
334 if let Some(int) = txn.get_internal(&hash.into())? {
335 if txn.get_changeset(txn.changes(&channel), int)?.is_some() {
336 continue;
337 }
338 }
339 return Err((LocalApplyError::DependencyMissing { hash: *hash }).into());
340 }
341
342 register_change(txn, &internal, hash, &change)?;
343 let n = apply_change_to_channel(txn, &mut channel, internal, &hash, &change, workspace)?;
344 for (_, update) in inode_updates.iter() {
345 info!("updating {:?}", update);
346 update_inode(txn, &channel, internal, update)?;
347 }
348 Ok(n)
349 }
350
351 /// Same as [apply_local_change_ws], but allocates its own workspace.
apply_local_change< T: ChannelMutTxnT + DepsMutTxnT<DepsError = <T as GraphTxnT>::GraphError> + TreeMutTxnT<TreeError = <T as GraphTxnT>::GraphError>, >( txn: &mut T, channel: &ChannelRef<T>, change: &Change, hash: &Hash, inode_updates: &HashMap<usize, InodeUpdate>, ) -> Result<(u64, Merkle), LocalApplyError<T::GraphError>>352 pub fn apply_local_change<
353 T: ChannelMutTxnT
354 + DepsMutTxnT<DepsError = <T as GraphTxnT>::GraphError>
355 + TreeMutTxnT<TreeError = <T as GraphTxnT>::GraphError>,
356 >(
357 txn: &mut T,
358 channel: &ChannelRef<T>,
359 change: &Change,
360 hash: &Hash,
361 inode_updates: &HashMap<usize, InodeUpdate>,
362 ) -> Result<(u64, Merkle), LocalApplyError<T::GraphError>> {
363 apply_local_change_ws(
364 txn,
365 channel,
366 change,
367 hash,
368 inode_updates,
369 &mut Workspace::new(),
370 )
371 }
372
update_inode<T: ChannelTxnT + TreeMutTxnT<TreeError = <T as GraphTxnT>::GraphError>>( txn: &mut T, channel: &T::Channel, internal: ChangeId, update: &InodeUpdate, ) -> Result<(), LocalApplyError<T::TreeError>>373 fn update_inode<T: ChannelTxnT + TreeMutTxnT<TreeError = <T as GraphTxnT>::GraphError>>(
374 txn: &mut T,
375 channel: &T::Channel,
376 internal: ChangeId,
377 update: &InodeUpdate,
378 ) -> Result<(), LocalApplyError<T::TreeError>> {
379 debug!("update_inode {:?}", update);
380 match *update {
381 InodeUpdate::Add { inode, pos, .. } => {
382 let vertex = Position {
383 change: internal,
384 pos,
385 };
386 if txn
387 .get_graph(txn.graph(channel), &vertex.inode_vertex(), None)?
388 .is_some()
389 {
390 debug!("Adding inodes: {:?} {:?}", inode, vertex);
391 put_inodes_with_rev(txn, &inode, &vertex)?;
392 } else {
393 debug!("Not adding inodes: {:?} {:?}", inode, vertex);
394 }
395 }
396 InodeUpdate::Deleted { inode } => {
397 if let Some(parent) = txn.get_revtree(&inode, None)?.map(|x| x.to_owned()) {
398 del_tree_with_rev(txn, &parent, &inode)?;
399 }
400 // Delete the directory, if it's there.
401 txn.del_tree(&OwnedPathId::inode(inode), Some(&inode))?;
402 if let Some(&vertex) = txn.get_inodes(&inode, None)? {
403 del_inodes_with_rev(txn, &inode, &vertex)?;
404 }
405 }
406 }
407 Ok(())
408 }
409
410 #[derive(Default)]
411 pub struct Workspace {
412 parents: HashSet<Vertex<ChangeId>>,
413 children: HashSet<Vertex<ChangeId>>,
414 pseudo: Vec<(Vertex<ChangeId>, SerializedEdge, Position<Option<Hash>>)>,
415 deleted_by: HashSet<ChangeId>,
416 up_context: Vec<Vertex<ChangeId>>,
417 down_context: Vec<Vertex<ChangeId>>,
418 pub(crate) missing_context: crate::missing_context::Workspace,
419 rooted: HashMap<Vertex<ChangeId>, bool>,
420 adjbuf: Vec<SerializedEdge>,
421 }
422
423 impl Workspace {
new() -> Self424 pub fn new() -> Self {
425 Self::default()
426 }
clear(&mut self)427 fn clear(&mut self) {
428 self.children.clear();
429 self.parents.clear();
430 self.pseudo.clear();
431 self.deleted_by.clear();
432 self.up_context.clear();
433 self.down_context.clear();
434 self.missing_context.clear();
435 self.rooted.clear();
436 self.adjbuf.clear();
437 }
assert_empty(&self)438 fn assert_empty(&self) {
439 assert!(self.children.is_empty());
440 assert!(self.parents.is_empty());
441 assert!(self.pseudo.is_empty());
442 assert!(self.deleted_by.is_empty());
443 assert!(self.up_context.is_empty());
444 assert!(self.down_context.is_empty());
445 self.missing_context.assert_empty();
446 assert!(self.rooted.is_empty());
447 assert!(self.adjbuf.is_empty());
448 }
449 }
450
clean_obsolete_pseudo_edges<T: GraphMutTxnT>( txn: &mut T, channel: &mut T::Graph, ws: &mut Workspace, change_id: ChangeId, ) -> Result<(), LocalApplyError<T::GraphError>>451 pub(crate) fn clean_obsolete_pseudo_edges<T: GraphMutTxnT>(
452 txn: &mut T,
453 channel: &mut T::Graph,
454 ws: &mut Workspace,
455 change_id: ChangeId,
456 ) -> Result<(), LocalApplyError<T::GraphError>> {
457 for (next_vertex, p, inode) in ws.pseudo.drain(..) {
458 let (a, b) = if p.flag().is_parent() {
459 if let Ok(&dest) = txn.find_block_end(channel, p.dest()) {
460 (dest, next_vertex)
461 } else {
462 continue;
463 }
464 } else if let Ok(&dest) = txn.find_block(channel, p.dest()) {
465 (next_vertex, dest)
466 } else {
467 continue;
468 };
469 let a_is_alive = is_alive(txn, channel, &a)?;
470 let b_is_alive = is_alive(txn, channel, &b)?;
471 if a_is_alive && b_is_alive {
472 continue;
473 }
474 debug!(
475 "Deleting {:?} {:?} {:?} {:?}",
476 a,
477 b,
478 p.introduced_by(),
479 p.flag()
480 );
481 del_graph_with_rev(
482 txn,
483 channel,
484 p.flag() - EdgeFlags::PARENT,
485 a,
486 b,
487 p.introduced_by(),
488 )?;
489 if a_is_alive {
490 debug!("repair down");
491 debug_assert!(!b_is_alive);
492 crate::missing_context::repair_missing_down_context(
493 txn,
494 channel,
495 &mut ws.missing_context,
496 inode,
497 b,
498 &[a],
499 )
500 .map_err(LocalApplyError::from_missing)?
501 } else if b_is_alive && !p.flag().is_folder() {
502 debug!("repair up");
503 crate::missing_context::repair_missing_up_context(
504 txn,
505 channel,
506 &mut ws.missing_context,
507 change_id,
508 inode,
509 a,
510 &[b],
511 )
512 .map_err(LocalApplyError::from_missing)?
513 }
514 }
515 Ok(())
516 }
517
repair_missing_contexts<T: GraphMutTxnT>( txn: &mut T, channel: &mut T::Graph, ws: &mut Workspace, change_id: ChangeId, change: &Change, ) -> Result<(), LocalApplyError<T::GraphError>>518 fn repair_missing_contexts<T: GraphMutTxnT>(
519 txn: &mut T,
520 channel: &mut T::Graph,
521 ws: &mut Workspace,
522 change_id: ChangeId,
523 change: &Change,
524 ) -> Result<(), LocalApplyError<T::GraphError>> {
525 let now = std::time::Instant::now();
526 crate::missing_context::repair_parents_of_deleted(txn, channel, &mut ws.missing_context)
527 .map_err(LocalApplyError::from_missing)?;
528 for atom in change.changes.iter().flat_map(|r| r.iter()) {
529 match atom {
530 Atom::NewVertex(ref n) if !n.flag.is_folder() => {
531 let vertex = Vertex {
532 change: change_id,
533 start: n.start,
534 end: n.end,
535 };
536 repair_new_vertex_context_up(txn, channel, ws, change_id, n, vertex)?;
537 repair_new_vertex_context_down(txn, channel, ws, change_id, n, vertex)?;
538 }
539 Atom::NewVertex(_) => {}
540 Atom::EdgeMap(ref n) => {
541 repair_edge_context(txn, channel, ws, change_id, change, n)?;
542 }
543 }
544 }
545 crate::missing_context::delete_pseudo_edges(txn, channel, &mut ws.missing_context)
546 .map_err(LocalApplyError::from_missing)?;
547 crate::TIMERS.lock().unwrap().repair_context += now.elapsed();
548 Ok(())
549 }
550
repair_new_vertex_context_up<T: GraphMutTxnT>( txn: &mut T, channel: &mut T::Graph, ws: &mut Workspace, change_id: ChangeId, n: &NewVertex<Option<Hash>>, vertex: Vertex<ChangeId>, ) -> Result<(), LocalApplyError<T::GraphError>>551 fn repair_new_vertex_context_up<T: GraphMutTxnT>(
552 txn: &mut T,
553 channel: &mut T::Graph,
554 ws: &mut Workspace,
555 change_id: ChangeId,
556 n: &NewVertex<Option<Hash>>,
557 vertex: Vertex<ChangeId>,
558 ) -> Result<(), LocalApplyError<T::GraphError>> {
559 for up in n.up_context.iter() {
560 let up = *txn.find_block_end(channel, internal_pos(txn, &up, change_id)?)?;
561 if !is_alive(txn, channel, &up)? {
562 debug!("repairing missing up context {:?} {:?}", up, vertex);
563 repair_missing_up_context(
564 txn,
565 channel,
566 &mut ws.missing_context,
567 change_id,
568 n.inode,
569 up,
570 &[vertex],
571 )
572 .map_err(LocalApplyError::from_missing)?
573 }
574 }
575 Ok(())
576 }
577
repair_new_vertex_context_down<T: GraphMutTxnT>( txn: &mut T, channel: &mut T::Graph, ws: &mut Workspace, change_id: ChangeId, n: &NewVertex<Option<Hash>>, vertex: Vertex<ChangeId>, ) -> Result<(), LocalApplyError<T::GraphError>>578 fn repair_new_vertex_context_down<T: GraphMutTxnT>(
579 txn: &mut T,
580 channel: &mut T::Graph,
581 ws: &mut Workspace,
582 change_id: ChangeId,
583 n: &NewVertex<Option<Hash>>,
584 vertex: Vertex<ChangeId>,
585 ) -> Result<(), LocalApplyError<T::GraphError>> {
586 debug!("repairing missing context for {:?}", vertex);
587 if n.flag.contains(EdgeFlags::FOLDER) {
588 return Ok(());
589 }
590 'outer: for down in n.down_context.iter() {
591 let down = *txn.find_block(channel, internal_pos(txn, &down, change_id)?)?;
592 for e in iter_adjacent(
593 txn,
594 channel,
595 down,
596 EdgeFlags::PARENT,
597 EdgeFlags::all() - EdgeFlags::DELETED,
598 )? {
599 let e = e?;
600 if e.introduced_by() != change_id {
601 continue 'outer;
602 }
603 }
604 debug!("repairing missing down context {:?} {:?}", down, vertex);
605 repair_missing_down_context(
606 txn,
607 channel,
608 &mut ws.missing_context,
609 n.inode,
610 down,
611 &[vertex],
612 )
613 .map_err(LocalApplyError::from_missing)?
614 }
615 Ok(())
616 }
617
repair_edge_context<T: GraphMutTxnT>( txn: &mut T, channel: &mut T::Graph, ws: &mut Workspace, change_id: ChangeId, change: &Change, n: &EdgeMap<Option<Hash>>, ) -> Result<(), LocalApplyError<T::GraphError>>618 fn repair_edge_context<T: GraphMutTxnT>(
619 txn: &mut T,
620 channel: &mut T::Graph,
621 ws: &mut Workspace,
622 change_id: ChangeId,
623 change: &Change,
624 n: &EdgeMap<Option<Hash>>,
625 ) -> Result<(), LocalApplyError<T::GraphError>> {
626 for e in n.edges.iter() {
627 assert!(!e.flag.contains(EdgeFlags::PARENT));
628 if e.flag.contains(EdgeFlags::DELETED) {
629 trace!("repairing context deleted {:?}", e);
630 repair_context_deleted(
631 txn,
632 channel,
633 &mut ws.missing_context,
634 n.inode,
635 change_id,
636 |h| change.knows(&h),
637 e,
638 )
639 .map_err(LocalApplyError::from_missing)?
640 } else {
641 trace!("repairing context nondeleted {:?}", e);
642 repair_context_nondeleted(txn, channel, &mut ws.missing_context, n.inode, change_id, e)
643 .map_err(LocalApplyError::from_missing)?
644 }
645 }
646 Ok(())
647 }
648
repair_cyclic_paths<T: GraphMutTxnT>( txn: &mut T, channel: &mut T::Graph, ws: &mut Workspace, ) -> Result<(), LocalApplyError<T::GraphError>>649 pub(crate) fn repair_cyclic_paths<T: GraphMutTxnT>(
650 txn: &mut T,
651 channel: &mut T::Graph,
652 ws: &mut Workspace,
653 ) -> Result<(), LocalApplyError<T::GraphError>> {
654 let now = std::time::Instant::now();
655 let mut files = std::mem::replace(&mut ws.missing_context.files, HashSet::default());
656 for file in files.drain() {
657 if file.is_empty() {
658 if !is_rooted(txn, channel, file, ws)? {
659 repair_edge(txn, channel, file, ws)?
660 }
661 } else {
662 let f0 = EdgeFlags::FOLDER;
663 let f1 = EdgeFlags::FOLDER | EdgeFlags::BLOCK | EdgeFlags::PSEUDO;
664 let mut iter = iter_adjacent(txn, channel, file, f0, f1)?;
665 if let Some(ee) = iter.next() {
666 let ee = ee?;
667 let dest = ee.dest().inode_vertex();
668 if !is_rooted(txn, channel, dest, ws)? {
669 repair_edge(txn, channel, dest, ws)?
670 }
671 }
672 }
673 }
674 ws.missing_context.files = files;
675 crate::TIMERS.lock().unwrap().check_cyclic_paths += now.elapsed();
676 Ok(())
677 }
678
repair_edge<T: GraphMutTxnT>( txn: &mut T, channel: &mut T::Graph, to0: Vertex<ChangeId>, ws: &mut Workspace, ) -> Result<(), LocalApplyError<T::GraphError>>679 fn repair_edge<T: GraphMutTxnT>(
680 txn: &mut T,
681 channel: &mut T::Graph,
682 to0: Vertex<ChangeId>,
683 ws: &mut Workspace,
684 ) -> Result<(), LocalApplyError<T::GraphError>> {
685 debug!("repair_edge {:?}", to0);
686 let mut stack = vec![(to0, true, true, true)];
687 ws.parents.clear();
688 while let Some((current, _, al, anc_al)) = stack.pop() {
689 if !ws.parents.insert(current) {
690 continue;
691 }
692 debug!("repair_cyclic {:?}", current);
693 if current != to0 {
694 stack.push((current, true, al, anc_al));
695 }
696 if current.is_root() {
697 debug!("root");
698 break;
699 }
700 if let Some(&true) = ws.rooted.get(¤t) {
701 debug!("rooted");
702 break;
703 }
704 let f = EdgeFlags::PARENT | EdgeFlags::FOLDER;
705 let len = stack.len();
706 for parent in iter_adjacent(txn, channel, current, f, EdgeFlags::all())? {
707 let parent = parent?;
708 if parent.flag().is_parent() {
709 let anc = txn.find_block_end(channel, parent.dest())?;
710 debug!("is_rooted, parent = {:?}", parent);
711 let al = if let Some(e) = iter_adjacent(
712 txn,
713 channel,
714 *anc,
715 f,
716 f | EdgeFlags::BLOCK | EdgeFlags::PSEUDO,
717 )?
718 .next()
719 {
720 e?;
721 true
722 } else {
723 false
724 };
725 debug!("al = {:?}, flag = {:?}", al, parent.flag());
726 stack.push((*anc, false, parent.flag().is_deleted(), al));
727 }
728 }
729 if stack.len() == len {
730 stack.pop();
731 } else {
732 (&mut stack[len..]).sort_unstable_by(|a, b| a.3.cmp(&b.3))
733 }
734 }
735 let mut current = to0;
736 for (next, on_path, del, _) in stack {
737 if on_path {
738 if del {
739 put_graph_with_rev(
740 txn,
741 channel,
742 EdgeFlags::FOLDER | EdgeFlags::PSEUDO,
743 next,
744 current,
745 ChangeId::ROOT,
746 )?;
747 }
748 current = next
749 }
750 }
751 ws.parents.clear();
752 Ok(())
753 }
754
is_rooted<T: GraphTxnT>( txn: &T, channel: &T::Graph, v: Vertex<ChangeId>, ws: &mut Workspace, ) -> Result<bool, LocalApplyError<T::GraphError>>755 fn is_rooted<T: GraphTxnT>(
756 txn: &T,
757 channel: &T::Graph,
758 v: Vertex<ChangeId>,
759 ws: &mut Workspace,
760 ) -> Result<bool, LocalApplyError<T::GraphError>> {
761 let mut alive = false;
762 assert!(v.is_empty());
763 for e in iter_adjacent(txn, channel, v, EdgeFlags::empty(), EdgeFlags::all())? {
764 let e = e?;
765 if e.flag().contains(EdgeFlags::PARENT) {
766 if e.flag() & (EdgeFlags::FOLDER | EdgeFlags::DELETED) == EdgeFlags::FOLDER {
767 alive = true;
768 break;
769 }
770 } else if !e.flag().is_deleted() {
771 alive = true;
772 break;
773 }
774 }
775 if !alive {
776 debug!("is_rooted, not alive");
777 return Ok(true);
778 }
779 // Recycling ws.up_context and ws.parents as a stack and a
780 // "visited" hashset, respectively.
781 let stack = &mut ws.up_context;
782 stack.clear();
783 stack.push(v);
784 let visited = &mut ws.parents;
785 visited.clear();
786
787 while let Some(to) = stack.pop() {
788 debug!("is_rooted, pop = {:?}", to);
789 if to.is_root() {
790 stack.clear();
791 for v in visited.drain() {
792 ws.rooted.insert(v, true);
793 }
794 return Ok(true);
795 }
796 if !visited.insert(to) {
797 continue;
798 }
799 if let Some(&rooted) = ws.rooted.get(&to) {
800 if rooted {
801 for v in visited.drain() {
802 ws.rooted.insert(v, true);
803 }
804 return Ok(true);
805 } else {
806 continue;
807 }
808 }
809 let f = EdgeFlags::PARENT | EdgeFlags::FOLDER;
810 for parent in iter_adjacent(
811 txn,
812 channel,
813 to,
814 f,
815 f | EdgeFlags::PSEUDO | EdgeFlags::BLOCK,
816 )? {
817 let parent = parent?;
818 debug!("is_rooted, parent = {:?}", parent);
819 stack.push(*txn.find_block_end(channel, parent.dest())?)
820 }
821 }
822 for v in visited.drain() {
823 ws.rooted.insert(v, false);
824 }
825 Ok(false)
826 }
827