1"""Utilities for configuring ansible runtime environment."""
2import json
3import logging
4import os
5import pathlib
6import re
7import subprocess
8import sys
9from functools import lru_cache
10from typing import Any, Dict, List, Optional, Tuple, Type, Union
11
12import packaging
13import tenacity
14from packaging import version
15
16from ansiblelint.config import (
17    ansible_collections_path,
18    collection_list,
19    options,
20    parse_ansible_version,
21)
22from ansiblelint.constants import (
23    ANSIBLE_DEFAULT_ROLES_PATH,
24    ANSIBLE_MIN_VERSION,
25    ANSIBLE_MISSING_RC,
26    ANSIBLE_MOCKED_MODULE,
27    INVALID_CONFIG_RC,
28    INVALID_PREREQUISITES_RC,
29)
30from ansiblelint.loaders import yaml_from_file
31
32_logger = logging.getLogger(__name__)
33
34
@lru_cache()
def _get_ansible_ver_err() -> Tuple[str, str]:
    """Detect the Ansible CLI and python module versions, only once.

    The helper lives at module level so that ``lru_cache`` really memoizes
    the outcome: repeated calls to ``check_ansible_presence`` do not spawn
    ``ansible --version`` again.

    Returns a ``(version, error)`` tuple where ``error`` is an empty string
    when no problem was detected.
    """
    try:
        result = subprocess.run(
            args=["ansible", "--version"],
            stdout=subprocess.PIPE,
            universal_newlines=True,
            check=False,
        )
    except OSError as exc:
        # Raised when the ``ansible`` executable cannot be started at all
        # (for instance it is not on PATH); report it like any other
        # detection failure instead of leaking a traceback.
        return "", "FATAL: Unable to retrieve ansible cli version: %s" % exc
    if result.returncode != 0:
        return (
            "",
            "FATAL: Unable to retrieve ansible cli version: %s" % result.stdout,
        )

    ver, error = parse_ansible_version(result.stdout)
    if error is not None:
        return "", error

    err = ""
    failed = False
    try:
        # pylint: disable=import-outside-toplevel
        from ansible.release import __version__ as ansible_module_version

        if version.parse(ansible_module_version) < version.parse(
            ANSIBLE_MIN_VERSION
        ):
            failed = True
    except ImportError as exc:  # ModuleNotFoundError is a subclass of it
        failed = True
        ansible_module_version = "none"
        err += f"{exc}\n"
    if failed:
        err += (
            "FATAL: ansible-lint requires a version of Ansible package"
            " >= %s, but %s was found. "
            "Please install a compatible version using the same python interpreter. See "
            "https://docs.ansible.com/ansible/latest/installation_guide"
            "/intro_installation.html#installing-ansible-with-pip"
            % (ANSIBLE_MIN_VERSION, ansible_module_version)
        )

    elif ver != ansible_module_version:
        # CLI and importable module disagree: two different installations.
        err = (
            f"FATAL: Ansible CLI ({ver}) and python module"
            f" ({ansible_module_version}) versions do not match. This "
            "indicates a broken execution environment."
        )
    return ver, err


def check_ansible_presence(exit_on_error: bool = False) -> Tuple[str, str]:
    """Assures we stop execution if Ansible is missing or outdated.

    Returns the found version and an error message, which is an empty string
    when nothing wrong was detected.

    When ``exit_on_error`` is true and an error was detected, the error is
    logged and the process exits with ``ANSIBLE_MISSING_RC``.
    """
    ver, err = _get_ansible_ver_err()
    if exit_on_error and err:
        _logger.error(err)
        sys.exit(ANSIBLE_MISSING_RC)
    return ver, err
98
99
def install_collection(collection: str, destination: Optional[str] = None) -> None:
    """Install an Ansible collection using ``ansible-galaxy``.

    ``collection`` may carry a version constraint, like 'foo.bar:>=1.2.3'.
    When ``destination`` is set it is passed to ansible-galaxy with ``-p``.

    If the command returns a non-zero code, its output is logged and the
    process exits with ``INVALID_PREREQUISITES_RC``.
    """
    destination_args = ["-p", destination] if destination else []
    cmd = [
        "ansible-galaxy",
        "collection",
        "install",
        "--force",  # required for ansible 2.9
        "-v",
        *destination_args,
        f"{collection}",
    ]

    _logger.info("Running %s", " ".join(cmd))
    completed = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr so the log shows everything
        universal_newlines=True,
        check=False,
    )
    if completed.returncode == 0:
        return
    _logger.error(
        "Command returned %s code:\n%s", completed.returncode, completed.stdout
    )
    sys.exit(INVALID_PREREQUISITES_RC)
