1"""Execute Ansible sanity tests.""" 2from __future__ import (absolute_import, division, print_function) 3__metaclass__ = type 4 5import abc 6import glob 7import os 8import re 9import collections 10 11from .. import types as t 12 13from ..io import ( 14 read_json_file, 15) 16 17from ..util import ( 18 ApplicationError, 19 SubprocessError, 20 display, 21 import_plugins, 22 load_plugins, 23 parse_to_list_of_dict, 24 ABC, 25 ANSIBLE_TEST_DATA_ROOT, 26 is_binary_file, 27 read_lines_without_comments, 28 get_available_python_versions, 29 find_python, 30 is_subdir, 31 paths_to_dirs, 32 get_ansible_version, 33 str_to_version, 34) 35 36from ..util_common import ( 37 run_command, 38 intercept_command, 39 handle_layout_messages, 40) 41 42from ..ansible_util import ( 43 ansible_environment, 44) 45 46from ..target import ( 47 walk_internal_targets, 48 walk_sanity_targets, 49 TestTarget, 50) 51 52from ..executor import ( 53 get_changes_filter, 54 AllTargetsSkipped, 55 Delegate, 56 install_command_requirements, 57 SUPPORTED_PYTHON_VERSIONS, 58) 59 60from ..config import ( 61 SanityConfig, 62) 63 64from ..test import ( 65 TestSuccess, 66 TestFailure, 67 TestSkipped, 68 TestMessage, 69 calculate_best_confidence, 70) 71 72from ..data import ( 73 data_context, 74) 75 76COMMAND = 'sanity' 77SANITY_ROOT = os.path.join(ANSIBLE_TEST_DATA_ROOT, 'sanity') 78 79 80def command_sanity(args): 81 """ 82 :type args: SanityConfig 83 """ 84 handle_layout_messages(data_context().content.sanity_messages) 85 86 changes = get_changes_filter(args) 87 require = args.require + changes 88 targets = SanityTargets.create(args.include, args.exclude, require) 89 90 if not targets.include: 91 raise AllTargetsSkipped() 92 93 if args.delegate: 94 raise Delegate(require=changes, exclude=args.exclude) 95 96 tests = sanity_get_tests() 97 98 if args.test: 99 tests = [target for target in tests if target.name in args.test] 100 else: 101 disabled = [target.name for target in tests if not target.enabled and not args.allow_disabled] 102 tests = [target for target in tests if target.enabled or args.allow_disabled] 103 104 if disabled: 105 display.warning('Skipping tests disabled by default without --allow-disabled: %s' % ', '.join(sorted(disabled))) 106 107 if args.skip_test: 108 tests = [target for target in tests if target.name not in args.skip_test] 109 110 total = 0 111 failed = [] 112 113 for test in tests: 114 if args.list_tests: 115 display.info(test.name) 116 continue 117 118 available_versions = sorted(get_available_python_versions(SUPPORTED_PYTHON_VERSIONS).keys()) 119 120 if args.python: 121 # specific version selected 122 versions = (args.python,) 123 elif isinstance(test, SanityMultipleVersion): 124 # try all supported versions for multi-version tests when a specific version has not been selected 125 versions = test.supported_python_versions 126 elif not test.supported_python_versions or args.python_version in test.supported_python_versions: 127 # the test works with any version or the version we're already running 128 versions = (args.python_version,) 129 else: 130 # available versions supported by the test 131 versions = tuple(sorted(set(available_versions) & set(test.supported_python_versions))) 132 # use the lowest available version supported by the test or the current version as a fallback (which will be skipped) 133 versions = versions[:1] or (args.python_version,) 134 135 for version in versions: 136 if isinstance(test, SanityMultipleVersion): 137 skip_version = version 138 else: 139 skip_version = None 140 141 options = '' 142 143 if test.supported_python_versions and version not in test.supported_python_versions: 144 display.warning("Skipping sanity test '%s' on unsupported Python %s." % (test.name, version)) 145 result = SanitySkipped(test.name, skip_version) 146 elif not args.python and version not in available_versions: 147 display.warning("Skipping sanity test '%s' on Python %s due to missing interpreter." % (test.name, version)) 148 result = SanitySkipped(test.name, skip_version) 149 else: 150 if test.supported_python_versions: 151 display.info("Running sanity test '%s' with Python %s" % (test.name, version)) 152 else: 153 display.info("Running sanity test '%s'" % test.name) 154 155 if isinstance(test, SanityCodeSmellTest): 156 settings = test.load_processor(args) 157 elif isinstance(test, SanityMultipleVersion): 158 settings = test.load_processor(args, version) 159 elif isinstance(test, SanitySingleVersion): 160 settings = test.load_processor(args) 161 elif isinstance(test, SanityVersionNeutral): 162 settings = test.load_processor(args) 163 else: 164 raise Exception('Unsupported test type: %s' % type(test)) 165 166 all_targets = targets.targets 167 168 if test.all_targets: 169 usable_targets = targets.targets 170 elif test.no_targets: 171 usable_targets = tuple() 172 else: 173 usable_targets = targets.include 174 175 all_targets = SanityTargets.filter_and_inject_targets(test, all_targets) 176 usable_targets = SanityTargets.filter_and_inject_targets(test, usable_targets) 177 178 usable_targets = sorted(test.filter_targets(list(usable_targets))) 179 usable_targets = settings.filter_skipped_targets(usable_targets) 180 sanity_targets = SanityTargets(tuple(all_targets), tuple(usable_targets)) 181 182 if usable_targets or test.no_targets: 183 install_command_requirements(args, version, context=test.name, enable_pyyaml_check=True) 184 185 if isinstance(test, SanityCodeSmellTest): 186 result = test.test(args, sanity_targets, version) 187 elif isinstance(test, SanityMultipleVersion): 188 result = test.test(args, sanity_targets, version) 189 options = ' --python %s' % version 190 elif isinstance(test, SanitySingleVersion): 191 result = test.test(args, sanity_targets, version) 192 elif isinstance(test, SanityVersionNeutral): 193 result = test.test(args, sanity_targets) 194 else: 195 raise Exception('Unsupported test type: %s' % type(test)) 196 else: 197 result = SanitySkipped(test.name, skip_version) 198 199 result.write(args) 200 201 total += 1 202 203 if isinstance(result, SanityFailure): 204 failed.append(result.test + options) 205 206 if failed: 207 message = 'The %d sanity test(s) listed below (out of %d) failed. See error output above for details.\n%s' % ( 208 len(failed), total, '\n'.join(failed)) 209 210 if args.failure_ok: 211 display.error(message) 212 else: 213 raise ApplicationError(message) 214 215 216def collect_code_smell_tests(): # type: () -> t.Tuple[SanityFunc, ...] 217 """Return a tuple of available code smell sanity tests.""" 218 paths = glob.glob(os.path.join(SANITY_ROOT, 'code-smell', '*.py')) 219 220 if data_context().content.is_ansible: 221 # include Ansible specific code-smell tests which are not configured to be skipped 222 ansible_code_smell_root = os.path.join(data_context().content.root, 'test', 'sanity', 'code-smell') 223 skip_tests = read_lines_without_comments(os.path.join(ansible_code_smell_root, 'skip.txt'), remove_blank_lines=True, optional=True) 224 paths.extend(path for path in glob.glob(os.path.join(ansible_code_smell_root, '*.py')) if os.path.basename(path) not in skip_tests) 225 226 paths = sorted(p for p in paths if os.access(p, os.X_OK) and os.path.isfile(p)) 227 tests = tuple(SanityCodeSmellTest(p) for p in paths) 228 229 return tests 230 231 232def sanity_get_tests(): 233 """ 234 :rtype: tuple[SanityFunc] 235 """ 236 return SANITY_TESTS 237 238 239class SanityIgnoreParser: 240 """Parser for the consolidated sanity test ignore file.""" 241 NO_CODE = '_' 242 243 def __init__(self, args): # type: (SanityConfig) -> None 244 if data_context().content.collection: 245 ansible_version = '%s.%s' % tuple(get_ansible_version().split('.')[:2]) 246 247 ansible_label = 'Ansible %s' % ansible_version 248 file_name = 'ignore-%s.txt' % ansible_version 249 else: 250 ansible_label = 'Ansible' 251 file_name = 'ignore.txt' 252 253 self.args = args 254 self.relative_path = os.path.join(data_context().content.sanity_path, file_name) 255 self.path = os.path.join(data_context().content.root, self.relative_path) 256 self.ignores = collections.defaultdict(lambda: collections.defaultdict(dict)) # type: t.Dict[str, t.Dict[str, t.Dict[str, int]]] 257 self.skips = collections.defaultdict(lambda: collections.defaultdict(int)) # type: t.Dict[str, t.Dict[str, int]] 258 self.parse_errors = [] # type: t.List[t.Tuple[int, int, str]] 259 self.file_not_found_errors = [] # type: t.List[t.Tuple[int, str]] 260 261 lines = read_lines_without_comments(self.path, optional=True) 262 targets = SanityTargets.get_targets() 263 paths = set(target.path for target in targets) 264 tests_by_name = {} # type: t.Dict[str, SanityTest] 265 versioned_test_names = set() # type: t.Set[str] 266 unversioned_test_names = {} # type: t.Dict[str, str] 267 directories = paths_to_dirs(list(paths)) 268 paths_by_test = {} # type: t.Dict[str, t.Set[str]] 269 270 display.info('Read %d sanity test ignore line(s) for %s from: %s' % (len(lines), ansible_label, self.relative_path), verbosity=1) 271 272 for test in sanity_get_tests(): 273 test_targets = SanityTargets.filter_and_inject_targets(test, targets) 274 275 paths_by_test[test.name] = set(target.path for target in test.filter_targets(test_targets)) 276 277 if isinstance(test, SanityMultipleVersion): 278 versioned_test_names.add(test.name) 279 tests_by_name.update(dict(('%s-%s' % (test.name, python_version), test) for python_version in test.supported_python_versions)) 280 else: 281 unversioned_test_names.update(dict(('%s-%s' % (test.name, python_version), test.name) for python_version in SUPPORTED_PYTHON_VERSIONS)) 282 tests_by_name[test.name] = test 283 284 for line_no, line in enumerate(lines, start=1): 285 if not line: 286 self.parse_errors.append((line_no, 1, "Line cannot be empty or contain only a comment")) 287 continue 288 289 parts = line.split(' ') 290 path = parts[0] 291 codes = parts[1:] 292 293 if not path: 294 self.parse_errors.append((line_no, 1, "Line cannot start with a space")) 295 continue 296 297 if path.endswith(os.path.sep): 298 if path not in directories: 299 self.file_not_found_errors.append((line_no, path)) 300 continue 301 else: 302 if path not in paths: 303 self.file_not_found_errors.append((line_no, path)) 304 continue 305 306 if not codes: 307 self.parse_errors.append((line_no, len(path), "Error code required after path")) 308 continue 309 310 code = codes[0] 311 312 if not code: 313 self.parse_errors.append((line_no, len(path) + 1, "Error code after path cannot be empty")) 314 continue 315 316 if len(codes) > 1: 317 self.parse_errors.append((line_no, len(path) + len(code) + 2, "Error code cannot contain spaces")) 318 continue 319 320 parts = code.split('!') 321 code = parts[0] 322 commands = parts[1:] 323 324 parts = code.split(':') 325 test_name = parts[0] 326 error_codes = parts[1:] 327 328 test = tests_by_name.get(test_name) 329 330 if not test: 331 unversioned_name = unversioned_test_names.get(test_name) 332 333 if unversioned_name: 334 self.parse_errors.append((line_no, len(path) + len(unversioned_name) + 2, "Sanity test '%s' cannot use a Python version like '%s'" % ( 335 unversioned_name, test_name))) 336 elif test_name in versioned_test_names: 337 self.parse_errors.append((line_no, len(path) + len(test_name) + 1, "Sanity test '%s' requires a Python version like '%s-%s'" % ( 338 test_name, test_name, args.python_version))) 339 else: 340 self.parse_errors.append((line_no, len(path) + 2, "Sanity test '%s' does not exist" % test_name)) 341 342 continue 343 344 if path.endswith(os.path.sep) and not test.include_directories: 345 self.parse_errors.append((line_no, 1, "Sanity test '%s' does not support directory paths" % test_name)) 346 continue 347 348 if path not in paths_by_test[test.name] and not test.no_targets: 349 self.parse_errors.append((line_no, 1, "Sanity test '%s' does not test path '%s'" % (test_name, path))) 350 continue 351 352 if commands and error_codes: 353 self.parse_errors.append((line_no, len(path) + len(test_name) + 2, "Error code cannot contain both '!' and ':' characters")) 354 continue 355 356 if commands: 357 command = commands[0] 358 359 if len(commands) > 1: 360 self.parse_errors.append((line_no, len(path) + len(test_name) + len(command) + 3, "Error code cannot contain multiple '!' characters")) 361 continue 362 363 if command == 'skip': 364 if not test.can_skip: 365 self.parse_errors.append((line_no, len(path) + len(test_name) + 2, "Sanity test '%s' cannot be skipped" % test_name)) 366 continue 367 368 existing_line_no = self.skips.get(test_name, {}).get(path) 369 370 if existing_line_no: 371 self.parse_errors.append((line_no, 1, "Duplicate '%s' skip for path '%s' first found on line %d" % (test_name, path, existing_line_no))) 372 continue 373 374 self.skips[test_name][path] = line_no 375 continue 376 377 self.parse_errors.append((line_no, len(path) + len(test_name) + 2, "Command '!%s' not recognized" % command)) 378 continue 379 380 if not test.can_ignore: 381 self.parse_errors.append((line_no, len(path) + 1, "Sanity test '%s' cannot be ignored" % test_name)) 382 continue 383 384 if test.error_code: 385 if not error_codes: 386 self.parse_errors.append((line_no, len(path) + len(test_name) + 1, "Sanity test '%s' requires an error code" % test_name)) 387 continue 388 389 error_code = error_codes[0] 390 391 if len(error_codes) > 1: 392 self.parse_errors.append((line_no, len(path) + len(test_name) + len(error_code) + 3, "Error code cannot contain multiple ':' characters")) 393 continue 394 395 if error_code in test.optional_error_codes: 396 self.parse_errors.append((line_no, len(path) + len(test_name) + 3, "Optional error code '%s' cannot be ignored" % ( 397 error_code))) 398 continue 399 else: 400 if error_codes: 401 self.parse_errors.append((line_no, len(path) + len(test_name) + 2, "Sanity test '%s' does not support error codes" % test_name)) 402 continue 403 404 error_code = self.NO_CODE 405 406 existing = self.ignores.get(test_name, {}).get(path, {}).get(error_code) 407 408 if existing: 409 if test.error_code: 410 self.parse_errors.append((line_no, 1, "Duplicate '%s' ignore for error code '%s' for path '%s' first found on line %d" % ( 411 test_name, error_code, path, existing))) 412 else: 413 self.parse_errors.append((line_no, 1, "Duplicate '%s' ignore for path '%s' first found on line %d" % ( 414 test_name, path, existing))) 415 416 continue 417 418 self.ignores[test_name][path][error_code] = line_no 419 420 @staticmethod 421 def load(args): # type: (SanityConfig) -> SanityIgnoreParser 422 """Return the current SanityIgnore instance, initializing it if needed.""" 423 try: 424 return SanityIgnoreParser.instance 425 except AttributeError: 426 pass 427 428 SanityIgnoreParser.instance = SanityIgnoreParser(args) 429 return SanityIgnoreParser.instance 430 431 432class SanityIgnoreProcessor: 433 """Processor for sanity test ignores for a single run of one sanity test.""" 434 def __init__(self, 435 args, # type: SanityConfig 436 test, # type: SanityTest 437 python_version, # type: t.Optional[str] 438 ): # type: (...) -> None 439 name = test.name 440 code = test.error_code 441 442 if python_version: 443 full_name = '%s-%s' % (name, python_version) 444 else: 445 full_name = name 446 447 self.args = args 448 self.test = test 449 self.code = code 450 self.parser = SanityIgnoreParser.load(args) 451 self.ignore_entries = self.parser.ignores.get(full_name, {}) 452 self.skip_entries = self.parser.skips.get(full_name, {}) 453 self.used_line_numbers = set() # type: t.Set[int] 454 455 def filter_skipped_targets(self, targets): # type: (t.List[TestTarget]) -> t.List[TestTarget] 456 """Return the given targets, with any skipped paths filtered out.""" 457 return sorted(target for target in targets if target.path not in self.skip_entries) 458 459 def process_errors(self, errors, paths): # type: (t.List[SanityMessage], t.List[str]) -> t.List[SanityMessage] 460 """Return the given errors filtered for ignores and with any settings related errors included.""" 461 errors = self.filter_messages(errors) 462 errors.extend(self.get_errors(paths)) 463 464 errors = sorted(set(errors)) 465 466 return errors 467 468 def filter_messages(self, messages): # type: (t.List[SanityMessage]) -> t.List[SanityMessage] 469 """Return a filtered list of the given messages using the entries that have been loaded.""" 470 filtered = [] 471 472 for message in messages: 473 if message.code in self.test.optional_error_codes and not self.args.enable_optional_errors: 474 continue 475 476 path_entry = self.ignore_entries.get(message.path) 477 478 if path_entry: 479 code = message.code if self.code else SanityIgnoreParser.NO_CODE 480 line_no = path_entry.get(code) 481 482 if line_no: 483 self.used_line_numbers.add(line_no) 484 continue 485 486 filtered.append(message) 487 488 return filtered 489 490 def get_errors(self, paths): # type: (t.List[str]) -> t.List[SanityMessage] 491 """Return error messages related to issues with the file.""" 492 messages = [] 493 494 # unused errors 495 496 unused = [] # type: t.List[t.Tuple[int, str, str]] 497 498 if self.test.no_targets or self.test.all_targets: 499 # tests which do not accept a target list, or which use all targets, always return all possible errors, so all ignores can be checked 500 targets = SanityTargets.get_targets() 501 test_targets = SanityTargets.filter_and_inject_targets(self.test, targets) 502 paths = [target.path for target in test_targets] 503 504 for path in paths: 505 path_entry = self.ignore_entries.get(path) 506 507 if not path_entry: 508 continue 509 510 unused.extend((line_no, path, code) for code, line_no in path_entry.items() if line_no not in self.used_line_numbers) 511 512 messages.extend(SanityMessage( 513 code=self.code, 514 message="Ignoring '%s' on '%s' is unnecessary" % (code, path) if self.code else "Ignoring '%s' is unnecessary" % path, 515 path=self.parser.relative_path, 516 line=line, 517 column=1, 518 confidence=calculate_best_confidence(((self.parser.path, line), (path, 0)), self.args.metadata) if self.args.metadata.changes else None, 519 ) for line, path, code in unused) 520 521 return messages 522 523 524class SanitySuccess(TestSuccess): 525 """Sanity test success.""" 526 def __init__(self, test, python_version=None): 527 """ 528 :type test: str 529 :type python_version: str 530 """ 531 super(SanitySuccess, self).__init__(COMMAND, test, python_version) 532 533 534class SanitySkipped(TestSkipped): 535 """Sanity test skipped.""" 536 def __init__(self, test, python_version=None): 537 """ 538 :type test: str 539 :type python_version: str 540 """ 541 super(SanitySkipped, self).__init__(COMMAND, test, python_version) 542 543 544class SanityFailure(TestFailure): 545 """Sanity test failure.""" 546 def __init__(self, test, python_version=None, messages=None, summary=None): 547 """ 548 :type test: str 549 :type python_version: str 550 :type messages: list[SanityMessage] 551 :type summary: unicode 552 """ 553 super(SanityFailure, self).__init__(COMMAND, test, python_version, messages, summary) 554 555 556class SanityMessage(TestMessage): 557 """Single sanity test message for one file.""" 558 559 560class SanityTargets: 561 """Sanity test target information.""" 562 def __init__(self, targets, include): # type: (t.Tuple[TestTarget], t.Tuple[TestTarget]) -> None 563 self.targets = targets 564 self.include = include 565 566 @staticmethod 567 def create(include, exclude, require): # type: (t.List[str], t.List[str], t.List[str]) -> SanityTargets 568 """Create a SanityTargets instance from the given include, exclude and require lists.""" 569 _targets = SanityTargets.get_targets() 570 _include = walk_internal_targets(_targets, include, exclude, require) 571 return SanityTargets(_targets, _include) 572 573 @staticmethod 574 def filter_and_inject_targets(test, targets): # type: (SanityTest, t.Iterable[TestTarget]) -> t.List[TestTarget] 575 """Filter and inject targets based on test requirements and the given target list.""" 576 test_targets = list(targets) 577 578 if not test.include_symlinks: 579 # remove all symlinks unless supported by the test 580 test_targets = [target for target in test_targets if not target.symlink] 581 582 if not test.include_directories or not test.include_symlinks: 583 # exclude symlinked directories unless supported by the test 584 test_targets = [target for target in test_targets if not target.path.endswith(os.path.sep)] 585 586 if test.include_directories: 587 # include directories containing any of the included files 588 test_targets += tuple(TestTarget(path, None, None, '') for path in paths_to_dirs([target.path for target in test_targets])) 589 590 if not test.include_symlinks: 591 # remove all directory symlinks unless supported by the test 592 test_targets = [target for target in test_targets if not target.symlink] 593 594 return test_targets 595 596 @staticmethod 597 def get_targets(): # type: () -> t.Tuple[TestTarget, ...] 598 """Return a tuple of sanity test targets. Uses a cached version when available.""" 599 try: 600 return SanityTargets.get_targets.targets 601 except AttributeError: 602 SanityTargets.get_targets.targets = tuple(sorted(walk_sanity_targets())) 603 604 return SanityTargets.get_targets.targets 605 606 607class SanityTest(ABC): 608 """Sanity test base class.""" 609 __metaclass__ = abc.ABCMeta 610 611 ansible_only = False 612 613 def __init__(self, name): 614 self.name = name 615 self.enabled = True 616 617 # Optional error codes represent errors which spontaneously occur without changes to the content under test, such as those based on the current date. 618 # Because these errors can be unpredictable they behave differently than normal error codes: 619 # * They are not reported by default. The `--enable-optional-errors` option must be used to display these errors. 620 # * They cannot be ignored. This is done to maintain the integrity of the ignore system. 621 self.optional_error_codes = set() 622 623 @property 624 def error_code(self): # type: () -> t.Optional[str] 625 """Error code for ansible-test matching the format used by the underlying test program, or None if the program does not use error codes.""" 626 return None 627 628 @property 629 def can_ignore(self): # type: () -> bool 630 """True if the test supports ignore entries.""" 631 return True 632 633 @property 634 def can_skip(self): # type: () -> bool 635 """True if the test supports skip entries.""" 636 return not self.all_targets and not self.no_targets 637 638 @property 639 def all_targets(self): # type: () -> bool 640 """True if test targets will not be filtered using includes, excludes, requires or changes. Mutually exclusive with no_targets.""" 641 return False 642 643 @property 644 def no_targets(self): # type: () -> bool 645 """True if the test does not use test targets. Mutually exclusive with all_targets.""" 646 return False 647 648 @property 649 def include_directories(self): # type: () -> bool 650 """True if the test targets should include directories.""" 651 return False 652 653 @property 654 def include_symlinks(self): # type: () -> bool 655 """True if the test targets should include symlinks.""" 656 return False 657 658 @property 659 def supported_python_versions(self): # type: () -> t.Optional[t.Tuple[str, ...]] 660 """A tuple of supported Python versions or None if the test does not depend on specific Python versions.""" 661 return tuple(python_version for python_version in SUPPORTED_PYTHON_VERSIONS if python_version.startswith('3.')) 662 663 def filter_targets(self, targets): # type: (t.List[TestTarget]) -> t.List[TestTarget] # pylint: disable=unused-argument 664 """Return the given list of test targets, filtered to include only those relevant for the test.""" 665 if self.no_targets: 666 return [] 667 668 raise NotImplementedError('Sanity test "%s" must implement "filter_targets" or set "no_targets" to True.' % self.name) 669 670 671class SanityCodeSmellTest(SanityTest): 672 """Sanity test script.""" 673 def __init__(self, path): 674 name = os.path.splitext(os.path.basename(path))[0] 675 config_path = os.path.splitext(path)[0] + '.json' 676 677 super(SanityCodeSmellTest, self).__init__(name) 678 679 self.path = path 680 self.config_path = config_path if os.path.exists(config_path) else None 681 self.config = None 682 683 if self.config_path: 684 self.config = read_json_file(self.config_path) 685 686 if self.config: 687 self.enabled = not self.config.get('disabled') 688 689 self.output = self.config.get('output') # type: t.Optional[str] 690 self.extensions = self.config.get('extensions') # type: t.List[str] 691 self.prefixes = self.config.get('prefixes') # type: t.List[str] 692 self.files = self.config.get('files') # type: t.List[str] 693 self.text = self.config.get('text') # type: t.Optional[bool] 694 self.ignore_self = self.config.get('ignore_self') # type: bool 695 self.intercept = self.config.get('intercept') # type: bool 696 self.minimum_python_version = self.config.get('minimum_python_version') # type: t.Optional[str] 697 698 self.__all_targets = self.config.get('all_targets') # type: bool 699 self.__no_targets = self.config.get('no_targets') # type: bool 700 self.__include_directories = self.config.get('include_directories') # type: bool 701 self.__include_symlinks = self.config.get('include_symlinks') # type: bool 702 else: 703 self.output = None 704 self.extensions = [] 705 self.prefixes = [] 706 self.files = [] 707 self.text = None # type: t.Optional[bool] 708 self.ignore_self = False 709 self.intercept = False 710 self.minimum_python_version = None # type: t.Optional[str] 711 712 self.__all_targets = False 713 self.__no_targets = True 714 self.__include_directories = False 715 self.__include_symlinks = False 716 717 if self.no_targets: 718 mutually_exclusive = ( 719 'extensions', 720 'prefixes', 721 'files', 722 'text', 723 'ignore_self', 724 'all_targets', 725 'include_directories', 726 'include_symlinks', 727 ) 728 729 problems = sorted(name for name in mutually_exclusive if getattr(self, name)) 730 731 if problems: 732 raise ApplicationError('Sanity test "%s" option "no_targets" is mutually exclusive with options: %s' % (self.name, ', '.join(problems))) 733 734 @property 735 def all_targets(self): # type: () -> bool 736 """True if test targets will not be filtered using includes, excludes, requires or changes. Mutually exclusive with no_targets.""" 737 return self.__all_targets 738 739 @property 740 def no_targets(self): # type: () -> bool 741 """True if the test does not use test targets. Mutually exclusive with all_targets.""" 742 return self.__no_targets 743 744 @property 745 def include_directories(self): # type: () -> bool 746 """True if the test targets should include directories.""" 747 return self.__include_directories 748 749 @property 750 def include_symlinks(self): # type: () -> bool 751 """True if the test targets should include symlinks.""" 752 return self.__include_symlinks 753 754 def filter_targets(self, targets): # type: (t.List[TestTarget]) -> t.List[TestTarget] 755 """Return the given list of test targets, filtered to include only those relevant for the test.""" 756 if self.no_targets: 757 return [] 758 759 if self.text is not None: 760 if self.text: 761 targets = [target for target in targets if not is_binary_file(target.path)] 762 else: 763 targets = [target for target in targets if is_binary_file(target.path)] 764 765 if self.extensions: 766 targets = [target for target in targets if os.path.splitext(target.path)[1] in self.extensions 767 or (is_subdir(target.path, 'bin') and '.py' in self.extensions)] 768 769 if self.prefixes: 770 targets = [target for target in targets if any(target.path.startswith(pre) for pre in self.prefixes)] 771 772 if self.files: 773 targets = [target for target in targets if os.path.basename(target.path) in self.files] 774 775 if self.ignore_self and data_context().content.is_ansible: 776 relative_self_path = os.path.relpath(self.path, data_context().content.root) 777 targets = [target for target in targets if target.path != relative_self_path] 778 779 return targets 780 781 def test(self, args, targets, python_version): 782 """ 783 :type args: SanityConfig 784 :type targets: SanityTargets 785 :type python_version: str 786 :rtype: TestResult 787 """ 788 if self.minimum_python_version: 789 if str_to_version(python_version) < str_to_version(self.minimum_python_version): 790 display.warning("Skipping sanity test '%s' on unsupported Python %s; requires Python %s or newer." % ( 791 self.name, python_version, self.minimum_python_version)) 792 return SanitySkipped(self.name, 'Test requires Python %s or newer' % (self.minimum_python_version, )) 793 794 cmd = [find_python(python_version), self.path] 795 796 env = ansible_environment(args, color=False) 797 798 pattern = None 799 data = None 800 801 settings = self.load_processor(args) 802 803 paths = [target.path for target in targets.include] 804 805 if self.config: 806 if self.output == 'path-line-column-message': 807 pattern = '^(?P<path>[^:]*):(?P<line>[0-9]+):(?P<column>[0-9]+): (?P<message>.*)$' 808 elif self.output == 'path-message': 809 pattern = '^(?P<path>[^:]*): (?P<message>.*)$' 810 else: 811 pattern = ApplicationError('Unsupported output type: %s' % self.output) 812 813 if not self.no_targets: 814 data = '\n'.join(paths) 815 816 if data: 817 display.info(data, verbosity=4) 818 819 try: 820 if self.intercept: 821 stdout, stderr = intercept_command(args, cmd, target_name='sanity.%s' % self.name, data=data, env=env, capture=True, disable_coverage=True) 822 else: 823 stdout, stderr = run_command(args, cmd, data=data, env=env, capture=True) 824 825 status = 0 826 except SubprocessError as ex: 827 stdout = ex.stdout 828 stderr = ex.stderr 829 status = ex.status 830 831 if args.explain: 832 return SanitySuccess(self.name) 833 834 if stdout and not stderr: 835 if pattern: 836 matches = parse_to_list_of_dict(pattern, stdout) 837 838 messages = [SanityMessage( 839 message=m['message'], 840 path=m['path'], 841 line=int(m.get('line', 0)), 842 column=int(m.get('column', 0)), 843 ) for m in matches] 844 845 messages = settings.process_errors(messages, paths) 846 847 if not messages: 848 return SanitySuccess(self.name) 849 850 return SanityFailure(self.name, messages=messages) 851 852 if stderr or status: 853 summary = u'%s' % SubprocessError(cmd=cmd, status=status, stderr=stderr, stdout=stdout) 854 return SanityFailure(self.name, summary=summary) 855 856 messages = settings.process_errors([], paths) 857 858 if messages: 859 return SanityFailure(self.name, messages=messages) 860 861 return SanitySuccess(self.name) 862 863 def load_processor(self, args): # type: (SanityConfig) -> SanityIgnoreProcessor 864 """Load the ignore processor for this sanity test.""" 865 return SanityIgnoreProcessor(args, self, None) 866 867 868class SanityFunc(SanityTest): 869 """Base class for sanity test plugins.""" 870 def __init__(self): 871 name = self.__class__.__name__ 872 name = re.sub(r'Test$', '', name) # drop Test suffix 873 name = re.sub(r'(.)([A-Z][a-z]+)', r'\1-\2', name).lower() # use dashes instead of capitalization 874 875 super(SanityFunc, self).__init__(name) 876 877 878class SanityVersionNeutral(SanityFunc): 879 """Base class for sanity test plugins which are idependent of the python version being used.""" 880 @abc.abstractmethod 881 def test(self, args, targets): 882 """ 883 :type args: SanityConfig 884 :type targets: SanityTargets 885 :rtype: TestResult 886 """ 887 888 def load_processor(self, args): # type: (SanityConfig) -> SanityIgnoreProcessor 889 """Load the ignore processor for this sanity test.""" 890 return SanityIgnoreProcessor(args, self, None) 891 892 @property 893 def supported_python_versions(self): # type: () -> t.Optional[t.Tuple[str, ...]] 894 """A tuple of supported Python versions or None if the test does not depend on specific Python versions.""" 895 return None 896 897 898class SanitySingleVersion(SanityFunc): 899 """Base class for sanity test plugins which should run on a single python version.""" 900 @abc.abstractmethod 901 def test(self, args, targets, python_version): 902 """ 903 :type args: SanityConfig 904 :type targets: SanityTargets 905 :type python_version: str 906 :rtype: TestResult 907 """ 908 909 def load_processor(self, args): # type: (SanityConfig) -> SanityIgnoreProcessor 910 """Load the ignore processor for this sanity test.""" 911 return SanityIgnoreProcessor(args, self, None) 912 913 914class SanityMultipleVersion(SanityFunc): 915 """Base class for sanity test plugins which should run on multiple python versions.""" 916 @abc.abstractmethod 917 def test(self, args, targets, python_version): 918 """ 919 :type args: SanityConfig 920 :type targets: SanityTargets 921 :type python_version: str 922 :rtype: TestResult 923 """ 924 925 def load_processor(self, args, python_version): # type: (SanityConfig, str) -> SanityIgnoreProcessor 926 """Load the ignore processor for this sanity test.""" 927 return SanityIgnoreProcessor(args, self, python_version) 928 929 @property 930 def supported_python_versions(self): # type: () -> t.Optional[t.Tuple[str, ...]] 931 """A tuple of supported Python versions or None if the test does not depend on specific Python versions.""" 932 return SUPPORTED_PYTHON_VERSIONS 933 934 935SANITY_TESTS = ( 936) 937 938 939def sanity_init(): 940 """Initialize full sanity test list (includes code-smell scripts determined at runtime).""" 941 import_plugins('sanity') 942 sanity_plugins = {} # type: t.Dict[str, t.Type[SanityFunc]] 943 load_plugins(SanityFunc, sanity_plugins) 944 sanity_tests = tuple([plugin() for plugin in sanity_plugins.values() if data_context().content.is_ansible or not plugin.ansible_only]) 945 global SANITY_TESTS # pylint: disable=locally-disabled, global-statement 946 SANITY_TESTS = tuple(sorted(sanity_tests + collect_code_smell_tests(), key=lambda k: k.name)) 947