1# SPDX-FileCopyrightText: 2020 Romain Vigier <contact AT romainvigier.fr> 2# SPDX-License-Identifier: GPL-3.0-or-later 3 4"""Files Manager object and states.""" 5 6import libmat2 7import logging 8import mimetypes 9 10from concurrent.futures import ThreadPoolExecutor, as_completed 11from enum import IntEnum, auto 12from gi.repository import Gio, GLib, GObject 13from threading import Thread 14from typing import Dict, Iterable, List, Set 15 16from metadatacleaner.file import File, FileState 17from metadatacleaner.logger import Logger as logger 18 19 20def _get_supported_formats() -> Dict: 21 formats = {} 22 for parser in libmat2.parser_factory._get_parsers(): 23 for mimetype in parser.mimetypes: 24 extensions = set() 25 for extension in mimetypes.guess_all_extensions(mimetype): 26 if extension not in libmat2.UNSUPPORTED_EXTENSIONS: 27 extensions.add(extension) 28 if not extensions: 29 continue 30 formats[mimetype] = extensions 31 return formats 32 33 34SUPPORTED_FORMATS = _get_supported_formats() 35 36 37class FilesManagerState(IntEnum): 38 """States the Files Manager can have.""" 39 40 IDLE = auto() 41 WORKING = auto() 42 43 44class FilesManager(GObject.GObject): 45 """Files Manager object.""" 46 47 __gsignals__ = { 48 "file-added": (GObject.SIGNAL_RUN_FIRST, None, (int,)), 49 "file-removed": (GObject.SIGNAL_RUN_FIRST, None, ()), 50 "file-state-changed": (GObject.SIGNAL_RUN_FIRST, None, (int,)), 51 "state-changed": (GObject.SIGNAL_RUN_FIRST, None, (int,)), 52 "progress-changed": (GObject.SIGNAL_RUN_FIRST, None, (int, int)) 53 } 54 55 def __init__(self) -> None: 56 """Files Manager initialization.""" 57 super().__init__() 58 self._files: List[File] = [] 59 self._paths: Set = set() 60 self.state = FilesManagerState.IDLE 61 self.progress = (0, 0) 62 self.lightweight_mode = False 63 64 def _on_file_state_changed(self, f: File, new_state: FileState) -> None: 65 GLib.idle_add(self.emit, "file-state-changed", self._files.index(f)) 66 67 def _set_state(self, state: FilesManagerState) -> None: 68 if state == self.state: 69 return 70 self.state = state 71 logger.debug( 72 f"State of files manager changed to {str(self.state)}." 73 ) 74 GLib.idle_add(self.emit, "state-changed", state) 75 76 def _set_progress(self, current: int, total: int) -> None: 77 self.progress = (current, total) 78 logger.debug(f"Files manager progress set to {self.progress}.") 79 GLib.idle_add(self.emit, "progress-changed", current, total) 80 81 def get_files(self) -> List[File]: 82 """Get all the files from the Files Manager. 83 84 Returns: 85 List[File]: List of files. 86 """ 87 return self._files 88 89 def get_file(self, index: int) -> File: 90 """Get a files at a specific index. 91 92 Args: 93 index (int): Index of the file. 94 95 Returns: 96 File: The requested file. 97 """ 98 return self._files[index] 99 100 def add_gfiles(self, gfiles: List[Gio.File]) -> None: 101 """Add Gio Files to the Files Manager. 102 103 Args: 104 gfiles (List[Gio.File]): List of Gio Files to add. 105 """ 106 thread = Thread( 107 target=self._add_gfiles_async, 108 args=(gfiles,), 109 daemon=True 110 ) 111 thread.start() 112 113 def _add_gfiles_async(self, gfiles: List[Gio.File]) -> None: 114 number_of_gfiles = len(gfiles) 115 self._set_progress(0, number_of_gfiles) 116 self._set_state(FilesManagerState.WORKING) 117 with ThreadPoolExecutor() as executor: 118 futures = { 119 executor.submit(self.add_gfile, gfile) 120 for gfile in gfiles 121 } 122 for i, future in enumerate(as_completed(futures)): 123 self._set_progress(i + 1, number_of_gfiles) 124 self._set_state(FilesManagerState.IDLE) 125 126 def add_gfile(self, gfile: Gio.File) -> None: 127 """Add a Gio File to the Files Manager. 128 129 Args: 130 gfile (Gio.File): The Gio File to add. 131 """ 132 if gfile.get_path() in self._paths: 133 logger.info(f"Skipping {gfile.get_path()}, already added.") 134 return 135 if not gfile.query_exists(None): 136 logger.info(f"File {gfile.get_path()} does not exist, skipping.") 137 return 138 logger.info(f"Adding {gfile.get_path()}...") 139 f = File(gfile) 140 self._paths.add(f.path) 141 self._files.append(f) 142 GLib.idle_add(self.emit, "file-added", len(self._files) - 1) 143 f.connect("state-changed", self._on_file_state_changed) 144 f.initialize_parser() 145 f.check_metadata() 146 147 def remove_file(self, f: File) -> None: 148 """Remove a file from the Files Manager. 149 150 Args: 151 f (File): The file to remove. 152 """ 153 self._files.remove(f) 154 self._paths.remove(f.path) 155 f.remove() 156 GLib.idle_add(self.emit, "file-removed") 157 158 def clean_files(self) -> None: 159 """Remove metadata from all the cleanable files.""" 160 thread = Thread( 161 target=self._clean_files_async, 162 daemon=True 163 ) 164 thread.start() 165 166 def _clean_files_async(self) -> None: 167 cleanable_files = self.get_cleanable_files() 168 number_of_cleanable_files = len(cleanable_files) 169 self._set_progress(0, number_of_cleanable_files) 170 self._set_state(FilesManagerState.WORKING) 171 with ThreadPoolExecutor() as executor: 172 futures = { 173 executor.submit(f.remove_metadata, self.lightweight_mode) 174 for f in cleanable_files 175 } 176 for i, future in enumerate(as_completed(futures)): 177 self._set_progress(i + 1, number_of_cleanable_files) 178 self._set_state(FilesManagerState.IDLE) 179 180 def save_cleaned_files(self) -> None: 181 """Save the cleaned files.""" 182 thread = Thread(target=self._save_cleaned_files_async, daemon=True) 183 thread.start() 184 185 def _save_cleaned_files_async(self) -> None: 186 cleaned_files = self.get_cleaned_files() 187 number_of_cleaned_files = len(cleaned_files) 188 self._set_progress(0, number_of_cleaned_files) 189 self._set_state(FilesManagerState.WORKING) 190 with ThreadPoolExecutor() as executor: 191 futures = {executor.submit(f.save)for f in cleaned_files} 192 for i, future in enumerate(as_completed(futures)): 193 self._set_progress(i + 1, number_of_cleaned_files) 194 self._set_state(FilesManagerState.IDLE) 195 196 def get_cleanable_files(self) -> List[File]: 197 """Get all the cleanable files. 198 199 Returns: 200 List[File]: List of cleanable files. 201 """ 202 return self._get_files_with_states(( 203 FileState.HAS_METADATA, 204 FileState.HAS_NO_METADATA 205 )) 206 207 def get_cleaned_files(self) -> List[File]: 208 """Get all the cleaned files. 209 210 Returns: 211 List[File]: List of cleaned files. 212 """ 213 return self._get_files_with_states((FileState.CLEANED,)) 214 215 def _get_files_with_states( 216 self, 217 states: Iterable[FileState] 218 ) -> List[File]: 219 wanted_files: List[File] = [] 220 for f in self._files: 221 if f.state in states: 222 wanted_files.append(f) 223 return wanted_files 224