127
128
@tenacity.retry(  # Retry up to 3 times as galaxy server can return errors
    reraise=True,
    wait=tenacity.wait_fixed(30),  # type: ignore
    stop=tenacity.stop_after_attempt(3),  # type: ignore
    before_sleep=tenacity.after_log(_logger, logging.WARNING),  # type: ignore
)
def install_requirements(requirement: str) -> None:
    """Install roles, and collections if declared, from a requirements file.

    Does nothing when ``requirement`` does not exist. Roles are always
    installed; collections are installed only when the file has a
    ``collections`` key. A failing ansible-galaxy run has its output logged
    and raises ``RuntimeError`` carrying the return code.
    """
    if not os.path.exists(requirement):
        return

    def _galaxy(arguments: List[str]) -> None:
        """Run ansible-galaxy with the given arguments, raising on failure."""
        cmd = ["ansible-galaxy", *arguments]
        _logger.info("Running %s", " ".join(cmd))
        completed = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
            check=False,
        )
        if completed.returncode != 0:
            _logger.error(completed.stdout)
            raise RuntimeError(completed.returncode)

    _galaxy(
        [
            "role",
            "install",
            "--force",  # required for ansible 2.9
            "--roles-path",
            f"{options.cache_dir}/roles",
            "-vr",
            f"{requirement}",
        ]
    )

    # Run galaxy collection install works on v2 requirements.yml
    if "collections" in yaml_from_file(requirement):
        _galaxy(
            [
                "collection",
                "install",
                "--force",  # required for ansible 2.9
                "-p",
                f"{options.cache_dir}/collections",
                "-vr",
                f"{requirement}",
            ]
        )
188
189
def prepare_environment(required_collections: Optional[Dict[str, str]] = None) -> None:
    """Make dependencies available if needed.

    Steps, in order: initialize options when not configured yet, install
    requirements files unless offline, install each required collection
    (mapping of name to minimal version), then link the galaxy role, create
    mocks and export the Ansible path variables.
    """
    if not options.configured:
        # Allow method to be used without calling the command line, so we can
        # reuse it in other tools, like molecule.
        # pylint: disable=import-outside-toplevel,cyclic-import
        from ansiblelint.__main__ import initialize_options

        initialize_options()

    if not options.offline:
        install_requirements("requirements.yml")
        for scenario_requirements in pathlib.Path(".").glob(
            "molecule/*/requirements.yml"
        ):
            install_requirements(str(scenario_requirements))

    for name, min_version in (required_collections or {}).items():
        # Install into the cache directory when one is configured.
        destination = (
            f"{options.cache_dir}/collections" if options.cache_dir else None
        )
        install_collection(f"{name}:>={min_version}", destination=destination)

    _install_galaxy_role()
    _perform_mockings()
    _prepare_ansible_paths()
217
218
def _get_galaxy_role_ns(galaxy_infos: Dict[str, Any]) -> str:
    """Compute role namespace from meta/main.yml, including trailing dot.

    The ``namespace`` key is used, falling back to ``author`` when it is
    missing or empty. A value that looks like a person name (two words
    separated by a space) yields an empty namespace without a dot.

    Raises ``RuntimeError`` when the selected value is not a string.
    """
    role_namespace = galaxy_infos.get('namespace', "")
    if not role_namespace:
        role_namespace = galaxy_infos.get('author', "")
    # Validate before any string operation, otherwise a non-string value
    # fails inside re.match() and this check can never trigger.
    if not isinstance(role_namespace, str):
        raise RuntimeError("Role namespace must be string, not %s" % role_namespace)
    # if there's a space in the name space, it's likely author name
    # and not the galaxy login, so act as if there was no namespace
    if re.match(r"^\w+ \w+", role_namespace):
        return ""
    return f"{role_namespace}."
233
234
def _get_galaxy_role_name(galaxy_infos: Dict[str, Any]) -> str:
    """Return the ``role_name`` entry of galaxy_info, or "" when absent."""
    if 'role_name' in galaxy_infos:
        return galaxy_infos['role_name']
    return ""
