1# Copyright (c) 2019 Ultimaker B.V.
2# Uranium is released under the terms of the LGPLv3 or higher.
3
4import imp
5import json
6import os
7import shutil  # For deleting plugin directories;
8import stat  # For setting file permissions correctly;
9import time
10import types
11import zipfile
12from typing import Any, Callable, Dict, List, Optional, Tuple, TYPE_CHECKING
13
14from PyQt5.QtCore import QCoreApplication
15from PyQt5.QtCore import QObject, pyqtSlot, QUrl, pyqtProperty, pyqtSignal
16
17from UM.Logger import Logger
18from UM.Message import Message
19from UM.Platform import Platform
20from UM.PluginError import PluginNotFoundError, InvalidMetaDataError
21from UM.PluginObject import PluginObject  # For type hinting
22from UM.Resources import Resources
23from UM.Trust import Trust, TrustException, TrustBasics
24from UM.Version import Version
25from UM.i18n import i18nCatalog
26
27i18n_catalog = i18nCatalog("uranium")
28
29if TYPE_CHECKING:
30    from UM.Application import Application
31
32
33plugin_path_ignore_list = ["__pycache__", "tests", ".git"]
34
35
36class PluginRegistry(QObject):
37    """A central object to dynamically load modules as plugins.
38
39    The PluginRegistry class can load modules dynamically and use
40    them as plugins. Each plugin module is expected to be a directory with
41    and `__init__` file defining a `getMetaData` and a `register` function.
42
43    For more details, see the [plugins] file.
44
45    [plugins]: docs/plugins.md
46    """
47
48    def __init__(self, application: "Application", parent: QObject = None) -> None:
49        if PluginRegistry.__instance is not None:
50            raise RuntimeError("Try to create singleton '%s' more than once" % self.__class__.__name__)
51        PluginRegistry.__instance = self
52
53        super().__init__(parent)
54        self._application = application  # type: Application
55        self._api_version = application.getAPIVersion()  # type: Version
56
57        self._all_plugins = []        # type: List[str]
58        self._metadata = {}           # type: Dict[str, Dict[str, Any]]
59
60        self._plugins_installed = []  # type: List[str]
61
62        # NOTE: The disabled_plugins and plugins_to_remove is explicitly set to None.
63        # When actually loading the preferences, it's set to a list. This way we can see the
64        # difference between no list and an empty one.
65        self._disabled_plugins = []  # type: List[str]
66        self._outdated_plugins = []  # type: List[str]
67        self._plugins_to_install = dict()  # type: Dict[str, Dict[str, str]]
68        self._plugins_to_remove = []  # type: List[str]
69
70        self._plugins = {}            # type: Dict[str, types.ModuleType]
71        self._found_plugins = {}      # type: Dict[str, types.ModuleType]  # Cache to speed up _findPlugin
72        self._plugin_objects = {}     # type: Dict[str, PluginObject]
73
74        self._plugin_locations = []  # type: List[str]
75        self._plugin_folder_cache = {}  # type: Dict[str, List[Tuple[str, str]]]  # Cache to speed up _locatePlugin
76
77        self._bundled_plugin_cache = {}  # type: Dict[str, bool]
78
79        self._supported_file_types = {"umplugin": "Uranium Plugin"} # type: Dict[str, str]
80
81        self._check_if_trusted = False  # type: bool
82        self._checked_plugin_ids = []     # type: List[str]
83        self._distrusted_plugin_ids = []  # type: List[str]
84        self._trust_checker = None  # type: Optional[Trust]
85
86    def setCheckIfTrusted(self, check_if_trusted: bool) -> None:
87        self._check_if_trusted = check_if_trusted
88        if self._check_if_trusted:
89            self._trust_checker = Trust.getInstance()
90            # 'Trust.getInstance()' will raise an exception if anything goes wrong (e.g.: 'unable to read public key').
91            # Any such exception is explicitly _not_ caught here, as the application should quit with a crash.
92
93    def getCheckIfTrusted(self) -> bool:
94        return self._check_if_trusted
95
96    def initializeBeforePluginsAreLoaded(self) -> None:
97        config_path = Resources.getConfigStoragePath()
98
99        # File to store plugin info, such as which ones to install/remove and which ones are disabled.
100        # At this point we can load this here because we already know the actual Application name, so the directory name
101        self._plugin_config_filename = os.path.join(os.path.abspath(config_path), "plugins.json") # type: str
102
103        from UM.Settings.ContainerRegistry import ContainerRegistry
104        container_registry = ContainerRegistry.getInstance()
105
106        try:
107            with container_registry.lockFile():
108                # Load the plugin info if exists
109                if os.path.exists(self._plugin_config_filename):
110                    Logger.log("i", "Loading plugin configuration file '%s'", self._plugin_config_filename)
111                    with open(self._plugin_config_filename, "r", encoding = "utf-8") as f:
112                        data = json.load(f)
113                        self._disabled_plugins = data["disabled"]
114                        self._plugins_to_install = data["to_install"]
115                        self._plugins_to_remove = data["to_remove"]
116        except:
117            Logger.logException("e", "Failed to load plugin configuration file '%s'", self._plugin_config_filename)
118
119        # Also load data from preferences, where the plugin info used to be saved
120        preferences = self._application.getPreferences()
121        disabled_plugins = preferences.getValue("general/disabled_plugins")
122        disabled_plugins = disabled_plugins.split(",") if disabled_plugins else []
123        disabled_plugins = [plugin for plugin in disabled_plugins if len(plugin.strip()) > 0]
124        for plugin_id in disabled_plugins:
125            if plugin_id not in self._disabled_plugins:
126                self._disabled_plugins.append(plugin_id)
127
128        plugins_to_remove = preferences.getValue("general/plugins_to_remove")
129        plugins_to_remove = plugins_to_remove.split(",") if plugins_to_remove else []
130        for plugin_id in plugins_to_remove:
131            if plugin_id not in self._plugins_to_remove:
132                self._plugins_to_remove.append(plugin_id)
133
134        # Remove plugins that need to be removed
135        for plugin_id in self._plugins_to_remove:
136            self._removePlugin(plugin_id)
137        self._plugins_to_remove = []
138        if plugins_to_remove is not None:
139            preferences.setValue("general/plugins_to_remove", "")
140        self._savePluginData()
141
142        # Install the plugins that need to be installed (overwrite existing)
143        for plugin_id, plugin_info in self._plugins_to_install.items():
144            self._installPlugin(plugin_id, plugin_info["filename"])
145        self._plugins_to_install = {}
146        self._savePluginData()
147
148    def initializeAfterPluginsAreLoaded(self) -> None:
149        preferences = self._application.getPreferences()
150
151        # Remove the old preferences settings from preferences
152        preferences.resetPreference("general/disabled_plugins")
153        preferences.resetPreference("general/plugins_to_remove")
154
155    def _savePluginData(self) -> None:
156        from UM.Settings.ContainerRegistry import ContainerRegistry
157        container_registry = ContainerRegistry.getInstance()
158        try:
159            with container_registry.lockFile():
160                with open(self._plugin_config_filename, "w", encoding = "utf-8") as f:
161                    data = json.dumps({"disabled": self._disabled_plugins,
162                                       "to_install": self._plugins_to_install,
163                                       "to_remove": self._plugins_to_remove,
164                                       })
165                    f.write(data)
166        except:
167            # Since we're writing to file (and waiting for a lock), there are a few things that can go wrong.
168            # There is no need to crash the application for this, but it is a failure that we want to log.
169            Logger.logException("e", "Unable to save the plugin data.")
170
171    # TODO:
172    # - [ ] Improve how metadata is stored. It should not be in the 'plugin' prop
173    #       of the dictionary item.
174    # - [ ] Remove usage of "active" in favor of "enabled".
175    # - [ ] Switch self._disabled_plugins to self._plugins_disabled
176    # - [ ] External plugins only appear in installed after restart
177    #
178    # NOMENCLATURE:
179    # Enabled (active):  A plugin which is installed and currently enabled.
180    # Disabled: A plugin which is installed but not currently enabled.
181    # Available: A plugin which is not installed but could be.
182    # Installed: A plugin which is installed locally in Cura.
183
184    #===============================================================================
185    # PUBLIC METHODS
186    #===============================================================================
187
188    #   Add a plugin location to the list of locations to search:
189    def addPluginLocation(self, location: str) -> None:
190        #TODO: Add error checking!
191        self._plugin_locations.append(location)
192
193    #   Check if all required plugins are loaded:
194    def checkRequiredPlugins(self, required_plugins: List[str]) -> bool:
195        plugins = self._findInstalledPlugins()
196        for plugin_id in required_plugins:
197            if plugin_id not in plugins:
198                Logger.log("e", "Plugin %s is required, but not added or loaded", plugin_id)
199                return False
200        return True
201
202    #   Remove plugin from the list of enabled plugins and save to preferences:
203    def disablePlugin(self, plugin_id: str) -> None:
204        if plugin_id not in self._disabled_plugins:
205            self._disabled_plugins.append(plugin_id)
206        self._savePluginData()
207
208    #   Add plugin to the list of enabled plugins and save to preferences:
209    def enablePlugin(self, plugin_id: str) -> None:
210        if plugin_id in self._disabled_plugins:
211            self._disabled_plugins.remove(plugin_id)
212        self._savePluginData()
213
214    #   Get a list of enabled plugins:
215    def getActivePlugins(self) -> List[str]:
216        plugin_list = []
217        for plugin_id in self._all_plugins:
218            if self.isActivePlugin(plugin_id):
219                plugin_list.append(plugin_id)
220        return plugin_list
221
222    #   Get a list of all metadata matching a certain subset of metadata:
223    #   \param kwargs Keyword arguments.
224    #       Possible keywords:
225    #       - filter: \type{dict} The subset of metadata that should be matched.
226    #       - active_only: Boolean, True when only active plugin metadata should
227    #         be returned.
228    def getAllMetaData(self, **kwargs: Any):
229        data_filter = kwargs.get("filter", {})
230        active_only = kwargs.get("active_only", False)
231        metadata_list = []
232        for plugin_id in self._all_plugins:
233            if active_only and (plugin_id in self._disabled_plugins or plugin_id in self._outdated_plugins):
234                continue
235            plugin_metadata = self.getMetaData(plugin_id)
236            if self._subsetInDict(plugin_metadata, data_filter):
237                metadata_list.append(plugin_metadata)
238        return metadata_list
239
240    #   Get a list of disabled plugins:
241    def getDisabledPlugins(self) -> List[str]:
242        return self._disabled_plugins
243
244    #   Get a list of installed plugins:
245    #   NOTE: These are plugins which have already been registered. This list is
246    #         actually populated by the private _findInstalledPlugins() method.
247    def getInstalledPlugins(self) -> List[str]:
248        plugins = self._plugins_installed.copy()
249        for plugin_id in self._plugins_to_remove:
250            if plugin_id in plugins:
251                plugins.remove(plugin_id)
252        for plugin_id in self._plugins_to_install:
253            if plugin_id not in plugins:
254                plugins.append(plugin_id)
255        return sorted(plugins)
256
257    #   Get the metadata for a certain plugin:
258    #   NOTE: InvalidMetaDataError is raised when no metadata can be found or
259    #         the metadata misses the right keys.
260    def getMetaData(self, plugin_id: str) -> Dict[str, Any]:
261        if plugin_id not in self._metadata:
262            try:
263                if not self._populateMetaData(plugin_id):
264                    return {}
265            except InvalidMetaDataError:
266                return {}
267
268        return self._metadata[plugin_id]
269
270    @pyqtSlot(str, result = "QVariantMap")
271    def installPlugin(self, plugin_path: str) -> Optional[Dict[str, str]]:
272        plugin_path = QUrl(plugin_path).toLocalFile()
273
274        plugin_id = self._getPluginIdFromFile(plugin_path)
275        if plugin_id is None: #Failed to load.
276            return None
277
278        # Remove it from the to-be-removed list if it's there
279        if plugin_id in self._plugins_to_remove:
280            self._plugins_to_remove.remove(plugin_id)
281            self._savePluginData()
282
283        # Copy the plugin file to the cache directory so it can later be used for installation
284        cache_dir = os.path.join(Resources.getCacheStoragePath(), "plugins")
285        if not os.path.exists(cache_dir):
286            os.makedirs(cache_dir, exist_ok = True)
287        cache_plugin_filename = os.path.join(cache_dir, plugin_id + ".plugin")
288        if os.path.exists(cache_plugin_filename):
289            os.remove(cache_plugin_filename)
290        shutil.copy2(plugin_path, cache_plugin_filename)
291
292        # Add new install data
293        install_info = {"plugin_id": plugin_id,
294                        "filename": cache_plugin_filename}
295        self._plugins_to_install[plugin_id] = install_info
296        self._savePluginData()
297        Logger.log("i", "Plugin '%s' has been scheduled for installation.", plugin_id)
298
299        result = {"status": "ok",
300                  "id": "",
301                  "message": i18n_catalog.i18nc("@info:status", "The plugin has been installed.\nPlease re-start the application to activate the plugin."),
302                  }
303        return result
304
305    #   Check by ID if a plugin is active (enabled):
306    def isActivePlugin(self, plugin_id: str) -> bool:
307        if plugin_id not in self._disabled_plugins and plugin_id not in self._outdated_plugins and plugin_id in self._all_plugins:
308            return True
309
310        return False
311
312    def isBundledPlugin(self, plugin_id: str) -> bool:
313        if plugin_id in self._bundled_plugin_cache:
314            return self._bundled_plugin_cache[plugin_id]
315        install_prefix = os.path.abspath(self._application.getInstallPrefix())
316
317        # Go through all plugin locations and check if the given plugin is located in the installation path.
318        is_bundled = False
319        for plugin_dir in self._plugin_locations:
320            try:
321                is_in_installation_path = os.path.commonpath([install_prefix, plugin_dir]).startswith(install_prefix)
322            except ValueError:
323                is_in_installation_path = False
324            if not is_in_installation_path:
325                # To prevent the situation in a 'trusted' env. that the user-folder has a supposedly 'bundled' plugin:
326                if self._check_if_trusted:
327                    result = self._locatePlugin(plugin_id, plugin_dir)
328                    if result:
329                        is_bundled = False
330                        break
331                else:
332                    continue
333
334            result = self._locatePlugin(plugin_id, plugin_dir)
335            if result:
336                is_bundled = True
337                break
338        self._bundled_plugin_cache[plugin_id] = is_bundled
339        return is_bundled
340    def loadPlugins(self, metadata: Optional[Dict[str, Any]] = None) -> None:
341        """Load all plugins matching a certain set of metadata
342
343        :param metadata: The meta data that needs to be matched.
344        NOTE: This is the method which kicks everything off at app launch.
345        """
346
347        start_time = time.time()
348        # Get a list of all installed plugins:
349        plugin_ids = self._findInstalledPlugins()
350        for plugin_id in plugin_ids:
351            # Get the plugin metadata:
352            try:
353                plugin_metadata = self.getMetaData(plugin_id)
354            except TrustException:
355                Logger.error("Plugin {} was not loaded because it could not be verified.", plugin_id)
356                message_text = i18n_catalog.i18nc("@error:untrusted",
357                                                  "Plugin {} was not loaded because it could not be verified.", plugin_id)
358                Message(text = message_text).show()
359                continue
360
361            # Save all metadata to the metadata dictionary:
362            self._metadata[plugin_id] = plugin_metadata
363            if metadata is None or self._subsetInDict(self._metadata[plugin_id], metadata):
364                #
365                try:
366                    self.loadPlugin(plugin_id)
367                    QCoreApplication.processEvents()  # Ensure that the GUI does not freeze.
368                    # Add the plugin to the list after actually load the plugin:
369                    self._all_plugins.append(plugin_id)
370                    self._plugins_installed.append(plugin_id)
371                except PluginNotFoundError:
372                    pass
373        Logger.log("d", "Loading all plugins took %s seconds", time.time() - start_time)
374
375    # Checks if the given plugin API version is compatible with the current version.
376    def isPluginApiVersionCompatible(self, plugin_api_version: "Version") -> bool:
377        return plugin_api_version.getMajor() == self._api_version.getMajor() \
378               and plugin_api_version.getMinor() <= self._api_version.getMinor()
379
380    #   Load a single plugin by ID:
381    def loadPlugin(self, plugin_id: str) -> None:
382        # If plugin has already been loaded, do not load it again:
383        if plugin_id in self._plugins:
384            Logger.log("w", "Plugin %s was already loaded", plugin_id)
385            return
386
387        # Find the actual plugin on drive, do security checks if necessary:
388        plugin = self._findPlugin(plugin_id)
389
390        # If not found, raise error:
391        if not plugin:
392            raise PluginNotFoundError(plugin_id)
393
394        # If found, but isn't in the metadata dictionary, add it:
395        if plugin_id not in self._metadata:
396            try:
397                self._populateMetaData(plugin_id)
398            except InvalidMetaDataError:
399                return
400
401        # Do not load plugin that has been disabled
402        if plugin_id in self._disabled_plugins:
403            Logger.log("i", "Plugin [%s] has been disabled. Skip loading it.", plugin_id)
404            return
405
406        # If API version is incompatible, don't load it.
407        supported_sdk_versions = self._metadata[plugin_id].get("plugin", {}).get("supported_sdk_versions", [Version("0")])
408        is_plugin_supported = False
409        for supported_sdk_version in supported_sdk_versions:
410            is_plugin_supported |= self.isPluginApiVersionCompatible(supported_sdk_version)
411            if is_plugin_supported:
412                break
413
414        if not is_plugin_supported:
415            Logger.log("w", "Plugin [%s] with supported sdk versions [%s] is incompatible with the current sdk version [%s].",
416                       plugin_id, [str(version) for version in supported_sdk_versions], self._api_version)
417            self._outdated_plugins.append(plugin_id)
418            return
419
420        try:
421            to_register = plugin.register(self._application)  # type: ignore  # We catch AttributeError on this in case register() doesn't exist.
422            if not to_register:
423                Logger.log("w", "Plugin %s did not return any objects to register", plugin_id)
424                return
425            for plugin_type, plugin_object in to_register.items():
426                if type(plugin_object) == list:
427                    for metadata_index, nested_plugin_object in enumerate(plugin_object):
428                        nested_plugin_object.setVersion(self._metadata[plugin_id].get("plugin", {}).get("version"))
429                        all_metadata = self._metadata[plugin_id].get(plugin_type, [])
430                        try:
431                            nested_plugin_object.setMetaData(all_metadata[metadata_index])
432                        except IndexError:
433                            nested_plugin_object.setMetaData({})
434                        self._addPluginObject(nested_plugin_object, plugin_id, plugin_type)
435                else:
436                    plugin_object.setVersion(self._metadata[plugin_id].get("plugin", {}).get("version"))
437                    metadata = self._metadata[plugin_id].get(plugin_type, {})
438                    if type(metadata) == list:
439                        try:
440                            metadata = metadata[0]
441                        except IndexError:
442                            metadata = {}
443                    plugin_object.setMetaData(metadata)
444                    self._addPluginObject(plugin_object, plugin_id, plugin_type)
445
446            self._plugins[plugin_id] = plugin
447            self.enablePlugin(plugin_id)
448            Logger.info("Loaded plugin %s", plugin_id)
449
450        except Exception as ex:
451            Logger.logException("e", "Error loading plugin %s:", plugin_id)
452
453    #   Uninstall a plugin with a given ID:
454    @pyqtSlot(str, result = "QVariantMap")
455    def uninstallPlugin(self, plugin_id: str) -> Dict[str, str]:
456        result = {"status": "error", "message": "", "id": plugin_id}
457        success_message = i18n_catalog.i18nc("@info:status", "The plugin has been removed.\nPlease restart {0} to finish uninstall.", self._application.getApplicationName())
458
459        if plugin_id not in self._plugins_installed:
460            return result
461
462        in_to_install = plugin_id in self._plugins_to_install
463        if in_to_install:
464            del self._plugins_to_install[plugin_id]
465            self._savePluginData()
466            Logger.log("i", "Plugin '%s' removed from to-be-installed list.", plugin_id)
467        else:
468            if plugin_id not in self._plugins_to_remove:
469                self._plugins_to_remove.append(plugin_id)
470            self._savePluginData()
471            Logger.log("i", "Plugin '%s' has been scheduled for later removal.", plugin_id)
472
473            # Remove the plugin object from the Plugin Registry:
474            self._plugins.pop(plugin_id, None)
475            self._plugins_installed.remove(plugin_id)
476
477        result["status"] = "ok"
478        result["message"] = success_message
479        return result
480
481    # Installs the given plugin file. It will overwrite the existing plugin if present.
482    def _installPlugin(self, plugin_id: str, plugin_path: str) -> None:
483        Logger.log("i", "Attempting to install a new plugin %s from file '%s'", plugin_id, plugin_path)
484
485        local_plugin_path = os.path.join(Resources.getStoragePath(Resources.Resources), "plugins")
486
487        if plugin_id in self._bundled_plugin_cache:
488            del self._bundled_plugin_cache[plugin_id]
489
490        try:
491            with zipfile.ZipFile(plugin_path, "r") as zip_ref:
492                plugin_folder = os.path.join(local_plugin_path, plugin_id)
493
494                # Overwrite the existing plugin if already installed
495                if os.path.isdir(plugin_folder):
496                    shutil.rmtree(plugin_folder, ignore_errors = True)
497                os.makedirs(plugin_folder, exist_ok = True)
498
499                # Extract all files
500                for info in zip_ref.infolist():
501                    extracted_path = zip_ref.extract(info.filename, path = plugin_folder)
502                    permissions = os.stat(extracted_path).st_mode
503                    os.chmod(extracted_path, permissions | stat.S_IEXEC) # Make these files executable.
504        except: # Installing a new plugin should never crash the application.
505            Logger.logException("e", "An exception occurred while installing plugin {path}".format(path = plugin_path))
506
507        if plugin_id in self._disabled_plugins:
508            self._disabled_plugins.remove(plugin_id)
509
510    # Removes the given plugin.
511    def _removePlugin(self, plugin_id: str) -> None:
512        plugin_folder = os.path.join(Resources.getStoragePath(Resources.Resources), "plugins")
513        plugin_path = os.path.join(plugin_folder, plugin_id)
514
515        if plugin_id in self._bundled_plugin_cache:
516            del self.bundled_plugin_cache[plugin_id]
517
518        Logger.log("i", "Attempting to remove plugin '%s' from directory '%s'", plugin_id, plugin_path)
519        shutil.rmtree(plugin_path)
520
521#===============================================================================
522# PRIVATE METHODS
523#===============================================================================
524
525    def _getPluginIdFromFile(self, filename: str) -> Optional[str]:
526        plugin_id = None
527        try:
528            with zipfile.ZipFile(filename, "r") as zip_ref:
529                for file_info in zip_ref.infolist():
530                    if file_info.filename.endswith("/"):
531                        plugin_id = file_info.filename.strip("/")
532                        break
533        except zipfile.BadZipFile:
534            Logger.logException("e", "Failed to load plug-in file. The zip archive seems to be corrupt.")
535            return None  # Signals that loading this failed.
536        except FileNotFoundError:
537            Logger.logException("e", "Failed to load plug-in file as we were unable to find it.")
538            return None  # Signals that loading this failed.
539        return plugin_id
540
541    #   Returns a list of all possible plugin ids in the plugin locations:
542    def _findInstalledPlugins(self, paths = None) -> List[str]:
543        plugin_ids = []
544
545        if not paths:
546            paths = self._plugin_locations
547
548        for folder in paths:
549            try:
550                if not os.path.isdir(folder):
551                    continue
552
553                for file in os.listdir(folder):
554                    filepath = os.path.join(folder, file)
555                    if os.path.isdir(filepath):
556                        if os.path.isfile(os.path.join(filepath, "__init__.py")):
557                            plugin_ids.append(file)
558                        else:
559                            plugin_ids += self._findInstalledPlugins([filepath])
560            except EnvironmentError as err:
561                Logger.warning("Unable to read folder {folder}: {err}".format(folder = folder, err = err))
562                continue
563
564        return plugin_ids
565
566    def _findPlugin(self, plugin_id: str) -> Optional[types.ModuleType]:
567        """Try to find a module implementing a plugin
568
569        :param plugin_id: The name of the plugin to find
570        :returns: module if it was found (and, if 'self._check_if_trusted' is set, also secure), None otherwise
571        """
572
573        if plugin_id in self._found_plugins:
574            return self._found_plugins[plugin_id]
575        location = None
576        for folder in self._plugin_locations:
577            location = self._locatePlugin(plugin_id, folder)
578            if location:
579                break
580
581        if not location:
582            return None
583
584        try:
585            file, path, desc = imp.find_module(plugin_id, [location])
586        except Exception:
587            Logger.logException("e", "Import error when importing %s", plugin_id)
588            return None
589
590        # Define a trusted plugin as either: already checked, correctly signed, or bundled with the application.
591        if self._check_if_trusted and plugin_id not in self._checked_plugin_ids and not self.isBundledPlugin(plugin_id):
592
593            # NOTE: '__pychache__'s (+ subfolders) are deleted on startup _before_ load module:
594            if not TrustBasics.removeCached(path):
595                self._distrusted_plugin_ids.append(plugin_id)
596                return None
597
598            # Do the actual check:
599            if self._trust_checker is not None and self._trust_checker.signedFolderCheck(path):
600                self._checked_plugin_ids.append(plugin_id)
601            else:
602                self._distrusted_plugin_ids.append(plugin_id)
603                return None
604
605        try:
606            module = imp.load_module(plugin_id, file, path, desc) #type: ignore #MyPy gets the wrong output type from imp.find_module for some reason.
607        except Exception:
608            Logger.logException("e", "Import error loading module %s", plugin_id)
609            return None
610        finally:
611            if file:
612                os.close(file) #type: ignore #MyPy gets the wrong output type from imp.find_module for some reason.
613        self._found_plugins[plugin_id] = module
614        return module
615
616    def _locatePlugin(self, plugin_id: str, folder: str) -> Optional[str]:
617        if not os.path.isdir(folder):
618            return None
619
620        # self._plugin_folder_cache is a per-plugin-location list of all subfolders that contain a __init__.py file
621        if folder not in self._plugin_folder_cache:
622            plugin_folders = []
623            for root, dirs, files in os.walk(folder, topdown = True, followlinks = True):
624                # modify dirs in place to ignore .git, pycache and test folders completely
625                dirs[:] = [d for d in dirs if d not in plugin_path_ignore_list]
626
627                if "plugin.json" in files:
628                    plugin_folders.append((root, os.path.basename(root)))
629
630            self._plugin_folder_cache[folder] = plugin_folders
631
632        for folder_path, folder_name in self._plugin_folder_cache[folder]:
633            if folder_name == plugin_id:
634                return os.path.abspath(os.path.join(folder_path, ".."))
635
636        return None
637
638    #   Load the plugin data from the stream and in-place update the metadata.
639    def _parsePluginInfo(self, plugin_id, file_data, meta_data):
640        try:
641            meta_data["plugin"] = json.loads(file_data)
642        except json.decoder.JSONDecodeError:
643            Logger.logException("e", "Failed to parse plugin.json for plugin %s", plugin_id)
644            raise InvalidMetaDataError(plugin_id)
645
646        # Check if metadata is valid;
647        if "version" not in meta_data["plugin"]:
648            Logger.log("e", "Version must be set!")
649            raise InvalidMetaDataError(plugin_id)
650
651        # Check if the plugin states what API version it needs.
652        if "api" not in meta_data["plugin"] and "supported_sdk_versions" not in meta_data["plugin"]:
653            Logger.log("e", "The API or the supported_sdk_versions must be set!")
654            raise InvalidMetaDataError(plugin_id)
655        else:
656            # Store the api_version as a Version object.
657            all_supported_sdk_versions = []  # type: List[Version]
658            if "supported_sdk_versions" in meta_data["plugin"]:
659                all_supported_sdk_versions += [Version(supported_version) for supported_version in
660                                               meta_data["plugin"]["supported_sdk_versions"]]
661            if "api" in meta_data["plugin"]:
662                all_supported_sdk_versions += [Version(meta_data["plugin"]["api"])]
663            meta_data["plugin"]["supported_sdk_versions"] = all_supported_sdk_versions
664
665        if "i18n-catalog" in meta_data["plugin"]:
666            # A catalog was set, try to translate a few strings
667            i18n_catalog = i18nCatalog(meta_data["plugin"]["i18n-catalog"])
668            if "name" in meta_data["plugin"]:
669                meta_data["plugin"]["name"] = i18n_catalog.i18n(meta_data["plugin"]["name"])
670            if "description" in meta_data["plugin"]:
671                meta_data["plugin"]["description"] = i18n_catalog.i18n(meta_data["plugin"]["description"])
672
673    def _populateMetaData(self, plugin_id: str) -> bool:
674        """Populate the list of metadata"""
675
676        plugin = self._findPlugin(plugin_id)
677        if not plugin:
678            Logger.log("w", "Could not find plugin %s", plugin_id)
679            return False
680
681        location = None
682        for folder in self._plugin_locations:
683            location = self._locatePlugin(plugin_id, folder)
684            if location:
685                break
686
687        if not location:
688            Logger.log("w", "Could not find plugin %s", plugin_id)
689            return False
690        location = os.path.join(location, plugin_id)
691
692        try:
693            meta_data = plugin.getMetaData() #type: ignore #We catch the AttributeError that this would raise if the module has no getMetaData function.
694            metadata_file = os.path.join(location, "plugin.json")
695            try:
696                with open(metadata_file, "r", encoding = "utf-8") as file_stream:
697                    self._parsePluginInfo(plugin_id, file_stream.read(), meta_data)
698            except FileNotFoundError:
699                Logger.logException("e", "Unable to find the required plugin.json file for plugin %s", plugin_id)
700                raise InvalidMetaDataError(plugin_id)
701            except UnicodeDecodeError:
702                Logger.logException("e", "The plug-in metadata file for plug-in {plugin_id} is corrupt.".format(plugin_id = plugin_id))
703                raise InvalidMetaDataError(plugin_id)
704            except EnvironmentError as e:
705                Logger.logException("e", "Can't open the metadata file for plug-in {plugin_id}: {err}".format(plugin_id = plugin_id, err = str(e)))
706                raise InvalidMetaDataError(plugin_id)
707
708        except AttributeError as e:
709            Logger.log("e", "Plug-in {plugin_id} has no getMetaData function to get metadata of the plug-in: {err}".format(plugin_id = plugin_id, err = str(e)))
710            raise InvalidMetaDataError(plugin_id)
711        except TypeError as e:
712            Logger.log("e", "Plug-in {plugin_id} has a getMetaData function with the wrong signature: {err}".format(plugin_id = plugin_id, err = str(e)))
713            raise InvalidMetaDataError(plugin_id)
714
715        if not meta_data:
716            raise InvalidMetaDataError(plugin_id)
717
718        meta_data["id"] = plugin_id
719        meta_data["location"] = location
720
721        # Application-specific overrides
722        appname = self._application.getApplicationName()
723        if appname in meta_data:
724            meta_data.update(meta_data[appname])
725            del meta_data[appname]
726
727        self._metadata[plugin_id] = meta_data
728        return True
729
730    #   Check if a certain dictionary contains a certain subset of key/value pairs
731    #   \param dictionary \type{dict} The dictionary to search
732    #   \param subset \type{dict} The subset to search for
733    def _subsetInDict(self, dictionary: Dict[Any, Any], subset: Dict[Any, Any]) -> bool:
734        for key in subset:
735            if key not in dictionary:
736                return False
737            if subset[key] != {} and dictionary[key] != subset[key]:
738                return False
739        return True
740
741    def getPluginObject(self, plugin_id: str) -> PluginObject:
742        """Get a specific plugin object given an ID. If not loaded, load it.
743
744        :param plugin_id: The ID of the plugin object to get.
745        """
746
747        if plugin_id not in self._plugins:
748            self.loadPlugin(plugin_id)
749        if plugin_id not in self._plugin_objects:
750            raise PluginNotFoundError(plugin_id)
751        return self._plugin_objects[plugin_id]
752
753    def _addPluginObject(self, plugin_object: PluginObject, plugin_id: str, plugin_type: str) -> None:
754        plugin_object.setPluginId(plugin_id)
755        self._plugin_objects[plugin_id] = plugin_object
756        try:
757            self._type_register_map[plugin_type](plugin_object)
758        except Exception as e:
759            Logger.logException("e", "Unable to add plugin %s", plugin_id)
760
761    def addSupportedPluginExtension(self, extension: str, description: str) -> None:
762        if extension not in self._supported_file_types:
763            self._supported_file_types[extension] = description
764            self.supportedPluginExtensionsChanged.emit()
765
766    supportedPluginExtensionsChanged = pyqtSignal()
767
768    @pyqtProperty("QStringList", notify=supportedPluginExtensionsChanged)
769    def supportedPluginExtensions(self) -> List[str]:
770        file_types = []
771        all_types = []
772
773        if Platform.isLinux():
774            for ext, desc in self._supported_file_types.items():
775                file_types.append("{0} (*.{1} *.{2})".format(desc, ext.lower(), ext.upper()))
776                all_types.append("*.{0} *.{1}".format(ext.lower(), ext.upper()))
777        else:
778            for ext, desc in self._supported_file_types.items():
779                file_types.append("{0} (*.{1})".format(desc, ext))
780                all_types.append("*.{0}".format(ext))
781
782        file_types.sort()
783        file_types.insert(0, i18n_catalog.i18nc("@item:inlistbox", "All Supported Types ({0})", " ".join(all_types)))
784        file_types.append(i18n_catalog.i18nc("@item:inlistbox", "All Files (*)"))
785        return file_types
786
787    def getPluginPath(self, plugin_id: str) -> Optional[str]:
788        """Get the path to a plugin.
789
790        :param plugin_id: The PluginObject.getPluginId() of the plugin.
791        :return: The absolute path to the plugin or an empty string if the plugin could not be found.
792        """
793
794        if plugin_id in self._plugins:
795            plugin = self._plugins.get(plugin_id)
796        else:
797            plugin = self._findPlugin(plugin_id)
798
799        if not plugin:
800            return None
801
802        path = os.path.dirname(self._plugins[plugin_id].__file__)
803        if os.path.isdir(path):
804            return path
805
806        return None
807
808    @classmethod
809    def addType(cls, plugin_type: str, register_function: Callable[[Any], None]) -> None:
810        """Add a new plugin type.
811
812        This function is used to add new plugin types. Plugin types are simple
813        string identifiers that match a certain plugin to a registration function.
814
815        The callable `register_function` is responsible for handling the object.
816        Usually it will add the object to a list of objects in the relevant class.
817        For example, the plugin type 'tool' has Controller::addTool as register
818        function.
819
820        `register_function` will be called every time a plugin of `type` is loaded.
821
822        :param plugin_type: The name of the plugin type to add.
823        :param register_function: A callable that takes an object as parameter.
824        """
825
826        cls._type_register_map[plugin_type] = register_function
827
828    @classmethod
829    def removeType(cls, plugin_type: str) -> None:
830        """Remove a plugin type.
831
832        :param plugin_type: The plugin type to remove.
833        """
834
835        if plugin_type in cls._type_register_map:
836            del cls._type_register_map[plugin_type]
837
838    _type_register_map = {}  # type: Dict[str, Callable[[Any], None]]
839    __instance = None    # type: PluginRegistry
840
841    @classmethod
842    def getInstance(cls, *args, **kwargs) -> "PluginRegistry":
843        return cls.__instance
844