import multiprocessing
import os
import time

import lit.Test
import lit.util
import lit.worker


class MaxFailuresError(Exception):
    """Raised when the number of failed tests reaches ``max_failures``."""


# NOTE: this intentionally keeps its historical name even though it shadows
# the Python 3 builtin ``TimeoutError`` inside this module; callers refer to
# it by this name. It is unrelated to ``multiprocessing.TimeoutError``.
class TimeoutError(Exception):
    """Raised when the run's deadline passes before all results are in."""


class Run(object):
    """A concrete, configured testing run."""

    def __init__(self, tests, lit_config, workers, progress_callback,
                 max_failures, timeout):
        self.tests = tests
        self.lit_config = lit_config
        self.workers = workers
        self.progress_callback = progress_callback
        self.max_failures = max_failures
        self.timeout = timeout
        # Reset at the start of every execute() call; defined here so the
        # attribute always exists.
        self.failures = 0
        assert workers > 0

    def execute(self):
        """
        Execute the tests in the run using up to ``workers`` parallel
        processes, and inform the caller of each individual result.

        The progress_callback is handed to the worker pool as the completion
        callback of every submitted test.

        If timeout is set (neither None nor 0), it is the time in seconds
        after which to stop waiting for test results; otherwise a deadline of
        one week is used.

        Raises TimeoutError if the deadline passes before every result has
        been collected, and MaxFailuresError once the number of failing tests
        equals max_failures.

        This method does not return a value; callers that need the elapsed
        testing time have to measure it themselves.

        Upon completion (including when an exception propagates), each test
        in the run will have its result computed. Tests which were not
        actually executed (for any reason) will be marked SKIPPED.
        """
        self.failures = 0

        # Larger timeouts (one year, positive infinity) don't work on Windows.
        one_week = 7 * 24 * 60 * 60  # days * hours * minutes * seconds
        timeout = self.timeout or one_week
        deadline = time.time() + timeout

        try:
            self._execute(deadline)
        finally:
            skipped = lit.Test.Result(lit.Test.SKIPPED)
            for test in self.tests:
                if test.result is None:
                    test.setResult(skipped)

    def _execute(self, deadline):
        """Run all tests in a worker pool and wait for them until deadline.

        deadline is an absolute time.time() timestamp. The pool is always
        joined before returning; it is terminated first if waiting for the
        results raised.
        """
        self._increase_process_limit()

        # One bounded semaphore per parallelism group that has a limit.
        semaphores = {k: multiprocessing.BoundedSemaphore(v)
                      for k, v in self.lit_config.parallelism_groups.items()
                      if v is not None}

        pool = multiprocessing.Pool(self.workers, lit.worker.initialize,
                                    (self.lit_config, semaphores))

        async_results = [
            pool.apply_async(lit.worker.execute, args=[test],
                             callback=self.progress_callback)
            for test in self.tests]
        pool.close()

        try:
            self._wait_for(async_results, deadline)
        except BaseException:
            # Deliberately catches everything (including KeyboardInterrupt):
            # the workers must be stopped before the exception propagates.
            pool.terminate()
            raise
        finally:
            pool.join()

    def _wait_for(self, async_results, deadline):
        """Collect the results in submission order, honoring the deadline.

        async_results[i] must correspond to self.tests[i]. Raises
        TimeoutError when a result is not available by the deadline and
        MaxFailuresError when the failure count equals max_failures.
        """
        for idx, ar in enumerate(async_results):
            # Recompute the remaining budget for every result: the deadline
            # is absolute, so time already spent waiting on earlier results
            # must not be granted again. Clamp at zero so an expired deadline
            # still picks up results that are already available.
            remaining = max(0, deadline - time.time())
            try:
                test = ar.get(remaining)
            except multiprocessing.TimeoutError:
                raise TimeoutError()
            else:
                self._update_test(self.tests[idx], test)
                if test.isFailure():
                    self.failures += 1
                    if self.failures == self.max_failures:
                        raise MaxFailuresError()

    # Update local test object "in place" from remote test object.  This
    # ensures that the original test object which is used for printing test
    # results reflects the changes.
    def _update_test(self, local_test, remote_test):
        """Copy the fields computed by the worker onto the local test."""
        # Needed for getMissingRequiredFeatures()
        local_test.requires = remote_test.requires
        local_test.result = remote_test.result

    # TODO(yln): interferes with progress bar
    # Some tests use threads internally, and at least on Linux each of these
    # threads counts toward the current process limit. Try to raise the (soft)
    # process limit so that tests don't fail due to resource exhaustion.
    def _increase_process_limit(self):
        """Best-effort attempt to raise the soft RLIMIT_NPROC limit.

        The target is workers * usable cores * 2, capped by the hard limit.
        Failures are reported as a warning (silently ignored on Windows) and
        never abort the run.
        """
        ncpus = lit.util.usable_core_count()
        desired_limit = self.workers * ncpus * 2  # the 2 is a safety factor

        # Importing the resource module will likely fail on Windows.
        try:
            import resource
            NPROC = resource.RLIMIT_NPROC
            unlimited = resource.RLIM_INFINITY

            soft_limit, hard_limit = resource.getrlimit(NPROC)

            # RLIM_INFINITY is a sentinel (-1 on some platforms), not an
            # ordinary number, so it must not take part in min()/< below.
            if soft_limit == unlimited:
                return  # Already unlimited; nothing to raise.
            if hard_limit != unlimited:
                desired_limit = min(desired_limit, hard_limit)

            if soft_limit < desired_limit:
                resource.setrlimit(NPROC, (desired_limit, hard_limit))
                self.lit_config.note('Raised process limit from %d to %d' %
                                     (soft_limit, desired_limit))
        except Exception as ex:
            # Warn, unless this is Windows, in which case this is expected.
            if os.name != 'nt':
                self.lit_config.warning('Failed to raise process limit: %s' % ex)