1"""Plugin loading and management logic and classes."""
2import logging
3from typing import Any
4from typing import Dict
5from typing import List
6from typing import Optional
7from typing import Set
8
9from flake8 import exceptions
10from flake8 import utils
11from flake8._compat import importlib_metadata
12
13LOG = logging.getLogger(__name__)
14
15__all__ = ("Checkers", "Plugin", "PluginManager", "ReportFormatters")
16
17NO_GROUP_FOUND = object()
18
19
20class Plugin:
21    """Wrap an EntryPoint from setuptools and other logic."""
22
23    def __init__(self, name, entry_point, local=False):
24        """Initialize our Plugin.
25
26        :param str name:
27            Name of the entry-point as it was registered with setuptools.
28        :param entry_point:
29            EntryPoint returned by setuptools.
30        :type entry_point:
31            setuptools.EntryPoint
32        :param bool local:
33            Is this a repo-local plugin?
34        """
35        self.name = name
36        self.entry_point = entry_point
37        self.local = local
38        self._plugin: Any = None
39        self._parameters = None
40        self._parameter_names: Optional[List[str]] = None
41        self._group = None
42        self._plugin_name = None
43        self._version = None
44
45    def __repr__(self) -> str:
46        """Provide an easy to read description of the current plugin."""
47        return 'Plugin(name="{}", entry_point="{}")'.format(
48            self.name, self.entry_point.value
49        )
50
51    def to_dictionary(self):
52        """Convert this plugin to a dictionary."""
53        return {
54            "name": self.name,
55            "parameters": self.parameters,
56            "parameter_names": self.parameter_names,
57            "plugin": self.plugin,
58            "plugin_name": self.plugin_name,
59        }
60
61    def is_in_a_group(self):
62        """Determine if this plugin is in a group.
63
64        :returns:
65            True if the plugin is in a group, otherwise False.
66        :rtype:
67            bool
68        """
69        return self.group() is not None
70
71    def group(self):
72        """Find and parse the group the plugin is in."""
73        if self._group is None:
74            name = self.name.split(".", 1)
75            if len(name) > 1:
76                self._group = name[0]
77            else:
78                self._group = NO_GROUP_FOUND
79        if self._group is NO_GROUP_FOUND:
80            return None
81        return self._group
82
83    @property
84    def parameters(self):
85        """List of arguments that need to be passed to the plugin."""
86        if self._parameters is None:
87            self._parameters = utils.parameters_for(self)
88        return self._parameters
89
90    @property
91    def parameter_names(self) -> List[str]:
92        """List of argument names that need to be passed to the plugin."""
93        if self._parameter_names is None:
94            self._parameter_names = list(self.parameters)
95        return self._parameter_names
96
97    @property
98    def plugin(self):
99        """Load and return the plugin associated with the entry-point.
100
101        This property implicitly loads the plugin and then caches it.
102        """
103        self.load_plugin()
104        return self._plugin
105
106    @property
107    def version(self) -> str:
108        """Return the version of the plugin."""
109        version = self._version
110        if version is None:
111            if self.is_in_a_group():
112                version = self._version = version_for(self)
113            else:
114                version = self._version = self.plugin.version
115        return version
116
117    @property
118    def plugin_name(self):
119        """Return the name of the plugin."""
120        if self._plugin_name is None:
121            if self.is_in_a_group():
122                self._plugin_name = self.group()
123            else:
124                self._plugin_name = self.plugin.name
125
126        return self._plugin_name
127
128    @property
129    def off_by_default(self):
130        """Return whether the plugin is ignored by default."""
131        return getattr(self.plugin, "off_by_default", False)
132
133    def execute(self, *args, **kwargs):
134        r"""Call the plugin with \*args and \*\*kwargs."""
135        return self.plugin(*args, **kwargs)  # pylint: disable=not-callable
136
137    def _load(self):
138        self._plugin = self.entry_point.load()
139        if not callable(self._plugin):
140            msg = (
141                f"Plugin {self._plugin!r} is not a callable. It might be "
142                f"written for an older version of flake8 and might not work "
143                f"with this version"
144            )
145            LOG.critical(msg)
146            raise TypeError(msg)
147
148    def load_plugin(self):
149        """Retrieve the plugin for this entry-point.
150
151        This loads the plugin, stores it on the instance and then returns it.
152        It does not reload it after the first time, it merely returns the
153        cached plugin.
154
155        :returns:
156            Nothing
157        """
158        if self._plugin is None:
159            LOG.info('Loading plugin "%s" from entry-point.', self.name)
160            try:
161                self._load()
162            except Exception as load_exception:
163                LOG.exception(load_exception)
164                failed_to_load = exceptions.FailedToLoadPlugin(
165                    plugin_name=self.name, exception=load_exception
166                )
167                LOG.critical(str(failed_to_load))
168                raise failed_to_load
169
170    def enable(self, optmanager, options=None):
171        """Remove plugin name from the default ignore list."""
172        optmanager.remove_from_default_ignore([self.name])
173        optmanager.extend_default_select([self.name])
174        if not options:
175            return
176        try:
177            options.ignore.remove(self.name)
178        except (ValueError, KeyError):
179            LOG.debug(
180                "Attempted to remove %s from the ignore list but it was "
181                "not a member of the list.",
182                self.name,
183            )
184
185    def disable(self, optmanager):
186        """Add the plugin name to the default ignore list."""
187        optmanager.extend_default_ignore([self.name])
188
189    def provide_options(self, optmanager, options, extra_args):
190        """Pass the parsed options and extra arguments to the plugin."""
191        parse_options = getattr(self.plugin, "parse_options", None)
192        if parse_options is not None:
193            LOG.debug('Providing options to plugin "%s".', self.name)
194            try:
195                parse_options(optmanager, options, extra_args)
196            except TypeError:
197                parse_options(options)
198
199        if self.name in options.enable_extensions:
200            self.enable(optmanager, options)
201
202    def register_options(self, optmanager):
203        """Register the plugin's command-line options on the OptionManager.
204
205        :param optmanager:
206            Instantiated OptionManager to register options on.
207        :type optmanager:
208            flake8.options.manager.OptionManager
209        :returns:
210            Nothing
211        """
212        add_options = getattr(self.plugin, "add_options", None)
213        if add_options is not None:
214            LOG.debug(
215                'Registering options from plugin "%s" on OptionManager %r',
216                self.name,
217                optmanager,
218            )
219            with optmanager.group(self.plugin_name):
220                add_options(optmanager)
221
222        if self.off_by_default:
223            self.disable(optmanager)
224
225
226class PluginManager:  # pylint: disable=too-few-public-methods
227    """Find and manage plugins consistently."""
228
229    def __init__(
230        self, namespace: str, local_plugins: Optional[List[str]] = None
231    ) -> None:
232        """Initialize the manager.
233
234        :param str namespace:
235            Namespace of the plugins to manage, e.g., 'flake8.extension'.
236        :param list local_plugins:
237            Plugins from config (as "X = path.to:Plugin" strings).
238        """
239        self.namespace = namespace
240        self.plugins: Dict[str, Plugin] = {}
241        self.names: List[str] = []
242        self._load_local_plugins(local_plugins or [])
243        self._load_entrypoint_plugins()
244
245    def _load_local_plugins(self, local_plugins):
246        """Load local plugins from config.
247
248        :param list local_plugins:
249            Plugins from config (as "X = path.to:Plugin" strings).
250        """
251        for plugin_str in local_plugins:
252            name, _, entry_str = plugin_str.partition("=")
253            name, entry_str = name.strip(), entry_str.strip()
254            entry_point = importlib_metadata.EntryPoint(
255                name, entry_str, self.namespace
256            )
257            self._load_plugin_from_entrypoint(entry_point, local=True)
258
259    def _load_entrypoint_plugins(self):
260        LOG.info('Loading entry-points for "%s".', self.namespace)
261        eps = importlib_metadata.entry_points().get(self.namespace, ())
262        # python2.7 occasionally gives duplicate results due to redundant
263        # `local/lib` -> `../lib` symlink on linux in virtualenvs so we
264        # eliminate duplicates here
265        for entry_point in sorted(frozenset(eps)):
266            if entry_point.name == "per-file-ignores":
267                LOG.warning(
268                    "flake8-per-file-ignores plugin is incompatible with "
269                    "flake8>=3.7 (which implements per-file-ignores itself)."
270                )
271                continue
272            self._load_plugin_from_entrypoint(entry_point)
273
274    def _load_plugin_from_entrypoint(self, entry_point, local=False):
275        """Load a plugin from a setuptools EntryPoint.
276
277        :param EntryPoint entry_point:
278            EntryPoint to load plugin from.
279        :param bool local:
280            Is this a repo-local plugin?
281        """
282        name = entry_point.name
283        self.plugins[name] = Plugin(name, entry_point, local=local)
284        self.names.append(name)
285        LOG.debug('Loaded %r for plugin "%s".', self.plugins[name], name)
286
287    def map(self, func, *args, **kwargs):
288        r"""Call ``func`` with the plugin and \*args and \**kwargs after.
289
290        This yields the return value from ``func`` for each plugin.
291
292        :param collections.Callable func:
293            Function to call with each plugin. Signature should at least be:
294
295            .. code-block:: python
296
297                def myfunc(plugin):
298                     pass
299
300            Any extra positional or keyword arguments specified with map will
301            be passed along to this function after the plugin. The plugin
302            passed is a :class:`~flake8.plugins.manager.Plugin`.
303        :param args:
304            Positional arguments to pass to ``func`` after each plugin.
305        :param kwargs:
306            Keyword arguments to pass to ``func`` after each plugin.
307        """
308        for name in self.names:
309            yield func(self.plugins[name], *args, **kwargs)
310
311    def versions(self):
312        # () -> (str, str)
313        """Generate the versions of plugins.
314
315        :returns:
316            Tuples of the plugin_name and version
317        :rtype:
318            tuple
319        """
320        plugins_seen: Set[str] = set()
321        for entry_point_name in self.names:
322            plugin = self.plugins[entry_point_name]
323            plugin_name = plugin.plugin_name
324            if plugin.plugin_name in plugins_seen:
325                continue
326            plugins_seen.add(plugin_name)
327            yield (plugin_name, plugin.version)
328
329
330def version_for(plugin):
331    # (Plugin) -> Optional[str]
332    """Determine the version of a plugin by its module.
333
334    :param plugin:
335        The loaded plugin
336    :type plugin:
337        Plugin
338    :returns:
339        version string for the module
340    :rtype:
341        str
342    """
343    module_name = plugin.plugin.__module__
344    try:
345        module = __import__(module_name)
346    except ImportError:
347        return None
348
349    return getattr(module, "__version__", None)
350
351
352class PluginTypeManager:
353    """Parent class for most of the specific plugin types."""
354
355    namespace: str
356
357    def __init__(self, local_plugins=None):
358        """Initialize the plugin type's manager.
359
360        :param list local_plugins:
361            Plugins from config file instead of entry-points
362        """
363        self.manager = PluginManager(
364            self.namespace, local_plugins=local_plugins
365        )
366        self.plugins_loaded = False
367
368    def __contains__(self, name):
369        """Check if the entry-point name is in this plugin type manager."""
370        LOG.debug('Checking for "%s" in plugin type manager.', name)
371        return name in self.plugins
372
373    def __getitem__(self, name):
374        """Retrieve a plugin by its name."""
375        LOG.debug('Retrieving plugin for "%s".', name)
376        return self.plugins[name]
377
378    def get(self, name, default=None):
379        """Retrieve the plugin referred to by ``name`` or return the default.
380
381        :param str name:
382            Name of the plugin to retrieve.
383        :param default:
384            Default value to return.
385        :returns:
386            Plugin object referred to by name, if it exists.
387        :rtype:
388            :class:`Plugin`
389        """
390        if name in self:
391            return self[name]
392        return default
393
394    @property
395    def names(self):
396        """Proxy attribute to underlying manager."""
397        return self.manager.names
398
399    @property
400    def plugins(self):
401        """Proxy attribute to underlying manager."""
402        return self.manager.plugins
403
404    @staticmethod
405    def _generate_call_function(method_name, optmanager, *args, **kwargs):
406        def generated_function(plugin):
407            method = getattr(plugin, method_name, None)
408            if method is not None and callable(method):
409                return method(optmanager, *args, **kwargs)
410
411        return generated_function
412
413    def load_plugins(self):
414        """Load all plugins of this type that are managed by this manager."""
415        if self.plugins_loaded:
416            return
417
418        def load_plugin(plugin):
419            """Call each plugin's load_plugin method."""
420            return plugin.load_plugin()
421
422        plugins = list(self.manager.map(load_plugin))
423        # Do not set plugins_loaded if we run into an exception
424        self.plugins_loaded = True
425        return plugins
426
427    def register_plugin_versions(self, optmanager):
428        """Register the plugins and their versions with the OptionManager."""
429        self.load_plugins()
430        for (plugin_name, version) in self.manager.versions():
431            optmanager.register_plugin(name=plugin_name, version=version)
432
433    def register_options(self, optmanager):
434        """Register all of the checkers' options to the OptionManager."""
435        self.load_plugins()
436        call_register_options = self._generate_call_function(
437            "register_options", optmanager
438        )
439
440        list(self.manager.map(call_register_options))
441
442    def provide_options(self, optmanager, options, extra_args):
443        """Provide parsed options and extra arguments to the plugins."""
444        call_provide_options = self._generate_call_function(
445            "provide_options", optmanager, options, extra_args
446        )
447
448        list(self.manager.map(call_provide_options))
449
450
451class Checkers(PluginTypeManager):
452    """All of the checkers registered through entry-points or config."""
453
454    namespace = "flake8.extension"
455
456    def checks_expecting(self, argument_name):
457        """Retrieve checks that expect an argument with the specified name.
458
459        Find all checker plugins that are expecting a specific argument.
460        """
461        for plugin in self.plugins.values():
462            if argument_name == plugin.parameter_names[0]:
463                yield plugin
464
465    def to_dictionary(self):
466        """Return a dictionary of AST and line-based plugins."""
467        return {
468            "ast_plugins": [
469                plugin.to_dictionary() for plugin in self.ast_plugins
470            ],
471            "logical_line_plugins": [
472                plugin.to_dictionary() for plugin in self.logical_line_plugins
473            ],
474            "physical_line_plugins": [
475                plugin.to_dictionary() for plugin in self.physical_line_plugins
476            ],
477        }
478
479    def register_options(self, optmanager):
480        """Register all of the checkers' options to the OptionManager.
481
482        This also ensures that plugins that are not part of a group and are
483        enabled by default are enabled on the option manager.
484        """
485        # NOTE(sigmavirus24) We reproduce a little of
486        # PluginTypeManager.register_options to reduce the number of times
487        # that we loop over the list of plugins. Instead of looping twice,
488        # option registration and enabling the plugin, we loop once with one
489        # function to map over the plugins.
490        self.load_plugins()
491        call_register_options = self._generate_call_function(
492            "register_options", optmanager
493        )
494
495        def register_and_enable(plugin):
496            call_register_options(plugin)
497            if plugin.group() is None and not plugin.off_by_default:
498                plugin.enable(optmanager)
499
500        list(self.manager.map(register_and_enable))
501
502    @property
503    def ast_plugins(self):
504        """List of plugins that expect the AST tree."""
505        plugins = getattr(self, "_ast_plugins", [])
506        if not plugins:
507            plugins = list(self.checks_expecting("tree"))
508            self._ast_plugins = plugins
509        return plugins
510
511    @property
512    def logical_line_plugins(self):
513        """List of plugins that expect the logical lines."""
514        plugins = getattr(self, "_logical_line_plugins", [])
515        if not plugins:
516            plugins = list(self.checks_expecting("logical_line"))
517            self._logical_line_plugins = plugins
518        return plugins
519
520    @property
521    def physical_line_plugins(self):
522        """List of plugins that expect the physical lines."""
523        plugins = getattr(self, "_physical_line_plugins", [])
524        if not plugins:
525            plugins = list(self.checks_expecting("physical_line"))
526            self._physical_line_plugins = plugins
527        return plugins
528
529
530class ReportFormatters(PluginTypeManager):
531    """All of the report formatters registered through entry-points/config."""
532
533    namespace = "flake8.report"
534