1# Copyright 2002, 2003 Ben Escoto
2#
3# This file is part of rdiff-backup.
4#
5# rdiff-backup is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by the
7# Free Software Foundation; either version 2 of the License, or (at your
8# option) any later version.
9#
10# rdiff-backup is distributed in the hope that it will be useful, but
11# WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13# General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with rdiff-backup; if not, write to the Free Software
17# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18# 02110-1301, USA
19"""High level functions for mirroring and mirror+incrementing"""
20
21import errno
22from . import Globals, metadata, rorpiter, TempFile, Hardlink, robust, \
23    increment, rpath, log, selection, Time, Rdiff, statistics, iterfile, \
24    hash, longname
25
26
def Mirror(src_rpath, dest_rpath):
    """Make dest_rpath into an exact copy of src_rpath."""
    safe_src = src_rpath.get_safepath()
    safe_dest = dest_rpath.get_safepath()
    log.Log("Starting mirror %s to %s" % (safe_src, safe_dest), 4)

    # Resolve the per-side helper classes over each path's connection.
    src_side = src_rpath.conn.backup.SourceStruct
    dest_side = dest_rpath.conn.backup.DestinationStruct

    # Pipeline: source selection -> destination cache -> signatures ->
    # diffs -> patch.  for_increment is 0: plain mirror, no increments.
    select_iter = src_side.get_source_select()
    dest_side.set_rorp_cache(dest_rpath, select_iter, 0)
    sig_iter = dest_side.get_sigs(dest_rpath)
    diff_iter = src_side.get_diffs(sig_iter)
    dest_side.patch(dest_rpath, diff_iter)
40
41
def Mirror_and_increment(src_rpath, dest_rpath, inc_rpath):
    """Mirror src_rpath to dest_rpath, writing increments under inc_rpath."""
    safe_src = src_rpath.get_safepath()
    safe_dest = dest_rpath.get_safepath()
    log.Log("Starting increment operation %s to %s" % (safe_src, safe_dest), 4)

    # Resolve the per-side helper classes over each path's connection.
    src_side = src_rpath.conn.backup.SourceStruct
    dest_side = dest_rpath.conn.backup.DestinationStruct

    # Same pipeline as Mirror(), but for_increment is 1 and the patch
    # step also records increment files rooted at inc_rpath.
    select_iter = src_side.get_source_select()
    dest_side.set_rorp_cache(dest_rpath, select_iter, 1)
    sig_iter = dest_side.get_sigs(dest_rpath)
    diff_iter = src_side.get_diffs(sig_iter)
    dest_side.patch_and_increment(dest_rpath, diff_iter, inc_rpath)
