# Copyright 2002, 2003 Ben Escoto
#
# This file is part of rdiff-backup.
#
# rdiff-backup is free software; you can redistribute it and/or modify
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# rdiff-backup is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with rdiff-backup; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA
"""High level functions for mirroring and mirror+incrementing"""

import errno
from . import Globals, metadata, rorpiter, TempFile, Hardlink, robust, \
    increment, rpath, log, selection, Time, Rdiff, statistics, iterfile, \
    hash, longname


def Mirror(src_rpath, dest_rpath):
    """Turn dest_rpath into a copy of src_rpath"""
    log.Log(
        "Starting mirror %s to %s" % (src_rpath.get_safepath(),
                                      dest_rpath.get_safepath()), 4)
    # The source and destination structs may live on different
    # connections; look them up through each rpath's connection.
    SourceS = src_rpath.conn.backup.SourceStruct
    DestS = dest_rpath.conn.backup.DestinationStruct

    source_rpiter = SourceS.get_source_select()
    DestS.set_rorp_cache(dest_rpath, source_rpiter, 0)
    dest_sigiter = DestS.get_sigs(dest_rpath)
    source_diffiter = SourceS.get_diffs(dest_sigiter)
    DestS.patch(dest_rpath, source_diffiter)


def Mirror_and_increment(src_rpath, dest_rpath, inc_rpath):
    """Mirror + put increments in tree based at inc_rpath"""
    log.Log(
        "Starting increment operation %s to %s" % (src_rpath.get_safepath(),
                                                   dest_rpath.get_safepath()),
        4)
    SourceS = src_rpath.conn.backup.SourceStruct
    DestS = dest_rpath.conn.backup.DestinationStruct

    source_rpiter = SourceS.get_source_select()
    DestS.set_rorp_cache(dest_rpath, source_rpiter, 1)
    dest_sigiter = DestS.get_sigs(dest_rpath)
    source_diffiter = SourceS.get_diffs(dest_sigiter)
    DestS.patch_and_increment(dest_rpath, source_diffiter, inc_rpath)


class SourceStruct:
    """Hold info used on source side when backing up"""
    _source_select = None  # will be set to source Select iterator

    @classmethod
    def set_source_select(cls, rpath, tuplelist, *filelists):
        """Initialize select object using tuplelist

        Note that each list in filelists must each be passed as
        separate arguments, so each is recognized as a file by the
        connection.  Otherwise we will get an error because a list
        containing files can't be pickled.

        Also, cls._source_select needs to be cached so get_diffs below
        can retrieve the necessary rps.

        """
        sel = selection.Select(rpath)
        sel.ParseArgs(tuplelist, filelists)
        sel_iter = sel.set_iter()
        cache_size = Globals.pipeline_max_length * 3  # to and from+leeway
        cls._source_select = rorpiter.CacheIndexable(sel_iter, cache_size)
        Globals.set('select_mirror', sel_iter)

    @classmethod
    def get_source_select(cls):
        """Return source select iterator, set by set_source_select"""
        return cls._source_select

    @classmethod
    def get_diffs(cls, dest_sigiter):
        """Return diffs of any files with signature in dest_sigiter"""
        source_rps = cls._source_select
        error_handler = robust.get_error_handler("ListError")

        def attach_snapshot(diff_rorp, src_rp):
            """Attach file of snapshot to diff_rorp, w/ error checking"""
            fileobj = robust.check_common_error(
                error_handler, rpath.RPath.open, (src_rp, "rb"))
            if fileobj:
                diff_rorp.setfile(hash.FileWrapper(fileobj))
            else:
                diff_rorp.zero()
            diff_rorp.set_attached_filetype('snapshot')

        def attach_diff(diff_rorp, src_rp, dest_sig):
            """Attach file of diff to diff_rorp, w/ error checking"""
            fileobj = robust.check_common_error(
                error_handler, Rdiff.get_delta_sigrp_hash, (dest_sig, src_rp))
            if fileobj:
                diff_rorp.setfile(fileobj)
                diff_rorp.set_attached_filetype('diff')
            else:
                # Delta computation failed; fall back to an empty snapshot
                diff_rorp.zero()
                diff_rorp.set_attached_filetype('snapshot')

        for dest_sig in dest_sigiter:
            if dest_sig is iterfile.MiscIterFlushRepeat:
                yield iterfile.MiscIterFlush  # Flush buffer when get_sigs does
                continue
            src_rp = (source_rps.get(dest_sig.index)
                      or rpath.RORPath(dest_sig.index))
            diff_rorp = src_rp.getRORPath()
            if dest_sig.isflaglinked():
                diff_rorp.flaglinked(dest_sig.get_link_flag())
            elif src_rp.isreg():
                reset_perms = False
                if (Globals.process_uid != 0 and not src_rp.readable()
                        and src_rp.isowner()):
                    # Temporarily grant owner read permission so the
                    # file can be opened; undone below via reset_perms.
                    reset_perms = True
                    src_rp.chmod(0o400 | src_rp.getperms())

                if dest_sig.isreg():
                    attach_diff(diff_rorp, src_rp, dest_sig)
                else:
                    attach_snapshot(diff_rorp, src_rp)

                if reset_perms:
                    src_rp.chmod(src_rp.getperms() & ~0o400)
            else:
                dest_sig.close_if_necessary()
                diff_rorp.set_attached_filetype('snapshot')
            yield diff_rorp


