1# -*- coding: utf-8 -*-
2#
3# Picard, the next-generation MusicBrainz tagger
4#
5# Copyright (C) 2007 Lukáš Lalinský
6# Copyright (C) 2014 Shadab Zafar
7# Copyright (C) 2015-2019 Laurent Monin
8# Copyright (C) 2019 Wieland Hoffmann
9# Copyright (C) 2019-2020 Philipp Wolfer
10#
11# This program is free software; you can redistribute it and/or
12# modify it under the terms of the GNU General Public License
13# as published by the Free Software Foundation; either version 2
14# of the License, or (at your option) any later version.
15#
16# This program is distributed in the hope that it will be useful,
17# but WITHOUT ANY WARRANTY; without even the implied warranty of
18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19# GNU General Public License for more details.
20#
21# You should have received a copy of the GNU General Public License
22# along with this program; if not, write to the Free Software
23# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
24
25
26from functools import partial
27import imp
28import importlib
29import json
30import os.path
31import shutil
32import tempfile
33import zipfile
34import zipimport
35
36from PyQt5 import QtCore
37
38from picard import log
39from picard.const import (
40    PLUGINS_API,
41    USER_PLUGIN_DIR,
42)
43from picard.plugin import (
44    _PLUGIN_MODULE_PREFIX,
45    PluginData,
46    PluginWrapper,
47    _unregister_module_extensions,
48)
49import picard.plugins
50from picard.version import (
51    Version,
52    VersionError,
53)
54
55
# File name suffixes the import machinery accepts for modules; used to decide
# whether a plain file in a plugin directory can be a plugin module.
_SUFFIXES = tuple(importlib.machinery.all_suffixes())

# A directory containing one of these files is treated as a plugin package.
_PACKAGE_ENTRIES = ("__init__.py", "__init__.pyc", "__init__.pyo")

# File extensions looked at when searching for an installed plugin's files.
_FILEEXTS = ('.py', '.pyc', '.pyo', '.zip')

# Suffix of plugin packages (before an optional '.zip') and its length.
_PLUGIN_PACKAGE_SUFFIX = ".picard"
_PLUGIN_PACKAGE_SUFFIX_LEN = len(_PLUGIN_PACKAGE_SUFFIX)

# Suffix appended to files/directories staged as pending updates, and its length.
_UPDATE_SUFFIX = '.update'
_UPDATE_SUFFIX_LEN = len(_UPDATE_SUFFIX)


# NOTE(review): not referenced by the code visible in this module.
_extension_points = []
66
67
def is_update(path):
    """Return True when `path` ends with the pending-update suffix."""
    suffix = _UPDATE_SUFFIX
    return path.endswith(suffix)
70
71
def strip_update_suffix(path):
    """Return `path` without its trailing update suffix.

    Paths that do not end with the update suffix are returned unchanged.
    """
    return path[:-_UPDATE_SUFFIX_LEN] if is_update(path) else path
76
77
def is_zip(path):
    """Return True when `path` ends with the '.zip' extension."""
    zip_extension = '.zip'
    return path.endswith(zip_extension)
80
81
def strip_zip_suffix(path):
    """Return `path` with a trailing '.zip' removed.

    Paths that do not end with '.zip' are returned unchanged.
    """
    if is_zip(path):
        # '.zip' is four characters long
        return path[:-4]
    return path
86
87
def is_package(path):
    """Return True when `path` ends with the plugin package suffix."""
    suffix = _PLUGIN_PACKAGE_SUFFIX
    return path.endswith(suffix)
90
91
def strip_package_suffix(path):
    """Return `path` without its trailing plugin package suffix.

    Paths that do not end with the package suffix are returned unchanged.
    """
    return path[:-_PLUGIN_PACKAGE_SUFFIX_LEN] if is_package(path) else path
