1"""SCons.Job
2
3This module defines the Serial and Parallel classes that execute tasks to
4complete a build. The Jobs class provides a higher level interface to start,
5stop, and wait on jobs.
6
7"""
8
9#
10# Copyright (c) 2001 - 2014 The SCons Foundation
11#
12# Permission is hereby granted, free of charge, to any person obtaining
13# a copy of this software and associated documentation files (the
14# "Software"), to deal in the Software without restriction, including
15# without limitation the rights to use, copy, modify, merge, publish,
16# distribute, sublicense, and/or sell copies of the Software, and to
17# permit persons to whom the Software is furnished to do so, subject to
18# the following conditions:
19#
20# The above copyright notice and this permission notice shall be included
21# in all copies or substantial portions of the Software.
22#
23# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
24# KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
25# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
30#
31
32__revision__ = "src/engine/SCons/Job.py  2014/07/05 09:42:21 garyo"
33
34import SCons.compat
35
36import os
37import signal
38
39import SCons.Errors
40
# The default stack size (in kilobytes) of the threads used to execute
# jobs in parallel.
#
# We use a stack size of 256 kilobytes. The default on some platforms
# is too large and prevents us from creating enough threads to fully
# parallelize the build. For example, the default stack size on Linux
# is 8 MBytes.
48
# Stack size (in kilobytes) explicitly requested for the worker threads,
# or None to fall back to default_stack_size.
explicit_stack_size = None
# Stack size (in kilobytes) used when explicit_stack_size is None.
default_stack_size = 256

# Error string attached to the BuildError that is recorded for a task
# when the build has been interrupted.
interrupt_msg = 'Build interrupted.'
53
54
class InterruptState(object):
    """A one-way flag recording that the build has been interrupted.

    The flag starts out False, is switched on by set(), and is read by
    calling the instance.
    """

    def __init__(self):
        """Start in the "not interrupted" state."""
        self.interrupted = False

    def set(self):
        """Mark the build as interrupted."""
        self.interrupted = True

    def __call__(self):
        """Return True once set() has been called, False before that."""
        return self.interrupted
64
65
class Jobs(object):
    """An instance of this class initializes N jobs, and provides
    methods for starting, stopping, and waiting on all N jobs.
    """

    def __init__(self, num, taskmaster):
        """
        Create 'num' jobs using the given taskmaster.

        If 'num' is 1 or less, then a serial job will be used,
        otherwise a parallel job with 'num' worker threads will
        be used.

        The 'num_jobs' attribute will be set to the actual number of jobs
        allocated.  If more than one job is requested but the Parallel
        class can't do it, it gets reset to 1.  Wrapping interfaces that
        care should check the value of 'num_jobs' after initialization.
        """

        self.job = None
        if num > 1:
            stack_size = explicit_stack_size
            if stack_size is None:
                stack_size = default_stack_size

            try:
                self.job = Parallel(taskmaster, num, stack_size)
                self.num_jobs = num
            except NameError:
                # The Parallel class is not defined when the threading
                # support could not be imported; fall through to the
                # serial job below.
                pass
        if self.job is None:
            self.job = Serial(taskmaster)
            self.num_jobs = 1

    def run(self, postfunc=lambda: None):
        """Run the jobs.

        postfunc() will be invoked after the jobs have run. It will be
        invoked even if the jobs are interrupted by a keyboard
        interrupt (well, in fact by a signal such as either SIGINT,
        SIGTERM or SIGHUP). The execution of postfunc() is protected
        against keyboard interrupts and is guaranteed to run to
        completion.

        The previous signal handlers are restored before this method
        returns or raises, even if postfunc() itself raises."""
        self._setup_sig_handler()
        try:
            self.job.start()
        finally:
            # postfunc() runs while our handlers are still installed.
            # The nested try/finally guarantees that the original
            # handlers come back even when postfunc() raises.
            try:
                postfunc()
            finally:
                self._reset_sig_handler()

    def were_interrupted(self):
        """Returns whether the jobs were interrupted by a signal."""
        return self.job.interrupted()

    def _setup_sig_handler(self):
        """Setup an interrupt handler so that SCons can shutdown cleanly in
        various conditions:

          a) SIGINT: Keyboard interrupt
          b) SIGTERM: kill or system shutdown
          c) SIGHUP: Controlling shell exiting

        We handle all of these cases by stopping the taskmaster. It
        turns out that it is very difficult to stop the build process
        by asynchronously throwing an exception such as
        KeyboardInterrupt. For example, the python Condition
        variables (threading.Condition) and queues do not seem to be
        asynchronous-exception-safe. It would require adding a whole
        bunch of try/finally blocks and except KeyboardInterrupt all
        over the place.

        Note also that we have to be careful to handle the case when
        SCons forks before executing another process. In that case, we
        want the child to exit immediately.
        """
        def handler(signum, stack, self=self, parentpid=os.getpid()):
            if os.getpid() == parentpid:
                self.job.taskmaster.stop()
                self.job.interrupted.set()
            else:
                # We are in a forked child: exit right away.
                os._exit(2)

        self.old_sigint  = signal.signal(signal.SIGINT, handler)
        self.old_sigterm = signal.signal(signal.SIGTERM, handler)
        try:
            self.old_sighup = signal.signal(signal.SIGHUP, handler)
        except AttributeError:
            # This platform's signal module has no SIGHUP.
            pass

    def _reset_sig_handler(self):
        """Restore the signal handlers to their previous state (before the
        call to _setup_sig_handler()).

        signal.signal() reports a previous handler of None when that
        handler was not installed from Python.  None cannot be passed
        back to signal.signal(), so in that case the default handler
        (signal.SIG_DFL) is installed instead."""

        def restore(signum, previous):
            if previous is None:
                previous = signal.SIG_DFL
            signal.signal(signum, previous)

        restore(signal.SIGINT, self.old_sigint)
        restore(signal.SIGTERM, self.old_sigterm)
        try:
            restore(signal.SIGHUP, self.old_sighup)
        except AttributeError:
            # Either SIGHUP does not exist here or no SIGHUP handler
            # was saved by _setup_sig_handler().
            pass
