'''
testcode2
---------

A framework for regression testing numerical programs.

:copyright: (c) 2012 James Spencer.
:license: modified BSD; see LICENSE for more details.
'''

import glob
import os
import shutil
import subprocess
import sys
import warnings

try:
    # shlex.quote is the supported shell-quoting function on Python 3.3+; the
    # pipes module (previously used here) was removed in Python 3.13.
    from shlex import quote as _shell_quote
except ImportError:
    # Python 2: only pipes.quote exists.
    from pipes import quote as _shell_quote

try:
    import yaml
    _HAVE_YAML = True
except ImportError:
    _HAVE_YAML = False

try:
    import importlib
    _HAVE_IMPORTLIB_ = True
except ImportError:
    _HAVE_IMPORTLIB_ = False

import testcode2.dir_lock as dir_lock
import testcode2.exceptions as exceptions
import testcode2.queues as queues
import testcode2.compatibility as compat
import testcode2.util as util
import testcode2.validation as validation

DIR_LOCK = dir_lock.DirLock()

# Do not change!  Bad things will happen...
_FILESTEM_TUPLE = (
                    ('test', 'test.out'),
                    ('error', 'test.err'),
                    ('benchmark', 'benchmark.out'),
                  )
_FILESTEM_DICT = dict( _FILESTEM_TUPLE )
# We can change FILESTEM if needed.
# However, this should only be done to compare two sets of test output or two
# sets of benchmarks.
# Bad things will happen if tests are run without the default FILESTEM!
FILESTEM = dict( _FILESTEM_TUPLE )