56
57
class SourceStruct:
    """Hold info used on source side when backing up"""
    # Cached source-side Select iterator; populated by set_source_select()
    # and consumed by get_source_select()/get_diffs() below.
    _source_select = None  # will be set to source Select iterator

    @classmethod
    def set_source_select(cls, rpath, tuplelist, *filelists):
        """Initialize select object using tuplelist

        Note that each list in filelists must each be passed as
        separate arguments, so each is recognized as a file by the
        connection.  Otherwise we will get an error because a list
        containing files can't be pickled.

        Also, cls._source_select needs to be cached so get_diffs below
        can retrieve the necessary rps.

        """
        sel = selection.Select(rpath)
        sel.ParseArgs(tuplelist, filelists)
        sel_iter = sel.set_iter()
        # Cache enough entries to cover everything that can be in flight
        # in the network pipeline (both directions, plus leeway).
        cache_size = Globals.pipeline_max_length * 3  # to and from+leeway
        cls._source_select = rorpiter.CacheIndexable(sel_iter, cache_size)
        Globals.set('select_mirror', sel_iter)

    @classmethod
    def get_source_select(cls):
        """Return source select iterator, set by set_source_select"""
        return cls._source_select

    @classmethod
    def get_diffs(cls, dest_sigiter):
        """Return diffs of any files with signature in dest_sigiter

        Yields one diff_rorp per incoming signature.  Flush markers
        received from the destination are echoed back so both ends of
        the pipeline flush in step (see get_sigs).

        """
        source_rps = cls._source_select
        error_handler = robust.get_error_handler("ListError")

        def attach_snapshot(diff_rorp, src_rp):
            """Attach file of snapshot to diff_rorp, w/ error checking"""
            fileobj = robust.check_common_error(
                error_handler, rpath.RPath.open, (src_rp, "rb"))
            if fileobj:
                # hash.FileWrapper wraps the stream; presumably it collects
                # a hash as data is read — confirm against the hash module.
                diff_rorp.setfile(hash.FileWrapper(fileobj))
            else:
                # Open failed and was caught by the error handler; send a
                # zeroed-out entry instead of file data.
                diff_rorp.zero()
            diff_rorp.set_attached_filetype('snapshot')

        def attach_diff(diff_rorp, src_rp, dest_sig):
            """Attach file of diff to diff_rorp, w/ error checking"""
            fileobj = robust.check_common_error(
                error_handler, Rdiff.get_delta_sigrp_hash, (dest_sig, src_rp))
            if fileobj:
                diff_rorp.setfile(fileobj)
                diff_rorp.set_attached_filetype('diff')
            else:
                # Delta computation failed; fall back to an empty snapshot.
                diff_rorp.zero()
                diff_rorp.set_attached_filetype('snapshot')

        for dest_sig in dest_sigiter:
            if dest_sig is iterfile.MiscIterFlushRepeat:
                yield iterfile.MiscIterFlush  # Flush buffer when get_sigs does
                continue
            # Look up the corresponding source file; if it has already left
            # the source cache, substitute a bare placeholder RORPath.
            src_rp = (source_rps.get(dest_sig.index)
                      or rpath.RORPath(dest_sig.index))
            diff_rorp = src_rp.getRORPath()
            if dest_sig.isflaglinked():
                # Destination flagged this as a hardlink to an inode already
                # transferred; no file data is needed.
                diff_rorp.flaglinked(dest_sig.get_link_flag())
            elif src_rp.isreg():
                reset_perms = False
                if (Globals.process_uid != 0 and not src_rp.readable()
                        and src_rp.isowner()):
                    # Non-root but owner of an unreadable file: temporarily
                    # grant owner-read so the data can be sent, undone below.
                    reset_perms = True
                    src_rp.chmod(0o400 | src_rp.getperms())

                if dest_sig.isreg():
                    attach_diff(diff_rorp, src_rp, dest_sig)
                else:
                    attach_snapshot(diff_rorp, src_rp)

                if reset_perms:
                    src_rp.chmod(src_rp.getperms() & ~0o400)
            else:
                # Not a regular file (dir, symlink, etc.): metadata only.
                dest_sig.close_if_necessary()
                diff_rorp.set_attached_filetype('snapshot')
            yield diff_rorp
