1# SPDX-FileCopyrightText: 2020 Romain Vigier <contact AT romainvigier.fr> 2# SPDX-License-Identifier: GPL-3.0-or-later 3 4"""File object and states.""" 5 6import hashlib 7import os 8import tempfile 9 10from enum import IntEnum, auto 11from gettext import gettext as _ 12from gi.repository import Gio, GLib, GObject 13from libmat2 import parser_factory 14from threading import Thread 15from typing import Dict, Optional 16 17from metadatacleaner.logger import Logger as logger 18 19 20class FileState(IntEnum): 21 """States that a File can have.""" 22 23 INITIALIZING = auto() 24 ERROR_WHILE_INITIALIZING = auto() 25 UNSUPPORTED = auto() 26 SUPPORTED = auto() 27 CHECKING_METADATA = auto() 28 ERROR_WHILE_CHECKING_METADATA = auto() 29 HAS_NO_METADATA = auto() 30 HAS_METADATA = auto() 31 REMOVING_METADATA = auto() 32 ERROR_WHILE_REMOVING_METADATA = auto() 33 CLEANED = auto() 34 SAVING = auto() 35 ERROR_WHILE_SAVING = auto() 36 SAVED = auto() 37 38 39class File(GObject.GObject): 40 """File object.""" 41 42 __gsignals__ = { 43 "state-changed": (GObject.SIGNAL_RUN_FIRST, None, (int,)), 44 "removed": (GObject.SIGNAL_RUN_FIRST, None, ()) 45 } 46 47 def __init__(self, gfile: Gio.File) -> None: 48 """File initialization. 49 50 Args: 51 gfile (Gio.File): The Gio File that the File will be built from. 52 """ 53 super().__init__() 54 self._gfile = gfile 55 self._temp_path = self._compute_temp_path(gfile.get_path()) 56 self.path = gfile.get_path() 57 self.filename = gfile.get_basename() 58 self.state = FileState.INITIALIZING 59 self.mimetype = "text/plain" 60 self.metadata: Optional[Dict] = None 61 self.error: Optional[Exception] = None 62 63 def _compute_temp_path(self, path: str) -> str: 64 # We have to keep the extension so that ffmpeg doesn't break 65 filename, extension = os.path.splitext(path) 66 digest = hashlib.sha256(path.encode("utf-8")).hexdigest() 67 return os.path.join(tempfile.gettempdir(), f"{digest}{extension}") 68 69 def _set_state(self, state: FileState) -> None: 70 if state != self.state: 71 self.state = state 72 logger.debug(f"State of {self.filename} changed to {str(state)}.") 73 GLib.idle_add(self.emit, "state-changed", state) 74 75 def initialize_parser(self) -> None: 76 """Initialize the metadata parser.""" 77 logger.info(f"Initializing parser for {self.filename}...") 78 try: 79 parser, mimetype = parser_factory.get_parser(self.path) 80 except Exception as e: 81 self.error = e 82 logger.warning( 83 f"Error while initializing parser for {self.filename}: {e}" 84 ) 85 self._set_state(FileState.ERROR_WHILE_INITIALIZING) 86 else: 87 self._parser = parser 88 self.mimetype = mimetype 89 if self._parser: 90 logger.info(f"{self.filename} is supported.") 91 self._set_state(FileState.SUPPORTED) 92 else: 93 logger.info(f"{self.filename} is unsupported.") 94 self._set_state(FileState.UNSUPPORTED) 95 96 def check_metadata(self) -> None: 97 """Check the metadata present in the file.""" 98 if self.state != FileState.SUPPORTED: 99 return 100 logger.info(f"Checking metadata for {self.filename}...") 101 self._set_state(FileState.CHECKING_METADATA) 102 try: 103 metadata = self._parser.get_meta() 104 except Exception as e: 105 self.error = e 106 logger.warning( 107 f"Error while checking metadata for {self.filename}: {e}" 108 ) 109 self._set_state(FileState.ERROR_WHILE_CHECKING_METADATA) 110 else: 111 self.metadata = metadata if bool(metadata) else None 112 if self.metadata: 113 logger.info(f"Found metadata for {self.filename}.") 114 self._set_state(FileState.HAS_METADATA) 115 else: 116 logger.info(f"Found no metadata for {self.filename}.") 117 self._set_state(FileState.HAS_NO_METADATA) 118 119 def remove_metadata(self, lightweight_mode=False) -> None: 120 """Remove the metadata from the file. 121 122 Args: 123 lightweight_mode (bool, optional): Use mat2 lightweight mode to 124 preserve data integrity. Defaults to False. 125 """ 126 if self.state not in [ 127 FileState.HAS_METADATA, 128 FileState.HAS_NO_METADATA 129 ]: 130 return 131 logger.info(f"Removing metadata for {self.filename}...") 132 self._set_state(FileState.REMOVING_METADATA) 133 try: 134 self._parser.output_filename = self._temp_path 135 self._parser.lightweight_cleaning = lightweight_mode 136 self._parser.remove_all() 137 if not os.path.exists(self._temp_path): 138 raise RuntimeError(_( 139 "Something bad happened during the removal, " 140 "cleaned file not found" 141 )) 142 except Exception as e: 143 self.error = e 144 logger.warning( 145 f"Error while removing metadata for {self.filename}: {e}" 146 ) 147 self._set_state(FileState.ERROR_WHILE_REMOVING_METADATA) 148 else: 149 logger.info(f"{self.filename} has been cleaned.") 150 self._set_state(FileState.CLEANED) 151 152 def save(self) -> None: 153 """Save the cleaned file.""" 154 if self.state != FileState.CLEANED: 155 return 156 logger.info(f"Saving {self.filename}...") 157 self._set_state(FileState.SAVING) 158 try: 159 cleaned_gfile = Gio.File.new_for_path(self._temp_path) 160 cleaned_gfile.move( 161 self._gfile, 162 Gio.FileCopyFlags.OVERWRITE, 163 None, 164 None, 165 None 166 ) 167 except Exception as e: 168 self.error = e 169 logger.warning(f"Error while saving {self.filename}: {e}") 170 self._set_state(FileState.ERROR_WHILE_SAVING) 171 else: 172 logger.info(f"{self.filename} has been saved.") 173 self._set_state(FileState.SAVED) 174 175 def remove(self) -> None: 176 """Remove the file from the application.""" 177 if os.path.exists(self._temp_path): 178 os.remove(self._temp_path) 179 GLib.idle_add(self.emit, "removed") 180