1# This Source Code Form is subject to the terms of the Mozilla Public
2# License, v. 2.0. If a copy of the MPL was not distributed with this
3# file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4from __future__ import print_function, unicode_literals, division
5
6import subprocess
7import sys
8
9from datetime import datetime, timedelta
10from threading import Thread
11from six.moves.queue import Queue, Empty
12
13from .progressbar import ProgressBar
14from .results import NullTestOutput, TestOutput, escape_cmdline
15
16
17class EndMarker:
18    pass
19
20
21class TaskFinishedMarker:
22    pass
23
24
25def _do_work(qTasks, qResults, qWatch, prefix, run_skipped, timeout, show_cmd):
26    while True:
27        test = qTasks.get()
28        if test is EndMarker:
29            qWatch.put(EndMarker)
30            qResults.put(EndMarker)
31            return
32
33        if not test.enable and not run_skipped:
34            qResults.put(NullTestOutput(test))
35            continue
36
37        # Spawn the test task.
38        cmd = test.get_command(prefix)
39        if show_cmd:
40            print(escape_cmdline(cmd))
41        tStart = datetime.now()
42        proc = subprocess.Popen(
43            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
44
45        # Push the task to the watchdog -- it will kill the task
46        # if it goes over the timeout while we keep its stdout
47        # buffer clear on the "main" worker thread.
48        qWatch.put(proc)
49        out, err = proc.communicate()
50        # We're not setting universal_newlines=True in subprocess.Popen due to
51        # still needing to support Python 3.5, which doesn't have the "encoding"
52        # parameter to the Popen constructor, so we have to decode the output
53        # here.
54        system_encoding = 'mbcs' if sys.platform == 'win32' else 'utf-8'
55        out = out.decode(system_encoding)
56        err = err.decode(system_encoding)
57        qWatch.put(TaskFinishedMarker)
58
59        # Create a result record and forward to result processing.
60        dt = datetime.now() - tStart
61        result = TestOutput(test, cmd, out, err, proc.returncode, dt.total_seconds(),
62                            dt > timedelta(seconds=timeout))
63        qResults.put(result)
64
65
66def _do_watch(qWatch, timeout):
67    while True:
68        proc = qWatch.get(True)
69        if proc == EndMarker:
70            return
71        try:
72            fin = qWatch.get(block=True, timeout=timeout)
73            assert fin is TaskFinishedMarker, "invalid finish marker"
74        except Empty:
75            # Timed out, force-kill the test.
76            try:
77                proc.terminate()
78            except WindowsError as ex:
79                # If the process finishes after we time out but before we
80                # terminate, the terminate call will fail. We can safely
81                # ignore this.
82                if ex.winerror != 5:
83                    raise
84            fin = qWatch.get()
85            assert fin is TaskFinishedMarker, "invalid finish marker"
86
87
88def run_all_tests(tests, prefix, pb, options):
89    """
90    Uses scatter-gather to a thread-pool to manage children.
91    """
92    qTasks, qResults = Queue(), Queue()
93
94    workers = []
95    watchdogs = []
96    for _ in range(options.worker_count):
97        qWatch = Queue()
98        watcher = Thread(target=_do_watch, args=(qWatch, options.timeout))
99        watcher.setDaemon(True)
100        watcher.start()
101        watchdogs.append(watcher)
102        worker = Thread(target=_do_work, args=(qTasks, qResults, qWatch,
103                                               prefix, options.run_skipped,
104                                               options.timeout, options.show_cmd))
105        worker.setDaemon(True)
106        worker.start()
107        workers.append(worker)
108
109    # Insert all jobs into the queue, followed by the queue-end
110    # marker, one per worker. This will not block on growing the
111    # queue, only on waiting for more items in the generator. The
112    # workers are already started, however, so this will process as
113    # fast as we can produce tests from the filesystem.
114    def _do_push(num_workers, qTasks):
115        for test in tests:
116            qTasks.put(test)
117        for _ in range(num_workers):
118            qTasks.put(EndMarker)
119    pusher = Thread(target=_do_push, args=(len(workers), qTasks))
120    pusher.setDaemon(True)
121    pusher.start()
122
123    # Read from the results.
124    ended = 0
125    delay = ProgressBar.update_granularity().total_seconds()
126    while ended < len(workers):
127        try:
128            result = qResults.get(block=True, timeout=delay)
129            if result is EndMarker:
130                ended += 1
131            else:
132                yield result
133        except Empty:
134            pb.poke()
135
136    # Cleanup and exit.
137    pusher.join()
138    for worker in workers:
139        worker.join()
140    for watcher in watchdogs:
141        watcher.join()
142    assert qTasks.empty(), "Send queue not drained"
143    assert qResults.empty(), "Result queue not drained"
144