1# Copyright (c) 2019 Ultimaker B.V. 2# Uranium is released under the terms of the LGPLv3 or higher. 3 4import imp 5import json 6import os 7import shutil # For deleting plugin directories; 8import stat # For setting file permissions correctly; 9import time 10import types 11import zipfile 12from typing import Any, Callable, Dict, List, Optional, Tuple, TYPE_CHECKING 13 14from PyQt5.QtCore import QCoreApplication 15from PyQt5.QtCore import QObject, pyqtSlot, QUrl, pyqtProperty, pyqtSignal 16 17from UM.Logger import Logger 18from UM.Message import Message 19from UM.Platform import Platform 20from UM.PluginError import PluginNotFoundError, InvalidMetaDataError 21from UM.PluginObject import PluginObject # For type hinting 22from UM.Resources import Resources 23from UM.Trust import Trust, TrustException, TrustBasics 24from UM.Version import Version 25from UM.i18n import i18nCatalog 26 27i18n_catalog = i18nCatalog("uranium") 28 29if TYPE_CHECKING: 30 from UM.Application import Application 31 32 33plugin_path_ignore_list = ["__pycache__", "tests", ".git"] 34 35 36class PluginRegistry(QObject): 37 """A central object to dynamically load modules as plugins. 38 39 The PluginRegistry class can load modules dynamically and use 40 them as plugins. Each plugin module is expected to be a directory with 41 and `__init__` file defining a `getMetaData` and a `register` function. 42 43 For more details, see the [plugins] file. 44 45 [plugins]: docs/plugins.md 46 """ 47 48 def __init__(self, application: "Application", parent: QObject = None) -> None: 49 if PluginRegistry.__instance is not None: 50 raise RuntimeError("Try to create singleton '%s' more than once" % self.__class__.__name__) 51 PluginRegistry.__instance = self 52 53 super().__init__(parent) 54 self._application = application # type: Application 55 self._api_version = application.getAPIVersion() # type: Version 56 57 self._all_plugins = [] # type: List[str] 58 self._metadata = {} # type: Dict[str, Dict[str, Any]] 59 60 self._plugins_installed = [] # type: List[str] 61 62 # NOTE: The disabled_plugins and plugins_to_remove is explicitly set to None. 63 # When actually loading the preferences, it's set to a list. This way we can see the 64 # difference between no list and an empty one. 65 self._disabled_plugins = [] # type: List[str] 66 self._outdated_plugins = [] # type: List[str] 67 self._plugins_to_install = dict() # type: Dict[str, Dict[str, str]] 68 self._plugins_to_remove = [] # type: List[str] 69 70 self._plugins = {} # type: Dict[str, types.ModuleType] 71 self._found_plugins = {} # type: Dict[str, types.ModuleType] # Cache to speed up _findPlugin 72 self._plugin_objects = {} # type: Dict[str, PluginObject] 73 74 self._plugin_locations = [] # type: List[str] 75 self._plugin_folder_cache = {} # type: Dict[str, List[Tuple[str, str]]] # Cache to speed up _locatePlugin 76 77 self._bundled_plugin_cache = {} # type: Dict[str, bool] 78 79 self._supported_file_types = {"umplugin": "Uranium Plugin"} # type: Dict[str, str] 80 81 self._check_if_trusted = False # type: bool 82 self._checked_plugin_ids = [] # type: List[str] 83 self._distrusted_plugin_ids = [] # type: List[str] 84 self._trust_checker = None # type: Optional[Trust] 85 86 def setCheckIfTrusted(self, check_if_trusted: bool) -> None: 87 self._check_if_trusted = check_if_trusted 88 if self._check_if_trusted: 89 self._trust_checker = Trust.getInstance() 90 # 'Trust.getInstance()' will raise an exception if anything goes wrong (e.g.: 'unable to read public key'). 91 # Any such exception is explicitly _not_ caught here, as the application should quit with a crash. 92 93 def getCheckIfTrusted(self) -> bool: 94 return self._check_if_trusted 95 96 def initializeBeforePluginsAreLoaded(self) -> None: 97 config_path = Resources.getConfigStoragePath() 98 99 # File to store plugin info, such as which ones to install/remove and which ones are disabled. 100 # At this point we can load this here because we already know the actual Application name, so the directory name 101 self._plugin_config_filename = os.path.join(os.path.abspath(config_path), "plugins.json") # type: str 102 103 from UM.Settings.ContainerRegistry import ContainerRegistry 104 container_registry = ContainerRegistry.getInstance() 105 106 try: 107 with container_registry.lockFile(): 108 # Load the plugin info if exists 109 if os.path.exists(self._plugin_config_filename): 110 Logger.log("i", "Loading plugin configuration file '%s'", self._plugin_config_filename) 111 with open(self._plugin_config_filename, "r", encoding = "utf-8") as f: 112 data = json.load(f) 113 self._disabled_plugins = data["disabled"] 114 self._plugins_to_install = data["to_install"] 115 self._plugins_to_remove = data["to_remove"] 116 except: 117 Logger.logException("e", "Failed to load plugin configuration file '%s'", self._plugin_config_filename) 118 119 # Also load data from preferences, where the plugin info used to be saved 120 preferences = self._application.getPreferences() 121 disabled_plugins = preferences.getValue("general/disabled_plugins") 122 disabled_plugins = disabled_plugins.split(",") if disabled_plugins else [] 123 disabled_plugins = [plugin for plugin in disabled_plugins if len(plugin.strip()) > 0] 124 for plugin_id in disabled_plugins: 125 if plugin_id not in self._disabled_plugins: 126 self._disabled_plugins.append(plugin_id) 127 128 plugins_to_remove = preferences.getValue("general/plugins_to_remove") 129 plugins_to_remove = plugins_to_remove.split(",") if plugins_to_remove else [] 130 for plugin_id in plugins_to_remove: 131 if plugin_id not in self._plugins_to_remove: 132 self._plugins_to_remove.append(plugin_id) 133 134 # Remove plugins that need to be removed 135 for plugin_id in self._plugins_to_remove: 136 self._removePlugin(plugin_id) 137 self._plugins_to_remove = [] 138 if plugins_to_remove is not None: 139 preferences.setValue("general/plugins_to_remove", "") 140 self._savePluginData() 141 142 # Install the plugins that need to be installed (overwrite existing) 143 for plugin_id, plugin_info in self._plugins_to_install.items(): 144 self._installPlugin(plugin_id, plugin_info["filename"]) 145 self._plugins_to_install = {} 146 self._savePluginData() 147 148 def initializeAfterPluginsAreLoaded(self) -> None: 149 preferences = self._application.getPreferences() 150 151 # Remove the old preferences settings from preferences 152 preferences.resetPreference("general/disabled_plugins") 153 preferences.resetPreference("general/plugins_to_remove") 154 155 def _savePluginData(self) -> None: 156 from UM.Settings.ContainerRegistry import ContainerRegistry 157 container_registry = ContainerRegistry.getInstance() 158 try: 159 with container_registry.lockFile(): 160 with open(self._plugin_config_filename, "w", encoding = "utf-8") as f: 161 data = json.dumps({"disabled": self._disabled_plugins, 162 "to_install": self._plugins_to_install, 163 "to_remove": self._plugins_to_remove, 164 }) 165 f.write(data) 166 except: 167 # Since we're writing to file (and waiting for a lock), there are a few things that can go wrong. 168 # There is no need to crash the application for this, but it is a failure that we want to log. 169 Logger.logException("e", "Unable to save the plugin data.") 170 171 # TODO: 172 # - [ ] Improve how metadata is stored. It should not be in the 'plugin' prop 173 # of the dictionary item. 174 # - [ ] Remove usage of "active" in favor of "enabled". 175 # - [ ] Switch self._disabled_plugins to self._plugins_disabled 176 # - [ ] External plugins only appear in installed after restart 177 # 178 # NOMENCLATURE: 179 # Enabled (active): A plugin which is installed and currently enabled. 180 # Disabled: A plugin which is installed but not currently enabled. 181 # Available: A plugin which is not installed but could be. 182 # Installed: A plugin which is installed locally in Cura. 183 184 #=============================================================================== 185 # PUBLIC METHODS 186 #=============================================================================== 187 188 # Add a plugin location to the list of locations to search: 189 def addPluginLocation(self, location: str) -> None: 190 #TODO: Add error checking! 191 self._plugin_locations.append(location) 192 193 # Check if all required plugins are loaded: 194 def checkRequiredPlugins(self, required_plugins: List[str]) -> bool: 195 plugins = self._findInstalledPlugins() 196 for plugin_id in required_plugins: 197 if plugin_id not in plugins: 198 Logger.log("e", "Plugin %s is required, but not added or loaded", plugin_id) 199 return False 200 return True 201 202 # Remove plugin from the list of enabled plugins and save to preferences: 203 def disablePlugin(self, plugin_id: str) -> None: 204 if plugin_id not in self._disabled_plugins: 205 self._disabled_plugins.append(plugin_id) 206 self._savePluginData() 207 208 # Add plugin to the list of enabled plugins and save to preferences: 209 def enablePlugin(self, plugin_id: str) -> None: 210 if plugin_id in self._disabled_plugins: 211 self._disabled_plugins.remove(plugin_id) 212 self._savePluginData() 213 214 # Get a list of enabled plugins: 215 def getActivePlugins(self) -> List[str]: 216 plugin_list = [] 217 for plugin_id in self._all_plugins: 218 if self.isActivePlugin(plugin_id): 219 plugin_list.append(plugin_id) 220 return plugin_list 221 222 # Get a list of all metadata matching a certain subset of metadata: 223 # \param kwargs Keyword arguments. 224 # Possible keywords: 225 # - filter: \type{dict} The subset of metadata that should be matched. 226 # - active_only: Boolean, True when only active plugin metadata should 227 # be returned. 228 def getAllMetaData(self, **kwargs: Any): 229 data_filter = kwargs.get("filter", {}) 230 active_only = kwargs.get("active_only", False) 231 metadata_list = [] 232 for plugin_id in self._all_plugins: 233 if active_only and (plugin_id in self._disabled_plugins or plugin_id in self._outdated_plugins): 234 continue 235 plugin_metadata = self.getMetaData(plugin_id) 236 if self._subsetInDict(plugin_metadata, data_filter): 237 metadata_list.append(plugin_metadata) 238 return metadata_list 239 240 # Get a list of disabled plugins: 241 def getDisabledPlugins(self) -> List[str]: 242 return self._disabled_plugins 243 244 # Get a list of installed plugins: 245 # NOTE: These are plugins which have already been registered. This list is 246 # actually populated by the private _findInstalledPlugins() method. 247 def getInstalledPlugins(self) -> List[str]: 248 plugins = self._plugins_installed.copy() 249 for plugin_id in self._plugins_to_remove: 250 if plugin_id in plugins: 251 plugins.remove(plugin_id) 252 for plugin_id in self._plugins_to_install: 253 if plugin_id not in plugins: 254 plugins.append(plugin_id) 255 return sorted(plugins) 256 257 # Get the metadata for a certain plugin: 258 # NOTE: InvalidMetaDataError is raised when no metadata can be found or 259 # the metadata misses the right keys. 260 def getMetaData(self, plugin_id: str) -> Dict[str, Any]: 261 if plugin_id not in self._metadata: 262 try: 263 if not self._populateMetaData(plugin_id): 264 return {} 265 except InvalidMetaDataError: 266 return {} 267 268 return self._metadata[plugin_id] 269 270 @pyqtSlot(str, result = "QVariantMap") 271 def installPlugin(self, plugin_path: str) -> Optional[Dict[str, str]]: 272 plugin_path = QUrl(plugin_path).toLocalFile() 273 274 plugin_id = self._getPluginIdFromFile(plugin_path) 275 if plugin_id is None: #Failed to load. 276 return None 277 278 # Remove it from the to-be-removed list if it's there 279 if plugin_id in self._plugins_to_remove: 280 self._plugins_to_remove.remove(plugin_id) 281 self._savePluginData() 282 283 # Copy the plugin file to the cache directory so it can later be used for installation 284 cache_dir = os.path.join(Resources.getCacheStoragePath(), "plugins") 285 if not os.path.exists(cache_dir): 286 os.makedirs(cache_dir, exist_ok = True) 287 cache_plugin_filename = os.path.join(cache_dir, plugin_id + ".plugin") 288 if os.path.exists(cache_plugin_filename): 289 os.remove(cache_plugin_filename) 290 shutil.copy2(plugin_path, cache_plugin_filename) 291 292 # Add new install data 293 install_info = {"plugin_id": plugin_id, 294 "filename": cache_plugin_filename} 295 self._plugins_to_install[plugin_id] = install_info 296 self._savePluginData() 297 Logger.log("i", "Plugin '%s' has been scheduled for installation.", plugin_id) 298 299 result = {"status": "ok", 300 "id": "", 301 "message": i18n_catalog.i18nc("@info:status", "The plugin has been installed.\nPlease re-start the application to activate the plugin."), 302 } 303 return result 304 305 # Check by ID if a plugin is active (enabled): 306 def isActivePlugin(self, plugin_id: str) -> bool: 307 if plugin_id not in self._disabled_plugins and plugin_id not in self._outdated_plugins and plugin_id in self._all_plugins: 308 return True 309 310 return False 311 312 def isBundledPlugin(self, plugin_id: str) -> bool: 313 if plugin_id in self._bundled_plugin_cache: 314 return self._bundled_plugin_cache[plugin_id] 315 install_prefix = os.path.abspath(self._application.getInstallPrefix()) 316 317 # Go through all plugin locations and check if the given plugin is located in the installation path. 318 is_bundled = False 319 for plugin_dir in self._plugin_locations: 320 try: 321 is_in_installation_path = os.path.commonpath([install_prefix, plugin_dir]).startswith(install_prefix) 322 except ValueError: 323 is_in_installation_path = False 324 if not is_in_installation_path: 325 # To prevent the situation in a 'trusted' env. that the user-folder has a supposedly 'bundled' plugin: 326 if self._check_if_trusted: 327 result = self._locatePlugin(plugin_id, plugin_dir) 328 if result: 329 is_bundled = False 330 break 331 else: 332 continue 333 334 result = self._locatePlugin(plugin_id, plugin_dir) 335 if result: 336 is_bundled = True 337 break 338 self._bundled_plugin_cache[plugin_id] = is_bundled 339 return is_bundled 340 def loadPlugins(self, metadata: Optional[Dict[str, Any]] = None) -> None: 341 """Load all plugins matching a certain set of metadata 342 343 :param metadata: The meta data that needs to be matched. 344 NOTE: This is the method which kicks everything off at app launch. 345 """ 346 347 start_time = time.time() 348 # Get a list of all installed plugins: 349 plugin_ids = self._findInstalledPlugins() 350 for plugin_id in plugin_ids: 351 # Get the plugin metadata: 352 try: 353 plugin_metadata = self.getMetaData(plugin_id) 354 except TrustException: 355 Logger.error("Plugin {} was not loaded because it could not be verified.", plugin_id) 356 message_text = i18n_catalog.i18nc("@error:untrusted", 357 "Plugin {} was not loaded because it could not be verified.", plugin_id) 358 Message(text = message_text).show() 359 continue 360 361 # Save all metadata to the metadata dictionary: 362 self._metadata[plugin_id] = plugin_metadata 363 if metadata is None or self._subsetInDict(self._metadata[plugin_id], metadata): 364 # 365 try: 366 self.loadPlugin(plugin_id) 367 QCoreApplication.processEvents() # Ensure that the GUI does not freeze. 368 # Add the plugin to the list after actually load the plugin: 369 self._all_plugins.append(plugin_id) 370 self._plugins_installed.append(plugin_id) 371 except PluginNotFoundError: 372 pass 373 Logger.log("d", "Loading all plugins took %s seconds", time.time() - start_time) 374 375 # Checks if the given plugin API version is compatible with the current version. 376 def isPluginApiVersionCompatible(self, plugin_api_version: "Version") -> bool: 377 return plugin_api_version.getMajor() == self._api_version.getMajor() \ 378 and plugin_api_version.getMinor() <= self._api_version.getMinor() 379 380 # Load a single plugin by ID: 381 def loadPlugin(self, plugin_id: str) -> None: 382 # If plugin has already been loaded, do not load it again: 383 if plugin_id in self._plugins: 384 Logger.log("w", "Plugin %s was already loaded", plugin_id) 385 return 386 387 # Find the actual plugin on drive, do security checks if necessary: 388 plugin = self._findPlugin(plugin_id) 389 390 # If not found, raise error: 391 if not plugin: 392 raise PluginNotFoundError(plugin_id) 393 394 # If found, but isn't in the metadata dictionary, add it: 395 if plugin_id not in self._metadata: 396 try: 397 self._populateMetaData(plugin_id) 398 except InvalidMetaDataError: 399 return 400 401 # Do not load plugin that has been disabled 402 if plugin_id in self._disabled_plugins: 403 Logger.log("i", "Plugin [%s] has been disabled. Skip loading it.", plugin_id) 404 return 405 406 # If API version is incompatible, don't load it. 407 supported_sdk_versions = self._metadata[plugin_id].get("plugin", {}).get("supported_sdk_versions", [Version("0")]) 408 is_plugin_supported = False 409 for supported_sdk_version in supported_sdk_versions: 410 is_plugin_supported |= self.isPluginApiVersionCompatible(supported_sdk_version) 411 if is_plugin_supported: 412 break 413 414 if not is_plugin_supported: 415 Logger.log("w", "Plugin [%s] with supported sdk versions [%s] is incompatible with the current sdk version [%s].", 416 plugin_id, [str(version) for version in supported_sdk_versions], self._api_version) 417 self._outdated_plugins.append(plugin_id) 418 return 419 420 try: 421 to_register = plugin.register(self._application) # type: ignore # We catch AttributeError on this in case register() doesn't exist. 422 if not to_register: 423 Logger.log("w", "Plugin %s did not return any objects to register", plugin_id) 424 return 425 for plugin_type, plugin_object in to_register.items(): 426 if type(plugin_object) == list: 427 for metadata_index, nested_plugin_object in enumerate(plugin_object): 428 nested_plugin_object.setVersion(self._metadata[plugin_id].get("plugin", {}).get("version")) 429 all_metadata = self._metadata[plugin_id].get(plugin_type, []) 430 try: 431 nested_plugin_object.setMetaData(all_metadata[metadata_index]) 432 except IndexError: 433 nested_plugin_object.setMetaData({}) 434 self._addPluginObject(nested_plugin_object, plugin_id, plugin_type) 435 else: 436 plugin_object.setVersion(self._metadata[plugin_id].get("plugin", {}).get("version")) 437 metadata = self._metadata[plugin_id].get(plugin_type, {}) 438 if type(metadata) == list: 439 try: 440 metadata = metadata[0] 441 except IndexError: 442 metadata = {} 443 plugin_object.setMetaData(metadata) 444 self._addPluginObject(plugin_object, plugin_id, plugin_type) 445 446 self._plugins[plugin_id] = plugin 447 self.enablePlugin(plugin_id) 448 Logger.info("Loaded plugin %s", plugin_id) 449 450 except Exception as ex: 451 Logger.logException("e", "Error loading plugin %s:", plugin_id) 452 453 # Uninstall a plugin with a given ID: 454 @pyqtSlot(str, result = "QVariantMap") 455 def uninstallPlugin(self, plugin_id: str) -> Dict[str, str]: 456 result = {"status": "error", "message": "", "id": plugin_id} 457 success_message = i18n_catalog.i18nc("@info:status", "The plugin has been removed.\nPlease restart {0} to finish uninstall.", self._application.getApplicationName()) 458 459 if plugin_id not in self._plugins_installed: 460 return result 461 462 in_to_install = plugin_id in self._plugins_to_install 463 if in_to_install: 464 del self._plugins_to_install[plugin_id] 465 self._savePluginData() 466 Logger.log("i", "Plugin '%s' removed from to-be-installed list.", plugin_id) 467 else: 468 if plugin_id not in self._plugins_to_remove: 469 self._plugins_to_remove.append(plugin_id) 470 self._savePluginData() 471 Logger.log("i", "Plugin '%s' has been scheduled for later removal.", plugin_id) 472 473 # Remove the plugin object from the Plugin Registry: 474 self._plugins.pop(plugin_id, None) 475 self._plugins_installed.remove(plugin_id) 476 477 result["status"] = "ok" 478 result["message"] = success_message 479 return result 480 481 # Installs the given plugin file. It will overwrite the existing plugin if present. 482 def _installPlugin(self, plugin_id: str, plugin_path: str) -> None: 483 Logger.log("i", "Attempting to install a new plugin %s from file '%s'", plugin_id, plugin_path) 484 485 local_plugin_path = os.path.join(Resources.getStoragePath(Resources.Resources), "plugins") 486 487 if plugin_id in self._bundled_plugin_cache: 488 del self._bundled_plugin_cache[plugin_id] 489 490 try: 491 with zipfile.ZipFile(plugin_path, "r") as zip_ref: 492 plugin_folder = os.path.join(local_plugin_path, plugin_id) 493 494 # Overwrite the existing plugin if already installed 495 if os.path.isdir(plugin_folder): 496 shutil.rmtree(plugin_folder, ignore_errors = True) 497 os.makedirs(plugin_folder, exist_ok = True) 498 499 # Extract all files 500 for info in zip_ref.infolist(): 501 extracted_path = zip_ref.extract(info.filename, path = plugin_folder) 502 permissions = os.stat(extracted_path).st_mode 503 os.chmod(extracted_path, permissions | stat.S_IEXEC) # Make these files executable. 504 except: # Installing a new plugin should never crash the application. 505 Logger.logException("e", "An exception occurred while installing plugin {path}".format(path = plugin_path)) 506 507 if plugin_id in self._disabled_plugins: 508 self._disabled_plugins.remove(plugin_id) 509 510 # Removes the given plugin. 511 def _removePlugin(self, plugin_id: str) -> None: 512 plugin_folder = os.path.join(Resources.getStoragePath(Resources.Resources), "plugins") 513 plugin_path = os.path.join(plugin_folder, plugin_id) 514 515 if plugin_id in self._bundled_plugin_cache: 516 del self.bundled_plugin_cache[plugin_id] 517 518 Logger.log("i", "Attempting to remove plugin '%s' from directory '%s'", plugin_id, plugin_path) 519 shutil.rmtree(plugin_path) 520 521#=============================================================================== 522# PRIVATE METHODS 523#=============================================================================== 524 525 def _getPluginIdFromFile(self, filename: str) -> Optional[str]: 526 plugin_id = None 527 try: 528 with zipfile.ZipFile(filename, "r") as zip_ref: 529 for file_info in zip_ref.infolist(): 530 if file_info.filename.endswith("/"): 531 plugin_id = file_info.filename.strip("/") 532 break 533 except zipfile.BadZipFile: 534 Logger.logException("e", "Failed to load plug-in file. The zip archive seems to be corrupt.") 535 return None # Signals that loading this failed. 536 except FileNotFoundError: 537 Logger.logException("e", "Failed to load plug-in file as we were unable to find it.") 538 return None # Signals that loading this failed. 539 return plugin_id 540 541 # Returns a list of all possible plugin ids in the plugin locations: 542 def _findInstalledPlugins(self, paths = None) -> List[str]: 543 plugin_ids = [] 544 545 if not paths: 546 paths = self._plugin_locations 547 548 for folder in paths: 549 try: 550 if not os.path.isdir(folder): 551 continue 552 553 for file in os.listdir(folder): 554 filepath = os.path.join(folder, file) 555 if os.path.isdir(filepath): 556 if os.path.isfile(os.path.join(filepath, "__init__.py")): 557 plugin_ids.append(file) 558 else: 559 plugin_ids += self._findInstalledPlugins([filepath]) 560 except EnvironmentError as err: 561 Logger.warning("Unable to read folder {folder}: {err}".format(folder = folder, err = err)) 562 continue 563 564 return plugin_ids 565 566 def _findPlugin(self, plugin_id: str) -> Optional[types.ModuleType]: 567 """Try to find a module implementing a plugin 568 569 :param plugin_id: The name of the plugin to find 570 :returns: module if it was found (and, if 'self._check_if_trusted' is set, also secure), None otherwise 571 """ 572 573 if plugin_id in self._found_plugins: 574 return self._found_plugins[plugin_id] 575 location = None 576 for folder in self._plugin_locations: 577 location = self._locatePlugin(plugin_id, folder) 578 if location: 579 break 580 581 if not location: 582 return None 583 584 try: 585 file, path, desc = imp.find_module(plugin_id, [location]) 586 except Exception: 587 Logger.logException("e", "Import error when importing %s", plugin_id) 588 return None 589 590 # Define a trusted plugin as either: already checked, correctly signed, or bundled with the application. 591 if self._check_if_trusted and plugin_id not in self._checked_plugin_ids and not self.isBundledPlugin(plugin_id): 592 593 # NOTE: '__pychache__'s (+ subfolders) are deleted on startup _before_ load module: 594 if not TrustBasics.removeCached(path): 595 self._distrusted_plugin_ids.append(plugin_id) 596 return None 597 598 # Do the actual check: 599 if self._trust_checker is not None and self._trust_checker.signedFolderCheck(path): 600 self._checked_plugin_ids.append(plugin_id) 601 else: 602 self._distrusted_plugin_ids.append(plugin_id) 603 return None 604 605 try: 606 module = imp.load_module(plugin_id, file, path, desc) #type: ignore #MyPy gets the wrong output type from imp.find_module for some reason. 607 except Exception: 608 Logger.logException("e", "Import error loading module %s", plugin_id) 609 return None 610 finally: 611 if file: 612 os.close(file) #type: ignore #MyPy gets the wrong output type from imp.find_module for some reason. 613 self._found_plugins[plugin_id] = module 614 return module 615 616 def _locatePlugin(self, plugin_id: str, folder: str) -> Optional[str]: 617 if not os.path.isdir(folder): 618 return None 619 620 # self._plugin_folder_cache is a per-plugin-location list of all subfolders that contain a __init__.py file 621 if folder not in self._plugin_folder_cache: 622 plugin_folders = [] 623 for root, dirs, files in os.walk(folder, topdown = True, followlinks = True): 624 # modify dirs in place to ignore .git, pycache and test folders completely 625 dirs[:] = [d for d in dirs if d not in plugin_path_ignore_list] 626 627 if "plugin.json" in files: 628 plugin_folders.append((root, os.path.basename(root))) 629 630 self._plugin_folder_cache[folder] = plugin_folders 631 632 for folder_path, folder_name in self._plugin_folder_cache[folder]: 633 if folder_name == plugin_id: 634 return os.path.abspath(os.path.join(folder_path, "..")) 635 636 return None 637 638 # Load the plugin data from the stream and in-place update the metadata. 639 def _parsePluginInfo(self, plugin_id, file_data, meta_data): 640 try: 641 meta_data["plugin"] = json.loads(file_data) 642 except json.decoder.JSONDecodeError: 643 Logger.logException("e", "Failed to parse plugin.json for plugin %s", plugin_id) 644 raise InvalidMetaDataError(plugin_id) 645 646 # Check if metadata is valid; 647 if "version" not in meta_data["plugin"]: 648 Logger.log("e", "Version must be set!") 649 raise InvalidMetaDataError(plugin_id) 650 651 # Check if the plugin states what API version it needs. 652 if "api" not in meta_data["plugin"] and "supported_sdk_versions" not in meta_data["plugin"]: 653 Logger.log("e", "The API or the supported_sdk_versions must be set!") 654 raise InvalidMetaDataError(plugin_id) 655 else: 656 # Store the api_version as a Version object. 657 all_supported_sdk_versions = [] # type: List[Version] 658 if "supported_sdk_versions" in meta_data["plugin"]: 659 all_supported_sdk_versions += [Version(supported_version) for supported_version in 660 meta_data["plugin"]["supported_sdk_versions"]] 661 if "api" in meta_data["plugin"]: 662 all_supported_sdk_versions += [Version(meta_data["plugin"]["api"])] 663 meta_data["plugin"]["supported_sdk_versions"] = all_supported_sdk_versions 664 665 if "i18n-catalog" in meta_data["plugin"]: 666 # A catalog was set, try to translate a few strings 667 i18n_catalog = i18nCatalog(meta_data["plugin"]["i18n-catalog"]) 668 if "name" in meta_data["plugin"]: 669 meta_data["plugin"]["name"] = i18n_catalog.i18n(meta_data["plugin"]["name"]) 670 if "description" in meta_data["plugin"]: 671 meta_data["plugin"]["description"] = i18n_catalog.i18n(meta_data["plugin"]["description"]) 672 673 def _populateMetaData(self, plugin_id: str) -> bool: 674 """Populate the list of metadata""" 675 676 plugin = self._findPlugin(plugin_id) 677 if not plugin: 678 Logger.log("w", "Could not find plugin %s", plugin_id) 679 return False 680 681 location = None 682 for folder in self._plugin_locations: 683 location = self._locatePlugin(plugin_id, folder) 684 if location: 685 break 686 687 if not location: 688 Logger.log("w", "Could not find plugin %s", plugin_id) 689 return False 690 location = os.path.join(location, plugin_id) 691 692 try: 693 meta_data = plugin.getMetaData() #type: ignore #We catch the AttributeError that this would raise if the module has no getMetaData function. 694 metadata_file = os.path.join(location, "plugin.json") 695 try: 696 with open(metadata_file, "r", encoding = "utf-8") as file_stream: 697 self._parsePluginInfo(plugin_id, file_stream.read(), meta_data) 698 except FileNotFoundError: 699 Logger.logException("e", "Unable to find the required plugin.json file for plugin %s", plugin_id) 700 raise InvalidMetaDataError(plugin_id) 701 except UnicodeDecodeError: 702 Logger.logException("e", "The plug-in metadata file for plug-in {plugin_id} is corrupt.".format(plugin_id = plugin_id)) 703 raise InvalidMetaDataError(plugin_id) 704 except EnvironmentError as e: 705 Logger.logException("e", "Can't open the metadata file for plug-in {plugin_id}: {err}".format(plugin_id = plugin_id, err = str(e))) 706 raise InvalidMetaDataError(plugin_id) 707 708 except AttributeError as e: 709 Logger.log("e", "Plug-in {plugin_id} has no getMetaData function to get metadata of the plug-in: {err}".format(plugin_id = plugin_id, err = str(e))) 710 raise InvalidMetaDataError(plugin_id) 711 except TypeError as e: 712 Logger.log("e", "Plug-in {plugin_id} has a getMetaData function with the wrong signature: {err}".format(plugin_id = plugin_id, err = str(e))) 713 raise InvalidMetaDataError(plugin_id) 714 715 if not meta_data: 716 raise InvalidMetaDataError(plugin_id) 717 718 meta_data["id"] = plugin_id 719 meta_data["location"] = location 720 721 # Application-specific overrides 722 appname = self._application.getApplicationName() 723 if appname in meta_data: 724 meta_data.update(meta_data[appname]) 725 del meta_data[appname] 726 727 self._metadata[plugin_id] = meta_data 728 return True 729 730 # Check if a certain dictionary contains a certain subset of key/value pairs 731 # \param dictionary \type{dict} The dictionary to search 732 # \param subset \type{dict} The subset to search for 733 def _subsetInDict(self, dictionary: Dict[Any, Any], subset: Dict[Any, Any]) -> bool: 734 for key in subset: 735 if key not in dictionary: 736 return False 737 if subset[key] != {} and dictionary[key] != subset[key]: 738 return False 739 return True 740 741 def getPluginObject(self, plugin_id: str) -> PluginObject: 742 """Get a specific plugin object given an ID. If not loaded, load it. 743 744 :param plugin_id: The ID of the plugin object to get. 745 """ 746 747 if plugin_id not in self._plugins: 748 self.loadPlugin(plugin_id) 749 if plugin_id not in self._plugin_objects: 750 raise PluginNotFoundError(plugin_id) 751 return self._plugin_objects[plugin_id] 752 753 def _addPluginObject(self, plugin_object: PluginObject, plugin_id: str, plugin_type: str) -> None: 754 plugin_object.setPluginId(plugin_id) 755 self._plugin_objects[plugin_id] = plugin_object 756 try: 757 self._type_register_map[plugin_type](plugin_object) 758 except Exception as e: 759 Logger.logException("e", "Unable to add plugin %s", plugin_id) 760 761 def addSupportedPluginExtension(self, extension: str, description: str) -> None: 762 if extension not in self._supported_file_types: 763 self._supported_file_types[extension] = description 764 self.supportedPluginExtensionsChanged.emit() 765 766 supportedPluginExtensionsChanged = pyqtSignal() 767 768 @pyqtProperty("QStringList", notify=supportedPluginExtensionsChanged) 769 def supportedPluginExtensions(self) -> List[str]: 770 file_types = [] 771 all_types = [] 772 773 if Platform.isLinux(): 774 for ext, desc in self._supported_file_types.items(): 775 file_types.append("{0} (*.{1} *.{2})".format(desc, ext.lower(), ext.upper())) 776 all_types.append("*.{0} *.{1}".format(ext.lower(), ext.upper())) 777 else: 778 for ext, desc in self._supported_file_types.items(): 779 file_types.append("{0} (*.{1})".format(desc, ext)) 780 all_types.append("*.{0}".format(ext)) 781 782 file_types.sort() 783 file_types.insert(0, i18n_catalog.i18nc("@item:inlistbox", "All Supported Types ({0})", " ".join(all_types))) 784 file_types.append(i18n_catalog.i18nc("@item:inlistbox", "All Files (*)")) 785 return file_types 786 787 def getPluginPath(self, plugin_id: str) -> Optional[str]: 788 """Get the path to a plugin. 789 790 :param plugin_id: The PluginObject.getPluginId() of the plugin. 791 :return: The absolute path to the plugin or an empty string if the plugin could not be found. 792 """ 793 794 if plugin_id in self._plugins: 795 plugin = self._plugins.get(plugin_id) 796 else: 797 plugin = self._findPlugin(plugin_id) 798 799 if not plugin: 800 return None 801 802 path = os.path.dirname(self._plugins[plugin_id].__file__) 803 if os.path.isdir(path): 804 return path 805 806 return None 807 808 @classmethod 809 def addType(cls, plugin_type: str, register_function: Callable[[Any], None]) -> None: 810 """Add a new plugin type. 811 812 This function is used to add new plugin types. Plugin types are simple 813 string identifiers that match a certain plugin to a registration function. 814 815 The callable `register_function` is responsible for handling the object. 816 Usually it will add the object to a list of objects in the relevant class. 817 For example, the plugin type 'tool' has Controller::addTool as register 818 function. 819 820 `register_function` will be called every time a plugin of `type` is loaded. 821 822 :param plugin_type: The name of the plugin type to add. 823 :param register_function: A callable that takes an object as parameter. 824 """ 825 826 cls._type_register_map[plugin_type] = register_function 827 828 @classmethod 829 def removeType(cls, plugin_type: str) -> None: 830 """Remove a plugin type. 831 832 :param plugin_type: The plugin type to remove. 833 """ 834 835 if plugin_type in cls._type_register_map: 836 del cls._type_register_map[plugin_type] 837 838 _type_register_map = {} # type: Dict[str, Callable[[Any], None]] 839 __instance = None # type: PluginRegistry 840 841 @classmethod 842 def getInstance(cls, *args, **kwargs) -> "PluginRegistry": 843 return cls.__instance 844