# -*- coding: utf-8 -*-
# © Copyright EnterpriseDB UK Limited 2011-2021
#
# This file is part of Barman.
#
# Barman is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Barman is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Barman.  If not, see <http://www.gnu.org/licenses/>.

"""
Copy controller module

A copy controller handles the copy of a series of files and directories
to their final destination.
"""

import collections
import datetime
import logging
import os.path
import re
import shutil
import signal
import tempfile
from functools import partial
from multiprocessing import Lock, Pool

import dateutil.tz

from barman.command_wrappers import RsyncPgData
from barman.exceptions import CommandFailedException, RsyncListFilesFailure
from barman.utils import human_readable_timedelta, total_seconds

_logger = logging.getLogger(__name__)

_worker_callable = None
"""
Global variable containing a callable used to execute the jobs.
Initialized by `_init_worker` and used by the `_run_worker` function.
This variable must be None outside a multiprocessing worker Process.
"""

# Parallel copy bucket size (10 GiB)
BUCKET_SIZE = 1024 * 1024 * 1024 * 10


def _init_worker(func):
    """
    Store the callable used to execute jobs passed to the `_run_worker` function

    :param callable func: the callable to invoke for every job
    """
    global _worker_callable
    _worker_callable = func


def _run_worker(job):
    """
    Execute a job using the callable set by the `_init_worker` function

    :param _RsyncJob job: the job to be executed
    """
    global _worker_callable
    assert (
        _worker_callable is not None
    ), "Worker has not been initialized with `_init_worker`"

    # This is the entrypoint of the worker process. Since the KeyboardInterrupt
    # exception is handled by the main process, let's ignore Ctrl-C here.
    # When the parent process receives a KeyboardInterrupt, it asks the pool
    # to terminate its workers and then terminates itself.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    return _worker_callable(job)

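
# Illustrative sketch (not executed): the `_init_worker`/`_run_worker` pair
# above is meant to be used with a multiprocessing Pool, as done in
# `RsyncCopyController.copy()`:
#
#   pool = Pool(processes=workers, initializer=_init_worker,
#               initargs=(controller._execute_job,))
#   for job in pool.imap_unordered(_run_worker, job_iterator):
#       ...  # collect the finished jobs
#
# The initializer runs once in each worker process, so the job callable does
# not have to be shipped along with every job.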

class _RsyncJob(object):
    """
    A job to be executed by a worker Process
    """

    def __init__(self, item_idx, description, id=None, file_list=None, checksum=None):
        """
        :param int item_idx: The index of the copy item containing this job
        :param str description: The description of the job, used for logging
        :param int id: Job ID (the bucket number, if any)
        :param list[_FileItem] file_list: the list of files to be copied
        :param bool checksum: Whether to force the checksum verification
        """
        self.id = id
        self.item_idx = item_idx
        self.description = description
        self.file_list = file_list
        self.checksum = checksum

        # Statistics
        self.copy_start_time = None
        self.copy_end_time = None


class _FileItem(collections.namedtuple("_FileItem", "mode size date path")):
    """
    This named tuple is used to store the content of each line of the output
    of a "rsync --list-only" call
    """


class _RsyncCopyItem(object):
    """
    Internal data object that contains the information about one of the items
    that have to be copied during a RsyncCopyController run.
    """

    def __init__(
        self,
        label,
        src,
        dst,
        exclude=None,
        exclude_and_protect=None,
        include=None,
        is_directory=False,
        bwlimit=None,
        reuse=None,
        item_class=None,
        optional=False,
    ):
        """
        The "label" parameter is meant to be used for error messages
        and logging.

        If "src" or "dst" begins with a ':' character, it is a remote
        path. Only local paths are supported in the "reuse" argument.

        If the "reuse" parameter is provided and is not None, it is used to
        implement the incremental copy. This only works if "is_directory" is
        True.

        :param str label: a symbolic name for this item
        :param str src: source directory.
        :param str dst: destination directory.
        :param list[str] exclude: list of patterns to be excluded from the
            copy. The destination will be deleted if present.
        :param list[str] exclude_and_protect: list of patterns to be excluded
            from the copy. The destination will be preserved if present.
        :param list[str] include: list of patterns to be included in the
            copy even if excluded.
        :param bool is_directory: Whether the item points to a directory.
        :param bwlimit: bandwidth limit to be enforced. (KiB/s)
        :param str|None reuse: the reference path for incremental mode.
        :param str|None item_class: If specified, carries meta information
            about what the object to be copied is.
        :param bool optional: Whether a failure copying this object can be
            ignored instead of being treated as fatal. This only works if
            "is_directory" is False
        """
        self.label = label
        self.src = src
        self.dst = dst
        self.exclude = exclude
        self.exclude_and_protect = exclude_and_protect
        self.include = include
        self.is_directory = is_directory
        self.bwlimit = bwlimit
        self.reuse = reuse
        self.item_class = item_class
        self.optional = optional

        # Attributes that will be filled during the analysis
        self.temp_dir = None
        self.dir_file = None
        self.exclude_and_protect_file = None
        self.safe_list = None
        self.check_list = None

        # Statistics
        self.analysis_start_time = None
        self.analysis_end_time = None

        # Ensure that the user specified the item class, since it is mandatory
        # to correctly handle the item
        assert self.item_class

    def __str__(self):
        # Prepare strings for messages
        formatted_class = self.item_class
        formatted_name = self.src
        if self.src.startswith(":"):
            formatted_class = "remote " + self.item_class
            formatted_name = self.src[1:]
        formatted_class += " directory" if self.is_directory else " file"

        # Log the operation that is being executed
        if self.item_class in (
            RsyncCopyController.PGDATA_CLASS,
            RsyncCopyController.PGCONTROL_CLASS,
        ):
            return "%s: %s" % (formatted_class, formatted_name)
        else:
            return "%s '%s': %s" % (formatted_class, self.label, formatted_name)


class RsyncCopyController(object):
    """
    Copy a list of files and directories to their final destination.
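
    A minimal usage sketch (illustrative only: the host, paths and options
    are assumptions, not taken from a real setup)::

        controller = RsyncCopyController(
            ssh_command="ssh", ssh_options=["-l", "postgres"], workers=2)
        controller.add_directory(
            label="pgdata",
            src=":/var/lib/pgsql/data/",
            dst="/backup/pgdata/",
            item_class=RsyncCopyController.PGDATA_CLASS)
        controller.add_file(
            label="pg_control",
            src=":/var/lib/pgsql/data/global/pg_control",
            dst="/backup/pgdata/global/pg_control",
            item_class=RsyncCopyController.PGCONTROL_CLASS)
        controller.copy()
        stats = controller.statistics()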
    """

    # Constants to be used as "item_class" values
    PGDATA_CLASS = "PGDATA"
    TABLESPACE_CLASS = "tablespace"
    PGCONTROL_CLASS = "pg_control"
    CONFIG_CLASS = "config"

    # This regular expression is used to parse each line of the output
    # of a "rsync --list-only" call. This regexp has been tested with every
    # supported version of upstream rsync (>= 3.0.4)
    LIST_ONLY_RE = re.compile(
        r"""
        ^ # start of the line

        # capture the mode (e.g. "-rw-------")
        (?P<mode>[-\w]+)
        \s+

        # size is an integer
        (?P<size>\d+)
        \s+

        # The date field can have two different forms
        (?P<date>
            # "2014/06/05 18:00:00" if the sending rsync is compiled
            # with HAVE_STRFTIME
            [\d/]+\s+[\d:]+
        |
            # "Thu Jun  5 18:00:00 2014" otherwise
            \w+\s+\w+\s+\d+\s+[\d:]+\s+\d+
        )
        \s+

        # all the remaining characters are part of filename
        (?P<path>.+)

        $ # end of the line
    """,
        re.VERBOSE,
    )
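
    # For reference, two sample lines that LIST_ONLY_RE matches (sizes and
    # dates are made up for illustration), both yielding mode="-rw-------",
    # size=8192 and path="base/1/1234":
    #
    #   -rw------- 8192 2014/06/05 18:00:00 base/1/1234
    #   -rw------- 8192 Thu Jun  5 18:00:00 2014 base/1/1234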

    # This regular expression is used to ignore error messages regarding
    # vanished files that are not really an error. It is used because
    # in some cases rsync reports it with exit code 23 which could also mean
    # a fatal error
    VANISHED_RE = re.compile(
        r"""
        ^ # start of the line
        (
        # files which vanished before rsync start
        rsync:\ link_stat\ ".+"\ failed:\ No\ such\ file\ or\ directory\ \(2\)
        |
        # files which vanished after rsync start
        file\ has\ vanished:\ ".+"
        |
        # files which have been truncated during transfer
        rsync:\ read\ errors\ mapping\ ".+":\ No\ data\ available\ \(61\)
        |
        # final summary
        rsync\ error:\ .* \(code\ 23\)\ at\ main\.c\(\d+\)
            \ \[(generator|receiver|sender)=[^\]]+\]
        )
        $ # end of the line
    """,
        re.VERBOSE + re.IGNORECASE,
    )
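
    # For reference, a sample stderr line that VANISHED_RE treats as harmless
    # (the path is made up for illustration):
    #
    #   file has vanished: "/var/lib/pgsql/data/base/1/1234"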

    def __init__(
        self,
        path=None,
        ssh_command=None,
        ssh_options=None,
        network_compression=False,
        reuse_backup=None,
        safe_horizon=None,
        exclude=None,
        retry_times=0,
        retry_sleep=0,
        workers=1,
    ):
        """
        :param str|None path: the PATH where the rsync executable will be
            searched
        :param str|None ssh_command: the ssh executable to be used
            to access remote paths
        :param list[str]|None ssh_options: list of ssh options to be used
            to access remote paths
        :param boolean network_compression: whether to use the network
            compression
        :param str|None reuse_backup: if "link" or "copy" enables
            the incremental copy feature
        :param datetime.datetime|None safe_horizon: if set, assumes that every
            file older than it is safe to copy without checksum verification.
        :param list[str]|None exclude: list of patterns to be excluded
            from the copy
        :param int retry_times: The number of times to retry a failed operation
        :param int retry_sleep: Sleep time between two retries
        :param int workers: The number of parallel copy workers
        """

        super(RsyncCopyController, self).__init__()
        self.path = path
        self.ssh_command = ssh_command
        self.ssh_options = ssh_options
        self.network_compression = network_compression
        self.reuse_backup = reuse_backup
        self.safe_horizon = safe_horizon
        self.exclude = exclude
        self.retry_times = retry_times
        self.retry_sleep = retry_sleep
        self.workers = workers

        self._logger_lock = Lock()

        # Assume we are running with a recent rsync (>= 3.1)
        self.rsync_has_ignore_missing_args = True

        self.item_list = []
        """List of items to be copied"""

        self.rsync_cache = {}
        """A cache of RsyncPgData objects"""

        # Attributes used for progress reporting

        self.total_steps = None
        """Total number of steps"""

        self.current_step = None
        """Current step number"""

        self.temp_dir = None
        """Temp dir used to store the status during the copy"""

        # Statistics

        self.jobs_done = None
        """Already finished jobs list"""

        self.copy_start_time = None
        """Copy start time"""

        self.copy_end_time = None
        """Copy end time"""

    def add_directory(
        self,
        label,
        src,
        dst,
        exclude=None,
        exclude_and_protect=None,
        include=None,
        bwlimit=None,
        reuse=None,
        item_class=None,
    ):
        """
        Add a directory that we want to copy.

        If "src" or "dst" begins with a ':' character, it is a remote
        path. Only local paths are supported in the "reuse" argument.

        If the "reuse" parameter is provided and is not None, it is used to
        implement the incremental copy. This only works if "is_directory" is
        True.

        :param str label: symbolic name to be used for error messages
            and logging.
        :param str src: source directory.
        :param str dst: destination directory.
        :param list[str] exclude: list of patterns to be excluded from the
            copy. The destination will be deleted if present.
        :param list[str] exclude_and_protect: list of patterns to be excluded
            from the copy. The destination will be preserved if present.
        :param list[str] include: list of patterns to be included in the
            copy even if excluded.
        :param bwlimit: bandwidth limit to be enforced. (KiB/s)
        :param str|None reuse: the reference path for incremental mode.
        :param str item_class: If specified, carries meta information about
            what the object to be copied is.
        """
        self.item_list.append(
            _RsyncCopyItem(
                label=label,
                src=src,
                dst=dst,
                is_directory=True,
                bwlimit=bwlimit,
                reuse=reuse,
                item_class=item_class,
                optional=False,
                exclude=exclude,
                exclude_and_protect=exclude_and_protect,
                include=include,
            )
        )

    def add_file(self, label, src, dst, item_class=None, optional=False):
        """
        Add a file that we want to copy

        :param str label: symbolic name to be used for error messages
            and logging.
        :param str src: source file.
        :param str dst: destination file.
        :param str item_class: If specified, carries meta information about
            what the object to be copied is.
        :param bool optional: Whether a failure copying this object can be
            ignored instead of being treated as fatal.
        """
        self.item_list.append(
            _RsyncCopyItem(
                label=label,
                src=src,
                dst=dst,
                is_directory=False,
                bwlimit=None,
                reuse=None,
                item_class=item_class,
                optional=optional,
            )
        )

    def _rsync_factory(self, item):
        """
        Build the RsyncPgData object required for copying the provided item

        :param _RsyncCopyItem item: information about a copy operation
        :rtype: RsyncPgData
        """
        # If the object already exists, use it
        if item in self.rsync_cache:
            return self.rsync_cache[item]

        # Prepare the command arguments
        args = self._reuse_args(item.reuse)
        # Merge the global exclude list with the one in the item object
        if self.exclude and item.exclude:
            exclude = self.exclude + item.exclude
        else:
            exclude = self.exclude or item.exclude

        # Using `--ignore-missing-args` could fail in case
        # the local or the remote rsync is older than 3.1.
        # In that case we expect to get an error during the analyze phase.
        # The analyze code must catch that error and retry after
        # flushing the rsync cache.
        if self.rsync_has_ignore_missing_args:
            args.append("--ignore-missing-args")

        # TODO: remove the debug output or use it for progress tracking
        # By adding a double '--itemize-changes' option, the rsync
        # output will contain the full list of files that have been
        # touched, even those that have not changed
        args.append("--itemize-changes")
        args.append("--itemize-changes")

        # Build the rsync object that will execute the copy
        rsync = RsyncPgData(
            path=self.path,
            ssh=self.ssh_command,
            ssh_options=self.ssh_options,
            args=args,
            bwlimit=item.bwlimit,
            network_compression=self.network_compression,
            exclude=exclude,
            exclude_and_protect=item.exclude_and_protect,
            include=item.include,
            retry_times=self.retry_times,
            retry_sleep=self.retry_sleep,
            retry_handler=partial(self._retry_handler, item),
        )
        self.rsync_cache[item] = rsync
        return rsync

    def _rsync_set_pre_31_mode(self):
        """
        Stop using `--ignore-missing-args` and restore rsync < 3.1
        compatibility
        """
        _logger.info(
            "Detected rsync version less than 3.1. "
            "Stop using '--ignore-missing-args' argument."
        )
        self.rsync_has_ignore_missing_args = False
        self.rsync_cache.clear()

    def copy(self):
        """
        Execute the actual copy
        """
        # Store the start time
        self.copy_start_time = datetime.datetime.now()

        # Create a temporary directory to hold the file lists.
        self.temp_dir = tempfile.mkdtemp(suffix="", prefix="barman-")
        # The following try block is to make sure the temporary directory
        # will be removed on exit and all the pool workers
        # have been terminated.
        pool = None
        try:
            # Initialize the counters used by progress reporting
            self._progress_init()
            _logger.info("Copy started (safe before %r)", self.safe_horizon)

            # Execute some preliminary steps for each item to be copied
            for item in self.item_list:

                # The initial preparation is necessary only for directories
                if not item.is_directory:
                    continue

                # Store the analysis start time
                item.analysis_start_time = datetime.datetime.now()

                # Analyze the source and destination directory content
                _logger.info(self._progress_message("[global] analyze %s" % item))
                self._analyze_directory(item)

                # Prepare the target directories, removing any unneeded file
                _logger.info(
                    self._progress_message(
                        "[global] create destination directories and delete "
                        "unknown files for %s" % item
                    )
                )
                self._create_dir_and_purge(item)

                # Store the analysis end time
                item.analysis_end_time = datetime.datetime.now()

            # Init the list of jobs done. Every job will be added to this list
            # once finished. The content will be used to calculate statistics
            # about the copy process.
            self.jobs_done = []

            # The jobs are executed using a pool of parallel worker processes.
            # Each job is generated by `self._job_generator` and executed by
            # `_run_worker` through `self._execute_job`, which has been set
            # by the `_init_worker` function during the Pool initialization.
            pool = Pool(
                processes=self.workers,
                initializer=_init_worker,
                initargs=(self._execute_job,),
            )
            for job in pool.imap_unordered(
                _run_worker, self._job_generator(exclude_classes=[self.PGCONTROL_CLASS])
            ):
                # Store the finished job for further analysis
                self.jobs_done.append(job)

            # The PGCONTROL_CLASS items must always be copied last
            for job in pool.imap_unordered(
                _run_worker, self._job_generator(include_classes=[self.PGCONTROL_CLASS])
            ):
                # Store the finished job for further analysis
                self.jobs_done.append(job)

        except KeyboardInterrupt:
            _logger.info(
                "Copy interrupted by the user (safe before %s)", self.safe_horizon
            )
            raise
        except BaseException:
            _logger.info("Copy failed (safe before %s)", self.safe_horizon)
            raise
        else:
            _logger.info("Copy finished (safe before %s)", self.safe_horizon)
        finally:
            # The parent process may have finished naturally or have been
            # interrupted by an exception (i.e. due to a copy error or
            # the user pressing Ctrl-C).
            # At this point we must make sure that all the workers have been
            # correctly terminated before continuing.
            if pool:
                pool.terminate()
                pool.join()
            # Clean up the temp dir; any exception raised here is logged
            # and discarded so as not to clobber an exception being handled.
            try:
                shutil.rmtree(self.temp_dir)
            except EnvironmentError as e:
                _logger.error("Error cleaning up '%s' (%s)", self.temp_dir, e)
            self.temp_dir = None

            # Store the end time
            self.copy_end_time = datetime.datetime.now()

    def _job_generator(self, include_classes=None, exclude_classes=None):
        """
        Generate the jobs to be executed by the workers

        :param list[str]|None include_classes: If not None, copy only the items
            which have one of the specified classes.
        :param list[str]|None exclude_classes: If not None, skip all items
            which have one of the specified classes.
        :rtype: iter[_RsyncJob]
        """
        for item_idx, item in enumerate(self.item_list):

            # Skip items of classes which are not required
            if include_classes and item.item_class not in include_classes:
                continue
            if exclude_classes and item.item_class in exclude_classes:
                continue

            # If the item is a directory then copy it in two stages,
            # otherwise copy it using a plain rsync
            if item.is_directory:

                # Copy the safe files using the default rsync algorithm
                msg = self._progress_message("[%%s] %%s copy safe files from %s" % item)
                phase_skipped = True
                for i, bucket in enumerate(self._fill_buckets(item.safe_list)):
                    phase_skipped = False
                    yield _RsyncJob(
                        item_idx,
                        id=i,
                        description=msg,
                        file_list=bucket,
                        checksum=False,
                    )
                if phase_skipped:
                    _logger.info(msg, "global", "skipping")

                # Copy the check files forcing rsync to verify the checksum
                msg = self._progress_message(
                    "[%%s] %%s copy files with checksum from %s" % item
                )
                phase_skipped = True
                for i, bucket in enumerate(self._fill_buckets(item.check_list)):
                    phase_skipped = False
                    yield _RsyncJob(
                        item_idx, id=i, description=msg, file_list=bucket, checksum=True
                    )
                if phase_skipped:
                    _logger.info(msg, "global", "skipping")

            else:
                # Copy the file using plain rsync
                msg = self._progress_message("[%%s] %%s copy %s" % item)
                yield _RsyncJob(item_idx, description=msg)

    def _fill_buckets(self, file_list):
        """
        Generate buckets for parallel copy

        :param list[_FileItem] file_list: list of files to transfer
        :rtype: iter[list[_FileItem]]
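
        Illustrative sketch of the behaviour with ``workers = 2`` and a
        hypothetical ``BUCKET_SIZE`` of 100 (file sizes are made up)::

            files (sorted by size): a=90, b=90, c=90
            'a' fills bucket 0 and 'b' fills bucket 1; 'c' fits in neither,
            so both buckets are yielded and 'c' starts a fresh bucket 0,
            which is yielded at the end: [a], [b], [c]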
        """
        # If there is only one worker, fall back to copying all files at once
        if self.workers < 2:
            yield file_list
            return

        # Create `self.workers` buckets
        buckets = [[] for _ in range(self.workers)]
        bucket_sizes = [0 for _ in range(self.workers)]
        pos = -1
        # Sort the list by size
        for entry in sorted(file_list, key=lambda item: item.size):
            # Try to place the file into a bucket
            for i in range(self.workers):
                pos = (pos + 1) % self.workers
                new_size = bucket_sizes[pos] + entry.size
                if new_size < BUCKET_SIZE:
                    bucket_sizes[pos] = new_size
                    buckets[pos].append(entry)
                    break
            else:
                # All the buckets are full, so return them all
                for i in range(self.workers):
                    if len(buckets[i]) > 0:
                        yield buckets[i]
                    # Clear the bucket
                    buckets[i] = []
                    bucket_sizes[i] = 0
                # Put the current file in the first bucket
                bucket_sizes[0] = entry.size
                buckets[0].append(entry)
                pos = 0
        # Send all the remaining buckets
        for i in range(self.workers):
            if len(buckets[i]) > 0:
                yield buckets[i]

    def _execute_job(self, job):
        """
        Execute a `_RsyncJob` in a worker process

        :type job: _RsyncJob
        """
        item = self.item_list[job.item_idx]
        if job.id is not None:
            bucket = "bucket %s" % job.id
        else:
            bucket = "global"
        # Build the rsync object required for the copy
        rsync = self._rsync_factory(item)
        # Store the start time
        job.copy_start_time = datetime.datetime.now()
        # Write in the log that the job is starting
        with self._logger_lock:
            _logger.info(job.description, bucket, "starting")
        if item.is_directory:
            # A directory item must always have checksum and file_list set
            assert (
                job.file_list is not None
            ), "A directory item must not have a None `file_list` attribute"
            assert (
                job.checksum is not None
            ), "A directory item must not have a None `checksum` attribute"

            # Generate a unique name for the file containing the list of files
            file_list_path = os.path.join(
                self.temp_dir,
                "%s_%s_%s.list"
                % (item.label, "check" if job.checksum else "safe", os.getpid()),
            )

            # Write the list, one path per line
            with open(file_list_path, "w") as file_list:
                for entry in job.file_list:
                    assert isinstance(entry, _FileItem), (
                        "expect %r to be a _FileItem" % entry
                    )
                    file_list.write(entry.path + "\n")

            self._copy(
                rsync,
                item.src,
                item.dst,
                file_list=file_list_path,
                checksum=job.checksum,
            )
        else:
            # A file item must never have checksum and file_list set
            assert (
                job.file_list is None
            ), "A file item must have a None `file_list` attribute"
            assert (
                job.checksum is None
            ), "A file item must have a None `checksum` attribute"
            rsync(item.src, item.dst, allowed_retval=(0, 23, 24))
            if rsync.ret == 23:
                if item.optional:
                    _logger.warning("Ignoring error reading %s", item)
                else:
                    raise CommandFailedException(
                        dict(ret=rsync.ret, out=rsync.out, err=rsync.err)
                    )
        # Store the stop time
        job.copy_end_time = datetime.datetime.now()
        # Write in the log that the job is finished
        with self._logger_lock:
            _logger.info(
                job.description,
                bucket,
                "finished (duration: %s)"
                % human_readable_timedelta(job.copy_end_time - job.copy_start_time),
            )
        # Return the job to the caller, for statistics purposes
        return job

    def _progress_init(self):
        """
        Init the counters used by progress logging
        """
        self.total_steps = 0
        for item in self.item_list:
            # Directories require 4 steps, files only one
            if item.is_directory:
                self.total_steps += 4
            else:
                self.total_steps += 1
        self.current_step = 0

    def _progress_message(self, msg):
        """
        Build a message string containing the progress

        :param str msg: the message
        :return str: message to log
        """
        self.current_step += 1
        return "Copy step %s of %s: %s" % (self.current_step, self.total_steps, msg)

    def _reuse_args(self, reuse_directory):
        """
        If reuse_backup is 'copy' or 'link', build the rsync option to enable
        the reuse, otherwise return an empty list

        :param str reuse_directory: the local path with data to be reused
        :rtype: list[str]
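
        Example (illustrative, assuming ``reuse_backup == "link"``)::

            self._reuse_args("/backup/previous")
            # -> ["--link-dest=/backup/previous"]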
        """
        if self.reuse_backup in ("copy", "link") and reuse_directory is not None:
            return ["--%s-dest=%s" % (self.reuse_backup, reuse_directory)]
        else:
            return []

    def _retry_handler(self, item, command, args, kwargs, attempt, exc):
        """
        Log a retry attempt after a failed rsync execution

        :param _RsyncCopyItem item: The item that is being processed
        :param RsyncPgData command: Command object being executed
        :param list args: command args
        :param dict kwargs: command kwargs
        :param int attempt: attempt number (starting from 0)
        :param CommandFailedException exc: the exception which caused the
            failure
        """
        _logger.warning("Failure executing rsync on %s (attempt %s)", item, attempt)
        _logger.warning("Retrying in %s seconds", self.retry_sleep)

    def _analyze_directory(self, item):
        """
        Analyze the status of source and destination directories, identifying
        the files that are safe from the point of view of a PostgreSQL backup.

        The safe_horizon value is the timestamp of the beginning of the
        oldest backup involved in the copy (as source or destination). Any file
        updated after that timestamp must be checked, as it could have been
        modified during the backup - and we do not replay WAL files to update
        them.

        The destination directory must exist.

        If the "safe_horizon" attribute is None, we cannot make any
        assumptions about what can be considered "safe", so we must check
        everything with checksums enabled.

        If the item's "reuse" attribute is provided and is not None, it is
        looked up instead of the "dst" dir. This is useful when we are
        copying files using the '--link-dest' and '--copy-dest' rsync
        options. In this case, both the "dst" and "reuse" dirs must exist
        and the "dst" dir must be empty.

        If the source or destination path begins with a ':' character,
        it is a remote path. Only local paths are supported in the "reuse"
        argument.

        :param _RsyncCopyItem item: information about a copy operation
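
        Classification sketch (a summary of the logic implemented below)::

            directory entry                     -> written to 'dir.list'
            file older than safe_horizon        -> item.safe_list
            file missing from the reference,
            or differing in size or date        -> item.safe_list
            reference list unavailable,
            or metadata identical               -> item.check_list (checksum)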
        """

        # If the reference is not set we use dst as the reference path
        ref = item.reuse
        if ref is None:
            ref = item.dst

        # Make sure the ref path ends with a '/' or rsync will add the
        # last path component to all the returned items during listing
        if ref[-1] != "/":
            ref += "/"

        # Build a hash containing all files present in the reference
        # directory. Directories are not included
        try:
            ref_hash = {}
            ref_has_content = False
            for file_item in self._list_files(item, ref):
                if file_item.path != "." and not (
                    item.label == "pgdata" and file_item.path == "pg_tblspc"
                ):
                    ref_has_content = True
                if file_item.mode[0] != "d":
                    ref_hash[file_item.path] = file_item
        except (CommandFailedException, RsyncListFilesFailure) as e:
            # Here we set ref_hash to None, thus disabling the code that marks
            # as "safe matching" those destination files with different time
            # or size, even if newer than "safe_horizon". As a result, all
            # files newer than "safe_horizon" will be checked through
            # checksums.
            ref_hash = None
            _logger.error(
                "Unable to retrieve reference directory file list. "
                "Using only source file information to decide which files"
                " need to be copied with checksums enabled: %s" % e
            )

        # The 'dir.list' file will contain every directory in the
        # source tree
        item.dir_file = os.path.join(self.temp_dir, "%s_dir.list" % item.label)
        dir_list = open(item.dir_file, "w+")
        # The 'protect.list' file will contain a filter rule to protect
        # each file present in the source tree. It will be used during
        # the first phase to delete all the extra files on destination.
        item.exclude_and_protect_file = os.path.join(
            self.temp_dir, "%s_exclude_and_protect.filter" % item.label
        )
        exclude_and_protect_filter = open(item.exclude_and_protect_file, "w+")

        if not ref_has_content:
            # If the destination directory is empty then include all
            # directories and exclude all files. This stops the rsync
            # command which runs during the _create_dir_and_purge function
            # from copying the entire contents of the source directory and
            # ensures it only creates the directories.
            exclude_and_protect_filter.write("+ */\n")
            exclude_and_protect_filter.write("- *\n")

        # The `safe_list` will contain all items older than
        # safe_horizon, as well as files that we know rsync will
        # check anyway due to a difference in mtime or size
        item.safe_list = []
        # The `check_list` will contain all items that need
        # to be copied with checksum option enabled
        item.check_list = []
        for entry in self._list_files(item, item.src):
            # If the entry is a directory, we only need to save it in 'dir.list'
            if entry.mode[0] == "d":
                dir_list.write(entry.path + "\n")
                continue

            # Add every file in the source path to the list of files
            # to be protected from deletion ('exclude_and_protect.filter'),
            # but only if we know the destination directory is non-empty
            if ref_has_content:
                exclude_and_protect_filter.write("P /" + entry.path + "\n")
                exclude_and_protect_filter.write("- /" + entry.path + "\n")

            # If the source entry is older than safe_horizon,
            # add it to 'safe.list'
            if self.safe_horizon and entry.date < self.safe_horizon:
                item.safe_list.append(entry)
                continue

            # If ref_hash is None, it means we failed to retrieve the
            # destination file list. We assume the only safe way is to
            # check every file that is not older than safe_horizon
            if ref_hash is None:
                item.check_list.append(entry)
                continue

            # If the source file differs by time or size from the matching
            # destination one, rsync will discover the difference in any case.
            # It is then safe to skip the checksum check here.
            dst_item = ref_hash.get(entry.path, None)
            if dst_item is None:
                item.safe_list.append(entry)
                continue

            different_size = dst_item.size != entry.size
            different_date = dst_item.date != entry.date
            if different_size or different_date:
                item.safe_list.append(entry)
                continue

            # All remaining files must be checked with checksums enabled
            item.check_list.append(entry)

        # Close all the control files
        dir_list.close()
        exclude_and_protect_filter.close()

    def _create_dir_and_purge(self, item):
        """
        Create the destination directories and delete any unknown file

        :param _RsyncCopyItem item: information about a copy operation
        """

        # Build the rsync object required for the analysis
        rsync = self._rsync_factory(item)

        # Create directories and delete any unknown file
        self._rsync_ignore_vanished_files(
            rsync,
            "--recursive",
            "--delete",
            "--files-from=%s" % item.dir_file,
            "--filter",
            "merge %s" % item.exclude_and_protect_file,
            item.src,
            item.dst,
            check=True,
        )

    def _copy(self, rsync, src, dst, file_list, checksum=False):
        """
        Execute the call to rsync, using a list of files as the source
        and adding the checksum option if required by the caller.

        :param Rsync rsync: the Rsync object used to execute the copy
        :param str src: source directory
        :param str dst: destination directory
        :param str file_list: path to the file containing the sources for rsync
        :param bool checksum: if the checksum argument for rsync is required
        """
        # Build the rsync call args
        args = ["--files-from=%s" % file_list]
        if checksum:
            # Add the checksum option if needed
            args.append("--checksum")
        self._rsync_ignore_vanished_files(rsync, src, dst, *args, check=True)

    def _list_files(self, item, path):
        """
        This method recursively retrieves a list of files contained in a
        directory, either local or remote (if it starts with ':')

        :param _RsyncCopyItem item: information about a copy operation
        :param str path: the path we want to inspect
        :except CommandFailedException: if the rsync call fails
        :except RsyncListFilesFailure: if the rsync output can't be parsed
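
        Example of a parsed output line (size and date are made up for
        illustration)::

            "-rw------- 8192 2014/06/05 18:00:00 base/1/1234"

            yields _FileItem(mode="-rw-------", size=8192,
                             date=datetime(2014, 6, 5, 18, 0, tzinfo=tzlocal()),
                             path="base/1/1234")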
        """
        _logger.debug("list_files: %r", path)

        # Build the rsync object required for the analysis
        rsync = self._rsync_factory(item)

        try:
            # Use the --no-human-readable option to avoid digit groupings
            # in "size" field with rsync >= 3.1.0.
            # Ref: http://ftp.samba.org/pub/rsync/src/rsync-3.1.0-NEWS
            rsync.get_output(
                "--no-human-readable", "--list-only", "-r", path, check=True
            )
        except CommandFailedException:
            # This could fail due to the local or the remote rsync
            # being older than 3.1. If so, fall back to pre-3.1 mode
            if self.rsync_has_ignore_missing_args and rsync.ret in (
                12,  # Error in rsync protocol data stream (remote)
                1,
            ):  # Syntax or usage error (local)
                self._rsync_set_pre_31_mode()
                # Recursive call, uses the compatibility mode
                for file_item in self._list_files(item, path):
                    yield file_item
                return
            else:
                raise

        # Cache the tzlocal object we need to build dates
        tzinfo = dateutil.tz.tzlocal()
        for line in rsync.out.splitlines():
            line = line.rstrip()
            match = self.LIST_ONLY_RE.match(line)
            if match:
                mode = match.group("mode")
                # no exceptions here: the regexp forces 'size' to be an integer
                size = int(match.group("size"))
                try:
                    date_str = match.group("date")
                    # The date format has been validated by LIST_ONLY_RE.
                    # Use "2014/06/05 18:00:00" format if the sending rsync
                    # is compiled with HAVE_STRFTIME, otherwise use
                    # "Thu Jun  5 18:00:00 2014" format
                    if date_str[0].isdigit():
                        date = datetime.datetime.strptime(date_str, "%Y/%m/%d %H:%M:%S")
                    else:
                        date = datetime.datetime.strptime(
                            date_str, "%a %b %d %H:%M:%S %Y"
                        )
                    date = date.replace(tzinfo=tzinfo)
                except (TypeError, ValueError):
                    # This should not happen, due to the regexp
                    msg = (
                        "Unable to parse rsync --list-only output line "
                        "(date): '%s'" % line
                    )
                    _logger.exception(msg)
                    raise RsyncListFilesFailure(msg)
                path = match.group("path")
                yield _FileItem(mode, size, date, path)
            else:
                # This is a hard error, as we are unable to parse the output
                # of rsync. It can only happen with a modified or unknown
                # rsync version (perhaps newer than 3.1?)
                msg = "Unable to parse rsync --list-only output line: '%s'" % line
                _logger.error(msg)
                raise RsyncListFilesFailure(msg)

    def _rsync_ignore_vanished_files(self, rsync, *args, **kwargs):
        """
        Wrap an Rsync.get_output() call, ignoring errors caused by
        vanished files

        TODO: when rsync >= 3.1 is widespread, replace this
            with the --ignore-missing-args argument

        :param Rsync rsync: the Rsync object used to execute the copy
        """
        kwargs["allowed_retval"] = (0, 23, 24)
        rsync.get_output(*args, **kwargs)
        # If the return code is 23 and there is any error which doesn't match
        # the VANISHED_RE regexp, raise an error
        if rsync.ret == 23 and rsync.err is not None:
            for line in rsync.err.splitlines():
                match = self.VANISHED_RE.match(line.rstrip())
                if match:
                    continue
                else:
                    _logger.error("First rsync error line: %s", line)
                    raise CommandFailedException(
                        dict(ret=rsync.ret, out=rsync.out, err=rsync.err)
                    )
        return rsync.out, rsync.err

    def statistics(self):
        """
        Return statistics about the copy object.

        :rtype: dict
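
        Example of the returned dictionary (illustrative values)::

            {
                "total_time": 120.0,
                "number_of_workers": 2,
                "analysis_time": 2.5,
                "analysis_time_per_item": {"pgdata": 2.5},
                "copy_time": 110.0,
                "copy_time_per_item": {"pgdata": 105.0, "pg_control": 0.5},
                "serialized_copy_time": 180.0,
                "serialized_copy_time_per_item": {
                    "pgdata": 179.5,
                    "pg_control": 0.5,
                },
            }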
        """
        # This method can only run at the end of a non-empty copy
        assert self.copy_end_time
        assert self.item_list
        assert self.jobs_done

        # Initialise the result, calculating the total runtime
        stat = {
            "total_time": total_seconds(self.copy_end_time - self.copy_start_time),
            "number_of_workers": self.workers,
            "analysis_time_per_item": {},
            "copy_time_per_item": {},
            "serialized_copy_time_per_item": {},
        }

        # Calculate the time spent during the analysis of the items
        analysis_start = None
        analysis_end = None
        for item in self.item_list:
            # Some items don't require analysis
            if not item.analysis_end_time:
                continue
            # Build a human readable name to refer to an item in the output
            ident = item.label
            if not analysis_start:
                analysis_start = item.analysis_start_time
            elif analysis_start > item.analysis_start_time:
                analysis_start = item.analysis_start_time

            if not analysis_end:
                analysis_end = item.analysis_end_time
            elif analysis_end < item.analysis_end_time:
                analysis_end = item.analysis_end_time

            stat["analysis_time_per_item"][ident] = total_seconds(
                item.analysis_end_time - item.analysis_start_time
            )
        stat["analysis_time"] = total_seconds(analysis_end - analysis_start)

        # Calculate the time spent per job
        # WARNING: this code assumes that every item is copied separately,
        # so it's strictly tied to the `_job_generator` method code
        item_data = {}
        for job in self.jobs_done:
            # WARNING: the item contained in the job is not the same object
            # contained in self.item_list, as it has gone through two
            # pickling/unpickling cycles
            # Build a human readable name to refer to an item in the output
            ident = self.item_list[job.item_idx].label
            # If this is the first time we see this item we just store the
            # values from the job
            if ident not in item_data:
                item_data[ident] = {
                    "start": job.copy_start_time,
                    "end": job.copy_end_time,
                    "total_time": job.copy_end_time - job.copy_start_time,
                }
            else:
                data = item_data[ident]
                if data["start"] > job.copy_start_time:
                    data["start"] = job.copy_start_time
                if data["end"] < job.copy_end_time:
                    data["end"] = job.copy_end_time
                data["total_time"] += job.copy_end_time - job.copy_start_time

        # Calculate the time spent copying
        copy_start = None
        copy_end = None
        serialized_time = datetime.timedelta(0)
        for ident in item_data:
            data = item_data[ident]
            if copy_start is None or copy_start > data["start"]:
                copy_start = data["start"]
            if copy_end is None or copy_end < data["end"]:
                copy_end = data["end"]
            stat["copy_time_per_item"][ident] = total_seconds(
                data["end"] - data["start"]
            )
            stat["serialized_copy_time_per_item"][ident] = total_seconds(
                data["total_time"]
            )
            serialized_time += data["total_time"]
        # Store the total time spent by copying
        stat["copy_time"] = total_seconds(copy_end - copy_start)
        stat["serialized_copy_time"] = total_seconds(serialized_time)

        return stat