#!/usr/bin/env python
# Copyright (c) 2013-2014 Will Thames <will@thames.id.au>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""Command line implementation."""

import errno
import hashlib
import logging
import os
import pathlib
import subprocess
import sys
from argparse import Namespace
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Callable, Dict, Iterator, List, Optional

from enrich.console import should_do_markup

from ansiblelint import cli
from ansiblelint.app import App
from ansiblelint.color import (
    console,
    console_options,
    console_stderr,
    reconfigure,
    render_yaml,
)
from ansiblelint.config import options
from ansiblelint.constants import ANSIBLE_MISSING_RC, EXIT_CONTROL_C_RC
from ansiblelint.file_utils import abspath, cwd, normpath
from ansiblelint.prerun import check_ansible_presence, prepare_environment
from ansiblelint.skip_utils import normalize_tag
from ansiblelint.version import __version__

if TYPE_CHECKING:
    from ansiblelint.runner import LintResult


_logger = logging.getLogger(__name__)


def initialize_logger(level: int = 0) -> None:
    """Set up the global logging level based on the verbosity number.

    A stream handler with a ``LEVEL message`` format is attached to the root
    logger and the root logger level is set from ``level``:
    -2 CRITICAL, -1 ERROR, 0 WARNING, 1 INFO, 2 DEBUG. Any other value is
    treated as DEBUG.
    """
    # We are about to act on the root logger, which defaults to logging.WARNING.
    # That is where our 0 (default) value comes from.
    VERBOSITY_MAP = {
        -2: logging.CRITICAL,
        -1: logging.ERROR,
        0: logging.WARNING,
        1: logging.INFO,
        2: logging.DEBUG,
    }

    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(levelname)-8s %(message)s'))
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    # Unknown logging level is treated as DEBUG
    logging_level = VERBOSITY_MAP.get(level, logging.DEBUG)
    root_logger.setLevel(logging_level)
    # Use module-level _logger instance to validate it
    _logger.debug("Logging initialized to level %s", logging_level)


def initialize_options(arguments: Optional[List[str]] = None) -> None:
    """Load config options and store them inside options module.

    ``arguments`` are the command line arguments without the program name.
    When the parsed options request the version, the version banner is
    printed and the process exits: with ``ANSIBLE_MISSING_RC`` if
    ``check_ansible_presence`` reported an error, with 0 otherwise.
    """
    new_options = cli.get_config(arguments or [])
    new_options.cwd = pathlib.Path.cwd()

    if new_options.version:
        ansible_version, err = check_ansible_presence()
        print(f'ansible-lint {__version__!s} using ansible {ansible_version!s}')
        if err:
            _logger.error(err)
            sys.exit(ANSIBLE_MISSING_RC)
        sys.exit(0)

    if new_options.colored is None:
        new_options.colored = should_do_markup()

    # persist loaded configuration inside options module
    for k, v in new_options.__dict__.items():
        setattr(options, k, v)

    # rename deprecated ids/tags to newer names
    options.tags = [normalize_tag(tag) for tag in options.tags]
    options.skip_list = [normalize_tag(tag) for tag in options.skip_list]
    options.warn_list = [normalize_tag(tag) for tag in options.warn_list]

    options.configured = True
    # 6 chars of entropy should be enough
    project_path = os.path.abspath(options.project_dir)
    cache_key = hashlib.sha256(project_path.encode()).hexdigest()[:6]
    cache_home = os.getenv("XDG_CACHE_HOME", os.path.expanduser("~/.cache"))
    options.cache_dir = f"{cache_home}/ansible-lint/{cache_key}"


def report_outcome(  # noqa: C901
    result: "LintResult", options: Namespace, mark_as_success: bool = False
) -> int:
    """Display information about how to skip found rules.

    Unless ``options.quiet`` is set, a ``warn_list`` snippet naming the
    matched rules and a one-line summary are printed to stderr.

    Returns exit code, 2 if errors were found, 0 when only warnings were found
    or when ``mark_as_success`` is true.
    """
    failures = 0
    warnings = 0
    msg = ""
    matches_unignored = [match for match in result.matches if not match.ignored]

    # counting
    # NOTE(review): the counters below include ignored matches while the
    # suggested warn_list only covers unignored ones -- confirm this is
    # intended for progressive mode.
    matched_rules = {match.rule.id: match.rule for match in matches_unignored}
    for match in result.matches:
        if {match.rule.id, *match.rule.tags}.isdisjoint(options.warn_list):
            failures += 1
        else:
            warnings += 1

    # remove unskippable rules from the list
    matched_rules = {
        rule_id: rule
        for rule_id, rule in matched_rules.items()
        if 'unskippable' not in rule.tags
    }

    # only suggest rules that are not already covered by warn_list
    entries = [
        f" - {key} # {matched_rules[key].shortdesc}\n"
        for key in sorted(matched_rules)
        if {key, *matched_rules[key].tags}.isdisjoint(options.warn_list)
    ]
    if any("experimental" in match.rule.tags for match in result.matches):
        entries.append(" - experimental # all rules tagged as experimental\n")
    if entries and not options.quiet:
        console_stderr.print(
            "You can skip specific rules or tags by adding them to your "
            "configuration file:"
        )
        msg += """\
# .ansible-lint
warn_list: # or 'skip_list' to silence them completely
"""
        msg += "".join(sorted(entries))

    # Do not deprecate the old tags just yet. Why? Because it is not currently feasible
    # to migrate old tags to new tags. There are a lot of things out there that still
    # use ansible-lint 4 (for example, Ansible Galaxy and Automation Hub imports). If we
    # replace the old tags, those tools will report warnings. If we do not replace them,
    # ansible-lint 5 will report warnings.
    #
    # We can do the deprecation once the ecosystem caught up at least a bit.
    # for k, v in used_old_tags.items():
    #     _logger.warning(
    #         "Replaced deprecated tag '%s' with '%s' but it will become an "
    #         "error in the future.",
    #         k,
    #         v,
    #     )

    if result.matches and not options.quiet:
        console_stderr.print(render_yaml(msg))
        console_stderr.print(
            f"Finished with {failures} failure(s), {warnings} warning(s) "
            f"on {len(result.files)} files."
        )

    if mark_as_success or not failures:
        return 0
    return 2


