1# SPDX-FileCopyrightText: 2020 Romain Vigier <contact AT romainvigier.fr>
2# SPDX-License-Identifier: GPL-3.0-or-later
3
4"""Files Manager object and states."""
5
import logging
import mimetypes

from concurrent.futures import ThreadPoolExecutor, as_completed
from enum import IntEnum, auto
from functools import partial
from threading import Lock, Thread
from typing import Callable, Dict, Iterable, List, Set

import libmat2
from gi.repository import Gio, GLib, GObject

from metadatacleaner.file import File, FileState
from metadatacleaner.logger import Logger as logger
18
19
def _get_supported_formats() -> Dict:
    """Build the table of mimetypes handled by the libmat2 parsers.

    Every mimetype declared by a parser is mapped to the set of file
    extensions the ``mimetypes`` module knows for it, minus the extensions
    listed in ``libmat2.UNSUPPORTED_EXTENSIONS``. A mimetype left without
    any extension is not included.

    Returns:
        Dict: Mimetypes as keys, sets of extensions as values.
    """
    supported = {}
    for parser in libmat2.parser_factory._get_parsers():
        for mimetype in parser.mimetypes:
            usable_extensions = {
                candidate
                for candidate in mimetypes.guess_all_extensions(mimetype)
                if candidate not in libmat2.UNSUPPORTED_EXTENSIONS
            }
            # Only keep mimetypes that still have at least one extension.
            if usable_extensions:
                supported[mimetype] = usable_extensions
    return supported
32
33
# Mapping of supported mimetypes to their sets of file extensions, computed
# once when this module is imported.
SUPPORTED_FORMATS = _get_supported_formats()
35
36
class FilesManagerState(IntEnum):
    """Activity states of the Files Manager.

    Members are plain integers, numbered from 1.
    """

    # The manager is not running any batch of tasks.
    IDLE = 1
    # The manager is running a batch of tasks in the background.
    WORKING = 2
