# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4; encoding:utf8 -*-
#
# Copyright 2002 Ben Escoto <ben@emerose.org>
# Copyright 2007 Kenneth Loafman <kenneth@loafman.com>
#
# This file is part of duplicity.
#
# Duplicity is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# Duplicity is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

u"""Define some lazy data structures and functions acting on them"""

from __future__ import print_function

from builtins import map
from builtins import next
from builtins import range
from builtins import object

import collections
import os

from duplicity import log
from duplicity import robust
from duplicity import util


class Iter(object):
    u"""Hold static methods for the manipulation of lazy iterators"""

    @staticmethod
    def filter(predicate, iterator):
        u"""Lazily yield the elements of iterator for which predicate is true"""
        for i in iterator:
            if predicate(i):
                yield i

    @staticmethod
    def map(function, iterator):
        u"""Lazily yield function(i) for each element i of iterator"""
        for i in iterator:
            yield function(i)

    @staticmethod
    def foreach(function, iterator):
        u"""Run function on each element in iterator, discarding the results"""
        for i in iterator:
            function(i)

    @staticmethod
    def cat(*iters):
        u"""Lazily concatenate the iterators given as arguments"""
        for iter in iters:  # pylint: disable=redefined-builtin
            for i in iter:
                yield i

    @staticmethod
    def cat2(iter_of_iters):
        u"""Lazily concatenate iterators, themselves produced by an iterator"""
        for iter in iter_of_iters:  # pylint: disable=redefined-builtin
            for i in iter:
                yield i

    @staticmethod
    def empty(iter):  # pylint: disable=redefined-builtin
        u"""Return 1 if iterator has length 0, None otherwise

        Consumes at most one element of iter.

        """
        for i in iter:
            return None
        return 1

    @staticmethod
    def equal(iter1, iter2, verbose=None, operator=lambda x, y: x == y):
        u"""Return 1 if iter1 has the same elements as iter2, None otherwise

        Elements are compared pairwise with operator, which defaults
        to ==.  iter2 must be a real iterator because next() is called
        on it directly.  If verbose is true, the reason for a mismatch
        is printed.

        """
        for i1 in iter1:
            try:
                i2 = next(iter2)
            except StopIteration:
                # iter2 ran out before iter1 did
                if verbose:
                    print(u"End when i1 = %s" % (i1,))
                return None
            if not operator(i1, i2):
                if verbose:
                    print(u"%s not equal to %s" % (i1, i2))
                return None
        # iter1 is exhausted; they are equal only if iter2 is as well
        try:
            i2 = next(iter2)
        except StopIteration:
            return 1
        if verbose:
            print(u"End when i2 = %s" % (i2,))
        return None

    @staticmethod
    def Or(iter):  # pylint: disable=redefined-builtin
        u"""Return the first true element of iter.  Short circuiting

        If no element is true, the last element is returned (None for
        an empty iterator).

        """
        i = None
        for i in iter:
            if i:
                return i
        return i

    @staticmethod
    def And(iter):  # pylint: disable=redefined-builtin
        u"""Return the first false element of iter.  Short circuiting

        If every element is true, the last element is returned (1 for
        an empty iterator).

        """
        i = 1
        for i in iter:
            if not i:
                return i
        return i

    @staticmethod
    def len(iter):  # pylint: disable=redefined-builtin
        u"""Return the number of elements in iterator, consuming it"""
        i = 0
        while 1:
            try:
                next(iter)
            except StopIteration:
                return i
            i = i + 1

    @staticmethod
    def foldr(f, default, iter):  # pylint: disable=redefined-builtin
        u"""foldr the "fundamental list recursion operator"?

        Computes f(x0, f(x1, ... f(xn, default))).  Recurses once per
        element of iter.

        """
        try:
            next_item = next(iter)
        except StopIteration:
            return default
        return f(next_item, Iter.foldr(f, default, iter))

    @staticmethod
    def foldl(f, default, iter):  # pylint: disable=redefined-builtin
        u"""the fundamental list iteration operator..

        Computes f(... f(f(default, x0), x1) ..., xn).

        """
        while 1:
            try:
                next_item = next(iter)
            except StopIteration:
                return default
            default = f(default, next_item)

    @staticmethod
    def multiplex(iter, num_of_forks, final_func=None, closing_func=None):  # pylint: disable=redefined-builtin
        u"""Split a single iterator into a number of streams

        The return val will be a tuple with length num_of_forks, each
        of which will be an iterator like iter.  final_func is the
        function that will be called on each element in iter just as
        it is being removed from the buffer.  closing_func is called
        once, when all the streams are finished.

        """
        if num_of_forks == 2 and not final_func and not closing_func:
            # Common case, handled by the specialized class below
            im2 = IterMultiplex2(iter)
            return (im2.yielda(), im2.yieldb())
        if not final_func:
            final_func = lambda i: None
        if not closing_func:
            closing_func = lambda: None

        # buffer is a list of elements that some iterators need and others
        # don't.  The newest element is at index 0.
        buffer = []

        # buffer[forkposition[i]] is the next element yielded by iterator
        # i.  If it is -1, yield from the original iter
        starting_forkposition = [-1] * num_of_forks
        forkposition = starting_forkposition[:]
        # One-element list so the nested function can rebind the flag
        called_closing_func = [False]

        def get_next(fork_num):
            u"""Return the next element requested by fork_num"""
            if forkposition[fork_num] == -1:
                try:
                    buffer.insert(0, next(iter))
                except StopIteration:
                    # call closing_func if every fork has caught up and
                    # it has not been called already
                    if (forkposition == starting_forkposition and
                            not called_closing_func[0]):
                        closing_func()
                        # Remember the call so that the other forks do
                        # not trigger closing_func again when they hit
                        # the end of the underlying iterator.
                        called_closing_func[0] = True
                    raise
                for i in range(num_of_forks):
                    forkposition[i] += 1

            return_val = buffer[forkposition[fork_num]]
            forkposition[fork_num] -= 1

            blen = len(buffer)
            if (blen - 1) not in forkposition:
                # Last position in buffer no longer needed
                assert forkposition[fork_num] == blen - 2
                final_func(buffer[blen - 1])
                del buffer[blen - 1]
            return return_val

        def make_iterator(fork_num):
            u"""Return a generator yielding the elements seen by fork_num"""
            while(1):
                try:
                    ret = get_next(fork_num)
                except StopIteration:
                    return
                yield ret

        return tuple(map(make_iterator, list(range(num_of_forks))))


