import collections
import faulthandler
import json
import os
import queue
import signal
import subprocess
import sys
import threading
import time
import traceback
import types
from test import support

from test.libregrtest.runtest import (
    runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME,
    format_test_result, TestResult, is_failed, TIMEOUT)
from test.libregrtest.setup import setup_tests
from test.libregrtest.utils import format_duration, print_warning


# Display the running tests if nothing happened last N seconds
PROGRESS_UPDATE = 30.0   # seconds
assert PROGRESS_UPDATE >= PROGRESS_MIN_TIME

# Kill the main process after 5 minutes. It is supposed to write an update
# every PROGRESS_UPDATE seconds. Tolerate 5 minutes for Python slowest
# buildbot workers.
MAIN_PROCESS_TIMEOUT = 5 * 60.0
assert MAIN_PROCESS_TIMEOUT >= PROGRESS_UPDATE

# Time to wait until a worker completes: should be immediate
JOIN_TIMEOUT = 30.0   # seconds

# True if the platform can run each worker in its own session and kill the
# whole process group at once.
USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))


def must_stop(result, ns):
    """Return True if no more tests should be run after *result*.

    This is the case when the test was interrupted, or when ns.failfast is
    set and is_failed() reports the result as a failure.
    """
    if result.result == INTERRUPTED:
        return True
    if ns.failfast and is_failed(result, ns):
        return True
    return False


def parse_worker_args(worker_args):
    """Decode the JSON string built by run_test_in_subprocess().

    Return a (ns, test_name) tuple where ns is a types.SimpleNamespace
    rebuilt from the serialized namespace dictionary.
    """
    ns_dict, test_name = json.loads(worker_args)
    ns = types.SimpleNamespace(**ns_dict)
    return (ns, test_name)


def run_test_in_subprocess(testname, ns):
    """Spawn a child Python process running the single test *testname*.

    The namespace and the test name are passed to the child as a JSON
    string through the --worker-args command line option. Return the
    subprocess.Popen object; stdout and stderr are text-mode pipes.
    """
    ns_dict = vars(ns)
    worker_args = (ns_dict, testname)
    worker_args = json.dumps(worker_args)

    cmd = [sys.executable, *support.args_from_interpreter_flags(),
           '-u',    # Unbuffered stdout and stderr
           '-m', 'test.regrtest',
           '--worker-args', worker_args]

    # Running the child from the same working directory as regrtest's original
    # invocation ensures that TEMPDIR for the child is the same when
    # sysconfig.is_python_build() is true. See issue 15300.
    kw = {}
    if USE_PROCESS_GROUP:
        # New session: the child becomes a process group leader, so
        # os.killpg(popen.pid, ...) reaches the child and its descendants.
        kw['start_new_session'] = True
    return subprocess.Popen(cmd,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            universal_newlines=True,
                            close_fds=(os.name != 'nt'),
                            cwd=support.SAVEDCWD,
                            **kw)


def run_tests_worker(ns, test_name):
    """Entry point of a worker process: run one test and exit.

    The test result is written as a JSON list on the last line of stdout,
    then the process exits with code 0.
    """
    setup_tests(ns)

    result = runtest(ns, test_name)

    print()   # Force a newline (just in case)

    # Serialize TestResult as list in JSON
    print(json.dumps(list(result)), flush=True)
    sys.exit(0)


# We do not use a generator so multiple threads can call next().
class MultiprocessIterator:

    """A thread-safe iterator over tests for multiprocess mode."""

    def __init__(self, tests_iter):
        self.lock = threading.Lock()
        self.tests_iter = tests_iter

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:
            # stop() has been called: behave as an exhausted iterator
            if self.tests_iter is None:
                raise StopIteration
            return next(self.tests_iter)

    def stop(self):
        """Make every following next() call raise StopIteration."""
        with self.lock:
            self.tests_iter = None


# Item put in the output queue by a worker thread for each completed test.
MultiprocessResult = collections.namedtuple('MultiprocessResult',
    'result stdout stderr error_msg')


class ExitThread(Exception):
    """Raised inside a worker thread to leave run() after stop()."""
    pass


