1# This Source Code Form is subject to the terms of the Mozilla Public
2# License, v. 2.0. If a copy of the MPL was not distributed with this
3# file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4from __future__ import print_function, unicode_literals, division
5
6import subprocess
7import sys
8from datetime import datetime, timedelta
9from progressbar import ProgressBar
10from results import NullTestOutput, TestOutput, escape_cmdline
11from threading import Thread
12from Queue import Queue, Empty
13
14
class EndMarker:
    """Sentinel placed on a queue to signal that no more items will follow.

    The class object itself is used as the marker; it is never instantiated
    by the code in this module.
    """
17
18
class TaskFinishedMarker:
    """Sentinel a worker posts to its watchdog queue once a task has exited.

    The class object itself is used as the marker; it is never instantiated
    by the code in this module.
    """
21
22
def _do_work(qTasks, qResults, qWatch, prefix, run_skipped, timeout, show_cmd):
    """
    Worker loop: take tests from qTasks, run each as a child process and
    post one result per test to qResults, until EndMarker is received.

    qTasks      -- queue of tests to run, terminated by EndMarker.
    qResults    -- queue receiving a NullTestOutput/TestOutput per test and,
                   finally, EndMarker.
    qWatch      -- queue to this worker's watchdog; receives each spawned
                   process, then TaskFinishedMarker once it has exited, and
                   EndMarker on shutdown.
    prefix      -- passed to test.get_command() to build the command line.
    run_skipped -- when true, tests whose 'enable' flag is false are run
                   anyway instead of being reported as NullTestOutput.
    timeout     -- time limit in seconds, used here only to flag the result
                   as timed out.
    show_cmd    -- when true, print the escaped command line before running.
    """
    while True:
        # NOTE(review): presumably the huge timeout (rather than none) keeps
        # the blocking get interruptible -- confirm before changing.
        task = qTasks.get(block=True, timeout=sys.maxint)

        if task is EndMarker:
            # No more work: pass the shutdown signal on to the watchdog
            # queue and to the result queue, then stop.
            qWatch.put(EndMarker)
            qResults.put(EndMarker)
            return

        should_run = task.enable or run_skipped
        if not should_run:
            qResults.put(NullTestOutput(task))
            continue

        # Launch the test as a child process with both streams captured.
        argv = task.get_command(prefix)
        if show_cmd:
            print(escape_cmdline(argv))
        started = datetime.now()
        child = subprocess.Popen(argv,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)

        # Hand the process to the watchdog, which kills it if it exceeds the
        # timeout, while this thread drains its output pipes. The finish
        # marker tells the watchdog the task is over.
        qWatch.put(child)
        stdout_data, stderr_data = child.communicate()
        qWatch.put(TaskFinishedMarker)

        # Package the outcome and send it on for result processing.
        elapsed = datetime.now() - started
        timed_out = elapsed > timedelta(seconds=timeout)
        qResults.put(TestOutput(task, argv, stdout_data, stderr_data,
                                child.returncode, elapsed.total_seconds(),
                                timed_out))
56
57
def _do_watch(qWatch, timeout):
    """
    Watchdog loop: force-kill any task that runs longer than 'timeout'.

    Items on qWatch are expected in pairs: first the process object of a
    freshly started task, then TaskFinishedMarker once that task has exited.
    Receiving EndMarker in place of a process object ends the loop.

    qWatch  -- queue fed by the worker this watchdog is paired with.
    timeout -- seconds to wait for the finish marker before terminating the
               process.

    Raises AssertionError if anything other than TaskFinishedMarker follows
    a process object, and re-raises any WindowsError from terminate() other
    than winerror 5.
    """
    while True:
        # Block until the worker announces a new task (or shutdown).
        proc = qWatch.get(True)
        # Sentinels are class objects, so compare by identity rather than
        # equality; this cannot be affected by a custom __eq__.
        if proc is EndMarker:
            return
        try:
            fin = qWatch.get(block=True, timeout=timeout)
            assert fin is TaskFinishedMarker, "invalid finish marker"
        except Empty:
            # Timed out, force-kill the test.
            try:
                proc.terminate()
            except WindowsError as ex:
                # If the process finishes after we time out but before we
                # terminate, the terminate call will fail. We can safely
                # ignore this. (winerror 5 is ERROR_ACCESS_DENIED.)
                if ex.winerror != 5:
                    raise
            # The worker still posts the finish marker once the killed
            # process has been reaped; consume it so the queue stays in
            # step for the next task.
            fin = qWatch.get(block=True, timeout=sys.maxint)
            assert fin is TaskFinishedMarker, "invalid finish marker"
78
79
def run_all_tests(tests, prefix, pb, options):
    """
    Run every test on a pool of worker threads, yielding each result as it
    becomes available.

    tests   -- iterable of tests; consumed on a separate feeder thread.
    prefix  -- forwarded to the workers to build each test's command line.
    pb      -- progress bar; poke()d whenever no result arrives within one
               update interval.
    options -- supplies worker_count, timeout, run_skipped and show_cmd.

    Tests are scattered over a shared task queue and the outcomes gathered
    from a shared result queue. Each worker thread gets its own watchdog
    thread and a private queue connecting the two.
    """
    def _start_daemon(target, args):
        # Create, daemonize and start a thread in one step.
        thread = Thread(target=target, args=args)
        thread.setDaemon(True)
        thread.start()
        return thread

    qTasks = Queue()
    qResults = Queue()

    workers, watchdogs = [], []
    for _ in range(options.worker_count):
        qWatch = Queue()
        # The watchdog is started before the worker it supervises.
        watchdogs.append(_start_daemon(_do_watch, (qWatch, options.timeout)))
        workers.append(_start_daemon(
            _do_work,
            (qTasks, qResults, qWatch, prefix, options.run_skipped,
             options.timeout, options.show_cmd)))

    # Feed the task queue from its own thread: all tests first, then one
    # end marker per worker. Putting never blocks on queue size, only on
    # the test generator producing its next item, and since the workers
    # are already running, tests are processed as fast as they can be
    # discovered on the filesystem.
    def _feed(worker_total, task_queue):
        for test in tests:
            task_queue.put(test)
        for _ in range(worker_total):
            task_queue.put(EndMarker)
    pusher = _start_daemon(_feed, (len(workers), qTasks))

    # Drain results until every worker has reported its end marker,
    # poking the progress bar whenever a poll interval passes quietly.
    poll_interval = ProgressBar.update_granularity().total_seconds()
    pending = len(workers)
    while pending > 0:
        try:
            item = qResults.get(block=True, timeout=poll_interval)
            if item is not EndMarker:
                yield item
            else:
                pending -= 1
        except Empty:
            pb.poke()

    # Wait for all threads to finish, then verify nothing was left behind.
    pusher.join()
    for thread in workers + watchdogs:
        thread.join()
    assert qTasks.empty(), "Send queue not drained"
    assert qResults.empty(), "Result queue not drained"
136