class TestProgram:
    '''Store and access information about the program being tested.'''
    def __init__(self, name, exe, test_id, benchmark, **kwargs):
        '''Set defaults and then apply any settings given as keyword arguments.

name: name of the program.
exe: executable to run.
test_id: identifier used when naming the test output and error files.
benchmark: iterable of benchmark identifiers (see select_benchmark_file).
kwargs: any attribute set below can be overridden by passing it by keyword.

Raises TestCodeError if the requested data extraction method cannot be used.'''

        # Set sane defaults (mostly null) for keyword arguments.

        self.name = name

        # Running
        self.exe = exe
        self.test_id = test_id
        self.run_cmd_template = ('tc.program tc.args tc.input > '
                                 'tc.output 2> tc.error')
        self.launch_parallel = 'mpirun -np tc.nprocs'
        self.submit_pattern = 'testcode.run_cmd'

        # dummy job with default settings (e.g tolerance)
        self.default_test_settings = None

        # Analysis
        self.benchmark = benchmark
        self.ignore_fields = []
        self.data_tag = None
        self.extract_cmd_template = 'tc.extract tc.args tc.file'
        self.extract_program = None
        self.extract_args = ''
        self.extract_fmt = 'table'
        self.skip_cmd_template = 'tc.skip tc.args tc.test'
        self.skip_program = None
        self.skip_args = ''
        self.verify = False
        self.extract_fn = None
        # By default, the job is expected to exit with error code 0.
        # Setting it to True will discard the exit status/error code.
        self.can_fail = False

        # Info
        self.vcs = None

        # Set values passed in as keyword options.
        for (attr, val) in kwargs.items():
            setattr(self, attr, val)

        # If using an external verification program, then set the default
        # extract command template.
        if self.verify and 'extract_cmd_template' not in kwargs:
            self.extract_cmd_template = 'tc.extract tc.args tc.test tc.bench'

        if self.extract_fn:
            if _HAVE_IMPORTLIB_:
                # extract_fn is given as '[path] module.function': an optional
                # directory to add to sys.path followed by the dotted name.
                self.extract_fn = self.extract_fn.split()
                if len(self.extract_fn) == 2:
                    sys.path.append(self.extract_fn[0])
                (mod, fn) = self.extract_fn[-1].rsplit('.', 1)
                mod = importlib.import_module(mod)
                self.extract_fn = getattr(mod, fn)
            elif self.extract_program:
                warnings.warn('importlib not available.  Will attempt to '
                              'analyse data via an external script.')
                self.extract_fn = None
            else:
                raise exceptions.TestCodeError('importlib not available and '
                              'no data extraction program supplied.')

        # Can we actually extract the data?
        if self.extract_fmt == 'yaml' and not _HAVE_YAML:
            err = 'YAML data format cannot be used: PyYAML is not installed.'
            raise exceptions.TestCodeError(err)

    def run_cmd(self, input_file, args, nprocs=0):
        '''Create run command.

The tc.* placeholders in run_cmd_template are substituted with shell-quoted
filenames; args is inserted verbatim.  If nprocs > 0 and launch_parallel is
set, the command is prefixed with the parallel launcher.'''
        output_file = util.testcode_filename(FILESTEM['test'], self.test_id,
                input_file, args)
        error_file = util.testcode_filename(FILESTEM['error'], self.test_id,
                input_file, args)

        # Need to escape filenames for passing them to the shell.
        exe = _shell_quote(self.exe)
        output_file = _shell_quote(output_file)
        error_file = _shell_quote(error_file)

        cmd = self.run_cmd_template.replace('tc.program', exe)
        if isinstance(input_file, str):
            input_file = _shell_quote(input_file)
            cmd = cmd.replace('tc.input', input_file)
        else:
            cmd = cmd.replace('tc.input', '')
        if isinstance(args, str):
            cmd = cmd.replace('tc.args', args)
        else:
            cmd = cmd.replace('tc.args', '')
        cmd = cmd.replace('tc.output', output_file)
        cmd = cmd.replace('tc.error', error_file)
        if nprocs > 0 and self.launch_parallel:
            cmd = '%s %s' % (self.launch_parallel, cmd)
        cmd = cmd.replace('tc.nprocs', str(nprocs))
        return cmd

    def extract_cmd(self, path, input_file, args):
        '''Create extraction command(s).

Returns a 1-tuple (verification command) if self.verify is set and otherwise
a 2-tuple of (benchmark extraction command, test extraction command).

Raises TestCodeError (via select_benchmark_file) if no benchmark exists.'''
        test_file = util.testcode_filename(FILESTEM['test'], self.test_id,
                input_file, args)
        bench_file = self.select_benchmark_file(path, input_file, args)
        cmd = self.extract_cmd_template
        cmd = cmd.replace('tc.extract', _shell_quote(self.extract_program))
        cmd = cmd.replace('tc.args', self.extract_args)
        if self.verify:
            # Single command to compare benchmark and test outputs.
            cmd = cmd.replace('tc.test', _shell_quote(test_file))
            cmd = cmd.replace('tc.bench', _shell_quote(bench_file))
            return (cmd,)
        else:
            # Need to return commands to extract data from the test and
            # benchmark outputs.
            test_cmd = cmd.replace('tc.file', _shell_quote(test_file))
            bench_cmd = cmd.replace('tc.file', _shell_quote(bench_file))
            return (bench_cmd, test_cmd)

    def skip_cmd(self, input_file, args):
        '''Create skip command.

Substitutes the skip program, its arguments and the (shell-quoted) test output
and error filenames into skip_cmd_template.'''
        test_file = util.testcode_filename(FILESTEM['test'], self.test_id,
                input_file, args)
        error_file = util.testcode_filename(FILESTEM['error'], self.test_id,
                input_file, args)
        cmd = self.skip_cmd_template
        cmd = cmd.replace('tc.skip', _shell_quote(self.skip_program))
        cmd = cmd.replace('tc.args', self.skip_args)
        cmd = cmd.replace('tc.test', _shell_quote(test_file))
        cmd = cmd.replace('tc.error', _shell_quote(error_file))
        return cmd

    def select_benchmark_file(self, path, input_file, args):
        '''Find the first benchmark file out of all benchmark IDs which exists.

Returns the benchmark filename (relative to path).

Raises TestCodeError if none of the candidate files exist in path.'''

        benchmark = None
        benchmarks = []
        for bench_id in self.benchmark:
            benchfile = util.testcode_filename(FILESTEM['benchmark'], bench_id,
                    input_file, args)
            # Keep track of every candidate so the error message is useful.
            benchmarks.append(benchfile)
            if os.path.exists(os.path.join(path, benchfile)):
                benchmark = benchfile
                break
        if not benchmark:
            err = 'No benchmark found in %s.  Checked for: %s.'
            raise exceptions.TestCodeError(err % (path, ', '.join(benchmarks)))
        return benchmark

