1# -*- coding: utf-8 -*- 2 3import configparser 4import os 5import inspect 6import subprocess 7import threading 8import shlex 9import sys 10import shutil 11import string 12import unittest 13 14 15from bash_tests import utils as BT 16 17 18if sys.platform in [ 'win32', 'msys', 'cygwin' ]: 19 #: invoke subprocess.Popen with shell=True on Windows 20 _SUBPROCESS_SHELL = True 21 22 def _cmd_splitter(cmd): 23 return cmd 24 25 def _process_output_post(output): 26 return output.replace('\r\n', '\n') 27 28else: 29 #: invoke subprocess.Popen with shell=False on Unix 30 _SUBPROCESS_SHELL = False 31 32 def _cmd_splitter(cmd): 33 return shlex.split(cmd) 34 35 def _process_output_post(output): 36 return output 37 38 39def _disjoint_dict_merge(d1, d2): 40 """ 41 Merges two dictionaries whose keys are disjoint sets and returns the 42 resulting dictionary: 43 44 >>> d1 = {"a": 1} 45 >>> d2 = {"b": 2, "c": 3} 46 >>> _disjoint_dict_merge(d1, d2) == {"a": 1, "b": 2, "c": 3} 47 True 48 49 Passing dictionaries that share keys raises a ValueError: 50 >>> _disjoint_dict_merge({"a": 1, "b": 6}, {"b": 2, "a": 3}) 51 Traceback (most recent call last): 52 .. 53 ValueError: Dictionaries have common keys. 54 55 """ 56 inter = set(d1.keys()).intersection(set(d2.keys())) 57 if len(inter) > 0: 58 raise ValueError("Dictionaries have common keys.") 59 res = d1.copy() 60 res.update(d2) 61 return res 62 63 64class CasePreservingConfigParser(configparser.ConfigParser): 65 r""" ConfigParser where the keys are case sensitive. 66 67 The default ConfigParser converts all options in the config file with their 68 lowercase version. This class overrides the respective functions and 69 preserves the case of keys. 70 71 The default behavior of ConfigParser: 72 >>> conf_string = "[Section1]\nKey = Value" 73 >>> default_conf = configparser.ConfigParser() 74 >>> default_Config.read_string(conf_string) 75 >>> list(default_conf['Section1'].keys()) 76 ['key'] 77 78 This classes' behavior: 79 >>> case_preserve = CasePreservingConfigParser() 80 >>> case_preserve.read_string(conf_string) 81 >>> list(case_preserve['Section1'].keys()) 82 ['Key'] 83 """ 84 85 def optionxform(self, option): 86 return option 87 88 89#: global parameters extracted from the test suite's configuration file 90_parameters = {} 91 92#: variables extracted from the test suite's configuration file 93_config_variables = {} 94 95#: setting whether debug mode is enabled or not 96_debug_mode = False 97 98 99def set_debug_mode(debug): 100 """ Enable or disable debug mode 101 102 In debug mode the test suite will print out all commands that it runs, the 103 expected output and the actually obtained output 104 """ 105 global _debug_mode 106 _debug_mode = debug 107 108 109def configure_suite(config_file): 110 """ 111 Populates a global datastructure with the parameters from the suite's 112 configuration file. 113 114 This function performs the following steps: 115 1. read in the file ``config_file`` via the ConfigParser module using 116 extended interpolation 117 2. check that the sections ``variables`` and ``paths`` are disjoint 118 3. extract the environment variables given in the ``ENV`` section 119 4. save all entries from the ``variables`` section in the global 120 datastructure 121 5. interpret all entries in the ``paths`` section as relative paths from 122 the configuration file, expand them to absolute paths and save them in 123 the global datastructure 124 125 For further information concerning the rationale behind this, please 126 consult the documentation in ``doc.md``. 127 """ 128 129 if not os.path.exists(config_file): 130 raise ValueError( 131 "Test suite config file {:s} does not exist" 132 .format(os.path.abspath(config_file)) 133 ) 134 135 config = CasePreservingConfigParser( 136 interpolation=configparser.ExtendedInterpolation(), 137 delimiters=(':'), 138 comment_prefixes=('#') 139 ) 140 config.read(config_file) 141 142 _parameters["suite_root"] = os.path.split(os.path.abspath(config_file))[0] 143 144 if 'variables' in config and 'paths' in config: 145 intersecting_keys = set(config["paths"].keys()) \ 146 .intersection(set(config["variables"].keys())) 147 if len(intersecting_keys) > 0: 148 raise ValueError( 149 "The sections 'paths' and 'variables' must not share keys, " 150 "but they have the following common key{:s}: {:s}" 151 .format( 152 's' if len(intersecting_keys) > 1 else '', 153 ', '.join(k for k in intersecting_keys) 154 ) 155 ) 156 157 # Extract the environment variables according to config['ENV']. 158 # When an environment variable does not exist, set its default value according to config['ENV fallback']. 159 for key in config['ENV']: 160 env_name = config['ENV'][key] 161 env_fallback = config['ENV fallback'].get(key, '') 162 config['ENV'][key] = os.environ.get(env_name, env_fallback) 163 164 if 'variables' in config: 165 for key in config['variables']: 166 _config_variables[key] = config['variables'][key] 167 168 if 'paths' in config: 169 for key in config['paths']: 170 rel_path = config['paths'][key] 171 abs_path = os.path.abspath( 172 os.path.join(_parameters["suite_root"], rel_path) 173 ) 174 if key == "tmp_path" and not os.path.isdir(abs_path): 175 os.mkdir(abs_path) 176 if key == "data_path" and not os.path.exists(abs_path): 177 raise ValueError( 178 "Path replacement for {short}: {abspath} does not exist" 179 " (was expanded from {rel})".format( 180 short=key, 181 abspath=abs_path, 182 rel=rel_path) 183 ) 184 _config_variables[key] = abs_path 185 186 for key in _config_variables: 187 if key in globals(): 188 raise ValueError("Variable name {!s} already used.") 189 190 globals()[key] = _config_variables[key] 191 192 _parameters["timeout"] = config.getfloat( 193 "General", "timeout", fallback=1.0 194 ) 195 196 if 'memcheck' in config['General']: 197 if config['General']['memcheck'] != '': 198 _parameters['memcheck'] = config['General']['memcheck'] 199 _parameters["timeout"] *= config.getfloat( 200 "General", "memcheck_timeout_penalty", fallback=20.0 201 ) 202 203 # Configure the parameters for bash tests 204 BT.Config.bin_dir = os.path.abspath(config['ENV']['exiv2_path']) 205 BT.Config.dyld_library_path = os.path.abspath(config['ENV']['dyld_library_path']) 206 BT.Config.ld_library_path = os.path.abspath(config['ENV']['ld_library_path']) 207 BT.Config.data_dir = os.path.abspath(config['paths']['data_path']) 208 BT.Config.tmp_dir = os.path.abspath(config['paths']['tmp_path']) 209 BT.Config.exiv2_http = config['ENV']['exiv2_http'] 210 BT.Config.exiv2_port = config['ENV']['exiv2_port'] 211 BT.Config.exiv2_echo = config['ENV']['exiv2_echo'] 212 BT.Config.verbose = config['ENV']['verbose'] 213 BT.Config.valgrind = config['ENV']['valgrind'] 214 215 216class FileDecoratorBase(object): 217 """ 218 Base class for decorators that manipulate files for test cases. 219 220 The decorator expects to be provided with at least one file path 221 on construction. When called, it replaces the setUp() and 222 tearDown() functions of the type it is called on with custom ones. 223 224 The new setUp() function performs the following steps: 225 - create a file list in the decorated class with the name stored in 226 FILE_LIST_NAME (defaults to _files) 227 - iterate over all files, performing: 228 - expand the file's path via expand_variables (member function 229 of the decorated class) 230 - call self.setUp_file_action(expanded file name) 231 - append the result to the file list in the decorated class 232 - call the old setUp() 233 234 The function self.setUp_file_action is provided by this class and 235 is intended to be overridden by child classes to provide some 236 functionality, like file copies. 237 238 239 The new tearDown() function performs the following steps: 240 - call the old tearDown() function 241 - iterate over all files in the file list: 242 - call self.tearDown_file_action(filename) 243 244 The function self.tearDown_file_action can be overridden by child 245 classes. The default version provided by this class simply deletes 246 all files that are passed to it. 247 248 249 Example 250 ------- 251 252 We'll inherit from FileDecoratorBase and override the member 253 functions setUp_file_action and tearDown_file_action: 254 255 >>> class TestDecorator(FileDecoratorBase): 256 ... def setUp_file_action(self, f): 257 ... print("setUp_file_action with", f) 258 ... return f.capitalize() 259 ... 260 ... def tearDown_file_action(self, f): 261 ... print("tearDown_file_action with", f) 262 263 Then, we use that decorator to wrap a class mocking 264 system_tests.Case: 265 266 >>> @TestDecorator("one", "two", "three") 267 ... class MockCase(object): 268 ... def setUp(self): 269 ... print("calling MockCase.setUp()") 270 ... 271 ... def tearDown(self): 272 ... print("calling MockCase.tearDown()") 273 ... 274 ... def expand_variables(self, var): 275 ... return var + "_file" 276 277 >>> M = MockCase() 278 279 setUp has been replaced by a the new version, but the old one is 280 still called. The new setUp iterates over all parameters passed to 281 the constructor of the decorator, passes them to expand_variables 282 and then to setUp_file_action: 283 >>> M.setUp() 284 setUp_file_action with one_file 285 setUp_file_action with two_file 286 setUp_file_action with three_file 287 calling MockCase.setUp() 288 289 The tearDown() function works accordingly: 290 >>> M.tearDown() 291 calling MockCase.tearDown() 292 tearDown_file_action with One_file 293 tearDown_file_action with Two_file 294 tearDown_file_action with Three_file 295 296 Please note the capitalized "file" names (this is due to 297 setUp_file_action returning f.capitalized()) and that the old 298 tearDown is called after the new one runs. 299 """ 300 301 #: Name of the attribute in the decorated child class where the list of 302 #: files is stored 303 FILE_LIST_NAME = '_files' 304 305 def __init__(self, *files): 306 """ 307 Constructor of FileDecoratorBase. 308 309 To prevent accidental wrong usage, it raises an exception if 310 it is not called as a decorator with parameters. 311 312 Only the following syntax works for this decorator: 313 >>> @FileDecoratorBase("test") 314 ... class Test(unittest.TestCase): 315 ... pass 316 317 Calling it without parameters or without parenthesis raises an 318 exception: 319 >>> @FileDecoratorBase() 320 ... class Test(unittest.TestCase): 321 ... pass 322 Traceback (most recent call last): 323 .. 324 ValueError: No files supplied. 325 326 >>> @FileDecoratorBase 327 ... class Test(unittest.TestCase): 328 ... pass 329 Traceback (most recent call last): 330 .. 331 UserWarning: Decorator used wrongly, must be called with filenames in parenthesis 332 """ 333 if len(files) == 0: 334 raise ValueError("No files supplied.") 335 elif len(files) == 1: 336 if isinstance(files[0], type): 337 raise UserWarning( 338 "Decorator used wrongly, must be called with " 339 "filenames in parenthesis" 340 ) 341 342 self.files = files 343 344 def new_setUp(self, old_setUp): 345 """ 346 Returns a new setUp() function that can be used as a class 347 member function (i.e. invoked via self.setUp()). 348 349 Its functionality is described in this classes' docstring. 350 """ 351 352 def setUp(other): 353 if hasattr(other, self.FILE_LIST_NAME): 354 raise TypeError( 355 "{!s} already has an attribute with the name {!s} which " 356 "would be overwritten by setUp()" 357 .format(other, self.FILE_LIST_NAME) 358 ) 359 setattr(other, self.FILE_LIST_NAME, []) 360 for f in self.files: 361 expanded_fname = other.expand_variables(f) 362 getattr(other, self.FILE_LIST_NAME).append( 363 self.setUp_file_action(expanded_fname) 364 ) 365 old_setUp(other) 366 return setUp 367 368 def setUp_file_action(self, expanded_file_name): 369 """ 370 This function is called on each file that is passed to the 371 constructor during the call of the decorated class' setUp(). 372 373 Parameters: 374 - expanded_file_name: the file's path expanded via 375 expand_variables from system_tests.Case 376 377 Returns: 378 This function should return a path that will be stored in the decorated 379 class' file list (the name is given by the attribute 380 FILE_LIST_NAME). The custom tearDown() function (that is returned by 381 self.new_tearDown()) iterates over this list and invokes 382 self.tearDown_file_action on each element in that list. 383 E.g. if a child class creates file copies, that should be deleted after 384 the test ran, then one would have to return the path of the copy, so 385 that tearDown() can delete the copies. 386 387 The default implementation does nothing. 388 """ 389 pass 390 391 def new_tearDown(self, old_tearDown): 392 """ 393 Returns a new tearDown() function that can be used as a class 394 member function. 395 396 It's functionality is described in this classes' docstring. 397 """ 398 399 def tearDown(other): 400 old_tearDown(other) 401 for f in getattr(other, self.FILE_LIST_NAME): 402 self.tearDown_file_action(f) 403 404 return tearDown 405 406 def tearDown_file_action(self, f): 407 """ 408 This function is called on each file in the decorated class' 409 file list (that list is populated during setUp()). 410 411 It can be used to perform cleanup operations after a test run. 412 413 Parameters: 414 - f: An element of the file list 415 416 Returns: 417 The return value is ignored 418 419 The default implementation removes f. 420 """ 421 os.remove(f) 422 423 def __call__(self, cls): 424 """ 425 Call operator for the usage as a decorator. It is 426 automatically used by Python when this class is used as a 427 decorator. 428 429 Parameters: 430 - cls: The decorated type. Must be a type 431 432 Returns: 433 - cls where the setUp and tearDown functions have been 434 replaced by the functions that are returned by 435 self.new_setUp() and self.new_tearDown() 436 """ 437 if not isinstance(cls, type): 438 raise ValueError("The decorator must be called on a type") 439 old_setUp = cls.setUp 440 cls.setUp = self.new_setUp(old_setUp) 441 442 old_tearDown = cls.tearDown 443 cls.tearDown = self.new_tearDown(old_tearDown) 444 445 return cls 446 447 448class CopyFiles(FileDecoratorBase): 449 """ 450 Decorator for subclasses of system_test.Case that automatically creates a 451 copy of the files specified as the parameters passed to the decorator. 452 453 Example: 454 >>> @CopyFiles("$some_var/file.txt", "$another_var/other_file.png") 455 ... class Foo(Case): 456 ... pass 457 458 The decorator will inject a new setUp method that at first calls the 459 already defined setUp(), then expands all supplied file names using 460 Case.expand_variables and then creates copies by appending '_copy' before 461 the file extension. The paths to the copies are stored in 462 self._copied_files. 463 464 The decorator also injects a new tearDown method that deletes all files in 465 self._files and then calls the original tearDown method. 466 467 This function will also complain if it is called without arguments or 468 without parenthesis, which is valid decorator syntax but is obviously a bug 469 in this case as it can result in tests not being run without a warning. 470 """ 471 472 #: override the name of the file list 473 FILE_LIST_NAME = '_copied_files' 474 475 def setUp_file_action(self, expanded_file_name): 476 fname, ext = os.path.splitext(expanded_file_name) 477 new_name = fname + '_copy' + ext 478 return shutil.copyfile(expanded_file_name, new_name) 479 480class CopyTmpFiles(FileDecoratorBase): 481 """ 482 This class copies files from test/data to test/tmp 483 Copied files are NOT removed in tearDown 484 Example: @CopyTmpFiles("$data_path/test_issue_1180.exv") 485 """ 486 487 #: override the name of the file list 488 FILE_LIST_NAME = '_tmp_files' 489 490 def setUp_file_action(self, expanded_file_name): 491 tmp_path = _config_variables['tmp_path'] 492 tmp_name = os.path.join(tmp_path,os.path.basename(expanded_file_name)) 493 return shutil.copyfile(expanded_file_name, tmp_name) 494 495 def tearDown_file_action(self, f): 496 """ 497 Do nothing. We don't clean up TmpFiles 498 """ 499 500class DeleteFiles(FileDecoratorBase): 501 """ 502 Decorator for subclasses of system_test.Case that automatically deletes all 503 files specified as the parameters passed to the decorator after the test 504 were run. 505 506 Example: 507 >>> @DeleteFiles("$some_var/an_output_file", "auxiliary_output.bin") 508 ... class Foo(Case): 509 ... pass 510 511 The decorator injects new setUp() and tearDown() functions. The new setUp() 512 at first calls the old setUp() and then saves all files that should be 513 deleted later in self._files_to_delete. The new tearDown() actually deletes 514 all files supplied to the decorator and then runs the original tearDown() 515 function. 516 """ 517 518 #: override the name of the file list 519 FILE_LIST_NAME = '_files_to_delete' 520 521 def setUp_file_action(self, expanded_file_name): 522 return expanded_file_name 523 524 525def path(path_string): 526 r""" 527 Converts a path which uses ``/`` as a separator into a path which uses the 528 path separator of the current operating system. 529 530 Example 531 ------- 532 533 >>> import platform 534 >>> sep = "\\" if platform.system() == "Windows" else "/" 535 >>> path("a/b") == "a" + sep + "b" 536 True 537 >>> path("a/more/complex/path") == sep.join(['a', 'more', 'complex', 'path']) 538 True 539 """ 540 return os.path.join(*path_string.split('/')) 541 542 543 """ 544 This function reads in the attributes commands, retval, stdout, stderr, 545 stdin and runs the `expand_variables` function on each. The resulting 546 commands are then run using the subprocess module and compared against the 547 expected values that were provided in the attributes via `compare_stdout` 548 and `compare_stderr`. Furthermore a threading.Timer is used to abort the 549 execution if a configured timeout is reached. 550 551 This function is automatically added as a member function to each system 552 test by the CaseMeta metaclass. This ensures that it is run by each system 553 test **after** setUp() and setUpClass() were run. 554 """ 555def test_run(self): 556 if not (len(self.commands) == len(self.retval) 557 == len(self.stdout) == len(self.stderr) == len(self.stdin)): 558 raise ValueError( 559 "commands, retval, stdout, stderr and stdin don't have the same " 560 "length" 561 ) 562 563 for i, command, retval, stdout, stderr, stdin in \ 564 zip(range(len(self.commands)), 565 self.commands, 566 self.retval, 567 self.stdout, 568 self.stderr, 569 self.stdin): 570 command, retval, stdout, stderr, stdin = [ 571 self.expand_variables(var) for var in 572 (command, retval, stdout, stderr, stdin) 573 ] 574 575 retval = int(retval) 576 577 if "memcheck" in _parameters: 578 command = _parameters["memcheck"] + " " + command 579 580 if _debug_mode: 581 print( 582 '', "="*80, "will run: " + command, "expected stdout:", stdout, 583 "expected stderr:", stderr, 584 "expected return value: {:d}".format(retval), 585 sep='\n' 586 ) 587 588 proc = subprocess.Popen( 589 _cmd_splitter(command), 590 stdout=subprocess.PIPE, 591 stderr=subprocess.PIPE, 592 stdin=subprocess.PIPE if stdin is not None else None, 593 env=self._get_env(), 594 cwd=self.work_dir, 595 shell=_SUBPROCESS_SHELL 596 ) 597 598 # Setup a threading.Timer which will terminate the command if it takes 599 # too long. Don't use the timeout parameter in subprocess.Popen, since 600 # that is not available for all Python 3 versions. 601 # Use a dictionary to indicate a timeout, as booleans get passed by 602 # value and the changes made timeout_reached function will not be 603 # visible once it exits (the command will still be terminated once the 604 # timeout expires). 605 timeout = {"flag": False} 606 607 def timeout_reached(tmout): 608 tmout["flag"] = True 609 proc.kill() 610 611 t = threading.Timer( 612 _parameters["timeout"], timeout_reached, args=[timeout] 613 ) 614 615 def get_encode_err(): 616 """ Return an error message indicating that the encoding of stdin 617 failed. 618 """ 619 return "Could not encode stdin {!s} for the command {!s} with the"\ 620 " following encodings: {!s}"\ 621 .format(stdin, command, ','.join(self.encodings)) 622 623 # Prepare stdin: try to encode it or keep it at None if it was not 624 # provided 625 encoded_stdin = None 626 if stdin is not None: 627 encoded_stdin = self._encode( 628 stdin, lambda data_in, encoding: data_in.encode(encoding), 629 get_encode_err 630 ) 631 632 if _debug_mode: 633 print('', "stdin:", stdin or "", sep='\n') 634 635 t.start() 636 got_stdout, got_stderr = proc.communicate(input=encoded_stdin) 637 t.cancel() 638 639 def get_decode_error(): 640 """ Return an error indicating the the decoding of stdout/stderr 641 failed. 642 """ 643 return "Could not decode the output of the command '{!s}' with "\ 644 "the following encodings: {!s}"\ 645 .format(command, ','.join(self.encodings)) 646 647 def decode_output(data_in, encoding): 648 """ Decode stdout/stderr, consider platform dependent line 649 endings. 650 """ 651 return _process_output_post(data_in.decode(encoding)) 652 653 processed_stdout, processed_stderr = [ 654 self._encode(output, decode_output, get_decode_error) 655 for output in (got_stdout, got_stderr) 656 ] 657 658 if _debug_mode: 659 print( 660 "got stdout:", processed_stdout, "got stderr:", 661 processed_stderr, "got return value: {:d}" 662 .format(proc.returncode), 663 sep='\n' 664 ) 665 666 self.assertFalse(timeout["flag"], msg="Timeout reached") 667 self.compare_stderr(i, command, processed_stderr, stderr) 668 self.compare_stdout(i, command, processed_stdout, stdout) 669 self.assertEqual( 670 retval, proc.returncode, msg="Return value does not match" 671 ) 672 673 self.post_command_hook(i, command) 674 675 self.post_tests_hook() 676 677 678class Case(unittest.TestCase): 679 """ 680 System test case base class, provides the functionality to interpret static 681 class members as system tests. 682 683 The class itself only provides utility functions and system tests need not 684 inherit from it, as it is automatically added via the CaseMeta metaclass. 685 """ 686 687 #: maxDiff set so that arbitrarily large diffs will be shown 688 maxDiff = None 689 690 #: list of encodings that are used to decode the test program's output 691 #: the first encoding that does not raise a UnicodeError is used 692 encodings = ['utf-8', 'iso-8859-1'] 693 694 inherit_env = True 695 696 @classmethod 697 def setUpClass(cls): 698 """ 699 This function adds the variable work_dir to the class, which is the 700 path to the directory where the python source file is located. 701 """ 702 cls.work_dir = os.path.dirname(inspect.getfile(cls)) 703 704 def _get_env(self): 705 """ Return an appropriate env value for subprocess.Popen. 706 707 This function returns either an appropriately populated dictionary or 708 None (the latter if this class has no attribute env). If a dictionary 709 is returned, then it will be either exactly self.env (when inherit_env 710 is False) or a copy of the current environment merged with self.env 711 (the values from self.env take precedence). 712 """ 713 if not hasattr(self, "env"): 714 return None 715 716 if not self.inherit_env: 717 return self.env 718 719 env_copy = os.environ.copy() 720 for key in self.env: 721 env_copy[key] = self.env[key] 722 723 return env_copy 724 725 def _encode(self, data_in, encode_action, get_err): 726 """ 727 Try to convert data_in via encode_action using the encodings in 728 self.encodings. 729 730 This function tries all encodings in self.encodings to run 731 encode_action with the parameters (data_in, encoding). If encode_action 732 raises a UnicodeError, the next encoding is used, otherwise the result 733 of encode_action is returned. If an encoding is equal to the type 734 bytes, then data_in is returned unmodified. 735 736 If all encodings result in a UnicodeError, then the conversion is 737 considered unsuccessful and get_err() is called to obtain an error 738 string which is raised as a ValueError. 739 """ 740 result = None 741 for encoding in self.encodings: 742 if encoding == bytes: 743 return data_in 744 try: 745 result = encode_action(data_in, encoding) 746 except UnicodeError: 747 pass 748 else: 749 break 750 if result is None: 751 raise ValueError(get_err()) 752 753 return result 754 755 def _compare_output(self, i, command, got, expected, msg=None): 756 """ Compares the expected and actual output of a test case. """ 757 if isinstance(got, bytes): 758 self.assertEqual(got, expected, msg=msg) 759 else: 760 self.assertMultiLineEqual( 761 expected, got, msg=msg 762 ) 763 764 def compare_stdout(self, i, command, got_stdout, expected_stdout): 765 """ 766 Function to compare whether the expected & obtained stdout match. 767 768 This function is automatically invoked by test_run with the following 769 parameters: 770 i - the index of the current command that is run in self.commands 771 command - the command that was run 772 got_stdout - the obtained stdout, post-processed depending on the 773 platform so that lines always end with \n 774 expected_stdout - the expected stdout extracted from self.stdout 775 776 The default implementation uses assertMultiLineEqual from 777 unittest.TestCase for ordinary strings and assertEqual for binary 778 output. This function can be overridden in a child class to implement a 779 custom check. 780 """ 781 self._compare_output( 782 i, command, expected_stdout, got_stdout, 783 msg="Standard output does not match" 784 ) 785 786 def compare_stderr(self, i, command, got_stderr, expected_stderr): 787 """ Same as compare_stdout only for standard-error. """ 788 self._compare_output( 789 i, command, expected_stderr, got_stderr, 790 msg="Standard error does not match" 791 ) 792 793 def expand_variables(self, unexpanded_string): 794 """ 795 Expands all variables of the form ``$var`` in the given string using 796 the dictionary `variable_dict`. 797 798 The expansion itself is performed by the string's template module using 799 the function `safe_substitute`. 800 801 If unexpanded_string is of the type bytes, then no expansion is 802 performed. 803 """ 804 if isinstance(unexpanded_string, bytes) or unexpanded_string is None: 805 return unexpanded_string 806 807 return string.Template(str(unexpanded_string))\ 808 .safe_substitute(**self.variable_dict) 809 810 def post_command_hook(self, i, command): 811 """ Function that is run after the successful execution of one command. 812 813 It is invoked with the following parameters: 814 i - the index of the current command that is run in self.commands 815 command - the command that was run 816 817 It should return nothing. 818 819 This function can be overridden to perform additional checks after the 820 command ran, for instance it can check whether files were created. 821 822 The default implementation does nothing. 823 """ 824 pass 825 826 def post_tests_hook(self): 827 """ 828 Function that is run after the successful execution all commands. It 829 should return nothing. 830 831 This function can be overridden to run additional checks that only make 832 sense after all commands ran. 833 834 The default implementation does nothing. 835 """ 836 pass 837 838 839class CaseMeta(type): 840 """ System tests generation metaclass. 841 842 This metaclass is performs the following tasks: 843 844 1. Add the `test_run` function as a member of the test class 845 2. Add the `Case` class as the parent class 846 3. Expand all variables already defined in the class, so that any 847 additional code does not have to perform this task 848 849 Using a metaclass instead of inheriting from Case has the advantage, that 850 we can expand all variables in the strings before any test case or even the 851 class constructor is run! Thus users will immediately see the expanded 852 result. Also adding the `test_run` function as a direct member and not via 853 inheritance enforces that it is being run **after** the test cases setUp & 854 setUpClass (which oddly enough seems not to be the case in the unittest 855 module where test functions of the parent class run before setUpClass of 856 the child class). 857 """ 858 859 def __new__(mcs, clsname, bases, dct): 860 861 assert len(_parameters) != 0, \ 862 "Internal error: substitution dictionary not populated" 863 864 changed = True 865 866 # expand all non-private variables by brute force 867 # => try all expanding all elements defined in the current class until 868 # there is no change in them any more 869 keys = [key for key in list(dct.keys()) if not key.startswith('_')] 870 while changed: 871 changed = False 872 873 for key in keys: 874 875 old_value = dct[key] 876 877 # only try expanding strings and lists 878 if isinstance(old_value, str): 879 new_value = string.Template(old_value).safe_substitute( 880 **_disjoint_dict_merge(dct, _config_variables) 881 ) 882 elif isinstance(old_value, list): 883 # do not try to expand anything but strings in the list 884 new_value = [ 885 string.Template(elem).safe_substitute( 886 **_disjoint_dict_merge(dct, _config_variables) 887 ) 888 if isinstance(elem, str) else elem 889 for elem in old_value 890 ] 891 else: 892 continue 893 894 if old_value != new_value: 895 changed = True 896 dct[key] = new_value 897 898 dct['variable_dict'] = _disjoint_dict_merge(dct, _config_variables) 899 dct['test_run'] = test_run 900 901 if Case not in bases: 902 bases += (Case,) 903 904 CaseMeta.add_default_values(clsname, dct) 905 906 return super(CaseMeta, mcs).__new__(mcs, clsname, bases, dct) 907 908 @staticmethod 909 def add_default_values(clsname, dct): 910 if 'commands' not in dct: 911 raise ValueError( 912 "No member 'commands' in class {!s}.".format(clsname) 913 ) 914 915 cmd_length = len(dct['commands']) 916 917 for member, default in zip( 918 ('stderr', 'stdout', 'stdin', 'retval'), 919 ('', '', None, 0)): 920 if member not in dct: 921 dct[member] = [default] * cmd_length 922 923 924def check_no_ASAN_UBSAN_errors(self, i, command, got_stderr, expected_stderr): 925 """ 926 Drop-in replacement for the default Case.compare_stderr() function that 927 **only** checks for any signs of ASAN (address sanitizer) and UBSAN 928 (undefined behavior sanitizer). 929 930 Parameters: 931 - i, command, expected_stderr: ignored 932 - got_stderr: checked for signs of ASAN und UBSAN error messages 933 934 This function ignores the expected output to stderr! It is intended for 935 test cases where standard error is filled with useless debugging 936 messages/warnings that are not really relevant for the test and not worth 937 storing in the test suite. This function can be used to still be able to 938 catch ASAN & UBSAN error messages. 939 940 Example usage 941 ------------- 942 943 Override the default compare_stderr function in your subclass of Case with 944 this function: 945 >>> class TestNoAsan(Case): 946 ... compare_stderr = check_no_ASAN_UBSAN_errors 947 948 >>> T = TestNoAsan() 949 950 The new compare_stderr will only complain if there are strings inside the 951 obtained stderr which could be an error reported by ASAN/UBSAN: 952 >>> T.compare_stderr(0, "", "runtime error: load of value 190", "some output") 953 Traceback (most recent call last): 954 .. 955 AssertionError: 'runtime error' unexpectedly found in 'runtime error: load of value 190' 956 957 >>> T.compare_stderr(0, "", "SUMMARY: AddressSanitizer: heap-buffer-overflow", "") 958 Traceback (most recent call last): 959 .. 960 AssertionError: 'AddressSanitizer' unexpectedly found in 'SUMMARY: AddressSanitizer: heap-buffer-overflow' 961 962 It will not complain in all other cases, especially when expected_stderr 963 and got_stderr do not match: 964 >>> T.compare_stderr(0, "", "some output", "other output") 965 966 This function also supports binary output: 967 >>> ASAN_ERROR = bytes("SUMMARY: AddressSanitizer: heap-buffer-overflow", encoding='ascii') 968 >>> T.compare_stderr(0, "", ASAN_ERROR, "other output") 969 Traceback (most recent call last): 970 .. 971 AssertionError: b'AddressSanitizer' unexpectedly found in b'SUMMARY: AddressSanitizer: heap-buffer-overflow' 972 """ 973 UBSAN_MSG = "runtime error" 974 ASAN_MSG = "AddressSanitizer" 975 976 if isinstance(got_stderr, bytes): 977 self.assertNotIn(UBSAN_MSG.encode('ascii'), got_stderr) 978 self.assertNotIn(ASAN_MSG.encode('ascii'), got_stderr) 979 return 980 981 self.assertNotIn(UBSAN_MSG, got_stderr) 982 self.assertNotIn(ASAN_MSG, got_stderr) 983 984