141
142
class DestinationStruct:
    """Hold info used by destination side when backing up"""

    @classmethod
    def get_dest_select(cls, rpath, use_metadata=1):
        """

        Return destination select rorpath iterator

        If metadata file doesn't exist, select all files on
        destination except rdiff-backup-data directory.

        """
        # NOTE(review): the `rpath` parameter shadows the imported rpath
        # module within this method; only the parameter is used here.

        def get_iter_from_fs():
            """Get the combined iterator from the filesystem"""
            sel = selection.Select(rpath)
            sel.parse_rbdir_exclude()
            return sel.set_iter()

        metadata.SetManager()
        if use_metadata:
            # Prefer the metadata recorded at the previous backup time;
            # fall back to scanning the filesystem if none is available.
            rorp_iter = metadata.ManagerObj.GetAtTime(Time.prevtime)
            if rorp_iter:
                return rorp_iter
        return get_iter_from_fs()

    @classmethod
    def set_rorp_cache(cls, baserp, source_iter, for_increment):
        """

        Initialize cls.CCPP, the destination rorp cache

        for_increment should be true if we are mirror+incrementing,
        false if we are just mirroring.

        """
        # for_increment doubles as use_metadata for the destination select.
        dest_iter = cls.get_dest_select(baserp, for_increment)
        collated = rorpiter.Collate2Iters(source_iter, dest_iter)
        cls.CCPP = CacheCollatedPostProcess(
            collated, Globals.pipeline_max_length * 4, baserp)
        # pipeline len adds some leeway over just*3 (to and from and back)

    @classmethod
    def get_sigs(cls, dest_base_rpath):
        """Yield signatures of any changed destination files

        If we are backing up across a pipe, we must flush the pipeline
        every so often so it doesn't get congested on destination end.

        """
        flush_threshold = Globals.pipeline_max_length - 2
        num_rorps_seen = 0
        for src_rorp, dest_rorp in cls.CCPP:
            # Only emit flush markers when source and destination are on
            # different connections (a real pipe exists between them).
            if (Globals.backup_reader is not Globals.backup_writer):
                num_rorps_seen += 1
                if (num_rorps_seen > flush_threshold):
                    num_rorps_seen = 0
                    yield iterfile.MiscIterFlushRepeat
            # Skip files that are identical on both sides (including, when
            # hardlinks are preserved, identical link relationships).
            if not (src_rorp and dest_rorp and src_rorp == dest_rorp
                    and (not Globals.preserve_hardlinks
                         or Hardlink.rorp_eq(src_rorp, dest_rorp))):

                index = src_rorp and src_rorp.index or dest_rorp.index
                sig = cls.get_one_sig(dest_base_rpath, index, src_rorp,
                                      dest_rorp)
                if sig:
                    cls.CCPP.flag_changed(index)
                    yield sig

    @classmethod
    def get_one_sig(cls, dest_base_rpath, index, src_rorp, dest_rorp):
        """Return a signature given source and destination rorps

        Returns None if a signature file could not be produced (error
        already logged by get_one_sig_fp).
        """
        if (Globals.preserve_hardlinks and src_rorp
                and Hardlink.islinked(src_rorp)):
            # Source file is a known hardlink: flag it instead of sending
            # file data; the link index identifies the shared inode.
            dest_sig = rpath.RORPath(index)
            dest_sig.flaglinked(Hardlink.get_link_index(src_rorp))
        elif dest_rorp:
            dest_sig = dest_rorp.getRORPath()
            if dest_rorp.isreg():
                dest_rp = longname.get_mirror_rp(dest_base_rpath, dest_rorp)
                sig_fp = cls.get_one_sig_fp(dest_rp)
                if sig_fp is None:
                    return None
                dest_sig.setfile(sig_fp)
        else:
            # Nothing on the destination yet: empty placeholder signature.
            dest_sig = rpath.RORPath(index)
        return dest_sig

    @classmethod
    def get_one_sig_fp(cls, dest_rp):
        """Return a signature fp of given index, corresponding to reg file

        Returns None (after logging an UpdateError) if the file is no
        longer a regular file or cannot be opened for reading.
        """
        if not dest_rp.isreg():
            log.ErrorLog.write_if_open(
                "UpdateError", dest_rp,
                "File changed from regular file before signature")
            return None
        if (Globals.process_uid != 0 and not dest_rp.readable()
                and dest_rp.isowner()):
            # This branch can happen with root source and non-root
            # destination.  Permissions are changed permanently, which
            # should propagate to the diffs
            dest_rp.chmod(0o400 | dest_rp.getperms())
        try:
            return Rdiff.get_signature(dest_rp)
        except IOError as e:
            if (e.errno == errno.EPERM or e.errno == errno.EACCES):
                try:
                    # Try chmod'ing anyway -- This can work on NFS and AFS
                    # depending on the setup. We keep the if() statement
                    # above for performance reasons.
                    dest_rp.chmod(0o400 | dest_rp.getperms())
                    return Rdiff.get_signature(dest_rp)
                except (IOError, OSError):
                    log.Log.FatalError(
                        "Could not open %s for reading. Check "
                        "permissions on file." % (dest_rp.path, ))
            else:
                raise

    @classmethod
    def patch(cls, dest_rpath, source_diffiter, start_index=()):
        """Patch dest_rpath with an rorpiter of diffs"""
        ITR = rorpiter.IterTreeReducer(PatchITRB, [dest_rpath, cls.CCPP])
        # FillInIter supplies any directory entries missing from the
        # diff stream so the tree reducer sees a complete tree.
        for diff in rorpiter.FillInIter(source_diffiter, dest_rpath):
            log.Log("Processing changed file %s" % diff.get_safeindexpath(), 5)
            ITR(diff.index, diff)
        ITR.Finish()
        cls.CCPP.close()
        dest_rpath.setdata()

    @classmethod
    def patch_and_increment(cls, dest_rpath, source_diffiter, inc_rpath):
        """Patch dest_rpath with rorpiter of diffs and write increments"""
        ITR = rorpiter.IterTreeReducer(IncrementITRB,
                                       [dest_rpath, inc_rpath, cls.CCPP])
        for diff in rorpiter.FillInIter(source_diffiter, dest_rpath):
            log.Log("Processing changed file %s" % diff.get_safeindexpath(), 5)
            ITR(diff.index, diff)
        ITR.Finish()
        cls.CCPP.close()
        dest_rpath.setdata()