def main(argv: Optional[List[str]] = None) -> int:
    """Linter CLI entry point.

    ``argv`` is the full argument vector including the program name and
    defaults to ``sys.argv``. Returns the process exit code: 0 after listing
    rules or tags, otherwise whatever ``report_outcome`` returns.
    """
    if argv is None:
        argv = sys.argv
    initialize_options(argv[1:])

    console_options["force_terminal"] = options.colored
    reconfigure(console_options)

    initialize_logger(options.verbosity)
    _logger.debug("Options: %s", options)
    _logger.debug(os.getcwd())

    app = App(options=options)

    prepare_environment()
    check_ansible_presence(exit_on_error=True)

    # On purpose lazy-imports to avoid pre-loading Ansible
    # pylint: disable=import-outside-toplevel
    from ansiblelint.generate_docs import rules_as_rich, rules_as_rst, rules_as_str
    from ansiblelint.rules import RulesCollection

    rules = RulesCollection(options.rulesdirs)

    if options.listrules:

        _rule_format_map: Dict[str, Callable[..., Any]] = {
            'plain': rules_as_str,
            'rich': rules_as_rich,
            'rst': rules_as_rst,
        }

        console.print(_rule_format_map[options.format](rules), highlight=False)
        return 0

    if options.listtags:
        console.print(render_yaml(rules.listtags()))
        return 0

    if isinstance(options.tags, str):
        options.tags = options.tags.split(',')

    from ansiblelint.runner import _get_matches

    result = _get_matches(rules, options)

    mark_as_success = False
    if result.matches and options.progressive:
        _logger.info(
            "Matches found, running again on previous revision in order to detect regressions"
        )
        with _previous_revision():
            _logger.debug("Options: %s", options)
            _logger.debug(os.getcwd())
            old_result = _get_matches(rules, options)
            # remove old matches from current list; kept as a set so the
            # membership tests below are O(1)
            matches_delta = set(result.matches) - set(old_result.matches)
            if not matches_delta:
                # Report old total -> new total (the delta itself is always
                # empty in this branch, so logging it carries no information).
                _logger.warning(
                    "Total violations not increased since previous "
                    "commit, will mark result as success. (%s -> %s)",
                    len(old_result.matches),
                    len(result.matches),
                )
                mark_as_success = True

            ignored = 0
            for match in result.matches:
                # if match is not new, mark it as ignored
                if match not in matches_delta:
                    match.ignored = True
                    ignored += 1
            if ignored:
                _logger.warning(
                    "Marked %s previously known violation(s) as ignored due to"
                    " progressive mode.",
                    ignored,
                )

    app.render_matches(result.matches)

    return report_outcome(result, mark_as_success=mark_as_success, options=options)


@contextmanager
def _previous_revision() -> Iterator[None]:
    """Create or update a temporary workdir containing the previous revision.

    A git worktree is kept under ``<options.cache_dir>/old-rev`` and checked
    out at ``HEAD^1``. While the context is active the working directory is
    switched to that worktree (via ``cwd``) and ``options.exclude_paths``
    point inside it; on exit the exclude paths are rebuilt against the then
    current working directory.

    Raises ``subprocess.CalledProcessError`` when the previous revision cannot
    be resolved or checked out.
    """
    worktree_dir = f"{options.cache_dir}/old-rev"
    # Update options.exclude_paths to use the temporary workdir.
    rel_exclude_paths = [normpath(p) for p in options.exclude_paths]
    options.exclude_paths = [abspath(p, worktree_dir) for p in rel_exclude_paths]
    # rev-parse output ends with a newline, which must not reach "checkout"
    revision = subprocess.run(
        ["git", "rev-parse", "HEAD^1"],
        check=True,
        universal_newlines=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
    ).stdout.strip()
    pathlib.Path(worktree_dir).mkdir(parents=True, exist_ok=True)
    # Argument lists (no shell) keep paths with spaces or metacharacters intact.
    # Best effort on purpose: the command fails when the worktree was already
    # registered by a previous run, in which case it is simply reused.
    subprocess.run(
        ["git", "worktree", "add", "-f", worktree_dir],
        check=False,
        stderr=subprocess.DEVNULL,
    )
    try:
        with cwd(worktree_dir):
            # Fail loudly: linting a tree that is not at the requested
            # revision would produce a meaningless comparison.
            subprocess.run(["git", "checkout", revision], check=True)
            yield
    finally:
        options.exclude_paths = [abspath(p, os.getcwd()) for p in rel_exclude_paths]


def _run_cli_entrypoint() -> None:
    """Invoke the main entrypoint with current CLI args.

    This function also processes the runtime exceptions: a broken pipe is
    ignored, Ctrl-C exits with ``EXIT_CONTROL_C_RC`` and a ``RuntimeError``
    is turned into a ``SystemExit`` carrying its message.
    """
    try:
        sys.exit(main(sys.argv))
    except IOError as exc:
        # NOTE: Only "broken pipe" is acceptable to ignore
        if exc.errno != errno.EPIPE:
            raise
    except KeyboardInterrupt:
        sys.exit(EXIT_CONTROL_C_RC)
    except RuntimeError as exc:
        raise SystemExit(str(exc)) from exc


if __name__ == "__main__":
    _run_cli_entrypoint()