1"""Execute Ansible sanity tests."""
2from __future__ import (absolute_import, division, print_function)
3__metaclass__ = type
4
5import abc
6import glob
7import os
8import re
9import collections
10
11from .. import types as t
12
13from ..io import (
14    read_json_file,
15)
16
17from ..util import (
18    ApplicationError,
19    SubprocessError,
20    display,
21    import_plugins,
22    load_plugins,
23    parse_to_list_of_dict,
24    ABC,
25    ANSIBLE_TEST_DATA_ROOT,
26    is_binary_file,
27    read_lines_without_comments,
28    get_available_python_versions,
29    find_python,
30    is_subdir,
31    paths_to_dirs,
32    get_ansible_version,
33    str_to_version,
34)
35
36from ..util_common import (
37    run_command,
38    intercept_command,
39    handle_layout_messages,
40)
41
42from ..ansible_util import (
43    ansible_environment,
44)
45
46from ..target import (
47    walk_internal_targets,
48    walk_sanity_targets,
49    TestTarget,
50)
51
52from ..executor import (
53    get_changes_filter,
54    AllTargetsSkipped,
55    Delegate,
56    install_command_requirements,
57    SUPPORTED_PYTHON_VERSIONS,
58)
59
60from ..config import (
61    SanityConfig,
62)
63
64from ..test import (
65    TestSuccess,
66    TestFailure,
67    TestSkipped,
68    TestMessage,
69    calculate_best_confidence,
70)
71
72from ..data import (
73    data_context,
74)
75
76COMMAND = 'sanity'
77SANITY_ROOT = os.path.join(ANSIBLE_TEST_DATA_ROOT, 'sanity')
78
79
80def command_sanity(args):
81    """
82    :type args: SanityConfig
83    """
84    handle_layout_messages(data_context().content.sanity_messages)
85
86    changes = get_changes_filter(args)
87    require = args.require + changes
88    targets = SanityTargets.create(args.include, args.exclude, require)
89
90    if not targets.include:
91        raise AllTargetsSkipped()
92
93    if args.delegate:
94        raise Delegate(require=changes, exclude=args.exclude)
95
96    tests = sanity_get_tests()
97
98    if args.test:
99        tests = [target for target in tests if target.name in args.test]
100    else:
101        disabled = [target.name for target in tests if not target.enabled and not args.allow_disabled]
102        tests = [target for target in tests if target.enabled or args.allow_disabled]
103
104        if disabled:
105            display.warning('Skipping tests disabled by default without --allow-disabled: %s' % ', '.join(sorted(disabled)))
106
107    if args.skip_test:
108        tests = [target for target in tests if target.name not in args.skip_test]
109
110    total = 0
111    failed = []
112
113    for test in tests:
114        if args.list_tests:
115            display.info(test.name)
116            continue
117
118        available_versions = sorted(get_available_python_versions(SUPPORTED_PYTHON_VERSIONS).keys())
119
120        if args.python:
121            # specific version selected
122            versions = (args.python,)
123        elif isinstance(test, SanityMultipleVersion):
124            # try all supported versions for multi-version tests when a specific version has not been selected
125            versions = test.supported_python_versions
126        elif not test.supported_python_versions or args.python_version in test.supported_python_versions:
127            # the test works with any version or the version we're already running
128            versions = (args.python_version,)
129        else:
130            # available versions supported by the test
131            versions = tuple(sorted(set(available_versions) & set(test.supported_python_versions)))
132            # use the lowest available version supported by the test or the current version as a fallback (which will be skipped)
133            versions = versions[:1] or (args.python_version,)
134
135        for version in versions:
136            if isinstance(test, SanityMultipleVersion):
137                skip_version = version
138            else:
139                skip_version = None
140
141            options = ''
142
143            if test.supported_python_versions and version not in test.supported_python_versions:
144                display.warning("Skipping sanity test '%s' on unsupported Python %s." % (test.name, version))
145                result = SanitySkipped(test.name, skip_version)
146            elif not args.python and version not in available_versions:
147                display.warning("Skipping sanity test '%s' on Python %s due to missing interpreter." % (test.name, version))
148                result = SanitySkipped(test.name, skip_version)
149            else:
150                if test.supported_python_versions:
151                    display.info("Running sanity test '%s' with Python %s" % (test.name, version))
152                else:
153                    display.info("Running sanity test '%s'" % test.name)
154
155                if isinstance(test, SanityCodeSmellTest):
156                    settings = test.load_processor(args)
157                elif isinstance(test, SanityMultipleVersion):
158                    settings = test.load_processor(args, version)
159                elif isinstance(test, SanitySingleVersion):
160                    settings = test.load_processor(args)
161                elif isinstance(test, SanityVersionNeutral):
162                    settings = test.load_processor(args)
163                else:
164                    raise Exception('Unsupported test type: %s' % type(test))
165
166                all_targets = targets.targets
167
168                if test.all_targets:
169                    usable_targets = targets.targets
170                elif test.no_targets:
171                    usable_targets = tuple()
172                else:
173                    usable_targets = targets.include
174
175                all_targets = SanityTargets.filter_and_inject_targets(test, all_targets)
176                usable_targets = SanityTargets.filter_and_inject_targets(test, usable_targets)
177
178                usable_targets = sorted(test.filter_targets(list(usable_targets)))
179                usable_targets = settings.filter_skipped_targets(usable_targets)
180                sanity_targets = SanityTargets(tuple(all_targets), tuple(usable_targets))
181
182                if usable_targets or test.no_targets:
183                    install_command_requirements(args, version, context=test.name, enable_pyyaml_check=True)
184
185                    if isinstance(test, SanityCodeSmellTest):
186                        result = test.test(args, sanity_targets, version)
187                    elif isinstance(test, SanityMultipleVersion):
188                        result = test.test(args, sanity_targets, version)
189                        options = ' --python %s' % version
190                    elif isinstance(test, SanitySingleVersion):
191                        result = test.test(args, sanity_targets, version)
192                    elif isinstance(test, SanityVersionNeutral):
193                        result = test.test(args, sanity_targets)
194                    else:
195                        raise Exception('Unsupported test type: %s' % type(test))
196                else:
197                    result = SanitySkipped(test.name, skip_version)
198
199            result.write(args)
200
201            total += 1
202
203            if isinstance(result, SanityFailure):
204                failed.append(result.test + options)
205
206    if failed:
207        message = 'The %d sanity test(s) listed below (out of %d) failed. See error output above for details.\n%s' % (
208            len(failed), total, '\n'.join(failed))
209
210        if args.failure_ok:
211            display.error(message)
212        else:
213            raise ApplicationError(message)
214
215
216def collect_code_smell_tests():  # type: () -> t.Tuple[SanityFunc, ...]
217    """Return a tuple of available code smell sanity tests."""
218    paths = glob.glob(os.path.join(SANITY_ROOT, 'code-smell', '*.py'))
219
220    if data_context().content.is_ansible:
221        # include Ansible specific code-smell tests which are not configured to be skipped
222        ansible_code_smell_root = os.path.join(data_context().content.root, 'test', 'sanity', 'code-smell')
223        skip_tests = read_lines_without_comments(os.path.join(ansible_code_smell_root, 'skip.txt'), remove_blank_lines=True, optional=True)
224        paths.extend(path for path in glob.glob(os.path.join(ansible_code_smell_root, '*.py')) if os.path.basename(path) not in skip_tests)
225
226    paths = sorted(p for p in paths if os.access(p, os.X_OK) and os.path.isfile(p))
227    tests = tuple(SanityCodeSmellTest(p) for p in paths)
228
229    return tests
230
231
232def sanity_get_tests():
233    """
234    :rtype: tuple[SanityFunc]
235    """
236    return SANITY_TESTS
237
238
239class SanityIgnoreParser:
240    """Parser for the consolidated sanity test ignore file."""
241    NO_CODE = '_'
242
243    def __init__(self, args):  # type: (SanityConfig) -> None
244        if data_context().content.collection:
245            ansible_version = '%s.%s' % tuple(get_ansible_version().split('.')[:2])
246
247            ansible_label = 'Ansible %s' % ansible_version
248            file_name = 'ignore-%s.txt' % ansible_version
249        else:
250            ansible_label = 'Ansible'
251            file_name = 'ignore.txt'
252
253        self.args = args
254        self.relative_path = os.path.join(data_context().content.sanity_path, file_name)
255        self.path = os.path.join(data_context().content.root, self.relative_path)
256        self.ignores = collections.defaultdict(lambda: collections.defaultdict(dict))  # type: t.Dict[str, t.Dict[str, t.Dict[str, int]]]
257        self.skips = collections.defaultdict(lambda: collections.defaultdict(int))  # type: t.Dict[str, t.Dict[str, int]]
258        self.parse_errors = []  # type: t.List[t.Tuple[int, int, str]]
259        self.file_not_found_errors = []  # type: t.List[t.Tuple[int, str]]
260
261        lines = read_lines_without_comments(self.path, optional=True)
262        targets = SanityTargets.get_targets()
263        paths = set(target.path for target in targets)
264        tests_by_name = {}  # type: t.Dict[str, SanityTest]
265        versioned_test_names = set()  # type: t.Set[str]
266        unversioned_test_names = {}  # type: t.Dict[str, str]
267        directories = paths_to_dirs(list(paths))
268        paths_by_test = {}  # type: t.Dict[str, t.Set[str]]
269
270        display.info('Read %d sanity test ignore line(s) for %s from: %s' % (len(lines), ansible_label, self.relative_path), verbosity=1)
271
272        for test in sanity_get_tests():
273            test_targets = SanityTargets.filter_and_inject_targets(test, targets)
274
275            paths_by_test[test.name] = set(target.path for target in test.filter_targets(test_targets))
276
277            if isinstance(test, SanityMultipleVersion):
278                versioned_test_names.add(test.name)
279                tests_by_name.update(dict(('%s-%s' % (test.name, python_version), test) for python_version in test.supported_python_versions))
280            else:
281                unversioned_test_names.update(dict(('%s-%s' % (test.name, python_version), test.name) for python_version in SUPPORTED_PYTHON_VERSIONS))
282                tests_by_name[test.name] = test
283
284        for line_no, line in enumerate(lines, start=1):
285            if not line:
286                self.parse_errors.append((line_no, 1, "Line cannot be empty or contain only a comment"))
287                continue
288
289            parts = line.split(' ')
290            path = parts[0]
291            codes = parts[1:]
292
293            if not path:
294                self.parse_errors.append((line_no, 1, "Line cannot start with a space"))
295                continue
296
297            if path.endswith(os.path.sep):
298                if path not in directories:
299                    self.file_not_found_errors.append((line_no, path))
300                    continue
301            else:
302                if path not in paths:
303                    self.file_not_found_errors.append((line_no, path))
304                    continue
305
306            if not codes:
307                self.parse_errors.append((line_no, len(path), "Error code required after path"))
308                continue
309
310            code = codes[0]
311
312            if not code:
313                self.parse_errors.append((line_no, len(path) + 1, "Error code after path cannot be empty"))
314                continue
315
316            if len(codes) > 1:
317                self.parse_errors.append((line_no, len(path) + len(code) + 2, "Error code cannot contain spaces"))
318                continue
319
320            parts = code.split('!')
321            code = parts[0]
322            commands = parts[1:]
323
324            parts = code.split(':')
325            test_name = parts[0]
326            error_codes = parts[1:]
327
328            test = tests_by_name.get(test_name)
329
330            if not test:
331                unversioned_name = unversioned_test_names.get(test_name)
332
333                if unversioned_name:
334                    self.parse_errors.append((line_no, len(path) + len(unversioned_name) + 2, "Sanity test '%s' cannot use a Python version like '%s'" % (
335                        unversioned_name, test_name)))
336                elif test_name in versioned_test_names:
337                    self.parse_errors.append((line_no, len(path) + len(test_name) + 1, "Sanity test '%s' requires a Python version like '%s-%s'" % (
338                        test_name, test_name, args.python_version)))
339                else:
340                    self.parse_errors.append((line_no, len(path) + 2, "Sanity test '%s' does not exist" % test_name))
341
342                continue
343
344            if path.endswith(os.path.sep) and not test.include_directories:
345                self.parse_errors.append((line_no, 1, "Sanity test '%s' does not support directory paths" % test_name))
346                continue
347
348            if path not in paths_by_test[test.name] and not test.no_targets:
349                self.parse_errors.append((line_no, 1, "Sanity test '%s' does not test path '%s'" % (test_name, path)))
350                continue
351
352            if commands and error_codes:
353                self.parse_errors.append((line_no, len(path) + len(test_name) + 2, "Error code cannot contain both '!' and ':' characters"))
354                continue
355
356            if commands:
357                command = commands[0]
358
359                if len(commands) > 1:
360                    self.parse_errors.append((line_no, len(path) + len(test_name) + len(command) + 3, "Error code cannot contain multiple '!' characters"))
361                    continue
362
363                if command == 'skip':
364                    if not test.can_skip:
365                        self.parse_errors.append((line_no, len(path) + len(test_name) + 2, "Sanity test '%s' cannot be skipped" % test_name))
366                        continue
367
368                    existing_line_no = self.skips.get(test_name, {}).get(path)
369
370                    if existing_line_no:
371                        self.parse_errors.append((line_no, 1, "Duplicate '%s' skip for path '%s' first found on line %d" % (test_name, path, existing_line_no)))
372                        continue
373
374                    self.skips[test_name][path] = line_no
375                    continue
376
377                self.parse_errors.append((line_no, len(path) + len(test_name) + 2, "Command '!%s' not recognized" % command))
378                continue
379
380            if not test.can_ignore:
381                self.parse_errors.append((line_no, len(path) + 1, "Sanity test '%s' cannot be ignored" % test_name))
382                continue
383
384            if test.error_code:
385                if not error_codes:
386                    self.parse_errors.append((line_no, len(path) + len(test_name) + 1, "Sanity test '%s' requires an error code" % test_name))
387                    continue
388
389                error_code = error_codes[0]
390
391                if len(error_codes) > 1:
392                    self.parse_errors.append((line_no, len(path) + len(test_name) + len(error_code) + 3, "Error code cannot contain multiple ':' characters"))
393                    continue
394
395                if error_code in test.optional_error_codes:
396                    self.parse_errors.append((line_no, len(path) + len(test_name) + 3, "Optional error code '%s' cannot be ignored" % (
397                        error_code)))
398                    continue
399            else:
400                if error_codes:
401                    self.parse_errors.append((line_no, len(path) + len(test_name) + 2, "Sanity test '%s' does not support error codes" % test_name))
402                    continue
403
404                error_code = self.NO_CODE
405
406            existing = self.ignores.get(test_name, {}).get(path, {}).get(error_code)
407
408            if existing:
409                if test.error_code:
410                    self.parse_errors.append((line_no, 1, "Duplicate '%s' ignore for error code '%s' for path '%s' first found on line %d" % (
411                        test_name, error_code, path, existing)))
412                else:
413                    self.parse_errors.append((line_no, 1, "Duplicate '%s' ignore for path '%s' first found on line %d" % (
414                        test_name, path, existing)))
415
416                continue
417
418            self.ignores[test_name][path][error_code] = line_no
419
420    @staticmethod
421    def load(args):  # type: (SanityConfig) -> SanityIgnoreParser
422        """Return the current SanityIgnore instance, initializing it if needed."""
423        try:
424            return SanityIgnoreParser.instance
425        except AttributeError:
426            pass
427
428        SanityIgnoreParser.instance = SanityIgnoreParser(args)
429        return SanityIgnoreParser.instance
430
431
432class SanityIgnoreProcessor:
433    """Processor for sanity test ignores for a single run of one sanity test."""
434    def __init__(self,
435                 args,  # type: SanityConfig
436                 test,  # type: SanityTest
437                 python_version,  # type: t.Optional[str]
438                 ):  # type: (...) -> None
439        name = test.name
440        code = test.error_code
441
442        if python_version:
443            full_name = '%s-%s' % (name, python_version)
444        else:
445            full_name = name
446
447        self.args = args
448        self.test = test
449        self.code = code
450        self.parser = SanityIgnoreParser.load(args)
451        self.ignore_entries = self.parser.ignores.get(full_name, {})
452        self.skip_entries = self.parser.skips.get(full_name, {})
453        self.used_line_numbers = set()  # type: t.Set[int]
454
455    def filter_skipped_targets(self, targets):  # type: (t.List[TestTarget]) -> t.List[TestTarget]
456        """Return the given targets, with any skipped paths filtered out."""
457        return sorted(target for target in targets if target.path not in self.skip_entries)
458
459    def process_errors(self, errors, paths):  # type: (t.List[SanityMessage], t.List[str]) -> t.List[SanityMessage]
460        """Return the given errors filtered for ignores and with any settings related errors included."""
461        errors = self.filter_messages(errors)
462        errors.extend(self.get_errors(paths))
463
464        errors = sorted(set(errors))
465
466        return errors
467
468    def filter_messages(self, messages):  # type: (t.List[SanityMessage]) -> t.List[SanityMessage]
469        """Return a filtered list of the given messages using the entries that have been loaded."""
470        filtered = []
471
472        for message in messages:
473            if message.code in self.test.optional_error_codes and not self.args.enable_optional_errors:
474                continue
475
476            path_entry = self.ignore_entries.get(message.path)
477
478            if path_entry:
479                code = message.code if self.code else SanityIgnoreParser.NO_CODE
480                line_no = path_entry.get(code)
481
482                if line_no:
483                    self.used_line_numbers.add(line_no)
484                    continue
485
486            filtered.append(message)
487
488        return filtered
489
490    def get_errors(self, paths):  # type: (t.List[str]) -> t.List[SanityMessage]
491        """Return error messages related to issues with the file."""
492        messages = []
493
494        # unused errors
495
496        unused = []  # type: t.List[t.Tuple[int, str, str]]
497
498        if self.test.no_targets or self.test.all_targets:
499            # tests which do not accept a target list, or which use all targets, always return all possible errors, so all ignores can be checked
500            targets = SanityTargets.get_targets()
501            test_targets = SanityTargets.filter_and_inject_targets(self.test, targets)
502            paths = [target.path for target in test_targets]
503
504        for path in paths:
505            path_entry = self.ignore_entries.get(path)
506
507            if not path_entry:
508                continue
509
510            unused.extend((line_no, path, code) for code, line_no in path_entry.items() if line_no not in self.used_line_numbers)
511
512        messages.extend(SanityMessage(
513            code=self.code,
514            message="Ignoring '%s' on '%s' is unnecessary" % (code, path) if self.code else "Ignoring '%s' is unnecessary" % path,
515            path=self.parser.relative_path,
516            line=line,
517            column=1,
518            confidence=calculate_best_confidence(((self.parser.path, line), (path, 0)), self.args.metadata) if self.args.metadata.changes else None,
519        ) for line, path, code in unused)
520
521        return messages
522
523
524class SanitySuccess(TestSuccess):
525    """Sanity test success."""
526    def __init__(self, test, python_version=None):
527        """
528        :type test: str
529        :type python_version: str
530        """
531        super(SanitySuccess, self).__init__(COMMAND, test, python_version)
532
533
534class SanitySkipped(TestSkipped):
535    """Sanity test skipped."""
536    def __init__(self, test, python_version=None):
537        """
538        :type test: str
539        :type python_version: str
540        """
541        super(SanitySkipped, self).__init__(COMMAND, test, python_version)
542
543
544class SanityFailure(TestFailure):
545    """Sanity test failure."""
546    def __init__(self, test, python_version=None, messages=None, summary=None):
547        """
548        :type test: str
549        :type python_version: str
550        :type messages: list[SanityMessage]
551        :type summary: unicode
552        """
553        super(SanityFailure, self).__init__(COMMAND, test, python_version, messages, summary)
554
555
556class SanityMessage(TestMessage):
557    """Single sanity test message for one file."""
558
559
560class SanityTargets:
561    """Sanity test target information."""
562    def __init__(self, targets, include):  # type: (t.Tuple[TestTarget], t.Tuple[TestTarget]) -> None
563        self.targets = targets
564        self.include = include
565
566    @staticmethod
567    def create(include, exclude, require):  # type: (t.List[str], t.List[str], t.List[str]) -> SanityTargets
568        """Create a SanityTargets instance from the given include, exclude and require lists."""
569        _targets = SanityTargets.get_targets()
570        _include = walk_internal_targets(_targets, include, exclude, require)
571        return SanityTargets(_targets, _include)
572
573    @staticmethod
574    def filter_and_inject_targets(test, targets):  # type: (SanityTest, t.Iterable[TestTarget]) -> t.List[TestTarget]
575        """Filter and inject targets based on test requirements and the given target list."""
576        test_targets = list(targets)
577
578        if not test.include_symlinks:
579            # remove all symlinks unless supported by the test
580            test_targets = [target for target in test_targets if not target.symlink]
581
582        if not test.include_directories or not test.include_symlinks:
583            # exclude symlinked directories unless supported by the test
584            test_targets = [target for target in test_targets if not target.path.endswith(os.path.sep)]
585
586        if test.include_directories:
587            # include directories containing any of the included files
588            test_targets += tuple(TestTarget(path, None, None, '') for path in paths_to_dirs([target.path for target in test_targets]))
589
590            if not test.include_symlinks:
591                # remove all directory symlinks unless supported by the test
592                test_targets = [target for target in test_targets if not target.symlink]
593
594        return test_targets
595
596    @staticmethod
597    def get_targets():  # type: () -> t.Tuple[TestTarget, ...]
598        """Return a tuple of sanity test targets. Uses a cached version when available."""
599        try:
600            return SanityTargets.get_targets.targets
601        except AttributeError:
602            SanityTargets.get_targets.targets = tuple(sorted(walk_sanity_targets()))
603
604        return SanityTargets.get_targets.targets
605
606
607class SanityTest(ABC):
608    """Sanity test base class."""
609    __metaclass__ = abc.ABCMeta
610
611    ansible_only = False
612
613    def __init__(self, name):
614        self.name = name
615        self.enabled = True
616
617        # Optional error codes represent errors which spontaneously occur without changes to the content under test, such as those based on the current date.
618        # Because these errors can be unpredictable they behave differently than normal error codes:
619        #  * They are not reported by default. The `--enable-optional-errors` option must be used to display these errors.
620        #  * They cannot be ignored. This is done to maintain the integrity of the ignore system.
621        self.optional_error_codes = set()
622
623    @property
624    def error_code(self):  # type: () -> t.Optional[str]
625        """Error code for ansible-test matching the format used by the underlying test program, or None if the program does not use error codes."""
626        return None
627
628    @property
629    def can_ignore(self):  # type: () -> bool
630        """True if the test supports ignore entries."""
631        return True
632
633    @property
634    def can_skip(self):  # type: () -> bool
635        """True if the test supports skip entries."""
636        return not self.all_targets and not self.no_targets
637
638    @property
639    def all_targets(self):  # type: () -> bool
640        """True if test targets will not be filtered using includes, excludes, requires or changes. Mutually exclusive with no_targets."""
641        return False
642
643    @property
644    def no_targets(self):  # type: () -> bool
645        """True if the test does not use test targets. Mutually exclusive with all_targets."""
646        return False
647
648    @property
649    def include_directories(self):  # type: () -> bool
650        """True if the test targets should include directories."""
651        return False
652
653    @property
654    def include_symlinks(self):  # type: () -> bool
655        """True if the test targets should include symlinks."""
656        return False
657
658    @property
659    def supported_python_versions(self):  # type: () -> t.Optional[t.Tuple[str, ...]]
660        """A tuple of supported Python versions or None if the test does not depend on specific Python versions."""
661        return tuple(python_version for python_version in SUPPORTED_PYTHON_VERSIONS if python_version.startswith('3.'))
662
663    def filter_targets(self, targets):  # type: (t.List[TestTarget]) -> t.List[TestTarget]  # pylint: disable=unused-argument
664        """Return the given list of test targets, filtered to include only those relevant for the test."""
665        if self.no_targets:
666            return []
667
668        raise NotImplementedError('Sanity test "%s" must implement "filter_targets" or set "no_targets" to True.' % self.name)
669
670
671class SanityCodeSmellTest(SanityTest):
672    """Sanity test script."""
673    def __init__(self, path):
674        name = os.path.splitext(os.path.basename(path))[0]
675        config_path = os.path.splitext(path)[0] + '.json'
676
677        super(SanityCodeSmellTest, self).__init__(name)
678
679        self.path = path
680        self.config_path = config_path if os.path.exists(config_path) else None
681        self.config = None
682
683        if self.config_path:
684            self.config = read_json_file(self.config_path)
685
686        if self.config:
687            self.enabled = not self.config.get('disabled')
688
689            self.output = self.config.get('output')  # type: t.Optional[str]
690            self.extensions = self.config.get('extensions')  # type: t.List[str]
691            self.prefixes = self.config.get('prefixes')  # type: t.List[str]
692            self.files = self.config.get('files')  # type: t.List[str]
693            self.text = self.config.get('text')  # type: t.Optional[bool]
694            self.ignore_self = self.config.get('ignore_self')  # type: bool
695            self.intercept = self.config.get('intercept')  # type: bool
696            self.minimum_python_version = self.config.get('minimum_python_version')  # type: t.Optional[str]
697
698            self.__all_targets = self.config.get('all_targets')  # type: bool
699            self.__no_targets = self.config.get('no_targets')  # type: bool
700            self.__include_directories = self.config.get('include_directories')  # type: bool
701            self.__include_symlinks = self.config.get('include_symlinks')  # type: bool
702        else:
703            self.output = None
704            self.extensions = []
705            self.prefixes = []
706            self.files = []
707            self.text = None  # type: t.Optional[bool]
708            self.ignore_self = False
709            self.intercept = False
710            self.minimum_python_version = None  # type: t.Optional[str]
711
712            self.__all_targets = False
713            self.__no_targets = True
714            self.__include_directories = False
715            self.__include_symlinks = False
716
717        if self.no_targets:
718            mutually_exclusive = (
719                'extensions',
720                'prefixes',
721                'files',
722                'text',
723                'ignore_self',
724                'all_targets',
725                'include_directories',
726                'include_symlinks',
727            )
728
729            problems = sorted(name for name in mutually_exclusive if getattr(self, name))
730
731            if problems:
732                raise ApplicationError('Sanity test "%s" option "no_targets" is mutually exclusive with options: %s' % (self.name, ', '.join(problems)))
733
734    @property
735    def all_targets(self):  # type: () -> bool
736        """True if test targets will not be filtered using includes, excludes, requires or changes. Mutually exclusive with no_targets."""
737        return self.__all_targets
738
739    @property
740    def no_targets(self):  # type: () -> bool
741        """True if the test does not use test targets. Mutually exclusive with all_targets."""
742        return self.__no_targets
743
744    @property
745    def include_directories(self):  # type: () -> bool
746        """True if the test targets should include directories."""
747        return self.__include_directories
748
749    @property
750    def include_symlinks(self):  # type: () -> bool
751        """True if the test targets should include symlinks."""
752        return self.__include_symlinks
753
754    def filter_targets(self, targets):  # type: (t.List[TestTarget]) -> t.List[TestTarget]
755        """Return the given list of test targets, filtered to include only those relevant for the test."""
756        if self.no_targets:
757            return []
758
759        if self.text is not None:
760            if self.text:
761                targets = [target for target in targets if not is_binary_file(target.path)]
762            else:
763                targets = [target for target in targets if is_binary_file(target.path)]
764
765        if self.extensions:
766            targets = [target for target in targets if os.path.splitext(target.path)[1] in self.extensions
767                       or (is_subdir(target.path, 'bin') and '.py' in self.extensions)]
768
769        if self.prefixes:
770            targets = [target for target in targets if any(target.path.startswith(pre) for pre in self.prefixes)]
771
772        if self.files:
773            targets = [target for target in targets if os.path.basename(target.path) in self.files]
774
775        if self.ignore_self and data_context().content.is_ansible:
776            relative_self_path = os.path.relpath(self.path, data_context().content.root)
777            targets = [target for target in targets if target.path != relative_self_path]
778
779        return targets
780
781    def test(self, args, targets, python_version):
782        """
783        :type args: SanityConfig
784        :type targets: SanityTargets
785        :type python_version: str
786        :rtype: TestResult
787        """
788        if self.minimum_python_version:
789            if str_to_version(python_version) < str_to_version(self.minimum_python_version):
790                display.warning("Skipping sanity test '%s' on unsupported Python %s; requires Python %s or newer." % (
791                    self.name, python_version, self.minimum_python_version))
792                return SanitySkipped(self.name, 'Test requires Python %s or newer' % (self.minimum_python_version, ))
793
794        cmd = [find_python(python_version), self.path]
795
796        env = ansible_environment(args, color=False)
797
798        pattern = None
799        data = None
800
801        settings = self.load_processor(args)
802
803        paths = [target.path for target in targets.include]
804
805        if self.config:
806            if self.output == 'path-line-column-message':
807                pattern = '^(?P<path>[^:]*):(?P<line>[0-9]+):(?P<column>[0-9]+): (?P<message>.*)$'
808            elif self.output == 'path-message':
809                pattern = '^(?P<path>[^:]*): (?P<message>.*)$'
810            else:
811                pattern = ApplicationError('Unsupported output type: %s' % self.output)
812
813        if not self.no_targets:
814            data = '\n'.join(paths)
815
816            if data:
817                display.info(data, verbosity=4)
818
819        try:
820            if self.intercept:
821                stdout, stderr = intercept_command(args, cmd, target_name='sanity.%s' % self.name, data=data, env=env, capture=True, disable_coverage=True)
822            else:
823                stdout, stderr = run_command(args, cmd, data=data, env=env, capture=True)
824
825            status = 0
826        except SubprocessError as ex:
827            stdout = ex.stdout
828            stderr = ex.stderr
829            status = ex.status
830
831        if args.explain:
832            return SanitySuccess(self.name)
833
834        if stdout and not stderr:
835            if pattern:
836                matches = parse_to_list_of_dict(pattern, stdout)
837
838                messages = [SanityMessage(
839                    message=m['message'],
840                    path=m['path'],
841                    line=int(m.get('line', 0)),
842                    column=int(m.get('column', 0)),
843                ) for m in matches]
844
845                messages = settings.process_errors(messages, paths)
846
847                if not messages:
848                    return SanitySuccess(self.name)
849
850                return SanityFailure(self.name, messages=messages)
851
852        if stderr or status:
853            summary = u'%s' % SubprocessError(cmd=cmd, status=status, stderr=stderr, stdout=stdout)
854            return SanityFailure(self.name, summary=summary)
855
856        messages = settings.process_errors([], paths)
857
858        if messages:
859            return SanityFailure(self.name, messages=messages)
860
861        return SanitySuccess(self.name)
862
863    def load_processor(self, args):  # type: (SanityConfig) -> SanityIgnoreProcessor
864        """Load the ignore processor for this sanity test."""
865        return SanityIgnoreProcessor(args, self, None)
866
867
868class SanityFunc(SanityTest):
869    """Base class for sanity test plugins."""
870    def __init__(self):
871        name = self.__class__.__name__
872        name = re.sub(r'Test$', '', name)  # drop Test suffix
873        name = re.sub(r'(.)([A-Z][a-z]+)', r'\1-\2', name).lower()  # use dashes instead of capitalization
874
875        super(SanityFunc, self).__init__(name)
876
877
878class SanityVersionNeutral(SanityFunc):
879    """Base class for sanity test plugins which are idependent of the python version being used."""
880    @abc.abstractmethod
881    def test(self, args, targets):
882        """
883        :type args: SanityConfig
884        :type targets: SanityTargets
885        :rtype: TestResult
886        """
887
888    def load_processor(self, args):  # type: (SanityConfig) -> SanityIgnoreProcessor
889        """Load the ignore processor for this sanity test."""
890        return SanityIgnoreProcessor(args, self, None)
891
892    @property
893    def supported_python_versions(self):  # type: () -> t.Optional[t.Tuple[str, ...]]
894        """A tuple of supported Python versions or None if the test does not depend on specific Python versions."""
895        return None
896
897
898class SanitySingleVersion(SanityFunc):
899    """Base class for sanity test plugins which should run on a single python version."""
900    @abc.abstractmethod
901    def test(self, args, targets, python_version):
902        """
903        :type args: SanityConfig
904        :type targets: SanityTargets
905        :type python_version: str
906        :rtype: TestResult
907        """
908
909    def load_processor(self, args):  # type: (SanityConfig) -> SanityIgnoreProcessor
910        """Load the ignore processor for this sanity test."""
911        return SanityIgnoreProcessor(args, self, None)
912
913
914class SanityMultipleVersion(SanityFunc):
915    """Base class for sanity test plugins which should run on multiple python versions."""
916    @abc.abstractmethod
917    def test(self, args, targets, python_version):
918        """
919        :type args: SanityConfig
920        :type targets: SanityTargets
921        :type python_version: str
922        :rtype: TestResult
923        """
924
925    def load_processor(self, args, python_version):  # type: (SanityConfig, str) -> SanityIgnoreProcessor
926        """Load the ignore processor for this sanity test."""
927        return SanityIgnoreProcessor(args, self, python_version)
928
929    @property
930    def supported_python_versions(self):  # type: () -> t.Optional[t.Tuple[str, ...]]
931        """A tuple of supported Python versions or None if the test does not depend on specific Python versions."""
932        return SUPPORTED_PYTHON_VERSIONS
933
934
935SANITY_TESTS = (
936)
937
938
939def sanity_init():
940    """Initialize full sanity test list (includes code-smell scripts determined at runtime)."""
941    import_plugins('sanity')
942    sanity_plugins = {}  # type: t.Dict[str, t.Type[SanityFunc]]
943    load_plugins(SanityFunc, sanity_plugins)
944    sanity_tests = tuple([plugin() for plugin in sanity_plugins.values() if data_context().content.is_ansible or not plugin.ansible_only])
945    global SANITY_TESTS  # pylint: disable=locally-disabled, global-statement
946    SANITY_TESTS = tuple(sorted(sanity_tests + collect_code_smell_tests(), key=lambda k: k.name))
947