#!/usr/bin/env python
# Copyright 2014 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

# for py2/py3 compatibility
from __future__ import print_function

from contextlib import contextmanager
from multiprocessing import Process, Queue
import os
import signal
import time
import traceback

try:
  from queue import Empty  # Python 3
except ImportError:
  from Queue import Empty  # Python 2

from . import command
from . import utils


def setup_testing():
  """For testing only: Use threading under the hood instead of multiprocessing
  to make coverage work.
  """
  global Queue
  global Process
  del Queue
  del Process
  try:
    from queue import Queue  # Python 3
  except ImportError:
    from Queue import Queue  # Python 2

  from threading import Thread as Process
  # Monkeypatch threading Queue to look like multiprocessing Queue.
  Queue.cancel_join_thread = lambda self: None
  # Monkeypatch os.kill and add fake pid property on Thread.
  os.kill = lambda *args: None
  Process.pid = property(lambda self: None)
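
# Minimal usage sketch (hypothetical test code, not part of this module's
# API): call setup_testing() once before constructing a Pool so that workers
# run as threads in-process and coverage collection keeps working:
#
#   setup_testing()
#   pool = Pool(num_workers=2)
#   for maybe_result in pool.imap_unordered(fn, [[arg] for arg in items]):
#     ...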


class NormalResult():
  def __init__(self, result):
    self.result = result
    self.exception = None


class ExceptionResult():
  def __init__(self, exception):
    self.exception = exception


class MaybeResult():
  def __init__(self, heartbeat, value):
    self.heartbeat = heartbeat
    self.value = value

  @staticmethod
  def create_heartbeat():
    return MaybeResult(True, None)

  @staticmethod
  def create_result(value):
    return MaybeResult(False, value)
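
# Consumer-side sketch (hypothetical caller code): results from
# Pool.imap_unordered() arrive wrapped as MaybeResult, so heartbeats need to
# be filtered out before the payload is used:
#
#   for maybe_result in pool.imap_unordered(fn, tasks):
#     if maybe_result.heartbeat:
#       continue  # Runner is still waiting; e.g. refresh a progress display.
#     handle(maybe_result.value)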


def Worker(fn, work_queue, done_queue,
           process_context_fn=None, process_context_args=None):
  """Worker to be run in a child process.
  The worker stops when the poison pill "STOP" is read from the work queue.
  """
  try:
    kwargs = {}
    if process_context_fn and process_context_args is not None:
      kwargs.update(process_context=process_context_fn(*process_context_args))
    for args in iter(work_queue.get, "STOP"):
      try:
        done_queue.put(NormalResult(fn(*args, **kwargs)))
      except command.AbortException:
        # SIGINT, SIGTERM or internal hard timeout.
        break
      except Exception as e:
        traceback.print_exc()
        print(">>> EXCEPTION: %s" % e)
        done_queue.put(ExceptionResult(e))
    # When we reach here on normal tear down, all items have been pulled from
    # the done_queue before and this should have no effect. On fast abort, it's
    # possible that a fast worker left items on the done_queue in memory, which
    # will never be pulled. This call purges those to avoid a deadlock.
    done_queue.cancel_join_thread()
  except KeyboardInterrupt:
    assert False, 'Unreachable'
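
# Note on the poison pill: Pool._terminate() below feeds one "STOP" token per
# worker into the work queue, so each worker's iter() loop above terminates
# exactly once during normal tear down.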


@contextmanager
def without_sig():
  int_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
  term_handler = signal.signal(signal.SIGTERM, signal.SIG_IGN)
  try:
    yield
  finally:
    signal.signal(signal.SIGINT, int_handler)
    signal.signal(signal.SIGTERM, term_handler)


class Pool():
  """Distributes tasks to a number of worker processes.
  New tasks can be added dynamically even after the workers have been started.
  Requirement: Tasks can only be added from the parent process, e.g. while
  consuming the results generator."""

  # Factor to calculate the maximum number of items in the work/done queue.
  # Necessary to not overflow the queue's pipe if a keyboard interrupt happens.
  BUFFER_FACTOR = 4
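  # E.g. with num_workers=4 the generator-driven pipeline keeps at most
  # 4 * BUFFER_FACTOR = 16 tasks in flight (queued plus being processed); see
  # _advance_more() below.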

  def __init__(self, num_workers, heartbeat_timeout=1, notify_fun=None):
    """
    Args:
      num_workers: Number of worker processes to run in parallel.
      heartbeat_timeout: Timeout in seconds for waiting for results. Each time
          the timeout is reached, a heartbeat is signalled and the timeout is
          reset.
      notify_fun: Callable used to signal certain events, e.g. termination.
          The event name is passed as a string.
    """
    self.num_workers = num_workers
    self.processes = []
    self.terminated = False
    self.abort_now = False

    # Invariant: processing_count >= #work_queue + #done_queue. It is greater
    # when a worker takes an item from the work_queue and before the result is
    # submitted to the done_queue. It is equal when no worker is working,
    # e.g. when all workers have finished, and when no results are processed.
    # Count is only accessed by the parent process. Only the parent process is
    # allowed to remove items from the done_queue and to add items to the
    # work_queue.
    self.processing_count = 0
    self.heartbeat_timeout = heartbeat_timeout
    self.notify = notify_fun or (lambda x: x)

    # Disable sigint and sigterm to prevent subprocesses from capturing the
    # signals.
    with without_sig():
      self.work_queue = Queue()
      self.done_queue = Queue()

  def imap_unordered(self, fn, gen,
                     process_context_fn=None, process_context_args=None):
    """Maps function "fn" to items in generator "gen" on the worker processes
    in an arbitrary order. The items are expected to be lists of arguments to
    the function. Returns a results iterator. A result value of type
    MaybeResult either indicates a heartbeat, i.e. that the runner is still
    waiting for the next result to be computed, or wraps the real result.

    Args:
      process_context_fn: Function executed once by each worker. Expected to
          return a process-context object. If present, this object is passed
          as an additional argument to each call to fn.
      process_context_args: List of arguments for the invocation of
          process_context_fn. All arguments will be pickled and sent across
          the process boundary.
    """
    if self.terminated:
      return
    try:
      internal_error = False
      gen = iter(gen)
      self.advance = self._advance_more

      # Disable sigint and sigterm to prevent subprocesses from capturing the
      # signals.
      with without_sig():
        for w in range(self.num_workers):
          p = Process(target=Worker, args=(fn,
                                           self.work_queue,
                                           self.done_queue,
                                           process_context_fn,
                                           process_context_args))
          p.start()
          self.processes.append(p)

      self.advance(gen)
      while self.processing_count > 0:
        while True:
          try:
            # Read from result queue in a responsive fashion. If available,
            # this will return a normal result immediately or a heartbeat on
            # heartbeat timeout (default 1 second).
            result = self._get_result_from_queue()
          except:
            # TODO(machenbach): Handle a few known types of internal errors
            # gracefully, e.g. missing test files.
            internal_error = True
            continue
          finally:
            if self.abort_now:
              # SIGINT, SIGTERM or internal hard timeout.
              return

          yield result
          break

        self.advance(gen)
    except KeyboardInterrupt:
      assert False, 'Unreachable'
    except Exception as e:
      traceback.print_exc()
      print(">>> EXCEPTION: %s" % e)
    finally:
      self._terminate()

    if internal_error:
      raise Exception("Internal error in a worker process.")

  def _advance_more(self, gen):
    while self.processing_count < self.num_workers * self.BUFFER_FACTOR:
      try:
        self.work_queue.put(next(gen))
        self.processing_count += 1
      except StopIteration:
        self.advance = self._advance_empty
        break

  def _advance_empty(self, gen):
    pass

  def add(self, args):
    """Adds an item to the work queue. Can be called dynamically while
    processing the results from imap_unordered."""
    assert not self.terminated

    self.work_queue.put(args)
    self.processing_count += 1

  def abort(self):
    """Schedules abort on next queue read.

    This is safe to call when handling SIGINT, SIGTERM or when an internal
    hard timeout is reached.
    """
    self.abort_now = True
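
  # Sketch of the intended abort pattern (hypothetical caller code): a parent-
  # side SIGINT/SIGTERM handler merely flags the pool, and imap_unordered()
  # returns on its next queue read:
  #
  #   def handler(signum, frame):
  #     pool.abort()
  #   signal.signal(signal.SIGINT, handler)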

  def _terminate_processes(self):
    for p in self.processes:
      if utils.IsWindows():
        command.taskkill_windows(p, verbose=True, force=False)
      else:
        os.kill(p.pid, signal.SIGTERM)

  def _terminate(self):
    """Terminates execution and cleans up the queues.

    If abort() was called before termination, this also terminates the
    subprocesses and doesn't wait for ongoing tests.
    """
    if self.terminated:
      return
    self.terminated = True

    # Drain the work queue of any remaining tests.
    try:
      while True:
        self.work_queue.get(True, 0.1)
    except Empty:
      pass

    # Make sure all processes stop.
    for _ in self.processes:
      # During normal tear down the workers block on get(). Feed a poison pill
      # per worker to make them stop.
      self.work_queue.put("STOP")

    if self.abort_now:
      self._terminate_processes()

    self.notify("Joining workers")
    for p in self.processes:
      p.join()

    # Drain the queues to prevent stderr chatter when queues are garbage
    # collected.
    self.notify("Draining queues")
    try:
      while True:
        self.work_queue.get(False)
    except:
      pass
    try:
      while True:
        self.done_queue.get(False)
    except:
      pass

  def _get_result_from_queue(self):
    """Attempts to get the next result from the queue.

    Returns: A wrapped result if one was available within the heartbeat
        timeout, a heartbeat result otherwise.
    Raises:
        Exception: If an exception occurred when processing the task on the
            worker side, it is re-raised here.
    """
    while True:
      try:
        result = self.done_queue.get(timeout=self.heartbeat_timeout)
        self.processing_count -= 1
        if result.exception:
          raise result.exception
        return MaybeResult.create_result(result.result)
      except Empty:
        return MaybeResult.create_heartbeat()