1# Copyright (C) 2007-2020 Damon Lynch <damonlynch@gmail.com> 2 3# This file is part of Rapid Photo Downloader. 4# 5# Rapid Photo Downloader is free software: you can redistribute it and/or 6# modify 7# it under the terms of the GNU General Public License as published by 8# the Free Software Foundation, either version 3 of the License, or 9# (at your option) any later version. 10# 11# Rapid Photo Downloader is distributed in the hope that it will be useful, 12# but WITHOUT ANY WARRANTY; without even the implied warranty of 13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14# GNU General Public License for more details. 15# 16# You should have received a copy of the GNU General Public License 17# along with Rapid Photo Downloader. If not, 18# see <http://www.gnu.org/licenses/>. 19 20__author__ = 'Damon Lynch' 21__copyright__ = "Copyright 2007-2020, Damon Lynch" 22 23import contextlib 24import site 25import locale 26import logging 27import os 28import random 29import re 30import string 31import sys 32import tempfile 33import time 34import tarfile 35from collections import namedtuple, defaultdict 36 37from datetime import datetime 38from itertools import groupby, zip_longest 39from typing import Optional, List, Union, Any, Tuple, Iterator 40import struct 41import ctypes 42import signal 43import warnings 44import xdg 45import babel 46from glob import glob 47from pkg_resources import parse_version 48import pkg_resources 49 50import arrow 51import psutil 52from PyQt5.QtCore import QSize, QLocale, QTranslator, QLibraryInfo 53 54import raphodo.__about__ as __about__ 55from raphodo.constants import disable_version_check 56from raphodo import localedir, i18n_domain 57 58 59# Arrow 0.9.0 separated the replace and shift functions into separate calls, deprecating using 60# replace() to do the work of the new shift() 61# Arrow 0.14.5 removed the deprecated shift functionality from the replace() 62try: 63 arrow_version = parse_version(arrow.__version__) 64except AttributeError: 65 arrow_version = None 66 67if arrow_version is not None: 68 arrow_shift_support = arrow_version >= parse_version('0.9.0') 69else: 70 try: 71 now = arrow.now() 72 now.shift(seconds=1) 73 arrow_shift_support = True 74 except AttributeError: 75 arrow_shift_support = False 76 77 78# Suppress parsing warnings for 0.14.3 <= Arrow version < 0.15 79if arrow_version >= parse_version('0.14.3') and arrow_version < parse_version('0.15.0'): 80 from arrow.factory import ArrowParseWarning 81 warnings.simplefilter("ignore", ArrowParseWarning) 82 83 84# Linux specific code to ensure child processes exit when parent dies 85# See http://stackoverflow.com/questions/19447603/ 86# how-to-kill-a-python-child-process-created-with-subprocess-check-output-when-t/ 87libc = ctypes.CDLL("libc.so.7") 88def set_pdeathsig(sig = signal.SIGTERM): 89 def callable(): 90 return 0 91 #return libc.procctl(0, 0, 11, sig) 92 return callable 93 94 95def available_cpu_count(physical_only=False) -> int: 96 """ 97 Determine the number of CPUs available. 98 99 A CPU is "available" if cpuset has not restricted the number of 100 cpus. Portions of this code from 101 http://stackoverflow.com/questions/1006289/how-to-find-out-the-number-of- 102 cpus-using-python 103 104 :return available CPU count, or 1 if cannot be determined. 105 Value guaranteed to be >= 1. 106 """ 107 108 # cpuset may restrict the number of *available* processors 109 available = None 110 if sys.platform.startswith('linux'): 111 try: 112 status_file = open('/proc/self/status') 113 status = status_file.read() 114 status_file.close() 115 except IOError: 116 pass 117 else: 118 m = re.search(r'(?m)^Cpus_allowed:\s*(.*)$', status) 119 if m: 120 available = bin(int(m.group(1).replace(',', ''), 16)).count('1') 121 if available > 0 and not physical_only: 122 return available 123 124 if physical_only: 125 physical = psutil.cpu_count(logical=False) 126 if physical is not None: 127 if available is not None: 128 return min(available, physical) 129 return physical 130 131 c = os.cpu_count() 132 if c is not None: 133 return max(c, 1) 134 c = psutil.cpu_count() 135 if c is not None: 136 return max(c, 1) 137 else: 138 return 1 139 140def confirm(prompt: Optional[str]=None, resp: Optional[bool]=False) -> bool: 141 r""" 142 Prompts for yes or no response from the user. 143 144 :param prompt: prompt displayed to user 145 :param resp: the default value assumed by the caller when user 146 simply types ENTER. 147 :return: True for yes and False for no. 148 """ 149 150 # >>> confirm(prompt='Create Directory?', resp=True) 151 # Create Directory? [y]|n: 152 # True 153 # >>> confirm(prompt='Create Directory?', resp=False) 154 # Create Directory? [n]|y: 155 # False 156 # >>> confirm(prompt='Create Directory?', resp=False) 157 # Create Directory? [n]|y: y 158 # True 159 160 if prompt is None: 161 prompt = 'Confirm' 162 163 if resp: 164 prompt = '%s [%s]|%s: ' % (prompt, 'y', 'n') 165 else: 166 prompt = '%s [%s]|%s: ' % (prompt, 'n', 'y') 167 168 while True: 169 ans = input(prompt) 170 if not ans: 171 return resp 172 if ans not in ['y', 'Y', 'n', 'N']: 173 print('please enter y or n.') 174 continue 175 return ans in ['y', 'Y'] 176 177 178@contextlib.contextmanager 179def stdchannel_redirected(stdchannel, dest_filename): 180 """ 181 A context manager to temporarily redirect stdout or stderr 182 183 Usage: 184 with stdchannel_redirected(sys.stderr, os.devnull): 185 do_work() 186 187 Source: 188 http://marc-abramowitz.com/archives/2013/07/19/python-context-manager-for-redirected-stdout-and-stderr/ 189 """ 190 oldstdchannel = dest_file = None 191 try: 192 oldstdchannel = os.dup(stdchannel.fileno()) 193 dest_file = open(dest_filename, 'w') 194 os.dup2(dest_file.fileno(), stdchannel.fileno()) 195 yield 196 finally: 197 if oldstdchannel is not None: 198 os.dup2(oldstdchannel, stdchannel.fileno()) 199 if dest_file is not None: 200 dest_file.close() 201 202@contextlib.contextmanager 203def show_errors(): 204 yield 205 206# Translators: these values are file size suffixes like B representing bytes, KB representing 207# kilobytes, etc. 208suffixes = [_('B'), _('KB'), _('MB'), _('GB'), _('TB'), _('PB'), _('EB'), _('ZB'), _('YB')] 209 210def format_size_for_user(size_in_bytes: int, 211 zero_string: str='', 212 no_decimals: int=2) -> str: 213 r""" 214 Humanize display of bytes. 215 216 Uses Microsoft style i.e. 1000 Bytes = 1 KB 217 218 :param size: size in bytes 219 :param zero_string: string to use if size == 0 220 221 >>> locale.setlocale(locale.LC_ALL, ('en_US', 'utf-8')) 222 'en_US.UTF-8' 223 >>> format_size_for_user(0) 224 '' 225 >>> format_size_for_user(1) 226 '1 B' 227 >>> format_size_for_user(123) 228 '123 B' 229 >>> format_size_for_user(1000) 230 '1 KB' 231 >>> format_size_for_user(1024) 232 '1.02 KB' 233 >>> format_size_for_user(1024, no_decimals=0) 234 '1 KB' 235 >>> format_size_for_user(1100, no_decimals=2) 236 '1.1 KB' 237 >>> format_size_for_user(1000000, no_decimals=2) 238 '1 MB' 239 >>> format_size_for_user(1000001, no_decimals=2) 240 '1 MB' 241 >>> format_size_for_user(1020001, no_decimals=2) 242 '1.02 MB' 243 """ 244 245 if size_in_bytes == 0: return zero_string 246 i = 0 247 while size_in_bytes >= 1000 and i < len(suffixes)-1: 248 size_in_bytes /= 1000 249 i += 1 250 251 if no_decimals: 252 s = '{:.{prec}f}'.format(size_in_bytes, prec=no_decimals).rstrip('0').rstrip('.') 253 else: 254 s = '{:.0f}'.format(size_in_bytes) 255 return s + ' ' + suffixes[i] 256 257def divide_list(source: list, no_pieces: int) -> list: 258 r""" 259 Returns a list containing no_pieces lists, with the items 260 of the original list evenly distributed 261 :param source: the list to divide 262 :param no_pieces: the nubmer of pieces the lists 263 :return: the new list 264 265 >>> divide_list(list(range(12)), 4) 266 [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10, 11]] 267 >>> divide_list(list(range(11)), 4) 268 [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10]] 269 """ 270 source_size = len(source) 271 slice_size = source_size // no_pieces 272 remainder = source_size % no_pieces 273 result = [] 274 275 extra = 0 276 for i in range(no_pieces): 277 start = i * slice_size + extra 278 source_slice = source[start:start + slice_size] 279 if remainder: 280 source_slice += [source[start + slice_size]] 281 remainder -= 1 282 extra += 1 283 result.append(source_slice) 284 return result 285 286def divide_list_on_length(source: List, length: int) -> List: 287 288 r""" 289 Break a list into lists no longer than length. 290 291 >>> l=list(range(11)) 292 >>> divide_list_on_length(l, 3) 293 [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10]] 294 >>> l=list(range(12)) 295 >>> divide_list_on_length(l, 3) 296 [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10, 11]] 297 """ 298 299 return [source[i:i+length] for i in range(0, len(source), length)] 300 301def addPushButtonLabelSpacer(s: str) -> str: 302 return ' ' + s 303 304 305class GenerateRandomFileName: 306 def __init__(self): 307 # the characters used to generate temporary file names 308 self.file_name_characters = list(string.ascii_letters + string.digits) 309 310 def name(self, extension: str=None) -> str: 311 """ 312 :param extension: if included, random file name will include the 313 file extension 314 :return: file name 5 characters long with or without extension 315 """ 316 if extension is not None: 317 return '{}.{}'.format( 318 ''.join(random.sample(self.file_name_characters, 5)), 319 extension 320 ) 321 else: 322 return ''.join(random.sample(self.file_name_characters, 5)) 323 324 325TempDirs = namedtuple('TempDirs', 'photo_temp_dir, video_temp_dir') 326CacheDirs = namedtuple('CacheDirs', 'photo_cache_dir, video_cache_dir') 327 328 329def create_temp_dir(folder: Optional[str]=None, 330 prefix: Optional[str]=None, 331 force_no_prefix: bool=False) -> str: 332 """ 333 Creates a temporary director and logs errors 334 :param folder: the folder in which the temporary directory should 335 be created. If not specified, uses the tempfile.mkstemp default. 336 :param prefix: any name the directory should start with. If None, 337 default rpd-tmp will be used as prefix, unless force_no_prefix 338 is True 339 :param force_no_prefix: if True, a directory prefix will never 340 be used 341 :return: full path of the temporary directory 342 """ 343 if prefix is None and not force_no_prefix: 344 prefix = "rpd-tmp-" 345 try: 346 temp_dir = tempfile.mkdtemp(prefix=prefix, dir=folder) 347 except OSError as inst: 348 msg = "Failed to create temporary directory in %s: %s %s" % ( 349 folder, 350 inst.errno, 351 inst.strerror 352 ) 353 logging.critical(msg) 354 temp_dir = None 355 return temp_dir 356 357 358def create_temp_dirs(photo_download_folder: str, 359 video_download_folder: str) -> TempDirs: 360 """ 361 Create pair of temporary directories for photo and video download 362 :param photo_download_folder: where photos will be downloaded to 363 :param video_download_folder: where videos will be downloaded to 364 :return: the directories 365 """ 366 photo_temp_dir = video_temp_dir = None 367 if photo_download_folder is not None: 368 photo_temp_dir = create_temp_dir(photo_download_folder) 369 logging.debug("Photo temporary directory: %s", photo_temp_dir) 370 if video_download_folder is not None: 371 video_temp_dir = create_temp_dir(video_download_folder) 372 logging.debug("Video temporary directory: %s", video_temp_dir) 373 return TempDirs(photo_temp_dir, video_temp_dir) 374 375 376def same_device(file1: str, file2: str) -> bool: 377 """ 378 Returns True if the files / directories are on the same device (partition). 379 380 No error checking. 381 382 :param file1: first file / directory to check 383 :param file2: second file / directory to check 384 :return: True if the same file system, else false 385 """ 386 387 dev1 = os.stat(file1).st_dev 388 dev2 = os.stat(file2).st_dev 389 return dev1 == dev2 390 391 392def find_mount_point(path: str) -> str: 393 """ 394 Find the mount point of a path 395 See: 396 http://stackoverflow.com/questions/4453602/how-to-find-the-mountpoint-a-file-resides-on 397 398 >>> print(find_mount_point('/crazy/path')) 399 / 400 401 :param path: 402 :return: 403 """ 404 path = os.path.realpath(path) 405 while not os.path.ismount(path): 406 path = os.path.dirname(path) 407 return path 408 409 410def make_internationalized_list(items: List[str]) -> str: 411 r""" 412 Makes a string of items conforming to i18n 413 414 >>> print(make_internationalized_list([])) 415 <BLANKLINE> 416 >>> print(make_internationalized_list(['one'])) 417 one 418 >>> print(make_internationalized_list(['one', 'two'])) 419 one and two 420 >>> print(make_internationalized_list(['one', 'two', 'three'])) 421 one, two and three 422 >>> print(make_internationalized_list(['one', 'two', 'three', 'four'])) 423 one, two, three and four 424 425 Loosely follows the guideline here: 426 http://cldr.unicode.org/translation/lists 427 428 :param items: the list of items to make a string out of 429 :return: internationalized string 430 """ 431 if len(items) == 1: 432 return items[0] 433 if len(items) == 2: 434 # Translators: two things in a list e.g. "device1 and device2" 435 # Translators: %(variable)s represents Python code, not a plural of the term 436 # variable. You must keep the %(variable)s untranslated, or the program will 437 # crash. 438 return _('%(first_item)s and %(last_item)s') % dict( 439 first_item=items[0], last_item=items[1]) 440 if len(items) > 2: 441 s = items[0] 442 for item in items[1:-1]: 443 # Translators: the middle of a list of things 444 # Translators: %(variable)s represents Python code, not a plural of the term 445 # variable. You must keep the %(variable)s untranslated, or the program will 446 # crash. 447 s = '%(first_items)s, %(last_items)s'% dict( 448 first_items=s, last_items=item 449 ) 450 # Translators: the end of a list of things 451 # Translators: %(variable)s represents Python code, not a plural of the term 452 # variable. You must keep the %(variable)s untranslated, or the program will 453 # crash. 454 s = '%(start_items)s and %(last_item)s' % dict( 455 start_items=s, last_item=items[-1] 456 ) 457 return s 458 return '' 459 460 461def thousands(i: int) -> str: 462 """ 463 Add a thousands seperator (or its locale equivalent) to an 464 integer. Assumes the module level locale setting has already been 465 set. 466 :param i: the integer e.g. 1000 467 :return: string with seperators e.g. '1,000' 468 """ 469 try: 470 return locale.format_string("%d", i, grouping=True) 471 except TypeError: 472 return i 473 474 475# Source of class AdjacentKey, first_and_last and runs: 476# http://stupidpythonideas.blogspot.com/2014/01/grouping-into-runs-of-adjacent-values.html 477class AdjacentKey: 478 r""" 479 >>> [list(g) for k, g in groupby([0, 1, 2, 3, 5, 6, 7, 10, 11, 13, 16], AdjacentKey)] 480 [[0, 1, 2, 3], [5, 6, 7], [10, 11], [13], [16]] 481 """ 482 __slots__ = ['obj'] 483 484 def __init__(self, obj) -> None: 485 self.obj = obj 486 487 def __eq__(self, other) -> bool: 488 ret = self.obj - 1 <= other.obj <= self.obj + 1 489 if ret: 490 self.obj = other.obj 491 return ret 492 493 494def first_and_last(iterable): 495 start = end = next(iterable) 496 for end in iterable: pass 497 return start, end 498 499 500def runs(iterable): 501 r""" 502 identify adjacent elements in pre-sorted data 503 504 :param iterable: sorted data 505 506 >>> list(runs([0, 1, 2, 3, 5, 6, 7, 10, 11, 13, 16])) 507 [(0, 3), (5, 7), (10, 11), (13, 13), (16, 16)] 508 >>> list(runs([0])) 509 [(0, 0)] 510 >>> list(runs([0, 1, 10, 100, 101])) 511 [(0, 1), (10, 10), (100, 101)] 512 """ 513 514 for k, g in groupby(iterable, AdjacentKey): 515 yield first_and_last(g) 516 517 518numbers = namedtuple('numbers', 'number, plural') 519 520 521long_numbers = { 522 1: _('one'), 523 2: _('two'), 524 3: _('three'), 525 4: _('four'), 526 5: _('five'), 527 6: _('six'), 528 7: _('seven'), 529 8: _('eight'), 530 9: _('nine'), 531 10: _('ten'), 532 11: _('eleven'), 533 12: _('twelve'), 534 13: _('thirteen'), 535 14: _('fourteen'), 536 15: _('fifteen'), 537 16: _('sixteen'), 538 17: _('seventeen'), 539 18: _('eighteen'), 540 19: _('ninenteen'), 541 20: _('twenty') 542} 543 544 545def number(value: int) -> numbers: 546 r""" 547 Convert integer to written form, e.g. one, two, etc. 548 549 Will propagate TypeError or KeyError on 550 failure. 551 552 >>> number(1) 553 numbers(number='one', plural=False) 554 >>> number(2) 555 numbers(number='two', plural=True) 556 >>> number(10) 557 numbers(number='ten', plural=True) 558 >>> number(20) 559 numbers(number='twenty', plural=True) 560 >>> 561 562 :param value: int between 1 and 20 563 :return: tuple of str and whether it is plural 564 """ 565 566 plural = value > 1 567 text = long_numbers[value] 568 return numbers(text, plural) 569 570 571def datetime_roughly_equal(dt1: Union[datetime, float], dt2: Union[datetime, float], 572 seconds: int=120) -> bool: 573 r""" 574 Check to see if date times are equal, give or take n seconds 575 :param dt1: python datetime, or timestamp, to check 576 :param dt2:python datetime, or timestamp to check 577 :param seconds: number of seconds leeway 578 :return: True if "equal", False otherwise 579 580 >>> dt1 = datetime.now() 581 >>> time.sleep(.1) 582 >>> dt2 = datetime.now() 583 >>> datetime_roughly_equal(dt1, dt2, 1) 584 True 585 >>> dt1 = 1458561776.0 586 >>> dt2 = 1458561776.0 587 >>> datetime_roughly_equal(dt1, dt2, 120) 588 True 589 >>> dt2 += 450 590 >>> datetime_roughly_equal(dt1, dt2, 120) 591 False 592 >>> datetime_roughly_equal(dt1, dt2, 500) 593 True 594 """ 595 596 # arrow.get from time stamp gives UTC time 597 at1 = arrow.get(dt1) 598 at2 = arrow.get(dt2) 599 if arrow_shift_support: 600 return at1.shift(seconds=-seconds) < at2 < at1.shift(seconds=+seconds) 601 else: 602 return at1.replace(seconds=-seconds) < at2 < at1.replace(seconds=+seconds) 603 604 605def process_running(process_name: str, partial_name: bool=True) -> bool: 606 """ 607 Search the list of the system's running processes to see if a process with this 608 name is running 609 610 :param process_name: the name of the process to search for 611 :param partial_name: if True, the process_name argument can be a 612 partial match 613 :return: True if found, else False 614 """ 615 616 for proc in psutil.process_iter(): 617 try: 618 name = proc.name() 619 except psutil.NoSuchProcess: 620 pass 621 else: 622 if partial_name: 623 if name.find(process_name) >= 0: 624 return True 625 else: 626 if name == process_name: 627 return True 628 return False 629 630 631def make_html_path_non_breaking(path: str) -> str: 632 """ 633 When /some/path is displayed in rich text, it will be word-wrapped on the 634 slashes. Inhibit that using a special unicode character. 635 636 :param path: the path 637 :return: the path containing the special characters 638 """ 639 640 return path.replace(os.sep, '{}⁠'.format(os.sep)) 641 642 643def prefs_list_from_gconftool2_string(value: str) -> List[str]: 644 r""" 645 Take a raw string preference value as returned by gconftool-2 646 and convert it to a list of strings. 647 648 Handles escaped characters 649 650 :param value: the raw value as returned by gconftool-2 651 :return: the list of strings 652 653 >>> prefs_list_from_gconftool2_string( # doctest: +ELLIPSIS 654 ... '[Text,IMG_,,Sequences,Stored number,Four digits,Filename,Extension,UPPERCASE]') 655 ... # doctest: +NORMALIZE_WHITESPACE 656 ['Text', 'IMG_', '', 'Sequences', 'Stored number', 'Four digits', 'Filename', 'Extension', 657 'UPPERCASE'] 658 >>> prefs_list_from_gconftool2_string('[Text,IMG_\,\\;+=|!@\,#^&*()$%/",,]') 659 ['Text', 'IMG_,\\;+=|!@,#^&*()$%/"', '', ''] 660 >>> prefs_list_from_gconftool2_string('[Manila,Dubai,London]') 661 ['Manila', 'Dubai', 'London'] 662 """ 663 # Trim the left and right square brackets 664 value = value[1:-1] 665 666 # Split on the comma, but not commas that were escaped. 667 # Use a regex with a negative lookbehind assertion 668 splits = re.split(r'(?<!\\),', value) 669 # Replace the escaped commas with just plain commas 670 return [s.replace('\\,', ',') for s in splits] 671 672 673def pref_bool_from_gconftool2_string(value: str) -> bool: 674 if value == 'true': 675 return True 676 elif value == 'false': 677 return False 678 raise ValueError 679 680 681def remove_last_char_from_list_str(items: List[str]) -> List[str]: 682 r""" 683 Remove the last character from a list of strings, modifying the list in place, 684 such that the last item is never empty 685 686 :param items: the list to modify 687 :return: in place copy 688 689 >>> remove_last_char_from_list_str([' abc', 'def', 'ghi']) 690 [' abc', 'def', 'gh'] 691 >>> remove_last_char_from_list_str([' abc', 'def', 'gh'] ) 692 [' abc', 'def', 'g'] 693 >>> remove_last_char_from_list_str([' abc', 'def', 'g'] ) 694 [' abc', 'def'] 695 >>> remove_last_char_from_list_str([' a']) 696 [' '] 697 >>> remove_last_char_from_list_str([' ']) 698 [] 699 >>> remove_last_char_from_list_str([]) 700 [] 701 """ 702 if items: 703 if not items[-1]: 704 items = items[:-1] 705 else: 706 items[-1] = items[-1][:-1] 707 if items and not items[-1]: 708 items = items[:-1] 709 return items 710 711 712def platform_c_maxint() -> int: 713 """ 714 See http://stackoverflow.com/questions/13795758/what-is-sys-maxint-in-python-3 715 716 :return: the maximum size of an int in C when compiled the same way Python was 717 """ 718 return 2 ** (struct.Struct('i').size * 8 - 1) - 1 719 720 721def commonprefix(*paths) -> str: 722 """ 723 Python 3.4 compatible. 724 725 Remove when Python 3.5 becomes the minimum. 726 """ 727 728 return os.path.dirname(os.path.commonprefix(paths)) 729 730 731def _recursive_identify_depth(*paths, depth) -> int: 732 basenames = [os.path.basename(path) for path in paths] 733 if len(basenames) != len(set(basenames)): 734 duplicates = _collect_duplicates(basenames, paths) 735 736 for basename in duplicates: 737 chop = len(basename) + 1 738 chopped = (path[:-chop] for path in duplicates[basename]) 739 depth = max(depth, _recursive_identify_depth(*chopped, depth=depth + 1)) 740 return depth 741 742 743def _collect_duplicates(basenames, paths): 744 duplicates = defaultdict(list) 745 for basename, path in zip(basenames, paths): 746 duplicates[basename].append(path) 747 return {basename: paths for basename, paths in duplicates.items() if len(paths) > 1} 748 749 750def make_path_end_snippets_unique(*paths) -> List[str]: 751 r""" 752 Make list of path ends unique given possible common path endings. 753 754 A snippet starts from the end of the path, in extreme cases possibly up the path start. 755 756 :param paths: sequence of paths to generate unique end snippets for 757 :return: list of unique snippets 758 759 >>> p0 = '/home/damon/photos' 760 >>> p1 = '/media/damon/backup1/photos' 761 >>> p2 = '/media/damon/backup2/photos' 762 >>> p3 = '/home/damon/videos' 763 >>> p4 = '/media/damon/backup1/videos' 764 >>> p5 = '/media/damon/backup2/videos' 765 >>> p6 = '/media/damon/drive1/home/damon/photos' 766 >>> s0 = make_path_end_snippets_unique(p0, p3) 767 >>> print(s0) 768 ['photos', 'videos'] 769 >>> s1 = make_path_end_snippets_unique(p0, p1, p2) 770 >>> print(s1) 771 ['damon/photos', 'backup1/photos', 'backup2/photos'] 772 >>> s2 = make_path_end_snippets_unique(p0, p1, p2, p3) 773 >>> print(s2) 774 ['damon/photos', 'backup1/photos', 'backup2/photos', 'videos'] 775 >>> s3 = make_path_end_snippets_unique(p3, p4, p5) 776 >>> print(s3) 777 ['damon/videos', 'backup1/videos', 'backup2/videos'] 778 >>> s4 = make_path_end_snippets_unique(p0, p1, p2, p3, p6) 779 >>> print(s4) #doctest: +NORMALIZE_WHITESPACE 780 ['/home/damon/photos', '/media/damon/backup1/photos', '/media/damon/backup2/photos', 'videos', 781 'drive1/home/damon/photos'] 782 >>> s5 = make_path_end_snippets_unique(p1, p2, p3, p6) 783 >>> print(s5) 784 ['backup1/photos', 'backup2/photos', 'videos', 'damon/photos'] 785 """ 786 787 basenames = [os.path.basename(path) for path in paths] 788 789 if len(basenames) != len(set(basenames)): 790 names = [] 791 depths = defaultdict(int) 792 duplicates = _collect_duplicates(basenames, paths) 793 794 for basename, path in zip(basenames, paths): 795 if basename in duplicates: 796 depths[basename] = _recursive_identify_depth(*duplicates[basename], depth=0) 797 798 for basename, path in zip(basenames, paths): 799 depth = depths[basename] 800 if depth: 801 dirs = path.split(os.sep) 802 index = len(dirs) - depth - 1 803 name = (os.sep.join(dirs[max(index, 0): ])) 804 if index > 1: 805 pass 806 # name = '...' + name 807 elif index == 1: 808 name = os.sep + name 809 else: 810 name = basename 811 names.append(name) 812 return names 813 else: 814 return basenames 815 816have_logged_os_release = False 817 818 819def log_os_release() -> None: 820 """ 821 Log the entired contents of /etc/os-release, but only if 822 we didn't do so already. 823 """ 824 825 global have_logged_os_release 826 827 if not have_logged_os_release: 828 try: 829 with open('/etc/os-release', 'r') as f: 830 for line in f: 831 logging.debug(line.rstrip('\n')) 832 except: 833 pass 834 have_logged_os_release = True 835 836 837def extract_file_from_tar(full_tar_path, member_filename) -> bool: 838 """ 839 Extracts a file from a tar.gz and places it beside the tar file 840 :param full_tar_path: path and filename of the tar.gz file 841 :param member_filename: file wanted 842 :return: True if successful, False otherwise 843 """ 844 845 tar_dir, tar_name = os.path.split(full_tar_path) 846 tar_name = tar_name[:len('.tar.gz') * -1] 847 member = os.path.join(tar_name, member_filename) 848 try: 849 with tarfile.open(full_tar_path) as tar: 850 tar.extractall(members=(tar.getmember(member),), path=tar_dir) 851 except Exception: 852 logging.error('Unable to extract %s from tarfile', member_filename) 853 return False 854 else: 855 try: 856 src = os.path.join(tar_dir, tar_name, member_filename) 857 dst = os.path.join(tar_dir, member_filename) 858 os.rename(src, dst) 859 os.rmdir(os.path.join(tar_dir, tar_name)) 860 return True 861 except OSError: 862 logging.error('Unable to move %s to new location', member_filename) 863 return False 864 865 866def bug_report_full_tar_path() -> str: 867 """ 868 Generate a full path for uncompressed bug report tar file. 869 The filename will not already exist. 870 871 :return: File name including path 872 """ 873 874 filename = 'rpd-bug-report-{}'.format(datetime.now().strftime('%Y%m%d')) 875 component = os.path.join(os.path.expanduser('~'), filename) 876 877 i = 0 878 while os.path.isfile('{}{}.tar'.format(component, '' if not i else '-{}'.format(i))): 879 i += 1 880 881 return '{}{}.tar'.format(component, '' if not i else '-{}'.format(i)) 882 883 884def create_bugreport_tar(full_tar_name: str, 885 log_path: Optional[str]='', 886 full_config_file: Optional[str]='') -> bool: 887 """ 888 Create a tar file containing log and configuration files. 889 890 If the file already exists, do nothing. 891 892 :param full_tar_name: the full path in which to create the tar file 893 :param log_path: path to the log files 894 :param full_config_file: the full path and file of the configuration file 895 :return: True if tar file created, else False 896 """ 897 898 if os.path.isfile(full_tar_name): 899 logging.error("Cannot create bug report tarfile, because it already exists") 900 return False 901 902 if not log_path: 903 log_path = os.path.join(xdg.BaseDirectory.xdg_cache_home, 'rapid-photo-downloader', 'log') 904 905 if not full_config_file: 906 config_dir = os.path.join(xdg.BaseDirectory.xdg_config_home, 'Rapid Photo Downloader') 907 config_file = 'Rapid Photo Downloader.conf' 908 else: 909 config_dir, config_file = os.path.split(full_config_file) 910 911 curr_dir = os.getcwd() 912 created = False 913 914 try: 915 with tarfile.open(full_tar_name, 'x') as t: 916 os.chdir(log_path) 917 for l in glob('*'): 918 t.add(l) 919 os.chdir(config_dir) 920 t.add(config_file) 921 except FileNotFoundError as e: 922 logging.error( 923 "When creating a bug report tar file, the directory or file %s does not exist", e.filename 924 ) 925 except Exception: 926 logging.exception("Unexpected error when creating bug report tar file") 927 else: 928 created = True 929 930 try: 931 os.chdir(curr_dir) 932 except FileNotFoundError: 933 pass 934 935 return created 936 937 938def current_version_is_dev_version(current_version=None) -> bool: 939 if current_version is None: 940 current_version = parse_version(__about__.__version__) 941 return current_version.is_prerelease 942 943 944def remove_topmost_directory_from_path(path: str) -> str: 945 if os.sep not in path: 946 return path 947 return path[path[1:].find(os.sep) + 1:] 948 949 950def arrow_locale(lang: str) -> str: 951 """ 952 Test if locale is suitable for use with Arrow. 953 :return: Return user locale if it works with Arrow, else Arrow default ('en_us') 954 """ 955 956 default = 'en_us' 957 if not lang: 958 try: 959 lang = locale.getdefaultlocale()[0] 960 except Exception: 961 return default 962 963 try: 964 arrow.locales.get_locale(lang) 965 return lang 966 except (ValueError, AttributeError): 967 return default 968 969 970def letters(x: int) -> str: 971 """ 972 Return a letter representation of a positive number. 973 974 Adapted from algorithm at 975 http://en.wikipedia.org/wiki/Hexavigesimal 976 977 >>> letters(0) 978 'a' 979 >>> letters(1) 980 'b' 981 >>> letters(2) 982 'c' 983 >>> letters(25) 984 'z' 985 >>> letters(26) 986 'aa' 987 >>> letters(27) 988 'ab' 989 >>> letters(28) 990 'ac' 991 """ 992 993 v = '' 994 while x > 25: 995 r = x % 26 996 x = x // 26 - 1 997 v = string.ascii_lowercase[r] + v 998 v = string.ascii_lowercase[x] + v 999 1000 return v 1001 1002 1003# Use to extract time zone information from date / times: 1004_flexible_dt_re = re.compile( 1005 r"""(?P<year>\d{4})[:-](?P<month>\d{2})[:-](?P<day>\d{2}) 1006 [\sT] # separator between date and time 1007 (?P<hour>\d{2}):(?P<minute>\d{2}):(?P<second>\d{2}) 1008 (?P<subsecond>\.\d{2})? 1009 (?P<timezone>([+-])\d{2}:\d{2})? 1010 (?P<dst>\s(DST))?""", re.VERBOSE 1011) 1012 1013 1014def flexible_date_time_parser(dt_string: str) -> Tuple[datetime, str]: 1015 r""" 1016 Use regular expresion to parse exif date time value, and attempt 1017 to convert it to a python date time. 1018 No error checking. 1019 1020 :param dt_string: date time from exif in string format 1021 :return: datetime, may or may not have a time zone, and format string 1022 1023 >>> flexible_date_time_parser('2018:09:03 14:00:13+01:00 DST') 1024 datetime.datetime(2018, 9, 3, 14, 0, 13, tzinfo=datetime.timezone(datetime.timedelta(0, 3600))) 1025 >>> flexible_date_time_parser('2010:07:18 01:53:35') 1026 datetime.datetime(2010, 7, 18, 1, 53, 35) 1027 >>> flexible_date_time_parser('2016:02:27 22:18:03.00') 1028 datetime.datetime(2016, 2, 27, 22, 18, 3) 1029 >>> flexible_date_time_parser('2010:05:25 17:43:16+02:00') 1030 datetime.datetime(2010, 5, 25, 17, 43, 16, tzinfo=datetime.timezone(datetime.timedelta(0, 7200))) 1031 >>> flexible_date_time_parser('2010:06:07 14:14:02+00:00') 1032 datetime.datetime(2010, 6, 7, 14, 14, 2, tzinfo=datetime.timezone.utc) 1033 >>> flexible_date_time_parser('2016-11-25T14:31:24') 1034 datetime.datetime(2016, 11, 25, 14, 31, 24) 1035 >>> flexible_date_time_parser('2016-11-25T14:20:09') 1036 datetime.datetime(2016, 11, 25, 14, 20, 9) 1037 """ 1038 1039 match = _flexible_dt_re.match(dt_string) 1040 assert match 1041 m = match.groupdict() 1042 1043 dte = '{}:{}:{} {}:{}:{}'.format( 1044 m['year'], m['month'], m['day'], m['hour'], m['minute'], m['second'] 1045 ) 1046 1047 fs = "%Y:%m:%d %H:%M:%S" # format string 1048 1049 ss = m['subsecond'] 1050 if ss: 1051 dte = '{}{}'.format(dte, ss) 1052 fs = '{}.%f'.format(fs) 1053 1054 tze = m['timezone'] 1055 if tze: 1056 dte = '{}{}'.format(dte, tze.replace(':', '')) 1057 fs = '{}%z'.format(fs) 1058 1059 # dst: daylight savings 1060 # no idea how to handle this properly -- so ignore for now! 1061 1062 return datetime.strptime(dte, fs), fs 1063 1064 1065def image_large_enough_fdo(size: QSize) -> bool: 1066 """ 1067 :param size: image size 1068 :return: True if image is large enough to meet the FreeDesktop.org 1069 specs for a large thumbnail 1070 """ 1071 1072 return size.width() >= 256 or size.height() >= 256 1073 1074 1075def is_venv(): 1076 """ 1077 :return: True if the python interpreter is running in venv or virtualenv 1078 """ 1079 1080 return hasattr(sys, 'real_prefix') or \ 1081 (hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix) 1082 1083 1084def python_package_version(package: str) -> str: 1085 """ 1086 Determine the version of an installed Python package 1087 :param package: package name 1088 :return: version number, if could be determined, else '' 1089 """ 1090 1091 try: 1092 return pkg_resources.get_distribution(package).version 1093 except pkg_resources.DistributionNotFound: 1094 return '' 1095 1096 1097def is_snap() -> bool: 1098 """ 1099 Determine if program is running in a snap environment 1100 :return: True if it is False otherwise 1101 """ 1102 1103 snap_name = os.getenv('SNAP_NAME', '') 1104 return snap_name.find('rapid-photo-downloader') >= 0 1105 1106 1107def version_check_disabled(): 1108 """ 1109 Determine if version checking should be disabled or not 1110 :return: True if it should be False otherwise 1111 """ 1112 1113 return disable_version_check or is_snap() 1114 1115 1116def available_lang_codes() -> List[str]: 1117 """ 1118 Detect translations that exist for Rapid Photo Downloader 1119 :return: list of language codes 1120 """ 1121 1122 if localedir is not None: 1123 files = glob(os.path.join(localedir, '*', 'LC_MESSAGES', '%s.mo' % i18n_domain)) 1124 langs = [file.split(os.path.sep)[-3] for file in files] 1125 langs.append('en') 1126 return langs 1127 else: 1128 return [] 1129 1130 1131# Auto-generated from extract_language_names.py do not delete 1132substitute_languages = { 1133 'fa': 'Persian', 1134 'sk': 'Slovak', 1135 'it': 'Italian', 1136 'oc': 'Occitan (post 1500)', 1137 'fi': 'Finnish', 1138 'sv': 'Swedish', 1139 'cs': 'Czech', 1140 'pl': 'Polish', 1141 'kab': 'Kabyle', 1142 'tr': 'Turkish', 1143 'hr': 'Croatian', 1144 'nn': 'Norwegian Nynorsk', 1145 'da': 'Danish', 1146 'de': 'German', 1147 'sr': 'српски', 1148 'pt_BR': 'Brazilian Portuguese', 1149 'ja': 'Japanese', 1150 'bg': 'Bulgarian', 1151 'uk': 'Ukrainian', 1152 'ar': 'Arabic', 1153 'ca': 'Catalan', 1154 'nb': 'Norwegian Bokmal', 1155 'ru': 'Russian', 1156 'hu': 'magyar', 1157 'be': 'Belarusian', 1158 'es': 'Spanish', 1159 'pt': 'Portuguese', 1160 'zh_CN': 'Chinese (Simplified)', 1161 'fr': 'Français', 1162 'et': 'Estonian', 1163 'nl': 'Dutch', 1164 'ro': 'Romanian', 1165 'id': 'Indonesian', 1166 'el': 'Greek', 1167} # Auto-generated from extract_language_names.py do not delete 1168 1169 1170def get_language_display_name(lang_code: str, 1171 make_missing_lower: bool, 1172 locale_code: str) -> str: 1173 """ 1174 Use babel to the human friendly name for a locale, or failing that our 1175 auto-generated version 1176 :param lang_code: locale code for language to get the display name for 1177 :param make_missing_lower: whether to make the default name when 1178 babel does not suppply it lower case 1179 :param locale_code: current system locale code 1180 :return: human friendly version 1181 """ 1182 1183 try: 1184 return babel.Locale.parse(lang_code).get_display_name(locale_code) 1185 except babel.core.UnknownLocaleError: 1186 display = substitute_languages[lang_code] 1187 return display if not make_missing_lower else display.lower() 1188 1189 1190def available_languages(display_locale_code: str='') -> List[Tuple[str, str]]: 1191 """ 1192 Detect translations that exist for Rapid Photo Downloader 1193 :return: iterator of Tuple of language code and localized name 1194 """ 1195 1196 lang_codes = available_lang_codes() 1197 1198 if not lang_codes: # Testing code when translations are not installed 1199 lang_codes = ['en', 'de', 'es'] 1200 1201 if not display_locale_code: 1202 try: 1203 locale_code = locale.getdefaultlocale()[0] 1204 except Exception: 1205 locale_code = 'en_US' 1206 else: 1207 locale_code = display_locale_code 1208 1209 # Determine if this locale makes its language names lower case 1210 babel_sample = babel.Locale.parse('en').get_display_name(locale_code) 1211 make_missing_lower = babel_sample.islower() 1212 1213 langs = zip( 1214 lang_codes, [ 1215 get_language_display_name(code, make_missing_lower, locale_code) for code in lang_codes 1216 ] 1217 ) 1218 1219 # Sort languages by display name 1220 langs = list(langs) 1221 try: 1222 langs.sort(key=lambda l: locale.strxfrm(l[1])) 1223 except Exception: 1224 logging.error("Error sorting language names for display in program preferences") 1225 return langs 1226 1227 1228def installed_using_pip(package: str, suppress_errors: bool = True) -> bool: 1229 """ 1230 Determine if python package was installed in local directory using pip. 1231 1232 Determination is not 100% robust in all circumstances. 1233 1234 Exceptions are not caught. 1235 1236 :param package: package name to search for 1237 :param suppress_errors: if True, silently catch all exceptions and return False 1238 :return: True if installed via pip, else False 1239 """ 1240 1241 try: 1242 pkg = pkg_resources.get_distribution(package) 1243 location = pkg.location 1244 return not location.startswith('/usr') or location.find('local') > 0 1245 except Exception: 1246 if not suppress_errors: 1247 raise 1248 return False 1249 1250 1251def getQtSystemTranslation(locale_name: str) -> Optional[QTranslator]: 1252 """ 1253 Attempt to install Qt base system translations (for QMessageBox and QDialogBox buttons) 1254 :return: translator if loaded, else None 1255 """ 1256 1257 # These locales are found in the path QLibraryInfo.TranslationsPath 1258 convert_locale = dict( 1259 cs_CZ='cs', 1260 da_DK='da', 1261 de_DE='de', 1262 es_ES='es', 1263 fi_FI='fi', 1264 fr_FR='fr', 1265 it_IT='it', 1266 ja_JP='ja', 1267 hu_HU='hu', 1268 pl_PL='pl', 1269 ru_RU='ru', 1270 sk_SK='sk', 1271 uk_UA='uk', 1272 ) 1273 1274 qtTranslator = QTranslator() 1275 location = QLibraryInfo.location(QLibraryInfo.TranslationsPath) 1276 qm_file = "qtbase_{}.qm".format(convert_locale.get(locale_name, locale_name)) 1277 qm_file = os.path.join(location, qm_file) 1278 if os.path.isfile(qm_file): 1279 if qtTranslator.load(qm_file): 1280 logging.debug("Installing Qt support for locale %s", locale_name) 1281 return qtTranslator 1282 else: 1283 logging.debug("Could not load Qt locale file %s", qm_file) 1284