1# Copyright (C) 2015-2020 Damon Lynch <damonlynch@gmail.com>
2# Copyright (C) 2008-2015 Canonical Ltd.
3# Copyright (C) 2013 Bernard Baeyens
4
5# This file is part of Rapid Photo Downloader.
6#
7# Rapid Photo Downloader is free software: you can redistribute it and/or
8# modify it under the terms of the GNU General Public License as published by
9# the Free Software Foundation, either version 3 of the License, or
10# (at your option) any later version.
11#
12# Rapid Photo Downloader is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with Rapid Photo Downloader.  If not,
19# see <http://www.gnu.org/licenses/>.
20
21"""
22The primary task of this module is to handle addition and removal of
23(1) cameras and (2) devices with file systems.
24
25There are two scenarios:
26
271) User is running under a Gnome-like environment in which GVFS will
28automatically mount cameras and devices. We can monitor mounts and
29send a signal when something is mounted. The camera must be
30unmounted before libgphoto2 can access it, so we must handle that too.
31
322) User is running under a non Gnome-like environment (e.g. KDE) in
33which GVFS may or may not be running. However we can assume GVFS will
34not automatically mount cameras and devices. In this case, using GIO
35to monitor mounts is useless, as the mounts may not occur. So we must
36monitor when cameras and other devices are added or removed ourselves.
37To do this, use udev for cameras, and udisks2 for devices with file
38systems. When a device with a file system is inserted, if it is not
39already mounted, attempt to mount it.
40
41The secondary task of this module is to provide miscellaneous services
42regarding mount points and XDG related functionality.
43"""
44
45__author__ = 'Damon Lynch'
46__copyright__ = "Copyright 2011-2020, Damon Lynch. Copyright 2008-2015 Canonical Ltd. Copyright" \
47                " 2013 Bernard Baeyens."
48
49import logging
50import os
51import re
52import sys
53import time
54import subprocess
55import shlex
56import pwd
57import shutil
58from collections import namedtuple
59from typing import Optional, Tuple, List, Dict, Set
60from urllib.request import pathname2url
61from urllib.parse import unquote_plus, quote, urlparse
62from tempfile import NamedTemporaryFile
63
64from PyQt5.QtCore import (QStorageInfo, QObject, pyqtSignal, QFileSystemWatcher, pyqtSlot, QTimer)
65from xdg.DesktopEntry import DesktopEntry
66from xdg import BaseDirectory
67import xdg
68
69import gi
70
71gi.require_version('GUdev', '1.0')
72gi.require_version('GExiv2', '0.10')
73gi.require_version('GLib', '2.0')
74from gi.repository import GUdev, GLib
75
76
77
78from raphodo.constants import (
79    Desktop, Distro, FileManagerType, DefaultFileBrowserFallback, FileManagerBehavior
80)
81from raphodo.utilities import (
82    process_running, log_os_release, remove_topmost_directory_from_path, find_mount_point
83)
84
85logging_level = logging.DEBUG
86
87try:
88    from gi.repository import Gio
89
90    have_gio = True
91except ImportError:
92    have_gio = False
93
94StorageSpace = namedtuple('StorageSpace', 'bytes_free, bytes_total, path')
95CameraDetails = namedtuple('CameraDetails', 'model, port, display_name, is_mtp, storage_desc')
96UdevAttr = namedtuple('UdevAttr', 'is_mtp_device, vendor, model')
97
98PROGRAM_DIRECTORY = 'rapid-photo-downloader'
99
100
101def get_distro_id(id_or_id_like: str) -> Distro:
102    if id_or_id_like[0] in ('"', "'"):
103        id_or_id_like = id_or_id_like[1:-1]
104    try:
105        return Distro[id_or_id_like.strip()]
106    except KeyError:
107        return Distro.unknown
108
109
110os_release = '/etc/os-release'
111
112
113# Sync get_distro() with code in install.py
114
115def get_distro() -> Distro:
116    """
117    Determine the Linux distribution using /etc/os-release
118    """
119
120    if os.path.isfile(os_release):
121        with open(os_release, 'r') as f:
122            for line in f:
123                if line.startswith('NAME='):
124                    if line.find('elementary') > 0:
125                        return Distro.elementary
126                    if line.find('CentOS Linux') > 0:
127                        return Distro.centos
128                    if line.find('openSUSE') > 0:
129                        return Distro.opensuse
130                    if line.find('Deepin') > 0:
131                        return Distro.deepin
132                    if line.find('KDE neon') > 0:
133                        return Distro.neon
134                    if line.find('Zorin') > 0:
135                        return Distro.zorin
136                    if line.find('Kylin') > 0:
137                        return Distro.kylin
138                    if line.find('Pop!_OS') > 0:
139                        return Distro.popos
140                if line.startswith('ID='):
141                    return get_distro_id(line[3:])
142                if line.startswith('ID_LIKE='):
143                    return get_distro_id(line[8:])
144    return Distro.unknown
145
146
147def get_user_name() -> str:
148    """
149    Gets the user name of the process owner, with no exception checking
150    :return: user name of the process owner
151    """
152
153    return pwd.getpwuid(os.getuid())[0]
154
155
156def get_path_display_name(path: str) -> Tuple[str, str]:
157    """
158    Return a name for the path (path basename),
159    removing a final '/' when it's not the root of the
160    file system.
161
162    :param path: path to generate the display name for
163    :return: display name and sanitized path
164    """
165    if path.endswith(os.sep) and path != os.sep:
166        path = path[:-1]
167
168    if path == os.sep:
169        display_name = _('File system root')
170    else:
171        display_name = os.path.basename(path)
172    return display_name, path
173
174
175def get_media_dir() -> str:
176    """
177    Returns the media directory, i.e. where external mounts are mounted.
178
179    Assumes mount point of /media/<USER>.
180
181    """
182
183    if sys.platform.startswith('linux') or sys.platform.startswith('freebsd'):
184        media_dir = '/media/{}'.format(get_user_name())
185        run_media_dir = '/run/media'
186        distro = get_distro()
187        if os.path.isdir(run_media_dir) and distro not in (
188                Distro.ubuntu, Distro.debian, Distro.neon, Distro.galliumos, Distro.peppermint,
189                Distro.elementary, Distro.zorin, Distro.popos):
190            if distro not in (Distro.fedora, Distro.manjaro, Distro.arch, Distro.opensuse,
191                              Distro.gentoo, Distro.centos, Distro.centos7):
192                logging.debug(
193                    "Detected /run/media directory, but distro does not appear to be CentOS, "
194                    "Fedora, Arch, openSUSE, Gentoo, or Manjaro"
195                )
196                log_os_release()
197            return run_media_dir
198        return media_dir
199    else:
200        raise ("Mounts.setValidMountPoints() not implemented on %s", sys.platform)
201
202
203_gvfs_gphoto2 = re.compile('gvfs.*gphoto2.*host')
204
205
206def gvfs_gphoto2_path(path: str) -> bool:
207    """
208    :return: True if the path appears to be a GVFS gphoto2 path
209
210    >>> p = "/run/user/1000/gvfs/gphoto2:host=%5Busb%3A002%2C013%5D"
211    >>> gvfs_gphoto2_path(p)
212    True
213    >>> p = '/home/damon'
214    >>> gvfs_gphoto2_path(p)
215    False
216    """
217
218    return _gvfs_gphoto2.search(path) is not None
219
220
221class ValidMounts():
222    r"""
223    Operations to find 'valid' mount points, i.e. the places in which
224    it's sensible for a user to mount a partition. Valid mount points:
225    include /home/<USER> , /media/<USER>, and /run/media/<USER>
226    include directories in /etc/fstab, except /, /home, and swap
227    However if only considering external mounts, the the mount must be
228    under /media/<USER> or /run/media/<user>
229    """
230
231    def __init__(self, onlyExternalMounts: bool):
232        """
233        :param onlyExternalMounts: if True, valid mounts must be under
234        /media/<USER> or /run/media/<user>
235        """
236        self.validMountFolders = None  # type: Tuple[str]
237        self.onlyExternalMounts = onlyExternalMounts
238        self._setValidMountFolders()
239        assert '/' not in self.validMountFolders
240        if logging_level == logging.DEBUG:
241            self.logValidMountFolders()
242
243    def isValidMountPoint(self, mount: QStorageInfo) -> bool:
244        """
245        Determine if the path of the mount point starts with a valid
246        path
247        :param mount: QStorageInfo to be tested
248        :return:True if mount is a mount under a valid mount, else False
249        """
250        for m in self.validMountFolders:
251            if mount.rootPath().startswith(m):
252                return True
253        return False
254
255    def pathIsValidMountPoint(self, path: str) -> bool:
256        """
257        Determine if path indicates a mount point under a valid mount
258        point
259        :param path: path to be tested
260        :return:True if path is a mount under a valid mount, else False
261        """
262        for m in self.validMountFolders:
263            if path.startswith(m):
264                return True
265        return False
266
267    def mountedValidMountPointPaths(self) -> Tuple[str]:
268        """
269        Return paths of all the currently mounted partitions that are
270        valid
271        :return: tuple of currently mounted valid partition paths
272        """
273
274        return tuple(filter(self.pathIsValidMountPoint, mountPaths()))
275
276    def mountedValidMountPoints(self) -> Tuple[QStorageInfo]:
277        """
278        Return mount points of all the currently mounted partitions
279        that are valid
280        :return: tuple of currently mounted valid partition
281        """
282
283        return tuple(filter(self.isValidMountPoint, QStorageInfo.mountedVolumes()))
284
285    def _setValidMountFolders(self) -> None:
286        """
287        Determine the valid mount point folders and set them in
288        self.validMountFolders, e.g. /media/<USER>, etc.
289        """
290
291        if not sys.platform.startswith('linux') and not sys.platform.startswith('freebsd'):
292            raise ("Mounts.setValidMountPoints() not implemented on %s", sys.platform())
293        else:
294            try:
295                media_dir = get_media_dir()
296            except:
297                logging.critical("Unable to determine username of this process")
298                media_dir = ''
299            logging.debug("Media dir is %s", media_dir)
300            if self.onlyExternalMounts:
301                self.validMountFolders = (media_dir, )
302            else:
303                home_dir = os.path.expanduser('~')
304                validPoints = [home_dir, media_dir]
305                for point in self.mountPointInFstab():
306                    validPoints.append(point)
307                self.validMountFolders = tuple(validPoints)
308
309    def mountPointInFstab(self):
310        """
311        Yields a list of mount points in /etc/fstab
312        The mount points will exclude /, /home, and swap
313        """
314
315        with open('/etc/fstab') as f:
316            l = []
317            for line in f:
318                # As per fstab specs: white space is either Tab or space
319                # Ignore comments, blank lines
320                # Also ignore swap file (mount point none), root, and /home
321                m = re.match(r'^(?![\t ]*#)\S+\s+(?!(none|/[\t ]|/home))('
322                             r'?P<point>\S+)',
323                             line)
324                if m is not None:
325                    yield (m.group('point'))
326
327    def logValidMountFolders(self):
328        """
329        Output nicely formatted debug logging message
330        """
331
332        assert len(self.validMountFolders) > 0
333        if logging_level == logging.DEBUG:
334            msg = "To be recognized, partitions must be mounted under "
335            if len(self.validMountFolders) > 2:
336                msg += "one of "
337                for p in self.validMountFolders[:-2]:
338                    msg += "{}, ".format(p)
339                msg += "{} or {}".format(self.validMountFolders[-2],
340                                         self.validMountFolders[-1])
341            elif len(self.validMountFolders) == 2:
342                msg += "{} or {}".format(self.validMountFolders[0],
343                                         self.validMountFolders[1])
344            else:
345                msg += self.validMountFolders[0]
346            logging.debug(msg)
347
348
349def mountPaths():
350    """
351    Yield all the mount paths returned by QStorageInfo
352    """
353
354    for m in QStorageInfo.mountedVolumes():
355        yield m.rootPath()
356
357
358def has_one_or_more_folders(path: str, folders: List[str]) -> bool:
359    """
360    Checks to see if directly below the path there is a folder
361    from the list of specified folders, and if the folder is readable.
362    :param path: path to check
363    :return: True if has one or more valid folders, False otherwise
364    """
365
366    try:
367        contents = os.listdir(path)
368        for folder in folders:
369            if folder in contents:
370                full_path = os.path.join(path, folder)
371                if os.path.isdir(full_path) and os.access(full_path, os.R_OK):
372                    return True
373    except (PermissionError, FileNotFoundError, OSError):
374        return False
375    except:
376        logging.error("Unknown error occurred while probing potential source folder %s", path)
377        return False
378    return False
379
380
381def get_desktop_environment() -> Optional[str]:
382    """
383    Determine desktop environment using environment variable XDG_CURRENT_DESKTOP
384
385    :return: str with XDG_CURRENT_DESKTOP value
386    """
387
388    return os.getenv('XDG_CURRENT_DESKTOP')
389
390
391def get_desktop() -> Desktop:
392    """
393    Determine desktop environment
394    :return: enum representing desktop environment,
395    Desktop.unknown if unknown.
396    """
397
398    try:
399        env = get_desktop_environment().lower()
400    except AttributeError:
401        # Occurs when there is no value set
402        return Desktop.unknown
403
404    if env == 'unity:unity7':
405        env = 'unity'
406    elif env == 'x-cinnamon':
407        env = 'cinnamon'
408    elif env == 'ubuntu:gnome':
409        env = 'ubuntugnome'
410    elif env == 'pop:gnome':
411        env = 'popgnome'
412    elif env == 'gnome-classic:gnome':
413        env = 'gnome'
414    elif env == 'budgie:gnome':
415        env = 'gnome'
416    elif env == 'zorin:gnome':
417        env = 'zorin'
418
419    try:
420        return Desktop[env]
421    except KeyError:
422        return Desktop.unknown
423
424
425def gvfs_controls_mounts() -> bool:
426    """
427    Determine if GVFS controls mounts on this system.
428
429    By default, common desktop environments known to use it are assumed
430    to be using it or not. If not found in this list, then the list of
431    running processes is searched, looking for a match against 'gvfs-gphoto2',
432    which will match what is at the time of this code being developed called
433    'gvfs-gphoto2-volume-monitor', which is what we're most interested in.
434
435    :return: True if so, False otherwise
436    """
437
438    desktop = get_desktop()
439    if desktop in (Desktop.gnome, Desktop.unity, Desktop.cinnamon, Desktop.xfce,
440                   Desktop.mate, Desktop.lxde, Desktop.ubuntugnome,
441                   Desktop.popgnome, Desktop.gnome, Desktop.lxqt, Desktop.pantheon):
442        return True
443    elif desktop == Desktop.kde:
444        return False
445    return process_running('gvfs-gphoto2')
446
447
448def _get_xdg_special_dir(dir_type: gi.repository.GLib.UserDirectory,
449                         home_on_failure: bool=True) -> Optional[str]:
450    path = GLib.get_user_special_dir(dir_type)
451    if path is None and home_on_failure:
452        return os.path.expanduser('~')
453    return path
454
455def xdg_photos_directory(home_on_failure: bool=True) -> Optional[str]:
456    """
457    Get localized version of /home/<USER>/Pictures
458
459    :param home_on_failure: if the directory does not exist, return
460     the home directory instead
461    :return: the directory if it is specified, else the user's
462    home directory or None
463    """
464    return _get_xdg_special_dir(GLib.UserDirectory.DIRECTORY_PICTURES, home_on_failure)
465
466
467def xdg_videos_directory(home_on_failure: bool=True) -> str:
468    """
469    Get localized version of /home/<USER>/Videos
470
471    :param home_on_failure: if the directory does not exist, return
472     the home directory instead
473    :return: the directory if it is specified, else the user's
474    home directory or None
475    """
476    return _get_xdg_special_dir(GLib.UserDirectory.DIRECTORY_VIDEOS, home_on_failure)
477
478def xdg_desktop_directory(home_on_failure: bool=True) -> str:
479    """
480    Get localized version of /home/<USER>/Desktop
481
482    :param home_on_failure: if the directory does not exist, return
483     the home directory instead
484    :return: the directory if it is specified, else the user's
485    home directory or None
486    """
487    return _get_xdg_special_dir(GLib.UserDirectory.DIRECTORY_DESKTOP, home_on_failure)
488
489def xdg_photos_identifier() -> str:
490    """
491    Get special subfoler indicated by the localized version of /home/<USER>/Pictures
492    :return: the subfolder name if it is specified, else the localized version of 'Pictures'
493    """
494
495    path = _get_xdg_special_dir(GLib.UserDirectory.DIRECTORY_PICTURES, home_on_failure=False)
496    if path is None:
497        # translators: the name of the Pictures folder
498        return _('Pictures')
499    return os.path.basename(path)
500
501def xdg_videos_identifier() -> str:
502    """
503    Get special subfoler indicated by the localized version of /home/<USER>/Pictures
504    :return: the subfolder name if it is specified, else the localized version of 'Pictures'
505    """
506
507    path = _get_xdg_special_dir(GLib.UserDirectory.DIRECTORY_VIDEOS, home_on_failure=False)
508    if path is None:
509        # translators: the name of the Videos folder
510        return _('Videos')
511    return os.path.basename(path)
512
513
514def make_program_directory(path: str) -> str:
515    """
516    Creates a subfolder used by Rapid Photo Downloader.
517
518    Does not catch errors.
519
520    :param path: location where the subfolder should be
521    :return: the full path of the new directory
522    """
523    program_dir = os.path.join(path, 'rapid-photo-downloader')
524    if not os.path.exists(program_dir):
525        os.mkdir(program_dir)
526    elif not os.path.isdir(program_dir):
527        os.remove(program_dir)
528        os.mkdir(program_dir)
529    return program_dir
530
531
532def get_program_cache_directory(create_if_not_exist: bool = False) -> Optional[str]:
533    """
534    Get Rapid Photo Downloader cache directory.
535
536    Is assumed to be under $XDG_CACHE_HOME or if that doesn't exist,
537     ~/.cache.
538    :param create_if_not_exist: creates directory if it does not exist.
539    :return: the full path of the cache directory, or None on error
540    """
541    try:
542        cache_directory = BaseDirectory.xdg_cache_home
543        if not create_if_not_exist:
544            return os.path.join(cache_directory, PROGRAM_DIRECTORY)
545        else:
546            return make_program_directory(cache_directory)
547    except OSError:
548        logging.error("An error occurred while creating the cache directory")
549        return None
550
551
552def get_program_logging_directory(create_if_not_exist: bool = False) -> Optional[str]:
553    """
554    Get directory in which to store program log files.
555
556    Log files are kept in the cache dirctory.
557
558    :param create_if_not_exist:
559    :return: the full path of the logging directory, or None on error
560    """
561    cache_directory = get_program_cache_directory(create_if_not_exist=create_if_not_exist)
562    log_dir = os.path.join(cache_directory, 'log')
563    if os.path.isdir(log_dir):
564        return log_dir
565    if create_if_not_exist:
566        try:
567            if os.path.isfile(log_dir):
568                os.remove(log_dir)
569            os.mkdir(log_dir, 0o700)
570            return log_dir
571        except OSError:
572            logging.error("An error occurred while creating the log directory")
573    return None
574
575
576def get_program_data_directory(create_if_not_exist=False) -> Optional[str]:
577    """
578    Get Rapid Photo Downloader data directory, which is assumed to be
579    under $XDG_DATA_HOME or if that doesn't exist,  ~/.local/share
580    :param create_if_not_exist: creates directory if it does not exist.
581    :return: the full path of the data directory, or None on error
582    """
583    try:
584        data_directory = BaseDirectory.xdg_data_dirs[0]
585        if not create_if_not_exist:
586            return os.path.join(data_directory, PROGRAM_DIRECTORY)
587        else:
588            return make_program_directory(data_directory)
589    except OSError:
590        logging.error("An error occurred while creating the data directory")
591        return None
592
593
594def get_fdo_cache_thumb_base_directory() -> str:
595    """
596    Get the Freedesktop.org thumbnail directory location
597    :return: location
598    """
599
600    # LXDE is a special case: handle it
601    if get_desktop() == Desktop.lxde:
602        return os.path.join(os.path.expanduser('~'), '.thumbnails')
603
604    return os.path.join(BaseDirectory.xdg_cache_home, 'thumbnails')
605
606
607# Module level variables important for determining among other things the generation of URIs
608# Pretty ugly, but the alternative is passing values around between several processes
609_desktop = get_desktop()
610_quoted_comma = quote(',')
611_default_file_manager_probed = False
612_default_file_manager = None
613_default_file_manager_type = None
614
615
616def _default_file_manager_for_desktop() -> Tuple[Optional[str], Optional[FileManagerType]]:
617    """
618    If default file manager cannot be determined using system tools, guess
619    based on desktop environment.
620
621    Sets module level globals if found.
622
623    :return: file manager command (without path), and type; if not detected, (None, None)
624    """
625
626    global _default_file_manager
627    global _default_file_manager_type
628
629    try:
630        fm = ''
631        fm = DefaultFileBrowserFallback[_desktop.name]
632        assert shutil.which(fm)
633        t = FileManagerBehavior[fm]
634        _default_file_manager = fm
635        _default_file_manager_type = t
636        return fm, t
637    except KeyError:
638        logging.debug("Error determining default file manager")
639        return None, None
640    except AssertionError:
641        logging.debug("Default file manager %s cannot be found", fm)
642        return None, None
643
644
645def get_default_file_manager() -> Tuple[Optional[str], Optional[FileManagerType]]:
646    """
647    Attempt to determine the default file manager for the system
648    :param remove_args: if True, remove any arguments such as %U from
649     the returned command
650    :return: file manager command (without path), and type; if not detected, (None, None)
651    """
652
653    global _default_file_manager_probed
654    global _default_file_manager
655    global _default_file_manager_type
656
657    if _default_file_manager_probed:
658        return _default_file_manager, _default_file_manager_type
659
660    _default_file_manager_probed = True
661
662    assert sys.platform.startswith('linux') or sys.platform.startswith('freebsd')
663    cmd = shlex.split('xdg-mime query default inode/directory')
664    try:
665        desktop_file = subprocess.check_output(cmd, universal_newlines=True)  # type: str
666    except:
667        return _default_file_manager_for_desktop()
668
669    # Remove new line character from output
670    desktop_file = desktop_file[:-1]
671    if desktop_file.endswith(';'):
672        desktop_file = desktop_file[:-1]
673
674    for desktop_path in (os.path.join(d, 'applications') for d in BaseDirectory.xdg_data_dirs):
675        path = os.path.join(desktop_path, desktop_file)
676        if os.path.exists(path):
677            try:
678                desktop_entry = DesktopEntry(path)
679            except xdg.Exceptions.ParsingError:
680                return _default_file_manager_for_desktop()
681            try:
682                desktop_entry.parse(path)
683            except:
684                return _default_file_manager_for_desktop()
685
686            fm = desktop_entry.getExec()
687
688            # Strip away any extraneous arguments
689            fm_cmd = fm.split()[0]
690            # Strip away any path information
691            fm_cmd = os.path.split(fm_cmd)[1]
692            # Strip away any quotes
693            fm_cmd = fm_cmd.replace('"', '')
694            fm_cmd = fm_cmd.replace("'", '')
695
696            # Unhelpful results
697            invalid_file_managers = ('baobab', 'exo-open', 'RawTherapee', 'ART')
698            for invalid_file_manger in invalid_file_managers:
699                if fm_cmd.startswith(invalid_file_manger):
700                    logging.warning('%s is an invalid file manager: will substitute', fm)
701                    return _default_file_manager_for_desktop()
702
703            # Nonexistent file managers
704            if shutil.which(fm_cmd) is None:
705                logging.warning('Default file manager %s does not exist: will substitute', fm)
706                return _default_file_manager_for_desktop()
707
708            try:
709                file_manager_type = FileManagerBehavior[fm_cmd]
710            except KeyError:
711                file_manager_type = FileManagerType.regular
712
713            _default_file_manager = fm_cmd
714            _default_file_manager_type = file_manager_type
715            return _default_file_manager, file_manager_type
716
717    # Special case: no base dirs set, e.g. LXQt
718    return _default_file_manager_for_desktop()
719
720
721def open_in_file_manager(file_manager: str,
722                         file_manager_type: FileManagerType,
723                         uri: str) -> None:
724    """
725    Open a directory or file in the file manager.
726
727    If the item is a file, then try to select it in the file manager,
728    rather than opening it directly.
729
730    :param file_manager: the file manager to use
731    :param file_manager_type: file manager behavior
732    :param uri: the URI (path) to open. Assumes file:// or gphoto2:// schema
733    """
734
735    arg = ''
736    path = unquote_plus(urlparse(uri).path)
737    if not os.path.isdir(path):
738        if file_manager_type == FileManagerType.select:
739            arg = '--select '
740        elif file_manager_type == FileManagerType.show_item:
741            arg = '--show-item '
742        elif file_manager_type == FileManagerType.show_items:
743            arg = '--show-items '
744
745    cmd = '{} {}{}'.format(file_manager, arg, uri)
746    logging.debug("Launching: %s", cmd)
747    args = shlex.split(cmd)
748    subprocess.Popen(args)
749
750
751def get_uri(full_file_name: Optional[str]=None,
752            path: Optional[str]=None,
753            camera_details: Optional[CameraDetails]=None,
754            desktop_environment: Optional[bool]=True) -> str:
755    """
756    Generate and return the URI for the file, which varies depending on
757    which device it is
758
759    :param full_file_name: full filename and path
760    :param path: straight path when not passing a full_file_name
761    :param camera_details: see named tuple CameraDetails for parameters
762    :param desktop_environment: if True, will to generate a URI accepted
763     by Gnome, KDE and other desktops, which means adjusting the URI if it appears to be an
764     MTP mount. Includes the port too, for cameras. Takes into account
765     file manager characteristics.
766    :return: the URI
767    """
768
769    if not _default_file_manager_probed:
770        get_default_file_manager()
771
772    if camera_details is None:
773        prefix = 'file://'
774        if desktop_environment:
775            if full_file_name and _default_file_manager_type == FileManagerType.dir_only_uri:
776                full_file_name = os.path.dirname(full_file_name)
777    else:
778        if not desktop_environment:
779            if full_file_name or path:
780                prefix = 'gphoto2://'
781            else:
782                prefix = 'gphoto2://' + pathname2url('[{}]'.format(camera_details.port))
783        else:
784            prefix = ''
785            # Attempt to generate a URI accepted by desktop environments
786            if camera_details.is_mtp:
787                if full_file_name:
788                    full_file_name = remove_topmost_directory_from_path(full_file_name)
789                elif path:
790                    path = remove_topmost_directory_from_path(path)
791
792                if gvfs_controls_mounts() or _desktop == Desktop.lxqt:
793                    prefix = 'mtp://' + pathname2url(
794                        '[{}]/{}'.format(camera_details.port, camera_details.storage_desc)
795                    )
796                elif _desktop == Desktop.kde:
797                    prefix = 'mtp:/' + pathname2url(
798                        '{}/{}'.format(camera_details.display_name, camera_details.storage_desc)
799                    )
800                else:
801                    logging.error("Don't know how to generate MTP prefix for %s", _desktop.name)
802            else:
803                prefix = 'gphoto2://' + pathname2url('[{}]'.format(camera_details.port))
804
805            if _default_file_manager == 'pcmanfm-qt':
806                # pcmanfm-qt does not like the quoted form of the comma
807                prefix = prefix.replace(_quoted_comma, ',')
808                if full_file_name:
809                    # pcmanfm-qt does not like the the filename as part of the path
810                    full_file_name = os.path.dirname(full_file_name)
811
812
813    if full_file_name or path:
814        uri = '{}{}'.format(prefix, pathname2url(full_file_name or path))
815    else:
816        uri = prefix
817    return uri
818
819
820ValidatedFolder = namedtuple('ValidatedFolder', 'valid, absolute_path')
821
822
823def validate_download_folder(path: Optional[str],
824                             write_on_waccesss_failure: bool=False) -> ValidatedFolder:
825    r"""
826    Check if folder exists and is writeable.
827
828    Accepts None as a folder, which will always be invalid.
829
830    :param path: path to analyze
831    :param write_on_waccesss_failure: if os.access reports path is not writable, test
832     nonetheless to see if it's writable by writing and deleting a test file
833    :return: Tuple indicating validity and path made absolute
834
835    >>> validate_download_folder('/some/bogus/and/ridiculous/path')
836    ValidatedFolder(valid=False, absolute_path='/some/bogus/and/ridiculous/path')
837    >>> validate_download_folder(None)
838    ValidatedFolder(valid=False, absolute_path='')
839    >>> validate_download_folder('')
840    ValidatedFolder(valid=False, absolute_path='')
841    """
842
843    if not path:
844        return ValidatedFolder(False, '')
845    absolute_path = os.path.abspath(path)
846    valid = os.path.isdir(path) and os.access(path, os.W_OK)
847    if not valid and write_on_waccesss_failure and os.path.isdir(path):
848        try:
849            with NamedTemporaryFile(dir=path):
850                # the path is in fact writeable -- can happen with NFS
851                valid = True
852        except Exception:
853            logging.warning(
854                'While validating download / backup folder, failed to write a temporary file to '
855                '%s', path
856            )
857
858    return ValidatedFolder(valid, absolute_path)
859
860
861def validate_source_folder(path: Optional[str]) -> ValidatedFolder:
862    r"""
863    Check if folder exists and is readable.
864
865    Accepts None as a folder, which will always be invalid.
866
867    :param path: path to analyze
868    :return: Tuple indicating validity and path made absolute
869
870    >>> validate_source_folder('/some/bogus/and/ridiculous/path')
871    ValidatedFolder(valid=False, absolute_path='/some/bogus/and/ridiculous/path')
872    >>> validate_source_folder(None)
873    ValidatedFolder(valid=False, absolute_path='')
874    >>> validate_source_folder('')
875    ValidatedFolder(valid=False, absolute_path='')
876    """
877
878    if not path:
879        return ValidatedFolder(False, '')
880    absolute_path = os.path.abspath(path)
881    valid = os.path.isdir(path) and os.access(path, os.R_OK)
882    return ValidatedFolder(valid, absolute_path)
883
884
885def udev_attributes(devname: str) -> Optional[UdevAttr]:
886    """
887    Query udev to see if device is an MTP device.
888
889    :param devname: udev DEVNAME e.g. '/dev/bus/usb/001/003'
890    :return True if udev property ID_MTP_DEVICE == '1', else False
891    """
892
893    client = GUdev.Client(subsystems=['usb', 'block'])
894    enumerator = GUdev.Enumerator.new(client)
895    enumerator.add_match_property('DEVNAME', devname)
896    for device in enumerator.execute():
897        model = device.get_property('ID_MODEL')  # type: str
898        if model is not None:
899            is_mtp = device.get_property('ID_MTP_DEVICE') == '1' or \
900                     device.get_property('ID_MEDIA_PLAYER') == '1'
901            vendor = device.get_property('ID_VENDOR')  # type: str
902            model = model.replace('_', ' ').strip()
903            vendor = vendor.replace('_', ' ').strip()
904            return UdevAttr(is_mtp, vendor, model)
905    return None
906
907
908def udev_is_camera(devname: str) -> bool:
909    """
910    Query udev to see if device is a gphoto2 device (a camera or phone)
911    :param devname: udev DEVNAME e.g. '/dev/bus/usb/001/003'
912    :return: True if so, else False
913    """
914
915    client = GUdev.Client(subsystems=['usb', 'block'])
916    enumerator = GUdev.Enumerator.new(client)
917    enumerator.add_match_property('DEVNAME', devname)
918    for device in enumerator.execute():
919        if device.get_property('ID_GPHOTO2') == '1':
920            return True
921    return False
922
923
924def fs_device_details(path: str) -> Tuple:
925    """
926    :return: device (volume) name, uri, root path and filesystem type
927     of the mount the path is on
928    """
929    qsInfo = QStorageInfo(path)
930    name = qsInfo.displayName()
931    root_path = qsInfo.rootPath()
932    uri = 'file://{}'.format(pathname2url(root_path))
933    fstype = qsInfo.fileSystemType()
934    if isinstance(fstype, bytes):
935        fstype = fstype.decode()
936    return name, uri, root_path, fstype
937
938
939class WatchDownloadDirs(QFileSystemWatcher):
940    """
941    Create a file system watch to monitor if there are changes to the
942    download directories
943    """
944
945    def updateWatchPathsFromPrefs(self, prefs) -> None:
946        """
947        Update the watched directories using values from the program preferences
948        :param prefs: program preferences
949        :type prefs: raphodo.preferences.Preferences
950        """
951
952        logging.debug("Updating watched paths")
953
954        paths = (os.path.dirname(path) for path in (prefs.photo_download_folder,
955                                                    prefs.video_download_folder))
956        watch = {path for path in paths if path}
957
958        existing_watches = set(self.directories())
959
960        if watch == existing_watches:
961            return
962
963        new = watch - existing_watches
964        if new:
965            new = list(new)
966            logging.debug("Adding to watched paths: %s", ', '.join(new))
967            failures = self.addPaths(new)
968            if failures:
969                logging.debug("Failed to add watched paths: %s", failures)
970
971        old = existing_watches - watch
972        if old:
973            old = list(old)
974            logging.debug("Removing from watched paths: %s", ', '.join(old))
975            failures = self.removePaths(old)
976            if failures:
977                logging.debug("Failed to remove watched paths: %s", failures)
978
979    def closeWatch(self) -> None:
980        """
981        End all watches.
982        """
983        dirs = self.directories()
984        if dirs:
985            self.removePaths(dirs)
986
987
988class CameraHotplug(QObject):
989    cameraAdded = pyqtSignal()
990    cameraRemoved = pyqtSignal()
991
992    def __init__(self):
993        super().__init__()
994        self.cameras = {}
995
996    @pyqtSlot()
997    def startMonitor(self):
998        self.client = GUdev.Client(subsystems=['usb', 'block'])
999        self.client.connect('uevent', self.ueventCallback)
1000        logging.debug("... camera hotplug monitor started")
1001        self.enumerateCameras()
1002        if self.cameras:
1003            logging.info(
1004                "Camera Hotplug found %d camera(s): %s", len(self.cameras), ', '.join(
1005                    (model for port, model in self.cameras.items())
1006                )
1007            )
1008            for port, model in self.cameras.items():
1009                logging.debug("%s is at %s", model, port)
1010
1011    def enumerateCameras(self):
1012        """
1013        Query udev to get the list of cameras store their path and
1014        model in our internal dict, which is useful when responding to
1015        camera removal.
1016        """
1017        enumerator = GUdev.Enumerator.new(self.client)
1018        enumerator.add_match_property('ID_GPHOTO2', '1')
1019        for device in enumerator.execute():
1020            model = device.get_property('ID_MODEL')
1021            if model is not None:
1022                path = device.get_sysfs_path()
1023                self.cameras[path] = model
1024
1025    def ueventCallback(self, client: GUdev.Client, action: str, device: GUdev.Device) -> None:
1026
1027        # for key in device.get_property_keys():
1028        #     print(key, device.get_property(key))
1029        if device.get_property('ID_GPHOTO2') == '1':
1030            self.camera(action, device)
1031
1032    def camera(self, action: str, device: GUdev.Device) -> None:
1033        # For some reason, the add and remove camera event is triggered twice.
1034        # The second time the device information is a variation on information
1035        # from the first time.
1036        path = device.get_sysfs_path()
1037        parent_device = device.get_parent()
1038        parent_path = parent_device.get_sysfs_path()
1039        logging.debug("Device change: %s. Path: %s Parent path: %s", action, path, parent_path)
1040
1041        # Ignore 'bind' action: seems to add nothing we need to know
1042
1043        if action == 'add':
1044            if parent_path not in self.cameras:
1045                model = device.get_property('ID_MODEL')
1046                logging.info("Hotplug: new camera: %s", model.replace('_', ' '))
1047                self.cameras[path] = model
1048                self.cameraAdded.emit()
1049            else:
1050                logging.debug("Hotplug: already know about %s", self.cameras[parent_path])
1051
1052        elif action == 'remove':
1053            emit_remove = False
1054            name = ''
1055
1056            # A path might look like:
1057            # /sys/devices/pci0000:00/0000:00:1c.6/0000:0e:00.0/usb3/3-2/3-2:1.0
1058            # When what we want is:
1059            # /sys/devices/pci0000:00/0000:00:1c.6/0000:0e:00.0/usb3/3-2
1060            # This unchanged path used to work, so test both the unchanged and modified
1061            # path
1062            # Note enumerateCameras() above finds only the path as in the 2nd type, without the
1063            # 3-2:1.0
1064            split_path = os.path.split(path)[0]
1065
1066            for p in (path, split_path):
1067                if p in self.cameras:
1068                    name = self.cameras[p]
1069                    logging.debug("Hotplug: removing %s on basis of path %s", name, p)
1070                    del self.cameras[p]
1071                    emit_remove = True
1072                    break
1073
1074            if emit_remove:
1075                logging.info("Hotplug: %s has been removed", name)
1076                self.cameraRemoved.emit()
1077
1078
1079if have_gio:
1080    class GVolumeMonitor(QObject):
1081        r"""
1082        Monitor the mounting or unmounting of cameras or partitions
1083        using Gnome's GIO/GVFS. Unmount cameras automatically mounted
1084        by GVFS.
1085
1086        Raises a signal if a volume has been inserted, but will not be
1087        automatically mounted. This is important because this class
1088        is monitoring mounts, and if the volume is not mounted, it will
1089        go unnoticed.
1090        """
1091
1092        cameraUnmounted = pyqtSignal(bool, str, str, bool, bool)
1093        cameraMounted = pyqtSignal()
1094        partitionMounted = pyqtSignal(str, list, bool)
1095        partitionUnmounted = pyqtSignal(str)
1096        volumeAddedNoAutomount = pyqtSignal()
1097        cameraPossiblyRemoved = pyqtSignal()
1098        cameraVolumeAdded = pyqtSignal(str)
1099
1100        def __init__(self, validMounts: ValidMounts) -> None:
1101            super().__init__()
1102            self.vm = Gio.VolumeMonitor.get()
1103            self.vm.connect('mount-added', self.mountAdded)
1104            self.vm.connect('volume-added', self.volumeAdded)
1105            self.vm.connect('mount-removed', self.mountRemoved)
1106            self.vm.connect('volume-removed', self.volumeRemoved)
1107            self.portSearch = re.compile(r'usb:([\d]+),([\d]+)')
1108            self.scsiPortSearch = re.compile(r'usbscsi:(.+)')
1109            self.possibleCamera = re.compile(r'/usb/([\d]+)/([\d]+)')
1110            self.validMounts = validMounts
1111            self.camera_volumes_added = dict()  # type: Dict[str, str]
1112            self.camera_volumes_mounted = set()  # type: Set[str]
1113
1114        @staticmethod
1115        def mountMightBeCamera(mount: Gio.Mount) -> bool:
1116            """
1117            :param mount: the mount to check
1118            :return: True if the mount needs to be checked if it is a camera
1119            """
1120            return not mount.is_shadowed() and mount.get_volume() is not None
1121
1122        def unixDevicePathIsCamera(self, devname: str) -> bool:
1123            """
1124            Test if the device at unix device path devname is a camera
1125            :param devname: Gio.VOLUME_IDENTIFIER_KIND_UNIX_DEVICE device path
1126             e.g. '/dev/bus/usb/001/003'
1127            :return: True if camera else False
1128            """
1129
1130            return self.possibleCamera.search(devname) is not None and udev_is_camera(devname)
1131
1132        def ptpCameraMountPoint(self, model: str, port: str) -> Optional[Gio.Mount]:
1133            """
1134            :return: the mount point of the PTP / MTP camera, if it is mounted,
1135             else None. If camera is not mounted with PTP / MTP, None is
1136             returned.
1137            """
1138
1139            p = self.portSearch.match(port)
1140            if p is not None:
1141                p1 = p.group(1)
1142                p2 = p.group(2)
1143                device_path = '/dev/bus/usb/{}/{}'.format(p1, p2)
1144                return self.cameraMountPointByUnixDevice(device_path=device_path)
1145            else:
1146                p = self.scsiPortSearch.match(port)
1147                if p is None:
1148                    logging.error("Unknown camera mount method %s %s", model, port)
1149                return None
1150
1151        def cameraMountPointByUnixDevice(self, device_path: str) -> Optional[Gio.Mount]:
1152            """
1153            :return: the mount point of the PTP / MTP camera, if it is mounted,
1154             else None. If camera is not mounted with PTP / MTP, None is
1155             returned.
1156            """
1157
1158            to_unmount = None
1159
1160            for mount in self.vm.get_mounts():
1161                if self.mountMightBeCamera(mount):
1162                    identifier = mount.get_volume().get_identifier(
1163                        Gio.VOLUME_IDENTIFIER_KIND_UNIX_DEVICE
1164                    )
1165                    if device_path == identifier:
1166                        to_unmount = mount
1167                        break
1168            return to_unmount
1169
1170        @pyqtSlot(str, str, bool, bool, int)
1171        def reUnmountCamera(self, model: str,
1172                          port: str,
1173                          download_starting: bool,
1174                          on_startup: bool,
1175                          attempt_no: int) -> None:
1176
1177            logging.info(
1178                "Attempt #%s to unmount camera %s on port %s",
1179                attempt_no + 1, model, port
1180            )
1181            self.unmountCamera(
1182                model=model, port=port, download_starting=download_starting, on_startup=on_startup,
1183                attempt_no=attempt_no
1184            )
1185
1186        def unmountCamera(self, model: str,
1187                          port: str,
1188                          download_starting: bool=False,
1189                          on_startup: bool=False,
1190                          mount_point: Optional[Gio.Mount]=None,
1191                          attempt_no: Optional[int]=0) -> bool:
1192            """
1193            Unmount camera mounted on gvfs mount point, if it is
1194            mounted. If not mounted, ignore.
1195            :param model: model as returned by libgphoto2
1196            :param port: port as returned by libgphoto2, in format like
1197             usb:001,004
1198            :param download_starting: if True, the unmount is occurring
1199             because a download has been initiated.
1200            :param on_startup: if True, the unmount is occurring during
1201             the program's startup phase
1202            :param mount_point: if not None, try umounting from this
1203             mount point without scanning for it first
1204            :return: True if an unmount operation has been initiated,
1205             else returns False.
1206            """
1207
1208            if mount_point is None:
1209                to_unmount = self.ptpCameraMountPoint(model, port)
1210            else:
1211                to_unmount = mount_point
1212
1213            if to_unmount is not None:
1214                logging.debug("GIO: Attempting to unmount %s...", model)
1215                to_unmount.unmount_with_operation(
1216                    0, None, None, self.unmountCameraCallback,
1217                    (model, port, download_starting, on_startup, attempt_no)
1218                )
1219                return True
1220
1221            return False
1222
1223        def unmountCameraCallback(self, mount: Gio.Mount,
1224                                  result: Gio.AsyncResult,
1225                                  user_data: Tuple[str, str, bool, bool]) -> None:
1226            """
1227            Called by the asynchronous unmount operation.
1228            When complete, emits a signal indicating operation
1229            success, and the camera model and port
1230            :param mount: camera mount
1231            :param result: result of the unmount process
1232            :param user_data: model and port of the camera being
1233            unmounted, in the format of libgphoto2
1234            """
1235
1236            model, port, download_starting, on_startup, attempt_no = user_data
1237            try:
1238                if mount.unmount_with_operation_finish(result):
1239                    logging.debug("...successfully unmounted {}".format(model))
1240                    self.cameraUnmounted.emit(True, model, port, download_starting, on_startup)
1241                else:
1242                    logging.debug("...failed to unmount {}".format(model))
1243                    self.cameraUnmounted.emit(False, model, port, download_starting, on_startup)
1244            except GLib.GError as e:
1245                if e.code == 26 and attempt_no < 10:
1246                    attempt_no += 1
1247                    QTimer.singleShot(
1248                        750, lambda : self.reUnmountCamera(
1249                            model, port, download_starting,
1250                            on_startup, attempt_no
1251                        )
1252                    )
1253                else:
1254                    logging.error('Exception occurred unmounting {}'.format(model))
1255                    logging.exception('Traceback:')
1256                    self.cameraUnmounted.emit(False, model, port, download_starting, on_startup)
1257
1258        def unmountVolume(self, path: str) -> None:
1259            """
1260            Unmounts the volume represented by the path. If no volume is found
1261            representing that path, nothing happens.
1262
1263            :param path: path of the volume. It should not end with os.sep.
1264            """
1265
1266            for mount in self.vm.get_mounts():
1267                root = mount.get_root()
1268                if root is not None:
1269                    mpath = root.get_path()
1270                    if path == mpath:
1271                        logging.info("Attempting to unmount %s...", path)
1272                        mount.unmount_with_operation(
1273                            0, None, None, self.unmountVolumeCallback, path
1274                        )
1275                        break
1276
1277        @staticmethod
1278        def unmountVolumeCallback(mount: Gio.Mount,
1279                                  result: Gio.AsyncResult,
1280                                  user_data: str) -> None:
1281
1282            """
1283            Called by the asynchronous unmount operation.
1284
1285            :param mount: volume mount
1286            :param result: result of the unmount process
1287            :param user_data: the path of the device unmounted
1288            """
1289            path = user_data
1290
1291            try:
1292                if mount.unmount_with_operation_finish(result):
1293                    logging.info("...successfully unmounted %s", path)
1294                else:
1295                    logging.info("...failed to unmount %s", path)
1296            except GLib.GError as e:
1297                logging.error('Exception occurred unmounting %s', path)
1298                logging.exception('Traceback:')
1299
1300        def mountIsCamera(self, mount: Gio.Mount) -> bool:
1301            """
1302            Determine if the mount refers to a camera by checking the
1303            path to see if gphoto2 or mtp is in the last folder in the
1304            root path.
1305
1306            Does not query udev, deliberately. This can be called when device
1307            is being unmounted. Unclear if the device is still on the system
1308            at this point, or how realible that is even if it is.
1309
1310            :param mount: mount to check
1311            :return: True if mount refers to a camera, else False
1312            """
1313
1314            if self.mountMightBeCamera(mount):
1315                root = mount.get_root()
1316                if root is None:
1317                    logging.warning('Unable to get mount root')
1318                else:
1319                    path = root.get_path()
1320                    if path:
1321                        logging.debug("GIO: Looking for camera at mount {}".format(path))
1322                        # check last two levels of the path name, as it might be in a format like
1323                        # /run/..../gvfs/gphoto2:host=Canon_Inc._Canon_Digital_Camera/store_00010001
1324                        for i in (1, 2):
1325                            path, folder_name = os.path.split(path)
1326                            if folder_name:
1327                                for s in ('gphoto2:host=', 'mtp:host='):
1328                                    if folder_name.startswith(s):
1329                                        return True
1330            return False
1331
1332        def mountIsPartition(self, mount: Gio.Mount) -> bool:
1333            """
1334            Determine if the mount point is that of a valid partition,
1335            i.e. is mounted in a valid location, which is under one of
1336            self.validMountDirs
1337            :param mount: the mount to examine
1338            :return: True if the mount is a valid partiion
1339            """
1340            root = mount.get_root()
1341            if root is None:
1342                logging.warning('Unable to get mount root')
1343            else:
1344                path = root.get_path()
1345                if path:
1346                    logging.debug("GIO: Looking for valid partition at mount {}".format(path))
1347                    if self.validMounts.pathIsValidMountPoint(path):
1348                        logging.debug("GIO: partition found at {}".format(path))
1349                        return True
1350                if path is not None:
1351                    logging.debug("GIO: partition is not valid mount: {}".format(path))
1352            return False
1353
1354        def mountAdded(self, volumeMonitor, mount: Gio.Mount) -> None:
1355            """
1356            Determine if mount is valid partition or is a camera, or something
1357            else.
1358
1359            :param volumeMonitor: not used
1360            :param mount: the mount to examine
1361            """
1362
1363            logging.debug("Examining mount %s", mount.get_name())
1364            try:
1365                identifier = mount.get_volume().get_identifier(
1366                    Gio.VOLUME_IDENTIFIER_KIND_UNIX_DEVICE
1367                )
1368                if identifier in self.camera_volumes_added:
1369                    logging.debug("%s is now mounted", self.camera_volumes_added[identifier])
1370                    self.camera_volumes_mounted.add(identifier)
1371                    self.cameraMounted.emit()
1372                    return
1373            except Exception:
1374                pass
1375
1376            if self.mountIsCamera(mount):
1377                # Can be called on startup if camera was already mounted in GIO before the program
1378                # started. In that case, previous check would not have detected the camera.
1379                self.cameraMounted.emit()
1380            elif self.mountIsPartition(mount):
1381                icon_names = self.getIconNames(mount)
1382                self.partitionMounted.emit(
1383                    mount.get_root().get_path(), icon_names, mount.can_eject()
1384                )
1385
1386        def mountRemoved(self, volumeMonitor, mount: Gio.Mount) -> None:
1387            if not self.mountIsCamera(mount):
1388                if self.mountIsPartition(mount):
1389                    logging.debug("GIO: %s has been unmounted", mount.get_name())
1390                    self.partitionUnmounted.emit(mount.get_root().get_path())
1391
1392        def volumeAdded(self, volumeMonitor, volume: Gio.Volume) -> None:
1393            volume_name = volume.get_name()
1394            logging.debug(
1395                "GIO: Volume added %s. Automount: %s (might be incorrect)",
1396                volume_name, volume.should_automount()
1397            )
1398
1399            # Even if volume.should_automount(), the volume in fact may not be mounted
1400            # automatically. Unbelievable.
1401
1402            device_path = volume.get_identifier(
1403                Gio.VOLUME_IDENTIFIER_KIND_UNIX_DEVICE
1404            )
1405            if device_path is None:
1406                logging.error("Unable to determine device path of %s", volume_name)
1407            elif self.unixDevicePathIsCamera(device_path):
1408                self.camera_volumes_added[device_path] = volume_name
1409                logging.debug(
1410                    "%s is a camera at %s", volume_name, device_path
1411                )
1412                # Time is in milliseconds; 3000 is 3 seconds.
1413                QTimer.singleShot(3000, lambda: self.cameraVolumeAddedCheckMount(device_path))
1414
1415        def cameraVolumeAddedCheckMount(self, device_path) -> None:
1416            if device_path not in self.camera_volumes_mounted:
1417                logging.debug(
1418                    "%s had not been automatically mounted. Will initiate camera scan.",
1419                    self.camera_volumes_added[device_path]
1420                )
1421                self.cameraVolumeAdded.emit(device_path)
1422            else:
1423                logging.debug(
1424                    "%s had been automatically mounted", self.camera_volumes_added[device_path]
1425                )
1426
1427        def volumeRemoved(self, volumeMonitor, volume: Gio.Volume) -> None:
1428            logging.debug("GIO: %s volume removed", volume.get_name())
1429            if volume.get_activation_root() is not None:
1430                logging.debug("GIO: %s might be a camera", volume.get_name())
1431                self.cameraPossiblyRemoved.emit()
1432
1433        @staticmethod
1434        def getIconNames(mount: Gio.Mount) -> List[str]:
1435            """
1436            Get icons for the mount from theme
1437
1438            :param mount:
1439            :return:
1440            """
1441            icon_names = []
1442            icon = mount.get_icon()
1443            if isinstance(icon, Gio.ThemedIcon):
1444                icon_names = icon.get_names()
1445
1446            return icon_names
1447
1448        def getProps(self, path: str) -> Tuple[Optional[List[str]], Optional[bool]]:
1449            """
1450            Given a mount's path, get the icon names suggested by the
1451            volume monitor, and determine whether the mount is
1452            ejectable or not.
1453            :param path: the path of mount to check
1454            :return: icon names and eject boolean
1455            """
1456
1457            for mount in self.vm.get_mounts():
1458                root = mount.get_root()
1459                if root is not None:
1460                    p = root.get_path()
1461                    if path == p:
1462                        icon_names = self.getIconNames(mount)
1463                        return (icon_names, mount.can_eject())
1464            return (None, None)
1465
1466
1467def _get_info_size_value(info: Gio.FileInfo, attr: str) -> int:
1468    if info.get_attribute_data(attr).type == Gio.FileAttributeType.UINT64:
1469        return info.get_attribute_uint64(attr)
1470    else:
1471        return info.get_attribute_uint32(attr)
1472
1473
1474def get_mount_size(mount: QStorageInfo) -> Tuple[int, int]:
1475    """
1476    Uses GIO to get bytes total and bytes free (available) for the mount that a
1477    path is in.
1478
1479    :param path: path located anywhere in the mount
1480    :return: bytes_total, bytes_free
1481    """
1482
1483    bytes_free = mount.bytesAvailable()
1484    bytes_total = mount.bytesTotal()
1485
1486    if bytes_total or not have_gio:
1487        return bytes_total, bytes_free
1488
1489    path = mount.rootPath()
1490
1491    logging.debug("Using GIO to query file system attributes for %s...", path)
1492    p = Gio.File.new_for_path(os.path.abspath(path))
1493    info = p.query_filesystem_info(
1494        ','.join(
1495            (Gio.FILE_ATTRIBUTE_FILESYSTEM_SIZE, Gio.FILE_ATTRIBUTE_FILESYSTEM_FREE)
1496        )
1497    )
1498    logging.debug("...query of file system attributes for %s completed", path)
1499    bytes_total = _get_info_size_value(info, Gio.FILE_ATTRIBUTE_FILESYSTEM_SIZE)
1500    bytes_free = _get_info_size_value(info, Gio.FILE_ATTRIBUTE_FILESYSTEM_FREE)
1501    return bytes_total, bytes_free
1502