42
43
class FilesManager(GObject.GObject):
    """Keep track of the added files and run batch operations on them.

    Adding, cleaning and saving files is done in background threads. All the
    signals are scheduled with ``GLib.idle_add`` instead of being emitted
    directly from the thread doing the work.

    Signals:
        file-added (int): Index of the file that has been added.
        file-removed: A file has been removed.
        file-state-changed (int): Index of the file whose state changed.
        state-changed (int): New state of the Files Manager.
        progress-changed (int, int): Number of finished tasks and total
            number of tasks of the running batch.
    """

    __gsignals__ = {
        "file-added": (GObject.SIGNAL_RUN_FIRST, None, (int,)),
        "file-removed": (GObject.SIGNAL_RUN_FIRST, None, ()),
        "file-state-changed": (GObject.SIGNAL_RUN_FIRST, None, (int,)),
        "state-changed": (GObject.SIGNAL_RUN_FIRST, None, (int,)),
        "progress-changed": (GObject.SIGNAL_RUN_FIRST, None, (int, int))
    }

    def __init__(self) -> None:
        """Files Manager initialization."""
        super().__init__()
        self._files: List[File] = []
        self._paths: Set = set()
        # Files are added from several worker threads at once: this lock
        # keeps `_files` and `_paths` consistent with each other.
        self._lock = Lock()
        self.state = FilesManagerState.IDLE
        # (finished tasks, total tasks) of the last started batch.
        self.progress = (0, 0)
        # Passed as is to `File.remove_metadata()` when cleaning.
        self.lightweight_mode = False

    def _on_file_state_changed(self, f: File, new_state: FileState) -> None:
        """Relay the state change of a file as a `file-state-changed` signal.

        Args:
            f (File): The file whose state changed.
            new_state (FileState): The new state of the file (unused).
        """
        with self._lock:
            try:
                index = self._files.index(f)
            except ValueError:
                # The file has been removed while one of its tasks was still
                # running: there is no index to report anymore.
                logger.debug(
                    "Ignoring state change of a file that is not managed "
                    "anymore."
                )
                return
        GLib.idle_add(self.emit, "file-state-changed", index)

    def _set_state(self, state: FilesManagerState) -> None:
        """Change the state of the Files Manager and signal it.

        Nothing happens if the state is already the requested one.

        Args:
            state (FilesManagerState): The new state.
        """
        if state == self.state:
            return
        self.state = state
        logger.debug(
            f"State of files manager changed to {str(self.state)}."
        )
        GLib.idle_add(self.emit, "state-changed", state)

    def _set_progress(self, current: int, total: int) -> None:
        """Store the progress of the running batch and signal it.

        Args:
            current (int): Number of finished tasks.
            total (int): Total number of tasks.
        """
        self.progress = (current, total)
        logger.debug(f"Files manager progress set to {self.progress}.")
        GLib.idle_add(self.emit, "progress-changed", current, total)

    def _run_tasks(self, tasks: List[Callable[[], object]]) -> None:
        """Run tasks in a thread pool, keeping state and progress updated.

        The state is set to WORKING while the tasks are running and is
        always set back to IDLE afterwards. The progress is updated every
        time a task finishes, whether it succeeded or not.

        Args:
            tasks (List[Callable]): Callables taking no argument.
        """
        total = len(tasks)
        self._set_progress(0, total)
        self._set_state(FilesManagerState.WORKING)
        try:
            with ThreadPoolExecutor() as executor:
                futures = [executor.submit(task) for task in tasks]
                completed = as_completed(futures)
                for finished, future in enumerate(completed, start=1):
                    error = future.exception()
                    if error is not None:
                        # An exception stored in a future is lost unless it
                        # is retrieved. It is reported with `info()`, the
                        # most severe logger method this file is known to
                        # provide.
                        logger.info(f"A background task failed: {error!r}")
                    self._set_progress(finished, total)
        finally:
            # Never leave the manager stuck in the WORKING state.
            self._set_state(FilesManagerState.IDLE)

    def get_files(self) -> List[File]:
        """Get all the files from the Files Manager.

        The returned list is the one used internally, not a copy.

        Returns:
            List[File]: List of files.
        """
        return self._files

    def get_file(self, index: int) -> File:
        """Get the file at a specific index.

        Args:
            index (int): Index of the file.

        Returns:
            File: The requested file.

        Raises:
            IndexError: If there is no file at this index.
        """
        return self._files[index]

    def add_gfiles(self, gfiles: List[Gio.File]) -> None:
        """Add Gio Files to the Files Manager.

        The files are added in a background thread, this method returns
        immediately.

        Args:
            gfiles (List[Gio.File]): List of Gio Files to add.
        """
        thread = Thread(
            target=self._add_gfiles_async,
            args=(gfiles,),
            daemon=True
        )
        thread.start()

    def _add_gfiles_async(self, gfiles: List[Gio.File]) -> None:
        """Add every Gio File using a thread pool.

        Args:
            gfiles (List[Gio.File]): List of Gio Files to add.
        """
        self._run_tasks([partial(self.add_gfile, gfile) for gfile in gfiles])

    def add_gfile(self, gfile: Gio.File) -> None:
        """Add a Gio File to the Files Manager.

        Files that are already managed or that do not exist are skipped.
        Once added, the parser of the file is initialized and its metadata
        is checked.

        Args:
            gfile (Gio.File): The Gio File to add.
        """
        path = gfile.get_path()
        # Cheap early exit, the authoritative check is done under the lock.
        if path in self._paths:
            logger.info(f"Skipping {path}, already added.")
            return
        if not gfile.query_exists(None):
            logger.info(f"File {path} does not exist, skipping.")
            return
        logger.info(f"Adding {path}...")
        f = File(gfile)
        with self._lock:
            # Another worker may have added the same path in the meantime.
            if f.path in self._paths:
                logger.info(f"Skipping {path}, already added.")
                return
            self._paths.add(f.path)
            self._files.append(f)
            # Computed under the lock so that each file gets its own index.
            index = len(self._files) - 1
        GLib.idle_add(self.emit, "file-added", index)
        f.connect("state-changed", self._on_file_state_changed)
        f.initialize_parser()
        f.check_metadata()

    def remove_file(self, f: File) -> None:
        """Remove a file from the Files Manager.

        Args:
            f (File): The file to remove.

        Raises:
            ValueError: If the file is not in the Files Manager.
        """
        with self._lock:
            self._files.remove(f)
            # `discard` so that a missing path cannot prevent the rest of
            # the removal from happening.
            self._paths.discard(f.path)
        f.remove()
        GLib.idle_add(self.emit, "file-removed")

    def clean_files(self) -> None:
        """Remove metadata from all the cleanable files.

        The cleaning is done in a background thread, this method returns
        immediately.
        """
        thread = Thread(
            target=self._clean_files_async,
            daemon=True
        )
        thread.start()

    def _clean_files_async(self) -> None:
        """Remove the metadata of every cleanable file using a thread pool."""
        lightweight_mode = self.lightweight_mode
        self._run_tasks([
            partial(f.remove_metadata, lightweight_mode)
            for f in self.get_cleanable_files()
        ])

    def save_cleaned_files(self) -> None:
        """Save the cleaned files.

        The saving is done in a background thread, this method returns
        immediately.
        """
        thread = Thread(target=self._save_cleaned_files_async, daemon=True)
        thread.start()

    def _save_cleaned_files_async(self) -> None:
        """Save every cleaned file using a thread pool."""
        self._run_tasks([f.save for f in self.get_cleaned_files()])

    def get_cleanable_files(self) -> List[File]:
        """Get all the cleanable files.

        These are the files whose state is either HAS_METADATA or
        HAS_NO_METADATA.

        Returns:
            List[File]: List of cleanable files.
        """
        return self._get_files_with_states((
            FileState.HAS_METADATA,
            FileState.HAS_NO_METADATA
        ))

    def get_cleaned_files(self) -> List[File]:
        """Get all the cleaned files.

        These are the files whose state is CLEANED.

        Returns:
            List[File]: List of cleaned files.
        """
        return self._get_files_with_states((FileState.CLEANED,))

    def _get_files_with_states(
        self,
        states: Iterable[FileState]
    ) -> List[File]:
        """Get the files that are in one of the given states.

        Args:
            states (Iterable[FileState]): The wanted states.

        Returns:
            List[File]: Matching files, in the order they were added.
        """
        # Membership is tested once per file: a one-shot iterator would be
        # exhausted after the first test if it was not materialized.
        wanted_states = tuple(states)
        with self._lock:
            files = list(self._files)
        return [f for f in files if f.state in wanted_states]
224