1import collections
2import faulthandler
3import json
4import os
5import queue
6import signal
7import subprocess
8import sys
9import threading
10import time
11import traceback
12import types
13from test import support
14
15from test.libregrtest.runtest import (
16    runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME,
17    format_test_result, TestResult, is_failed, TIMEOUT)
18from test.libregrtest.setup import setup_tests
19from test.libregrtest.utils import format_duration, print_warning
20
21
# Display the running tests if nothing happened in the last N seconds
PROGRESS_UPDATE = 30.0   # seconds
assert PROGRESS_UPDATE >= PROGRESS_MIN_TIME

# Kill the main process after 5 minutes. It is supposed to write an update
# every PROGRESS_UPDATE seconds. Tolerate 5 minutes for Python's slowest
# buildbot workers.
MAIN_PROCESS_TIMEOUT = 5 * 60.0   # seconds
assert MAIN_PROCESS_TIMEOUT >= PROGRESS_UPDATE

# Time to wait until a worker completes: should be immediate
JOIN_TIMEOUT = 30.0   # seconds

# True when the platform provides both os.setsid() and os.killpg(): worker
# processes are then started in a new session (run_test_in_subprocess()) and
# killed with os.killpg() (TestWorkerProcess._kill()).
USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))
36
37
def must_stop(result, ns):
    """Return True if the test run must end after *result*.

    The run ends when the test was interrupted, or when ``ns.failfast`` is
    set and is_failed() reports *result* as a failure.
    """
    if result.result != INTERRUPTED:
        # Not interrupted: only a failure in fail-fast mode ends the run.
        return bool(ns.failfast and is_failed(result, ns))
    return True
44
45
def parse_worker_args(worker_args):
    """Decode the JSON string built by run_test_in_subprocess().

    *worker_args* is a JSON array of two items: a mapping of namespace
    attributes and a test name. Return a ``(ns, test_name)`` tuple where
    *ns* is a types.SimpleNamespace built from the mapping.
    """
    attributes, test_name = json.loads(worker_args)
    namespace = types.SimpleNamespace(**attributes)
    return namespace, test_name
50
51
def run_test_in_subprocess(testname, ns):
    """Start a regrtest worker process running *testname*.

    The attributes of *ns* and the test name are serialized as JSON and
    passed to the child through the ``--worker-args`` command line option.
    Return the subprocess.Popen object; stdout and stderr are text pipes.
    """
    payload = json.dumps((vars(ns), testname))

    cmd = [sys.executable]
    cmd.extend(support.args_from_interpreter_flags())
    cmd.extend(('-u',    # Unbuffered stdout and stderr
                '-m', 'test.regrtest',
                '--worker-args', payload))

    popen_options = {
        'stdout': subprocess.PIPE,
        'stderr': subprocess.PIPE,
        'universal_newlines': True,
        'close_fds': (os.name != 'nt'),
        # Running the child from the same working directory as regrtest's
        # original invocation ensures that TEMPDIR for the child is the same
        # when sysconfig.is_python_build() is true. See issue 15300.
        'cwd': support.SAVEDCWD,
    }
    if USE_PROCESS_GROUP:
        # Own session: the whole process group can be killed with killpg().
        popen_options['start_new_session'] = True
    return subprocess.Popen(cmd, **popen_options)
75
76
def run_tests_worker(ns, test_name):
    """Entry point of a worker process: run one test and report its result.

    The result is written to stdout as a JSON list on the last line, then
    the process exits with status 0.
    """
    setup_tests(ns)
    test_result = runtest(ns, test_name)

    # Force a newline (just in case) so that the JSON document below starts
    # on a line of its own.
    print()

    # Serialize TestResult as list in JSON
    serialized = json.dumps(list(test_result))
    print(serialized, flush=True)
    sys.exit(0)
87
88
# This is a class rather than a generator so that multiple threads can call
# next() on the same object.
class MultiprocessIterator:

    """A thread-safe iterator over tests for multiprocess mode."""

    def __init__(self, tests_iter):
        self.tests_iter = tests_iter
        # Serializes every access to self.tests_iter.
        self.lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:
            source = self.tests_iter
            if source is not None:
                # StopIteration raised by the wrapped iterator propagates.
                return next(source)
        # stop() has been called: behave as an exhausted iterator.
        raise StopIteration

    def stop(self):
        """Make every later next() call raise StopIteration."""
        with self.lock:
            self.tests_iter = None
110
111
# Value put in the output queue by a worker thread for each completed test:
# the test result, the captured stdout and stderr of the worker process, and
# an error message (None when there is no error).
MultiprocessResult = collections.namedtuple(
    'MultiprocessResult',
    ('result', 'stdout', 'stderr', 'error_msg'))
