1# Copyright (C) 2015-2020 Damon Lynch <damonlynch@gmail.com> 2# Copyright (C) 2008-2015 Canonical Ltd. 3# Copyright (C) 2013 Bernard Baeyens 4 5# This file is part of Rapid Photo Downloader. 6# 7# Rapid Photo Downloader is free software: you can redistribute it and/or 8# modify it under the terms of the GNU General Public License as published by 9# the Free Software Foundation, either version 3 of the License, or 10# (at your option) any later version. 11# 12# Rapid Photo Downloader is distributed in the hope that it will be useful, 13# but WITHOUT ANY WARRANTY; without even the implied warranty of 14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 15# GNU General Public License for more details. 16# 17# You should have received a copy of the GNU General Public License 18# along with Rapid Photo Downloader. If not, 19# see <http://www.gnu.org/licenses/>. 20 21""" 22The primary task of this module is to handle addition and removal of 23(1) cameras and (2) devices with file systems. 24 25There are two scenarios: 26 271) User is running under a Gnome-like environment in which GVFS will 28automatically mount cameras and devices. We can monitor mounts and 29send a signal when something is mounted. The camera must be 30unmounted before libgphoto2 can access it, so we must handle that too. 31 322) User is running under a non Gnome-like environment (e.g. KDE) in 33which GVFS may or may not be running. However we can assume GVFS will 34not automatically mount cameras and devices. In this case, using GIO 35to monitor mounts is useless, as the mounts may not occur. So we must 36monitor when cameras and other devices are added or removed ourselves. 37To do this, use udev for cameras, and udisks2 for devices with file 38systems. When a device with a file system is inserted, if it is not 39already mounted, attempt to mount it. 40 41The secondary task of this module is to provide miscellaneous services 42regarding mount points and XDG related functionality. 43""" 44 45__author__ = 'Damon Lynch' 46__copyright__ = "Copyright 2011-2020, Damon Lynch. Copyright 2008-2015 Canonical Ltd. Copyright" \ 47 " 2013 Bernard Baeyens." 48 49import logging 50import os 51import re 52import sys 53import time 54import subprocess 55import shlex 56import pwd 57import shutil 58from collections import namedtuple 59from typing import Optional, Tuple, List, Dict, Set 60from urllib.request import pathname2url 61from urllib.parse import unquote_plus, quote, urlparse 62from tempfile import NamedTemporaryFile 63 64from PyQt5.QtCore import (QStorageInfo, QObject, pyqtSignal, QFileSystemWatcher, pyqtSlot, QTimer) 65from xdg.DesktopEntry import DesktopEntry 66from xdg import BaseDirectory 67import xdg 68 69import gi 70 71gi.require_version('GUdev', '1.0') 72gi.require_version('GExiv2', '0.10') 73gi.require_version('GLib', '2.0') 74from gi.repository import GUdev, GLib 75 76 77 78from raphodo.constants import ( 79 Desktop, Distro, FileManagerType, DefaultFileBrowserFallback, FileManagerBehavior 80) 81from raphodo.utilities import ( 82 process_running, log_os_release, remove_topmost_directory_from_path, find_mount_point 83) 84 85logging_level = logging.DEBUG 86 87try: 88 from gi.repository import Gio 89 90 have_gio = True 91except ImportError: 92 have_gio = False 93 94StorageSpace = namedtuple('StorageSpace', 'bytes_free, bytes_total, path') 95CameraDetails = namedtuple('CameraDetails', 'model, port, display_name, is_mtp, storage_desc') 96UdevAttr = namedtuple('UdevAttr', 'is_mtp_device, vendor, model') 97 98PROGRAM_DIRECTORY = 'rapid-photo-downloader' 99 100 101def get_distro_id(id_or_id_like: str) -> Distro: 102 if id_or_id_like[0] in ('"', "'"): 103 id_or_id_like = id_or_id_like[1:-1] 104 try: 105 return Distro[id_or_id_like.strip()] 106 except KeyError: 107 return Distro.unknown 108 109 110os_release = '/etc/os-release' 111 112 113# Sync get_distro() with code in install.py 114 115def get_distro() -> Distro: 116 """ 117 Determine the Linux distribution using /etc/os-release 118 """ 119 120 if os.path.isfile(os_release): 121 with open(os_release, 'r') as f: 122 for line in f: 123 if line.startswith('NAME='): 124 if line.find('elementary') > 0: 125 return Distro.elementary 126 if line.find('CentOS Linux') > 0: 127 return Distro.centos 128 if line.find('openSUSE') > 0: 129 return Distro.opensuse 130 if line.find('Deepin') > 0: 131 return Distro.deepin 132 if line.find('KDE neon') > 0: 133 return Distro.neon 134 if line.find('Zorin') > 0: 135 return Distro.zorin 136 if line.find('Kylin') > 0: 137 return Distro.kylin 138 if line.find('Pop!_OS') > 0: 139 return Distro.popos 140 if line.startswith('ID='): 141 return get_distro_id(line[3:]) 142 if line.startswith('ID_LIKE='): 143 return get_distro_id(line[8:]) 144 return Distro.unknown 145 146 147def get_user_name() -> str: 148 """ 149 Gets the user name of the process owner, with no exception checking 150 :return: user name of the process owner 151 """ 152 153 return pwd.getpwuid(os.getuid())[0] 154 155 156def get_path_display_name(path: str) -> Tuple[str, str]: 157 """ 158 Return a name for the path (path basename), 159 removing a final '/' when it's not the root of the 160 file system. 161 162 :param path: path to generate the display name for 163 :return: display name and sanitized path 164 """ 165 if path.endswith(os.sep) and path != os.sep: 166 path = path[:-1] 167 168 if path == os.sep: 169 display_name = _('File system root') 170 else: 171 display_name = os.path.basename(path) 172 return display_name, path 173 174 175def get_media_dir() -> str: 176 """ 177 Returns the media directory, i.e. where external mounts are mounted. 178 179 Assumes mount point of /media/<USER>. 180 181 """ 182 183 if sys.platform.startswith('linux') or sys.platform.startswith('freebsd'): 184 media_dir = '/media/{}'.format(get_user_name()) 185 run_media_dir = '/run/media' 186 distro = get_distro() 187 if os.path.isdir(run_media_dir) and distro not in ( 188 Distro.ubuntu, Distro.debian, Distro.neon, Distro.galliumos, Distro.peppermint, 189 Distro.elementary, Distro.zorin, Distro.popos): 190 if distro not in (Distro.fedora, Distro.manjaro, Distro.arch, Distro.opensuse, 191 Distro.gentoo, Distro.centos, Distro.centos7): 192 logging.debug( 193 "Detected /run/media directory, but distro does not appear to be CentOS, " 194 "Fedora, Arch, openSUSE, Gentoo, or Manjaro" 195 ) 196 log_os_release() 197 return run_media_dir 198 return media_dir 199 else: 200 raise ("Mounts.setValidMountPoints() not implemented on %s", sys.platform) 201 202 203_gvfs_gphoto2 = re.compile('gvfs.*gphoto2.*host') 204 205 206def gvfs_gphoto2_path(path: str) -> bool: 207 """ 208 :return: True if the path appears to be a GVFS gphoto2 path 209 210 >>> p = "/run/user/1000/gvfs/gphoto2:host=%5Busb%3A002%2C013%5D" 211 >>> gvfs_gphoto2_path(p) 212 True 213 >>> p = '/home/damon' 214 >>> gvfs_gphoto2_path(p) 215 False 216 """ 217 218 return _gvfs_gphoto2.search(path) is not None 219 220 221class ValidMounts(): 222 r""" 223 Operations to find 'valid' mount points, i.e. the places in which 224 it's sensible for a user to mount a partition. Valid mount points: 225 include /home/<USER> , /media/<USER>, and /run/media/<USER> 226 include directories in /etc/fstab, except /, /home, and swap 227 However if only considering external mounts, the the mount must be 228 under /media/<USER> or /run/media/<user> 229 """ 230 231 def __init__(self, onlyExternalMounts: bool): 232 """ 233 :param onlyExternalMounts: if True, valid mounts must be under 234 /media/<USER> or /run/media/<user> 235 """ 236 self.validMountFolders = None # type: Tuple[str] 237 self.onlyExternalMounts = onlyExternalMounts 238 self._setValidMountFolders() 239 assert '/' not in self.validMountFolders 240 if logging_level == logging.DEBUG: 241 self.logValidMountFolders() 242 243 def isValidMountPoint(self, mount: QStorageInfo) -> bool: 244 """ 245 Determine if the path of the mount point starts with a valid 246 path 247 :param mount: QStorageInfo to be tested 248 :return:True if mount is a mount under a valid mount, else False 249 """ 250 for m in self.validMountFolders: 251 if mount.rootPath().startswith(m): 252 return True 253 return False 254 255 def pathIsValidMountPoint(self, path: str) -> bool: 256 """ 257 Determine if path indicates a mount point under a valid mount 258 point 259 :param path: path to be tested 260 :return:True if path is a mount under a valid mount, else False 261 """ 262 for m in self.validMountFolders: 263 if path.startswith(m): 264 return True 265 return False 266 267 def mountedValidMountPointPaths(self) -> Tuple[str]: 268 """ 269 Return paths of all the currently mounted partitions that are 270 valid 271 :return: tuple of currently mounted valid partition paths 272 """ 273 274 return tuple(filter(self.pathIsValidMountPoint, mountPaths())) 275 276 def mountedValidMountPoints(self) -> Tuple[QStorageInfo]: 277 """ 278 Return mount points of all the currently mounted partitions 279 that are valid 280 :return: tuple of currently mounted valid partition 281 """ 282 283 return tuple(filter(self.isValidMountPoint, QStorageInfo.mountedVolumes())) 284 285 def _setValidMountFolders(self) -> None: 286 """ 287 Determine the valid mount point folders and set them in 288 self.validMountFolders, e.g. /media/<USER>, etc. 289 """ 290 291 if not sys.platform.startswith('linux') and not sys.platform.startswith('freebsd'): 292 raise ("Mounts.setValidMountPoints() not implemented on %s", sys.platform()) 293 else: 294 try: 295 media_dir = get_media_dir() 296 except: 297 logging.critical("Unable to determine username of this process") 298 media_dir = '' 299 logging.debug("Media dir is %s", media_dir) 300 if self.onlyExternalMounts: 301 self.validMountFolders = (media_dir, ) 302 else: 303 home_dir = os.path.expanduser('~') 304 validPoints = [home_dir, media_dir] 305 for point in self.mountPointInFstab(): 306 validPoints.append(point) 307 self.validMountFolders = tuple(validPoints) 308 309 def mountPointInFstab(self): 310 """ 311 Yields a list of mount points in /etc/fstab 312 The mount points will exclude /, /home, and swap 313 """ 314 315 with open('/etc/fstab') as f: 316 l = [] 317 for line in f: 318 # As per fstab specs: white space is either Tab or space 319 # Ignore comments, blank lines 320 # Also ignore swap file (mount point none), root, and /home 321 m = re.match(r'^(?![\t ]*#)\S+\s+(?!(none|/[\t ]|/home))(' 322 r'?P<point>\S+)', 323 line) 324 if m is not None: 325 yield (m.group('point')) 326 327 def logValidMountFolders(self): 328 """ 329 Output nicely formatted debug logging message 330 """ 331 332 assert len(self.validMountFolders) > 0 333 if logging_level == logging.DEBUG: 334 msg = "To be recognized, partitions must be mounted under " 335 if len(self.validMountFolders) > 2: 336 msg += "one of " 337 for p in self.validMountFolders[:-2]: 338 msg += "{}, ".format(p) 339 msg += "{} or {}".format(self.validMountFolders[-2], 340 self.validMountFolders[-1]) 341 elif len(self.validMountFolders) == 2: 342 msg += "{} or {}".format(self.validMountFolders[0], 343 self.validMountFolders[1]) 344 else: 345 msg += self.validMountFolders[0] 346 logging.debug(msg) 347 348 349def mountPaths(): 350 """ 351 Yield all the mount paths returned by QStorageInfo 352 """ 353 354 for m in QStorageInfo.mountedVolumes(): 355 yield m.rootPath() 356 357 358def has_one_or_more_folders(path: str, folders: List[str]) -> bool: 359 """ 360 Checks to see if directly below the path there is a folder 361 from the list of specified folders, and if the folder is readable. 362 :param path: path to check 363 :return: True if has one or more valid folders, False otherwise 364 """ 365 366 try: 367 contents = os.listdir(path) 368 for folder in folders: 369 if folder in contents: 370 full_path = os.path.join(path, folder) 371 if os.path.isdir(full_path) and os.access(full_path, os.R_OK): 372 return True 373 except (PermissionError, FileNotFoundError, OSError): 374 return False 375 except: 376 logging.error("Unknown error occurred while probing potential source folder %s", path) 377 return False 378 return False 379 380 381def get_desktop_environment() -> Optional[str]: 382 """ 383 Determine desktop environment using environment variable XDG_CURRENT_DESKTOP 384 385 :return: str with XDG_CURRENT_DESKTOP value 386 """ 387 388 return os.getenv('XDG_CURRENT_DESKTOP') 389 390 391def get_desktop() -> Desktop: 392 """ 393 Determine desktop environment 394 :return: enum representing desktop environment, 395 Desktop.unknown if unknown. 396 """ 397 398 try: 399 env = get_desktop_environment().lower() 400 except AttributeError: 401 # Occurs when there is no value set 402 return Desktop.unknown 403 404 if env == 'unity:unity7': 405 env = 'unity' 406 elif env == 'x-cinnamon': 407 env = 'cinnamon' 408 elif env == 'ubuntu:gnome': 409 env = 'ubuntugnome' 410 elif env == 'pop:gnome': 411 env = 'popgnome' 412 elif env == 'gnome-classic:gnome': 413 env = 'gnome' 414 elif env == 'budgie:gnome': 415 env = 'gnome' 416 elif env == 'zorin:gnome': 417 env = 'zorin' 418 419 try: 420 return Desktop[env] 421 except KeyError: 422 return Desktop.unknown 423 424 425def gvfs_controls_mounts() -> bool: 426 """ 427 Determine if GVFS controls mounts on this system. 428 429 By default, common desktop environments known to use it are assumed 430 to be using it or not. If not found in this list, then the list of 431 running processes is searched, looking for a match against 'gvfs-gphoto2', 432 which will match what is at the time of this code being developed called 433 'gvfs-gphoto2-volume-monitor', which is what we're most interested in. 434 435 :return: True if so, False otherwise 436 """ 437 438 desktop = get_desktop() 439 if desktop in (Desktop.gnome, Desktop.unity, Desktop.cinnamon, Desktop.xfce, 440 Desktop.mate, Desktop.lxde, Desktop.ubuntugnome, 441 Desktop.popgnome, Desktop.gnome, Desktop.lxqt, Desktop.pantheon): 442 return True 443 elif desktop == Desktop.kde: 444 return False 445 return process_running('gvfs-gphoto2') 446 447 448def _get_xdg_special_dir(dir_type: gi.repository.GLib.UserDirectory, 449 home_on_failure: bool=True) -> Optional[str]: 450 path = GLib.get_user_special_dir(dir_type) 451 if path is None and home_on_failure: 452 return os.path.expanduser('~') 453 return path 454 455def xdg_photos_directory(home_on_failure: bool=True) -> Optional[str]: 456 """ 457 Get localized version of /home/<USER>/Pictures 458 459 :param home_on_failure: if the directory does not exist, return 460 the home directory instead 461 :return: the directory if it is specified, else the user's 462 home directory or None 463 """ 464 return _get_xdg_special_dir(GLib.UserDirectory.DIRECTORY_PICTURES, home_on_failure) 465 466 467def xdg_videos_directory(home_on_failure: bool=True) -> str: 468 """ 469 Get localized version of /home/<USER>/Videos 470 471 :param home_on_failure: if the directory does not exist, return 472 the home directory instead 473 :return: the directory if it is specified, else the user's 474 home directory or None 475 """ 476 return _get_xdg_special_dir(GLib.UserDirectory.DIRECTORY_VIDEOS, home_on_failure) 477 478def xdg_desktop_directory(home_on_failure: bool=True) -> str: 479 """ 480 Get localized version of /home/<USER>/Desktop 481 482 :param home_on_failure: if the directory does not exist, return 483 the home directory instead 484 :return: the directory if it is specified, else the user's 485 home directory or None 486 """ 487 return _get_xdg_special_dir(GLib.UserDirectory.DIRECTORY_DESKTOP, home_on_failure) 488 489def xdg_photos_identifier() -> str: 490 """ 491 Get special subfoler indicated by the localized version of /home/<USER>/Pictures 492 :return: the subfolder name if it is specified, else the localized version of 'Pictures' 493 """ 494 495 path = _get_xdg_special_dir(GLib.UserDirectory.DIRECTORY_PICTURES, home_on_failure=False) 496 if path is None: 497 # translators: the name of the Pictures folder 498 return _('Pictures') 499 return os.path.basename(path) 500 501def xdg_videos_identifier() -> str: 502 """ 503 Get special subfoler indicated by the localized version of /home/<USER>/Pictures 504 :return: the subfolder name if it is specified, else the localized version of 'Pictures' 505 """ 506 507 path = _get_xdg_special_dir(GLib.UserDirectory.DIRECTORY_VIDEOS, home_on_failure=False) 508 if path is None: 509 # translators: the name of the Videos folder 510 return _('Videos') 511 return os.path.basename(path) 512 513 514def make_program_directory(path: str) -> str: 515 """ 516 Creates a subfolder used by Rapid Photo Downloader. 517 518 Does not catch errors. 519 520 :param path: location where the subfolder should be 521 :return: the full path of the new directory 522 """ 523 program_dir = os.path.join(path, 'rapid-photo-downloader') 524 if not os.path.exists(program_dir): 525 os.mkdir(program_dir) 526 elif not os.path.isdir(program_dir): 527 os.remove(program_dir) 528 os.mkdir(program_dir) 529 return program_dir 530 531 532def get_program_cache_directory(create_if_not_exist: bool = False) -> Optional[str]: 533 """ 534 Get Rapid Photo Downloader cache directory. 535 536 Is assumed to be under $XDG_CACHE_HOME or if that doesn't exist, 537 ~/.cache. 538 :param create_if_not_exist: creates directory if it does not exist. 539 :return: the full path of the cache directory, or None on error 540 """ 541 try: 542 cache_directory = BaseDirectory.xdg_cache_home 543 if not create_if_not_exist: 544 return os.path.join(cache_directory, PROGRAM_DIRECTORY) 545 else: 546 return make_program_directory(cache_directory) 547 except OSError: 548 logging.error("An error occurred while creating the cache directory") 549 return None 550 551 552def get_program_logging_directory(create_if_not_exist: bool = False) -> Optional[str]: 553 """ 554 Get directory in which to store program log files. 555 556 Log files are kept in the cache dirctory. 557 558 :param create_if_not_exist: 559 :return: the full path of the logging directory, or None on error 560 """ 561 cache_directory = get_program_cache_directory(create_if_not_exist=create_if_not_exist) 562 log_dir = os.path.join(cache_directory, 'log') 563 if os.path.isdir(log_dir): 564 return log_dir 565 if create_if_not_exist: 566 try: 567 if os.path.isfile(log_dir): 568 os.remove(log_dir) 569 os.mkdir(log_dir, 0o700) 570 return log_dir 571 except OSError: 572 logging.error("An error occurred while creating the log directory") 573 return None 574 575 576def get_program_data_directory(create_if_not_exist=False) -> Optional[str]: 577 """ 578 Get Rapid Photo Downloader data directory, which is assumed to be 579 under $XDG_DATA_HOME or if that doesn't exist, ~/.local/share 580 :param create_if_not_exist: creates directory if it does not exist. 581 :return: the full path of the data directory, or None on error 582 """ 583 try: 584 data_directory = BaseDirectory.xdg_data_dirs[0] 585 if not create_if_not_exist: 586 return os.path.join(data_directory, PROGRAM_DIRECTORY) 587 else: 588 return make_program_directory(data_directory) 589 except OSError: 590 logging.error("An error occurred while creating the data directory") 591 return None 592 593 594def get_fdo_cache_thumb_base_directory() -> str: 595 """ 596 Get the Freedesktop.org thumbnail directory location 597 :return: location 598 """ 599 600 # LXDE is a special case: handle it 601 if get_desktop() == Desktop.lxde: 602 return os.path.join(os.path.expanduser('~'), '.thumbnails') 603 604 return os.path.join(BaseDirectory.xdg_cache_home, 'thumbnails') 605 606 607# Module level variables important for determining among other things the generation of URIs 608# Pretty ugly, but the alternative is passing values around between several processes 609_desktop = get_desktop() 610_quoted_comma = quote(',') 611_default_file_manager_probed = False 612_default_file_manager = None 613_default_file_manager_type = None 614 615 616def _default_file_manager_for_desktop() -> Tuple[Optional[str], Optional[FileManagerType]]: 617 """ 618 If default file manager cannot be determined using system tools, guess 619 based on desktop environment. 620 621 Sets module level globals if found. 622 623 :return: file manager command (without path), and type; if not detected, (None, None) 624 """ 625 626 global _default_file_manager 627 global _default_file_manager_type 628 629 try: 630 fm = '' 631 fm = DefaultFileBrowserFallback[_desktop.name] 632 assert shutil.which(fm) 633 t = FileManagerBehavior[fm] 634 _default_file_manager = fm 635 _default_file_manager_type = t 636 return fm, t 637 except KeyError: 638 logging.debug("Error determining default file manager") 639 return None, None 640 except AssertionError: 641 logging.debug("Default file manager %s cannot be found", fm) 642 return None, None 643 644 645def get_default_file_manager() -> Tuple[Optional[str], Optional[FileManagerType]]: 646 """ 647 Attempt to determine the default file manager for the system 648 :param remove_args: if True, remove any arguments such as %U from 649 the returned command 650 :return: file manager command (without path), and type; if not detected, (None, None) 651 """ 652 653 global _default_file_manager_probed 654 global _default_file_manager 655 global _default_file_manager_type 656 657 if _default_file_manager_probed: 658 return _default_file_manager, _default_file_manager_type 659 660 _default_file_manager_probed = True 661 662 assert sys.platform.startswith('linux') or sys.platform.startswith('freebsd') 663 cmd = shlex.split('xdg-mime query default inode/directory') 664 try: 665 desktop_file = subprocess.check_output(cmd, universal_newlines=True) # type: str 666 except: 667 return _default_file_manager_for_desktop() 668 669 # Remove new line character from output 670 desktop_file = desktop_file[:-1] 671 if desktop_file.endswith(';'): 672 desktop_file = desktop_file[:-1] 673 674 for desktop_path in (os.path.join(d, 'applications') for d in BaseDirectory.xdg_data_dirs): 675 path = os.path.join(desktop_path, desktop_file) 676 if os.path.exists(path): 677 try: 678 desktop_entry = DesktopEntry(path) 679 except xdg.Exceptions.ParsingError: 680 return _default_file_manager_for_desktop() 681 try: 682 desktop_entry.parse(path) 683 except: 684 return _default_file_manager_for_desktop() 685 686 fm = desktop_entry.getExec() 687 688 # Strip away any extraneous arguments 689 fm_cmd = fm.split()[0] 690 # Strip away any path information 691 fm_cmd = os.path.split(fm_cmd)[1] 692 # Strip away any quotes 693 fm_cmd = fm_cmd.replace('"', '') 694 fm_cmd = fm_cmd.replace("'", '') 695 696 # Unhelpful results 697 invalid_file_managers = ('baobab', 'exo-open', 'RawTherapee', 'ART') 698 for invalid_file_manger in invalid_file_managers: 699 if fm_cmd.startswith(invalid_file_manger): 700 logging.warning('%s is an invalid file manager: will substitute', fm) 701 return _default_file_manager_for_desktop() 702 703 # Nonexistent file managers 704 if shutil.which(fm_cmd) is None: 705 logging.warning('Default file manager %s does not exist: will substitute', fm) 706 return _default_file_manager_for_desktop() 707 708 try: 709 file_manager_type = FileManagerBehavior[fm_cmd] 710 except KeyError: 711 file_manager_type = FileManagerType.regular 712 713 _default_file_manager = fm_cmd 714 _default_file_manager_type = file_manager_type 715 return _default_file_manager, file_manager_type 716 717 # Special case: no base dirs set, e.g. LXQt 718 return _default_file_manager_for_desktop() 719 720 721def open_in_file_manager(file_manager: str, 722 file_manager_type: FileManagerType, 723 uri: str) -> None: 724 """ 725 Open a directory or file in the file manager. 726 727 If the item is a file, then try to select it in the file manager, 728 rather than opening it directly. 729 730 :param file_manager: the file manager to use 731 :param file_manager_type: file manager behavior 732 :param uri: the URI (path) to open. Assumes file:// or gphoto2:// schema 733 """ 734 735 arg = '' 736 path = unquote_plus(urlparse(uri).path) 737 if not os.path.isdir(path): 738 if file_manager_type == FileManagerType.select: 739 arg = '--select ' 740 elif file_manager_type == FileManagerType.show_item: 741 arg = '--show-item ' 742 elif file_manager_type == FileManagerType.show_items: 743 arg = '--show-items ' 744 745 cmd = '{} {}{}'.format(file_manager, arg, uri) 746 logging.debug("Launching: %s", cmd) 747 args = shlex.split(cmd) 748 subprocess.Popen(args) 749 750 751def get_uri(full_file_name: Optional[str]=None, 752 path: Optional[str]=None, 753 camera_details: Optional[CameraDetails]=None, 754 desktop_environment: Optional[bool]=True) -> str: 755 """ 756 Generate and return the URI for the file, which varies depending on 757 which device it is 758 759 :param full_file_name: full filename and path 760 :param path: straight path when not passing a full_file_name 761 :param camera_details: see named tuple CameraDetails for parameters 762 :param desktop_environment: if True, will to generate a URI accepted 763 by Gnome, KDE and other desktops, which means adjusting the URI if it appears to be an 764 MTP mount. Includes the port too, for cameras. Takes into account 765 file manager characteristics. 766 :return: the URI 767 """ 768 769 if not _default_file_manager_probed: 770 get_default_file_manager() 771 772 if camera_details is None: 773 prefix = 'file://' 774 if desktop_environment: 775 if full_file_name and _default_file_manager_type == FileManagerType.dir_only_uri: 776 full_file_name = os.path.dirname(full_file_name) 777 else: 778 if not desktop_environment: 779 if full_file_name or path: 780 prefix = 'gphoto2://' 781 else: 782 prefix = 'gphoto2://' + pathname2url('[{}]'.format(camera_details.port)) 783 else: 784 prefix = '' 785 # Attempt to generate a URI accepted by desktop environments 786 if camera_details.is_mtp: 787 if full_file_name: 788 full_file_name = remove_topmost_directory_from_path(full_file_name) 789 elif path: 790 path = remove_topmost_directory_from_path(path) 791 792 if gvfs_controls_mounts() or _desktop == Desktop.lxqt: 793 prefix = 'mtp://' + pathname2url( 794 '[{}]/{}'.format(camera_details.port, camera_details.storage_desc) 795 ) 796 elif _desktop == Desktop.kde: 797 prefix = 'mtp:/' + pathname2url( 798 '{}/{}'.format(camera_details.display_name, camera_details.storage_desc) 799 ) 800 else: 801 logging.error("Don't know how to generate MTP prefix for %s", _desktop.name) 802 else: 803 prefix = 'gphoto2://' + pathname2url('[{}]'.format(camera_details.port)) 804 805 if _default_file_manager == 'pcmanfm-qt': 806 # pcmanfm-qt does not like the quoted form of the comma 807 prefix = prefix.replace(_quoted_comma, ',') 808 if full_file_name: 809 # pcmanfm-qt does not like the the filename as part of the path 810 full_file_name = os.path.dirname(full_file_name) 811 812 813 if full_file_name or path: 814 uri = '{}{}'.format(prefix, pathname2url(full_file_name or path)) 815 else: 816 uri = prefix 817 return uri 818 819 820ValidatedFolder = namedtuple('ValidatedFolder', 'valid, absolute_path') 821 822 823def validate_download_folder(path: Optional[str], 824 write_on_waccesss_failure: bool=False) -> ValidatedFolder: 825 r""" 826 Check if folder exists and is writeable. 827 828 Accepts None as a folder, which will always be invalid. 829 830 :param path: path to analyze 831 :param write_on_waccesss_failure: if os.access reports path is not writable, test 832 nonetheless to see if it's writable by writing and deleting a test file 833 :return: Tuple indicating validity and path made absolute 834 835 >>> validate_download_folder('/some/bogus/and/ridiculous/path') 836 ValidatedFolder(valid=False, absolute_path='/some/bogus/and/ridiculous/path') 837 >>> validate_download_folder(None) 838 ValidatedFolder(valid=False, absolute_path='') 839 >>> validate_download_folder('') 840 ValidatedFolder(valid=False, absolute_path='') 841 """ 842 843 if not path: 844 return ValidatedFolder(False, '') 845 absolute_path = os.path.abspath(path) 846 valid = os.path.isdir(path) and os.access(path, os.W_OK) 847 if not valid and write_on_waccesss_failure and os.path.isdir(path): 848 try: 849 with NamedTemporaryFile(dir=path): 850 # the path is in fact writeable -- can happen with NFS 851 valid = True 852 except Exception: 853 logging.warning( 854 'While validating download / backup folder, failed to write a temporary file to ' 855 '%s', path 856 ) 857 858 return ValidatedFolder(valid, absolute_path) 859 860 861def validate_source_folder(path: Optional[str]) -> ValidatedFolder: 862 r""" 863 Check if folder exists and is readable. 864 865 Accepts None as a folder, which will always be invalid. 866 867 :param path: path to analyze 868 :return: Tuple indicating validity and path made absolute 869 870 >>> validate_source_folder('/some/bogus/and/ridiculous/path') 871 ValidatedFolder(valid=False, absolute_path='/some/bogus/and/ridiculous/path') 872 >>> validate_source_folder(None) 873 ValidatedFolder(valid=False, absolute_path='') 874 >>> validate_source_folder('') 875 ValidatedFolder(valid=False, absolute_path='') 876 """ 877 878 if not path: 879 return ValidatedFolder(False, '') 880 absolute_path = os.path.abspath(path) 881 valid = os.path.isdir(path) and os.access(path, os.R_OK) 882 return ValidatedFolder(valid, absolute_path) 883 884 885def udev_attributes(devname: str) -> Optional[UdevAttr]: 886 """ 887 Query udev to see if device is an MTP device. 888 889 :param devname: udev DEVNAME e.g. '/dev/bus/usb/001/003' 890 :return True if udev property ID_MTP_DEVICE == '1', else False 891 """ 892 893 client = GUdev.Client(subsystems=['usb', 'block']) 894 enumerator = GUdev.Enumerator.new(client) 895 enumerator.add_match_property('DEVNAME', devname) 896 for device in enumerator.execute(): 897 model = device.get_property('ID_MODEL') # type: str 898 if model is not None: 899 is_mtp = device.get_property('ID_MTP_DEVICE') == '1' or \ 900 device.get_property('ID_MEDIA_PLAYER') == '1' 901 vendor = device.get_property('ID_VENDOR') # type: str 902 model = model.replace('_', ' ').strip() 903 vendor = vendor.replace('_', ' ').strip() 904 return UdevAttr(is_mtp, vendor, model) 905 return None 906 907 908def udev_is_camera(devname: str) -> bool: 909 """ 910 Query udev to see if device is a gphoto2 device (a camera or phone) 911 :param devname: udev DEVNAME e.g. '/dev/bus/usb/001/003' 912 :return: True if so, else False 913 """ 914 915 client = GUdev.Client(subsystems=['usb', 'block']) 916 enumerator = GUdev.Enumerator.new(client) 917 enumerator.add_match_property('DEVNAME', devname) 918 for device in enumerator.execute(): 919 if device.get_property('ID_GPHOTO2') == '1': 920 return True 921 return False 922 923 924def fs_device_details(path: str) -> Tuple: 925 """ 926 :return: device (volume) name, uri, root path and filesystem type 927 of the mount the path is on 928 """ 929 qsInfo = QStorageInfo(path) 930 name = qsInfo.displayName() 931 root_path = qsInfo.rootPath() 932 uri = 'file://{}'.format(pathname2url(root_path)) 933 fstype = qsInfo.fileSystemType() 934 if isinstance(fstype, bytes): 935 fstype = fstype.decode() 936 return name, uri, root_path, fstype 937 938 939class WatchDownloadDirs(QFileSystemWatcher): 940 """ 941 Create a file system watch to monitor if there are changes to the 942 download directories 943 """ 944 945 def updateWatchPathsFromPrefs(self, prefs) -> None: 946 """ 947 Update the watched directories using values from the program preferences 948 :param prefs: program preferences 949 :type prefs: raphodo.preferences.Preferences 950 """ 951 952 logging.debug("Updating watched paths") 953 954 paths = (os.path.dirname(path) for path in (prefs.photo_download_folder, 955 prefs.video_download_folder)) 956 watch = {path for path in paths if path} 957 958 existing_watches = set(self.directories()) 959 960 if watch == existing_watches: 961 return 962 963 new = watch - existing_watches 964 if new: 965 new = list(new) 966 logging.debug("Adding to watched paths: %s", ', '.join(new)) 967 failures = self.addPaths(new) 968 if failures: 969 logging.debug("Failed to add watched paths: %s", failures) 970 971 old = existing_watches - watch 972 if old: 973 old = list(old) 974 logging.debug("Removing from watched paths: %s", ', '.join(old)) 975 failures = self.removePaths(old) 976 if failures: 977 logging.debug("Failed to remove watched paths: %s", failures) 978 979 def closeWatch(self) -> None: 980 """ 981 End all watches. 982 """ 983 dirs = self.directories() 984 if dirs: 985 self.removePaths(dirs) 986 987 988class CameraHotplug(QObject): 989 cameraAdded = pyqtSignal() 990 cameraRemoved = pyqtSignal() 991 992 def __init__(self): 993 super().__init__() 994 self.cameras = {} 995 996 @pyqtSlot() 997 def startMonitor(self): 998 self.client = GUdev.Client(subsystems=['usb', 'block']) 999 self.client.connect('uevent', self.ueventCallback) 1000 logging.debug("... camera hotplug monitor started") 1001 self.enumerateCameras() 1002 if self.cameras: 1003 logging.info( 1004 "Camera Hotplug found %d camera(s): %s", len(self.cameras), ', '.join( 1005 (model for port, model in self.cameras.items()) 1006 ) 1007 ) 1008 for port, model in self.cameras.items(): 1009 logging.debug("%s is at %s", model, port) 1010 1011 def enumerateCameras(self): 1012 """ 1013 Query udev to get the list of cameras store their path and 1014 model in our internal dict, which is useful when responding to 1015 camera removal. 1016 """ 1017 enumerator = GUdev.Enumerator.new(self.client) 1018 enumerator.add_match_property('ID_GPHOTO2', '1') 1019 for device in enumerator.execute(): 1020 model = device.get_property('ID_MODEL') 1021 if model is not None: 1022 path = device.get_sysfs_path() 1023 self.cameras[path] = model 1024 1025 def ueventCallback(self, client: GUdev.Client, action: str, device: GUdev.Device) -> None: 1026 1027 # for key in device.get_property_keys(): 1028 # print(key, device.get_property(key)) 1029 if device.get_property('ID_GPHOTO2') == '1': 1030 self.camera(action, device) 1031 1032 def camera(self, action: str, device: GUdev.Device) -> None: 1033 # For some reason, the add and remove camera event is triggered twice. 1034 # The second time the device information is a variation on information 1035 # from the first time. 1036 path = device.get_sysfs_path() 1037 parent_device = device.get_parent() 1038 parent_path = parent_device.get_sysfs_path() 1039 logging.debug("Device change: %s. Path: %s Parent path: %s", action, path, parent_path) 1040 1041 # Ignore 'bind' action: seems to add nothing we need to know 1042 1043 if action == 'add': 1044 if parent_path not in self.cameras: 1045 model = device.get_property('ID_MODEL') 1046 logging.info("Hotplug: new camera: %s", model.replace('_', ' ')) 1047 self.cameras[path] = model 1048 self.cameraAdded.emit() 1049 else: 1050 logging.debug("Hotplug: already know about %s", self.cameras[parent_path]) 1051 1052 elif action == 'remove': 1053 emit_remove = False 1054 name = '' 1055 1056 # A path might look like: 1057 # /sys/devices/pci0000:00/0000:00:1c.6/0000:0e:00.0/usb3/3-2/3-2:1.0 1058 # When what we want is: 1059 # /sys/devices/pci0000:00/0000:00:1c.6/0000:0e:00.0/usb3/3-2 1060 # This unchanged path used to work, so test both the unchanged and modified 1061 # path 1062 # Note enumerateCameras() above finds only the path as in the 2nd type, without the 1063 # 3-2:1.0 1064 split_path = os.path.split(path)[0] 1065 1066 for p in (path, split_path): 1067 if p in self.cameras: 1068 name = self.cameras[p] 1069 logging.debug("Hotplug: removing %s on basis of path %s", name, p) 1070 del self.cameras[p] 1071 emit_remove = True 1072 break 1073 1074 if emit_remove: 1075 logging.info("Hotplug: %s has been removed", name) 1076 self.cameraRemoved.emit() 1077 1078 1079if have_gio: 1080 class GVolumeMonitor(QObject): 1081 r""" 1082 Monitor the mounting or unmounting of cameras or partitions 1083 using Gnome's GIO/GVFS. Unmount cameras automatically mounted 1084 by GVFS. 1085 1086 Raises a signal if a volume has been inserted, but will not be 1087 automatically mounted. This is important because this class 1088 is monitoring mounts, and if the volume is not mounted, it will 1089 go unnoticed. 1090 """ 1091 1092 cameraUnmounted = pyqtSignal(bool, str, str, bool, bool) 1093 cameraMounted = pyqtSignal() 1094 partitionMounted = pyqtSignal(str, list, bool) 1095 partitionUnmounted = pyqtSignal(str) 1096 volumeAddedNoAutomount = pyqtSignal() 1097 cameraPossiblyRemoved = pyqtSignal() 1098 cameraVolumeAdded = pyqtSignal(str) 1099 1100 def __init__(self, validMounts: ValidMounts) -> None: 1101 super().__init__() 1102 self.vm = Gio.VolumeMonitor.get() 1103 self.vm.connect('mount-added', self.mountAdded) 1104 self.vm.connect('volume-added', self.volumeAdded) 1105 self.vm.connect('mount-removed', self.mountRemoved) 1106 self.vm.connect('volume-removed', self.volumeRemoved) 1107 self.portSearch = re.compile(r'usb:([\d]+),([\d]+)') 1108 self.scsiPortSearch = re.compile(r'usbscsi:(.+)') 1109 self.possibleCamera = re.compile(r'/usb/([\d]+)/([\d]+)') 1110 self.validMounts = validMounts 1111 self.camera_volumes_added = dict() # type: Dict[str, str] 1112 self.camera_volumes_mounted = set() # type: Set[str] 1113 1114 @staticmethod 1115 def mountMightBeCamera(mount: Gio.Mount) -> bool: 1116 """ 1117 :param mount: the mount to check 1118 :return: True if the mount needs to be checked if it is a camera 1119 """ 1120 return not mount.is_shadowed() and mount.get_volume() is not None 1121 1122 def unixDevicePathIsCamera(self, devname: str) -> bool: 1123 """ 1124 Test if the device at unix device path devname is a camera 1125 :param devname: Gio.VOLUME_IDENTIFIER_KIND_UNIX_DEVICE device path 1126 e.g. '/dev/bus/usb/001/003' 1127 :return: True if camera else False 1128 """ 1129 1130 return self.possibleCamera.search(devname) is not None and udev_is_camera(devname) 1131 1132 def ptpCameraMountPoint(self, model: str, port: str) -> Optional[Gio.Mount]: 1133 """ 1134 :return: the mount point of the PTP / MTP camera, if it is mounted, 1135 else None. If camera is not mounted with PTP / MTP, None is 1136 returned. 1137 """ 1138 1139 p = self.portSearch.match(port) 1140 if p is not None: 1141 p1 = p.group(1) 1142 p2 = p.group(2) 1143 device_path = '/dev/bus/usb/{}/{}'.format(p1, p2) 1144 return self.cameraMountPointByUnixDevice(device_path=device_path) 1145 else: 1146 p = self.scsiPortSearch.match(port) 1147 if p is None: 1148 logging.error("Unknown camera mount method %s %s", model, port) 1149 return None 1150 1151 def cameraMountPointByUnixDevice(self, device_path: str) -> Optional[Gio.Mount]: 1152 """ 1153 :return: the mount point of the PTP / MTP camera, if it is mounted, 1154 else None. If camera is not mounted with PTP / MTP, None is 1155 returned. 1156 """ 1157 1158 to_unmount = None 1159 1160 for mount in self.vm.get_mounts(): 1161 if self.mountMightBeCamera(mount): 1162 identifier = mount.get_volume().get_identifier( 1163 Gio.VOLUME_IDENTIFIER_KIND_UNIX_DEVICE 1164 ) 1165 if device_path == identifier: 1166 to_unmount = mount 1167 break 1168 return to_unmount 1169 1170 @pyqtSlot(str, str, bool, bool, int) 1171 def reUnmountCamera(self, model: str, 1172 port: str, 1173 download_starting: bool, 1174 on_startup: bool, 1175 attempt_no: int) -> None: 1176 1177 logging.info( 1178 "Attempt #%s to unmount camera %s on port %s", 1179 attempt_no + 1, model, port 1180 ) 1181 self.unmountCamera( 1182 model=model, port=port, download_starting=download_starting, on_startup=on_startup, 1183 attempt_no=attempt_no 1184 ) 1185 1186 def unmountCamera(self, model: str, 1187 port: str, 1188 download_starting: bool=False, 1189 on_startup: bool=False, 1190 mount_point: Optional[Gio.Mount]=None, 1191 attempt_no: Optional[int]=0) -> bool: 1192 """ 1193 Unmount camera mounted on gvfs mount point, if it is 1194 mounted. If not mounted, ignore. 1195 :param model: model as returned by libgphoto2 1196 :param port: port as returned by libgphoto2, in format like 1197 usb:001,004 1198 :param download_starting: if True, the unmount is occurring 1199 because a download has been initiated. 1200 :param on_startup: if True, the unmount is occurring during 1201 the program's startup phase 1202 :param mount_point: if not None, try umounting from this 1203 mount point without scanning for it first 1204 :return: True if an unmount operation has been initiated, 1205 else returns False. 1206 """ 1207 1208 if mount_point is None: 1209 to_unmount = self.ptpCameraMountPoint(model, port) 1210 else: 1211 to_unmount = mount_point 1212 1213 if to_unmount is not None: 1214 logging.debug("GIO: Attempting to unmount %s...", model) 1215 to_unmount.unmount_with_operation( 1216 0, None, None, self.unmountCameraCallback, 1217 (model, port, download_starting, on_startup, attempt_no) 1218 ) 1219 return True 1220 1221 return False 1222 1223 def unmountCameraCallback(self, mount: Gio.Mount, 1224 result: Gio.AsyncResult, 1225 user_data: Tuple[str, str, bool, bool]) -> None: 1226 """ 1227 Called by the asynchronous unmount operation. 1228 When complete, emits a signal indicating operation 1229 success, and the camera model and port 1230 :param mount: camera mount 1231 :param result: result of the unmount process 1232 :param user_data: model and port of the camera being 1233 unmounted, in the format of libgphoto2 1234 """ 1235 1236 model, port, download_starting, on_startup, attempt_no = user_data 1237 try: 1238 if mount.unmount_with_operation_finish(result): 1239 logging.debug("...successfully unmounted {}".format(model)) 1240 self.cameraUnmounted.emit(True, model, port, download_starting, on_startup) 1241 else: 1242 logging.debug("...failed to unmount {}".format(model)) 1243 self.cameraUnmounted.emit(False, model, port, download_starting, on_startup) 1244 except GLib.GError as e: 1245 if e.code == 26 and attempt_no < 10: 1246 attempt_no += 1 1247 QTimer.singleShot( 1248 750, lambda : self.reUnmountCamera( 1249 model, port, download_starting, 1250 on_startup, attempt_no 1251 ) 1252 ) 1253 else: 1254 logging.error('Exception occurred unmounting {}'.format(model)) 1255 logging.exception('Traceback:') 1256 self.cameraUnmounted.emit(False, model, port, download_starting, on_startup) 1257 1258 def unmountVolume(self, path: str) -> None: 1259 """ 1260 Unmounts the volume represented by the path. If no volume is found 1261 representing that path, nothing happens. 1262 1263 :param path: path of the volume. It should not end with os.sep. 1264 """ 1265 1266 for mount in self.vm.get_mounts(): 1267 root = mount.get_root() 1268 if root is not None: 1269 mpath = root.get_path() 1270 if path == mpath: 1271 logging.info("Attempting to unmount %s...", path) 1272 mount.unmount_with_operation( 1273 0, None, None, self.unmountVolumeCallback, path 1274 ) 1275 break 1276 1277 @staticmethod 1278 def unmountVolumeCallback(mount: Gio.Mount, 1279 result: Gio.AsyncResult, 1280 user_data: str) -> None: 1281 1282 """ 1283 Called by the asynchronous unmount operation. 1284 1285 :param mount: volume mount 1286 :param result: result of the unmount process 1287 :param user_data: the path of the device unmounted 1288 """ 1289 path = user_data 1290 1291 try: 1292 if mount.unmount_with_operation_finish(result): 1293 logging.info("...successfully unmounted %s", path) 1294 else: 1295 logging.info("...failed to unmount %s", path) 1296 except GLib.GError as e: 1297 logging.error('Exception occurred unmounting %s', path) 1298 logging.exception('Traceback:') 1299 1300 def mountIsCamera(self, mount: Gio.Mount) -> bool: 1301 """ 1302 Determine if the mount refers to a camera by checking the 1303 path to see if gphoto2 or mtp is in the last folder in the 1304 root path. 1305 1306 Does not query udev, deliberately. This can be called when device 1307 is being unmounted. Unclear if the device is still on the system 1308 at this point, or how realible that is even if it is. 1309 1310 :param mount: mount to check 1311 :return: True if mount refers to a camera, else False 1312 """ 1313 1314 if self.mountMightBeCamera(mount): 1315 root = mount.get_root() 1316 if root is None: 1317 logging.warning('Unable to get mount root') 1318 else: 1319 path = root.get_path() 1320 if path: 1321 logging.debug("GIO: Looking for camera at mount {}".format(path)) 1322 # check last two levels of the path name, as it might be in a format like 1323 # /run/..../gvfs/gphoto2:host=Canon_Inc._Canon_Digital_Camera/store_00010001 1324 for i in (1, 2): 1325 path, folder_name = os.path.split(path) 1326 if folder_name: 1327 for s in ('gphoto2:host=', 'mtp:host='): 1328 if folder_name.startswith(s): 1329 return True 1330 return False 1331 1332 def mountIsPartition(self, mount: Gio.Mount) -> bool: 1333 """ 1334 Determine if the mount point is that of a valid partition, 1335 i.e. is mounted in a valid location, which is under one of 1336 self.validMountDirs 1337 :param mount: the mount to examine 1338 :return: True if the mount is a valid partiion 1339 """ 1340 root = mount.get_root() 1341 if root is None: 1342 logging.warning('Unable to get mount root') 1343 else: 1344 path = root.get_path() 1345 if path: 1346 logging.debug("GIO: Looking for valid partition at mount {}".format(path)) 1347 if self.validMounts.pathIsValidMountPoint(path): 1348 logging.debug("GIO: partition found at {}".format(path)) 1349 return True 1350 if path is not None: 1351 logging.debug("GIO: partition is not valid mount: {}".format(path)) 1352 return False 1353 1354 def mountAdded(self, volumeMonitor, mount: Gio.Mount) -> None: 1355 """ 1356 Determine if mount is valid partition or is a camera, or something 1357 else. 1358 1359 :param volumeMonitor: not used 1360 :param mount: the mount to examine 1361 """ 1362 1363 logging.debug("Examining mount %s", mount.get_name()) 1364 try: 1365 identifier = mount.get_volume().get_identifier( 1366 Gio.VOLUME_IDENTIFIER_KIND_UNIX_DEVICE 1367 ) 1368 if identifier in self.camera_volumes_added: 1369 logging.debug("%s is now mounted", self.camera_volumes_added[identifier]) 1370 self.camera_volumes_mounted.add(identifier) 1371 self.cameraMounted.emit() 1372 return 1373 except Exception: 1374 pass 1375 1376 if self.mountIsCamera(mount): 1377 # Can be called on startup if camera was already mounted in GIO before the program 1378 # started. In that case, previous check would not have detected the camera. 1379 self.cameraMounted.emit() 1380 elif self.mountIsPartition(mount): 1381 icon_names = self.getIconNames(mount) 1382 self.partitionMounted.emit( 1383 mount.get_root().get_path(), icon_names, mount.can_eject() 1384 ) 1385 1386 def mountRemoved(self, volumeMonitor, mount: Gio.Mount) -> None: 1387 if not self.mountIsCamera(mount): 1388 if self.mountIsPartition(mount): 1389 logging.debug("GIO: %s has been unmounted", mount.get_name()) 1390 self.partitionUnmounted.emit(mount.get_root().get_path()) 1391 1392 def volumeAdded(self, volumeMonitor, volume: Gio.Volume) -> None: 1393 volume_name = volume.get_name() 1394 logging.debug( 1395 "GIO: Volume added %s. Automount: %s (might be incorrect)", 1396 volume_name, volume.should_automount() 1397 ) 1398 1399 # Even if volume.should_automount(), the volume in fact may not be mounted 1400 # automatically. Unbelievable. 1401 1402 device_path = volume.get_identifier( 1403 Gio.VOLUME_IDENTIFIER_KIND_UNIX_DEVICE 1404 ) 1405 if device_path is None: 1406 logging.error("Unable to determine device path of %s", volume_name) 1407 elif self.unixDevicePathIsCamera(device_path): 1408 self.camera_volumes_added[device_path] = volume_name 1409 logging.debug( 1410 "%s is a camera at %s", volume_name, device_path 1411 ) 1412 # Time is in milliseconds; 3000 is 3 seconds. 1413 QTimer.singleShot(3000, lambda: self.cameraVolumeAddedCheckMount(device_path)) 1414 1415 def cameraVolumeAddedCheckMount(self, device_path) -> None: 1416 if device_path not in self.camera_volumes_mounted: 1417 logging.debug( 1418 "%s had not been automatically mounted. Will initiate camera scan.", 1419 self.camera_volumes_added[device_path] 1420 ) 1421 self.cameraVolumeAdded.emit(device_path) 1422 else: 1423 logging.debug( 1424 "%s had been automatically mounted", self.camera_volumes_added[device_path] 1425 ) 1426 1427 def volumeRemoved(self, volumeMonitor, volume: Gio.Volume) -> None: 1428 logging.debug("GIO: %s volume removed", volume.get_name()) 1429 if volume.get_activation_root() is not None: 1430 logging.debug("GIO: %s might be a camera", volume.get_name()) 1431 self.cameraPossiblyRemoved.emit() 1432 1433 @staticmethod 1434 def getIconNames(mount: Gio.Mount) -> List[str]: 1435 """ 1436 Get icons for the mount from theme 1437 1438 :param mount: 1439 :return: 1440 """ 1441 icon_names = [] 1442 icon = mount.get_icon() 1443 if isinstance(icon, Gio.ThemedIcon): 1444 icon_names = icon.get_names() 1445 1446 return icon_names 1447 1448 def getProps(self, path: str) -> Tuple[Optional[List[str]], Optional[bool]]: 1449 """ 1450 Given a mount's path, get the icon names suggested by the 1451 volume monitor, and determine whether the mount is 1452 ejectable or not. 1453 :param path: the path of mount to check 1454 :return: icon names and eject boolean 1455 """ 1456 1457 for mount in self.vm.get_mounts(): 1458 root = mount.get_root() 1459 if root is not None: 1460 p = root.get_path() 1461 if path == p: 1462 icon_names = self.getIconNames(mount) 1463 return (icon_names, mount.can_eject()) 1464 return (None, None) 1465 1466 1467def _get_info_size_value(info: Gio.FileInfo, attr: str) -> int: 1468 if info.get_attribute_data(attr).type == Gio.FileAttributeType.UINT64: 1469 return info.get_attribute_uint64(attr) 1470 else: 1471 return info.get_attribute_uint32(attr) 1472 1473 1474def get_mount_size(mount: QStorageInfo) -> Tuple[int, int]: 1475 """ 1476 Uses GIO to get bytes total and bytes free (available) for the mount that a 1477 path is in. 1478 1479 :param path: path located anywhere in the mount 1480 :return: bytes_total, bytes_free 1481 """ 1482 1483 bytes_free = mount.bytesAvailable() 1484 bytes_total = mount.bytesTotal() 1485 1486 if bytes_total or not have_gio: 1487 return bytes_total, bytes_free 1488 1489 path = mount.rootPath() 1490 1491 logging.debug("Using GIO to query file system attributes for %s...", path) 1492 p = Gio.File.new_for_path(os.path.abspath(path)) 1493 info = p.query_filesystem_info( 1494 ','.join( 1495 (Gio.FILE_ATTRIBUTE_FILESYSTEM_SIZE, Gio.FILE_ATTRIBUTE_FILESYSTEM_FREE) 1496 ) 1497 ) 1498 logging.debug("...query of file system attributes for %s completed", path) 1499 bytes_total = _get_info_size_value(info, Gio.FILE_ATTRIBUTE_FILESYSTEM_SIZE) 1500 bytes_free = _get_info_size_value(info, Gio.FILE_ATTRIBUTE_FILESYSTEM_FREE) 1501 return bytes_total, bytes_free 1502