class IterMultiplex2(object):
    u"""Multiplex an iterator into 2 parts

    This is a special optimized case of the Iter.multiplex function,
    used when there is no closing_func or final_func, and we only want
    to split it into 2.  By profiling, this is a time sensitive class.

    """
    def __init__(self, iter):  # pylint: disable=redefined-builtin
        self.a_leading_by = 0  # How many places a is ahead of b
        # Elements read by the leading iterator and not yet seen by the
        # trailing one.  A deque gives O(1) removal from the front.
        self.buffer = collections.deque()
        self.iter = iter

    def yielda(self):
        u"""Return first iterator"""
        buf, iter = self.buffer, self.iter  # pylint: disable=redefined-builtin
        while(1):
            if self.a_leading_by >= 0:
                # a is in front, add new element
                try:
                    elem = next(iter)
                except StopIteration:
                    return
                buf.append(elem)
            else:
                # b is in front, subtract an element
                elem = buf.popleft()
            self.a_leading_by += 1
            yield elem

    def yieldb(self):
        u"""Return second iterator"""
        buf, iter = self.buffer, self.iter  # pylint: disable=redefined-builtin
        while(1):
            if self.a_leading_by <= 0:
                # b is in front, add new element
                try:
                    elem = next(iter)
                except StopIteration:
                    return
                buf.append(elem)
            else:
                # a is in front, subtract an element
                elem = buf.popleft()
            self.a_leading_by -= 1
            yield elem