96
97
def is_zipped_package(path):
    """Return True when `path` ends with the package suffix followed by '.zip'."""
    zipped_package_suffix = _PLUGIN_PACKAGE_SUFFIX + '.zip'
    return path.endswith(zipped_package_suffix)
100
101
def _plugin_name_from_path(path):
    """Derive a plugin (module) name from a file system path.

    - A path ending in '.zip' yields the archive's base name without '.zip'
      and without the plugin package suffix, if present.
    - An existing directory yields its base name, but only if it contains
      one of the package entry files.
    - Any other path yields the file's base name without extension if that
      extension is a recognised module suffix and the file is not itself a
      package entry file.

    Returns None when no plugin name can be derived.
    """
    path = os.path.normpath(path)

    if is_zip(path):
        archive_stem = os.path.basename(strip_zip_suffix(path))
        if not is_package(archive_stem):
            return archive_stem
        return strip_package_suffix(archive_stem)

    if os.path.isdir(path):
        has_package_entry = any(
            os.path.isfile(os.path.join(path, entry))
            for entry in _PACKAGE_ENTRIES
        )
        return os.path.basename(path) if has_package_entry else None

    filename = os.path.basename(path)
    if filename in _PACKAGE_ENTRIES:
        # a bare __init__ file is not a plugin on its own
        return None
    stem, extension = os.path.splitext(filename)
    return stem if extension in _SUFFIXES else None
122
123
def load_manifest(archive_path):
    """Read and parse `MANIFEST.json` from the zip archive at `archive_path`.

    The archive is opened as a context manager so its file handle is
    released as soon as the manifest has been read, even if reading or
    parsing fails.

    Returns the decoded JSON content of the manifest.

    Raises whatever `zipfile` or `json` raise, e.g. `KeyError` if the
    archive has no `MANIFEST.json` member, `zipfile.BadZipFile` for a broken
    archive, or `ValueError` for invalid JSON.
    """
    with zipfile.ZipFile(archive_path) as archive:
        with archive.open('MANIFEST.json') as f:
            return json.loads(f.read().decode())
130
131
def zip_import(path):
    """Prepare a zip importer for the plugin archive at `path`.

    Returns a tuple `(zip_importer, plugin_name, manifest_data)`.
    All three are None if `path` is not an existing '.zip' file or if the
    zip importer cannot be created. `manifest_data` is only loaded for
    zipped plugin packages and stays None if loading the manifest fails.
    """
    nothing = (None, None, None)
    if not (is_zip(path) and os.path.isfile(path)):
        return nothing

    try:
        importer = zipimport.zipimporter(path)
    except zipimport.ZipImportError as why:
        log.error("ZIP import error: %s", why)
        return nothing

    name = _plugin_name_from_path(path)
    manifest = None
    if is_zipped_package(path):
        try:
            manifest = load_manifest(path)
        except Exception as why:
            # a missing or broken manifest is not fatal for the import
            log.warning("Failed to load manifest data from json: %s", why)
    return (importer, name, manifest)
148
149
def _compatible_api_versions(api_versions):
    """Return the set of `api_versions` also listed in `picard.api_versions_tuple`.

    Each item of `api_versions` is parsed with `Version.from_string`; the
    result is empty when none of the versions is supported.
    """
    declared = {Version.from_string(v) for v in api_versions}
    supported = set(picard.api_versions_tuple)
    return declared & supported
