1"""Plugin loading and management logic and classes.""" 2import logging 3from typing import Any 4from typing import Dict 5from typing import List 6from typing import Optional 7from typing import Set 8 9from flake8 import exceptions 10from flake8 import utils 11from flake8._compat import importlib_metadata 12 13LOG = logging.getLogger(__name__) 14 15__all__ = ("Checkers", "Plugin", "PluginManager", "ReportFormatters") 16 17NO_GROUP_FOUND = object() 18 19 20class Plugin: 21 """Wrap an EntryPoint from setuptools and other logic.""" 22 23 def __init__(self, name, entry_point, local=False): 24 """Initialize our Plugin. 25 26 :param str name: 27 Name of the entry-point as it was registered with setuptools. 28 :param entry_point: 29 EntryPoint returned by setuptools. 30 :type entry_point: 31 setuptools.EntryPoint 32 :param bool local: 33 Is this a repo-local plugin? 34 """ 35 self.name = name 36 self.entry_point = entry_point 37 self.local = local 38 self._plugin: Any = None 39 self._parameters = None 40 self._parameter_names: Optional[List[str]] = None 41 self._group = None 42 self._plugin_name = None 43 self._version = None 44 45 def __repr__(self) -> str: 46 """Provide an easy to read description of the current plugin.""" 47 return 'Plugin(name="{}", entry_point="{}")'.format( 48 self.name, self.entry_point.value 49 ) 50 51 def to_dictionary(self): 52 """Convert this plugin to a dictionary.""" 53 return { 54 "name": self.name, 55 "parameters": self.parameters, 56 "parameter_names": self.parameter_names, 57 "plugin": self.plugin, 58 "plugin_name": self.plugin_name, 59 } 60 61 def is_in_a_group(self): 62 """Determine if this plugin is in a group. 63 64 :returns: 65 True if the plugin is in a group, otherwise False. 66 :rtype: 67 bool 68 """ 69 return self.group() is not None 70 71 def group(self): 72 """Find and parse the group the plugin is in.""" 73 if self._group is None: 74 name = self.name.split(".", 1) 75 if len(name) > 1: 76 self._group = name[0] 77 else: 78 self._group = NO_GROUP_FOUND 79 if self._group is NO_GROUP_FOUND: 80 return None 81 return self._group 82 83 @property 84 def parameters(self): 85 """List of arguments that need to be passed to the plugin.""" 86 if self._parameters is None: 87 self._parameters = utils.parameters_for(self) 88 return self._parameters 89 90 @property 91 def parameter_names(self) -> List[str]: 92 """List of argument names that need to be passed to the plugin.""" 93 if self._parameter_names is None: 94 self._parameter_names = list(self.parameters) 95 return self._parameter_names 96 97 @property 98 def plugin(self): 99 """Load and return the plugin associated with the entry-point. 100 101 This property implicitly loads the plugin and then caches it. 102 """ 103 self.load_plugin() 104 return self._plugin 105 106 @property 107 def version(self) -> str: 108 """Return the version of the plugin.""" 109 version = self._version 110 if version is None: 111 if self.is_in_a_group(): 112 version = self._version = version_for(self) 113 else: 114 version = self._version = self.plugin.version 115 return version 116 117 @property 118 def plugin_name(self): 119 """Return the name of the plugin.""" 120 if self._plugin_name is None: 121 if self.is_in_a_group(): 122 self._plugin_name = self.group() 123 else: 124 self._plugin_name = self.plugin.name 125 126 return self._plugin_name 127 128 @property 129 def off_by_default(self): 130 """Return whether the plugin is ignored by default.""" 131 return getattr(self.plugin, "off_by_default", False) 132 133 def execute(self, *args, **kwargs): 134 r"""Call the plugin with \*args and \*\*kwargs.""" 135 return self.plugin(*args, **kwargs) # pylint: disable=not-callable 136 137 def _load(self): 138 self._plugin = self.entry_point.load() 139 if not callable(self._plugin): 140 msg = ( 141 f"Plugin {self._plugin!r} is not a callable. It might be " 142 f"written for an older version of flake8 and might not work " 143 f"with this version" 144 ) 145 LOG.critical(msg) 146 raise TypeError(msg) 147 148 def load_plugin(self): 149 """Retrieve the plugin for this entry-point. 150 151 This loads the plugin, stores it on the instance and then returns it. 152 It does not reload it after the first time, it merely returns the 153 cached plugin. 154 155 :returns: 156 Nothing 157 """ 158 if self._plugin is None: 159 LOG.info('Loading plugin "%s" from entry-point.', self.name) 160 try: 161 self._load() 162 except Exception as load_exception: 163 LOG.exception(load_exception) 164 failed_to_load = exceptions.FailedToLoadPlugin( 165 plugin_name=self.name, exception=load_exception 166 ) 167 LOG.critical(str(failed_to_load)) 168 raise failed_to_load 169 170 def enable(self, optmanager, options=None): 171 """Remove plugin name from the default ignore list.""" 172 optmanager.remove_from_default_ignore([self.name]) 173 optmanager.extend_default_select([self.name]) 174 if not options: 175 return 176 try: 177 options.ignore.remove(self.name) 178 except (ValueError, KeyError): 179 LOG.debug( 180 "Attempted to remove %s from the ignore list but it was " 181 "not a member of the list.", 182 self.name, 183 ) 184 185 def disable(self, optmanager): 186 """Add the plugin name to the default ignore list.""" 187 optmanager.extend_default_ignore([self.name]) 188 189 def provide_options(self, optmanager, options, extra_args): 190 """Pass the parsed options and extra arguments to the plugin.""" 191 parse_options = getattr(self.plugin, "parse_options", None) 192 if parse_options is not None: 193 LOG.debug('Providing options to plugin "%s".', self.name) 194 try: 195 parse_options(optmanager, options, extra_args) 196 except TypeError: 197 parse_options(options) 198 199 if self.name in options.enable_extensions: 200 self.enable(optmanager, options) 201 202 def register_options(self, optmanager): 203 """Register the plugin's command-line options on the OptionManager. 204 205 :param optmanager: 206 Instantiated OptionManager to register options on. 207 :type optmanager: 208 flake8.options.manager.OptionManager 209 :returns: 210 Nothing 211 """ 212 add_options = getattr(self.plugin, "add_options", None) 213 if add_options is not None: 214 LOG.debug( 215 'Registering options from plugin "%s" on OptionManager %r', 216 self.name, 217 optmanager, 218 ) 219 with optmanager.group(self.plugin_name): 220 add_options(optmanager) 221 222 if self.off_by_default: 223 self.disable(optmanager) 224 225 226class PluginManager: # pylint: disable=too-few-public-methods 227 """Find and manage plugins consistently.""" 228 229 def __init__( 230 self, namespace: str, local_plugins: Optional[List[str]] = None 231 ) -> None: 232 """Initialize the manager. 233 234 :param str namespace: 235 Namespace of the plugins to manage, e.g., 'flake8.extension'. 236 :param list local_plugins: 237 Plugins from config (as "X = path.to:Plugin" strings). 238 """ 239 self.namespace = namespace 240 self.plugins: Dict[str, Plugin] = {} 241 self.names: List[str] = [] 242 self._load_local_plugins(local_plugins or []) 243 self._load_entrypoint_plugins() 244 245 def _load_local_plugins(self, local_plugins): 246 """Load local plugins from config. 247 248 :param list local_plugins: 249 Plugins from config (as "X = path.to:Plugin" strings). 250 """ 251 for plugin_str in local_plugins: 252 name, _, entry_str = plugin_str.partition("=") 253 name, entry_str = name.strip(), entry_str.strip() 254 entry_point = importlib_metadata.EntryPoint( 255 name, entry_str, self.namespace 256 ) 257 self._load_plugin_from_entrypoint(entry_point, local=True) 258 259 def _load_entrypoint_plugins(self): 260 LOG.info('Loading entry-points for "%s".', self.namespace) 261 eps = importlib_metadata.entry_points().get(self.namespace, ()) 262 # python2.7 occasionally gives duplicate results due to redundant 263 # `local/lib` -> `../lib` symlink on linux in virtualenvs so we 264 # eliminate duplicates here 265 for entry_point in sorted(frozenset(eps)): 266 if entry_point.name == "per-file-ignores": 267 LOG.warning( 268 "flake8-per-file-ignores plugin is incompatible with " 269 "flake8>=3.7 (which implements per-file-ignores itself)." 270 ) 271 continue 272 self._load_plugin_from_entrypoint(entry_point) 273 274 def _load_plugin_from_entrypoint(self, entry_point, local=False): 275 """Load a plugin from a setuptools EntryPoint. 276 277 :param EntryPoint entry_point: 278 EntryPoint to load plugin from. 279 :param bool local: 280 Is this a repo-local plugin? 281 """ 282 name = entry_point.name 283 self.plugins[name] = Plugin(name, entry_point, local=local) 284 self.names.append(name) 285 LOG.debug('Loaded %r for plugin "%s".', self.plugins[name], name) 286 287 def map(self, func, *args, **kwargs): 288 r"""Call ``func`` with the plugin and \*args and \**kwargs after. 289 290 This yields the return value from ``func`` for each plugin. 291 292 :param collections.Callable func: 293 Function to call with each plugin. Signature should at least be: 294 295 .. code-block:: python 296 297 def myfunc(plugin): 298 pass 299 300 Any extra positional or keyword arguments specified with map will 301 be passed along to this function after the plugin. The plugin 302 passed is a :class:`~flake8.plugins.manager.Plugin`. 303 :param args: 304 Positional arguments to pass to ``func`` after each plugin. 305 :param kwargs: 306 Keyword arguments to pass to ``func`` after each plugin. 307 """ 308 for name in self.names: 309 yield func(self.plugins[name], *args, **kwargs) 310 311 def versions(self): 312 # () -> (str, str) 313 """Generate the versions of plugins. 314 315 :returns: 316 Tuples of the plugin_name and version 317 :rtype: 318 tuple 319 """ 320 plugins_seen: Set[str] = set() 321 for entry_point_name in self.names: 322 plugin = self.plugins[entry_point_name] 323 plugin_name = plugin.plugin_name 324 if plugin.plugin_name in plugins_seen: 325 continue 326 plugins_seen.add(plugin_name) 327 yield (plugin_name, plugin.version) 328 329 330def version_for(plugin): 331 # (Plugin) -> Optional[str] 332 """Determine the version of a plugin by its module. 333 334 :param plugin: 335 The loaded plugin 336 :type plugin: 337 Plugin 338 :returns: 339 version string for the module 340 :rtype: 341 str 342 """ 343 module_name = plugin.plugin.__module__ 344 try: 345 module = __import__(module_name) 346 except ImportError: 347 return None 348 349 return getattr(module, "__version__", None) 350 351 352class PluginTypeManager: 353 """Parent class for most of the specific plugin types.""" 354 355 namespace: str 356 357 def __init__(self, local_plugins=None): 358 """Initialize the plugin type's manager. 359 360 :param list local_plugins: 361 Plugins from config file instead of entry-points 362 """ 363 self.manager = PluginManager( 364 self.namespace, local_plugins=local_plugins 365 ) 366 self.plugins_loaded = False 367 368 def __contains__(self, name): 369 """Check if the entry-point name is in this plugin type manager.""" 370 LOG.debug('Checking for "%s" in plugin type manager.', name) 371 return name in self.plugins 372 373 def __getitem__(self, name): 374 """Retrieve a plugin by its name.""" 375 LOG.debug('Retrieving plugin for "%s".', name) 376 return self.plugins[name] 377 378 def get(self, name, default=None): 379 """Retrieve the plugin referred to by ``name`` or return the default. 380 381 :param str name: 382 Name of the plugin to retrieve. 383 :param default: 384 Default value to return. 385 :returns: 386 Plugin object referred to by name, if it exists. 387 :rtype: 388 :class:`Plugin` 389 """ 390 if name in self: 391 return self[name] 392 return default 393 394 @property 395 def names(self): 396 """Proxy attribute to underlying manager.""" 397 return self.manager.names 398 399 @property 400 def plugins(self): 401 """Proxy attribute to underlying manager.""" 402 return self.manager.plugins 403 404 @staticmethod 405 def _generate_call_function(method_name, optmanager, *args, **kwargs): 406 def generated_function(plugin): 407 method = getattr(plugin, method_name, None) 408 if method is not None and callable(method): 409 return method(optmanager, *args, **kwargs) 410 411 return generated_function 412 413 def load_plugins(self): 414 """Load all plugins of this type that are managed by this manager.""" 415 if self.plugins_loaded: 416 return 417 418 def load_plugin(plugin): 419 """Call each plugin's load_plugin method.""" 420 return plugin.load_plugin() 421 422 plugins = list(self.manager.map(load_plugin)) 423 # Do not set plugins_loaded if we run into an exception 424 self.plugins_loaded = True 425 return plugins 426 427 def register_plugin_versions(self, optmanager): 428 """Register the plugins and their versions with the OptionManager.""" 429 self.load_plugins() 430 for (plugin_name, version) in self.manager.versions(): 431 optmanager.register_plugin(name=plugin_name, version=version) 432 433 def register_options(self, optmanager): 434 """Register all of the checkers' options to the OptionManager.""" 435 self.load_plugins() 436 call_register_options = self._generate_call_function( 437 "register_options", optmanager 438 ) 439 440 list(self.manager.map(call_register_options)) 441 442 def provide_options(self, optmanager, options, extra_args): 443 """Provide parsed options and extra arguments to the plugins.""" 444 call_provide_options = self._generate_call_function( 445 "provide_options", optmanager, options, extra_args 446 ) 447 448 list(self.manager.map(call_provide_options)) 449 450 451class Checkers(PluginTypeManager): 452 """All of the checkers registered through entry-points or config.""" 453 454 namespace = "flake8.extension" 455 456 def checks_expecting(self, argument_name): 457 """Retrieve checks that expect an argument with the specified name. 458 459 Find all checker plugins that are expecting a specific argument. 460 """ 461 for plugin in self.plugins.values(): 462 if argument_name == plugin.parameter_names[0]: 463 yield plugin 464 465 def to_dictionary(self): 466 """Return a dictionary of AST and line-based plugins.""" 467 return { 468 "ast_plugins": [ 469 plugin.to_dictionary() for plugin in self.ast_plugins 470 ], 471 "logical_line_plugins": [ 472 plugin.to_dictionary() for plugin in self.logical_line_plugins 473 ], 474 "physical_line_plugins": [ 475 plugin.to_dictionary() for plugin in self.physical_line_plugins 476 ], 477 } 478 479 def register_options(self, optmanager): 480 """Register all of the checkers' options to the OptionManager. 481 482 This also ensures that plugins that are not part of a group and are 483 enabled by default are enabled on the option manager. 484 """ 485 # NOTE(sigmavirus24) We reproduce a little of 486 # PluginTypeManager.register_options to reduce the number of times 487 # that we loop over the list of plugins. Instead of looping twice, 488 # option registration and enabling the plugin, we loop once with one 489 # function to map over the plugins. 490 self.load_plugins() 491 call_register_options = self._generate_call_function( 492 "register_options", optmanager 493 ) 494 495 def register_and_enable(plugin): 496 call_register_options(plugin) 497 if plugin.group() is None and not plugin.off_by_default: 498 plugin.enable(optmanager) 499 500 list(self.manager.map(register_and_enable)) 501 502 @property 503 def ast_plugins(self): 504 """List of plugins that expect the AST tree.""" 505 plugins = getattr(self, "_ast_plugins", []) 506 if not plugins: 507 plugins = list(self.checks_expecting("tree")) 508 self._ast_plugins = plugins 509 return plugins 510 511 @property 512 def logical_line_plugins(self): 513 """List of plugins that expect the logical lines.""" 514 plugins = getattr(self, "_logical_line_plugins", []) 515 if not plugins: 516 plugins = list(self.checks_expecting("logical_line")) 517 self._logical_line_plugins = plugins 518 return plugins 519 520 @property 521 def physical_line_plugins(self): 522 """List of plugins that expect the physical lines.""" 523 plugins = getattr(self, "_physical_line_plugins", []) 524 if not plugins: 525 plugins = list(self.checks_expecting("physical_line")) 526 self._physical_line_plugins = plugins 527 return plugins 528 529 530class ReportFormatters(PluginTypeManager): 531 """All of the report formatters registered through entry-points/config.""" 532 533 namespace = "flake8.report" 534