1# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4; encoding:utf8 -*-
2#
3# Copyright 2002 Ben Escoto <ben@emerose.org>
4# Copyright 2007 Kenneth Loafman <kenneth@loafman.com>
5#
6# This file is part of duplicity.
7#
8# Duplicity is free software; you can redistribute it and/or modify it
9# under the terms of the GNU General Public License as published by the
10# Free Software Foundation; either version 2 of the License, or (at your
11# option) any later version.
12#
13# Duplicity is distributed in the hope that it will be useful, but
14# WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16# General Public License for more details.
17#
18# You should have received a copy of the GNU General Public License
19# along with duplicity; if not, write to the Free Software Foundation,
20# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21
22u"""Define some lazy data structures and functions acting on them"""
23
24from __future__ import print_function
25
26from builtins import map
27from builtins import next
28from builtins import range
29from builtins import object
30
31import os
32
33from duplicity import log
34from duplicity import robust
35from duplicity import util
36
37
class Iter(object):
    u"""Hold static methods for the manipulation of lazy iterators"""

    @staticmethod
    def filter(predicate, iterator):
        u"""Like filter in a lazy functional programming language

        Lazily yield only those elements of iterator for which
        predicate(elem) is true.

        """
        for i in iterator:
            if predicate(i):
                yield i

    @staticmethod
    def map(function, iterator):
        u"""Like map in a lazy functional programming language

        Lazily yield function(elem) for each element of iterator.

        """
        for i in iterator:
            yield function(i)

    @staticmethod
    def foreach(function, iterator):
        u"""Run function on each element in iterator, discarding results"""
        for i in iterator:
            function(i)

    @staticmethod
    def cat(*iters):
        u"""Lazily concatenate the given iterators, in argument order"""
        for iter in iters:  # pylint: disable=redefined-builtin
            for i in iter:
                yield i

    @staticmethod
    def cat2(iter_of_iters):
        u"""Lazily concatenate iterators, iterated by big iterator"""
        for iter in iter_of_iters:  # pylint: disable=redefined-builtin
            for i in iter:
                yield i

    @staticmethod
    def empty(iter):  # pylint: disable=redefined-builtin
        u"""Return 1 if iterator has length 0, None otherwise

        Note: consumes (at most) the first element of iter.

        """
        for i in iter:  # pylint: disable=unused-variable
            return None
        return 1

    @staticmethod
    def equal(iter1, iter2, verbose=None, operator=lambda x, y: x == y):
        u"""Return 1 if iterator 1 has same elements as iterator 2, else None

        Use equality operator, or == if it is unspecified.  If verbose
        is true, print a message describing the first mismatch (or the
        point where one iterator ran out).  Both iterators are consumed.

        """
        for i1 in iter1:
            try:
                i2 = next(iter2)
            except StopIteration:
                # iter2 ran out first
                if verbose:
                    print(u"End when i1 = %s" % (i1,))
                return None
            if not operator(i1, i2):
                if verbose:
                    print(u"%s not equal to %s" % (i1, i2))
                return None
        # iter1 is exhausted; they are equal iff iter2 is exhausted too
        try:
            i2 = next(iter2)
        except StopIteration:
            return 1
        if verbose:
            print(u"End when i2 = %s" % (i2,))
        return None

    @staticmethod
    def Or(iter):  # pylint: disable=redefined-builtin
        u"""True if any element in iterator is true.  Short circuiting

        Returns the first true element, the last element if none are
        true, or None for an empty iterator.

        """
        i = None
        for i in iter:
            if i:
                return i
        return i

    @staticmethod
    def And(iter):  # pylint: disable=redefined-builtin
        u"""True if all elements in iterator are true.  Short circuiting

        Returns the first false element, the last element if all are
        true, or 1 for an empty iterator.

        """
        i = 1
        for i in iter:
            if not i:
                return i
        return i

    @staticmethod
    def len(iter):  # pylint: disable=redefined-builtin
        u"""Return length of iterator, consuming it entirely"""
        i = 0
        while 1:
            try:
                next(iter)
            except StopIteration:
                return i
            i = i + 1

    @staticmethod
    def foldr(f, default, iter):  # pylint: disable=redefined-builtin
        u"""foldr the "fundamental list recursion operator"?

        Right fold: f(e0, f(e1, ... f(eN, default))).  Implemented
        recursively, so very long iterators may hit the recursion limit.

        """
        try:
            next_item = next(iter)
        except StopIteration:
            return default
        return f(next_item, Iter.foldr(f, default, iter))

    @staticmethod
    def foldl(f, default, iter):  # pylint: disable=redefined-builtin
        u"""the fundamental list iteration operator..

        Left fold: f(...f(f(default, e0), e1)..., eN).  Iterative, so
        safe for arbitrarily long iterators.

        """
        while 1:
            try:
                next_item = next(iter)
            except StopIteration:
                return default
            default = f(default, next_item)

    @staticmethod
    def multiplex(iter, num_of_forks, final_func=None, closing_func=None):  # pylint: disable=redefined-builtin
        u"""Split a single iterater into a number of streams

        The return val will be a list with length num_of_forks, each
        of which will be an iterator like iter.  final_func is the
        function that will be called on each element in iter just as
        it is being removed from the buffer.  closing_func is called
        when all the streams are finished.

        """
        if num_of_forks == 2 and not final_func and not closing_func:
            # Common case handled by the specialized (faster) class
            im2 = IterMultiplex2(iter)
            return (im2.yielda(), im2.yieldb())
        if not final_func:
            final_func = lambda i: None
        if not closing_func:
            closing_func = lambda: None

        # buffer is a list of elements that some iterators need and others
        # don't
        buffer = []

        # buffer[forkposition[i]] is the next element yieled by iterator
        # i.  If it is -1, yield from the original iter
        starting_forkposition = [-1] * num_of_forks
        forkposition = starting_forkposition[:]
        # one-element list so the nested function can mutate the flag
        called_closing_func = [None]

        def get_next(fork_num):
            u"""Return the next element requested by fork_num"""
            if forkposition[fork_num] == -1:
                try:
                    buffer.insert(0, next(iter))
                except StopIteration:
                    # call closing_func if necessary
                    if (forkposition == starting_forkposition and
                            not called_closing_func[0]):
                        closing_func()
                        # Bug fix: mark the flag truthy so closing_func is
                        # called exactly once; the old code stored None here,
                        # which left the flag falsy and allowed repeat calls
                        # as the remaining forks hit exhaustion.
                        called_closing_func[0] = 1
                    raise StopIteration
                # a fresh element went to the front of the buffer, so every
                # fork's pointer shifts by one
                for i in range(num_of_forks):
                    forkposition[i] += 1

            return_val = buffer[forkposition[fork_num]]
            forkposition[fork_num] -= 1

            blen = len(buffer)
            if not (blen - 1) in forkposition:
                # Last position in buffer no longer needed by any fork
                assert forkposition[fork_num] == blen - 2
                final_func(buffer[blen - 1])
                del buffer[blen - 1]
            return return_val

        def make_iterator(fork_num):
            u"""Wrap get_next in a generator for one fork"""
            while(1):
                try:
                    ret = get_next(fork_num)
                except StopIteration:
                    return
                yield ret

        return tuple(map(make_iterator, list(range(num_of_forks))))