class DestinationStruct:
    """Hold info used by destination side when backing up"""

    @classmethod
    def get_dest_select(cls, rpath, use_metadata=1):
        """Return destination select rorpath iterator

        If metadata file doesn't exist, select all files on
        destination except rdiff-backup-data directory.

        """

        def get_iter_from_fs():
            """Get the combined iterator from the filesystem"""
            sel = selection.Select(rpath)
            sel.parse_rbdir_exclude()
            return sel.set_iter()

        metadata.SetManager()
        if use_metadata:
            rorp_iter = metadata.ManagerObj.GetAtTime(Time.prevtime)
            if rorp_iter:
                return rorp_iter
        return get_iter_from_fs()

    @classmethod
    def set_rorp_cache(cls, baserp, source_iter, for_increment):
        """Initialize cls.CCPP, the destination rorp cache

        for_increment should be true if we are mirror+incrementing,
        false if we are just mirroring.

        """
        dest_iter = cls.get_dest_select(baserp, for_increment)
        collated = rorpiter.Collate2Iters(source_iter, dest_iter)
        cls.CCPP = CacheCollatedPostProcess(
            collated, Globals.pipeline_max_length * 4, baserp)
        # pipeline len adds some leeway over just*3 (to and from and back)

    @classmethod
    def get_sigs(cls, dest_base_rpath):
        """Yield signatures of any changed destination files

        If we are backing up across a pipe, we must flush the pipeline
        every so often so it doesn't get congested on destination end.

        """
        flush_threshold = Globals.pipeline_max_length - 2
        num_rorps_seen = 0
        for src_rorp, dest_rorp in cls.CCPP:
            # Only flush when source and destination are on separate
            # connections (i.e. data actually crosses a pipe).
            if (Globals.backup_reader is not Globals.backup_writer):
                num_rorps_seen += 1
                if (num_rorps_seen > flush_threshold):
                    num_rorps_seen = 0
                    yield iterfile.MiscIterFlushRepeat
            if not (src_rorp and dest_rorp and src_rorp == dest_rorp
                    and (not Globals.preserve_hardlinks
                         or Hardlink.rorp_eq(src_rorp, dest_rorp))):

                index = src_rorp and src_rorp.index or dest_rorp.index
                sig = cls.get_one_sig(dest_base_rpath, index, src_rorp,
                                      dest_rorp)
                if sig:
                    cls.CCPP.flag_changed(index)
                    yield sig

    @classmethod
    def get_one_sig(cls, dest_base_rpath, index, src_rorp, dest_rorp):
        """Return a signature given source and destination rorps"""
        if (Globals.preserve_hardlinks and src_rorp
                and Hardlink.islinked(src_rorp)):
            dest_sig = rpath.RORPath(index)
            dest_sig.flaglinked(Hardlink.get_link_index(src_rorp))
        elif dest_rorp:
            dest_sig = dest_rorp.getRORPath()
            if dest_rorp.isreg():
                dest_rp = longname.get_mirror_rp(dest_base_rpath, dest_rorp)
                sig_fp = cls.get_one_sig_fp(dest_rp)
                if sig_fp is None:
                    return None
                dest_sig.setfile(sig_fp)
        else:
            dest_sig = rpath.RORPath(index)
        return dest_sig

    @classmethod
    def get_one_sig_fp(cls, dest_rp):
        """Return a signature fp of given index, corresponding to reg file"""
        if not dest_rp.isreg():
            log.ErrorLog.write_if_open(
                "UpdateError", dest_rp,
                "File changed from regular file before signature")
            return None
        if (Globals.process_uid != 0 and not dest_rp.readable()
                and dest_rp.isowner()):
            # This branch can happen with root source and non-root
            # destination.  Permissions are changed permanently, which
            # should propagate to the diffs
            dest_rp.chmod(0o400 | dest_rp.getperms())
        try:
            return Rdiff.get_signature(dest_rp)
        except IOError as e:
            if (e.errno == errno.EPERM or e.errno == errno.EACCES):
                try:
                    # Try chmod'ing anyway -- This can work on NFS and AFS
                    # depending on the setup.  We keep the if() statement
                    # above for performance reasons.
                    dest_rp.chmod(0o400 | dest_rp.getperms())
                    return Rdiff.get_signature(dest_rp)
                except (IOError, OSError):
                    log.Log.FatalError(
                        "Could not open %s for reading. Check "
                        "permissions on file." % (dest_rp.path, ))
            else:
                raise

    @classmethod
    def patch(cls, dest_rpath, source_diffiter, start_index=()):
        """Patch dest_rpath with an rorpiter of diffs"""
        ITR = rorpiter.IterTreeReducer(PatchITRB, [dest_rpath, cls.CCPP])
        for diff in rorpiter.FillInIter(source_diffiter, dest_rpath):
            log.Log("Processing changed file %s" % diff.get_safeindexpath(), 5)
            ITR(diff.index, diff)
        ITR.Finish()
        cls.CCPP.close()
        dest_rpath.setdata()

    @classmethod
    def patch_and_increment(cls, dest_rpath, source_diffiter, inc_rpath):
        """Patch dest_rpath with rorpiter of diffs and write increments"""
        ITR = rorpiter.IterTreeReducer(IncrementITRB,
                                       [dest_rpath, inc_rpath, cls.CCPP])
        for diff in rorpiter.FillInIter(source_diffiter, dest_rpath):
            log.Log("Processing changed file %s" % diff.get_safeindexpath(), 5)
            ITR(diff.index, diff)
        ITR.Finish()
        cls.CCPP.close()
        dest_rpath.setdata()