238
239
def _get_role_fqrn(galaxy_infos: Dict[str, Any]) -> str:
    """Compute role fqrn.

    The result is the namespace (with its trailing dot, when there is one)
    followed by the role name. When ``role_name`` is not set, the name of
    the current directory is used with the ``ansible-role-`` or ``ansible-``
    marker removed.
    """
    role_namespace = _get_galaxy_role_ns(galaxy_infos)
    role_name = _get_galaxy_role_name(galaxy_infos)
    if not role_name:
        role_name = pathlib.Path(".").absolute().name
        # The longer alternative must come first: with 'ansible-' first it
        # always wins and 'ansible-role-foo' is left as 'role-foo'.
        role_name = re.sub(r'(ansible-role-|ansible-)', '', role_name)

    return f"{role_namespace}{role_name}"
249
250
def _install_galaxy_role() -> None:
    """Detect standalone galaxy role and install it.

    When ``meta/main.yml`` exists and has a ``galaxy_info`` section, the
    project directory is symlinked under ``<cache_dir>/roles`` using the
    computed role name.

    Unless 'role-name' is in ``options.skip_list``, the fully qualified name
    is validated; an invalid name is logged as a warning when 'role-name' is
    in ``options.warn_list``, otherwise it is logged as an error and the
    process exits with ``INVALID_PREREQUISITES_RC``.
    """
    if not os.path.exists("meta/main.yml"):
        return
    yaml = yaml_from_file("meta/main.yml")
    # An empty meta/main.yml yields no mapping to look into.
    if not yaml or 'galaxy_info' not in yaml:
        return

    fqrn = _get_role_fqrn(yaml['galaxy_info'])

    if 'role-name' not in options.skip_list:
        if not re.match(r"[a-z0-9][a-z0-9_]+\.[a-z][a-z0-9_]+$", fqrn):
            msg = (
                """\
Computed fully qualified role name of %s does not follow current galaxy requirements.
Please edit meta/main.yml and assure we can correctly determine full role name:

galaxy_info:
  role_name: my_name  # if absent directory name hosting role is used instead
  namespace: my_galaxy_namespace  # if absent, author is used instead

Namespace: https://galaxy.ansible.com/docs/contributing/namespaces.html#galaxy-namespace-limitations
Role: https://galaxy.ansible.com/docs/contributing/creating_role.html#role-names

As an alternative, you can add 'role-name' to either skip_list or warn_list.
"""
                % fqrn
            )
            if 'role-name' in options.warn_list:
                _logger.warning(msg)
            else:
                _logger.error(msg)
                sys.exit(INVALID_PREREQUISITES_RC)
    else:
        # when 'role-name' is in skip_list, we stick to plain role names
        if 'role_name' in yaml['galaxy_info']:
            role_namespace = _get_galaxy_role_ns(yaml['galaxy_info'])
            role_name = _get_galaxy_role_name(yaml['galaxy_info'])
            fqrn = f"{role_namespace}{role_name}"
        else:
            fqrn = pathlib.Path(".").absolute().name
    p = pathlib.Path(f"{options.cache_dir}/roles")
    p.mkdir(parents=True, exist_ok=True)
    link_path = p / fqrn
    target = pathlib.Path(options.project_dir).absolute()
    # exists() follows symlinks and reports False for a dangling link, so
    # is_symlink() is used to detect a link left over from a previous run.
    is_link = link_path.is_symlink()
    if not is_link or os.readlink(link_path) != str(target):
        if is_link or link_path.exists():
            link_path.unlink()
        link_path.symlink_to(target, target_is_directory=True)
    _logger.info(
        "Using %s symlink to current repository in order to enable Ansible to find the role using its expected full name.",
        link_path,
    )
306
307
def _prepare_ansible_paths() -> None:
    """Export module, collection and role locations to Ansible.

    Each candidate directory that exists is added once to its list, then the
    lists are pushed to the matching environment variables. Collection paths
    accumulate in the shared ``collection_list``.
    """
    library_paths: List[str] = []
    roles_path: List[str] = []

    candidates = (
        ("plugins/modules", library_paths),
        (f"{options.cache_dir}/modules", library_paths),
        (f"{options.cache_dir}/collections", collection_list),
        ("roles", roles_path),
        (f"{options.cache_dir}/roles", roles_path),
    )
    for candidate, bucket in candidates:
        if candidate in bucket:
            continue
        if os.path.exists(candidate):
            bucket.append(candidate)

    _update_env('ANSIBLE_LIBRARY', library_paths)
    _update_env(ansible_collections_path(), collection_list)
    _update_env('ANSIBLE_ROLES_PATH', roles_path, default=ANSIBLE_DEFAULT_ROLES_PATH)