153
154
class PluginManager(QtCore.QObject):
    """Load, install, update and remove plugins found in plugin directories.

    Successfully loaded, compatible plugins are kept in `self.plugins` as
    `PluginWrapper` objects. Installation, update, removal and loading
    errors are reported through the Qt signals declared below. The trailing
    bool of every signal is always emitted as False by this class.
    """

    # Emitted by install_plugin() after the new plugin has been loaded.
    plugin_installed = QtCore.pyqtSignal(PluginWrapper, bool)
    # Emitted by install_plugin() after an update has been staged.
    plugin_updated = QtCore.pyqtSignal(str, bool)
    # Emitted by remove_plugin() with the plugin name.
    plugin_removed = QtCore.pyqtSignal(str, bool)
    # Emitted by plugin_error() with the plugin name and the error message.
    plugin_errored = QtCore.pyqtSignal(str, str, bool)

    def __init__(self, plugins_directory=None):
        """Create a manager installing into `plugins_directory`.

        `plugins_directory` defaults to `USER_PLUGIN_DIR`; the path is
        stored normalized.
        """
        super().__init__()
        self.plugins = []
        self._available_plugins = None  # None=never loaded, [] = empty
        if plugins_directory is None:
            plugins_directory = USER_PLUGIN_DIR
        self.plugins_directory = os.path.normpath(plugins_directory)

    @property
    def available_plugins(self):
        """List of `PluginData` from the last plugin list query.

        None if the list was never loaded, an empty list if loading failed
        or nothing compatible is available.
        """
        return self._available_plugins

    def plugin_error(self, name, error, *args, **kwargs):
        """Log a plugin loading error for the plugin `name` and signal the
        error via the `plugin_errored` signal.

        A string consisting of all `args` interpolated into `error` will be
        passed to the function given via the `log_func` keyword argument
        (default: log.error) and as the error message to the `plugin_errored`
        signal.

        Callers must pass `error` as an un-interpolated template together
        with its `args`; passing an already formatted message risks a second
        interpolation failing on a literal '%'."""
        error = error % args
        log_func = kwargs.get('log_func', log.error)
        log_func(error)
        self.plugin_errored.emit(name, error, False)

    def _marked_for_update(self):
        """Yield `(source_path, target_path, plugin_name)` for every entry of
        the plugins directory carrying the update suffix.

        `target_path` is `source_path` without the update suffix. Entries for
        which no plugin name can be derived are logged and skipped.
        """
        for file in os.listdir(self.plugins_directory):
            if not is_update(file):
                continue
            source_path = os.path.join(self.plugins_directory, file)
            target_path = strip_update_suffix(source_path)
            plugin_name = _plugin_name_from_path(target_path)
            if plugin_name:
                yield (source_path, target_path, plugin_name)
            else:
                log.error('Cannot get plugin name from %r', source_path)

    def handle_plugin_updates(self):
        """Apply all staged updates found in the plugins directory.

        For each staged update the currently installed plugin is removed and
        the staged file or directory is renamed to its final name.
        """
        for source_path, target_path, plugin_name in self._marked_for_update():
            self._remove_plugin(plugin_name)
            os.rename(source_path, target_path)
            log.debug('Updating plugin %r (%r)', plugin_name, target_path)

    def load_plugins_from_directory(self, plugindir):
        """Load every plugin found directly inside `plugindir`.

        Does nothing (besides logging) if `plugindir` is not a directory.
        Staged updates are applied first when `plugindir` is the manager's
        own plugins directory. A failure to load one plugin is reported via
        plugin_error() and does not stop loading of the remaining plugins.
        """
        plugindir = os.path.normpath(plugindir)
        if not os.path.isdir(plugindir):
            log.info("Plugin directory %r doesn't exist", plugindir)
            return
        if plugindir == self.plugins_directory:
            # .update trick is only for plugins installed through the Picard UI
            # and only for plugins in plugins_directory (USER_PLUGIN_DIR by default)
            self.handle_plugin_updates()
        # now load found plugins
        names = set()
        for file in os.listdir(plugindir):
            name = _plugin_name_from_path(os.path.join(plugindir, file))
            if name:
                names.add(name)
        log.debug("Looking for plugins in directory %r, %d names found",
                  plugindir,
                  len(names))
        for name in sorted(names):
            try:
                self._load_plugin_from_directory(name, plugindir)
            except Exception:
                self.plugin_error(name, _("Unable to load plugin '%s'"), name, log_func=log.exception)

    def _get_plugin_index_by_name(self, name):
        """Return `(plugin, index)` for the loaded plugin whose module name is
        `name`, or `(None, None)` if there is none."""
        for index, plugin in enumerate(self.plugins):
            if name == plugin.module_name:
                return (plugin, index)
        return (None, None)

    def _load_plugin_from_directory(self, name, plugindir):
        """Import the plugin `name` from `plugindir` and register it.

        A zip archive `<name>.zip` is preferred; otherwise the module or
        package is located with `imp.find_module`. If a plugin with the same
        module name is already loaded, its extensions are unregistered and
        its entry in `self.plugins` is replaced.

        Returns None if the module cannot be found or fails to import.
        Otherwise returns the `PluginWrapper`; note that a wrapper is also
        returned for a plugin that is not API compatible, but such a plugin
        is not added to `self.plugins`.

        NOTE(review): `imp` and the importers' find_module()/load_module()
        are deprecated and unavailable on recent Python releases; migrating
        to `importlib` would be needed to support those.
        """
        module_file = None
        zipfilename = os.path.join(plugindir, name + '.zip')
        (zip_importer, module_name, manifest_data) = zip_import(zipfilename)
        if zip_importer:
            name = module_name
            if not zip_importer.find_module(name):
                error = _("Failed loading zipped plugin %r from %r")
                self.plugin_error(name, error, name, zipfilename)
                return None
            module_pathname = zip_importer.get_filename(name)
        else:
            try:
                info = imp.find_module(name, [plugindir])
                module_file = info[0]
                module_pathname = info[1]
            except ImportError:
                error = _("Failed loading plugin %r in %r")
                self.plugin_error(name, error, name, [plugindir])
                return None

        plugin = None
        try:
            existing_plugin, existing_plugin_index = self._get_plugin_index_by_name(name)
            if existing_plugin:
                log.warning("Module %r conflict: unregistering previously"
                            " loaded %r version %s from %r",
                            existing_plugin.module_name,
                            existing_plugin.name,
                            existing_plugin.version,
                            existing_plugin.file)
                _unregister_module_extensions(name)
            full_module_name = _PLUGIN_MODULE_PREFIX + name
            if zip_importer:
                plugin_module = zip_importer.load_module(full_module_name)
            else:
                plugin_module = imp.load_module(full_module_name, *info)
            plugin = PluginWrapper(plugin_module, plugindir,
                                   file=module_pathname, manifest_data=manifest_data)
            compatible_versions = _compatible_api_versions(plugin.api_versions)
            if compatible_versions:
                log.debug("Loading plugin %r version %s, compatible with API: %s",
                          plugin.name,
                          plugin.version,
                          ", ".join([v.to_string(short=True) for v in
                                     sorted(compatible_versions)]))
                plugin.compatible = True
                setattr(picard.plugins, name, plugin_module)
                if existing_plugin:
                    self.plugins[existing_plugin_index] = plugin
                else:
                    self.plugins.append(plugin)
            else:
                # Pass the template and its arguments separately: plugin_error()
                # does the interpolation, and formatting here as well would
                # break on a '%' contained in the plugin name or path.
                error = _("Plugin '%s' from '%s' is not compatible with this "
                          "version of Picard.")
                self.plugin_error(plugin.name, error, plugin.name, plugin.file,
                                  log_func=log.warning)
        except VersionError as e:
            error = _("Plugin %r has an invalid API version string : %s")
            self.plugin_error(name, error, name, e)
        except BaseException:
            # Plugin code is executed on import; whatever it raises is
            # reported as a plugin error instead of propagating.
            error = _("Plugin %r")
            self.plugin_error(name, error, name, log_func=log.exception)
        finally:
            # imp.find_module() may return an open file that we must close
            if module_file is not None:
                module_file.close()
        return plugin

    def _get_existing_paths(self, plugin_name, fileexts):
        """Find what is installed for `plugin_name` in the plugins directory.

        Returns `(dirpath, filepaths)`: `dirpath` is the plugin's directory
        or None if there is no such directory, `filepaths` lists existing
        entries named `plugin_name` plus one of the extensions in `fileexts`.
        """
        dirpath = os.path.join(self.plugins_directory, plugin_name)
        if not os.path.isdir(dirpath):
            dirpath = None
        candidates = {plugin_name + ext for ext in fileexts}
        filepaths = [os.path.join(self.plugins_directory, f)
                     for f in os.listdir(self.plugins_directory)
                     if f in candidates]
        return (dirpath, filepaths)

    def _remove_plugin_files(self, plugin_name, with_update=False):
        """Delete the directory and files installed for `plugin_name`.

        A symlinked plugin directory is unlinked rather than deleted
        recursively. With `with_update` set, an update staged next to a
        removed file is deleted as well.
        """
        plugin_name = strip_zip_suffix(plugin_name)
        log.debug("Remove plugin files and dirs : %r", plugin_name)
        dirpath, filepaths = self._get_existing_paths(plugin_name, _FILEEXTS)
        if dirpath:
            if os.path.islink(dirpath):
                log.debug("Removing symlink %r", dirpath)
                os.remove(dirpath)
            elif os.path.isdir(dirpath):
                log.debug("Removing directory %r", dirpath)
                shutil.rmtree(dirpath)
        for filepath in filepaths:
            log.debug("Removing file %r", filepath)
            os.remove(filepath)
            if with_update:
                update = filepath + _UPDATE_SUFFIX
                if os.path.isfile(update):
                    log.debug("Removing file %r", update)
                    os.remove(update)

    def _remove_plugin(self, plugin_name, with_update=False):
        """Remove the plugin's files, unregister its extensions and drop it
        from `self.plugins`, without emitting any signal."""
        self._remove_plugin_files(plugin_name, with_update)
        _unregister_module_extensions(plugin_name)
        self.plugins = [p for p in self.plugins if p.module_name != plugin_name]

    def remove_plugin(self, plugin_name, with_update=False):
        """Remove the plugin `plugin_name` and emit `plugin_removed`."""
        self._remove_plugin(plugin_name, with_update=with_update)
        self.plugin_removed.emit(plugin_name, False)

    def _install_plugin_zip(self, plugin_name, plugin_data, update=False):
        """Write the raw zip content `plugin_data` to `<plugin_name>.zip` in
        the plugins directory.

        With `update` set the file gets the update suffix appended and
        replaces any previously staged update.
        """
        # zipped module from download
        zip_plugin = plugin_name + '.zip'
        dst = os.path.join(self.plugins_directory, zip_plugin)
        if update:
            dst += _UPDATE_SUFFIX
            if os.path.isfile(dst):
                os.remove(dst)
        # The data is written to a temporary file in the same directory
        # first and only then linked (or, failing that, copied) to `dst`.
        with tempfile.NamedTemporaryFile(dir=self.plugins_directory) as tmpfile:
            tmpfile.write(plugin_data)
            tmpfile.flush()
            os.fsync(tmpfile.fileno())
            try:
                os.link(tmpfile.name, dst)
            except OSError:
                with open(dst, 'wb') as dstfile:
                    tmpfile.seek(0)
                    shutil.copyfileobj(tmpfile, dstfile)
            log.debug("Plugin (zipped) saved to %r", dst)

    def _install_plugin_file(self, path, update=False):
        """Copy the plugin file `path` into the plugins directory.

        With `update` set the copy gets the update suffix appended and
        replaces any previously staged update.
        """
        dst = os.path.join(self.plugins_directory, os.path.basename(path))
        if update:
            dst += _UPDATE_SUFFIX
            if os.path.isfile(dst):
                os.remove(dst)
        shutil.copy2(path, dst)
        log.debug("Plugin (file) saved to %r", dst)

    def _install_plugin_dir(self, plugin_name, path, update=False):
        """Copy the plugin directory `path` into the plugins directory as
        `plugin_name`.

        With `update` set the copy gets the update suffix appended and
        replaces any previously staged update.
        """
        dst = os.path.join(self.plugins_directory, plugin_name)
        if update:
            dst += _UPDATE_SUFFIX
            if os.path.isdir(dst):
                shutil.rmtree(dst)
        shutil.copytree(path, dst)
        log.debug("Plugin (directory) saved to %r", dst)

    def install_plugin(self, path, update=False, plugin_name=None, plugin_data=None):
        """
            path is either:
                1) /some/dir/name.py
                2) /some/dir/name (directory containing __init__.py)
                3) /some/dir/name.zip (containing either 1 or 2)

            If `plugin_data` is given it is saved as zip file for
            `plugin_name` instead of copying `path`.

            Without `update` the plugin is loaded right away; on success
            `plugin_installed` is emitted, on failure the plugin is removed
            again. With `update` the plugin is only staged and
            `plugin_updated` is emitted.
        """
        assert path or plugin_name, "path is required if plugin_name is empty"

        if not plugin_name:
            plugin_name = _plugin_name_from_path(path)
        if plugin_name:
            try:
                if plugin_data:
                    self._install_plugin_zip(plugin_name, plugin_data, update=update)
                elif os.path.isfile(path):
                    self._install_plugin_file(path, update=update)
                elif os.path.isdir(path):
                    self._install_plugin_dir(plugin_name, path, update=update)
            except OSError as why:
                log.error("Unable to copy plugin '%s' to %r: %s",
                          plugin_name, self.plugins_directory, why)
                return

            if not update:
                try:
                    installed_plugin = self._load_plugin_from_directory(plugin_name, self.plugins_directory)
                    if not installed_plugin:
                        raise RuntimeError("Failed loading newly installed plugin %s" % plugin_name)
                except Exception as e:
                    log.error("Unable to load plugin '%s': %s", plugin_name, e)
                    self._remove_plugin(plugin_name)
                else:
                    self.plugin_installed.emit(installed_plugin, False)
            else:
                self.plugin_updated.emit(plugin_name, False)

    def query_available_plugins(self, callback=None):
        """Request the list of available plugins from the plugins API.

        The response is handled by _plugins_json_loaded(), which calls
        `callback` (if given) once done.

        NOTE(review): relies on `self.tagger` being provided outside of this
        class.
        """
        self.tagger.webservice.get(
            PLUGINS_API['host'],
            PLUGINS_API['port'],
            PLUGINS_API['endpoint']['plugins'],
            partial(self._plugins_json_loaded, callback=callback),
            priority=True,
            important=True
        )

    def is_available(self, plugin_name):
        """Return True if `plugin_name` is in the list of available plugins.

        Returns False as long as the list has not been loaded.
        """
        available = self._available_plugins or []
        return any(p.module_name == plugin_name for p in available)

    def _plugins_json_loaded(self, response, reply, error, callback=None):
        """Store the compatible plugins from the plugins API `response`.

        On a request error, or if the response lacks the expected structure,
        the list of available plugins is set to empty. `callback` is called
        without arguments afterwards, if given.
        """
        if error:
            self.tagger.window.set_statusbar_message(
                N_("Error loading plugins list: %(error)s"),
                {'error': reply.errorString()},
                echo=log.error
            )
            self._available_plugins = []
        else:
            try:
                self._available_plugins = [PluginData(data, key) for key, data in
                                           response['plugins'].items()
                                           if _compatible_api_versions(data['api_versions'])]
            except (AttributeError, KeyError, TypeError):
                self._available_plugins = []
        if callback:
            callback()

    # pylint: disable=no-self-use
    def enabled(self, name):
        """Return whether the plugin `name` is enabled; always True here."""
        return True