class CacheCollatedPostProcess:
    """Cache a collated iter of (source_rorp, dest_rorp) pairs

    This is necessary for four reasons:

    1.  The patch function may need the original source_rorp or
        dest_rp information, which is not present in the diff it
        receives.

    2.  The metadata must match what is stored in the destination
        directory.  If there is an error, either we do not update the
        dest directory for that file and the old metadata is used, or
        the file is deleted on the other end.  Thus we cannot write
        any metadata until we know the file has been processed
        correctly.

    3.  We may lack permissions on certain destination directories.
        The permissions of these directories need to be relaxed before
        we enter them to compute signatures, and then reset after we
        are done patching everything inside them.

    4.  We need some place to put hashes (like SHA1) after computing
        them and before writing them to the metadata.

    The class caches older source_rorps and dest_rps so the patch
    function can retrieve them if necessary.  The patch function can
    also update the processed correctly flag.  When an item falls out
    of the cache, we assume it has been processed, and write the
    metadata for it.

    """

    def __init__(self, collated_iter, cache_size, dest_root_rp):
        """Initialize new CCWP."""
        self.iter = collated_iter  # generates (source_rorp, dest_rorp) pairs
        self.cache_size = cache_size
        self.dest_root_rp = dest_root_rp

        self.statfileobj = statistics.init_statfileobj()
        if Globals.file_statistics:
            statistics.FileStats.init()
        self.metawriter = metadata.ManagerObj.GetWriter()

        # The following dictionary maps indices to lists
        # [source_rorp, dest_rorp, changed_flag, success_flag, increment]
        #
        # changed_flag should be true if the rorps are different, and
        # success_flag should be 1 if dest_rorp has been successfully
        # updated to source_rorp, and 2 if the destination file is
        # deleted entirely.  They both default to false (0).
        #
        # increment holds the RPath of the increment file if one
        # exists.  It is used to record file statistics.
        self.cache_dict = {}
        self.cache_indices = []

        # Contains a list of pairs (destination_rps, permissions) to
        # be used to reset the permissions of certain directories
        # after we're finished with them
        self.dir_perms_list = []

        # Contains list of (index, (source_rorp, diff_rorp)) pairs for
        # the parent directories of the last item in the cache.
        self.parent_list = []

    def __iter__(self):
        return self

    def __next__(self):
        """Return next (source_rorp, dest_rorp) pair.

        StopIteration from the underlying iterator is passed upwards.

        """
        source_rorp, dest_rorp = next(self.iter)
        self.pre_process(source_rorp, dest_rorp)
        index = source_rorp and source_rorp.index or dest_rorp.index
        self.cache_dict[index] = [source_rorp, dest_rorp, 0, 0, None]
        self.cache_indices.append(index)

        if len(self.cache_indices) > self.cache_size:
            self.shorten_cache()
        return source_rorp, dest_rorp

    def pre_process(self, source_rorp, dest_rorp):
        """Do initial processing on source_rorp and dest_rorp

        It will not be clear whether source_rorp and dest_rorp have
        errors at this point, so don't do anything which assumes they
        will be backed up correctly.

        """
        if Globals.preserve_hardlinks and source_rorp:
            Hardlink.add_rorp(source_rorp, dest_rorp)
        if (dest_rorp and dest_rorp.isdir() and Globals.process_uid != 0
                and dest_rorp.getperms() % 0o1000 < 0o700):
            self.unreadable_dir_init(source_rorp, dest_rorp)

    def unreadable_dir_init(self, source_rorp, dest_rorp):
        """Initialize an unreadable dir.

        Make it readable, and if necessary, store the old permissions
        in self.dir_perms_list so the old perms can be restored.

        """
        dest_rp = self.dest_root_rp.new_index(dest_rorp.index)
        dest_rp.chmod(0o700 | dest_rorp.getperms())
        if source_rorp and source_rorp.isdir():
            self.dir_perms_list.append((dest_rp, source_rorp.getperms()))

    def shorten_cache(self):
        """Remove one element from cache, possibly adding it to metadata"""
        first_index = self.cache_indices[0]
        del self.cache_indices[0]
        try:
            (old_source_rorp, old_dest_rorp, changed_flag, success_flag,
             inc) = self.cache_dict[first_index]
        except KeyError:  # probably caused by error in file system (dup)
            log.Log(
                "Warning index %s missing from CCPP cache" % (first_index, ),
                2)
            return
        del self.cache_dict[first_index]
        self.post_process(old_source_rorp, old_dest_rorp, changed_flag,
                          success_flag, inc)
        if self.dir_perms_list:
            self.reset_dir_perms(first_index)
        self.update_parent_list(first_index, old_source_rorp, old_dest_rorp)

    def update_parent_list(self, index, src_rorp, dest_rorp):
        """Update the parent cache with the recently expired main cache entry

        This method keeps parent directories in the secondary parent
        cache until all their children have expired from the main
        cache.  This is necessary because we may realize we need a
        parent directory's information after we have processed many
        subfiles.

        """
        if not (src_rorp and src_rorp.isdir()
                or dest_rorp and dest_rorp.isdir()):
            return  # neither is directory
        if not self.parent_list:
            assert index == (), index
        else:
            last_parent_index = self.parent_list[-1][0]
            lp_index, li = len(last_parent_index), len(index)
            if li > lp_index:  # descended into previous parent
                assert li == lp_index + 1, (index, last_parent_index)
            else:  # In new directory
                assert last_parent_index[:li - 1] == index[:-1], index
                self.parent_list = self.parent_list[:li]
        self.parent_list.append((index, (src_rorp, dest_rorp)))

    def post_process(self, source_rorp, dest_rorp, changed, success, inc):
        """Post process source_rorp and dest_rorp.

        The point of this is to write statistics and metadata.

        changed will be true if the files have changed.  success will
        be true if the files have been successfully updated (this is
        always false for un-changed files).

        """
        if Globals.preserve_hardlinks and source_rorp:
            Hardlink.del_rorp(source_rorp)

        if not changed or success:
            if source_rorp:
                self.statfileobj.add_source_file(source_rorp)
            if dest_rorp:
                self.statfileobj.add_dest_file(dest_rorp)
        if success == 0:
            metadata_rorp = dest_rorp
        elif success == 1:
            metadata_rorp = source_rorp
        else:
            metadata_rorp = None  # in case deleted because of ListError
        if success == 1 or success == 2:
            self.statfileobj.add_changed(source_rorp, dest_rorp)

        if metadata_rorp and metadata_rorp.lstat():
            self.metawriter.write_object(metadata_rorp)
        if Globals.file_statistics:
            statistics.FileStats.update(source_rorp, dest_rorp, changed, inc)

    def reset_dir_perms(self, current_index):
        """Reset the permissions of directories when we have left them"""
        dir_rp, perms = self.dir_perms_list[-1]
        dir_index = dir_rp.index
        if (current_index > dir_index
                and current_index[:len(dir_index)] != dir_index):
            dir_rp.chmod(perms)  # out of directory, reset perms now

    def in_cache(self, index):
        """Return true if given index is cached"""
        return index in self.cache_dict

    def flag_success(self, index):
        """Signal that the file with given index was updated successfully"""
        self.cache_dict[index][3] = 1

    def flag_deleted(self, index):
        """Signal that the destination file was deleted"""
        self.cache_dict[index][3] = 2

    def flag_changed(self, index):
        """Signal that the file with given index has changed"""
        self.cache_dict[index][2] = 1

    def set_inc(self, index, inc):
        """Set the increment of the current file"""
        self.cache_dict[index][4] = inc

    def get_parent_rorps(self, index):
        """Retrieve (src_rorp, dest_rorp) pair from parent cache"""
        for parent_index, pair in self.parent_list:
            if parent_index == index:
                return pair
        raise KeyError(index)

    def get_rorps(self, index):
        """Retrieve (source_rorp, dest_rorp) from cache"""
        try:
            return self.cache_dict[index][:2]
        except KeyError:
            return self.get_parent_rorps(index)

    def get_source_rorp(self, index):
        """Retrieve source_rorp with given index from cache"""
        assert index >= self.cache_indices[0], \
            ("CCPP index out of order: %s %s" %
             (repr(index), repr(self.cache_indices[0])))
        try:
            return self.cache_dict[index][0]
        except KeyError:
            return self.get_parent_rorps(index)[0]

    def get_mirror_rorp(self, index):
        """Retrieve mirror_rorp with given index from cache"""
        try:
            return self.cache_dict[index][1]
        except KeyError:
            return self.get_parent_rorps(index)[1]

    def update_hash(self, index, sha1sum):
        """Update the source rorp's SHA1 hash"""
        self.get_source_rorp(index).set_sha1(sha1sum)

    def update_hardlink_hash(self, diff_rorp):
        """Tag associated source_rorp with same hash diff_rorp points to"""
        sha1sum = Hardlink.get_sha1(diff_rorp)
        if not sha1sum:
            return
        source_rorp = self.get_source_rorp(diff_rorp.index)
        source_rorp.set_sha1(sha1sum)

    def close(self):
        """Process the remaining elements in the cache"""
        while self.cache_indices:
            self.shorten_cache()
        while self.dir_perms_list:
            dir_rp, perms = self.dir_perms_list.pop()
            dir_rp.chmod(perms)
        self.metawriter.close()
        metadata.ManagerObj.ConvertMetaToDiff()


