1# -*- coding: utf-8 -*-
2"""CLI parser setup and helpers."""
3import argparse
4import logging
5import os
6import sys
7from argparse import Namespace
8from pathlib import Path
9from typing import Any, Dict, List, Optional, Sequence, Union
10
11import yaml
12
13from ansiblelint.config import DEFAULT_KINDS
14from ansiblelint.constants import (
15    CUSTOM_RULESDIR_ENVVAR,
16    DEFAULT_RULESDIR,
17    INVALID_CONFIG_RC,
18)
19from ansiblelint.file_utils import (
20    abspath,
21    expand_path_vars,
22    guess_project_dir,
23    normpath,
24)
25
26_logger = logging.getLogger(__name__)
27_PATH_VARS = [
28    'exclude_paths',
29    'rulesdir',
30]
31
32
33def expand_to_normalized_paths(
34    config: Dict[str, Any], base_dir: Optional[str] = None
35) -> None:
36    """Mutate given config normalizing any path values in it."""
37    # config can be None (-c /dev/null)
38    if not config:
39        return
40    base_dir = base_dir or os.getcwd()
41    for paths_var in _PATH_VARS:
42        if paths_var not in config:
43            continue  # Cause we don't want to add a variable not present
44
45        normalized_paths = []
46        for path in config.pop(paths_var):
47            normalized_path = abspath(expand_path_vars(path), base_dir=base_dir)
48
49            normalized_paths.append(normalized_path)
50
51        config[paths_var] = normalized_paths
52
53
54def load_config(config_file: str) -> Dict[Any, Any]:
55    """Load configuration from disk."""
56    config_path = None
57    if config_file:
58        config_path = os.path.abspath(config_file)
59        if not os.path.exists(config_path):
60            _logger.error("Config file not found '%s'", config_path)
61            sys.exit(INVALID_CONFIG_RC)
62    config_path = config_path or get_config_path()
63    if not config_path or not os.path.exists(config_path):
64        # a missing default config file should not trigger an error
65        return {}
66
67    try:
68        with open(config_path, "r") as stream:
69            config = yaml.safe_load(stream)
70            if not isinstance(config, dict):
71                _logger.error("Invalid configuration file %s", config_path)
72                sys.exit(INVALID_CONFIG_RC)
73    except yaml.YAMLError as e:
74        _logger.error(e)
75        sys.exit(INVALID_CONFIG_RC)
76
77    config['config_file'] = config_path
78    # TODO(ssbarnea): implement schema validation for config file
79    if isinstance(config, list):
80        _logger.error(
81            "Invalid configuration '%s', expected YAML mapping in the config file.",
82            config_path,
83        )
84        sys.exit(INVALID_CONFIG_RC)
85
86    config_dir = os.path.dirname(config_path)
87    expand_to_normalized_paths(config, config_dir)
88
89    return config
90
91
92def get_config_path(config_file: str = '.ansible-lint') -> Optional[str]:
93    """Return local config file."""
94    project_filenames = [config_file]
95    parent = tail = os.getcwd()
96    while tail:
97        for project_filename in project_filenames:
98            filename = os.path.abspath(os.path.join(parent, project_filename))
99            if os.path.exists(filename):
100                return filename
101            if os.path.exists(os.path.abspath(os.path.join(parent, '.git'))):
102                # Avoid looking outside .git folders as we do not want endup
103                # picking config files from upper level projects if current
104                # project has no config.
105                return None
106        (parent, tail) = os.path.split(parent)
107    return None
108
109
110class AbspathArgAction(argparse.Action):
111    def __call__(
112        self,
113        parser: argparse.ArgumentParser,
114        namespace: Namespace,
115        values: Union[str, Sequence[Any], None],
116        option_string: Optional[str] = None,
117    ) -> None:
118        if isinstance(values, (str, Path)):
119            values = [values]
120        if values:
121            normalized_values = [
122                Path(expand_path_vars(str(path))).resolve() for path in values
123            ]
124            previous_values = getattr(namespace, self.dest, [])
125            setattr(namespace, self.dest, previous_values + normalized_values)
126
127
128def get_cli_parser() -> argparse.ArgumentParser:
129    """Initialize an argument parser."""
130    parser = argparse.ArgumentParser()
131
132    parser.add_argument(
133        '-L',
134        dest='listrules',
135        default=False,
136        action='store_true',
137        help="list all the rules",
138    )
139    parser.add_argument(
140        '-f',
141        dest='format',
142        default='rich',
143        choices=['rich', 'plain', 'rst', 'codeclimate', 'quiet', 'pep8'],
144        help="Format used rules output, (default: %(default)s)",
145    )
146    parser.add_argument(
147        '-q',
148        dest='quiet',
149        default=0,
150        action='count',
151        help="quieter, reduce verbosity, can be specified twice.",
152    )
153    parser.add_argument(
154        '-p',
155        dest='parseable',
156        default=False,
157        action='store_true',
158        help="parseable output, same as '-f pep8'",
159    )
160    parser.add_argument(
161        '--parseable-severity',
162        dest='parseable_severity',
163        default=False,
164        action='store_true',
165        help="parseable output including severity of rule",
166    )
167    parser.add_argument(
168        '--progressive',
169        dest='progressive',
170        default=False,
171        action='store_true',
172        help="Return success if it detects a reduction in number"
173        " of violations compared with previous git commit. This "
174        "feature works only in git repositories.",
175    )
176    parser.add_argument(
177        '--project-dir',
178        dest='project_dir',
179        default=".",
180        help="Location of project/repository, autodetected based on location "
181        " of configuration file.",
182    )
183    parser.add_argument(
184        '-r',
185        action=AbspathArgAction,
186        dest='rulesdir',
187        default=[],
188        type=Path,
189        help="Specify custom rule directories. Add -R "
190        f"to keep using embedded rules from {DEFAULT_RULESDIR}",
191    )
192    parser.add_argument(
193        '-R',
194        action='store_true',
195        default=False,
196        dest='use_default_rules',
197        help="Keep default rules when using -r",
198    )
199    parser.add_argument(
200        '--show-relpath',
201        dest='display_relative_path',
202        action='store_false',
203        default=True,
204        help="Display path relative to CWD",
205    )
206    parser.add_argument(
207        '-t',
208        dest='tags',
209        action='append',
210        default=[],
211        help="only check rules whose id/tags match these values",
212    )
213    parser.add_argument(
214        '-T', dest='listtags', action='store_true', help="list all the tags"
215    )
216    parser.add_argument(
217        '-v',
218        dest='verbosity',
219        action='count',
220        help="Increase verbosity level (-vv for more)",
221        default=0,
222    )
223    parser.add_argument(
224        '-x',
225        dest='skip_list',
226        default=[],
227        action='append',
228        help="only check rules whose id/tags do not " "match these values",
229    )
230    parser.add_argument(
231        '-w',
232        dest='warn_list',
233        default=[],
234        action='append',
235        help="only warn about these rules, unless overridden in "
236        "config file defaults to 'experimental'",
237    )
238    parser.add_argument(
239        '--enable-list',
240        dest='enable_list',
241        default=[],
242        action='append',
243        help="activate optional rules by their tag name",
244    )
245    # Do not use store_true/store_false because they create opposite defaults.
246    parser.add_argument(
247        '--nocolor',
248        dest='colored',
249        action='store_const',
250        const=False,
251        help="disable colored output, same as NO_COLOR=1",
252    )
253    parser.add_argument(
254        '--force-color',
255        dest='colored',
256        action='store_const',
257        const=True,
258        help="Force colored output, same as FORCE_COLOR=1",
259    )
260    parser.add_argument(
261        '--exclude',
262        dest='exclude_paths',
263        action=AbspathArgAction,
264        type=Path,
265        default=[],
266        help='path to directories or files to skip. ' 'This option is repeatable.',
267    )
268    parser.add_argument(
269        '-c',
270        dest='config_file',
271        help='Specify configuration file to use.  ' 'Defaults to ".ansible-lint"',
272    )
273    parser.add_argument(
274        '--offline',
275        dest='offline',
276        action='store_const',
277        const=True,
278        help='Disable installation of requirements.yml',
279    )
280    parser.add_argument(
281        '--version',
282        action='store_true',
283    )
284    parser.add_argument(
285        dest='lintables',
286        nargs='*',
287        help="One or more files or paths. When missing it will "
288        " enable auto-detection mode.",
289    )
290
291    return parser
292
293
294def merge_config(file_config: Dict[Any, Any], cli_config: Namespace) -> Namespace:
295    """Combine the file config with the CLI args."""
296    bools = (
297        'display_relative_path',
298        'parseable',
299        'parseable_severity',
300        'quiet',
301        'use_default_rules',
302        'progressive',
303        'offline',
304    )
305    # maps lists to their default config values
306    lists_map = {
307        'exclude_paths': [".cache", ".git", ".hg", ".svn", ".tox"],
308        'rulesdir': [],
309        'skip_list': [],
310        'tags': [],
311        'warn_list': ['experimental', 'role-name'],
312        'mock_modules': [],
313        'mock_roles': [],
314        'enable_list': [],
315    }
316
317    scalar_map = {
318        "loop_var_prefix": None,
319        "project_dir": ".",
320    }
321
322    if not file_config:
323        # use defaults if we don't have a config file and the commandline
324        # parameter is not set
325        for entry, default in lists_map.items():
326            if not getattr(cli_config, entry, None):
327                setattr(cli_config, entry, default)
328        return cli_config
329
330    for entry in bools:
331        x = getattr(cli_config, entry) or file_config.pop(entry, False)
332        setattr(cli_config, entry, x)
333
334    for entry, default in scalar_map.items():
335        x = getattr(cli_config, entry, None) or file_config.pop(entry, default)
336        setattr(cli_config, entry, x)
337
338    # if either commandline parameter or config file option is set merge
339    # with the other, if neither is set use the default
340    for entry, default in lists_map.items():
341        if getattr(cli_config, entry, None) or entry in file_config.keys():
342            value = getattr(cli_config, entry, [])
343            value.extend(file_config.pop(entry, []))
344        else:
345            value = default
346        setattr(cli_config, entry, value)
347
348    if 'verbosity' in file_config:
349        cli_config.verbosity = cli_config.verbosity + file_config.pop('verbosity')
350
351    # merge options that can be set only via a file config
352    for entry, value in file_config.items():
353        setattr(cli_config, entry, value)
354
355    # append default kinds to the custom list
356    kinds = file_config.get('kinds', [])
357    kinds.extend(DEFAULT_KINDS)
358    setattr(cli_config, 'kinds', kinds)
359
360    return cli_config
361
362
363def get_config(arguments: List[str]) -> Namespace:
364    """Extract the config based on given args."""
365    parser = get_cli_parser()
366    options = parser.parse_args(arguments)
367
368    file_config = load_config(options.config_file)
369
370    config = merge_config(file_config, options)
371
372    options.rulesdirs = get_rules_dirs(options.rulesdir, options.use_default_rules)
373
374    if options.project_dir == ".":
375        project_dir = guess_project_dir(options.config_file)
376        options.project_dir = normpath(project_dir)
377    if not options.project_dir or not os.path.exists(options.project_dir):
378        raise RuntimeError(
379            f"Failed to determine a valid project_dir: {options.project_dir}"
380        )
381
382    # Compute final verbosity level by subtracting -q counter.
383    options.verbosity -= options.quiet
384    return config
385
386
387def print_help(file: Any = sys.stdout) -> None:
388    """Print help test to the given stream."""
389    get_cli_parser().print_help(file=file)
390
391
392def get_rules_dirs(rulesdir: List[str], use_default: bool = True) -> List[str]:
393    """Return a list of rules dirs."""
394    default_ruledirs = [DEFAULT_RULESDIR]
395    default_custom_rulesdir = os.environ.get(
396        CUSTOM_RULESDIR_ENVVAR, os.path.join(DEFAULT_RULESDIR, "custom")
397    )
398    custom_ruledirs = sorted(
399        str(rdir.resolve())
400        for rdir in Path(default_custom_rulesdir).iterdir()
401        if rdir.is_dir() and (rdir / "__init__.py").exists()
402    )
403
404    if use_default:
405        return rulesdir + custom_ruledirs + default_ruledirs
406
407    return rulesdir or custom_ruledirs + default_ruledirs
408