1# SPDX-FileCopyrightText: 2020 Romain Vigier <contact AT romainvigier.fr>
2# SPDX-License-Identifier: GPL-3.0-or-later
3
4"""File object and states."""
5
6import hashlib
7import os
8import tempfile
9
10from enum import IntEnum, auto
11from gettext import gettext as _
12from gi.repository import Gio, GLib, GObject
13from libmat2 import parser_factory
14from threading import Thread
15from typing import Dict, Optional
16
17from metadatacleaner.logger import Logger as logger
18
19
20class FileState(IntEnum):
21    """States that a File can have."""
22
23    INITIALIZING = auto()
24    ERROR_WHILE_INITIALIZING = auto()
25    UNSUPPORTED = auto()
26    SUPPORTED = auto()
27    CHECKING_METADATA = auto()
28    ERROR_WHILE_CHECKING_METADATA = auto()
29    HAS_NO_METADATA = auto()
30    HAS_METADATA = auto()
31    REMOVING_METADATA = auto()
32    ERROR_WHILE_REMOVING_METADATA = auto()
33    CLEANED = auto()
34    SAVING = auto()
35    ERROR_WHILE_SAVING = auto()
36    SAVED = auto()
37
38
39class File(GObject.GObject):
40    """File object."""
41
42    __gsignals__ = {
43        "state-changed": (GObject.SIGNAL_RUN_FIRST, None, (int,)),
44        "removed": (GObject.SIGNAL_RUN_FIRST, None, ())
45    }
46
47    def __init__(self, gfile: Gio.File) -> None:
48        """File initialization.
49
50        Args:
51            gfile (Gio.File): The Gio File that the File will be built from.
52        """
53        super().__init__()
54        self._gfile = gfile
55        self._temp_path = self._compute_temp_path(gfile.get_path())
56        self.path = gfile.get_path()
57        self.filename = gfile.get_basename()
58        self.state = FileState.INITIALIZING
59        self.mimetype = "text/plain"
60        self.metadata: Optional[Dict] = None
61        self.error: Optional[Exception] = None
62
63    def _compute_temp_path(self, path: str) -> str:
64        # We have to keep the extension so that ffmpeg doesn't break
65        filename, extension = os.path.splitext(path)
66        digest = hashlib.sha256(path.encode("utf-8")).hexdigest()
67        return os.path.join(tempfile.gettempdir(), f"{digest}{extension}")
68
69    def _set_state(self, state: FileState) -> None:
70        if state != self.state:
71            self.state = state
72            logger.debug(f"State of {self.filename} changed to {str(state)}.")
73            GLib.idle_add(self.emit, "state-changed", state)
74
75    def initialize_parser(self) -> None:
76        """Initialize the metadata parser."""
77        logger.info(f"Initializing parser for {self.filename}...")
78        try:
79            parser, mimetype = parser_factory.get_parser(self.path)
80        except Exception as e:
81            self.error = e
82            logger.warning(
83                f"Error while initializing parser for {self.filename}: {e}"
84            )
85            self._set_state(FileState.ERROR_WHILE_INITIALIZING)
86        else:
87            self._parser = parser
88            self.mimetype = mimetype
89            if self._parser:
90                logger.info(f"{self.filename} is supported.")
91                self._set_state(FileState.SUPPORTED)
92            else:
93                logger.info(f"{self.filename} is unsupported.")
94                self._set_state(FileState.UNSUPPORTED)
95
96    def check_metadata(self) -> None:
97        """Check the metadata present in the file."""
98        if self.state != FileState.SUPPORTED:
99            return
100        logger.info(f"Checking metadata for {self.filename}...")
101        self._set_state(FileState.CHECKING_METADATA)
102        try:
103            metadata = self._parser.get_meta()
104        except Exception as e:
105            self.error = e
106            logger.warning(
107                f"Error while checking metadata for {self.filename}: {e}"
108            )
109            self._set_state(FileState.ERROR_WHILE_CHECKING_METADATA)
110        else:
111            self.metadata = metadata if bool(metadata) else None
112            if self.metadata:
113                logger.info(f"Found metadata for {self.filename}.")
114                self._set_state(FileState.HAS_METADATA)
115            else:
116                logger.info(f"Found no metadata for {self.filename}.")
117                self._set_state(FileState.HAS_NO_METADATA)
118
119    def remove_metadata(self, lightweight_mode=False) -> None:
120        """Remove the metadata from the file.
121
122        Args:
123            lightweight_mode (bool, optional): Use mat2 lightweight mode to
124                preserve data integrity. Defaults to False.
125        """
126        if self.state not in [
127            FileState.HAS_METADATA,
128            FileState.HAS_NO_METADATA
129        ]:
130            return
131        logger.info(f"Removing metadata for {self.filename}...")
132        self._set_state(FileState.REMOVING_METADATA)
133        try:
134            self._parser.output_filename = self._temp_path
135            self._parser.lightweight_cleaning = lightweight_mode
136            self._parser.remove_all()
137            if not os.path.exists(self._temp_path):
138                raise RuntimeError(_(
139                    "Something bad happened during the removal, "
140                    "cleaned file not found"
141                ))
142        except Exception as e:
143            self.error = e
144            logger.warning(
145                f"Error while removing metadata for {self.filename}: {e}"
146            )
147            self._set_state(FileState.ERROR_WHILE_REMOVING_METADATA)
148        else:
149            logger.info(f"{self.filename} has been cleaned.")
150            self._set_state(FileState.CLEANED)
151
152    def save(self) -> None:
153        """Save the cleaned file."""
154        if self.state != FileState.CLEANED:
155            return
156        logger.info(f"Saving {self.filename}...")
157        self._set_state(FileState.SAVING)
158        try:
159            cleaned_gfile = Gio.File.new_for_path(self._temp_path)
160            cleaned_gfile.move(
161                self._gfile,
162                Gio.FileCopyFlags.OVERWRITE,
163                None,
164                None,
165                None
166            )
167        except Exception as e:
168            self.error = e
169            logger.warning(f"Error while saving {self.filename}: {e}")
170            self._set_state(FileState.ERROR_WHILE_SAVING)
171        else:
172            logger.info(f"{self.filename} has been saved.")
173            self._set_state(FileState.SAVED)
174
175    def remove(self) -> None:
176        """Remove the file from the application."""
177        if os.path.exists(self._temp_path):
178            os.remove(self._temp_path)
179        GLib.idle_add(self.emit, "removed")
180