326
327
def _make_module_stub(module_name: str) -> None:
    """Create a stub file for a mocked module under the cache directory.

    A plain name goes to ``<cache_dir>/modules``; a dotted name with three or
    more parts goes into the matching collection tree. An invalid name is
    logged and the process exits with ``INVALID_CONFIG_RC``.
    """
    # a.b.c is treated a collection
    if not re.match(r"^(\w+|\w+\.\w+\.[\.\w]+)$", module_name):
        _logger.error("Config error: %s is not a valid module name.", module_name)
        sys.exit(INVALID_CONFIG_RC)

    parts = module_name.split(".")
    namespace: Optional[str] = None
    collection: Optional[str] = None
    if len(parts) >= 3:
        namespace, collection = parts[0], parts[1]
        # Parts between the collection and the module become sub-directories.
        path = f"{ options.cache_dir }/collections/ansible_collections/{ namespace }/{ collection }/plugins/modules/{ '/'.join(parts[2:-1]) }"
        module_file = f"{path}/{parts[-1]}.py"
    else:
        path = f"{options.cache_dir}/modules"
        module_file = f"{options.cache_dir}/modules/{module_name}.py"

    os.makedirs(path, exist_ok=True)
    # NOTE(review): the stub name is the file path rather than module_name;
    # confirm this is what the stub template expects.
    _write_module_stub(
        filename=module_file,
        name=module_file,
        namespace=namespace,
        collection=collection,
    )
352
353
def _write_module_stub(
    filename: str,
    name: str,
    namespace: Optional[str] = None,
    collection: Optional[str] = None,
) -> None:
    """Render the mocked module template and save it as ``filename``.

    ``name``, ``namespace`` and ``collection`` are substituted into
    ``ANSIBLE_MOCKED_MODULE``; an existing file is overwritten.
    """
    stub_source = ANSIBLE_MOCKED_MODULE.format(
        name=name, collection=collection, namespace=namespace
    )
    pathlib.Path(filename).write_text(stub_source)
366
367
def _update_env(varname: str, value: List[str], default: str = "") -> None:
    """Update a colon separated environment variable, if needed, by appending.

    The entries of ``value`` are added after the current content of
    ``varname``, or after ``default`` when the variable is not set. Entries
    already present are not added again, so calling this repeatedly with the
    same values leaves the variable unchanged. Nothing happens when ``value``
    is empty.
    """
    if not value:
        return
    orig_value = os.environ.get(varname, default)
    if orig_value:
        # Prepend original or default variable content to custom content.
        value = [*orig_value.split(':'), *value]
    # dict.fromkeys() drops repeated entries while keeping first-seen order;
    # without it each call would append the same paths once more.
    value_str = ":".join(dict.fromkeys(value))
    if value_str != os.environ.get(varname, ""):
        os.environ[varname] = value_str
        _logger.info("Added %s=%s", varname, value_str)
379
380
def _perform_mockings() -> None:
    """Mock modules and roles.

    Creates an empty directory for every role in ``options.mock_roles`` and a
    stub file for every module in ``options.mock_modules``. When the current
    directory has a ``galaxy.yml`` with both ``namespace`` and ``name``, the
    project directory is also symlinked into the cached collections tree.
    """
    for role_name in options.mock_roles or []:
        if re.match(r"\w+\.\w+\.\w+$", role_name):
            namespace, collection, role_dir = role_name.split(".")
            path = f"{options.cache_dir}/collections/ansible_collections/{ namespace }/{ collection }/roles/{ role_dir }/"
        else:
            path = f"{options.cache_dir}/roles/{role_name}"
        os.makedirs(path, exist_ok=True)

    if options.mock_modules:
        for module_name in options.mock_modules:
            _make_module_stub(module_name)

    # if inside a collection repo, symlink it to simulate its installed state
    if not os.path.exists("galaxy.yml"):
        return
    yaml = yaml_from_file("galaxy.yml")
    if not yaml:
        # ignore empty galaxy.yml file
        return
    namespace = yaml.get('namespace', None)
    collection = yaml.get('name', None)
    if not namespace or not collection:
        return
    p = pathlib.Path(
        f"{options.cache_dir}/collections/ansible_collections/{ namespace }"
    )
    p.mkdir(parents=True, exist_ok=True)
    link_path = p / collection
    target = pathlib.Path(options.project_dir).absolute()
    # exists() follows symlinks and reports False for a dangling link, so
    # is_symlink() is used to detect a link left over from a previous run.
    is_link = link_path.is_symlink()
    # os.readlink() returns a str: compare it with str(target), a Path object
    # never equals a str and the link would be recreated on every run.
    if not is_link or os.readlink(link_path) != str(target):
        if is_link or link_path.exists():
            link_path.unlink()
        link_path.symlink_to(target, target_is_directory=True)
