1#!/usr/bin/env python
2# Copyright (c) 2013-2014 Will Thames <will@thames.id.au>
3#
4# Permission is hereby granted, free of charge, to any person obtaining a copy
5# of this software and associated documentation files (the "Software"), to deal
6# in the Software without restriction, including without limitation the rights
7# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8# copies of the Software, and to permit persons to whom the Software is
9# furnished to do so, subject to the following conditions:
10#
11# The above copyright notice and this permission notice shall be included in
12# all copies or substantial portions of the Software.
13#
14# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20# THE SOFTWARE.
21"""Command line implementation."""
22
23import errno
24import hashlib
25import logging
26import os
27import pathlib
28import subprocess
29import sys
30from argparse import Namespace
31from contextlib import contextmanager
32from typing import TYPE_CHECKING, Any, Callable, Dict, Iterator, List, Optional
33
34from enrich.console import should_do_markup
35
36from ansiblelint import cli
37from ansiblelint.app import App
38from ansiblelint.color import (
39    console,
40    console_options,
41    console_stderr,
42    reconfigure,
43    render_yaml,
44)
45from ansiblelint.config import options
46from ansiblelint.constants import ANSIBLE_MISSING_RC, EXIT_CONTROL_C_RC
47from ansiblelint.file_utils import abspath, cwd, normpath
48from ansiblelint.prerun import check_ansible_presence, prepare_environment
49from ansiblelint.skip_utils import normalize_tag
50from ansiblelint.version import __version__
51
52if TYPE_CHECKING:
53    from ansiblelint.runner import LintResult
54
55
56_logger = logging.getLogger(__name__)
57
58
59def initialize_logger(level: int = 0) -> None:
60    """Set up the global logging level based on the verbosity number."""
61    # We are about to act on the root logger, which defaults to logging.WARNING.
62    # That is where our 0 (default) value comes from.
63    VERBOSITY_MAP = {
64        -2: logging.CRITICAL,
65        -1: logging.ERROR,
66        0: logging.WARNING,
67        1: logging.INFO,
68        2: logging.DEBUG,
69    }
70
71    handler = logging.StreamHandler()
72    formatter = logging.Formatter('%(levelname)-8s %(message)s')
73    handler.setFormatter(formatter)
74    logger = logging.getLogger()
75    logger.addHandler(handler)
76    # Unknown logging level is treated as DEBUG
77    logging_level = VERBOSITY_MAP.get(level, logging.DEBUG)
78    logger.setLevel(logging_level)
79    # Use module-level _logger instance to validate it
80    _logger.debug("Logging initialized to level %s", logging_level)
81
82
83def initialize_options(arguments: Optional[List[str]] = None) -> None:
84    """Load config options and store them inside options module."""
85    new_options = cli.get_config(arguments or [])
86    new_options.cwd = pathlib.Path.cwd()
87
88    if new_options.version:
89        ansible_version, err = check_ansible_presence()
90        print(
91            'ansible-lint {ver!s} using ansible {ansible_ver!s}'.format(
92                ver=__version__, ansible_ver=ansible_version
93            )
94        )
95        if err:
96            _logger.error(err)
97            sys.exit(ANSIBLE_MISSING_RC)
98        sys.exit(0)
99
100    if new_options.colored is None:
101        new_options.colored = should_do_markup()
102
103    # persist loaded configuration inside options module
104    for k, v in new_options.__dict__.items():
105        setattr(options, k, v)
106
107    # rename deprecated ids/tags to newer names
108    options.tags = [normalize_tag(tag) for tag in options.tags]
109    options.skip_list = [normalize_tag(tag) for tag in options.skip_list]
110    options.warn_list = [normalize_tag(tag) for tag in options.warn_list]
111
112    options.configured = True
113    # 6 chars of entropy should be enough
114    cache_key = hashlib.sha256(
115        os.path.abspath(options.project_dir).encode()
116    ).hexdigest()[:6]
117    options.cache_dir = "%s/ansible-lint/%s" % (
118        os.getenv("XDG_CACHE_HOME", os.path.expanduser("~/.cache")),
119        cache_key,
120    )
121
122
123def report_outcome(  # noqa: C901
124    result: "LintResult", options: Namespace, mark_as_success: bool = False
125) -> int:
126    """Display information about how to skip found rules.
127
128    Returns exit code, 2 if errors were found, 0 when only warnings were found.
129    """
130    failures = 0
131    warnings = 0
132    msg = ""
133    matches_unignored = [match for match in result.matches if not match.ignored]
134
135    # counting
136    matched_rules = {match.rule.id: match.rule for match in matches_unignored}
137    for match in result.matches:
138        if {match.rule.id, *match.rule.tags}.isdisjoint(options.warn_list):
139            failures += 1
140        else:
141            warnings += 1
142
143    # remove unskippable rules from the list
144    for rule_id in list(matched_rules.keys()):
145        if 'unskippable' in matched_rules[rule_id].tags:
146            matched_rules.pop(rule_id)
147
148    entries = []
149    for key in sorted(matched_rules.keys()):
150        if {key, *matched_rules[key].tags}.isdisjoint(options.warn_list):
151            entries.append(f"  - {key}  # {matched_rules[key].shortdesc}\n")
152    for match in result.matches:
153        if "experimental" in match.rule.tags:
154            entries.append("  - experimental  # all rules tagged as experimental\n")
155            break
156    if entries and not options.quiet:
157        console_stderr.print(
158            "You can skip specific rules or tags by adding them to your "
159            "configuration file:"
160        )
161        msg += """\
162# .ansible-lint
163warn_list:  # or 'skip_list' to silence them completely
164"""
165        msg += "".join(sorted(entries))
166
167    # Do not deprecate the old tags just yet. Why? Because it is not currently feasible
168    # to migrate old tags to new tags. There are a lot of things out there that still
169    # use ansible-lint 4 (for example, Ansible Galaxy and Automation Hub imports). If we
170    # replace the old tags, those tools will report warnings. If we do not replace them,
171    # ansible-lint 5 will report warnings.
172    #
173    # We can do the deprecation once the ecosystem caught up at least a bit.
174    # for k, v in used_old_tags.items():
175    #     _logger.warning(
176    #         "Replaced deprecated tag '%s' with '%s' but it will become an "
177    #         "error in the future.",
178    #         k,
179    #         v,
180    #     )
181
182    if result.matches and not options.quiet:
183        console_stderr.print(render_yaml(msg))
184        console_stderr.print(
185            f"Finished with {failures} failure(s), {warnings} warning(s) "
186            f"on {len(result.files)} files."
187        )
188
189    if mark_as_success or not failures:
190        return 0
191    return 2
192
193
194def main(argv: Optional[List[str]] = None) -> int:
195    """Linter CLI entry point."""
196    if argv is None:
197        argv = sys.argv
198    initialize_options(argv[1:])
199
200    console_options["force_terminal"] = options.colored
201    reconfigure(console_options)
202
203    initialize_logger(options.verbosity)
204    _logger.debug("Options: %s", options)
205    _logger.debug(os.getcwd())
206
207    app = App(options=options)
208
209    prepare_environment()
210    check_ansible_presence(exit_on_error=True)
211
212    # On purpose lazy-imports to avoid pre-loading Ansible
213    # pylint: disable=import-outside-toplevel
214    from ansiblelint.generate_docs import rules_as_rich, rules_as_rst, rules_as_str
215    from ansiblelint.rules import RulesCollection
216
217    rules = RulesCollection(options.rulesdirs)
218
219    if options.listrules:
220
221        _rule_format_map: Dict[str, Callable[..., Any]] = {
222            'plain': rules_as_str,
223            'rich': rules_as_rich,
224            'rst': rules_as_rst,
225        }
226
227        console.print(_rule_format_map[options.format](rules), highlight=False)
228        return 0
229
230    if options.listtags:
231        console.print(render_yaml(rules.listtags()))
232        return 0
233
234    if isinstance(options.tags, str):
235        options.tags = options.tags.split(',')
236
237    from ansiblelint.runner import _get_matches
238
239    result = _get_matches(rules, options)
240
241    mark_as_success = False
242    if result.matches and options.progressive:
243        _logger.info(
244            "Matches found, running again on previous revision in order to detect regressions"
245        )
246        with _previous_revision():
247            _logger.debug("Options: %s", options)
248            _logger.debug(os.getcwd())
249            old_result = _get_matches(rules, options)
250            # remove old matches from current list
251            matches_delta = list(set(result.matches) - set(old_result.matches))
252            if len(matches_delta) == 0:
253                _logger.warning(
254                    "Total violations not increased since previous "
255                    "commit, will mark result as success. (%s -> %s)",
256                    len(old_result.matches),
257                    len(matches_delta),
258                )
259                mark_as_success = True
260
261            ignored = 0
262            for match in result.matches:
263                # if match is not new, mark is as ignored
264                if match not in matches_delta:
265                    match.ignored = True
266                    ignored += 1
267            if ignored:
268                _logger.warning(
269                    "Marked %s previously known violation(s) as ignored due to"
270                    " progressive mode.",
271                    ignored,
272                )
273
274    app.render_matches(result.matches)
275
276    return report_outcome(result, mark_as_success=mark_as_success, options=options)
277
278
279@contextmanager
280def _previous_revision() -> Iterator[None]:
281    """Create or update a temporary workdir containing the previous revision."""
282    worktree_dir = f"{options.cache_dir}/old-rev"
283    # Update options.exclude_paths to include use the temporary workdir.
284    rel_exclude_paths = [normpath(p) for p in options.exclude_paths]
285    options.exclude_paths = [abspath(p, worktree_dir) for p in rel_exclude_paths]
286    revision = subprocess.run(
287        ["git", "rev-parse", "HEAD^1"],
288        check=True,
289        universal_newlines=True,
290        stdout=subprocess.PIPE,
291        stderr=subprocess.DEVNULL,
292    ).stdout
293    p = pathlib.Path(worktree_dir)
294    p.mkdir(parents=True, exist_ok=True)
295    os.system(f"git worktree add -f {worktree_dir} 2>/dev/null")
296    try:
297        with cwd(worktree_dir):
298            os.system(f"git checkout {revision}")
299            yield
300    finally:
301        options.exclude_paths = [abspath(p, os.getcwd()) for p in rel_exclude_paths]
302
303
304def _run_cli_entrypoint() -> None:
305    """Invoke the main entrypoint with current CLI args.
306
307    This function also processes the runtime exceptions.
308    """
309    try:
310        sys.exit(main(sys.argv))
311    except IOError as exc:
312        # NOTE: Only "broken pipe" is acceptable to ignore
313        if exc.errno != errno.EPIPE:
314            raise
315    except KeyboardInterrupt:
316        sys.exit(EXIT_CONTROL_C_RC)
317    except RuntimeError as e:
318        raise SystemExit(str(e))
319
320
321if __name__ == "__main__":
322    _run_cli_entrypoint()
323