# -*- coding: utf-8 -*-
#
# Picard, the next-generation MusicBrainz tagger
#
# Copyright (C) 2007 Lukáš Lalinský
# Copyright (C) 2014 Shadab Zafar
# Copyright (C) 2015-2019 Laurent Monin
# Copyright (C) 2019 Wieland Hoffmann
# Copyright (C) 2019-2020 Philipp Wolfer
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from functools import partial
import imp
import importlib
import json
import os.path
import shutil
import tempfile
import zipfile
import zipimport

from PyQt5 import QtCore

from picard import log
from picard.const import (
    PLUGINS_API,
    USER_PLUGIN_DIR,
)
from picard.plugin import (
    _PLUGIN_MODULE_PREFIX,
    PluginData,
    PluginWrapper,
    _unregister_module_extensions,
)
import picard.plugins
from picard.version import (
    Version,
    VersionError,
)


# File suffixes the import machinery recognizes as importable modules.
_SUFFIXES = tuple(importlib.machinery.all_suffixes())
# A directory containing one of these files is treated as a plugin package.
_PACKAGE_ENTRIES = ("__init__.py", "__init__.pyc", "__init__.pyo")
_PLUGIN_PACKAGE_SUFFIX = ".picard"
_PLUGIN_PACKAGE_SUFFIX_LEN = len(_PLUGIN_PACKAGE_SUFFIX)
# Extensions of single-file plugins considered when removing plugin files.
_FILEEXTS = ('.py', '.pyc', '.pyo', '.zip')
# Suffix appended to a freshly downloaded plugin that replaces the installed
# one the next time the user plugin directory is loaded.
_UPDATE_SUFFIX = '.update'
_UPDATE_SUFFIX_LEN = len(_UPDATE_SUFFIX)


_extension_points = []


def is_update(path):
    """Return True if `path` ends with the pending-update suffix."""
    return path.endswith(_UPDATE_SUFFIX)


def strip_update_suffix(path):
    """Return `path` without a trailing update suffix (unchanged if absent)."""
    if not is_update(path):
        return path
    return path[:-_UPDATE_SUFFIX_LEN]


def is_zip(path):
    """Return True if `path` ends with '.zip'."""
    return path.endswith('.zip')


def strip_zip_suffix(path):
    """Return `path` without a trailing '.zip' (unchanged if absent)."""
    if not is_zip(path):
        return path
    return path[:-4]


def is_package(path):
    """Return True if `path` ends with the plugin package suffix."""
    return path.endswith(_PLUGIN_PACKAGE_SUFFIX)


def strip_package_suffix(path):
    """Return `path` without a trailing package suffix (unchanged if absent)."""
    if not is_package(path):
        return path
    return path[:-_PLUGIN_PACKAGE_SUFFIX_LEN]


def is_zipped_package(path):
    """Return True if `path` ends with the package suffix followed by '.zip'."""
    return path.endswith(_PLUGIN_PACKAGE_SUFFIX + '.zip')


def _plugin_name_from_path(path):
    """Derive a plugin name from a file or directory path.

    - A '.zip' path yields its base name without '.zip' and without the
      package suffix, if any.
    - A directory yields its base name, but only if it contains one of the
      `_PACKAGE_ENTRIES` files.
    - Any other path yields its base name without extension, provided the
      extension is one of `_SUFFIXES` and the file is not itself a package
      entry file.

    Returns None when no plugin name can be derived.
    """
    path = os.path.normpath(path)
    if is_zip(path):
        name = os.path.basename(strip_zip_suffix(path))
        if is_package(name):
            return strip_package_suffix(name)
        return name
    if os.path.isdir(path):
        has_entry = any(os.path.isfile(os.path.join(path, entry))
                        for entry in _PACKAGE_ENTRIES)
        return os.path.basename(path) if has_entry else None
    filename = os.path.basename(path)
    if filename in _PACKAGE_ENTRIES:
        return None
    name, ext = os.path.splitext(filename)
    if ext in _SUFFIXES:
        return name
    return None


