"""
    :codeauthor: Pedro Algarvio (pedro@algarvio.me)


    ============================
    Unittest Compatibility Layer
    ============================

    Compatibility layer to use :mod:`unittest <python2:unittest>` under Python
    2.7 or `unittest2`_ under Python 2.6 without having to worry about which is
    in use.

    .. attention::

        Please refer to Python's :mod:`unittest <python2:unittest>`
        documentation as the ultimate source of information, this is just a
        compatibility layer.

    .. _`unittest2`: https://pypi.python.org/pypi/unittest2
"""
# pylint: disable=unused-import,blacklisted-module,deprecated-method


import inspect
import logging
import os
import sys
import types
from unittest import TestCase as _TestCase
from unittest import TestLoader as _TestLoader
from unittest import TestResult
from unittest import TestSuite as _TestSuite
from unittest import TextTestResult as _TextTestResult
from unittest import TextTestRunner as _TextTestRunner
from unittest import expectedFailure, skip, skipIf
from unittest.case import SkipTest, _id

try:
    import psutil

    HAS_PSUTIL = True
except ImportError:
    HAS_PSUTIL = False

log = logging.getLogger(__name__)

# Process details are shown in test descriptions when running in verbose
# mode, i.e. [CPU:15.1%|MEM:48.3%|Z:0], unless the NO_SHOW_PROC environment
# variable is set.
SHOW_PROC = "NO_SHOW_PROC" not in os.environ

LOREM_IPSUM = """\
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Quisque eget urna a arcu lacinia sagittis.
Sed scelerisque, lacus eget malesuada vestibulum, justo diam facilisis tortor, in sodales dolor
nibh eu urna. Aliquam iaculis massa risus, sed elementum risus accumsan id. Suspendisse mattis,
metus sed lacinia dictum, leo orci dapibus sapien, at porttitor sapien nulla ac velit.
Duis ac cursus leo, non varius metus. Sed laoreet felis magna, vel tempor diam malesuada nec.
Quisque cursus odio tortor. In consequat augue nisl, eget lacinia odio vestibulum eget.
Donec venenatis elementum arcu at rhoncus. Nunc pharetra erat in lacinia convallis. Ut condimentum
eu mauris sit amet convallis. Morbi vulputate vel odio non laoreet. Nullam in suscipit tellus.
Sed quis posuere urna."""


