#!/usr/bin/env python
# Copyright 2014 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

# for py2/py3 compatibility
from __future__ import print_function

from contextlib import contextmanager
from multiprocessing import Process, Queue
import os
import signal
import time
import traceback

try:
  from queue import Empty  # Python 3
except ImportError:
  from Queue import Empty  # Python 2

from . import command
from . import utils


def setup_testing():
  """For testing only: Use threading under the hood instead of multiprocessing
  to make coverage work.

  Rebinds the module-level ``Queue`` and ``Process`` names to their threading
  equivalents and monkeypatches ``os.kill``, the thread ``Queue`` class and
  ``threading.Thread`` process-wide. Never call this outside of tests.
  """
  global Queue
  global Process
  del Queue
  del Process
  try:
    from queue import Queue  # Python 3
  except ImportError:
    from Queue import Queue  # Python 2

  from threading import Thread as Process
  # Monkeypatch threading Queue to look like multiprocessing Queue.
  Queue.cancel_join_thread = lambda self: None
  # Monkeypatch os.kill and add fake pid property on Thread.
  os.kill = lambda *args: None
  Process.pid = property(lambda self: None)


class NormalResult():
  """Successful outcome of one task, sent from a worker to the parent."""

  def __init__(self, result):
    self.result = result
    # Always None; lets the parent test `.exception` on any result object.
    self.exception = None


class ExceptionResult():
  """Failed outcome of one task: carries the exception the task raised."""

  def __init__(self, exception):
    self.exception = exception


class MaybeResult():
  """Item yielded by Pool.imap_unordered.

  Either a heartbeat (``heartbeat`` is True, ``value`` is None), meaning no
  result arrived within the heartbeat timeout, or a wrapped task result
  (``heartbeat`` is False, ``value`` holds the task's return value).
  """

  def __init__(self, heartbeat, value):
    self.heartbeat = heartbeat
    self.value = value

  @staticmethod
  def create_heartbeat():
    return MaybeResult(True, None)

  @staticmethod
  def create_result(value):
    return MaybeResult(False, value)


def Worker(fn, work_queue, done_queue,
           process_context_fn=None, process_context_args=None):
  """Worker to be run in a child process.

  The worker stops when the poison pill "STOP" is reached.

  Args:
    fn: Task function. Called as ``fn(*args)`` for every ``args`` taken from
        ``work_queue``, plus ``process_context=...`` when a process context
        is configured.
    work_queue: Queue of argument lists (and "STOP" pills) to consume.
    done_queue: Queue receiving a NormalResult or ExceptionResult per task.
    process_context_fn: Optional factory, called once per worker with
        ``*process_context_args``, whose return value is passed to ``fn``.
    process_context_args: Arguments for ``process_context_fn``; the context
        is only created when this is not None.
  """
  try:
    kwargs = {}
    if process_context_fn and process_context_args is not None:
      kwargs.update(process_context=process_context_fn(*process_context_args))
    for args in iter(work_queue.get, "STOP"):
      try:
        done_queue.put(NormalResult(fn(*args, **kwargs)))
      except command.AbortException:
        # SIGINT, SIGTERM or internal hard timeout.
        break
      except Exception as e:
        traceback.print_exc()
        print(">>> EXCEPTION: %s" % e)
        done_queue.put(ExceptionResult(e))
    # When we reach here on normal tear down, all items have been pulled from
    # the done_queue before and this should have no effect. On fast abort, it's
    # possible that a fast worker left items on the done_queue in memory, which
    # will never be pulled. This call purges those to avoid a deadlock.
    done_queue.cancel_join_thread()
  except KeyboardInterrupt:
    assert False, 'Unreachable'


@contextmanager
def without_sig():
  """Ignores SIGINT and SIGTERM inside the block, then restores the previous
  handlers (also when the block raises)."""
  int_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
  term_handler = signal.signal(signal.SIGTERM, signal.SIG_IGN)
  try:
    yield
  finally:
    signal.signal(signal.SIGINT, int_handler)
    signal.signal(signal.SIGTERM, term_handler)