114
class ExitThread(Exception):
    """Raised in a worker thread that was asked to stop, to leave run()."""
117
118
class TestWorkerProcess(threading.Thread):
    """Thread running tests in worker processes, one test at a time.

    run() repeatedly takes a test name from the shared ``pending`` iterator,
    runs it in a child process and puts the outcome in the shared ``output``
    queue as a ``(is_error, payload)`` tuple:

    * ``(False, MultiprocessResult)`` for a completed test;
    * ``(True, traceback_text)`` if the thread itself failed.

    stop() and wait_stopped() are meant to be called from another thread.
    """

    def __init__(self, worker_id, runner):
        super().__init__()
        self.worker_id = worker_id
        self.pending = runner.pending
        self.output = runner.output
        self.ns = runner.ns
        self.timeout = runner.worker_timeout
        self.regrtest = runner.regrtest
        # Name and time.monotonic() start time of the test being run.
        self.current_test_name = None
        self.start_time = None
        # Popen object of the running child, None between two tests.
        self._popen = None
        # True once _kill() has been attempted on the current child.
        self._killed = False
        # Set by stop() to ask the thread to exit.
        self._stopped = False

    def __repr__(self):
        info = [f'TestWorkerProcess #{self.worker_id}']
        if self.is_alive():
            info.append("running")
        else:
            info.append('stopped')
        test = self.current_test_name
        if test:
            info.append(f'test={test}')
        # Read self._popen only once: the worker thread resets it to None
        # when the child completes, possibly while another thread is
        # formatting this object (stop(), wait_stopped()).
        popen = self._popen
        if popen is not None:
            dt = time.monotonic() - self.start_time
            info.extend((f'pid={popen.pid}',
                         f'time={format_duration(dt)}'))
        return '<%s>' % ' '.join(info)

    def _kill(self):
        """Kill the current child process (or its process group), once."""
        popen = self._popen
        if popen is None:
            return

        if self._killed:
            return
        self._killed = True

        if USE_PROCESS_GROUP:
            what = f"{self} process group"
        else:
            what = f"{self}"

        print(f"Kill {what}", file=sys.stderr, flush=True)
        try:
            if USE_PROCESS_GROUP:
                os.killpg(popen.pid, signal.SIGKILL)
            else:
                popen.kill()
        except ProcessLookupError:
            # popen.kill(): the process completed, the TestWorkerProcess thread
            # read its exit status, but Popen.send_signal() read the returncode
            # just before Popen.wait() set returncode.
            pass
        except OSError as exc:
            print_warning(f"Failed to kill {what}: {exc!r}")

    def stop(self):
        # Method called from a different thread to stop this thread
        self._stopped = True
        self._kill()

    def mp_result_error(self, test_name, error_type, stdout='', stderr='',
                        err_msg=None):
        """Build the MultiprocessResult of a test which did not complete.

        The duration stored in the TestResult is the time elapsed since
        self.start_time.
        """
        test_time = time.monotonic() - self.start_time
        result = TestResult(test_name, error_type, test_time, None)
        return MultiprocessResult(result, stdout, stderr, err_msg)

    def _run_process(self, test_name):
        """Run *test_name* in a child process and wait for its completion.

        Return a ``(retcode, stdout, stderr)`` tuple. *retcode* is None if
        the child did not complete within self.timeout; stdout and stderr
        are then empty strings.

        Raise ExitThread if stop() has been called.
        """
        self.start_time = time.monotonic()

        self.current_test_name = test_name
        try:
            popen = run_test_in_subprocess(test_name, self.ns)

            self._killed = False
            self._popen = popen
        except BaseException:
            self.current_test_name = None
            raise

        try:
            if self._stopped:
                # If kill() has been called before self._popen is set,
                # self._popen is still running. Call again kill()
                # to ensure that the process is killed.
                self._kill()
                raise ExitThread

            try:
                stdout, stderr = popen.communicate(timeout=self.timeout)
                retcode = popen.returncode
                assert retcode is not None
            except subprocess.TimeoutExpired:
                if self._stopped:
                    # kill() has been called: communicate() fails
                    # on reading closed stdout/stderr
                    raise ExitThread

                # On timeout, kill the process
                self._kill()

                # None means TIMEOUT for the caller
                retcode = None
                # bpo-38207: Don't attempt to call communicate() again: it
                # can hang until all child processes using the stdout and
                # stderr pipes complete.
                stdout = stderr = ''
            except OSError:
                if self._stopped:
                    # kill() has been called: communicate() fails
                    # on reading closed stdout/stderr
                    raise ExitThread
                raise
            else:
                stdout = stdout.strip()
                stderr = stderr.rstrip()

            return (retcode, stdout, stderr)
        except BaseException:
            # Never leave a child running behind, whatever went wrong.
            self._kill()
            raise
        finally:
            self._wait_completed()
            self._popen = None
            self.current_test_name = None

    def _runtest(self, test_name):
        """Run *test_name* in a child process and return a MultiprocessResult.

        The last line of the child's stdout is expected to be the JSON list
        written by run_tests_worker(); everything before it is regular test
        output. A timeout is reported as TIMEOUT; a non-zero exit code or an
        unparsable last line is reported as CHILD_ERROR.
        """
        retcode, stdout, stderr = self._run_process(test_name)

        if retcode is None:
            return self.mp_result_error(test_name, TIMEOUT, stdout, stderr)

        err_msg = None
        if retcode != 0:
            err_msg = "Exit code %s" % retcode
        else:
            stdout, _, result = stdout.rpartition("\n")
            stdout = stdout.rstrip()
            if not result:
                err_msg = "Failed to parse worker stdout"
            else:
                try:
                    # deserialize run_tests_worker() output
                    result = json.loads(result)
                    result = TestResult(*result)
                except Exception as exc:
                    err_msg = "Failed to parse worker JSON: %s" % exc

        if err_msg is not None:
            return self.mp_result_error(test_name, CHILD_ERROR,
                                        stdout, stderr, err_msg)

        return MultiprocessResult(result, stdout, stderr, err_msg)

    def run(self):
        """Run pending tests until there is none left or the thread must stop."""
        while not self._stopped:
            try:
                try:
                    test_name = next(self.pending)
                except StopIteration:
                    break

                mp_result = self._runtest(test_name)
                self.output.put((False, mp_result))

                if must_stop(mp_result.result, self.ns):
                    break
            except ExitThread:
                break
            except BaseException:
                # Report the failure to the main thread through the queue.
                self.output.put((True, traceback.format_exc()))
                break

    def _wait_completed(self):
        """Close the pipes of the current child and wait until it exits.

        Only log a warning if the child does not exit within JOIN_TIMEOUT
        seconds or if waiting fails.
        """
        popen = self._popen

        # stdout and stderr must be closed to ensure that communicate()
        # does not hang
        popen.stdout.close()
        popen.stderr.close()

        try:
            popen.wait(JOIN_TIMEOUT)
        except (subprocess.TimeoutExpired, OSError) as exc:
            print_warning(f"Failed to wait for {self} completion "
                          f"(timeout={format_duration(JOIN_TIMEOUT)}): "
                          f"{exc!r}")

    def wait_stopped(self, start_time):
        """Join this thread, giving up JOIN_TIMEOUT seconds after *start_time*.

        *start_time* is a time.monotonic() timestamp.
        """
        # bpo-38207: MultiprocessTestRunner.stop_workers() called self.stop()
        # which killed the process. Sometimes, killing the process from the
        # main thread does not interrupt popen.communicate() in
        # TestWorkerProcess thread. This loop with a timeout is a workaround
        # for that.
        #
        # Moreover, if this method fails to join the thread, it is likely
        # that Python will hang at exit while calling threading._shutdown()
        # which tries again to join the blocked thread. Regrtest.main()
        # uses EXIT_TIMEOUT to workaround this second bug.
        while True:
            # Write a message every second
            self.join(1.0)
            if not self.is_alive():
                break
            dt = time.monotonic() - start_time
            self.regrtest.log(f"Waiting for {self} thread "
                              f"for {format_duration(dt)}")
            if dt > JOIN_TIMEOUT:
                print_warning(f"Failed to join {self} in {format_duration(dt)}")
                break