class TestSuite(_TestSuite):
    """
    Test suite which tracks the class attributes created by ``setUpClass``
    and cleans up whatever is still set once ``tearDownClass`` has run.
    """

    def _handleClassSetUp(self, test, result):
        """
        Run the class level setup and record which class attributes it added.

        The names of the attributes that appear on the test class while the
        parent implementation runs are stored on the class as
        ``_prerun_class_attributes``.
        """
        previousClass = getattr(result, "_previousTestClass", None)
        currentClass = test.__class__
        if (
            currentClass == previousClass
            or getattr(currentClass, "setUpClass", None) is None
        ):
            return super()._handleClassSetUp(test, result)

        # Store a reference to all class attributes before running the setUpClass method
        initial_class_attributes = dir(test.__class__)
        super()._handleClassSetUp(test, result)
        # Store the difference in a variable in order to check later if they were deleted
        test.__class__._prerun_class_attributes = [
            attr for attr in dir(test.__class__) if attr not in initial_class_attributes
        ]

    def _tearDownPreviousClass(self, test, result):
        """
        Run the class level teardown and clean any leftover class attributes.

        For every attribute recorded by ``_handleClassSetUp`` which is still
        set on the previous class: ``bool``, ``str`` and ``int`` values are
        reset to ``None``; anything else is deleted and a warning is logged.
        """
        # Run any tearDownClass code defined
        super()._tearDownPreviousClass(test, result)
        previousClass = getattr(result, "_previousTestClass", None)
        currentClass = test.__class__
        if currentClass == previousClass:
            return
        # See if the previous class attributes have been cleaned
        if previousClass and getattr(previousClass, "tearDownClass", None):
            prerun_class_attributes = getattr(
                previousClass, "_prerun_class_attributes", None
            )
            if prerun_class_attributes is not None:
                # Assign on the class itself first so that the ``del`` below
                # always finds the attribute in this class' own namespace.
                previousClass._prerun_class_attributes = None
                del previousClass._prerun_class_attributes
                for attr in prerun_class_attributes:
                    if hasattr(previousClass, attr):
                        attr_value = getattr(previousClass, attr, None)
                        if attr_value is None:
                            continue
                        if isinstance(attr_value, (bool, str, int)):
                            setattr(previousClass, attr, None)
                            continue
                        log.warning(
                            "Deleting extra class attribute after test run: %s.%s(%s). "
                            "Please consider using 'del self.%s' on the test class "
                            "'tearDownClass()' method",
                            previousClass.__name__,
                            attr,
                            str(getattr(previousClass, attr))[:100],
                            attr,
                        )
                        delattr(previousClass, attr)

    def _handleModuleFixture(self, test, result):
        """
        Mark fully skipped test classes as skipped before the module fixtures run.

        We override _handleModuleFixture so that we can inspect all test
        classes in the module. If all tests in a test class are going to be
        skipped, mark the class to skip. This avoids running setUpClass and
        tearDownClass unnecessarily.

        Returns ``None`` without calling the parent implementation when the
        test's module is not present in ``sys.modules``.
        """
        currentModule = test.__class__.__module__
        try:
            module = sys.modules[currentModule]
        except KeyError:
            return
        for attr in dir(module):
            klass = getattr(module, attr)
            if not inspect.isclass(klass):
                # Not even a class? Carry on...
                continue
            if klass.__module__ != currentModule:
                # This class is not defined in the module being tested? Carry on...
                continue
            if not issubclass(klass, TestCase):
                # This class is not a subclass of TestCase, carry on
                continue

            candidates = (
                getattr(klass, name) for name in dir(klass) if name.startswith("test_")
            )
            test_functions = [
                func for func in candidates if isinstance(func, types.FunctionType)
            ]
            # Only flag the class when it has at least one test function and
            # every one of them is skipped. A class without any test function
            # (for example a base class defined in the same module) must not
            # be flagged: subclasses inherit ``__unittest_skip__`` and would
            # have their tests skipped too.
            if test_functions and all(
                getattr(func, "__unittest_skip__", False) is not False
                for func in test_functions
            ):
                klass.__unittest_skip__ = True
        return super()._handleModuleFixture(test, result)


class TestLoader(_TestLoader):
    """
    Test loader which builds suites using this module's ``TestSuite``.
    """

    # We're just subclassing to make sure that our TestSuite class is the one used
    suiteClass = TestSuite


