import multiprocessing
import os
import time

import lit10.Test
import lit10.util
import lit10.worker


# No-operation semaphore for supporting `None` for parallelism_groups.
# lit_config.parallelism_groups['my_group'] = None
class NopSemaphore(object):
    """Semaphore stand-in whose acquire() and release() do nothing.

    Used for parallelism groups configured as ``None``: because acquire()
    never blocks, tests in such a group are not throttled.
    """

    def acquire(self):
        pass

    def release(self):
        pass


def create_run(tests, lit_config, workers, progress_callback, max_failures,
               timeout):
    """Return a configured Run for ``tests``.

    A SerialRun (tests executed directly in this process) is returned when
    ``workers`` is 1; otherwise a ParallelRun backed by a process pool of
    ``workers`` processes.  ``workers`` must be positive.
    """
    assert workers > 0
    if workers == 1:
        return SerialRun(tests, lit_config, progress_callback, max_failures,
                         timeout)
    return ParallelRun(tests, lit_config, progress_callback, max_failures,
                       timeout, workers)


class Run(object):
    """A concrete, configured testing run."""

    def __init__(self, tests, lit_config, progress_callback, max_failures, timeout):
        self.tests = tests
        self.lit_config = lit_config
        # Invoked with each test whose result has been recorded.
        self.progress_callback = progress_callback
        # Stop once this many failures were seen (None: never stop early).
        self.max_failures = max_failures
        # Seconds after which to stop executing tests (None/0: one week).
        self.timeout = timeout

    def execute(self):
        """Execute the tests in the run and record each test's result.

        The progress_callback is invoked for each test whose result is
        recorded.  If timeout is non-None, it is a time in seconds after which
        to stop executing tests.  Execution also stops early once
        max_failures failing tests have been seen.

        Upon completion -- including when execution is interrupted by an
        exception, which is then propagated -- each test in the run has a
        result: tests which were not actually executed (for any reason) are
        given an UNRESOLVED result.

        Returns None.
        """
        self.failure_count = 0
        self.hit_max_failures = False

        # Larger timeouts (one year, positive infinity) don't work on Windows.
        one_week = 7 * 24 * 60 * 60  # days * hours * minutes * seconds
        timeout = self.timeout or one_week
        deadline = time.time() + timeout

        try:
            self._execute(deadline)
        finally:
            # Mark any tests that weren't run as UNRESOLVED.  Done in a
            # finally block so an interrupted run still leaves every test
            # with a result.
            for test in self.tests:
                if test.result is None:
                    test.setResult(
                        lit10.Test.Result(lit10.Test.UNRESOLVED, '', 0.0))

    # TODO(yln): as the comment says.. this is racing with the main thread waiting
    # for results
    def _process_result(self, test, result):
        """Record ``result`` on ``test``, count failures, report progress.

        In ParallelRun this runs as an ``apply_async`` callback, i.e. on the
        pool's result-handling thread rather than the main thread.
        """
        # Don't add any more test results after we've hit the maximum failure
        # count. Otherwise we're racing with the main thread, which is going
        # to terminate the process pool soon.
        if self.hit_max_failures:
            return

        test.setResult(result)

        # Use test.isFailure() for correct XFAIL and XPASS handling
        if test.isFailure():
            self.failure_count += 1
            if self.failure_count == self.max_failures:
                self.hit_max_failures = True

        self.progress_callback(test)


class SerialRun(Run):
    """Run that executes each test in turn within the current process."""

    def __init__(self, tests, lit_config, progress_callback, max_failures, timeout):
        super(SerialRun, self).__init__(tests, lit_config, progress_callback, max_failures, timeout)

    def _execute(self, deadline):
        for test in self.tests:
            # A test that is already running cannot be interrupted here, but
            # honour the deadline by not starting any further tests once it
            # has passed.  Unstarted tests become UNRESOLVED in execute().
            if time.time() >= deadline:
                break
            result = lit10.worker._execute(test, self.lit_config)
            self._process_result(test, result)
            if self.hit_max_failures:
                break