class TestWorkerProcess(threading.Thread):
    """Thread running tests one by one, each in a fresh child process."""

    def __init__(self, worker_id, runner):
        super().__init__()
        self.worker_id = worker_id
        self.pending = runner.pending
        self.output = runner.output
        self.ns = runner.ns
        self.timeout = runner.worker_timeout
        self.regrtest = runner.regrtest
        self.current_test_name = None
        self.start_time = None
        self._popen = None
        self._killed = False
        self._stopped = False

    def __repr__(self):
        info = [f'TestWorkerProcess #{self.worker_id}']
        if self.is_alive():
            info.append("running")
        else:
            info.append('stopped')
        test = self.current_test_name
        if test:
            info.append(f'test={test}')
        # Work on a snapshot: repr() is called from other threads while the
        # worker thread may reset self._popen to None at any time.
        popen = self._popen
        if popen is not None:
            dt = time.monotonic() - self.start_time
            info.extend((f'pid={popen.pid}',
                         f'time={format_duration(dt)}'))
        return '<%s>' % ' '.join(info)

    def _kill(self):
        """Kill the current child process (or its process group), once."""
        popen = self._popen
        if popen is None:
            return

        if self._killed:
            return
        self._killed = True

        if USE_PROCESS_GROUP:
            what = f"{self} process group"
        else:
            what = f"{self}"

        print(f"Kill {what}", file=sys.stderr, flush=True)
        try:
            if USE_PROCESS_GROUP:
                os.killpg(popen.pid, signal.SIGKILL)
            else:
                popen.kill()
        except ProcessLookupError:
            # popen.kill(): the process completed, the TestWorkerProcess thread
            # read its exit status, but Popen.send_signal() read the returncode
            # just before Popen.wait() set returncode.
            pass
        except OSError as exc:
            print_warning(f"Failed to kill {what}: {exc!r}")

    def stop(self):
        # Method called from a different thread to stop this thread
        self._stopped = True
        self._kill()

    def mp_result_error(self, test_name, error_type, stdout='', stderr='',
                        err_msg=None):
        """Build a MultiprocessResult describing a worker-level failure.

        The test time is the time elapsed since the child was started.
        """
        test_time = time.monotonic() - self.start_time
        result = TestResult(test_name, error_type, test_time, None)
        return MultiprocessResult(result, stdout, stderr, err_msg)

    def _run_process(self, test_name):
        """Run *test_name* in a child process and wait for its completion.

        Return a (retcode, stdout, stderr) tuple. retcode is None if the
        worker timeout expired; stdout and stderr are then empty strings.
        Raise ExitThread if stop() was called while the child was running.
        """
        self.start_time = time.monotonic()

        self.current_test_name = test_name
        try:
            popen = run_test_in_subprocess(test_name, self.ns)

            self._killed = False
            self._popen = popen
        except BaseException:
            self.current_test_name = None
            raise

        try:
            if self._stopped:
                # If kill() has been called before self._popen is set,
                # self._popen is still running. Call again kill()
                # to ensure that the process is killed.
                self._kill()
                raise ExitThread

            try:
                stdout, stderr = popen.communicate(timeout=self.timeout)
                retcode = popen.returncode
                assert retcode is not None
            except subprocess.TimeoutExpired:
                if self._stopped:
                    # kill() has been called: communicate() fails
                    # on reading closed stdout/stderr
                    raise ExitThread

                # On timeout, kill the process
                self._kill()

                # None means TIMEOUT for the caller
                retcode = None
                # bpo-38207: Don't attempt to call communicate() again: it
                # can hang until all child processes using the stdout and
                # stderr pipes complete.
                stdout = stderr = ''
            except OSError:
                if self._stopped:
                    # kill() has been called: communicate() fails
                    # on reading closed stdout/stderr
                    raise ExitThread
                raise
            else:
                stdout = stdout.strip()
                stderr = stderr.rstrip()

            return (retcode, stdout, stderr)
        except BaseException:
            # Whatever went wrong (including ExitThread and
            # KeyboardInterrupt), do not leave the child running.
            self._kill()
            raise
        finally:
            self._wait_completed()
            self._popen = None
            self.current_test_name = None

    def _runtest(self, test_name):
        """Run *test_name* and convert the outcome to a MultiprocessResult.

        A timeout is reported as TIMEOUT. A non-zero exit code or an
        unparsable worker output is reported as CHILD_ERROR.
        """
        retcode, stdout, stderr = self._run_process(test_name)

        if retcode is None:
            return self.mp_result_error(test_name, TIMEOUT, stdout, stderr)

        err_msg = None
        if retcode != 0:
            err_msg = "Exit code %s" % retcode
        else:
            # The last line of stdout is the JSON result written by
            # run_tests_worker(); everything before is the test output.
            stdout, _, result = stdout.rpartition("\n")
            stdout = stdout.rstrip()
            if not result:
                err_msg = "Failed to parse worker stdout"
            else:
                try:
                    # deserialize run_tests_worker() output
                    result = json.loads(result)
                    result = TestResult(*result)
                except Exception as exc:
                    err_msg = "Failed to parse worker JSON: %s" % exc

        if err_msg is not None:
            return self.mp_result_error(test_name, CHILD_ERROR,
                                        stdout, stderr, err_msg)

        return MultiprocessResult(result, stdout, stderr, err_msg)

    def run(self):
        """Thread body: run pending tests until exhausted or stopped.

        Each item put in the output queue is a (is_exception, payload)
        tuple: (False, MultiprocessResult) for a completed test, or
        (True, formatted traceback) if this thread failed.
        """
        while not self._stopped:
            try:
                try:
                    test_name = next(self.pending)
                except StopIteration:
                    break

                mp_result = self._runtest(test_name)
                self.output.put((False, mp_result))

                if must_stop(mp_result.result, self.ns):
                    break
            except ExitThread:
                break
            except BaseException:
                self.output.put((True, traceback.format_exc()))
                break

    def _wait_completed(self):
        """Close the child pipes and wait for the child to exit."""
        popen = self._popen

        # stdout and stderr must be closed to ensure that communicate()
        # does not hang
        popen.stdout.close()
        popen.stderr.close()

        try:
            popen.wait(JOIN_TIMEOUT)
        except (subprocess.TimeoutExpired, OSError) as exc:
            print_warning(f"Failed to wait for {self} completion "
                          f"(timeout={format_duration(JOIN_TIMEOUT)}): "
                          f"{exc!r}")

    def wait_stopped(self, start_time):
        """Join this thread, giving up JOIN_TIMEOUT seconds after start_time."""
        # bpo-38207: MultiprocessTestRunner.stop_workers() called self.stop()
        # which killed the process. Sometimes, killing the process from the
        # main thread does not interrupt popen.communicate() in
        # TestWorkerProcess thread. This loop with a timeout is a workaround
        # for that.
        #
        # Moreover, if this method fails to join the thread, it is likely
        # that Python will hang at exit while calling threading._shutdown()
        # which tries again to join the blocked thread. Regrtest.main()
        # uses EXIT_TIMEOUT to workaround this second bug.
        while True:
            # Write a message every second
            self.join(1.0)
            if not self.is_alive():
                break
            dt = time.monotonic() - start_time
            self.regrtest.log(f"Waiting for {self} thread "
                              f"for {format_duration(dt)}")
            if dt > JOIN_TIMEOUT:
                print_warning(f"Failed to join {self} in {format_duration(dt)}")
                break


