1# This Source Code Form is subject to the terms of the Mozilla Public 2# License, v. 2.0. If a copy of the MPL was not distributed with this 3# file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 4from __future__ import print_function, unicode_literals, division 5 6import subprocess 7import sys 8 9from datetime import datetime, timedelta 10from threading import Thread 11from six.moves.queue import Queue, Empty 12 13from .progressbar import ProgressBar 14from .results import NullTestOutput, TestOutput, escape_cmdline 15 16 17class EndMarker: 18 pass 19 20 21class TaskFinishedMarker: 22 pass 23 24 25def _do_work(qTasks, qResults, qWatch, prefix, run_skipped, timeout, show_cmd): 26 while True: 27 test = qTasks.get() 28 if test is EndMarker: 29 qWatch.put(EndMarker) 30 qResults.put(EndMarker) 31 return 32 33 if not test.enable and not run_skipped: 34 qResults.put(NullTestOutput(test)) 35 continue 36 37 # Spawn the test task. 38 cmd = test.get_command(prefix) 39 if show_cmd: 40 print(escape_cmdline(cmd)) 41 tStart = datetime.now() 42 proc = subprocess.Popen( 43 cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 44 45 # Push the task to the watchdog -- it will kill the task 46 # if it goes over the timeout while we keep its stdout 47 # buffer clear on the "main" worker thread. 48 qWatch.put(proc) 49 out, err = proc.communicate() 50 # We're not setting universal_newlines=True in subprocess.Popen due to 51 # still needing to support Python 3.5, which doesn't have the "encoding" 52 # parameter to the Popen constructor, so we have to decode the output 53 # here. 54 system_encoding = 'mbcs' if sys.platform == 'win32' else 'utf-8' 55 out = out.decode(system_encoding) 56 err = err.decode(system_encoding) 57 qWatch.put(TaskFinishedMarker) 58 59 # Create a result record and forward to result processing. 60 dt = datetime.now() - tStart 61 result = TestOutput(test, cmd, out, err, proc.returncode, dt.total_seconds(), 62 dt > timedelta(seconds=timeout)) 63 qResults.put(result) 64 65 66def _do_watch(qWatch, timeout): 67 while True: 68 proc = qWatch.get(True) 69 if proc == EndMarker: 70 return 71 try: 72 fin = qWatch.get(block=True, timeout=timeout) 73 assert fin is TaskFinishedMarker, "invalid finish marker" 74 except Empty: 75 # Timed out, force-kill the test. 76 try: 77 proc.terminate() 78 except WindowsError as ex: 79 # If the process finishes after we time out but before we 80 # terminate, the terminate call will fail. We can safely 81 # ignore this. 82 if ex.winerror != 5: 83 raise 84 fin = qWatch.get() 85 assert fin is TaskFinishedMarker, "invalid finish marker" 86 87 88def run_all_tests(tests, prefix, pb, options): 89 """ 90 Uses scatter-gather to a thread-pool to manage children. 91 """ 92 qTasks, qResults = Queue(), Queue() 93 94 workers = [] 95 watchdogs = [] 96 for _ in range(options.worker_count): 97 qWatch = Queue() 98 watcher = Thread(target=_do_watch, args=(qWatch, options.timeout)) 99 watcher.setDaemon(True) 100 watcher.start() 101 watchdogs.append(watcher) 102 worker = Thread(target=_do_work, args=(qTasks, qResults, qWatch, 103 prefix, options.run_skipped, 104 options.timeout, options.show_cmd)) 105 worker.setDaemon(True) 106 worker.start() 107 workers.append(worker) 108 109 # Insert all jobs into the queue, followed by the queue-end 110 # marker, one per worker. This will not block on growing the 111 # queue, only on waiting for more items in the generator. The 112 # workers are already started, however, so this will process as 113 # fast as we can produce tests from the filesystem. 114 def _do_push(num_workers, qTasks): 115 for test in tests: 116 qTasks.put(test) 117 for _ in range(num_workers): 118 qTasks.put(EndMarker) 119 pusher = Thread(target=_do_push, args=(len(workers), qTasks)) 120 pusher.setDaemon(True) 121 pusher.start() 122 123 # Read from the results. 124 ended = 0 125 delay = ProgressBar.update_granularity().total_seconds() 126 while ended < len(workers): 127 try: 128 result = qResults.get(block=True, timeout=delay) 129 if result is EndMarker: 130 ended += 1 131 else: 132 yield result 133 except Empty: 134 pb.poke() 135 136 # Cleanup and exit. 137 pusher.join() 138 for worker in workers: 139 worker.join() 140 for watcher in watchdogs: 141 watcher.join() 142 assert qTasks.empty(), "Send queue not drained" 143 assert qResults.empty(), "Result queue not drained" 144