class ParallelRun(Run):
    """Run that executes tests on a multiprocessing pool of worker processes."""

    def __init__(self, tests, lit_config, progress_callback, max_failures, timeout, workers):
        super(ParallelRun, self).__init__(tests, lit_config, progress_callback, max_failures, timeout)
        self.workers = workers

    def _execute(self, deadline):
        # One semaphore per parallelism group, shared with the workers;
        # groups configured as None get a semaphore that never blocks.
        semaphores = {
            k: NopSemaphore() if v is None else
            multiprocessing.BoundedSemaphore(v) for k, v in
            self.lit_config.parallelism_groups.items()}

        self._increase_process_limit()

        # Start a process pool. Copy over the data shared between all test runs.
        # FIXME: Find a way to capture the worker process stderr. If the user
        # interrupts the workers before we make it into our task callback, they
        # will each raise a KeyboardInterrupt exception and print to stderr at
        # the same time.
        pool = multiprocessing.Pool(self.workers, lit10.worker.initialize,
                                    (self.lit_config, semaphores))

        try:
            self._install_win32_signal_handler(pool)

            # Bind each test as a default argument so every callback refers
            # to its own test rather than the loop's last value.
            async_results = [
                pool.apply_async(lit10.worker.execute, args=[test],
                                 callback=lambda r, t=test: self._process_result(t, r))
                for test in self.tests]
            pool.close()

            self._wait_for_results(pool, async_results, deadline)
        except BaseException:
            # Includes KeyboardInterrupt and worker exceptions re-raised by
            # AsyncResult.get(): stop the workers before propagating.
            pool.terminate()
            raise
        finally:
            # Safe in every path: the pool is either closed or terminated.
            pool.join()

    def _wait_for_results(self, pool, async_results, deadline):
        """Wait for each result in order; terminate ``pool`` on timeout or
        once the maximum failure count has been reached."""
        for ar in async_results:
            timeout = deadline - time.time()
            try:
                ar.get(timeout)
            except multiprocessing.TimeoutError:
                # TODO(yln): print timeout error
                pool.terminate()
                return
            if self.hit_max_failures:
                pool.terminate()
                return

    # TODO(yln): interferes with progress bar
    # Some tests use threads internally, and at least on Linux each of these
    # threads counts toward the current process limit. Try to raise the (soft)
    # process limit so that tests don't fail due to resource exhaustion.
    def _increase_process_limit(self):
        ncpus = lit10.util.detectCPUs()
        desired_limit = self.workers * ncpus * 2  # the 2 is a safety factor

        # Importing the resource module will likely fail on Windows.
        try:
            import resource
            NPROC = resource.RLIMIT_NPROC

            soft_limit, hard_limit = resource.getrlimit(NPROC)

            # An unlimited soft limit already satisfies any request; never
            # lower it.
            if soft_limit == resource.RLIM_INFINITY:
                return

            # RLIM_INFINITY may be reported as -1 (e.g. on Linux), so only
            # clamp to a finite hard limit; min() against -1 would make the
            # desired limit negative and the soft limit would never be raised.
            if hard_limit != resource.RLIM_INFINITY:
                desired_limit = min(desired_limit, hard_limit)

            if soft_limit < desired_limit:
                resource.setrlimit(NPROC, (desired_limit, hard_limit))
                self.lit_config.note('Raised process limit from %d to %d' %
                                     (soft_limit, desired_limit))
        except Exception as ex:
            # Warn, unless this is Windows, in which case this is expected.
            if os.name != 'nt':
                self.lit_config.warning('Failed to raise process limit: %s' % ex)

    def _install_win32_signal_handler(self, pool):
        """On Windows (when win32api is available), make Ctrl-C terminate
        ``pool`` and abort the process."""
        if lit10.util.win32api is None:
            return

        def console_ctrl_handler(ctrl_type):
            print('\nCtrl-C detected, terminating.')
            pool.terminate()
            pool.join()
            lit10.util.abort_now()
            return True

        lit10.util.win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)