def get_running(workers):
    """Return "name (duration)" strings for the long-running tests.

    Only tests running for at least PROGRESS_MIN_TIME seconds are listed.
    """
    running = []
    for worker in workers:
        current_test_name = worker.current_test_name
        if not current_test_name:
            continue
        dt = time.monotonic() - worker.start_time
        if dt >= PROGRESS_MIN_TIME:
            text = '%s (%s)' % (current_test_name, format_duration(dt))
            running.append(text)
    return running


class MultiprocessTestRunner:
    """Run the regrtest tests using a pool of TestWorkerProcess threads."""

    def __init__(self, regrtest):
        self.regrtest = regrtest
        self.log = self.regrtest.log
        self.ns = regrtest.ns
        self.output = queue.Queue()
        self.pending = MultiprocessIterator(self.regrtest.tests)
        if self.ns.timeout is not None:
            # Rely on faulthandler to kill a worker process. This timeout is
            # used when faulthandler fails to kill a worker process. Give a
            # maximum of 5 minutes to faulthandler to kill the worker.
            self.worker_timeout = min(self.ns.timeout * 1.5,
                                      self.ns.timeout + 5 * 60)
        else:
            self.worker_timeout = None
        self.workers = None

    def start_workers(self):
        """Create and start ns.use_mp worker threads."""
        self.workers = [TestWorkerProcess(index, self)
                        for index in range(1, self.ns.use_mp + 1)]
        msg = f"Run tests in parallel using {len(self.workers)} child processes"
        # Same condition as the one which sets worker_timeout in __init__()
        if self.ns.timeout is not None:
            msg += (" (timeout: %s, worker timeout: %s)"
                    % (format_duration(self.ns.timeout),
                       format_duration(self.worker_timeout)))
        self.log(msg)
        for worker in self.workers:
            worker.start()

    def stop_workers(self):
        """Ask all workers to stop, then wait for their threads."""
        start_time = time.monotonic()
        # Kill all child processes first, so the JOIN_TIMEOUT budget
        # (counted from start_time) is shared by all workers.
        for worker in self.workers:
            worker.stop()
        for worker in self.workers:
            worker.wait_stopped(start_time)

    def _get_result(self):
        """Return the next item of the output queue.

        Return None once all worker threads are done and the queue is
        empty. While waiting, log the running tests every PROGRESS_UPDATE
        seconds.
        """
        if not any(worker.is_alive() for worker in self.workers):
            # all worker threads are done: consume pending results
            try:
                return self.output.get(timeout=0)
            except queue.Empty:
                return None

        use_faulthandler = (self.ns.timeout is not None)
        timeout = PROGRESS_UPDATE
        while True:
            if use_faulthandler:
                # Re-arm the watchdog on each iteration: the main process
                # exits if it is stuck for MAIN_PROCESS_TIMEOUT seconds.
                faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT,
                                                  exit=True)

            # wait for a thread
            try:
                return self.output.get(timeout=timeout)
            except queue.Empty:
                pass

            # display progress
            running = get_running(self.workers)
            if running and not self.ns.pgo:
                self.log('running: %s' % ', '.join(running))

    def display_result(self, mp_result):
        """Display the progress line of a completed test."""
        result = mp_result.result

        text = format_test_result(result)
        if mp_result.error_msg is not None:
            # CHILD_ERROR
            text += ' (%s)' % mp_result.error_msg
        elif (result.test_time >= PROGRESS_MIN_TIME and not self.ns.pgo):
            text += ' (%s)' % format_duration(result.test_time)
        running = get_running(self.workers)
        if running and not self.ns.pgo:
            text += ' -- running: %s' % ', '.join(running)
        self.regrtest.display_progress(self.test_index, text)

    def _process_result(self, item):
        """Handle one item of the output queue.

        Return True if the test run must stop, False otherwise.
        """
        if item[0]:
            # Thread got an exception
            format_exc = item[1]
            print_warning(f"regrtest worker thread failed: {format_exc}")
            return True

        self.test_index += 1
        mp_result = item[1]
        self.regrtest.accumulate_result(mp_result.result)
        self.display_result(mp_result)

        if mp_result.stdout:
            print(mp_result.stdout, flush=True)
        if mp_result.stderr and not self.ns.pgo:
            print(mp_result.stderr, file=sys.stderr, flush=True)

        if must_stop(mp_result.result, self.ns):
            return True

        return False

    def run_tests(self):
        """Run all tests and always stop the workers before returning."""
        self.start_workers()

        self.test_index = 0
        try:
            while True:
                item = self._get_result()
                if item is None:
                    break

                stop = self._process_result(item)
                if stop:
                    break
        except KeyboardInterrupt:
            print()
            self.regrtest.interrupted = True
        finally:
            if self.ns.timeout is not None:
                faulthandler.cancel_dump_traceback_later()

            # Always ensure that all worker processes are no longer
            # running when we exit this function
            self.pending.stop()
            self.stop_workers()


def run_tests_multiprocess(regrtest):
    """Run the tests of *regrtest* in parallel worker processes."""
    MultiprocessTestRunner(regrtest).run_tests()