165
class Serial(object):
    """Runs the taskmaster's tasks one after another in the calling thread.

    This is more efficient than Parallel, but is only appropriate for
    non-parallel builds. Only one instance of this class should be in
    existence at a time.

    This class is not thread safe.
    """

    def __init__(self, taskmaster):
        """Create a new serial job given a taskmaster.

        The taskmaster's next_task() method should return the next task
        that needs to be executed, or None if there are no more tasks.
        Each task has its executed() method called when it ran
        successfully, or its failed() method called when it did not
        (e.g. execute() raised an exception)."""

        self.interrupted = InterruptState()
        self.taskmaster = taskmaster

    def start(self):
        """Start the job.

        Tasks are pulled from the taskmaster and run until next_task()
        returns None; the taskmaster's cleanup() is then called. When a
        task raises, the exception is recorded on the task and the
        task's failed() callback decides whether the build stops."""

        task = self.taskmaster.next_task()
        while task is not None:
            try:
                task.prepare()
                if task.needs_execute():
                    task.execute()
            except:
                if not self.interrupted():
                    task.exception_set()
                else:
                    # Replace whatever went wrong with a BuildError that
                    # carries the "interrupted" message.  It is raised
                    # and caught on the spot so that exception_set()
                    # picks it up as the current exception.
                    try:
                        raise SCons.Errors.BuildError(
                            task.targets[0], errstr=interrupt_msg)
                    except:
                        task.exception_set()

                # Let the failed() callback function arrange for the
                # build to stop if that's appropriate.
                task.failed()
            else:
                task.executed()

            task.postprocess()
            task = self.taskmaster.next_task()

        self.taskmaster.cleanup()