219
220
class IterMultiplex2(object):
    u"""Multiplex an iterator into 2 parts

    This is a special optimized case of the Iter.multiplex function,
    used when there is no closing_func or final_func, and we only want
    to split it into 2.  By profiling, this is a time sensitive class.

    Both halves share one buffer: whichever side is ahead pulls fresh
    elements from the source and parks them in the buffer; the trailing
    side drains the buffer instead of touching the source.

    """
    def __init__(self, iter):  # pylint: disable=redefined-builtin
        self.a_leading_by = 0  # net count of elements a has consumed beyond b
        self.buffer = []  # elements fetched but not yet seen by the trailing side
        self.iter = iter

    def _side(self, direction):
        u"""Generator for one half; direction is +1 for a, -1 for b

        a_leading_by * direction is nonnegative exactly when this side
        is ahead of (or tied with) the other, in which case it must
        fetch a new element from the source.

        """
        while 1:
            if self.a_leading_by * direction >= 0:
                # this side leads (or is tied): pull a fresh element
                try:
                    elem = next(self.iter)
                except StopIteration:
                    return
                self.buffer.append(elem)
            else:
                # this side trails: take the oldest buffered element
                elem = self.buffer.pop(0)
            self.a_leading_by += direction
            yield elem

    def yielda(self):
        u"""Return first iterator"""
        return self._side(1)

    def yieldb(self):
        u"""Return second iterator"""
        return self._side(-1)