285
286
class CacheCollatedPostProcess:
    """

    Cache a collated iter of (source_rorp, dest_rorp) pairs

    This is necessary for four reasons:

    1.  The patch function may need the original source_rorp or
        dest_rp information, which is not present in the diff it
        receives.

    2.  The metadata must match what is stored in the destination
        directory.  If there is an error, either we do not update the
        dest directory for that file and the old metadata is used, or
        the file is deleted on the other end.  Thus we cannot write
        any metadata until we know the file has been processed
        correctly.

    3.  We may lack permissions on certain destination directories.
        The permissions of these directories need to be relaxed before
        we enter them to compute signatures, and then reset after we
        are done patching everything inside them.

    4.  We need some place to put hashes (like SHA1) after computing
        them and before writing them to the metadata.

    The class caches older source_rorps and dest_rps so the patch
    function can retrieve them if necessary.  The patch function can
    also update the processed correctly flag.  When an item falls out
    of the cache, we assume it has been processed, and write the
    metadata for it.

    """

    def __init__(self, collated_iter, cache_size, dest_root_rp):
        """Initialize new CCWP."""
        self.iter = collated_iter  # generates (source_rorp, dest_rorp) pairs
        self.cache_size = cache_size
        self.dest_root_rp = dest_root_rp

        self.statfileobj = statistics.init_statfileobj()
        if Globals.file_statistics:
            statistics.FileStats.init()
        self.metawriter = metadata.ManagerObj.GetWriter()

        # the following should map indices to lists
        # [source_rorp, dest_rorp, changed_flag, success_flag, increment]

        # changed_flag should be true if the rorps are different, and

        # success_flag should be 1 if dest_rorp has been successfully
        # updated to source_rorp, and 2 if the destination file is
        # deleted entirely.  They both default to false (0).

        # increment holds the RPath of the increment file if one
        # exists.  It is used to record file statistics.

        self.cache_dict = {}
        # cache_indices preserves insertion (processing) order; the oldest
        # entry is expired first in shorten_cache().
        self.cache_indices = []

        # Contains a list of pairs (destination_rps, permissions) to
        # be used to reset the permissions of certain directories
        # after we're finished with them
        self.dir_perms_list = []

        # Contains list of (index, (source_rorp, diff_rorp)) pairs for
        # the parent directories of the last item in the cache.
        self.parent_list = []

    def __iter__(self):
        return self

    def __next__(self):
        """Return next (source_rorp, dest_rorp) pair.  StopIteration passed"""
        source_rorp, dest_rorp = next(self.iter)
        self.pre_process(source_rorp, dest_rorp)
        index = source_rorp and source_rorp.index or dest_rorp.index
        # New entries start with changed=0, success=0, increment=None.
        self.cache_dict[index] = [source_rorp, dest_rorp, 0, 0, None]
        self.cache_indices.append(index)

        if len(self.cache_indices) > self.cache_size:
            self.shorten_cache()
        return source_rorp, dest_rorp

    def pre_process(self, source_rorp, dest_rorp):
        """Do initial processing on source_rorp and dest_rorp

        It will not be clear whether source_rorp and dest_rorp have
        errors at this point, so don't do anything which assumes they
        will be backed up correctly.

        """
        if Globals.preserve_hardlinks and source_rorp:
            Hardlink.add_rorp(source_rorp, dest_rorp)
        # A destination dir missing owner-rwx (perms & 0o700 < 0o700) would
        # block us from working inside it as non-root; relax it now.
        if (dest_rorp and dest_rorp.isdir() and Globals.process_uid != 0
                and dest_rorp.getperms() % 0o1000 < 0o700):
            self.unreadable_dir_init(source_rorp, dest_rorp)

    def unreadable_dir_init(self, source_rorp, dest_rorp):
        """Initialize an unreadable dir.

        Make it readable, and if necessary, store the old permissions
        in self.dir_perms_list so the old perms can be restored.

        """
        dest_rp = self.dest_root_rp.new_index(dest_rorp.index)
        dest_rp.chmod(0o700 | dest_rorp.getperms())
        if source_rorp and source_rorp.isdir():
            self.dir_perms_list.append((dest_rp, source_rorp.getperms()))

    def shorten_cache(self):
        """Remove one element from cache, possibly adding it to metadata"""
        first_index = self.cache_indices[0]
        del self.cache_indices[0]
        try:
            (old_source_rorp, old_dest_rorp, changed_flag, success_flag,
             inc) = self.cache_dict[first_index]
        except KeyError:  # probably caused by error in file system (dup)
            log.Log(
                "Warning index %s missing from CCPP cache" % (first_index, ),
                2)
            return
        del self.cache_dict[first_index]
        # Expiry means the entry is fully processed: write its metadata,
        # restore any directory permissions we relaxed, and remember it
        # in the parent cache if it is a directory.
        self.post_process(old_source_rorp, old_dest_rorp, changed_flag,
                          success_flag, inc)
        if self.dir_perms_list:
            self.reset_dir_perms(first_index)
        self.update_parent_list(first_index, old_source_rorp, old_dest_rorp)

    def update_parent_list(self, index, src_rorp, dest_rorp):
        """Update the parent cache with the recently expired main cache entry

        This method keeps parent directories in the secondary parent
        cache until all their children have expired from the main
        cache.  This is necessary because we may realize we need a
        parent directory's information after we have processed many
        subfiles.

        """
        if not (src_rorp and src_rorp.isdir()
                or dest_rorp and dest_rorp.isdir()):
            return  # neither is directory
        if not self.parent_list:
            # First directory expired must be the root (empty index).
            assert index == (), index
        else:
            last_parent_index = self.parent_list[-1][0]
            # NOTE: despite its name, lp_index is a length, like li.
            lp_index, li = len(last_parent_index), len(index)
            if li > lp_index:  # descended into previous parent
                assert li == lp_index + 1, (index, last_parent_index)
            else:  # In new directory
                # Moved sideways/up: truncate stale deeper parents, but the
                # remaining stack must still be an ancestor chain of index.
                assert last_parent_index[:li - 1] == index[:-1], index
                self.parent_list = self.parent_list[:li]
        self.parent_list.append((index, (src_rorp, dest_rorp)))

    def post_process(self, source_rorp, dest_rorp, changed, success, inc):
        """Post process source_rorp and dest_rorp.

        The point of this is to write statistics and metadata.

        changed will be true if the files have changed.  success will
        be true if the files have been successfully updated (this is
        always false for un-changed files).

        """
        if Globals.preserve_hardlinks and source_rorp:
            Hardlink.del_rorp(source_rorp)

        if not changed or success:
            if source_rorp:
                self.statfileobj.add_source_file(source_rorp)
            if dest_rorp:
                self.statfileobj.add_dest_file(dest_rorp)
        # Pick which rorp describes the file's final state on disk:
        # 0 = unchanged (keep old metadata), 1 = updated to source,
        # 2 = deleted (no metadata to write).
        if success == 0:
            metadata_rorp = dest_rorp
        elif success == 1:
            metadata_rorp = source_rorp
        else:
            metadata_rorp = None  # in case deleted because of ListError
        if success == 1 or success == 2:
            self.statfileobj.add_changed(source_rorp, dest_rorp)

        if metadata_rorp and metadata_rorp.lstat():
            self.metawriter.write_object(metadata_rorp)
        if Globals.file_statistics:
            statistics.FileStats.update(source_rorp, dest_rorp, changed, inc)

    def reset_dir_perms(self, current_index):
        """Reset the permissions of directories when we have left them"""
        dir_rp, perms = self.dir_perms_list[-1]
        dir_index = dir_rp.index
        # current_index no longer inside dir_index => we left the directory.
        if (current_index > dir_index
                and current_index[:len(dir_index)] != dir_index):
            dir_rp.chmod(perms)  # out of directory, reset perms now

    def in_cache(self, index):
        """Return true if given index is cached"""
        return index in self.cache_dict

    def flag_success(self, index):
        """Signal that the file with given index was updated successfully"""
        self.cache_dict[index][3] = 1

    def flag_deleted(self, index):
        """Signal that the destination file was deleted"""
        self.cache_dict[index][3] = 2

    def flag_changed(self, index):
        """Signal that the file with given index has changed"""
        self.cache_dict[index][2] = 1

    def set_inc(self, index, inc):
        """Set the increment of the current file"""
        self.cache_dict[index][4] = inc

    def get_parent_rorps(self, index):
        """Retrieve (src_rorp, dest_rorp) pair from parent cache"""
        for parent_index, pair in self.parent_list:
            if parent_index == index:
                return pair
        raise KeyError(index)

    def get_rorps(self, index):
        """Retrieve (source_rorp, dest_rorp) from cache

        Falls back to the parent cache for already-expired directories.
        """
        try:
            return self.cache_dict[index][:2]
        except KeyError:
            return self.get_parent_rorps(index)

    def get_source_rorp(self, index):
        """Retrieve source_rorp with given index from cache"""
        # Requests must come in index order: anything before the oldest
        # cached index has already been expired and written out.
        assert index >= self.cache_indices[0], \
            ("CCPP index out of order: %s %s" %
                (repr(index), repr(self.cache_indices[0])))
        try:
            return self.cache_dict[index][0]
        except KeyError:
            return self.get_parent_rorps(index)[0]

    def get_mirror_rorp(self, index):
        """Retrieve mirror_rorp with given index from cache"""
        try:
            return self.cache_dict[index][1]
        except KeyError:
            return self.get_parent_rorps(index)[1]

    def update_hash(self, index, sha1sum):
        """Update the source rorp's SHA1 hash"""
        self.get_source_rorp(index).set_sha1(sha1sum)

    def update_hardlink_hash(self, diff_rorp):
        """Tag associated source_rorp with same hash diff_rorp points to"""
        sha1sum = Hardlink.get_sha1(diff_rorp)
        if not sha1sum:
            return
        source_rorp = self.get_source_rorp(diff_rorp.index)
        source_rorp.set_sha1(sha1sum)

    def close(self):
        """Process the remaining elements in the cache"""
        while self.cache_indices:
            self.shorten_cache()
        # Restore all directory permissions we relaxed, deepest first.
        while self.dir_perms_list:
            dir_rp, perms = self.dir_perms_list.pop()
            dir_rp.chmod(perms)
        self.metawriter.close()
        metadata.ManagerObj.ConvertMetaToDiff()