class TestCase(_TestCase):
    """
    Test case which cleans up the instance attributes left behind by a test
    run and can prefix the test description with process information.
    """

    # pylint: disable=expected-an-indented-block-comment,too-many-leading-hastag-for-block-comment
    ## Commented out because it may be causing tests to hang
    ## at the end of the run
    #
    #    _cwd = os.getcwd()
    #    _chdir_counter = 0

    #    @classmethod
    #    def tearDownClass(cls):
    #        '''
    #        Overriden method for tearing down all classes in salttesting
    #
    #        This hard-resets the environment between test classes
    #        '''
    #        # Compare where we are now compared to where we were when we began this family of tests
    #        if not cls._cwd == os.getcwd() and cls._chdir_counter > 0:
    #            os.chdir(cls._cwd)
    #            print('\nWARNING: A misbehaving test has modified the working directory!\nThe test suite has reset the working directory '
    #                    'on tearDown() to {0}\n'.format(cls._cwd))
    #            cls._chdir_counter += 1
    # pylint: enable=expected-an-indented-block-comment,too-many-leading-hastag-for-block-comment

    def run(self, result=None):
        """
        Run the test and clean up the instance attributes it left behind.

        Attributes which were not present on the instance before the run, and
        which are not listed in the class' ``_prerun_class_attributes``, are
        reset to ``None`` when they hold a ``bool``, ``str`` or ``int``;
        anything else is deleted and a warning is logged.

        Returns whatever the parent ``run()`` returned.
        """
        self._prerun_instance_attributes = dir(self)
        self.maxDiff = None
        outcome = super().run(result=result)
        for attr in dir(self):
            if attr == "_prerun_instance_attributes":
                continue
            if attr in getattr(self.__class__, "_prerun_class_attributes", ()):
                continue
            if attr not in self._prerun_instance_attributes:
                attr_value = getattr(self, attr, None)
                if attr_value is None:
                    continue
                if isinstance(attr_value, (bool, str, int)):
                    setattr(self, attr, None)
                    continue
                log.warning(
                    "Deleting extra class attribute after test run: %s.%s(%s). "
                    "Please consider using 'del self.%s' on the test case "
                    "'tearDown()' method",
                    self.__class__.__name__,
                    attr,
                    # Truncated just like the class level cleanup does, so
                    # that a huge object does not flood the log
                    str(getattr(self, attr))[:100],
                    attr,
                )
                delattr(self, attr)
        self._prerun_instance_attributes = None
        del self._prerun_instance_attributes
        return outcome

    def shortDescription(self):
        """
        Return the test's short description.

        When psutil is importable and ``SHOW_PROC`` is enabled, the
        description is prefixed with the CPU and memory usage, i.e.
        ``[CPU:15.1%|MEM:48.3%] ``. When the ``SHOW_PROC_ZOMBIES`` environment
        variable is set, the number of zombie processes is included as well,
        i.e. ``[CPU:15.1%|MEM:48.3%|Z:0] ``.
        """
        desc = _TestCase.shortDescription(self)
        if not (HAS_PSUTIL and SHOW_PROC):
            return desc

        proc_info = "[CPU:{}%|MEM:{}%".format(
            psutil.cpu_percent(), psutil.virtual_memory().percent
        )
        if "SHOW_PROC_ZOMBIES" in os.environ:
            found_zombies = 0
            try:
                for proc in psutil.process_iter():
                    # ``status`` is a method on psutil >= 2.0 and was a
                    # property before that. Comparing the bound method
                    # against the constant would never match.
                    status = proc.status
                    if callable(status):
                        status = status()
                    if status == psutil.STATUS_ZOMBIE:
                        found_zombies += 1
            except Exception:  # pylint: disable=broad-except
                # Best effort only, the description must never break a test run
                log.debug("Failed to count the zombie processes", exc_info=True)
            proc_info += "|Z:{}".format(found_zombies)
        proc_info += "] {short_desc}".format(short_desc=desc if desc else "")
        return proc_info

    def repack_state_returns(self, state_ret):
        """
        Accepts a state return dict and returns it back with the top level key
        names rewritten such that the ID declaration is the key instead of the
        State's unique tag. For example: 'foo' instead of
        'file_|-foo_|-/etc/foo.conf|-managed'

        This makes it easier to work with state returns when crafting asserts
        after running states.
        """
        assert isinstance(state_ret, dict), state_ret
        return {x.split("_|-")[1]: y for x, y in state_ret.items()}


class TextTestResult(_TextTestResult):
    """
    Custom TestResult class which logs the start and the end of a test
    """

    def startTest(self, test):
        """
        Log the start of ``test`` and defer to the parent implementation.
        """
        log.debug(">>>>> START >>>>> %s", test.id())
        return super().startTest(test)

    def stopTest(self, test):
        """
        Log the end of ``test`` and defer to the parent implementation.
        """
        log.debug("<<<<< END <<<<<<< %s", test.id())
        return super().stopTest(test)


class TextTestRunner(_TextTestRunner):
    """
    Custom Text tests runner to log the start and the end of a test case
    """

    resultclass = TextTestResult


__all__ = [
    "TestLoader",
    "TextTestRunner",
    "TestCase",
    "expectedFailure",
    "TestSuite",
    "skipIf",
    "TestResult",
]