class Test:
    '''Store and execute a test.'''
    def __init__(self, name, test_program, path, **kwargs):
        '''Set defaults and then apply any settings given as keyword arguments.

name: name of the test.
test_program: TestProgram object describing the program to run.
path: directory in which the test is run.
kwargs: any attribute set below can be overridden by passing it by keyword.'''

        self.name = name

        # program
        self.test_program = test_program

        # running
        self.path = path
        self.inputs_args = None
        self.output = None
        self.nprocs = 0
        self.min_nprocs = 0
        self.max_nprocs = compat.maxint
        self.submit_template = None
        # Run jobs in this concurrently rather than consecutively?
        # Only used when setting tests up in testcode2.config: if true then
        # each pair of input file and arguments are assigned to a different
        # Test object rather than a single Test object.
        self.run_concurrent = False

        # Analysis
        self.default_tolerance = None
        self.tolerances = {}

        # Set values passed in as keyword options.
        for (attr, val) in kwargs.items():
            setattr(self, attr, val)

        if not self.inputs_args:
            self.inputs_args = [('', '')]

        self.status = dict( (inp_arg, None) for inp_arg in self.inputs_args )

        # 'Decorate' functions which require a directory lock in order for file
        # access to be thread-safe.
        # As we use the in_dir decorator, which requires knowledge of the test
        # directory (a per-instance property), we cannot use the @decorator
        # syntactic sugar.  Fortunately we can still modify them at
        # initialisation time.  Thank you python for closures!
        self.start_job = DIR_LOCK.in_dir(self.path)(self._start_job)
        self.move_output_to_test_output = DIR_LOCK.in_dir(self.path)(
                                               self._move_output_to_test_output)
        self.move_old_output_files = DIR_LOCK.in_dir(self.path)(
                                               self._move_old_output_files)
        self.verify_job = DIR_LOCK.in_dir(self.path)(self._verify_job)
        self.skip_job = DIR_LOCK.in_dir(self.path)(self._skip_job)

    def __hash__(self):
        return hash(self.path)

    def __eq__(self, other):
        if not isinstance(other, self.__class__):
            return False
        else:
            # Compare values we care about...
            cmp_vals = ['test_program', 'path', 'inputs_args', 'output',
                        'nprocs', 'min_nprocs', 'max_nprocs', 'submit_template',
                        'default_tolerance', 'tolerances', 'status']
            comparison = tuple(getattr(other, cmp_val) == getattr(self, cmp_val)
                               for cmp_val in cmp_vals)
            return compat.compat_all(comparison)

    def run_test(self, verbose=1, cluster_queue=None, rundir=None):
        '''Run all jobs in test.

verbose: verbosity level controlling how much is printed.
cluster_queue: if set, submit all jobs in a single submit file to this queueing
    system rather than running them one at a time locally.
rundir: passed through to util.info_line when reporting.

A RunError from any job is reported as a failure of that job; all jobs which
have not yet been run are then marked as skipped.'''

        try:
            # Construct tests.
            test_cmds = []
            test_files = []
            for (test_input, test_arg) in self.inputs_args:
                if (test_input and
                        not os.path.exists(os.path.join(self.path,test_input))):
                    err = 'Input file does not exist: %s' % (test_input,)
                    raise exceptions.RunError(err)
                test_cmds.append(self.test_program.run_cmd(test_input, test_arg,
                                                           self.nprocs))
                test_files.append(util.testcode_filename(FILESTEM['test'],
                        self.test_program.test_id, test_input, test_arg))

            # Move files matching output pattern out of the way.
            self.move_old_output_files(verbose)

            # Run tests one-at-a-time locally or submit job in single submit
            # file to a queueing system.
            if cluster_queue:
                if self.output:
                    for (ind, test) in enumerate(test_cmds):
                        # Don't quote self.output if it contains any wildcards
                        # (assume the user set it up correctly!)
                        out = self.output
                        if not compat.compat_any(wild in self.output for wild in
                                ['*', '?', '[', '{']):
                            out = _shell_quote(self.output)
                        test_cmds[ind] = '%s; mv %s %s' % (test_cmds[ind],
                                out, _shell_quote(test_files[ind]))
                test_cmds = ['\n'.join(test_cmds)]
            for (ind, test) in enumerate(test_cmds):
                if not cluster_queue:
                    # Identify the job before starting it so that a failure to
                    # launch is reported against the right input/args pair in
                    # the RunError handler below.
                    (test_input, test_arg) = self.inputs_args[ind]
                job = self.start_job(test, cluster_queue, verbose)
                job.wait()
                # Analyse tests as they finish.
                if cluster_queue:
                    # Did all of them at once.
                    for (test_input, test_arg) in self.inputs_args:
                        self.verify_job(test_input, test_arg, verbose, rundir)
                else:
                    # Did one job at a time.
                    err = []
                    if self.output:
                        try:
                            self.move_output_to_test_output(test_files[ind])
                        except exceptions.RunError:
                            err.append(sys.exc_info()[1])
                    status = validation.Status()
                    if job.returncode != 0:
                        if not self.test_program.can_fail:
                            err.insert(0, 'Error running job. Return code: %i'
                                            % job.returncode)
                            (status, msg) = self.skip_job(test_input, test_arg,
                                                          verbose)
                    if status.skipped():
                        self._update_status(status, (test_input, test_arg))
                        if verbose > 0 and verbose < 3:
                            sys.stdout.write(
                                    util.info_line(self.path,
                                                   test_input, test_arg, rundir)
                                            )
                        status.print_status(msg, verbose)
                    elif err:
                        # re-raise first error we hit.
                        raise exceptions.RunError(err[0])
                    else:
                        self.verify_job(test_input, test_arg, verbose, rundir)
                sys.stdout.flush()
        except exceptions.RunError:
            err = sys.exc_info()[1]
            if verbose > 2:
                err = 'Test(s) in %s failed.\n%s' % (self.path, err)
            status = validation.Status([False])
            self._update_status(status, (test_input, test_arg))
            if verbose > 0 and verbose < 3:
                info_line = util.info_line(self.path, test_input, test_arg, rundir)
                sys.stdout.write(info_line)
            status.print_status(err, verbose)
            # Shouldn't run remaining tests after such a catastrophic failure.
            # Mark all remaining tests as skipped so the user knows that they
            # weren't run.
            err = 'Previous test in %s caused a system failure.' % (self.path)
            status = validation.Status(name='skipped')
            # Only values (never keys) of self.status are modified in the loop,
            # so iterating over the dictionary whilst updating it is safe.
            for ((test_input, test_arg), stat) in self.status.items():
                if not stat:
                    self._update_status(status, (test_input, test_arg))
                    if verbose > 2:
                        cmd = self.test_program.run_cmd(test_input, test_arg,
                                                        self.nprocs)
                        print('Test using %s in %s' % (cmd, self.path))
                    elif verbose > 0:
                        info_line = util.info_line(self.path, test_input,
                                                   test_arg, rundir)
                        sys.stdout.write(info_line)
                    status.print_status(err, verbose)
            sys.stdout.flush()

    def _start_job(self, cmd, cluster_queue=None, verbose=1):
        '''Start test running.  Requires directory lock.

IMPORTANT: use self.start_job rather than self._start_job if using multiple
threads.

Decorated to start_job, which acquires directory lock and enters self.path
first, during initialisation.

Returns a subprocess.Popen object (local run) or a queues.ClusterQueueJob
object (cluster run).  Raises RunError if the local command cannot be started.'''

        if cluster_queue:
            tp_ptr = self.test_program
            submit_file = '%s.%s' % (os.path.basename(self.submit_template),
                                                                tp_ptr.test_id)
            job = queues.ClusterQueueJob(submit_file, system=cluster_queue)
            job.create_submit_file(tp_ptr.submit_pattern, cmd,
                                   self.submit_template)
            if verbose > 2:
                print('Submitting tests using %s (template submit file) in %s'
                           % (self.submit_template, self.path))
            job.start_job()
        else:
            # Run locally via subprocess.
            if verbose > 2:
                print('Running test using %s in %s\n' % (cmd, self.path))
            try:
                job = subprocess.Popen(cmd, shell=True)
            except OSError:
                # slightly odd syntax in order to be compatible with python 2.5
                # and python 2.6/3
                err = 'Execution of test failed: %s' % (sys.exc_info()[1],)
                raise exceptions.RunError(err)

        # Return either Popen object or ClusterQueueJob object.  Both have
        # a wait method which returns only once job has finished.
        return job

    def _move_output_to_test_output(self, test_files_out):
        '''Move output to the testcode output file.  Requires directory lock.

This is used when a program writes to a named output file rather than to
STDOUT.

IMPORTANT: use self.move_output_to_test_output rather than
self._move_output_to_test_output if using multiple threads.

Decorated to move_output_to_test_output, which acquires the directory lock and
enters self.path.

Raises RunError unless self.output matches exactly one file.
'''
        # self.output might be a glob which works with e.g.
        #   mv self.output test_files[ind]
        # if self.output matches only one file.  Reproduce that
        # here so that running tests through the queueing system
        # and running tests locally have the same behaviour.
        out_files = glob.glob(self.output)
        if len(out_files) == 1:
            shutil.move(out_files[0], test_files_out)
        else:
            err = ('Output pattern (%s) matches %s files (%s).'
                     % (self.output, len(out_files), out_files))
            raise exceptions.RunError(err)

    def _move_old_output_files(self, verbose=1):
        '''Move existing output files out of the way.  Requires directory lock.

Any files which already match the output pattern (self.output) are moved into
the directory test.prev.output.<test_id> so that they cannot be confused with
the output of the jobs about to be run.

IMPORTANT: use self.move_old_output_files rather than
self._move_old_output_files if using multiple threads.

Decorated to move_old_output_files, which acquires the directory lock and
enters self.path.
'''
        if self.output:
            old_out_files = glob.glob(self.output)
            if old_out_files:
                out_dir = 'test.prev.output.%s' % (self.test_program.test_id)
                if verbose > 2:
                    print('WARNING: found existing files matching output '
                          'pattern: %s.' % self.output)
                    print('WARNING: moving existing output files (%s) to %s.\n'
                          % (', '.join(old_out_files), out_dir))
                if not os.path.exists(out_dir):
                    os.mkdir(out_dir)
                for out_file in old_out_files:
                    shutil.move(out_file, out_dir)

    def _verify_job(self, input_file, args, verbose=1, rundir=None):
        '''Check job against benchmark.

Assume function is executed in self.path.

IMPORTANT: use self.verify_job rather than self._verify_job if using multiple
threads.

Decorated to verify_job, which acquires directory lock and enters self.path
first, during initialisation.

Returns (status, msg).  Analysis errors are reported as a failed status rather
than being raised.'''
        # We already have DIR_LOCK, so use _skip_job instead of skip_job.
        (status, msg) = self._skip_job(input_file, args, verbose)
        try:
            if self.test_program.verify and not status.skipped():
                (status, msg) = self.verify_job_external(input_file, args,
                                                         verbose)
            elif not status.skipped():
                (bench_out, test_out) = self.extract_data(input_file, args,
                                                          verbose)
                (comparable, status, msg) = validation.compare_data(bench_out,
                        test_out, self.default_tolerance, self.tolerances,
                        self.test_program.ignore_fields)
                if verbose > 2:
                    # Include data tables in output.
                    if comparable:
                        # Combine test and benchmark dictionaries.
                        data_table = util.pretty_print_table(
                                ['benchmark', 'test'],
                                [bench_out, test_out])
                    else:
                        # Print dictionaries separately--couldn't even compare
                        # them!
                        data_table = '\n'.join((
                            util.pretty_print_table(['benchmark'], [bench_out]),
                            util.pretty_print_table(['test '], [test_out])))
                    if msg.strip():
                        # join data table with error message from
                        # validation.compare_data.
                        msg = '\n'.join((msg, data_table))
                    else:
                        msg = data_table
        except (exceptions.AnalysisError, exceptions.TestCodeError):
            if msg.strip():
                msg = '%s\n%s' % (msg, sys.exc_info()[1])
            else:
                # Store the message as a string, as in the branch above, rather
                # than the exception object itself.
                msg = str(sys.exc_info()[1])
            status = validation.Status([False])

        self._update_status(status, (input_file, args))
        if verbose > 0 and verbose < 3:
            info_line = util.info_line(self.path, input_file, args, rundir)
            sys.stdout.write(info_line)
        status.print_status(msg, verbose)

        return (status, msg)

    def _skip_job(self, input_file, args, verbose=1):
        '''Run user-supplied command to check if test should be skipped.

IMPORTANT: use self.skip_job rather than self._skip_job if using multiple
threads.

Decorated to skip_job, which acquires directory lock and enters self.path
first, during initialisation.

Returns (status, ''), where status is a skipped status if the skip command
exits with return code 0 and a default validation.Status() otherwise.'''
        status = validation.Status()
        if self.test_program.skip_program:
            cmd = self.test_program.skip_cmd(input_file, args)
            try:
                if verbose > 2:
                    print('Testing whether to skip test using %s in %s.' %
                            (cmd, self.path))
                skip_popen = subprocess.Popen(cmd, shell=True,
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                # Use communicate rather than wait: wait can deadlock if the
                # child fills the stdout/stderr pipe buffers.
                skip_popen.communicate()
                if skip_popen.returncode == 0:
                    # skip this test
                    status = validation.Status(name='skipped')
            except OSError:
                # slightly odd syntax in order to be compatible with python
                # 2.5 and python 2.6/3
                if verbose > 2:
                    print('Test to skip test: %s' % (sys.exc_info()[1],))
        return (status, '')

    def verify_job_external(self, input_file, args, verbose=1):
        '''Run user-supplied verifier script.

Assume function is executed in self.path.

Returns (status, output), where status is passed if the verifier exits with
return code 0 and failed otherwise.  output is the verifier's standard output
(suppressed if verbose < 2).

Raises AnalysisError if the verifier cannot be started.'''
        verify_cmd, = self.test_program.extract_cmd(self.path, input_file, args)
        try:
            if verbose > 2:
                print('Analysing test using %s in %s.' %
                        (verify_cmd, self.path))
            verify_popen = subprocess.Popen(verify_cmd, shell=True,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        except OSError:
            # slightly odd syntax in order to be compatible with python 2.5
            # and python 2.6/3
            err = 'Analysis of test failed: %s' % (sys.exc_info()[1],)
            raise exceptions.AnalysisError(err)
        # communicate reads both pipes until the process exits; calling wait
        # first can deadlock if the verifier produces a lot of output.
        output = verify_popen.communicate()[0].decode('utf-8')
        if verbose < 2:
            # Suppress output.  (hackhack)
            output = ''
        if verify_popen.returncode == 0:
            return (validation.Status([True]), output)
        else:
            return (validation.Status([False]), output)

    def extract_data(self, input_file, args, verbose=1):
        '''Extract data from output file.

Assume function is executed in self.path.

Data is extracted from the benchmark and test files using (in order of
preference) the data tag, the extraction function or the external extraction
program of the test program.

Returns a tuple of the extracted data, benchmark first and then test.

Raises AnalysisError if the external extraction program fails.'''
        tp_ptr = self.test_program
        data_files = [
                        tp_ptr.select_benchmark_file(self.path, input_file, args),
                        util.testcode_filename(FILESTEM['test'],
                                tp_ptr.test_id, input_file, args),
                     ]
        if tp_ptr.data_tag:
            # Using internal data extraction function.
            if verbose > 2:
                print('Analysing output using data_tag %s in %s on files %s.' %
                        (tp_ptr.data_tag, self.path, ' and '.join(data_files)))
            outputs = [util.extract_tagged_data(tp_ptr.data_tag, dfile)
                    for dfile in data_files]
        elif tp_ptr.extract_fn:
            if verbose > 2:
                print('Analysing output using function %s in %s on files %s.' %
                        (tp_ptr.extract_fn.__name__, self.path,
                         ' and '.join(data_files)))
            outputs = [tp_ptr.extract_fn(dfile) for dfile in data_files]
        else:
            # Using external data extraction script.
            # Get extraction commands.
            extract_cmds = tp_ptr.extract_cmd(self.path, input_file, args)

            # Extract data.
            outputs = []
            for cmd in extract_cmds:
                try:
                    if verbose > 2:
                        print('Analysing output using %s in %s.' %
                                (cmd, self.path))
                    extract_popen = subprocess.Popen(cmd, shell=True,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                except OSError:
                    # slightly odd syntax in order to be compatible with python
                    # 2.5 and python 2.6/3
                    err = 'Analysing output failed: %s' % (sys.exc_info()[1],)
                    raise exceptions.AnalysisError(err)
                # Read both pipes whilst waiting for the process to finish:
                # waiting first can deadlock if the pipe buffers fill up.
                (stdout_data, stderr_data) = extract_popen.communicate()
                if extract_popen.returncode != 0:
                    err = stderr_data.decode('utf-8')
                    err = 'Analysing output failed: %s' % (err)
                    raise exceptions.AnalysisError(err)
                # Convert data string from extract command to dictionary format.
                data_string = stdout_data.decode('utf-8')
                if self.test_program.extract_fmt == 'table':
                    outputs.append(util.dict_table_string(data_string))
                elif self.test_program.extract_fmt == 'yaml':
                    yaml_data = yaml.safe_load(data_string)
                    if not isinstance(yaml_data, dict):
                        # e.g. empty output: report an analysis failure rather
                        # than crashing on the missing items method.
                        err = ('Analysing output failed: YAML output from %s '
                               'is not a mapping.' % (cmd,))
                        raise exceptions.AnalysisError(err)
                    outputs.append({})
                    # convert values to be in a tuple so the format matches
                    # that from dict_table_string.
                    # ensure all keys are strings so they can be sorted
                    # (different data types cause problems!)
                    for (key, val) in yaml_data.items():
                        if isinstance(val, list):
                            outputs[-1][str(key)] = tuple(val)
                        else:
                            outputs[-1][str(key)] = tuple((val,))

        return tuple(outputs)

    def create_new_benchmarks(self, benchmark, copy_files_since=None,
            copy_files_path='testcode_data'):
        '''Copy the test files to benchmark files.

benchmark: identifier of the new benchmark.
copy_files_since: if set, also copy all other files in the test directory
    modified at or after this time (seconds since the epoch) to
    copy_files_path.
copy_files_path: directory (relative to self.path) to hold the copied files.

The current working directory is restored on exit, even if a copy fails.'''

        oldcwd = os.getcwd()
        os.chdir(self.path)
        try:
            test_files = []
            for (inp, arg) in self.inputs_args:
                test_file = util.testcode_filename(FILESTEM['test'],
                        self.test_program.test_id, inp, arg)
                err_file = util.testcode_filename(FILESTEM['error'],
                        self.test_program.test_id, inp, arg)
                bench_file = util.testcode_filename(_FILESTEM_DICT['benchmark'],
                        benchmark, inp, arg)
                test_files.extend((test_file, err_file, bench_file))
                shutil.copy(test_file, bench_file)

            if copy_files_since:
                if not os.path.isdir(copy_files_path):
                    os.mkdir(copy_files_path)
                if os.path.isdir(copy_files_path):
                    for data_file in glob.glob('*'):
                        # os.stat(...)[-2] is the modification time (st_mtime).
                        if (os.path.isfile(data_file) and
                                os.stat(data_file)[-2] >= copy_files_since and
                                data_file not in test_files):
                            bench_data_file = os.path.join(copy_files_path,
                                    data_file)
                            # shutil.copy can't overwrite files so remove old ones
                            # with the same name.
                            if os.path.exists(bench_data_file):
                                os.unlink(bench_data_file)
                            shutil.copy(data_file, bench_data_file)
        finally:
            # Always return to the original directory so a failed copy does not
            # leave the process in the test directory.
            os.chdir(oldcwd)

    def _update_status(self, status, inp_arg):
        '''Update self.status with success of a test.'''
        if status:
            self.status[inp_arg] = status
        else:
            # Something went wrong.  Store a Status failed object.
            self.status[inp_arg] = validation.Status([False])

    def get_status(self):
        '''Get number of passed and number of ran tasks.

Returns a dictionary containing the number of jobs with each status (passed,
warning, skipped, failed, unknown) and the number of jobs which ran.'''
        # If there's an object (other than None/False) in the corresponding
        # dict entry in self.status, then that test must have ran (albeit not
        # necessarily successfuly!).
        status = {}
        status['passed'] = sum(True for stat in self.status.values()
                        if stat and stat.passed())
        status['warning'] = sum(True for stat in self.status.values()
                        if stat and stat.warning())
        status['skipped'] = sum(True for stat in self.status.values()
                        if stat and stat.skipped())
        status['failed'] = sum(True for stat in self.status.values()
                        if stat and stat.failed())
        status['unknown'] = sum(True for stat in self.status.values()
                        if stat and stat.unknown())
        status['ran'] = sum(True for stat in self.status.values() if stat)
        return status