553
554
555class PatchITRB(rorpiter.ITRBranch):
556    """Patch an rpath with the given diff iters (use with IterTreeReducer)
557
558    The main complication here involves directories.  We have to
559    finish processing the directory after what's in the directory, as
560    the directory may have inappropriate permissions to alter the
561    contents or the dir's mtime could change as we change the
562    contents.
563
564    """
565
566    def __init__(self, basis_root_rp, CCPP):
567        """Set basis_root_rp, the base of the tree to be incremented"""
568        self.basis_root_rp = basis_root_rp
569        assert basis_root_rp.conn is Globals.local_connection
570        self.statfileobj = (statistics.get_active_statfileobj()
571                            or statistics.StatFileObj())
572        self.dir_replacement, self.dir_update = None, None
573        self.CCPP = CCPP
574        self.error_handler = robust.get_error_handler("UpdateError")
575
576    def can_fast_process(self, index, diff_rorp):
577        """True if diff_rorp and mirror are not directories"""
578        mirror_rorp = self.CCPP.get_mirror_rorp(index)
579        return not (diff_rorp.isdir() or (mirror_rorp and mirror_rorp.isdir()))
580
581    def fast_process(self, index, diff_rorp):
582        """Patch base_rp with diff_rorp (case where neither is directory)"""
583        mirror_rp, discard = longname.get_mirror_inc_rps(
584            self.CCPP.get_rorps(index), self.basis_root_rp)
585        assert not mirror_rp.isdir(), mirror_rp
586        tf = TempFile.new(mirror_rp)
587        if self.patch_to_temp(mirror_rp, diff_rorp, tf):
588            if tf.lstat():
589                if robust.check_common_error(self.error_handler, rpath.rename,
590                                             (tf, mirror_rp)) is None:
591                    self.CCPP.flag_success(index)
592                else:
593                    tf.delete()
594            elif mirror_rp and mirror_rp.lstat():
595                mirror_rp.delete()
596                self.CCPP.flag_deleted(index)
597        else:
598            tf.setdata()
599            if tf.lstat():
600                tf.delete()
601
602    def patch_to_temp(self, basis_rp, diff_rorp, new):
603        """Patch basis_rp, writing output in new, which doesn't exist yet
604
605        Returns true if able to write new as desired, false if
606        UpdateError or similar gets in the way.
607
608        """
609        if diff_rorp.isflaglinked():
610            self.patch_hardlink_to_temp(diff_rorp, new)
611        elif diff_rorp.get_attached_filetype() == 'snapshot':
612            result = self.patch_snapshot_to_temp(diff_rorp, new)
613            if not result:
614                return 0
615            elif result == 2:
616                return 1  # SpecialFile
617        elif not self.patch_diff_to_temp(basis_rp, diff_rorp, new):
618            return 0
619        if new.lstat():
620            if diff_rorp.isflaglinked():
621                if Globals.eas_write:
622                    """ `isflaglinked() == True` implies that we are processing
623                    the 2nd (or later) file in a group of files linked to an
624                    inode.  As such, we don't need to perform the usual
625                    `copy_attribs(diff_rorp, new)` for the inode because that
626                    was already done when the 1st file in the group was
627                    processed.  Nonetheless, we still must perform the following
628                    task (which would have normally been performed by
629                    `copy_attribs()`).  Otherwise, the subsequent call to
630                    `matches_cached_rorp(diff_rorp, new)` will fail because the
631                    new rorp's metadata would be missing the extended attribute
632                    data.
633                    """
634                    new.data['ea'] = diff_rorp.get_ea()
635            else:
636                rpath.copy_attribs(diff_rorp, new)
637        return self.matches_cached_rorp(diff_rorp, new)
638
639    def patch_hardlink_to_temp(self, diff_rorp, new):
640        """Hardlink diff_rorp to temp, update hash if necessary"""
641        Hardlink.link_rp(diff_rorp, new, self.basis_root_rp)
642        self.CCPP.update_hardlink_hash(diff_rorp)
643
644    def patch_snapshot_to_temp(self, diff_rorp, new):
645        """Write diff_rorp to new, return true if successful
646
647        Returns 1 if normal success, 2 if special file is written,
648        whether or not it is successful.  This is because special
649        files either fail with a SpecialFileError, or don't need to be
650        compared.
651
652        """
653        if diff_rorp.isspecial():
654            self.write_special(diff_rorp, new)
655            rpath.copy_attribs(diff_rorp, new)
656            return 2
657
658        report = robust.check_common_error(self.error_handler, rpath.copy,
659                                           (diff_rorp, new))
660        if isinstance(report, hash.Report):
661            self.CCPP.update_hash(diff_rorp.index, report.sha1_digest)
662            return 1
663        return report != 0  # if == 0, error_handler caught something
664
665    def patch_diff_to_temp(self, basis_rp, diff_rorp, new):
666        """Apply diff_rorp to basis_rp, write output in new"""
667        assert diff_rorp.get_attached_filetype() == 'diff'
668        report = robust.check_common_error(
669            self.error_handler, Rdiff.patch_local, (basis_rp, diff_rorp, new))
670        if isinstance(report, hash.Report):
671            self.CCPP.update_hash(diff_rorp.index, report.sha1_digest)
672            return 1
673        return report != 0  # if report == 0, error
674
675    def matches_cached_rorp(self, diff_rorp, new_rp):
676        """Return true if new_rp matches cached src rorp
677
678        This is a final check to make sure the temp file just written
679        matches the stats which we got earlier.  If it doesn't it
680        could confuse the regress operation.  This is only necessary
681        for regular files.
682
683        """
684        if not new_rp.isreg():
685            return 1
686        cached_rorp = self.CCPP.get_source_rorp(diff_rorp.index)
687        if cached_rorp and cached_rorp.equal_loose(new_rp):
688            return 1
689        log.ErrorLog.write_if_open(
690            "UpdateError", diff_rorp, "Updated mirror "
691            "temp file '%s' does not match source" % new_rp.get_safepath())
692        return 0
693
694    def write_special(self, diff_rorp, new):
695        """Write diff_rorp (which holds special file) to new"""
696        eh = robust.get_error_handler("SpecialFileError")
697        if robust.check_common_error(eh, rpath.copy, (diff_rorp, new)) == 0:
698            new.setdata()
699            if new.lstat():
700                new.delete()
701            new.touch()
702
703    def start_process(self, index, diff_rorp):
704        """Start processing directory - record information for later"""
705        self.base_rp, discard = longname.get_mirror_inc_rps(
706            self.CCPP.get_rorps(index), self.basis_root_rp)
707        if diff_rorp.isdir():
708            self.prepare_dir(diff_rorp, self.base_rp)
709        elif self.set_dir_replacement(diff_rorp, self.base_rp):
710            if diff_rorp.lstat():
711                self.CCPP.flag_success(index)
712            else:
713                self.CCPP.flag_deleted(index)
714
715    def set_dir_replacement(self, diff_rorp, base_rp):
716        """Set self.dir_replacement, which holds data until done with dir
717
718        This is used when base_rp is a dir, and diff_rorp is not.
719        Returns 1 for success or 0 for failure
720
721        """
722        assert diff_rorp.get_attached_filetype() == 'snapshot'
723        self.dir_replacement = TempFile.new(base_rp)
724        if not self.patch_to_temp(None, diff_rorp, self.dir_replacement):
725            if self.dir_replacement.lstat():
726                self.dir_replacement.delete()
727            # Was an error, so now restore original directory
728            rpath.copy_with_attribs(
729                self.CCPP.get_mirror_rorp(diff_rorp.index),
730                self.dir_replacement)
731            return 0
732        else:
733            return 1
734
735    def prepare_dir(self, diff_rorp, base_rp):
736        """Prepare base_rp to be a directory"""
737        self.dir_update = diff_rorp.getRORPath()  # make copy in case changes
738        if not base_rp.isdir():
739            if base_rp.lstat():
740                self.base_rp.delete()
741            base_rp.setdata()
742            base_rp.mkdir()
743            self.CCPP.flag_success(diff_rorp.index)
744        else:  # maybe no change, so query CCPP before tagging success
745            if self.CCPP.in_cache(diff_rorp.index):
746                self.CCPP.flag_success(diff_rorp.index)
747
    def end_process(self):
        """Finish processing directory

        Runs after every entry inside the directory has been handled:
        either apply the metadata saved by prepare_dir (self.dir_update)
        or swap in the non-directory replacement staged by
        set_dir_replacement (self.dir_replacement).
        """
        if self.dir_update:
            assert self.base_rp.isdir()
            # Attributes are applied last so that e.g. restrictive
            # permissions don't block writing the directory's contents.
            rpath.copy_attribs(self.dir_update, self.base_rp)

            # getperms() % 0o1000 drops the setuid/setgid/sticky bits;
            # < 0o700 means the owner lacks full rwx access.
            if (Globals.process_uid != 0
                    and self.dir_update.getperms() % 0o1000 < 0o700):
                # Directory was unreadable at start -- keep it readable
                # until the end of the backup process.
                self.base_rp.chmod(0o700 | self.dir_update.getperms())
        elif self.dir_replacement:
            # Directory was replaced by a non-directory: remove the dir,
            # then move the staged replacement (if any) into its place.
            self.base_rp.rmdir()
            if self.dir_replacement.lstat():
                rpath.rename(self.dir_replacement, self.base_rp)
