1# This Source Code Form is subject to the terms of the Mozilla Public 2# License, v. 2.0. If a copy of the MPL was not distributed with this 3# file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 4from __future__ import print_function, unicode_literals, division 5 6import subprocess 7import sys 8from datetime import datetime, timedelta 9from progressbar import ProgressBar 10from results import NullTestOutput, TestOutput, escape_cmdline 11from threading import Thread 12from Queue import Queue, Empty 13 14 15class EndMarker: 16 pass 17 18 19class TaskFinishedMarker: 20 pass 21 22 23def _do_work(qTasks, qResults, qWatch, prefix, run_skipped, timeout, show_cmd): 24 while True: 25 test = qTasks.get(block=True, timeout=sys.maxint) 26 if test is EndMarker: 27 qWatch.put(EndMarker) 28 qResults.put(EndMarker) 29 return 30 31 if not test.enable and not run_skipped: 32 qResults.put(NullTestOutput(test)) 33 continue 34 35 # Spawn the test task. 36 cmd = test.get_command(prefix) 37 if show_cmd: 38 print(escape_cmdline(cmd)) 39 tStart = datetime.now() 40 proc = subprocess.Popen(cmd, 41 stdout=subprocess.PIPE, 42 stderr=subprocess.PIPE) 43 44 # Push the task to the watchdog -- it will kill the task 45 # if it goes over the timeout while we keep its stdout 46 # buffer clear on the "main" worker thread. 47 qWatch.put(proc) 48 out, err = proc.communicate() 49 qWatch.put(TaskFinishedMarker) 50 51 # Create a result record and forward to result processing. 52 dt = datetime.now() - tStart 53 result = TestOutput(test, cmd, out, err, proc.returncode, dt.total_seconds(), 54 dt > timedelta(seconds=timeout)) 55 qResults.put(result) 56 57 58def _do_watch(qWatch, timeout): 59 while True: 60 proc = qWatch.get(True) 61 if proc == EndMarker: 62 return 63 try: 64 fin = qWatch.get(block=True, timeout=timeout) 65 assert fin is TaskFinishedMarker, "invalid finish marker" 66 except Empty: 67 # Timed out, force-kill the test. 68 try: 69 proc.terminate() 70 except WindowsError as ex: 71 # If the process finishes after we time out but before we 72 # terminate, the terminate call will fail. We can safely 73 # ignore this. 74 if ex.winerror != 5: 75 raise 76 fin = qWatch.get(block=True, timeout=sys.maxint) 77 assert fin is TaskFinishedMarker, "invalid finish marker" 78 79 80def run_all_tests(tests, prefix, pb, options): 81 """ 82 Uses scatter-gather to a thread-pool to manage children. 83 """ 84 qTasks, qResults = Queue(), Queue() 85 86 workers = [] 87 watchdogs = [] 88 for _ in range(options.worker_count): 89 qWatch = Queue() 90 watcher = Thread(target=_do_watch, args=(qWatch, options.timeout)) 91 watcher.setDaemon(True) 92 watcher.start() 93 watchdogs.append(watcher) 94 worker = Thread(target=_do_work, args=(qTasks, qResults, qWatch, 95 prefix, options.run_skipped, 96 options.timeout, options.show_cmd)) 97 worker.setDaemon(True) 98 worker.start() 99 workers.append(worker) 100 101 # Insert all jobs into the queue, followed by the queue-end 102 # marker, one per worker. This will not block on growing the 103 # queue, only on waiting for more items in the generator. The 104 # workers are already started, however, so this will process as 105 # fast as we can produce tests from the filesystem. 106 def _do_push(num_workers, qTasks): 107 for test in tests: 108 qTasks.put(test) 109 for _ in range(num_workers): 110 qTasks.put(EndMarker) 111 pusher = Thread(target=_do_push, args=(len(workers), qTasks)) 112 pusher.setDaemon(True) 113 pusher.start() 114 115 # Read from the results. 116 ended = 0 117 delay = ProgressBar.update_granularity().total_seconds() 118 while ended < len(workers): 119 try: 120 result = qResults.get(block=True, timeout=delay) 121 if result is EndMarker: 122 ended += 1 123 else: 124 yield result 125 except Empty: 126 pb.poke() 127 128 # Cleanup and exit. 129 pusher.join() 130 for worker in workers: 131 worker.join() 132 for watcher in watchdogs: 133 watcher.join() 134 assert qTasks.empty(), "Send queue not drained" 135 assert qResults.empty(), "Result queue not drained" 136