class PatchITRB(rorpiter.ITRBranch):
    """Patch an rpath with the given diff iters (use with IterTreeReducer)

    The main complication here involves directories.  We have to
    finish processing the directory after what's in the directory, as
    the directory may have inappropriate permissions to alter the
    contents or the dir's mtime could change as we change the
    contents.

    """

    def __init__(self, basis_root_rp, CCPP):
        """Set basis_root_rp, the base of the tree to be incremented"""
        self.basis_root_rp = basis_root_rp
        assert basis_root_rp.conn is Globals.local_connection
        self.statfileobj = (statistics.get_active_statfileobj()
                            or statistics.StatFileObj())
        self.dir_replacement, self.dir_update = None, None
        self.CCPP = CCPP
        self.error_handler = robust.get_error_handler("UpdateError")

    def can_fast_process(self, index, diff_rorp):
        """True if diff_rorp and mirror are not directories"""
        mirror_rorp = self.CCPP.get_mirror_rorp(index)
        return not (diff_rorp.isdir() or (mirror_rorp and mirror_rorp.isdir()))

    def fast_process(self, index, diff_rorp):
        """Patch base_rp with diff_rorp (case where neither is directory)"""
        mirror_rp, discard = longname.get_mirror_inc_rps(
            self.CCPP.get_rorps(index), self.basis_root_rp)
        assert not mirror_rp.isdir(), mirror_rp
        tf = TempFile.new(mirror_rp)
        if self.patch_to_temp(mirror_rp, diff_rorp, tf):
            if tf.lstat():
                if robust.check_common_error(self.error_handler, rpath.rename,
                                             (tf, mirror_rp)) is None:
                    self.CCPP.flag_success(index)
                else:
                    tf.delete()
            elif mirror_rp and mirror_rp.lstat():
                mirror_rp.delete()
                self.CCPP.flag_deleted(index)
        else:
            tf.setdata()
            if tf.lstat():
                tf.delete()

    def patch_to_temp(self, basis_rp, diff_rorp, new):
        """Patch basis_rp, writing output in new, which doesn't exist yet

        Returns true if able to write new as desired, false if
        UpdateError or similar gets in the way.

        """
        if diff_rorp.isflaglinked():
            self.patch_hardlink_to_temp(diff_rorp, new)
        elif diff_rorp.get_attached_filetype() == 'snapshot':
            result = self.patch_snapshot_to_temp(diff_rorp, new)
            if not result:
                return 0
            elif result == 2:
                return 1  # SpecialFile
        elif not self.patch_diff_to_temp(basis_rp, diff_rorp, new):
            return 0
        if new.lstat():
            if diff_rorp.isflaglinked():
                if Globals.eas_write:
                    """ `isflaglinked() == True` implies that we are processing
                    the 2nd (or later) file in a group of files linked to an
                    inode.  As such, we don't need to perform the usual
                    `copy_attribs(diff_rorp, new)` for the inode because that
                    was already done when the 1st file in the group was
                    processed.  Nonetheless, we still must perform the
                    following task (which would have normally been performed
                    by `copy_attribs()`).  Otherwise, the subsequent call to
                    `matches_cached_rorp(diff_rorp, new)` will fail because the
                    new rorp's metadata would be missing the extended attribute
                    data.
                    """
                    new.data['ea'] = diff_rorp.get_ea()
            else:
                rpath.copy_attribs(diff_rorp, new)
        return self.matches_cached_rorp(diff_rorp, new)

    def patch_hardlink_to_temp(self, diff_rorp, new):
        """Hardlink diff_rorp to temp, update hash if necessary"""
        Hardlink.link_rp(diff_rorp, new, self.basis_root_rp)
        self.CCPP.update_hardlink_hash(diff_rorp)

    def patch_snapshot_to_temp(self, diff_rorp, new):
        """Write diff_rorp to new, return true if successful

        Returns 1 if normal success, 2 if special file is written,
        whether or not it is successful.  This is because special
        files either fail with a SpecialFileError, or don't need to be
        compared.

        """
        if diff_rorp.isspecial():
            self.write_special(diff_rorp, new)
            rpath.copy_attribs(diff_rorp, new)
            return 2

        report = robust.check_common_error(self.error_handler, rpath.copy,
                                           (diff_rorp, new))
        if isinstance(report, hash.Report):
            self.CCPP.update_hash(diff_rorp.index, report.sha1_digest)
            return 1
        return report != 0  # if == 0, error_handler caught something

    def patch_diff_to_temp(self, basis_rp, diff_rorp, new):
        """Apply diff_rorp to basis_rp, write output in new"""
        assert diff_rorp.get_attached_filetype() == 'diff'
        report = robust.check_common_error(
            self.error_handler, Rdiff.patch_local, (basis_rp, diff_rorp, new))
        if isinstance(report, hash.Report):
            self.CCPP.update_hash(diff_rorp.index, report.sha1_digest)
            return 1
        return report != 0  # if report == 0, error

    def matches_cached_rorp(self, diff_rorp, new_rp):
        """Return true if new_rp matches cached src rorp

        This is a final check to make sure the temp file just written
        matches the stats which we got earlier.  If it doesn't it
        could confuse the regress operation.  This is only necessary
        for regular files.

        """
        if not new_rp.isreg():
            return 1
        cached_rorp = self.CCPP.get_source_rorp(diff_rorp.index)
        if cached_rorp and cached_rorp.equal_loose(new_rp):
            return 1
        log.ErrorLog.write_if_open(
            "UpdateError", diff_rorp, "Updated mirror "
            "temp file '%s' does not match source" % new_rp.get_safepath())
        return 0

    def write_special(self, diff_rorp, new):
        """Write diff_rorp (which holds special file) to new"""
        eh = robust.get_error_handler("SpecialFileError")
        if robust.check_common_error(eh, rpath.copy, (diff_rorp, new)) == 0:
            new.setdata()
            if new.lstat():
                new.delete()
            new.touch()

    def start_process(self, index, diff_rorp):
        """Start processing directory - record information for later"""
        self.base_rp, discard = longname.get_mirror_inc_rps(
            self.CCPP.get_rorps(index), self.basis_root_rp)
        if diff_rorp.isdir():
            self.prepare_dir(diff_rorp, self.base_rp)
        elif self.set_dir_replacement(diff_rorp, self.base_rp):
            if diff_rorp.lstat():
                self.CCPP.flag_success(index)
            else:
                self.CCPP.flag_deleted(index)

    def set_dir_replacement(self, diff_rorp, base_rp):
        """Set self.dir_replacement, which holds data until done with dir

        This is used when base_rp is a dir, and diff_rorp is not.
        Returns 1 for success or 0 for failure

        """
        assert diff_rorp.get_attached_filetype() == 'snapshot'
        self.dir_replacement = TempFile.new(base_rp)
        if not self.patch_to_temp(None, diff_rorp, self.dir_replacement):
            if self.dir_replacement.lstat():
                self.dir_replacement.delete()
            # Was an error, so now restore original directory
            rpath.copy_with_attribs(
                self.CCPP.get_mirror_rorp(diff_rorp.index),
                self.dir_replacement)
            return 0
        else:
            return 1

    def prepare_dir(self, diff_rorp, base_rp):
        """Prepare base_rp to be a directory"""
        self.dir_update = diff_rorp.getRORPath()  # make copy in case changes
        if not base_rp.isdir():
            if base_rp.lstat():
                self.base_rp.delete()
            base_rp.setdata()
            base_rp.mkdir()
            self.CCPP.flag_success(diff_rorp.index)
        else:  # maybe no change, so query CCPP before tagging success
            if self.CCPP.in_cache(diff_rorp.index):
                self.CCPP.flag_success(diff_rorp.index)

    def end_process(self):
        """Finish processing directory"""
        if self.dir_update:
            assert self.base_rp.isdir()
            rpath.copy_attribs(self.dir_update, self.base_rp)

            if (Globals.process_uid != 0
                    and self.dir_update.getperms() % 0o1000 < 0o700):
                # Directory was unreadable at start -- keep it readable
                # until the end of the backup process.
                self.base_rp.chmod(0o700 | self.dir_update.getperms())
        elif self.dir_replacement:
            self.base_rp.rmdir()
            if self.dir_replacement.lstat():
                rpath.rename(self.dir_replacement, self.base_rp)