763
764
765class IncrementITRB(PatchITRB):
766    """Patch an rpath with the given diff iters and write increments
767
768    Like PatchITRB, but this time also write increments.
769
770    """
771
772    def __init__(self, basis_root_rp, inc_root_rp, rorp_cache):
773        self.inc_root_rp = inc_root_rp
774        PatchITRB.__init__(self, basis_root_rp, rorp_cache)
775
    def fast_process(self, index, diff_rorp):
        """Patch base_rp with diff_rorp and write increment (neither is dir)"""
        mirror_rp, inc_prefix = longname.get_mirror_inc_rps(
            self.CCPP.get_rorps(index), self.basis_root_rp, self.inc_root_rp)
        tf = TempFile.new(mirror_rp)
        if self.patch_to_temp(mirror_rp, diff_rorp, tf):
            # NOTE(review): a non-rpath result (None or an int) appears
            # to be the error-handler sentinel from check_common_error;
            # only a real increment rpath is usable below -- confirm
            # against robust.check_common_error.
            inc = robust.check_common_error(self.error_handler,
                                            increment.Increment,
                                            (tf, mirror_rp, inc_prefix))
            if inc is not None and not isinstance(inc, int):
                self.CCPP.set_inc(index, inc)
                if inc.isreg():
                    inc.fsync_with_dir()  # Write inc before rp changed
                if tf.lstat():
                    # rename returning None means it completed normally;
                    # any other value means the error handler fired.
                    if robust.check_common_error(self.error_handler,
                                                 rpath.rename,
                                                 (tf, mirror_rp)) is None:
                        self.CCPP.flag_success(index)
                    else:
                        tf.delete()
                elif mirror_rp.lstat():
                    # Temp file vanished: the source entry was deleted,
                    # so remove the mirror copy as well.
                    mirror_rp.delete()
                    self.CCPP.flag_deleted(index)
                return  # normal return, otherwise error occurred
        # Error path: remove the temp file if it was (partially) written
        tf.setdata()
        if tf.lstat():
            tf.delete()
803
804    def start_process(self, index, diff_rorp):
805        """Start processing directory"""
806        self.base_rp, inc_prefix = longname.get_mirror_inc_rps(
807            self.CCPP.get_rorps(index), self.basis_root_rp, self.inc_root_rp)
808        self.base_rp.setdata()
809        assert diff_rorp.isdir() or self.base_rp.isdir(), \
810            ("Either %s or %s must be a directory" % (repr(diff_rorp.get_safeindexpath()),
811             repr(self.base_rp.get_safepath())))
812        if diff_rorp.isdir():
813            inc = increment.Increment(diff_rorp, self.base_rp, inc_prefix)
814            if inc and inc.isreg():
815                inc.fsync_with_dir()  # must write inc before rp changed
816            self.base_rp.setdata()  # in case written by increment above
817            self.prepare_dir(diff_rorp, self.base_rp)
818        elif self.set_dir_replacement(diff_rorp, self.base_rp):
819            inc = increment.Increment(self.dir_replacement, self.base_rp,
820                                      inc_prefix)
821            if inc:
822                self.CCPP.set_inc(index, inc)
823                self.CCPP.flag_success(index)
824