1"""Common utility code that depends on CommonConfig.""" 2from __future__ import (absolute_import, division, print_function) 3__metaclass__ = type 4 5import atexit 6import contextlib 7import json 8import os 9import shutil 10import sys 11import tempfile 12import textwrap 13 14from . import types as t 15 16from .util import ( 17 common_environment, 18 COVERAGE_CONFIG_NAME, 19 display, 20 find_python, 21 remove_tree, 22 MODE_DIRECTORY, 23 MODE_FILE_EXECUTE, 24 PYTHON_PATHS, 25 raw_command, 26 to_bytes, 27 ANSIBLE_TEST_DATA_ROOT, 28 make_dirs, 29 ApplicationError, 30) 31 32from .data import ( 33 data_context, 34) 35 36from .provider.layout import ( 37 LayoutMessages, 38) 39 40 41class ResultType: 42 """Test result type.""" 43 BOT = None # type: ResultType 44 COVERAGE = None # type: ResultType 45 DATA = None # type: ResultType 46 JUNIT = None # type: ResultType 47 LOGS = None # type: ResultType 48 REPORTS = None # type: ResultType 49 TMP = None # type: ResultType 50 51 @staticmethod 52 def _populate(): 53 ResultType.BOT = ResultType('bot') 54 ResultType.COVERAGE = ResultType('coverage') 55 ResultType.DATA = ResultType('data') 56 ResultType.JUNIT = ResultType('junit') 57 ResultType.LOGS = ResultType('logs') 58 ResultType.REPORTS = ResultType('reports') 59 ResultType.TMP = ResultType('.tmp') 60 61 def __init__(self, name): # type: (str) -> None 62 self.name = name 63 64 @property 65 def relative_path(self): # type: () -> str 66 """The content relative path to the results.""" 67 return os.path.join(data_context().content.results_path, self.name) 68 69 @property 70 def path(self): # type: () -> str 71 """The absolute path to the results.""" 72 return os.path.join(data_context().content.root, self.relative_path) 73 74 def __str__(self): # type: () -> str 75 return self.name 76 77 78# noinspection PyProtectedMember 79ResultType._populate() # pylint: disable=protected-access 80 81 82class CommonConfig: 83 """Configuration common to all commands.""" 84 def __init__(self, args, command): 85 """ 86 :type args: any 87 :type command: str 88 """ 89 self.command = command 90 91 self.color = args.color # type: bool 92 self.explain = args.explain # type: bool 93 self.verbosity = args.verbosity # type: int 94 self.debug = args.debug # type: bool 95 self.truncate = args.truncate # type: int 96 self.redact = args.redact # type: bool 97 98 self.cache = {} 99 100 def get_ansible_config(self): # type: () -> str 101 """Return the path to the Ansible config for the given config.""" 102 return os.path.join(ANSIBLE_TEST_DATA_ROOT, 'ansible.cfg') 103 104 105def handle_layout_messages(messages): # type: (t.Optional[LayoutMessages]) -> None 106 """Display the given layout messages.""" 107 if not messages: 108 return 109 110 for message in messages.info: 111 display.info(message, verbosity=1) 112 113 for message in messages.warning: 114 display.warning(message) 115 116 if messages.error: 117 raise ApplicationError('\n'.join(messages.error)) 118 119 120@contextlib.contextmanager 121def named_temporary_file(args, prefix, suffix, directory, content): 122 """ 123 :param args: CommonConfig 124 :param prefix: str 125 :param suffix: str 126 :param directory: str 127 :param content: str | bytes | unicode 128 :rtype: str 129 """ 130 if args.explain: 131 yield os.path.join(directory, '%stemp%s' % (prefix, suffix)) 132 else: 133 with tempfile.NamedTemporaryFile(prefix=prefix, suffix=suffix, dir=directory) as tempfile_fd: 134 tempfile_fd.write(to_bytes(content)) 135 tempfile_fd.flush() 136 137 yield tempfile_fd.name 138 139 140def write_json_test_results(category, name, content): # type: (ResultType, str, t.Union[t.List[t.Any], t.Dict[str, t.Any]]) -> None 141 """Write the given json content to the specified test results path, creating directories as needed.""" 142 path = os.path.join(category.path, name) 143 write_json_file(path, content, create_directories=True) 144 145 146def write_text_test_results(category, name, content): # type: (ResultType, str, str) -> None 147 """Write the given text content to the specified test results path, creating directories as needed.""" 148 path = os.path.join(category.path, name) 149 write_text_file(path, content, create_directories=True) 150 151 152def write_json_file(path, content, create_directories=False): # type: (str, t.Union[t.List[t.Any], t.Dict[str, t.Any]], bool) -> None 153 """Write the given json content to the specified path, optionally creating missing directories.""" 154 text_content = json.dumps(content, sort_keys=True, indent=4) + '\n' 155 write_text_file(path, text_content, create_directories=create_directories) 156 157 158def write_text_file(path, content, create_directories=False): # type: (str, str, bool) -> None 159 """Write the given text content to the specified path, optionally creating missing directories.""" 160 if create_directories: 161 make_dirs(os.path.dirname(path)) 162 163 with open(to_bytes(path), 'wb') as file: 164 file.write(to_bytes(content)) 165 166 167def get_python_path(args, interpreter): 168 """ 169 :type args: TestConfig 170 :type interpreter: str 171 :rtype: str 172 """ 173 python_path = PYTHON_PATHS.get(interpreter) 174 175 if python_path: 176 return python_path 177 178 prefix = 'python-' 179 suffix = '-ansible' 180 181 root_temp_dir = '/tmp' 182 183 if args.explain: 184 return os.path.join(root_temp_dir, ''.join((prefix, 'temp', suffix))) 185 186 python_path = tempfile.mkdtemp(prefix=prefix, suffix=suffix, dir=root_temp_dir) 187 injected_interpreter = os.path.join(python_path, 'python') 188 189 # A symlink is faster than the execv wrapper, but isn't compatible with virtual environments. 190 # Attempt to detect when it is safe to use a symlink by checking the real path of the interpreter. 191 use_symlink = os.path.dirname(os.path.realpath(interpreter)) == os.path.dirname(interpreter) 192 193 if use_symlink: 194 display.info('Injecting "%s" as a symlink to the "%s" interpreter.' % (injected_interpreter, interpreter), verbosity=1) 195 196 os.symlink(interpreter, injected_interpreter) 197 else: 198 display.info('Injecting "%s" as a execv wrapper for the "%s" interpreter.' % (injected_interpreter, interpreter), verbosity=1) 199 200 create_interpreter_wrapper(interpreter, injected_interpreter) 201 202 os.chmod(python_path, MODE_DIRECTORY) 203 204 if not PYTHON_PATHS: 205 atexit.register(cleanup_python_paths) 206 207 PYTHON_PATHS[interpreter] = python_path 208 209 return python_path 210 211 212def create_temp_dir(prefix=None, suffix=None, base_dir=None): # type: (t.Optional[str], t.Optional[str], t.Optional[str]) -> str 213 """Create a temporary directory that persists until the current process exits.""" 214 temp_path = tempfile.mkdtemp(prefix=prefix or 'tmp', suffix=suffix or '', dir=base_dir) 215 atexit.register(remove_tree, temp_path) 216 return temp_path 217 218 219def create_interpreter_wrapper(interpreter, injected_interpreter): # type: (str, str) -> None 220 """Create a wrapper for the given Python interpreter at the specified path.""" 221 # sys.executable is used for the shebang to guarantee it is a binary instead of a script 222 # injected_interpreter could be a script from the system or our own wrapper created for the --venv option 223 shebang_interpreter = sys.executable 224 225 code = textwrap.dedent(''' 226 #!%s 227 228 from __future__ import absolute_import 229 230 from os import execv 231 from sys import argv 232 233 python = '%s' 234 235 execv(python, [python] + argv[1:]) 236 ''' % (shebang_interpreter, interpreter)).lstrip() 237 238 write_text_file(injected_interpreter, code) 239 240 os.chmod(injected_interpreter, MODE_FILE_EXECUTE) 241 242 243def cleanup_python_paths(): 244 """Clean up all temporary python directories.""" 245 for path in sorted(PYTHON_PATHS.values()): 246 display.info('Cleaning up temporary python directory: %s' % path, verbosity=2) 247 shutil.rmtree(path) 248 249 250def get_coverage_environment(args, target_name, version, temp_path, module_coverage, remote_temp_path=None): 251 """ 252 :type args: TestConfig 253 :type target_name: str 254 :type version: str 255 :type temp_path: str 256 :type module_coverage: bool 257 :type remote_temp_path: str | None 258 :rtype: dict[str, str] 259 """ 260 if temp_path: 261 # integration tests (both localhost and the optional testhost) 262 # config and results are in a temporary directory 263 coverage_config_base_path = temp_path 264 coverage_output_base_path = temp_path 265 elif args.coverage_config_base_path: 266 # unit tests, sanity tests and other special cases (localhost only) 267 # config is in a temporary directory 268 # results are in the source tree 269 coverage_config_base_path = args.coverage_config_base_path 270 coverage_output_base_path = os.path.join(data_context().content.root, data_context().content.results_path) 271 else: 272 raise Exception('No temp path and no coverage config base path. Check for missing coverage_context usage.') 273 274 config_file = os.path.join(coverage_config_base_path, COVERAGE_CONFIG_NAME) 275 coverage_file = os.path.join(coverage_output_base_path, ResultType.COVERAGE.name, '%s=%s=%s=%s=coverage' % ( 276 args.command, target_name, args.coverage_label or 'local-%s' % version, 'python-%s' % version)) 277 278 if not args.explain and not os.path.exists(config_file): 279 raise Exception('Missing coverage config file: %s' % config_file) 280 281 if args.coverage_check: 282 # cause the 'coverage' module to be found, but not imported or enabled 283 coverage_file = '' 284 285 # Enable code coverage collection on local Python programs (this does not include Ansible modules). 286 # Used by the injectors to support code coverage. 287 # Used by the pytest unit test plugin to support code coverage. 288 # The COVERAGE_FILE variable is also used directly by the 'coverage' module. 289 env = dict( 290 COVERAGE_CONF=config_file, 291 COVERAGE_FILE=coverage_file, 292 ) 293 294 if module_coverage: 295 # Enable code coverage collection on Ansible modules (both local and remote). 296 # Used by the AnsiballZ wrapper generator in lib/ansible/executor/module_common.py to support code coverage. 297 env.update(dict( 298 _ANSIBLE_COVERAGE_CONFIG=config_file, 299 _ANSIBLE_COVERAGE_OUTPUT=coverage_file, 300 )) 301 302 if remote_temp_path: 303 # Include the command, target and label so the remote host can create a filename with that info. The remote 304 # is responsible for adding '={language version}=coverage.{hostname}.{pid}.{id}' 305 env['_ANSIBLE_COVERAGE_REMOTE_OUTPUT'] = os.path.join(remote_temp_path, '%s=%s=%s' % ( 306 args.command, target_name, args.coverage_label or 'remote')) 307 env['_ANSIBLE_COVERAGE_REMOTE_WHITELIST'] = os.path.join(data_context().content.root, '*') 308 309 return env 310 311 312def intercept_command(args, cmd, target_name, env, capture=False, data=None, cwd=None, python_version=None, temp_path=None, module_coverage=True, 313 virtualenv=None, disable_coverage=False, remote_temp_path=None): 314 """ 315 :type args: TestConfig 316 :type cmd: collections.Iterable[str] 317 :type target_name: str 318 :type env: dict[str, str] 319 :type capture: bool 320 :type data: str | None 321 :type cwd: str | None 322 :type python_version: str | None 323 :type temp_path: str | None 324 :type module_coverage: bool 325 :type virtualenv: str | None 326 :type disable_coverage: bool 327 :type remote_temp_path: str | None 328 :rtype: str | None, str | None 329 """ 330 if not env: 331 env = common_environment() 332 else: 333 env = env.copy() 334 335 cmd = list(cmd) 336 version = python_version or args.python_version 337 interpreter = virtualenv or find_python(version) 338 inject_path = os.path.join(ANSIBLE_TEST_DATA_ROOT, 'injector') 339 340 if not virtualenv: 341 # injection of python into the path is required when not activating a virtualenv 342 # otherwise scripts may find the wrong interpreter or possibly no interpreter 343 python_path = get_python_path(args, interpreter) 344 inject_path = python_path + os.path.pathsep + inject_path 345 346 env['PATH'] = inject_path + os.path.pathsep + env['PATH'] 347 env['ANSIBLE_TEST_PYTHON_VERSION'] = version 348 env['ANSIBLE_TEST_PYTHON_INTERPRETER'] = interpreter 349 350 if args.coverage and not disable_coverage: 351 # add the necessary environment variables to enable code coverage collection 352 env.update(get_coverage_environment(args, target_name, version, temp_path, module_coverage, 353 remote_temp_path=remote_temp_path)) 354 355 return run_command(args, cmd, capture=capture, env=env, data=data, cwd=cwd) 356 357 358def run_command(args, cmd, capture=False, env=None, data=None, cwd=None, always=False, stdin=None, stdout=None, 359 cmd_verbosity=1, str_errors='strict'): 360 """ 361 :type args: CommonConfig 362 :type cmd: collections.Iterable[str] 363 :type capture: bool 364 :type env: dict[str, str] | None 365 :type data: str | None 366 :type cwd: str | None 367 :type always: bool 368 :type stdin: file | None 369 :type stdout: file | None 370 :type cmd_verbosity: int 371 :type str_errors: str 372 :rtype: str | None, str | None 373 """ 374 explain = args.explain and not always 375 return raw_command(cmd, capture=capture, env=env, data=data, cwd=cwd, explain=explain, stdin=stdin, stdout=stdout, 376 cmd_verbosity=cmd_verbosity, str_errors=str_errors) 377