class IterTreeReducer(object):
    u"""Tree style reducer object for iterator - stolen from rdiff-backup

    The indices of a RORPIter form a tree type structure.  This class
    can be used on each element of an iter in sequence and the result
    will be as if the corresponding tree was reduced.  This tries to
    bridge the gap between the tree nature of directories, and the
    iterator nature of the connection between hosts and the temporal
    order in which the files are processed.

    This will usually be used by subclassing ITRBranch below and then
    call the initializer below with the new class.

    """
    def __init__(self, branch_class, branch_args):
        u"""ITR initializer

        branch_class is instantiated as branch_class(*branch_args) for
        the root branch and for every branch added later.

        """
        self.branch_class = branch_class
        self.branch_args = branch_args
        self.index = None
        self.root_branch = branch_class(*branch_args)
        self.branches = [self.root_branch]

    def finish_branches(self, index):
        u"""Run Finish() on all branches index has passed

        When we pass out of a branch, delete it and process it with
        the parent.  The innermost branches will be the last in the
        list.  Return None if we are out of the entire tree, and 1
        otherwise.

        """
        branches = self.branches
        while 1:
            to_be_finished = branches[-1]
            base_index = to_be_finished.base_index
            if base_index != index[:len(base_index)]:
                # out of the tree, finish with to_be_finished
                to_be_finished.call_end_proc()
                del branches[-1]
                if not branches:
                    return None
                branches[-1].branch_process(to_be_finished)
            else:
                return 1

    def add_branch(self):
        u"""Return branch of type self.branch_class, add to branch list"""
        branch = self.branch_class(*self.branch_args)
        self.branches.append(branch)
        return branch

    def process_w_branch(self, index, branch, args):
        u"""Run start_process on latest branch"""
        robust.check_common_error(branch.on_error,
                                  branch.start_process, args)
        if not branch.caught_exception:
            branch.start_successful = 1
        branch.base_index = index

    def Finish(self):
        u"""Call at end of sequence to tie everything up"""
        while 1:
            to_be_finished = self.branches.pop()
            to_be_finished.call_end_proc()
            if not self.branches:
                break
            self.branches[-1].branch_process(to_be_finished)

    def __call__(self, *args):
        u"""Process args, where args[0] is current position in iterator

        Returns true if args successfully processed, false if index is
        not in the current tree and thus the final result is
        available.

        Also note below we set self.index after doing the necessary
        start processing, in case there is a crash in the middle.

        """
        index = args[0]
        if self.index is None:
            # First call: everything starts at the root branch
            self.process_w_branch(index, self.root_branch, args)
            self.index = index
            return 1

        if index <= self.index:
            # Out of order (or repeated) index: warn and ignore it
            log.Warn(_(u"Warning: oldindex %s >= newindex %s") %
                     (util.uindex(self.index), util.uindex(index)))
            return 1

        if self.finish_branches(index) is None:
            return None  # We are no longer in the main tree
        last_branch = self.branches[-1]
        if last_branch.start_successful:
            if last_branch.can_fast_process(*args):
                robust.check_common_error(last_branch.on_error,
                                          last_branch.fast_process, args)
            else:
                branch = self.add_branch()
                self.process_w_branch(index, branch, args)
        else:
            last_branch.log_prev_error(index)

        self.index = index
        return 1


class ITRBranch(object):
    u"""Helper class for IterTreeReducer above

    There are five stub functions below: start_process, end_process,
    branch_process, fast_process, and can_fast_process.  A class that
    subclasses this one will probably fill in these functions to do
    more.

    """
    base_index = index = None
    finished = None
    caught_exception = start_successful = None

    def call_end_proc(self):
        u"""Runs the end_process on self, checking for errors"""
        if self.finished or not self.start_successful:
            self.caught_exception = 1

        # Since all end_process does is copy over attributes, might as
        # well run it even if we did get errors earlier.
        robust.check_common_error(self.on_error, self.end_process)

        self.finished = 1

    def start_process(self, *args):
        u"""Do some initial processing (stub)"""
        pass

    def end_process(self):
        u"""Do any final processing before leaving branch (stub)"""
        pass

    def branch_process(self, branch):
        u"""Process a branch right after it is finished (stub)"""
        assert branch.finished

    def can_fast_process(self, *args):  # pylint: disable=unused-argument
        u"""True if object can be processed without new branch (stub)"""
        return None

    def fast_process(self, *args):
        u"""Process args without new child branch (stub)"""
        pass

    def on_error(self, exc, *args):
        u"""This is run on any exception in start/end-process

        Marks the branch as having caught an exception and logs a
        warning naming the file being processed.  The name is taken
        from args[0] when that is a non-empty tuple, else from
        self.index, else it is u".".

        """
        self.caught_exception = 1
        if args and args[0] and isinstance(args[0], tuple):
            filename = os.path.join(*args[0])
        elif self.index:
            filename = os.path.join(*self.index)  # pylint: disable=not-an-iterable
        else:
            filename = u"."
        log.Warn(_(u"Error '%s' processing %s") % (exc, util.fsdecode(filename)),
                 log.WarningCode.cannot_process,
                 util.escape(filename))

    def log_prev_error(self, index):
        u"""Log that index is being skipped because of a previous error"""
        if not index:
            index_str = u"."
        else:
            index_str = os.path.join(*index)
        log.Warn(_(u"Skipping %s because of previous error") % util.fsdecode(index_str),
                 log.WarningCode.process_skipped,
                 util.escape(index_str))