1"""
2    :codeauthor: Pedro Algarvio (pedro@algarvio.me)
3
4
5    ============================
6    Unittest Compatibility Layer
7    ============================
8
9    Compatibility layer to use :mod:`unittest <python2:unittest>` under Python
10    2.7 or `unittest2`_ under Python 2.6 without having to worry about which is
11    in use.
12
13    .. attention::
14
15        Please refer to Python's :mod:`unittest <python2:unittest>`
16        documentation as the ultimate source of information, this is just a
17        compatibility layer.
18
19    .. _`unittest2`: https://pypi.python.org/pypi/unittest2
20"""
21# pylint: disable=unused-import,blacklisted-module,deprecated-method
22
23
24import inspect
25import logging
26import os
27import sys
28import types
29from unittest import TestCase as _TestCase
30from unittest import TestLoader as _TestLoader
31from unittest import TestResult
32from unittest import TestSuite as _TestSuite
33from unittest import TextTestResult as _TextTestResult
34from unittest import TextTestRunner as _TextTestRunner
35from unittest import expectedFailure, skip, skipIf
36from unittest.case import SkipTest, _id
37
38try:
39    import psutil
40
41    HAS_PSUTIL = True
42except ImportError:
43    HAS_PSUTIL = False
44
45log = logging.getLogger(__name__)
46
47# Set SHOW_PROC to True to show
48# process details when running in verbose mode
49# i.e. [CPU:15.1%|MEM:48.3%|Z:0]
50SHOW_PROC = "NO_SHOW_PROC" not in os.environ
51
52LOREM_IPSUM = """\
53Lorem ipsum dolor sit amet, consectetur adipiscing elit. Quisque eget urna a arcu lacinia sagittis.
54Sed scelerisque, lacus eget malesuada vestibulum, justo diam facilisis tortor, in sodales dolor
55nibh eu urna. Aliquam iaculis massa risus, sed elementum risus accumsan id. Suspendisse mattis,
56metus sed lacinia dictum, leo orci dapibus sapien, at porttitor sapien nulla ac velit.
57Duis ac cursus leo, non varius metus. Sed laoreet felis magna, vel tempor diam malesuada nec.
58Quisque cursus odio tortor. In consequat augue nisl, eget lacinia odio vestibulum eget.
59Donec venenatis elementum arcu at rhoncus. Nunc pharetra erat in lacinia convallis. Ut condimentum
60eu mauris sit amet convallis. Morbi vulputate vel odio non laoreet. Nullam in suscipit tellus.
61Sed quis posuere urna."""
62
63
64class TestSuite(_TestSuite):
65    def _handleClassSetUp(self, test, result):
66        previousClass = getattr(result, "_previousTestClass", None)
67        currentClass = test.__class__
68        if (
69            currentClass == previousClass
70            or getattr(currentClass, "setUpClass", None) is None
71        ):
72            return super()._handleClassSetUp(test, result)
73
74        # Store a reference to all class attributes before running the setUpClass method
75        initial_class_attributes = dir(test.__class__)
76        super()._handleClassSetUp(test, result)
77        # Store the difference in in a variable in order to check later if they were deleted
78        test.__class__._prerun_class_attributes = [
79            attr for attr in dir(test.__class__) if attr not in initial_class_attributes
80        ]
81
82    def _tearDownPreviousClass(self, test, result):
83        # Run any tearDownClass code defined
84        super()._tearDownPreviousClass(test, result)
85        previousClass = getattr(result, "_previousTestClass", None)
86        currentClass = test.__class__
87        if currentClass == previousClass:
88            return
89        # See if the previous class attributes have been cleaned
90        if previousClass and getattr(previousClass, "tearDownClass", None):
91            prerun_class_attributes = getattr(
92                previousClass, "_prerun_class_attributes", None
93            )
94            if prerun_class_attributes is not None:
95                previousClass._prerun_class_attributes = None
96                del previousClass._prerun_class_attributes
97                for attr in prerun_class_attributes:
98                    if hasattr(previousClass, attr):
99                        attr_value = getattr(previousClass, attr, None)
100                        if attr_value is None:
101                            continue
102                        if isinstance(attr_value, (bool, str, int)):
103                            setattr(previousClass, attr, None)
104                            continue
105                        log.warning(
106                            "Deleting extra class attribute after test run: %s.%s(%s). "
107                            "Please consider using 'del self.%s' on the test class "
108                            "'tearDownClass()' method",
109                            previousClass.__name__,
110                            attr,
111                            str(getattr(previousClass, attr))[:100],
112                            attr,
113                        )
114                        delattr(previousClass, attr)
115
116    def _handleModuleFixture(self, test, result):
117        # We override _handleModuleFixture so that we can inspect all test classes in the module.
118        # If all tests in a test class are going to be skipped, mark the class to skip.
119        # This avoids running setUpClass and tearDownClass unnecessarily
120        currentModule = test.__class__.__module__
121        try:
122            module = sys.modules[currentModule]
123        except KeyError:
124            return
125        for attr in dir(module):
126            klass = getattr(module, attr)
127            if not inspect.isclass(klass):
128                # Not even a class? Carry on...
129                continue
130            if klass.__module__ != currentModule:
131                # This class is not defined in the module being tested? Carry on...
132                continue
133            if not issubclass(klass, TestCase):
134                # This class is not a subclass of TestCase, carry on
135                continue
136
137            skip_klass = True
138            test_functions = [name for name in dir(klass) if name.startswith("test_")]
139            for name in test_functions:
140                func = getattr(klass, name)
141                if not isinstance(func, types.FunctionType):
142                    # Not even a function, carry on
143                    continue
144                if getattr(func, "__unittest_skip__", False) is False:
145                    # At least one test is not going to be skipped.
146                    # Stop searching.
147                    skip_klass = False
148                    break
149            if skip_klass is True:
150                klass.__unittest_skip__ = True
151        return super()._handleModuleFixture(test, result)
152
153
154class TestLoader(_TestLoader):
155    # We're just subclassing to make sure tha tour TestSuite class is the one used
156    suiteClass = TestSuite
157
158
159class TestCase(_TestCase):
160
161    # pylint: disable=expected-an-indented-block-comment,too-many-leading-hastag-for-block-comment
162    ##   Commented out because it may be causing tests to hang
163    ##   at the end of the run
164    #
165    #    _cwd = os.getcwd()
166    #    _chdir_counter = 0
167
168    #    @classmethod
169    #    def tearDownClass(cls):
170    #        '''
171    #        Overriden method for tearing down all classes in salttesting
172    #
173    #        This hard-resets the environment between test classes
174    #        '''
175    #        # Compare where we are now compared to where we were when we began this family of tests
176    #        if not cls._cwd == os.getcwd() and cls._chdir_counter > 0:
177    #            os.chdir(cls._cwd)
178    #            print('\nWARNING: A misbehaving test has modified the working directory!\nThe test suite has reset the working directory '
179    #                    'on tearDown() to {0}\n'.format(cls._cwd))
180    #            cls._chdir_counter += 1
181    # pylint: enable=expected-an-indented-block-comment,too-many-leading-hastag-for-block-comment
182
183    def run(self, result=None):
184        self._prerun_instance_attributes = dir(self)
185        self.maxDiff = None
186        outcome = super().run(result=result)
187        for attr in dir(self):
188            if attr == "_prerun_instance_attributes":
189                continue
190            if attr in getattr(self.__class__, "_prerun_class_attributes", ()):
191                continue
192            if attr not in self._prerun_instance_attributes:
193                attr_value = getattr(self, attr, None)
194                if attr_value is None:
195                    continue
196                if isinstance(attr_value, (bool, str, int)):
197                    setattr(self, attr, None)
198                    continue
199                log.warning(
200                    "Deleting extra class attribute after test run: %s.%s(%s). "
201                    "Please consider using 'del self.%s' on the test case "
202                    "'tearDown()' method",
203                    self.__class__.__name__,
204                    attr,
205                    getattr(self, attr),
206                    attr,
207                )
208                delattr(self, attr)
209        self._prerun_instance_attributes = None
210        del self._prerun_instance_attributes
211        return outcome
212
213    def shortDescription(self):
214        desc = _TestCase.shortDescription(self)
215        if HAS_PSUTIL and SHOW_PROC:
216            show_zombie_processes = "SHOW_PROC_ZOMBIES" in os.environ
217            proc_info = "[CPU:{}%|MEM:{}%".format(
218                psutil.cpu_percent(), psutil.virtual_memory().percent
219            )
220            if show_zombie_processes:
221                found_zombies = 0
222                try:
223                    for proc in psutil.process_iter():
224                        if proc.status == psutil.STATUS_ZOMBIE:
225                            found_zombies += 1
226                except Exception:  # pylint: disable=broad-except
227                    pass
228                proc_info += "|Z:{}".format(found_zombies)
229            proc_info += "] {short_desc}".format(short_desc=desc if desc else "")
230            return proc_info
231        else:
232            return _TestCase.shortDescription(self)
233
234    def repack_state_returns(self, state_ret):
235        """
236        Accepts a state return dict and returns it back with the top level key
237        names rewritten such that the ID declaration is the key instead of the
238        State's unique tag. For example: 'foo' instead of
239        'file_|-foo_|-/etc/foo.conf|-managed'
240
241        This makes it easier to work with state returns when crafting asserts
242        after running states.
243        """
244        assert isinstance(state_ret, dict), state_ret
245        return {x.split("_|-")[1]: y for x, y in state_ret.items()}
246
247
248class TextTestResult(_TextTestResult):
249    """
250    Custom TestResult class whith logs the start and the end of a test
251    """
252
253    def startTest(self, test):
254        log.debug(">>>>> START >>>>> %s", test.id())
255        return super().startTest(test)
256
257    def stopTest(self, test):
258        log.debug("<<<<< END <<<<<<< %s", test.id())
259        return super().stopTest(test)
260
261
262class TextTestRunner(_TextTestRunner):
263    """
264    Custom Text tests runner to log the start and the end of a test case
265    """
266
267    resultclass = TextTestResult
268
269
270__all__ = [
271    "TestLoader",
272    "TextTestRunner",
273    "TestCase",
274    "expectedFailure",
275    "TestSuite",
276    "skipIf",
277    "TestResult",
278]
279