220
221
222# Trap import failure so that everything in the Job module but the
223# Parallel class (and its dependent classes) will work if the interpreter
224# doesn't support threads.
225try:
226    import queue
227    import threading
228except ImportError:
229    pass
230else:
231    class Worker(threading.Thread):
232        """A worker thread waits on a task to be posted to its request queue,
233        dequeues the task, executes it, and posts a tuple including the task
234        and a boolean indicating whether the task executed successfully. """
235
236        def __init__(self, requestQueue, resultsQueue, interrupted):
237            threading.Thread.__init__(self)
238            self.setDaemon(1)
239            self.requestQueue = requestQueue
240            self.resultsQueue = resultsQueue
241            self.interrupted = interrupted
242            self.start()
243
244        def run(self):
245            while True:
246                task = self.requestQueue.get()
247
248                if task is None:
249                    # The "None" value is used as a sentinel by
250                    # ThreadPool.cleanup().  This indicates that there
251                    # are no more tasks, so we should quit.
252                    break
253
254                try:
255                    if self.interrupted():
256                        raise SCons.Errors.BuildError(
257                            task.targets[0], errstr=interrupt_msg)
258                    task.execute()
259                except:
260                    task.exception_set()
261                    ok = False
262                else:
263                    ok = True
264
265                self.resultsQueue.put((task, ok))
266
267    class ThreadPool(object):
268        """This class is responsible for spawning and managing worker threads."""
269
270        def __init__(self, num, stack_size, interrupted):
271            """Create the request and reply queues, and 'num' worker threads.
272
273            One must specify the stack size of the worker threads. The
274            stack size is specified in kilobytes.
275            """
276            self.requestQueue = queue.Queue(0)
277            self.resultsQueue = queue.Queue(0)
278
279            try:
280                prev_size = threading.stack_size(stack_size*1024)
281            except AttributeError, e:
282                # Only print a warning if the stack size has been
283                # explicitly set.
284                if not explicit_stack_size is None:
285                    msg = "Setting stack size is unsupported by this version of Python:\n    " + \
286                        e.args[0]
287                    SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
288            except ValueError, e:
289                msg = "Setting stack size failed:\n    " + str(e)
290                SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
291
292            # Create worker threads
293            self.workers = []
294            for _ in range(num):
295                worker = Worker(self.requestQueue, self.resultsQueue, interrupted)
296                self.workers.append(worker)
297
298            if 'prev_size' in locals():
299                threading.stack_size(prev_size)
300
301        def put(self, task):
302            """Put task into request queue."""
303            self.requestQueue.put(task)
304
305        def get(self):
306            """Remove and return a result tuple from the results queue."""
307            return self.resultsQueue.get()
308
309        def preparation_failed(self, task):
310            self.resultsQueue.put((task, False))
311
312        def cleanup(self):
313            """
314            Shuts down the thread pool, giving each worker thread a
315            chance to shut down gracefully.
316            """
317            # For each worker thread, put a sentinel "None" value
318            # on the requestQueue (indicating that there's no work
319            # to be done) so that each worker thread will get one and
320            # terminate gracefully.
321            for _ in self.workers:
322                self.requestQueue.put(None)
323
324            # Wait for all of the workers to terminate.
325            #
326            # If we don't do this, later Python versions (2.4, 2.5) often
327            # seem to raise exceptions during shutdown.  This happens
328            # in requestQueue.get(), as an assertion failure that
329            # requestQueue.not_full is notified while not acquired,
330            # seemingly because the main thread has shut down (or is
331            # in the process of doing so) while the workers are still
332            # trying to pull sentinels off the requestQueue.
333            #
334            # Normally these terminations should happen fairly quickly,
335            # but we'll stick a one-second timeout on here just in case
336            # someone gets hung.
337            for worker in self.workers:
338                worker.join(1.0)
339            self.workers = []
340
341    class Parallel(object):
342        """This class is used to execute tasks in parallel, and is somewhat
343        less efficient than Serial, but is appropriate for parallel builds.
344
345        This class is thread safe.
346        """
347
348        def __init__(self, taskmaster, num, stack_size):
349            """Create a new parallel job given a taskmaster.
350
351            The taskmaster's next_task() method should return the next
352            task that needs to be executed, or None if there are no more
353            tasks. The taskmaster's executed() method will be called
354            for each task when it is successfully executed or failed()
355            will be called if the task failed to execute (i.e. execute()
356            raised an exception).
357
358            Note: calls to taskmaster are serialized, but calls to
359            execute() on distinct tasks are not serialized, because
360            that is the whole point of parallel jobs: they can execute
361            multiple tasks simultaneously. """
362
363            self.taskmaster = taskmaster
364            self.interrupted = InterruptState()
365            self.tp = ThreadPool(num, stack_size, self.interrupted)
366
367            self.maxjobs = num
368
369        def start(self):
370            """Start the job. This will begin pulling tasks from the
371            taskmaster and executing them, and return when there are no
372            more tasks. If a task fails to execute (i.e. execute() raises
373            an exception), then the job will stop."""
374
375            jobs = 0
376
377            while True:
378                # Start up as many available tasks as we're
379                # allowed to.
380                while jobs < self.maxjobs:
381                    task = self.taskmaster.next_task()
382                    if task is None:
383                        break
384
385                    try:
386                        # prepare task for execution
387                        task.prepare()
388                    except:
389                        task.exception_set()
390                        task.failed()
391                        task.postprocess()
392                    else:
393                        if task.needs_execute():
394                            # dispatch task
395                            self.tp.put(task)
396                            jobs = jobs + 1
397                        else:
398                            task.executed()
399                            task.postprocess()
400
401                if not task and not jobs: break
402
403                # Let any/all completed tasks finish up before we go
404                # back and put the next batch of tasks on the queue.
405                while True:
406                    task, ok = self.tp.get()
407                    jobs = jobs - 1
408
409                    if ok:
410                        task.executed()
411                    else:
412                        if self.interrupted():
413                            try:
414                                raise SCons.Errors.BuildError(
415                                    task.targets[0], errstr=interrupt_msg)
416                            except:
417                                task.exception_set()
418
419                        # Let the failed() callback function arrange
420                        # for the build to stop if that's appropriate.
421                        task.failed()
422
423                    task.postprocess()
424
425                    if self.tp.resultsQueue.empty():
426                        break
427
428            self.tp.cleanup()
429            self.taskmaster.cleanup()
430
431# Local Variables:
432# tab-width:4
433# indent-tabs-mode:nil
434# End:
435# vim: set expandtab tabstop=4 shiftwidth=4:
436