1"""Common utility code that depends on CommonConfig."""
2from __future__ import (absolute_import, division, print_function)
3__metaclass__ = type
4
5import atexit
6import contextlib
7import json
8import os
9import shutil
10import sys
11import tempfile
12import textwrap
13
14from . import types as t
15
16from .util import (
17    common_environment,
18    COVERAGE_CONFIG_NAME,
19    display,
20    find_python,
21    remove_tree,
22    MODE_DIRECTORY,
23    MODE_FILE_EXECUTE,
24    PYTHON_PATHS,
25    raw_command,
26    to_bytes,
27    ANSIBLE_TEST_DATA_ROOT,
28    make_dirs,
29    ApplicationError,
30)
31
32from .data import (
33    data_context,
34)
35
36from .provider.layout import (
37    LayoutMessages,
38)
39
40
41class ResultType:
42    """Test result type."""
43    BOT = None  # type: ResultType
44    COVERAGE = None  # type: ResultType
45    DATA = None  # type: ResultType
46    JUNIT = None  # type: ResultType
47    LOGS = None  # type: ResultType
48    REPORTS = None  # type: ResultType
49    TMP = None   # type: ResultType
50
51    @staticmethod
52    def _populate():
53        ResultType.BOT = ResultType('bot')
54        ResultType.COVERAGE = ResultType('coverage')
55        ResultType.DATA = ResultType('data')
56        ResultType.JUNIT = ResultType('junit')
57        ResultType.LOGS = ResultType('logs')
58        ResultType.REPORTS = ResultType('reports')
59        ResultType.TMP = ResultType('.tmp')
60
61    def __init__(self, name):  # type: (str) -> None
62        self.name = name
63
64    @property
65    def relative_path(self):  # type: () -> str
66        """The content relative path to the results."""
67        return os.path.join(data_context().content.results_path, self.name)
68
69    @property
70    def path(self):  # type: () -> str
71        """The absolute path to the results."""
72        return os.path.join(data_context().content.root, self.relative_path)
73
74    def __str__(self):  # type: () -> str
75        return self.name
76
77
78# noinspection PyProtectedMember
79ResultType._populate()  # pylint: disable=protected-access
80
81
82class CommonConfig:
83    """Configuration common to all commands."""
84    def __init__(self, args, command):
85        """
86        :type args: any
87        :type command: str
88        """
89        self.command = command
90
91        self.color = args.color  # type: bool
92        self.explain = args.explain  # type: bool
93        self.verbosity = args.verbosity  # type: int
94        self.debug = args.debug  # type: bool
95        self.truncate = args.truncate  # type: int
96        self.redact = args.redact  # type: bool
97
98        self.cache = {}
99
100    def get_ansible_config(self):  # type: () -> str
101        """Return the path to the Ansible config for the given config."""
102        return os.path.join(ANSIBLE_TEST_DATA_ROOT, 'ansible.cfg')
103
104
105def handle_layout_messages(messages):  # type: (t.Optional[LayoutMessages]) -> None
106    """Display the given layout messages."""
107    if not messages:
108        return
109
110    for message in messages.info:
111        display.info(message, verbosity=1)
112
113    for message in messages.warning:
114        display.warning(message)
115
116    if messages.error:
117        raise ApplicationError('\n'.join(messages.error))
118
119
120@contextlib.contextmanager
121def named_temporary_file(args, prefix, suffix, directory, content):
122    """
123    :param args: CommonConfig
124    :param prefix: str
125    :param suffix: str
126    :param directory: str
127    :param content: str | bytes | unicode
128    :rtype: str
129    """
130    if args.explain:
131        yield os.path.join(directory, '%stemp%s' % (prefix, suffix))
132    else:
133        with tempfile.NamedTemporaryFile(prefix=prefix, suffix=suffix, dir=directory) as tempfile_fd:
134            tempfile_fd.write(to_bytes(content))
135            tempfile_fd.flush()
136
137            yield tempfile_fd.name
138
139
140def write_json_test_results(category, name, content):  # type: (ResultType, str, t.Union[t.List[t.Any], t.Dict[str, t.Any]]) -> None
141    """Write the given json content to the specified test results path, creating directories as needed."""
142    path = os.path.join(category.path, name)
143    write_json_file(path, content, create_directories=True)
144
145
146def write_text_test_results(category, name, content):  # type: (ResultType, str, str) -> None
147    """Write the given text content to the specified test results path, creating directories as needed."""
148    path = os.path.join(category.path, name)
149    write_text_file(path, content, create_directories=True)
150
151
152def write_json_file(path, content, create_directories=False):  # type: (str, t.Union[t.List[t.Any], t.Dict[str, t.Any]], bool) -> None
153    """Write the given json content to the specified path, optionally creating missing directories."""
154    text_content = json.dumps(content, sort_keys=True, indent=4) + '\n'
155    write_text_file(path, text_content, create_directories=create_directories)
156
157
158def write_text_file(path, content, create_directories=False):  # type: (str, str, bool) -> None
159    """Write the given text content to the specified path, optionally creating missing directories."""
160    if create_directories:
161        make_dirs(os.path.dirname(path))
162
163    with open(to_bytes(path), 'wb') as file:
164        file.write(to_bytes(content))
165
166
167def get_python_path(args, interpreter):
168    """
169    :type args: TestConfig
170    :type interpreter: str
171    :rtype: str
172    """
173    python_path = PYTHON_PATHS.get(interpreter)
174
175    if python_path:
176        return python_path
177
178    prefix = 'python-'
179    suffix = '-ansible'
180
181    root_temp_dir = '/tmp'
182
183    if args.explain:
184        return os.path.join(root_temp_dir, ''.join((prefix, 'temp', suffix)))
185
186    python_path = tempfile.mkdtemp(prefix=prefix, suffix=suffix, dir=root_temp_dir)
187    injected_interpreter = os.path.join(python_path, 'python')
188
189    # A symlink is faster than the execv wrapper, but isn't compatible with virtual environments.
190    # Attempt to detect when it is safe to use a symlink by checking the real path of the interpreter.
191    use_symlink = os.path.dirname(os.path.realpath(interpreter)) == os.path.dirname(interpreter)
192
193    if use_symlink:
194        display.info('Injecting "%s" as a symlink to the "%s" interpreter.' % (injected_interpreter, interpreter), verbosity=1)
195
196        os.symlink(interpreter, injected_interpreter)
197    else:
198        display.info('Injecting "%s" as a execv wrapper for the "%s" interpreter.' % (injected_interpreter, interpreter), verbosity=1)
199
200        create_interpreter_wrapper(interpreter, injected_interpreter)
201
202    os.chmod(python_path, MODE_DIRECTORY)
203
204    if not PYTHON_PATHS:
205        atexit.register(cleanup_python_paths)
206
207    PYTHON_PATHS[interpreter] = python_path
208
209    return python_path
210
211
212def create_temp_dir(prefix=None, suffix=None, base_dir=None):  # type: (t.Optional[str], t.Optional[str], t.Optional[str]) -> str
213    """Create a temporary directory that persists until the current process exits."""
214    temp_path = tempfile.mkdtemp(prefix=prefix or 'tmp', suffix=suffix or '', dir=base_dir)
215    atexit.register(remove_tree, temp_path)
216    return temp_path
217
218
219def create_interpreter_wrapper(interpreter, injected_interpreter):  # type: (str, str) -> None
220    """Create a wrapper for the given Python interpreter at the specified path."""
221    # sys.executable is used for the shebang to guarantee it is a binary instead of a script
222    # injected_interpreter could be a script from the system or our own wrapper created for the --venv option
223    shebang_interpreter = sys.executable
224
225    code = textwrap.dedent('''
226    #!%s
227
228    from __future__ import absolute_import
229
230    from os import execv
231    from sys import argv
232
233    python = '%s'
234
235    execv(python, [python] + argv[1:])
236    ''' % (shebang_interpreter, interpreter)).lstrip()
237
238    write_text_file(injected_interpreter, code)
239
240    os.chmod(injected_interpreter, MODE_FILE_EXECUTE)
241
242
243def cleanup_python_paths():
244    """Clean up all temporary python directories."""
245    for path in sorted(PYTHON_PATHS.values()):
246        display.info('Cleaning up temporary python directory: %s' % path, verbosity=2)
247        shutil.rmtree(path)
248
249
250def get_coverage_environment(args, target_name, version, temp_path, module_coverage, remote_temp_path=None):
251    """
252    :type args: TestConfig
253    :type target_name: str
254    :type version: str
255    :type temp_path: str
256    :type module_coverage: bool
257    :type remote_temp_path: str | None
258    :rtype: dict[str, str]
259    """
260    if temp_path:
261        # integration tests (both localhost and the optional testhost)
262        # config and results are in a temporary directory
263        coverage_config_base_path = temp_path
264        coverage_output_base_path = temp_path
265    elif args.coverage_config_base_path:
266        # unit tests, sanity tests and other special cases (localhost only)
267        # config is in a temporary directory
268        # results are in the source tree
269        coverage_config_base_path = args.coverage_config_base_path
270        coverage_output_base_path = os.path.join(data_context().content.root, data_context().content.results_path)
271    else:
272        raise Exception('No temp path and no coverage config base path. Check for missing coverage_context usage.')
273
274    config_file = os.path.join(coverage_config_base_path, COVERAGE_CONFIG_NAME)
275    coverage_file = os.path.join(coverage_output_base_path, ResultType.COVERAGE.name, '%s=%s=%s=%s=coverage' % (
276        args.command, target_name, args.coverage_label or 'local-%s' % version, 'python-%s' % version))
277
278    if not args.explain and not os.path.exists(config_file):
279        raise Exception('Missing coverage config file: %s' % config_file)
280
281    if args.coverage_check:
282        # cause the 'coverage' module to be found, but not imported or enabled
283        coverage_file = ''
284
285    # Enable code coverage collection on local Python programs (this does not include Ansible modules).
286    # Used by the injectors to support code coverage.
287    # Used by the pytest unit test plugin to support code coverage.
288    # The COVERAGE_FILE variable is also used directly by the 'coverage' module.
289    env = dict(
290        COVERAGE_CONF=config_file,
291        COVERAGE_FILE=coverage_file,
292    )
293
294    if module_coverage:
295        # Enable code coverage collection on Ansible modules (both local and remote).
296        # Used by the AnsiballZ wrapper generator in lib/ansible/executor/module_common.py to support code coverage.
297        env.update(dict(
298            _ANSIBLE_COVERAGE_CONFIG=config_file,
299            _ANSIBLE_COVERAGE_OUTPUT=coverage_file,
300        ))
301
302        if remote_temp_path:
303            # Include the command, target and label so the remote host can create a filename with that info. The remote
304            # is responsible for adding '={language version}=coverage.{hostname}.{pid}.{id}'
305            env['_ANSIBLE_COVERAGE_REMOTE_OUTPUT'] = os.path.join(remote_temp_path, '%s=%s=%s' % (
306                args.command, target_name, args.coverage_label or 'remote'))
307            env['_ANSIBLE_COVERAGE_REMOTE_WHITELIST'] = os.path.join(data_context().content.root, '*')
308
309    return env
310
311
312def intercept_command(args, cmd, target_name, env, capture=False, data=None, cwd=None, python_version=None, temp_path=None, module_coverage=True,
313                      virtualenv=None, disable_coverage=False, remote_temp_path=None):
314    """
315    :type args: TestConfig
316    :type cmd: collections.Iterable[str]
317    :type target_name: str
318    :type env: dict[str, str]
319    :type capture: bool
320    :type data: str | None
321    :type cwd: str | None
322    :type python_version: str | None
323    :type temp_path: str | None
324    :type module_coverage: bool
325    :type virtualenv: str | None
326    :type disable_coverage: bool
327    :type remote_temp_path: str | None
328    :rtype: str | None, str | None
329    """
330    if not env:
331        env = common_environment()
332    else:
333        env = env.copy()
334
335    cmd = list(cmd)
336    version = python_version or args.python_version
337    interpreter = virtualenv or find_python(version)
338    inject_path = os.path.join(ANSIBLE_TEST_DATA_ROOT, 'injector')
339
340    if not virtualenv:
341        # injection of python into the path is required when not activating a virtualenv
342        # otherwise scripts may find the wrong interpreter or possibly no interpreter
343        python_path = get_python_path(args, interpreter)
344        inject_path = python_path + os.path.pathsep + inject_path
345
346    env['PATH'] = inject_path + os.path.pathsep + env['PATH']
347    env['ANSIBLE_TEST_PYTHON_VERSION'] = version
348    env['ANSIBLE_TEST_PYTHON_INTERPRETER'] = interpreter
349
350    if args.coverage and not disable_coverage:
351        # add the necessary environment variables to enable code coverage collection
352        env.update(get_coverage_environment(args, target_name, version, temp_path, module_coverage,
353                                            remote_temp_path=remote_temp_path))
354
355    return run_command(args, cmd, capture=capture, env=env, data=data, cwd=cwd)
356
357
358def run_command(args, cmd, capture=False, env=None, data=None, cwd=None, always=False, stdin=None, stdout=None,
359                cmd_verbosity=1, str_errors='strict'):
360    """
361    :type args: CommonConfig
362    :type cmd: collections.Iterable[str]
363    :type capture: bool
364    :type env: dict[str, str] | None
365    :type data: str | None
366    :type cwd: str | None
367    :type always: bool
368    :type stdin: file | None
369    :type stdout: file | None
370    :type cmd_verbosity: int
371    :type str_errors: str
372    :rtype: str | None, str | None
373    """
374    explain = args.explain and not always
375    return raw_command(cmd, capture=capture, env=env, data=data, cwd=cwd, explain=explain, stdin=stdin, stdout=stdout,
376                       cmd_verbosity=cmd_verbosity, str_errors=str_errors)
377