def load_manifest(archive_path):
    """Return the parsed content of 'MANIFEST.json' from a zip archive.

    Errors from opening the archive, reading the member or parsing the JSON
    are not handled here and propagate to the caller.
    """
    # Context managers make sure both the archive and the member are closed,
    # even when reading or parsing fails.
    with zipfile.ZipFile(archive_path) as archive:
        with archive.open('MANIFEST.json') as f:
            return json.loads(f.read().decode())


def zip_import(path):
    """Prepare importing a plugin from the zip file at `path`.

    Returns a tuple `(zip_importer, plugin_name, manifest_data)`.
    `manifest_data` is only loaded for zipped packages and stays None if it
    cannot be read. `(None, None, None)` is returned when `path` is not an
    existing '.zip' file or when creating the zip importer fails.
    """
    if not is_zip(path) or not os.path.isfile(path):
        return (None, None, None)
    try:
        zip_importer = zipimport.zipimporter(path)
        plugin_name = _plugin_name_from_path(path)
        manifest_data = None
        if is_zipped_package(path):
            try:
                manifest_data = load_manifest(path)
            except Exception as why:
                # A missing or broken manifest is not fatal: only warn.
                log.warning("Failed to load manifest data from json: %s", why)
        return (zip_importer, plugin_name, manifest_data)
    except zipimport.ZipImportError as why:
        log.error("ZIP import error: %s", why)
        return (None, None, None)


def _compatible_api_versions(api_versions):
    """Return the set of versions present both in `api_versions` (version
    strings) and in `picard.api_versions_tuple`.

    `Version.from_string` is applied to every entry; any error it raises
    propagates to the caller.
    """
    versions = {Version.from_string(v) for v in api_versions}
    return versions & set(picard.api_versions_tuple)