class IncrementITRB(PatchITRB):
    """Patch an rpath with the given diff iters and write increments

    Like PatchITRB, but this time also write increments.

    """

    def __init__(self, basis_root_rp, inc_root_rp, rorp_cache):
        self.inc_root_rp = inc_root_rp
        PatchITRB.__init__(self, basis_root_rp, rorp_cache)

    def fast_process(self, index, diff_rorp):
        """Patch base_rp with diff_rorp and write increment (neither is dir)"""
        mirror_rp, inc_prefix = longname.get_mirror_inc_rps(
            self.CCPP.get_rorps(index), self.basis_root_rp, self.inc_root_rp)
        tf = TempFile.new(mirror_rp)
        if self.patch_to_temp(mirror_rp, diff_rorp, tf):
            inc = robust.check_common_error(self.error_handler,
                                            increment.Increment,
                                            (tf, mirror_rp, inc_prefix))
            if inc is not None and not isinstance(inc, int):
                self.CCPP.set_inc(index, inc)
                if inc.isreg():
                    inc.fsync_with_dir()  # Write inc before rp changed
                if tf.lstat():
                    if robust.check_common_error(self.error_handler,
                                                 rpath.rename,
                                                 (tf, mirror_rp)) is None:
                        self.CCPP.flag_success(index)
                    else:
                        tf.delete()
                elif mirror_rp.lstat():
                    mirror_rp.delete()
                    self.CCPP.flag_deleted(index)
                return  # normal return, otherwise error occurred
        tf.setdata()
        if tf.lstat():
            tf.delete()

    def start_process(self, index, diff_rorp):
        """Start processing directory"""
        self.base_rp, inc_prefix = longname.get_mirror_inc_rps(
            self.CCPP.get_rorps(index), self.basis_root_rp, self.inc_root_rp)
        self.base_rp.setdata()
        assert diff_rorp.isdir() or self.base_rp.isdir(), \
            ("Either %s or %s must be a directory" % (
                repr(diff_rorp.get_safeindexpath()),
                repr(self.base_rp.get_safepath())))
        if diff_rorp.isdir():
            inc = increment.Increment(diff_rorp, self.base_rp, inc_prefix)
            if inc and inc.isreg():
                inc.fsync_with_dir()  # must write inc before rp changed
            self.base_rp.setdata()  # in case written by increment above
            self.prepare_dir(diff_rorp, self.base_rp)
        elif self.set_dir_replacement(diff_rorp, self.base_rp):
            inc = increment.Increment(self.dir_replacement, self.base_rp,
                                      inc_prefix)
            if inc:
                self.CCPP.set_inc(index, inc)
                self.CCPP.flag_success(index)