class Pool():
  """Distributes tasks to a number of worker processes.
  New tasks can be added dynamically even after the workers have been started.
  Requirement: Tasks can only be added from the parent process, e.g. while
  consuming the results generator."""

  # Factor to calculate the maximum number of items in the work/done queue.
  # Necessary to not overflow the queue's pipe if a keyboard interrupt happens.
  BUFFER_FACTOR = 4

  def __init__(self, num_workers, heartbeat_timeout=1, notify_fun=None):
    """Creates the queues; worker processes are started by imap_unordered.

    Args:
      num_workers: Number of worker processes to run in parallel.
      heartbeat_timeout: Timeout in seconds for waiting for results. Each time
          the timeout is reached, a heartbeat is signalled and timeout is reset.
      notify_fun: Callable called to signal some events like termination. The
          event name is passed as string.
    """
    self.num_workers = num_workers
    self.processes = []
    self.terminated = False
    self.abort_now = False

    # Invariant: processing_count >= #work_queue + #done_queue. It is greater
    # when a worker takes an item from the work_queue and before the result is
    # submitted to the done_queue. It is equal when no worker is working,
    # e.g. when all workers have finished, and when no results are processed.
    # Count is only accessed by the parent process. Only the parent process is
    # allowed to remove items from the done_queue and to add items to the
    # work_queue.
    self.processing_count = 0
    self.heartbeat_timeout = heartbeat_timeout
    self.notify = notify_fun or (lambda x: x)

    # Disable sigint and sigterm to prevent subprocesses from capturing the
    # signals.
    with without_sig():
      self.work_queue = Queue()
      self.done_queue = Queue()

  def imap_unordered(self, fn, gen,
                     process_context_fn=None, process_context_args=None):
    """Maps function "fn" to items in generator "gen" on the worker processes
    in an arbitrary order. The items are expected to be lists of arguments to
    the function. Returns a results iterator. A result value of type
    MaybeResult either indicates a heartbeat of the runner, i.e. indicating
    that the runner is still waiting for the result to be computed, or it wraps
    the real result.

    Args:
      process_context_fn: Function executed once by each worker. Expected to
          return a process-context object. If present, this object is passed
          as additional argument to each call to fn.
      process_context_args: List of arguments for the invocation of
          process_context_fn. All arguments will be pickled and sent beyond the
          process boundary.

    Raises:
      Exception: After the pool has been torn down, if reading any result
          failed (e.g. a task raised in a worker).
    """
    if self.terminated:
      return
    internal_error = False
    try:
      gen = iter(gen)
      self.advance = self._advance_more

      # Disable sigint and sigterm to prevent subprocesses from capturing the
      # signals.
      with without_sig():
        for _ in range(self.num_workers):
          p = Process(target=Worker, args=(fn,
                                           self.work_queue,
                                           self.done_queue,
                                           process_context_fn,
                                           process_context_args))
          p.start()
          self.processes.append(p)

      self.advance(gen)
      while self.processing_count > 0:
        while True:
          try:
            # Read from result queue in a responsive fashion. If available,
            # this will return a normal result immediately or a heartbeat on
            # heartbeat timeout (default 1 second).
            result = self._get_result_from_queue()
          except Exception:
            # TODO(machenbach): Handle a few known types of internal errors
            # gracefully, e.g. missing test files.
            internal_error = True
            result = None
          # Checked after every read, successful or not, so that an abort
          # request is honoured without a `return` inside `finally`.
          if self.abort_now:
            # SIGINT, SIGTERM or internal hard timeout.
            return
          if result is not None:
            break

        yield result
        self.advance(gen)
    except KeyboardInterrupt:
      assert False, 'Unreachable'
    except Exception as e:
      traceback.print_exc()
      print(">>> EXCEPTION: %s" % e)
    finally:
      self._terminate()

    if internal_error:
      raise Exception("Internal error in a worker process.")

  def _advance_more(self, gen):
    """Tops up the work queue from gen until the buffer limit is reached.

    Switches self.advance to _advance_empty once gen is exhausted.
    """
    while self.processing_count < self.num_workers * self.BUFFER_FACTOR:
      try:
        self.work_queue.put(next(gen))
        self.processing_count += 1
      except StopIteration:
        self.advance = self._advance_empty
        break

  def _advance_empty(self, gen):
    """No-op used once the input generator has been exhausted."""
    pass

  def add(self, args):
    """Adds an item to the work queue. Can be called dynamically while
    processing the results from imap_unordered."""
    assert not self.terminated

    self.work_queue.put(args)
    self.processing_count += 1

  def abort(self):
    """Schedules abort on next queue read.

    This is safe to call when handling SIGINT, SIGTERM or when an internal
    hard timeout is reached.
    """
    self.abort_now = True

  def _terminate_processes(self):
    """Asks every worker to stop: non-forced taskkill on Windows, SIGTERM
    elsewhere."""
    for p in self.processes:
      if utils.IsWindows():
        command.taskkill_windows(p, verbose=True, force=False)
      else:
        os.kill(p.pid, signal.SIGTERM)

  def _terminate(self):
    """Terminates execution and cleans up the queues.

    If abort() was called before termination, this also terminates the
    subprocesses and doesn't wait for ongoing tests. Calling it more than once
    is a no-op.
    """
    if self.terminated:
      return
    self.terminated = True

    # Drain out work queue from tests
    try:
      while True:
        self.work_queue.get(True, 0.1)
    except Empty:
      pass

    # Make sure all processes stop
    for _ in self.processes:
      # During normal tear down the workers block on get(). Feed a poison pill
      # per worker to make them stop.
      self.work_queue.put("STOP")

    if self.abort_now:
      self._terminate_processes()

    self.notify("Joining workers")
    for p in self.processes:
      p.join()

    # Drain the queues to prevent stderr chatter when queues are garbage
    # collected.
    self.notify("Draining queues")
    for q in (self.work_queue, self.done_queue):
      try:
        while True:
          q.get(False)
      except Exception:
        # Empty is the normal way out; any other error is ignored on purpose
        # because this is best-effort cleanup.
        pass

  def _get_result_from_queue(self):
    """Attempts to get the next result from the queue.

    Returns: A wrapped result if one was available within heartbeat timeout,
      a heartbeat result otherwise.
    Raises:
      Exception: If an exception occurred when processing the task on the
        worker side, it is reraised here.
    """
    try:
      result = self.done_queue.get(timeout=self.heartbeat_timeout)
    except Empty:
      return MaybeResult.create_heartbeat()
    self.processing_count -= 1
    # Kept outside the try above so that a task which itself raised Empty is
    # re-raised instead of being mistaken for a heartbeat.
    if result.exception is not None:
      raise result.exception
    return MaybeResult.create_result(result.result)