class PluginManager(QtCore.QObject):
    """Loads, installs, updates and removes plugins, and keeps track of the
    plugins offered by the plugins web service."""

    # NOTE(review): every signal below is emitted with False as its last
    # argument in this file; the meaning of that flag is defined by the
    # connected slots -- confirm there.
    plugin_installed = QtCore.pyqtSignal(PluginWrapper, bool)
    plugin_updated = QtCore.pyqtSignal(str, bool)
    plugin_removed = QtCore.pyqtSignal(str, bool)
    plugin_errored = QtCore.pyqtSignal(str, str, bool)

    def __init__(self, plugins_directory=None):
        """Create a manager working on `plugins_directory`
        (default: USER_PLUGIN_DIR)."""
        super().__init__()
        self.plugins = []
        self._available_plugins = None  # None=never loaded, [] = empty
        if plugins_directory is None:
            plugins_directory = USER_PLUGIN_DIR
        self.plugins_directory = os.path.normpath(plugins_directory)

    @property
    def available_plugins(self):
        """Plugins reported by the web service; None if never loaded."""
        return self._available_plugins

    def plugin_error(self, name, error, *args, **kwargs):
        """Log a plugin loading error for the plugin `name` and signal the
        error via the `plugin_errored` signal.

        A string consisting of all `args` interpolated into `error` will be
        passed to the function given via the `log_func` keyword argument
        (default: log.error) and as the error message to the `plugin_errored`
        signal."""
        error = error % args
        log_func = kwargs.get('log_func', log.error)
        log_func(error)
        self.plugin_errored.emit(name, error, False)

    def _marked_for_update(self):
        """Yield `(source_path, target_path, plugin_name)` for every entry of
        the plugins directory carrying the update suffix.

        Entries for which no plugin name can be derived are logged and
        skipped.
        """
        for file in os.listdir(self.plugins_directory):
            if not is_update(file):
                continue
            source_path = os.path.join(self.plugins_directory, file)
            target_path = strip_update_suffix(source_path)
            plugin_name = _plugin_name_from_path(target_path)
            if plugin_name:
                yield (source_path, target_path, plugin_name)
            else:
                log.error('Cannot get plugin name from %r', source_path)

    def handle_plugin_updates(self):
        """Replace installed plugins by their pending '.update' versions."""
        for source_path, target_path, plugin_name in self._marked_for_update():
            # Remove the old version first so the rename target is free.
            self._remove_plugin(plugin_name)
            os.rename(source_path, target_path)
            log.debug('Updating plugin %r (%r)', plugin_name, target_path)

    def load_plugins_from_directory(self, plugindir):
        """Load every plugin found in `plugindir`.

        Pending updates are applied first when `plugindir` is the manager's
        own plugins directory. A plugin that fails to load is reported via
        `plugin_error` and does not prevent the others from loading.
        """
        plugindir = os.path.normpath(plugindir)
        if not os.path.isdir(plugindir):
            log.info("Plugin directory %r doesn't exist", plugindir)
            return
        if plugindir == self.plugins_directory:
            # .update trick is only for plugins installed through the Picard UI
            # and only for plugins in plugins_directory (USER_PLUGIN_DIR by default)
            self.handle_plugin_updates()
        # now load found plugins
        names = set()
        for file in os.listdir(plugindir):
            name = _plugin_name_from_path(os.path.join(plugindir, file))
            if name:
                names.add(name)
        log.debug("Looking for plugins in directory %r, %d names found",
                  plugindir,
                  len(names))
        for name in sorted(names):
            try:
                self._load_plugin_from_directory(name, plugindir)
            except Exception:
                self.plugin_error(name, _("Unable to load plugin '%s'"), name,
                                  log_func=log.exception)

    def _get_plugin_index_by_name(self, name):
        """Return `(plugin, index)` for the loaded plugin whose module name
        is `name`, or `(None, None)` if there is none."""
        for index, plugin in enumerate(self.plugins):
            if name == plugin.module_name:
                return (plugin, index)
        return (None, None)

    def _load_plugin_from_directory(self, name, plugindir):
        """Import the plugin `name` located in `plugindir` and register it.

        `<name>.zip` is tried first through `zip_import`; otherwise the module
        is located with `imp.find_module`. An already loaded plugin with the
        same module name gets its extensions unregistered and is replaced.
        The plugin is only added to `self.plugins` when it declares at least
        one compatible API version.

        Returns the `PluginWrapper` (also for an incompatible plugin), or
        None if the module was not found or failed to load. Failures are
        reported through `plugin_error`.
        """
        module_file = None
        zipfilename = os.path.join(plugindir, name + '.zip')
        (zip_importer, module_name, manifest_data) = zip_import(zipfilename)
        if zip_importer:
            name = module_name
            if not zip_importer.find_module(name):
                error = _("Failed loading zipped plugin %r from %r")
                self.plugin_error(name, error, name, zipfilename)
                return None
            module_pathname = zip_importer.get_filename(name)
        else:
            try:
                info = imp.find_module(name, [plugindir])
                module_file = info[0]
                module_pathname = info[1]
            except ImportError:
                error = _("Failed loading plugin %r in %r")
                self.plugin_error(name, error, name, [plugindir])
                return None

        plugin = None
        try:
            existing_plugin, existing_plugin_index = self._get_plugin_index_by_name(name)
            if existing_plugin:
                log.warning("Module %r conflict: unregistering previously"
                            " loaded %r version %s from %r",
                            existing_plugin.module_name,
                            existing_plugin.name,
                            existing_plugin.version,
                            existing_plugin.file)
                _unregister_module_extensions(name)
            full_module_name = _PLUGIN_MODULE_PREFIX + name
            if zip_importer:
                plugin_module = zip_importer.load_module(full_module_name)
            else:
                plugin_module = imp.load_module(full_module_name, *info)
            plugin = PluginWrapper(plugin_module, plugindir,
                                   file=module_pathname, manifest_data=manifest_data)
            compatible_versions = _compatible_api_versions(plugin.api_versions)
            if compatible_versions:
                log.debug("Loading plugin %r version %s, compatible with API: %s",
                          plugin.name,
                          plugin.version,
                          ", ".join([v.to_string(short=True) for v in
                                     sorted(compatible_versions)]))
                plugin.compatible = True
                setattr(picard.plugins, name, plugin_module)
                if existing_plugin:
                    self.plugins[existing_plugin_index] = plugin
                else:
                    self.plugins.append(plugin)
            else:
                # Pass the values as arguments instead of pre-formatting the
                # message: plugin_error() interpolates again, which would fail
                # if the plugin name or path contained a '%'.
                error = _("Plugin '%s' from '%s' is not compatible with this "
                          "version of Picard.")
                self.plugin_error(plugin.name, error, plugin.name, plugin.file,
                                  log_func=log.warning)
        except VersionError as e:
            error = _("Plugin %r has an invalid API version string : %s")
            self.plugin_error(name, error, name, e)
        except BaseException:
            # NOTE(review): BaseException also covers SystemExit and
            # KeyboardInterrupt raised while importing the plugin; kept as in
            # the original code -- confirm this is intended.
            error = _("Plugin %r")
            self.plugin_error(name, error, name, log_func=log.exception)
        finally:
            # Close the file opened by imp.find_module() on every path,
            # including when an error handler above raises itself.
            if module_file is not None:
                module_file.close()
        return plugin

    def _get_existing_paths(self, plugin_name, fileexts):
        """Return `(dirpath, filepaths)` for `plugin_name` inside the plugins
        directory.

        `dirpath` is the plugin's directory or None if it does not exist;
        `filepaths` lists existing files named `plugin_name` plus one of
        `fileexts`.
        """
        dirpath = os.path.join(self.plugins_directory, plugin_name)
        if not os.path.isdir(dirpath):
            dirpath = None
        # Build the candidate names once instead of once per directory entry.
        candidates = {plugin_name + ext for ext in fileexts}
        filepaths = [os.path.join(self.plugins_directory, f)
                     for f in os.listdir(self.plugins_directory)
                     if f in candidates]
        return (dirpath, filepaths)

    def _remove_plugin_files(self, plugin_name, with_update=False):
        """Delete the directory and/or files of `plugin_name` from the plugins
        directory.

        With `with_update` set, the pending '.update' file next to each
        removed file is deleted as well.
        """
        plugin_name = strip_zip_suffix(plugin_name)
        log.debug("Remove plugin files and dirs : %r", plugin_name)
        dirpath, filepaths = self._get_existing_paths(plugin_name, _FILEEXTS)
        if dirpath:
            # Check for a symlink first: a link to a directory also
            # satisfies isdir(), but only the link itself must be removed.
            if os.path.islink(dirpath):
                log.debug("Removing symlink %r", dirpath)
                os.remove(dirpath)
            elif os.path.isdir(dirpath):
                log.debug("Removing directory %r", dirpath)
                shutil.rmtree(dirpath)
        for filepath in filepaths:
            log.debug("Removing file %r", filepath)
            os.remove(filepath)
            if with_update:
                update = filepath + _UPDATE_SUFFIX
                if os.path.isfile(update):
                    log.debug("Removing file %r", update)
                    os.remove(update)

    def _remove_plugin(self, plugin_name, with_update=False):
        """Remove the plugin's files, unregister its extensions and drop it
        from `self.plugins`, without emitting any signal."""
        self._remove_plugin_files(plugin_name, with_update)
        _unregister_module_extensions(plugin_name)
        self.plugins = [p for p in self.plugins if p.module_name != plugin_name]

    def remove_plugin(self, plugin_name, with_update=False):
        """Remove a plugin and emit `plugin_removed`."""
        self._remove_plugin(plugin_name, with_update=with_update)
        self.plugin_removed.emit(plugin_name, False)

    def _install_plugin_zip(self, plugin_name, plugin_data, update=False):
        """Write the zip content `plugin_data` (bytes) to
        `<plugin_name>.zip` in the plugins directory, with the update suffix
        appended when `update` is set."""
        # zipped module from download
        zip_plugin = plugin_name + '.zip'
        dst = os.path.join(self.plugins_directory, zip_plugin)
        if update:
            dst += _UPDATE_SUFFIX
            if os.path.isfile(dst):
                os.remove(dst)
        # The local is deliberately not called `zipfile`, which would shadow
        # the imported module of that name.
        with tempfile.NamedTemporaryFile(dir=self.plugins_directory) as tmpfile:
            tmpfile.write(plugin_data)
            tmpfile.flush()
            os.fsync(tmpfile.fileno())
            try:
                # Hard-link the fully written temporary file to its final name.
                os.link(tmpfile.name, dst)
            except OSError:
                # Linking failed: fall back to copying the content.
                with open(dst, 'wb') as dstfile:
                    tmpfile.seek(0)
                    shutil.copyfileobj(tmpfile, dstfile)
            log.debug("Plugin (zipped) saved to %r", dst)

    def _install_plugin_file(self, path, update=False):
        """Copy the single-file plugin at `path` into the plugins directory,
        with the update suffix appended when `update` is set."""
        dst = os.path.join(self.plugins_directory, os.path.basename(path))
        if update:
            dst += _UPDATE_SUFFIX
            if os.path.isfile(dst):
                os.remove(dst)
        shutil.copy2(path, dst)
        log.debug("Plugin (file) saved to %r", dst)

    def _install_plugin_dir(self, plugin_name, path, update=False):
        """Copy the plugin directory `path` into the plugins directory as
        `plugin_name`, with the update suffix appended when `update` is set."""
        dst = os.path.join(self.plugins_directory, plugin_name)
        if update:
            dst += _UPDATE_SUFFIX
            if os.path.isdir(dst):
                shutil.rmtree(dst)
        shutil.copytree(path, dst)
        log.debug("Plugin (directory) saved to %r", dst)

    def install_plugin(self, path, update=False, plugin_name=None, plugin_data=None):
        """Install a plugin into the plugins directory.

        path is either:
            1) /some/dir/name.py
            2) /some/dir/name (directory containing __init__.py)
            3) /some/dir/name.zip (containing either 1 or 2)

        If `plugin_data` is given it is saved as a zip file instead of
        copying `path`. If `plugin_name` is empty it is derived from `path`.

        With `update` set, the plugin is only stored with the update suffix
        and `plugin_updated` is emitted. Otherwise the plugin is loaded right
        away: on success `plugin_installed` is emitted, on failure the error
        is logged and the plugin is removed again.
        """
        assert path or plugin_name, "path is required if plugin_name is empty"

        if not plugin_name:
            plugin_name = _plugin_name_from_path(path)
        if plugin_name:
            try:
                if plugin_data:
                    self._install_plugin_zip(plugin_name, plugin_data, update=update)
                elif os.path.isfile(path):
                    self._install_plugin_file(path, update=update)
                elif os.path.isdir(path):
                    self._install_plugin_dir(plugin_name, path, update=update)
            except OSError as why:
                log.error("Unable to copy plugin '%s' to %r: %s",
                          plugin_name, self.plugins_directory, why)
                return

            if not update:
                try:
                    installed_plugin = self._load_plugin_from_directory(plugin_name, self.plugins_directory)
                    if not installed_plugin:
                        raise RuntimeError("Failed loading newly installed plugin %s" % plugin_name)
                except Exception as e:
                    log.error("Unable to load plugin '%s': %s", plugin_name, e)
                    self._remove_plugin(plugin_name)
                else:
                    self.plugin_installed.emit(installed_plugin, False)
            else:
                self.plugin_updated.emit(plugin_name, False)

    def query_available_plugins(self, callback=None):
        """Request the plugin list from the plugins web service; `callback`
        (if any) is called without arguments once the reply was processed."""
        self.tagger.webservice.get(
            PLUGINS_API['host'],
            PLUGINS_API['port'],
            PLUGINS_API['endpoint']['plugins'],
            partial(self._plugins_json_loaded, callback=callback),
            priority=True,
            important=True
        )

    def is_available(self, plugin_name):
        """Return True if `plugin_name` is among the available plugins.

        Returns False while the list of available plugins has never been
        loaded (it is still None).
        """
        return any(p.module_name == plugin_name
                   for p in self._available_plugins or ())

    def _plugins_json_loaded(self, response, reply, error, callback=None):
        """Handle the web service reply for the plugin list.

        On success `_available_plugins` is set to the plugins having at least
        one compatible API version; on a request error or an unexpected
        response structure it is set to an empty list.
        """
        if error:
            self.tagger.window.set_statusbar_message(
                N_("Error loading plugins list: %(error)s"),
                {'error': reply.errorString()},
                echo=log.error
            )
            self._available_plugins = []
        else:
            try:
                self._available_plugins = [PluginData(data, key) for key, data in
                                           response['plugins'].items()
                                           if _compatible_api_versions(data['api_versions'])]
            except (AttributeError, KeyError, TypeError):
                self._available_plugins = []
        if callback:
            callback()

    # pylint: disable=no-self-use
    def enabled(self, name):
        """Return True for every plugin name."""
        return True