267
268
class IterTreeReducer(object):
    u"""Tree style reducer object for iterator - stolen from rdiff-backup

    The indices of a RORPIter form a tree type structure.  This class
    can be used on each element of an iter in sequence and the result
    will be as if the corresponding tree was reduced.  This tries to
    bridge the gap between the tree nature of directories, and the
    iterator nature of the connection between hosts and the temporal
    order in which the files are processed.

    This will usually be used by subclassing ITRBranch below and then
    call the initializer below with the new class.

    The reducer keeps a stack of open branches (self.branches); the
    innermost branch is last.  Feeding elements via __call__ in index
    order opens and closes branches as the indices enter and leave
    subtrees.

    """
    def __init__(self, branch_class, branch_args):
        u"""ITR initializer

        branch_class is an ITRBranch subclass; branch_args is the
        argument tuple used to construct each new branch instance.

        """
        self.branch_class = branch_class
        self.branch_args = branch_args
        # index of the most recently processed element; None until the
        # first __call__
        self.index = None
        self.root_branch = branch_class(*branch_args)
        self.branches = [self.root_branch]

    def finish_branches(self, index):
        u"""Run Finish() on all branches index has passed

        When we pass out of a branch, delete it and process it with
        the parent.  The innermost branches will be the last in the
        list.  Return None if we are out of the entire tree, and 1
        otherwise.

        """
        branches = self.branches
        while 1:
            to_be_finished = branches[-1]
            base_index = to_be_finished.base_index
            # index is inside the branch iff it extends base_index as a
            # prefix; indices are tuples, so this is a prefix test
            if base_index != index[:len(base_index)]:
                # out of the tree, finish with to_be_finished
                to_be_finished.call_end_proc()
                del branches[-1]
                if not branches:
                    # finished the root branch too: out of the whole tree
                    return None
                # let the parent fold in the completed child branch
                branches[-1].branch_process(to_be_finished)
            else:
                return 1

    def add_branch(self):
        u"""Return branch of type self.branch_class, add to branch list"""
        branch = self.branch_class(*self.branch_args)
        self.branches.append(branch)
        return branch

    def process_w_branch(self, index, branch, args):
        u"""Run start_process on latest branch

        Errors raised by start_process are routed to branch.on_error by
        robust.check_common_error; start_successful is only set when no
        exception was caught.

        """
        robust.check_common_error(branch.on_error,
                                  branch.start_process, args)
        if not branch.caught_exception:
            branch.start_successful = 1
        branch.base_index = index

    def Finish(self):
        u"""Call at end of sequence to tie everything up

        Pops and finishes every remaining open branch, innermost first,
        handing each completed branch to its parent.

        """
        while 1:
            to_be_finished = self.branches.pop()
            to_be_finished.call_end_proc()
            if not self.branches:
                break
            self.branches[-1].branch_process(to_be_finished)

    def __call__(self, *args):
        u"""Process args, where args[0] is current position in iterator

        Returns true if args successfully processed, false if index is
        not in the current tree and thus the final result is
        available.

        Also note below we set self.index after doing the necessary
        start processing, in case there is a crash in the middle.

        """
        index = args[0]
        if self.index is None:
            # first element: it becomes the root branch's base
            self.process_w_branch(index, self.root_branch, args)
            self.index = index
            return 1

        if index <= self.index:
            # out-of-order input; warn and skip (callers must feed
            # indices in increasing order)
            log.Warn(_(u"Warning: oldindex %s >= newindex %s") %
                     (util.uindex(self.index), util.uindex(index)))
            return 1

        if self.finish_branches(index) is None:
            return None  # We are no longer in the main tree
        last_branch = self.branches[-1]
        if last_branch.start_successful:
            if last_branch.can_fast_process(*args):
                robust.check_common_error(last_branch.on_error,
                                          last_branch.fast_process, args)
            else:
                branch = self.add_branch()
                self.process_w_branch(index, branch, args)
        else:
            # earlier start_process failed; just log that we skip this one
            last_branch.log_prev_error(index)

        self.index = index
        return 1
374
375
class ITRBranch(object):
    u"""Helper class for IterTreeReducer above

    There are five stub functions below: start_process, end_process,
    branch_process, fast_process, and can_fast_process.  A class that
    subclasses this one will probably fill in these functions to do
    more.

    """
    # state flags; IterTreeReducer sets base_index/start_successful,
    # call_end_proc and on_error set the others
    base_index = index = None
    finished = None
    caught_exception = start_successful = None

    def call_end_proc(self):
        u"""Runs the end_process on self, checking for errors"""
        if self.finished or not self.start_successful:
            self.caught_exception = 1

        # Since all end_process does is copy over attributes, might as
        # well run it even if we did get errors earlier.
        robust.check_common_error(self.on_error, self.end_process)

        self.finished = 1

    def start_process(self, *args):
        u"""Do some initial processing (stub)"""

    def end_process(self):
        u"""Do any final processing before leaving branch (stub)"""

    def branch_process(self, branch):
        u"""Process a branch right after it is finished (stub)"""
        assert branch.finished

    def can_fast_process(self, *args):  # pylint: disable=unused-argument
        u"""True if object can be processed without new branch (stub)"""
        return None

    def fast_process(self, *args):
        u"""Process args without new child branch (stub)"""

    def on_error(self, exc, *args):
        u"""This is run on any exception in start/end-process"""
        self.caught_exception = 1
        # Prefer the index tuple passed with the failing call; fall back
        # to this branch's own index, then to the current directory.
        if args and args[0] and isinstance(args[0], tuple):
            filename = os.path.join(*args[0])
        else:
            filename = os.path.join(*self.index) if self.index else u"."  # pylint: disable=not-an-iterable
        log.Warn(_(u"Error '%s' processing %s") % (exc, util.fsdecode(filename)),
                 log.WarningCode.cannot_process,
                 util.escape(filename))

    def log_prev_error(self, index):
        u"""Call function if no pending exception"""
        index_str = os.path.join(*index) if index else u"."
        log.Warn(_(u"Skipping %s because of previous error") % util.fsdecode(index_str),
                 log.WarningCode.process_skipped,
                 util.escape(index_str))
443