332
333
def get_running(workers):
    """Describe the tests which have been running for a while.

    Return a list of ``'name (duration)'`` strings, one per worker whose
    current test has been running for at least PROGRESS_MIN_TIME seconds.
    Workers without a current test are ignored.
    """
    descriptions = []
    for worker in workers:
        name = worker.current_test_name
        if name:
            elapsed = time.monotonic() - worker.start_time
            if elapsed >= PROGRESS_MIN_TIME:
                descriptions.append(
                    '%s (%s)' % (name, format_duration(elapsed)))
    return descriptions
345
346
class MultiprocessTestRunner:
    """Run the tests of a regrtest object in parallel worker processes.

    ``ns.use_mp`` TestWorkerProcess threads are started; each of them runs
    tests in child processes and reports through the ``output`` queue. The
    calling thread consumes the queue, accumulates and displays the results.
    """

    def __init__(self, regrtest):
        self.regrtest = regrtest
        self.log = self.regrtest.log
        self.ns = regrtest.ns
        # Queue of (is_error, payload) tuples filled by the worker threads.
        self.output = queue.Queue()
        self.pending = MultiprocessIterator(self.regrtest.tests)
        if self.ns.timeout is not None:
            # Rely on faulthandler to kill a worker process. This timeout is
            # used when faulthandler fails to kill a worker process. Give a
            # maximum of 5 minutes to faulthandler to kill the worker.
            self.worker_timeout = min(self.ns.timeout * 1.5,
                                      self.ns.timeout + 5 * 60)
        else:
            self.worker_timeout = None
        self.workers = None
        # Number of test results processed so far; reset by run_tests().
        self.test_index = 0

    def start_workers(self):
        """Create and start one worker thread per requested process."""
        self.workers = [TestWorkerProcess(index, self)
                        for index in range(1, self.ns.use_mp + 1)]
        msg = f"Run tests in parallel using {len(self.workers)} child processes"
        if self.ns.timeout:
            msg += (" (timeout: %s, worker timeout: %s)"
                    % (format_duration(self.ns.timeout),
                       format_duration(self.worker_timeout)))
        self.log(msg)
        for worker in self.workers:
            worker.start()

    def stop_workers(self):
        """Ask every worker to stop, then wait for all of them."""
        start_time = time.monotonic()
        # Stop all workers first so that they terminate concurrently; the
        # same start time is shared by all the waits below.
        for worker in self.workers:
            worker.stop()
        for worker in self.workers:
            worker.wait_stopped(start_time)

    def _get_result(self):
        """Return the next item of the output queue.

        Return None once all worker threads are done and the queue is empty.
        While waiting, log the running tests every PROGRESS_UPDATE seconds.
        """
        use_faulthandler = (self.ns.timeout is not None)

        # Check the status of the workers on every iteration: if they all
        # exit without queuing anything after a single up-front check, the
        # loop would wait forever on an empty queue.
        while any(worker.is_alive() for worker in self.workers):
            if use_faulthandler:
                faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT,
                                                  exit=True)

            # wait for a thread
            try:
                return self.output.get(timeout=PROGRESS_UPDATE)
            except queue.Empty:
                pass

            # display progress
            running = get_running(self.workers)
            if running and not self.ns.pgo:
                self.log('running: %s' % ', '.join(running))

        # all worker threads are done: consume pending results
        try:
            return self.output.get(timeout=0)
        except queue.Empty:
            return None

    def display_result(self, mp_result):
        """Display the progress line of a completed test."""
        result = mp_result.result

        text = format_test_result(result)
        if mp_result.error_msg is not None:
            # CHILD_ERROR
            text += ' (%s)' % mp_result.error_msg
        elif (result.test_time >= PROGRESS_MIN_TIME and not self.ns.pgo):
            text += ' (%s)' % format_duration(result.test_time)
        running = get_running(self.workers)
        if running and not self.ns.pgo:
            text += ' -- running: %s' % ', '.join(running)
        self.regrtest.display_progress(self.test_index, text)

    def _process_result(self, item):
        """Handle one item of the output queue.

        Return True if the test run must stop: a worker thread failed, or
        must_stop() says so for the test result.
        """
        if item[0]:
            # Thread got an exception
            format_exc = item[1]
            print_warning(f"regrtest worker thread failed: {format_exc}")
            return True

        self.test_index += 1
        mp_result = item[1]
        self.regrtest.accumulate_result(mp_result.result)
        self.display_result(mp_result)

        if mp_result.stdout:
            print(mp_result.stdout, flush=True)
        if mp_result.stderr and not self.ns.pgo:
            print(mp_result.stderr, file=sys.stderr, flush=True)

        if must_stop(mp_result.result, self.ns):
            return True

        return False

    def run_tests(self):
        """Run all the tests and process their results as they arrive.

        A KeyboardInterrupt is recorded in ``regrtest.interrupted`` instead
        of being propagated. Workers are always stopped before returning.
        """
        self.start_workers()

        self.test_index = 0
        try:
            while True:
                item = self._get_result()
                if item is None:
                    break

                stop = self._process_result(item)
                if stop:
                    break
        except KeyboardInterrupt:
            print()
            self.regrtest.interrupted = True
        finally:
            if self.ns.timeout is not None:
                faulthandler.cancel_dump_traceback_later()

            # Always ensure that all worker processes are no longer
            # running when we exit this function
            self.pending.stop()
            self.stop_workers()
469
470
def run_tests_multiprocess(regrtest):
    """Run the tests of *regrtest* using a MultiprocessTestRunner."""
    runner = MultiprocessTestRunner(regrtest)
    runner.run_tests()
473