416
417
def ansible_config_get(key: str, kind: Type[Any] = str) -> Union[str, List[str], None]:
    """Return configuration item from ansible config.

    Runs ``ansible-config dump`` and looks for the first line starting with
    ``key``. ``kind`` selects how the value is returned: ``str`` gives the
    raw text after `` = ``, ``list`` parses a bracketed list literal.

    Returns ``None`` when no matching line is found.

    Raises ``RuntimeError`` when ``kind`` is neither ``str`` nor ``list`` or
    when the value found for a ``list`` is not a list literal.
    """
    # pylint: disable=import-outside-toplevel
    import ast

    if kind is not str and kind is not list:
        # Fail before spawning ansible-config for nothing.
        raise RuntimeError("Unknown data type.")

    env = os.environ.copy()
    # Avoid possible ANSI garbage
    env["ANSIBLE_FORCE_COLOR"] = "0"
    # Avoid our own override as this prevents returning system paths.
    env.pop(ansible_collections_path(), None)

    config = subprocess.check_output(
        ["ansible-config", "dump"], universal_newlines=True, env=env
    )

    if kind is str:
        result = re.search(rf"^{key}.* = (.*)$", config, re.MULTILINE)
        return result.groups()[0] if result else None

    result = re.search(rf"^{key}.* = (\[.*\])$", config, re.MULTILINE)
    if not result:
        return None
    raw = result.groups()[0]
    try:
        # literal_eval only accepts python literals, so text coming from the
        # subprocess can never be executed the way eval() would allow.
        val = ast.literal_eval(raw)
    except (ValueError, SyntaxError) as exc:
        raise RuntimeError(f"Unexpected data read for {key}: {raw}") from exc
    if not isinstance(val, list):
        raise RuntimeError(f"Unexpected data read for {key}: {val}")
    return val
446
447
def require_collection(  # noqa: C901
    name: str, version: Optional[str] = None, install: bool = True
) -> None:
    """Check if a minimal collection version is present or exits.

    The collection is searched in ``<cache_dir>/collections`` (when a cache
    directory is configured) and then in the paths reported by ansible for
    ``COLLECTIONS_PATHS``; the first location holding it is the one checked.

    When the collection is missing or older than ``version`` and ``install``
    is true, one installation attempt is made and the check is repeated with
    ``install=False``. Otherwise the problem is logged and the process exits
    with ``INVALID_PREREQUISITES_RC``.
    """
    try:
        ns, coll = name.split('.', 1)
    except ValueError:
        sys.exit("Invalid collection name supplied: %s" % name)

    paths = ansible_config_get('COLLECTIONS_PATHS', list)
    if not paths or not isinstance(paths, list):
        sys.exit(f"Unable to determine ansible collection paths. ({paths})")

    if options.cache_dir:
        # if we have a cache dir, we want to use that as it would be the
        # preferred destination when installing a missing collection
        paths.insert(0, f"{options.cache_dir}/collections")

    # Only add a constraint when a version was given, otherwise the request
    # would read 'name:>=None'.
    spec = f"{name}:>={version}" if version else name

    for path in paths:
        collpath = os.path.join(path, 'ansible_collections', ns, coll)
        if not os.path.exists(collpath):
            continue
        mpath = os.path.join(collpath, 'MANIFEST.json')
        if not os.path.exists(mpath):
            _logger.critical(
                "Found collection at '%s' but missing MANIFEST.json, cannot get info.",
                collpath,
            )
            sys.exit(INVALID_PREREQUISITES_RC)

        # Read and close the manifest before any installation is attempted.
        with open(mpath, 'r', encoding="utf-8") as f:
            manifest = json.load(f)
        found_version = packaging.version.parse(
            manifest['collection_info']['version']
        )
        if version and found_version < packaging.version.parse(version):
            if install:
                install_collection(spec)
                require_collection(name, version, install=False)
            else:
                _logger.critical(
                    "Found %s collection %s but %s or newer is required.",
                    name,
                    found_version,
                    version,
                )
                sys.exit(INVALID_PREREQUISITES_RC)
        break
    else:
        # No search path holds the collection.
        if install:
            install_collection(spec)
            require_collection(name, version, install=False)
        else:
            _logger.critical("Collection '%s' not found in '%s'", name, paths)
            sys.exit(INVALID_PREREQUISITES_RC)
506