# -*- coding: utf-8 -*-
# © Copyright EnterpriseDB UK Limited 2011-2021
#
# This file is part of Barman.
#
# Barman is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Barman is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Barman. If not, see <http://www.gnu.org/licenses/>.

"""
Copy controller module

A copy controller handles the copy of a series of files and directories
to their final destination.
"""

import collections
import datetime
import logging
import os.path
import re
import shutil
import signal
import tempfile
from functools import partial
from multiprocessing import Lock, Pool

import dateutil.tz

from barman.command_wrappers import RsyncPgData
from barman.exceptions import CommandFailedException, RsyncListFilesFailure
from barman.utils import human_readable_timedelta, total_seconds

_logger = logging.getLogger(__name__)

_worker_callable = None
"""
Global variable containing a callable used to execute the jobs.
Initialized by `_init_worker` and used by the `_run_worker` function.
This variable must be None outside a multiprocessing worker Process.
"""

# Parallel copy bucket size (10 GiB)
BUCKET_SIZE = 1024 * 1024 * 1024 * 10


def _init_worker(func):
    """
    Store the callable used to execute jobs passed to the `_run_worker`
    function

    :param callable func: the callable to invoke for every job
    """
    global _worker_callable
    _worker_callable = func


def _run_worker(job):
    """
    Execute a job using the callable set by the `_init_worker` function

    :param _RsyncJob job: the job to be executed
    """
    global _worker_callable
    assert (
        _worker_callable is not None
    ), "Worker has not been initialized with `_init_worker`"

    # This is the entry point of the worker process. Since KeyboardInterrupt
    # exceptions are handled by the main process, let's ignore Ctrl-C here.
    # When the parent process receives a KeyboardInterrupt, it will ask
    # the pool to terminate its workers and then terminate itself.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    return _worker_callable(job)


class _RsyncJob(object):
    """
    A job to be executed by a worker Process
    """

    def __init__(self, item_idx, description, id=None, file_list=None, checksum=None):
        """
        :param int item_idx: The index of the copy item containing this job
        :param str description: The description of the job, used for logging
        :param int id: Job ID (as in bucket)
        :param list[_FileItem] file_list: the list of files to be copied
            by this job
        :param bool checksum: Whether to force the checksum verification
        """
        self.id = id
        self.item_idx = item_idx
        self.description = description
        self.file_list = file_list
        self.checksum = checksum

        # Statistics
        self.copy_start_time = None
        self.copy_end_time = None


class _FileItem(collections.namedtuple("_FileItem", "mode size date path")):
    """
    This named tuple is used to store the content of each line of the
    output of an "rsync --list-only" call
    """
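
# For example (illustrative values), an "rsync --list-only" output line
# such as
#   -rw-------           8192 2014/06/05 18:00:00 base/1/1259
# is parsed by RsyncCopyController._list_files into
#   _FileItem(mode="-rw-------", size=8192,
#             date=datetime(2014, 6, 5, 18, 0, tzinfo=dateutil.tz.tzlocal()),
#             path="base/1/1259")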


class _RsyncCopyItem(object):
    """
    Internal data object that contains the information about one of the items
    that have to be copied during a RsyncCopyController run.
    """

    def __init__(
        self,
        label,
        src,
        dst,
        exclude=None,
        exclude_and_protect=None,
        include=None,
        is_directory=False,
        bwlimit=None,
        reuse=None,
        item_class=None,
        optional=False,
    ):
        """
        The "label" parameter is meant to be used for error messages
        and logging.

        If "src" or "dst" content begins with a ':' character, it is a remote
        path. Only local paths are supported in the "reuse" argument.

        If the "reuse" parameter is provided and is not None, it is used to
        implement the incremental copy. This only works if "is_directory" is
        True.

        :param str label: a symbolic name for this item
        :param str src: source directory.
        :param str dst: destination directory.
        :param list[str] exclude: list of patterns to be excluded from the
            copy. The destination will be deleted if present.
        :param list[str] exclude_and_protect: list of patterns to be excluded
            from the copy. The destination will be preserved if present.
        :param list[str] include: list of patterns to be included in the
            copy even if excluded.
        :param bool is_directory: Whether the item points to a directory.
        :param bwlimit: bandwidth limit to be enforced. (KiB)
        :param str|None reuse: the reference path for incremental mode.
        :param str|None item_class: If specified, carries meta information
            about what the object to be copied is.
        :param bool optional: Whether a failure copying this object should
            be tolerated instead of being treated as fatal. This only works
            if "is_directory" is False.
        """
        self.label = label
        self.src = src
        self.dst = dst
        self.exclude = exclude
        self.exclude_and_protect = exclude_and_protect
        self.include = include
        self.is_directory = is_directory
        self.bwlimit = bwlimit
        self.reuse = reuse
        self.item_class = item_class
        self.optional = optional

        # Attributes that will be filled during the analysis
        self.temp_dir = None
        self.dir_file = None
        self.exclude_and_protect_file = None
        self.safe_list = None
        self.check_list = None

        # Statistics
        self.analysis_start_time = None
        self.analysis_end_time = None

        # Ensure that the user specified the item class, since it is
        # mandatory to correctly handle the item
        assert self.item_class

    def __str__(self):
        # Prepare strings for messages
        formatted_class = self.item_class
        formatted_name = self.src
        if self.src.startswith(":"):
            formatted_class = "remote " + self.item_class
            formatted_name = self.src[1:]
        formatted_class += " directory" if self.is_directory else " file"

        # Log the operation that is being executed
        if self.item_class in (
            RsyncCopyController.PGDATA_CLASS,
            RsyncCopyController.PGCONTROL_CLASS,
        ):
            return "%s: %s" % (formatted_class, formatted_name)
        else:
            return "%s '%s': %s" % (formatted_class, self.label, formatted_name)


class RsyncCopyController(object):
    """
    Copy a list of files and directories to their final destination.
    """

    # Constants to be used as "item_class" values
    PGDATA_CLASS = "PGDATA"
    TABLESPACE_CLASS = "tablespace"
    PGCONTROL_CLASS = "pg_control"
    CONFIG_CLASS = "config"

    # This regular expression is used to parse each line of the output
    # of an "rsync --list-only" call. This regexp has been tested with
    # every supported version of upstream rsync (>= 3.0.4)
    LIST_ONLY_RE = re.compile(
        r"""
        ^ # start of the line

        # capture the mode (e.g. "-rw-------")
        (?P<mode>[-\w]+)
        \s+

        # size is an integer
        (?P<size>\d+)
        \s+

        # The date field can have two different forms
        (?P<date>
            # "2014/06/05 18:00:00" if the sending rsync is compiled
            # with HAVE_STRFTIME
            [\d/]+\s+[\d:]+
        |
            # "Thu Jun 5 18:00:00 2014" otherwise
            \w+\s+\w+\s+\d+\s+[\d:]+\s+\d+
        )
        \s+

        # all the remaining characters are part of filename
        (?P<path>.+)

        $ # end of the line
        """,
        re.VERBOSE,
    )

    # This regular expression is used to ignore error messages regarding
    # vanished files that are not really errors. It is needed because
    # in some cases rsync reports vanished files with exit code 23, which
    # could also mean a fatal error
    VANISHED_RE = re.compile(
        r"""
        ^ # start of the line
        (
        # files which vanished before rsync start
        rsync:\ link_stat\ ".+"\ failed:\ No\ such\ file\ or\ directory\ \(2\)
        |
        # files which vanished after rsync start
        file\ has\ vanished:\ ".+"
        |
        # files which have been truncated during transfer
        rsync:\ read\ errors\ mapping\ ".+":\ No\ data\ available\ \(61\)
        |
        # final summary
        rsync\ error:\ .* \(code\ 23\)\ at\ main\.c\(\d+\)
        \ \[(generator|receiver|sender)=[^\]]+\]
        )
        $ # end of the line
        """,
        re.VERBOSE + re.IGNORECASE,
    )

    def __init__(
        self,
        path=None,
        ssh_command=None,
        ssh_options=None,
        network_compression=False,
        reuse_backup=None,
        safe_horizon=None,
        exclude=None,
        retry_times=0,
        retry_sleep=0,
        workers=1,
    ):
        """
        :param str|None path: the PATH where the rsync executable will be
            searched
        :param str|None ssh_command: the ssh executable to be used
            to access remote paths
        :param list[str]|None ssh_options: list of ssh options to be used
            to access remote paths
        :param boolean network_compression: whether to use network
            compression
        :param str|None reuse_backup: if "link" or "copy", enables
            the incremental copy feature
        :param datetime.datetime|None safe_horizon: if set, assumes that
            every file older than it is safe to copy without checksum
            verification.
        :param list[str]|None exclude: list of patterns to be excluded
            from the copy
        :param int retry_times: The number of times to retry a failed
            operation
        :param int retry_sleep: Sleep time between two retries
        :param int workers: The number of parallel copy workers
        """

        super(RsyncCopyController, self).__init__()
        self.path = path
        self.ssh_command = ssh_command
        self.ssh_options = ssh_options
        self.network_compression = network_compression
        self.reuse_backup = reuse_backup
        self.safe_horizon = safe_horizon
        self.exclude = exclude
        self.retry_times = retry_times
        self.retry_sleep = retry_sleep
        self.workers = workers

        self._logger_lock = Lock()

        # Assume we are running with a recent rsync (>= 3.1)
        self.rsync_has_ignore_missing_args = True

        self.item_list = []
        """List of items to be copied"""

        self.rsync_cache = {}
        """A cache of RsyncPgData objects"""

        # Attributes used for progress reporting

        self.total_steps = None
        """Total number of steps"""

        self.current_step = None
        """Current step number"""

        self.temp_dir = None
        """Temp dir used to store the status during the copy"""

        # Statistics

        self.jobs_done = None
        """List of already finished jobs"""

        self.copy_start_time = None
        """Copy start time"""

        self.copy_end_time = None
        """Copy end time"""

    def add_directory(
        self,
        label,
        src,
        dst,
        exclude=None,
        exclude_and_protect=None,
        include=None,
        bwlimit=None,
        reuse=None,
        item_class=None,
    ):
        """
        Add a directory that we want to copy.

        If "src" or "dst" content begins with a ':' character, it is a remote
        path. Only local paths are supported in the "reuse" argument.

        If the "reuse" parameter is provided and is not None, it is used to
        implement the incremental copy.
        This only works if "is_directory" is True.

        :param str label: symbolic name to be used for error messages
            and logging.
        :param str src: source directory.
        :param str dst: destination directory.
        :param list[str] exclude: list of patterns to be excluded from the
            copy. The destination will be deleted if present.
        :param list[str] exclude_and_protect: list of patterns to be excluded
            from the copy. The destination will be preserved if present.
        :param list[str] include: list of patterns to be included in the
            copy even if excluded.
        :param bwlimit: bandwidth limit to be enforced. (KiB)
        :param str|None reuse: the reference path for incremental mode.
        :param str item_class: If specified, carries meta information about
            what the object to be copied is.
        """
        self.item_list.append(
            _RsyncCopyItem(
                label=label,
                src=src,
                dst=dst,
                is_directory=True,
                bwlimit=bwlimit,
                reuse=reuse,
                item_class=item_class,
                optional=False,
                exclude=exclude,
                exclude_and_protect=exclude_and_protect,
                include=include,
            )
        )

    def add_file(self, label, src, dst, item_class=None, optional=False):
        """
        Add a file that we want to copy

        :param str label: symbolic name to be used for error messages
            and logging.
        :param str src: source file.
        :param str dst: destination file.
        :param str item_class: If specified, carries meta information about
            what the object to be copied is.
        :param bool optional: Whether a failure copying this object should
            be tolerated instead of being treated as fatal.
        """
        self.item_list.append(
            _RsyncCopyItem(
                label=label,
                src=src,
                dst=dst,
                is_directory=False,
                bwlimit=None,
                reuse=None,
                item_class=item_class,
                optional=optional,
            )
        )

    def _rsync_factory(self, item):
        """
        Build the RsyncPgData object required for copying the provided item

        :param _RsyncCopyItem item: information about a copy operation
        :rtype: RsyncPgData
        """
        # If the object already exists, use it
        if item in self.rsync_cache:
            return self.rsync_cache[item]

        # Prepare the command arguments
        args = self._reuse_args(item.reuse)
        # Merge the global exclude with the one inside the item object
        if self.exclude and item.exclude:
            exclude = self.exclude + item.exclude
        else:
            exclude = self.exclude or item.exclude

        # Using `--ignore-missing-args` could fail in case
        # the local or the remote rsync is older than 3.1.
        # In that case we expect to get an error during the analyze
        # phase. The analyze code must catch that error
        # and retry after flushing the rsync cache.
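        # For illustration: with reuse_backup="link" and a reuse path set,
        # the argument list built by this factory (here and just below)
        # comes out as (path is hypothetical):
        #   --link-dest=/var/lib/barman/main/base/20210101T000000/data
        #   --ignore-missing-args --itemize-changes --itemize-changes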
        if self.rsync_has_ignore_missing_args:
            args.append("--ignore-missing-args")

        # TODO: remove debug output or use it for progress tracking
        # By adding a double '--itemize-changes' option, the rsync
        # output will contain the full list of files that have been
        # touched, even those that have not changed
        args.append("--itemize-changes")
        args.append("--itemize-changes")

        # Build the rsync object that will execute the copy
        rsync = RsyncPgData(
            path=self.path,
            ssh=self.ssh_command,
            ssh_options=self.ssh_options,
            args=args,
            bwlimit=item.bwlimit,
            network_compression=self.network_compression,
            exclude=exclude,
            exclude_and_protect=item.exclude_and_protect,
            include=item.include,
            retry_times=self.retry_times,
            retry_sleep=self.retry_sleep,
            retry_handler=partial(self._retry_handler, item),
        )
        self.rsync_cache[item] = rsync
        return rsync

    def _rsync_set_pre_31_mode(self):
        """
        Stop using `--ignore-missing-args` and restore rsync < 3.1
        compatibility
        """
        _logger.info(
            "Detected rsync version less than 3.1. "
            "Stop using '--ignore-missing-args' argument."
        )
        self.rsync_has_ignore_missing_args = False
        self.rsync_cache.clear()

    def copy(self):
        """
        Execute the actual copy
        """
        # Store the start time
        self.copy_start_time = datetime.datetime.now()

        # Create a temporary directory to hold the file lists.
        self.temp_dir = tempfile.mkdtemp(suffix="", prefix="barman-")
        # The following try block is to make sure the temporary directory
        # will be removed on exit and all the pool workers
        # have been terminated.
        pool = None
        try:
            # Initialize the counters used by progress reporting
            self._progress_init()
            _logger.info("Copy started (safe before %r)", self.safe_horizon)

            # Execute some preliminary steps for each item to be copied
            for item in self.item_list:

                # The initial preparation is necessary only for directories
                if not item.is_directory:
                    continue

                # Store the analysis start time
                item.analysis_start_time = datetime.datetime.now()

                # Analyze the source and destination directory content
                _logger.info(self._progress_message("[global] analyze %s" % item))
                self._analyze_directory(item)

                # Prepare the target directories, removing any unneeded file
                _logger.info(
                    self._progress_message(
                        "[global] create destination directories and delete "
                        "unknown files for %s" % item
                    )
                )
                self._create_dir_and_purge(item)

                # Store the analysis end time
                item.analysis_end_time = datetime.datetime.now()

            # Init the list of jobs done. Every job will be added to this list
            # once finished. The content will be used to calculate statistics
            # about the copy process.
            self.jobs_done = []

            # The jobs are executed using a pool of parallel processes.
            # Each job is generated by `self._job_generator` and executed by
            # `_run_worker` using `self._execute_job`, which has been set
            # by calling the `_init_worker` function during Pool
            # initialization.
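            # Illustrative wiring, restating the comment above: each worker
            # process runs, once at start-up,
            #     _init_worker(self._execute_job)
            # and then, for every job pulled from the queue,
            #     _run_worker(job)  # which calls self._execute_job(job)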
            pool = Pool(
                processes=self.workers,
                initializer=_init_worker,
                initargs=(self._execute_job,),
            )
            for job in pool.imap_unordered(
                _run_worker, self._job_generator(exclude_classes=[self.PGCONTROL_CLASS])
            ):
                # Store the finished job for further analysis
                self.jobs_done.append(job)

            # The PGCONTROL_CLASS items must always be copied last
            for job in pool.imap_unordered(
                _run_worker, self._job_generator(include_classes=[self.PGCONTROL_CLASS])
            ):
                # Store the finished job for further analysis
                self.jobs_done.append(job)

        except KeyboardInterrupt:
            _logger.info(
                "Copy interrupted by the user (safe before %s)", self.safe_horizon
            )
            raise
        except BaseException:
            _logger.info("Copy failed (safe before %s)", self.safe_horizon)
            raise
        else:
            _logger.info("Copy finished (safe before %s)", self.safe_horizon)
        finally:
            # The parent process may have finished naturally or have been
            # interrupted by an exception (i.e. due to a copy error or
            # the user pressing Ctrl-C).
            # At this point we must make sure that all the workers have been
            # correctly terminated before continuing.
            if pool:
                pool.terminate()
                pool.join()
            # Clean up the temp dir; any exception raised here is logged
            # and discarded so it does not clobber an exception being
            # handled.
            try:
                shutil.rmtree(self.temp_dir)
            except EnvironmentError as e:
                _logger.error("Error cleaning up '%s' (%s)", self.temp_dir, e)
            self.temp_dir = None

        # Store the end time
        self.copy_end_time = datetime.datetime.now()
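
    # Note on job descriptions: each message produced by `_job_generator`
    # below is a template with two '%s' placeholders, which are filled at
    # log time with the bucket name and the job phase. An illustrative
    # log line:
    #   Copy step 3 of 7: [bucket 0] starting copy safe files from
    #   tablespace directory 'tbs1': /srv/tbs1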

    def _job_generator(self, include_classes=None, exclude_classes=None):
        """
        Generate the jobs to be executed by the workers

        :param list[str]|None include_classes: If not None, copy only the
            items which have one of the specified classes.
        :param list[str]|None exclude_classes: If not None, skip all items
            which have one of the specified classes.
        :rtype: iter[_RsyncJob]
        """
        for item_idx, item in enumerate(self.item_list):

            # Skip items of classes which are not required
            if include_classes and item.item_class not in include_classes:
                continue
            if exclude_classes and item.item_class in exclude_classes:
                continue

            # If the item is a directory then copy it in two stages,
            # otherwise copy it using a plain rsync
            if item.is_directory:

                # Copy the safe files using the default rsync algorithm
                msg = self._progress_message("[%%s] %%s copy safe files from %s" % item)
                phase_skipped = True
                for i, bucket in enumerate(self._fill_buckets(item.safe_list)):
                    phase_skipped = False
                    yield _RsyncJob(
                        item_idx,
                        id=i,
                        description=msg,
                        file_list=bucket,
                        checksum=False,
                    )
                if phase_skipped:
                    _logger.info(msg, "global", "skipping")

                # Copy the check files forcing rsync to verify the checksum
                msg = self._progress_message(
                    "[%%s] %%s copy files with checksum from %s" % item
                )
                phase_skipped = True
                for i, bucket in enumerate(self._fill_buckets(item.check_list)):
                    phase_skipped = False
                    yield _RsyncJob(
                        item_idx, id=i, description=msg, file_list=bucket, checksum=True
                    )
                if phase_skipped:
                    _logger.info(msg, "global", "skipping")

            else:
                # Copy the file using plain rsync
                msg = self._progress_message("[%%s] %%s copy %s" % item)
                yield _RsyncJob(item_idx, description=msg)

    def _fill_buckets(self, file_list):
        """
        Generate buckets for parallel copy

        :param list[_FileItem] file_list: list of files to transfer
        :rtype: iter[list[_FileItem]]
        """
        # If there is only one worker, fall back to copying all files at once
        if self.workers < 2:
            yield file_list
            return

        # Create `self.workers` buckets
        buckets = [[] for _ in range(self.workers)]
        bucket_sizes = [0 for _ in range(self.workers)]
        pos = -1
        # Sort the list by size
        for entry in sorted(file_list, key=lambda item: item.size):
            # Try to fit the file in a bucket
            for i in range(self.workers):
                pos = (pos + 1) % self.workers
                new_size = bucket_sizes[pos] + entry.size
                if new_size < BUCKET_SIZE:
                    bucket_sizes[pos] = new_size
                    buckets[pos].append(entry)
                    break
            else:
                # All the buckets are full, so yield them all
                for i in range(self.workers):
                    if len(buckets[i]) > 0:
                        yield buckets[i]
                    # Clear the bucket
                    buckets[i] = []
                    bucket_sizes[i] = 0
                # Put the current file in the first bucket
                bucket_sizes[0] = entry.size
                buckets[0].append(entry)
                pos = 0
        # Send all the remaining buckets
        for i in range(self.workers):
            if len(buckets[i]) > 0:
                yield buckets[i]
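
    # A worked example of the bucket distribution above, with hypothetical
    # numbers (workers=2, BUCKET_SIZE=100, file sizes 60, 50, 40, 30).
    # After the ascending sort (30, 40, 50, 60) the round-robin fill gives
    #   bucket 0: [30, 50]   bucket 1: [40]
    # File 60 then fits in neither bucket, so both buckets are yielded as
    # jobs and cleared, and 60 starts a new bucket 0 which is yielded at
    # the end: three jobs in total.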

    def _execute_job(self, job):
        """
        Execute a `_RsyncJob` in a worker process

        :type job: _RsyncJob
        """
        item = self.item_list[job.item_idx]
        if job.id is not None:
            bucket = "bucket %s" % job.id
        else:
            bucket = "global"
        # Build the rsync object required for the copy
        rsync = self._rsync_factory(item)
        # Store the start time
        job.copy_start_time = datetime.datetime.now()
        # Write in the log that the job is starting
        with self._logger_lock:
            _logger.info(job.description, bucket, "starting")
        if item.is_directory:
            # A directory item must always have checksum and file_list set
            assert (
                job.file_list is not None
            ), "A directory item must not have a None `file_list` attribute"
            assert (
                job.checksum is not None
            ), "A directory item must not have a None `checksum` attribute"

            # Generate a unique name for the file containing the list of files
            file_list_path = os.path.join(
                self.temp_dir,
                "%s_%s_%s.list"
                % (item.label, "check" if job.checksum else "safe", os.getpid()),
            )

            # Write the list, one path per line
            with open(file_list_path, "w") as file_list:
                for entry in job.file_list:
                    assert isinstance(entry, _FileItem), (
                        "expect %r to be a _FileItem" % entry
                    )
                    file_list.write(entry.path + "\n")

            self._copy(
                rsync,
                item.src,
                item.dst,
                file_list=file_list_path,
                checksum=job.checksum,
            )
        else:
            # A file item must never have checksum and file_list set
            assert (
                job.file_list is None
            ), "A file item must have a None `file_list` attribute"
            assert (
                job.checksum is None
            ), "A file item must have a None `checksum` attribute"
            rsync(item.src, item.dst, allowed_retval=(0, 23, 24))
            if rsync.ret == 23:
                if item.optional:
                    _logger.warning("Ignoring error reading %s", item)
                else:
                    raise CommandFailedException(
                        dict(ret=rsync.ret, out=rsync.out, err=rsync.err)
                    )
        # Store the end time
        job.copy_end_time = datetime.datetime.now()
        # Write in the log that the job is finished
        with self._logger_lock:
            _logger.info(
                job.description,
                bucket,
                "finished (duration: %s)"
                % human_readable_timedelta(job.copy_end_time - job.copy_start_time),
            )
        # Return the job to the caller, for statistics purposes
        return job

    def _progress_init(self):
        """
        Init the counters used by progress logging
        """
        self.total_steps = 0
        for item in self.item_list:
            # Directories require 4 steps, files only one
            if item.is_directory:
                self.total_steps += 4
            else:
                self.total_steps += 1
        self.current_step = 0

    def _progress_message(self, msg):
        """
        Log a message containing the progress

        :param str msg: the message
        :return str: message to log
        """
        self.current_step += 1
        return "Copy step %s of %s: %s" % (self.current_step, self.total_steps, msg)

    def _reuse_args(self, reuse_directory):
        """
        If reuse_backup is 'copy' or 'link', build the rsync option to enable
        the reuse, otherwise return an empty list

        :param str reuse_directory: the local path with data to be reused
        :rtype: list[str]
        """
        if self.reuse_backup in ("copy", "link") and reuse_directory is not None:
            return ["--%s-dest=%s" % (self.reuse_backup, reuse_directory)]
        else:
            return []

    def _retry_handler(self, item, command, args, kwargs, attempt, exc):
        """
        Handler invoked when an rsync command must be retried

        :param _RsyncCopyItem item: The item that is being processed
        :param RsyncPgData command: Command object being executed
        :param list args: command args
        :param dict kwargs: command kwargs
        :param int attempt: attempt number (starting from 0)
        :param CommandFailedException exc: the exception which caused the
            failure
        """
        _logger.warn("Failure executing rsync on %s (attempt %s)", item, attempt)
        _logger.warn("Retrying in %s seconds", self.retry_sleep)
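
    # The classification performed by `_analyze_directory` below can be
    # summarized as follows (first matching rule wins, restating the
    # method body):
    #   1. entry older than safe_horizon                 -> safe_list
    #   2. destination listing could not be retrieved    -> check_list
    #   3. entry missing from the destination            -> safe_list
    #   4. size or date differs from the destination     -> safe_list
    #   5. otherwise (same size and date, recent entry)  -> check_list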

    def _analyze_directory(self, item):
        """
        Analyzes the status of source and destination directories,
        identifying the files that are safe from the point of view of a
        PostgreSQL backup.

        The safe_horizon value is the timestamp of the beginning of the
        oldest backup involved in the copy (as source or destination). Any
        file updated after that timestamp must be checked, as it could have
        been modified during the backup - and we do not replay WAL files to
        update it.

        The destination directory must exist.

        If the "safe_horizon" parameter is None, we cannot make any
        assumptions about what can be considered "safe", so we must check
        everything with checksums enabled.

        If the "ref" parameter is provided and is not None, it is looked up
        instead of the "dst" dir. This is useful when we are copying files
        using the '--link-dest' and '--copy-dest' rsync options.
        In this case, both the "dst" and "ref" dir must exist and
        the "dst" dir must be empty.

        If a source or destination path begins with a ':' character,
        it is a remote path. Only local paths are supported in the "ref"
        argument.

        :param _RsyncCopyItem item: information about a copy operation
        """

        # If reference is not set we use dst as reference path
        ref = item.reuse
        if ref is None:
            ref = item.dst

        # Make sure the ref path ends with a '/' or rsync will add the
        # last path component to all the returned items during listing
        if ref[-1] != "/":
            ref += "/"

        # Build a hash containing all files present in the reference
        # directory. Directories are not included
        try:
            ref_hash = {}
            ref_has_content = False
            for file_item in self._list_files(item, ref):
                if file_item.path != "." and not (
                    item.label == "pgdata" and file_item.path == "pg_tblspc"
                ):
                    ref_has_content = True
                if file_item.mode[0] != "d":
                    ref_hash[file_item.path] = file_item
        except (CommandFailedException, RsyncListFilesFailure) as e:
            # Here we set ref_hash to None, thus disabling the code that
            # marks as "safe matching" those destination files with a
            # different time or size, even if newer than "safe_horizon".
            # As a result, all files newer than "safe_horizon" will be
            # checked through checksums.
            ref_hash = None
            _logger.error(
                "Unable to retrieve reference directory file list. "
                "Using only source file information to decide which files"
                " need to be copied with checksums enabled: %s" % e
            )

        # The 'dir.list' file will contain every directory in the
        # source tree
        item.dir_file = os.path.join(self.temp_dir, "%s_dir.list" % item.label)
        dir_list = open(item.dir_file, "w+")
        # The 'protect.list' file will contain a filter rule to protect
        # each file present in the source tree. It will be used during
        # the first phase to delete all the extra files on destination.
        item.exclude_and_protect_file = os.path.join(
            self.temp_dir, "%s_exclude_and_protect.filter" % item.label
        )
        exclude_and_protect_filter = open(item.exclude_and_protect_file, "w+")

        if not ref_has_content:
            # If the destination directory is empty then include all
            # directories and exclude all files. This stops the rsync
            # command which runs during the _create_dir_and_purge function
            # from copying the entire contents of the source directory and
            # ensures it only creates the directories.
            exclude_and_protect_filter.write("+ */\n")
            exclude_and_protect_filter.write("- *\n")

        # The `safe_list` will contain all items older than
        # safe_horizon, as well as files that we know rsync will
        # check anyway due to a difference in mtime or size
        item.safe_list = []
        # The `check_list` will contain all items that need
        # to be copied with the checksum option enabled
        item.check_list = []
        for entry in self._list_files(item, item.src):
            # If the item is a directory, we only need to save it in 'dir.list'
            if entry.mode[0] == "d":
                dir_list.write(entry.path + "\n")
                continue

            # Add every file in the source path to the list of files
            # to be protected from deletion ('exclude_and_protect.filter'),
            # but only if we know the destination directory is non-empty
            if ref_has_content:
                exclude_and_protect_filter.write("P /" + entry.path + "\n")
                exclude_and_protect_filter.write("- /" + entry.path + "\n")

            # If the source item is older than safe_horizon,
            # add it to 'safe.list'
            if self.safe_horizon and entry.date < self.safe_horizon:
                item.safe_list.append(entry)
                continue

            # If ref_hash is None, it means we failed to retrieve the
            # destination file list. We assume the only safe way is to
            # check every file that is not older than safe_horizon
            if ref_hash is None:
                item.check_list.append(entry)
                continue

            # If the source file differs by time or size from the matching
            # destination one, rsync will discover the difference in any
            # case. It is then safe to skip the checksum check here.
            dst_item = ref_hash.get(entry.path, None)
            if dst_item is None:
                item.safe_list.append(entry)
                continue

            different_size = dst_item.size != entry.size
            different_date = dst_item.date != entry.date
            if different_size or different_date:
                item.safe_list.append(entry)
                continue

            # All remaining files must be checked with checksums enabled
            item.check_list.append(entry)

        # Close all the control files
        dir_list.close()
        exclude_and_protect_filter.close()

    def _create_dir_and_purge(self, item):
        """
        Create the destination directories and delete any unknown file

        :param _RsyncCopyItem item: information about a copy operation
        """

        # Build the rsync object required for the analysis
        rsync = self._rsync_factory(item)

        # Create directories and delete any unknown file
        self._rsync_ignore_vanished_files(
            rsync,
            "--recursive",
            "--delete",
            "--files-from=%s" % item.dir_file,
            "--filter",
            "merge %s" % item.exclude_and_protect_file,
            item.src,
            item.dst,
            check=True,
        )
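
    # Illustrative excerpt of the generated 'exclude_and_protect' filter
    # file for a non-empty destination: each source file is protected from
    # the "--delete" pass and excluded from this first rsync run, e.g.
    #   P /base/1/1259
    #   - /base/1/1259
    # For an empty destination the filter instead only lets rsync create
    # the directory tree:
    #   + */
    #   - *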

    def _copy(self, rsync, src, dst, file_list, checksum=False):
        """
        Execute the rsync call, using a list of files as the source and
        adding the checksum option if required by the caller.

        :param Rsync rsync: the Rsync object used to execute the copy
        :param str src: source directory
        :param str dst: destination directory
        :param str file_list: path to the file containing the sources for
            rsync
        :param bool checksum: if the checksum argument for rsync is required
        """
        # Build the rsync call args
        args = ["--files-from=%s" % file_list]
        if checksum:
            # Add the checksum option if needed
            args.append("--checksum")
        self._rsync_ignore_vanished_files(rsync, src, dst, *args, check=True)
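
    # An illustrative command line resulting from `_copy` (hypothetical
    # paths): copying the "safe" files of the pgdata item without
    # checksums boils down to something like
    #   rsync --files-from=/tmp/barman-abc123/pgdata_safe_4242.list \
    #       :/var/lib/pgsql/data/ /backup/data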

    def _list_files(self, item, path):
        """
        This method recursively retrieves the list of files contained in a
        directory, either local or remote (if the path starts with ':')

        :param _RsyncCopyItem item: information about a copy operation
        :param str path: the path we want to inspect
        :except CommandFailedException: if the rsync call fails
        :except RsyncListFilesFailure: if the rsync output can't be parsed
        """
        _logger.debug("list_files: %r", path)

        # Build the rsync object required for the analysis
        rsync = self._rsync_factory(item)

        try:
            # Use the --no-human-readable option to avoid digit groupings
            # in the "size" field with rsync >= 3.1.0.
            # Ref: http://ftp.samba.org/pub/rsync/src/rsync-3.1.0-NEWS
            rsync.get_output(
                "--no-human-readable", "--list-only", "-r", path, check=True
            )
        except CommandFailedException:
            # This could fail because the local or the remote rsync is
            # older than 3.1. If so, fall back to pre-3.1 mode
            if self.rsync_has_ignore_missing_args and rsync.ret in (
                12,  # Error in rsync protocol data stream (remote)
                1,  # Syntax or usage error (local)
            ):
                self._rsync_set_pre_31_mode()
                # Recursive call, uses the compatibility mode
                for item in self._list_files(item, path):
                    yield item
                return
            else:
                raise

        # Cache the tzlocal object we need to build dates
        tzinfo = dateutil.tz.tzlocal()
        for line in rsync.out.splitlines():
            line = line.rstrip()
            match = self.LIST_ONLY_RE.match(line)
            if match:
                mode = match.group("mode")
                # no exceptions here: the regexp forces 'size' to be an integer
                size = int(match.group("size"))
                try:
                    date_str = match.group("date")
                    # The date format has been validated by LIST_ONLY_RE.
                    # Use the "2014/06/05 18:00:00" format if the sending
                    # rsync is compiled with HAVE_STRFTIME, otherwise use
                    # the "Thu Jun 5 18:00:00 2014" format
                    if date_str[0].isdigit():
                        date = datetime.datetime.strptime(date_str, "%Y/%m/%d %H:%M:%S")
                    else:
                        date = datetime.datetime.strptime(
                            date_str, "%a %b %d %H:%M:%S %Y"
                        )
                    date = date.replace(tzinfo=tzinfo)
                except (TypeError, ValueError):
                    # This should not happen, due to the regexp
                    msg = (
                        "Unable to parse rsync --list-only output line "
                        "(date): '%s'" % line
                    )
                    _logger.exception(msg)
                    raise RsyncListFilesFailure(msg)
                path = match.group("path")
                yield _FileItem(mode, size, date, path)
            else:
                # This is a hard error, as we are unable to parse the output
                # of rsync. It can only happen with a modified or unknown
                # rsync version (perhaps newer than 3.1?)
                msg = "Unable to parse rsync --list-only output line: '%s'" % line
                _logger.error(msg)
                raise RsyncListFilesFailure(msg)

    def _rsync_ignore_vanished_files(self, rsync, *args, **kwargs):
        """
        Wrap an Rsync.get_output() call and ignore missing args

        TODO: when rsync 3.1 is widespread, replace this
        with the --ignore-missing-args argument

        :param Rsync rsync: the Rsync object used to execute the copy
        """
        kwargs["allowed_retval"] = (0, 23, 24)
        rsync.get_output(*args, **kwargs)
        # If the return code is 23 and there is any error which doesn't
        # match the VANISHED_RE regexp, raise an error
        if rsync.ret == 23 and rsync.err is not None:
            for line in rsync.err.splitlines():
                match = self.VANISHED_RE.match(line.rstrip())
                if match:
                    continue
                else:
                    _logger.error("First rsync error line: %s", line)
                    raise CommandFailedException(
                        dict(ret=rsync.ret, out=rsync.out, err=rsync.err)
                    )
        return rsync.out, rsync.err

    def statistics(self):
        """
        Return statistics about the copy object.

        :rtype: dict
        """
        # This method can only run at the end of a non-empty copy
        assert self.copy_end_time
        assert self.item_list
        assert self.jobs_done

        # Initialise the result, calculating the total runtime
        stat = {
            "total_time": total_seconds(self.copy_end_time - self.copy_start_time),
            "number_of_workers": self.workers,
            "analysis_time_per_item": {},
            "copy_time_per_item": {},
            "serialized_copy_time_per_item": {},
        }

        # Calculate the time spent during the analysis of the items
        analysis_start = None
        analysis_end = None
        for item in self.item_list:
            # Some items don't require analysis
            if not item.analysis_end_time:
                continue
            # Build a human readable name to refer to an item in the output
            ident = item.label
            if not analysis_start:
                analysis_start = item.analysis_start_time
            elif analysis_start > item.analysis_start_time:
                analysis_start = item.analysis_start_time

            if not analysis_end:
                analysis_end = item.analysis_end_time
            elif analysis_end < item.analysis_end_time:
                analysis_end = item.analysis_end_time

            stat["analysis_time_per_item"][ident] = total_seconds(
                item.analysis_end_time - item.analysis_start_time
            )
        stat["analysis_time"] = total_seconds(analysis_end - analysis_start)

        # Calculate the time spent per job
        # WARNING: this code assumes that every item is copied separately,
        # so it's strictly tied to the `_job_generator` method code
        item_data = {}
        for job in self.jobs_done:
            # WARNING: the item contained in the job is not the same object
            # contained in self.item_list, as it has gone through two
            # pickling/unpickling cycles
            # Build a human readable name to refer to an item in the output
            ident = self.item_list[job.item_idx].label
            # If this is the first time we see this item we just store the
            # values from the job
            if ident not in item_data:
                item_data[ident] = {
                    "start": job.copy_start_time,
                    "end": job.copy_end_time,
                    "total_time": job.copy_end_time - job.copy_start_time,
                }
            else:
                data = item_data[ident]
                if data["start"] > job.copy_start_time:
                    data["start"] = job.copy_start_time
                if data["end"] < job.copy_end_time:
                    data["end"] = job.copy_end_time
                data["total_time"] += job.copy_end_time - job.copy_start_time

        # Calculate the time spent copying
        copy_start = None
        copy_end = None
        serialized_time = datetime.timedelta(0)
        for ident in item_data:
            data = item_data[ident]
            if copy_start is None or copy_start > data["start"]:
                copy_start = data["start"]
            if copy_end is None or copy_end < data["end"]:
                copy_end = data["end"]
            stat["copy_time_per_item"][ident] = total_seconds(
                data["end"] - data["start"]
            )
            stat["serialized_copy_time_per_item"][ident] = total_seconds(
                data["total_time"]
            )
            serialized_time += data["total_time"]
        # Store the total time spent by copying
        stat["copy_time"] = total_seconds(copy_end - copy_start)
        stat["serialized_copy_time"] = total_seconds(serialized_time)

        return stat
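
# A hedged usage sketch (illustrative names and paths, not part of the
# module API documentation):
#
#   controller = RsyncCopyController(
#       ssh_command="ssh",
#       reuse_backup="link",
#       safe_horizon=start_of_oldest_backup,
#       workers=4,
#   )
#   controller.add_directory(
#       label="pgdata",
#       src=":/var/lib/pgsql/data/",
#       dst="/backup/main/base/NEW_ID/data",
#       item_class=RsyncCopyController.PGDATA_CLASS,
#   )
#   controller.add_file(
#       label="pg_control",
#       src=":/var/lib/pgsql/data/global/pg_control",
#       dst="/backup/main/base/NEW_ID/data/global/pg_control",
#       item_class=RsyncCopyController.PGCONTROL_CLASS,
#   )
#   controller.copy()
#   print(controller.statistics())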