1# MIT License
2#
3# Copyright The SCons Foundation
4#
5# Permission is hereby granted, free of charge, to any person obtaining
6# a copy of this software and associated documentation files (the
7# "Software"), to deal in the Software without restriction, including
8# without limitation the rights to use, copy, modify, merge, publish,
9# distribute, sublicense, and/or sell copies of the Software, and to
10# permit persons to whom the Software is furnished to do so, subject to
11# the following conditions:
12#
13# The above copyright notice and this permission notice shall be included
14# in all copies or substantial portions of the Software.
15#
16# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
17# KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
18# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
19# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
20# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
21# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
22# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23
24"""Serial and Parallel classes to execute build tasks.
25
26The Jobs class provides a higher level interface to start,
27stop, and wait on jobs.
28"""
29
30import SCons.compat
31
32import os
33import signal
34
35import SCons.Errors
36
37# The default stack size (in kilobytes) of the threads used to execute
38# jobs in parallel.
39#
40# We use a stack size of 256 kilobytes. The default on some platforms
41# is too large and prevents us from creating enough threads to fully
42# parallelized the build. For example, the default stack size on linux
43# is 8 MBytes.
44
45explicit_stack_size = None
46default_stack_size = 256
47
48interrupt_msg = 'Build interrupted.'
49
50
51class InterruptState:
52    def __init__(self):
53        self.interrupted = False
54
55    def set(self):
56        self.interrupted = True
57
58    def __call__(self):
59        return self.interrupted
60
61
62class Jobs:
63    """An instance of this class initializes N jobs, and provides
64    methods for starting, stopping, and waiting on all N jobs.
65    """
66
67    def __init__(self, num, taskmaster):
68        """
69        Create 'num' jobs using the given taskmaster.
70
71        If 'num' is 1 or less, then a serial job will be used,
72        otherwise a parallel job with 'num' worker threads will
73        be used.
74
75        The 'num_jobs' attribute will be set to the actual number of jobs
76        allocated.  If more than one job is requested but the Parallel
77        class can't do it, it gets reset to 1.  Wrapping interfaces that
78        care should check the value of 'num_jobs' after initialization.
79        """
80
81        self.job = None
82        if num > 1:
83            stack_size = explicit_stack_size
84            if stack_size is None:
85                stack_size = default_stack_size
86
87            try:
88                self.job = Parallel(taskmaster, num, stack_size)
89                self.num_jobs = num
90            except NameError:
91                pass
92        if self.job is None:
93            self.job = Serial(taskmaster)
94            self.num_jobs = 1
95
96    def run(self, postfunc=lambda: None):
97        """Run the jobs.
98
99        postfunc() will be invoked after the jobs has run. It will be
100        invoked even if the jobs are interrupted by a keyboard
101        interrupt (well, in fact by a signal such as either SIGINT,
102        SIGTERM or SIGHUP). The execution of postfunc() is protected
103        against keyboard interrupts and is guaranteed to run to
104        completion."""
105        self._setup_sig_handler()
106        try:
107            self.job.start()
108        finally:
109            postfunc()
110            self._reset_sig_handler()
111
112    def were_interrupted(self):
113        """Returns whether the jobs were interrupted by a signal."""
114        return self.job.interrupted()
115
116    def _setup_sig_handler(self):
117        """Setup an interrupt handler so that SCons can shutdown cleanly in
118        various conditions:
119
120          a) SIGINT: Keyboard interrupt
121          b) SIGTERM: kill or system shutdown
122          c) SIGHUP: Controlling shell exiting
123
124        We handle all of these cases by stopping the taskmaster. It
125        turns out that it's very difficult to stop the build process
126        by throwing asynchronously an exception such as
127        KeyboardInterrupt. For example, the python Condition
128        variables (threading.Condition) and queues do not seem to be
129        asynchronous-exception-safe. It would require adding a whole
130        bunch of try/finally block and except KeyboardInterrupt all
131        over the place.
132
133        Note also that we have to be careful to handle the case when
134        SCons forks before executing another process. In that case, we
135        want the child to exit immediately.
136        """
137        def handler(signum, stack, self=self, parentpid=os.getpid()):
138            if os.getpid() == parentpid:
139                self.job.taskmaster.stop()
140                self.job.interrupted.set()
141            else:
142                os._exit(2)
143
144        self.old_sigint  = signal.signal(signal.SIGINT, handler)
145        self.old_sigterm = signal.signal(signal.SIGTERM, handler)
146        try:
147            self.old_sighup = signal.signal(signal.SIGHUP, handler)
148        except AttributeError:
149            pass
150
151    def _reset_sig_handler(self):
152        """Restore the signal handlers to their previous state (before the
153         call to _setup_sig_handler()."""
154
155        signal.signal(signal.SIGINT, self.old_sigint)
156        signal.signal(signal.SIGTERM, self.old_sigterm)
157        try:
158            signal.signal(signal.SIGHUP, self.old_sighup)
159        except AttributeError:
160            pass
161
162class Serial:
163    """This class is used to execute tasks in series, and is more efficient
164    than Parallel, but is only appropriate for non-parallel builds. Only
165    one instance of this class should be in existence at a time.
166
167    This class is not thread safe.
168    """
169
170    def __init__(self, taskmaster):
171        """Create a new serial job given a taskmaster.
172
173        The taskmaster's next_task() method should return the next task
174        that needs to be executed, or None if there are no more tasks. The
175        taskmaster's executed() method will be called for each task when it
176        is successfully executed, or failed() will be called if it failed to
177        execute (e.g. execute() raised an exception)."""
178
179        self.taskmaster = taskmaster
180        self.interrupted = InterruptState()
181
182    def start(self):
183        """Start the job. This will begin pulling tasks from the taskmaster
184        and executing them, and return when there are no more tasks. If a task
185        fails to execute (i.e. execute() raises an exception), then the job will
186        stop."""
187
188        while True:
189            task = self.taskmaster.next_task()
190
191            if task is None:
192                break
193
194            try:
195                task.prepare()
196                if task.needs_execute():
197                    task.execute()
198            except Exception:
199                if self.interrupted():
200                    try:
201                        raise SCons.Errors.BuildError(
202                            task.targets[0], errstr=interrupt_msg)
203                    except:
204                        task.exception_set()
205                else:
206                    task.exception_set()
207
208                # Let the failed() callback function arrange for the
209                # build to stop if that's appropriate.
210                task.failed()
211            else:
212                task.executed()
213
214            task.postprocess()
215        self.taskmaster.cleanup()
216
217
218# Trap import failure so that everything in the Job module but the
219# Parallel class (and its dependent classes) will work if the interpreter
220# doesn't support threads.
221try:
222    import queue
223    import threading
224except ImportError:
225    pass
226else:
227    class Worker(threading.Thread):
228        """A worker thread waits on a task to be posted to its request queue,
229        dequeues the task, executes it, and posts a tuple including the task
230        and a boolean indicating whether the task executed successfully. """
231
232        def __init__(self, requestQueue, resultsQueue, interrupted):
233            threading.Thread.__init__(self)
234            self.setDaemon(1)
235            self.requestQueue = requestQueue
236            self.resultsQueue = resultsQueue
237            self.interrupted = interrupted
238            self.start()
239
240        def run(self):
241            while True:
242                task = self.requestQueue.get()
243
244                if task is None:
245                    # The "None" value is used as a sentinel by
246                    # ThreadPool.cleanup().  This indicates that there
247                    # are no more tasks, so we should quit.
248                    break
249
250                try:
251                    if self.interrupted():
252                        raise SCons.Errors.BuildError(
253                            task.targets[0], errstr=interrupt_msg)
254                    task.execute()
255                except:
256                    task.exception_set()
257                    ok = False
258                else:
259                    ok = True
260
261                self.resultsQueue.put((task, ok))
262
263    class ThreadPool:
264        """This class is responsible for spawning and managing worker threads."""
265
266        def __init__(self, num, stack_size, interrupted):
267            """Create the request and reply queues, and 'num' worker threads.
268
269            One must specify the stack size of the worker threads. The
270            stack size is specified in kilobytes.
271            """
272            self.requestQueue = queue.Queue(0)
273            self.resultsQueue = queue.Queue(0)
274
275            try:
276                prev_size = threading.stack_size(stack_size*1024)
277            except AttributeError as e:
278                # Only print a warning if the stack size has been
279                # explicitly set.
280                if explicit_stack_size is not None:
281                    msg = "Setting stack size is unsupported by this version of Python:\n    " + \
282                        e.args[0]
283                    SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
284            except ValueError as e:
285                msg = "Setting stack size failed:\n    " + str(e)
286                SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
287
288            # Create worker threads
289            self.workers = []
290            for _ in range(num):
291                worker = Worker(self.requestQueue, self.resultsQueue, interrupted)
292                self.workers.append(worker)
293
294            if 'prev_size' in locals():
295                threading.stack_size(prev_size)
296
297        def put(self, task):
298            """Put task into request queue."""
299            self.requestQueue.put(task)
300
301        def get(self):
302            """Remove and return a result tuple from the results queue."""
303            return self.resultsQueue.get()
304
305        def preparation_failed(self, task):
306            self.resultsQueue.put((task, False))
307
308        def cleanup(self):
309            """
310            Shuts down the thread pool, giving each worker thread a
311            chance to shut down gracefully.
312            """
313            # For each worker thread, put a sentinel "None" value
314            # on the requestQueue (indicating that there's no work
315            # to be done) so that each worker thread will get one and
316            # terminate gracefully.
317            for _ in self.workers:
318                self.requestQueue.put(None)
319
320            # Wait for all of the workers to terminate.
321            #
322            # If we don't do this, later Python versions (2.4, 2.5) often
323            # seem to raise exceptions during shutdown.  This happens
324            # in requestQueue.get(), as an assertion failure that
325            # requestQueue.not_full is notified while not acquired,
326            # seemingly because the main thread has shut down (or is
327            # in the process of doing so) while the workers are still
328            # trying to pull sentinels off the requestQueue.
329            #
330            # Normally these terminations should happen fairly quickly,
331            # but we'll stick a one-second timeout on here just in case
332            # someone gets hung.
333            for worker in self.workers:
334                worker.join(1.0)
335            self.workers = []
336
337    class Parallel:
338        """This class is used to execute tasks in parallel, and is somewhat
339        less efficient than Serial, but is appropriate for parallel builds.
340
341        This class is thread safe.
342        """
343
344        def __init__(self, taskmaster, num, stack_size):
345            """Create a new parallel job given a taskmaster.
346
347            The taskmaster's next_task() method should return the next
348            task that needs to be executed, or None if there are no more
349            tasks. The taskmaster's executed() method will be called
350            for each task when it is successfully executed, or failed()
351            will be called if the task failed to execute (i.e. execute()
352            raised an exception).
353
354            Note: calls to taskmaster are serialized, but calls to
355            execute() on distinct tasks are not serialized, because
356            that is the whole point of parallel jobs: they can execute
357            multiple tasks simultaneously. """
358
359            self.taskmaster = taskmaster
360            self.interrupted = InterruptState()
361            self.tp = ThreadPool(num, stack_size, self.interrupted)
362
363            self.maxjobs = num
364
365        def start(self):
366            """Start the job. This will begin pulling tasks from the
367            taskmaster and executing them, and return when there are no
368            more tasks. If a task fails to execute (i.e. execute() raises
369            an exception), then the job will stop."""
370
371            jobs = 0
372
373            while True:
374                # Start up as many available tasks as we're
375                # allowed to.
376                while jobs < self.maxjobs:
377                    task = self.taskmaster.next_task()
378                    if task is None:
379                        break
380
381                    try:
382                        # prepare task for execution
383                        task.prepare()
384                    except:
385                        task.exception_set()
386                        task.failed()
387                        task.postprocess()
388                    else:
389                        if task.needs_execute():
390                            # dispatch task
391                            self.tp.put(task)
392                            jobs = jobs + 1
393                        else:
394                            task.executed()
395                            task.postprocess()
396
397                if not task and not jobs: break
398
399                # Let any/all completed tasks finish up before we go
400                # back and put the next batch of tasks on the queue.
401                while True:
402                    task, ok = self.tp.get()
403                    jobs = jobs - 1
404
405                    if ok:
406                        task.executed()
407                    else:
408                        if self.interrupted():
409                            try:
410                                raise SCons.Errors.BuildError(
411                                    task.targets[0], errstr=interrupt_msg)
412                            except:
413                                task.exception_set()
414
415                        # Let the failed() callback function arrange
416                        # for the build to stop if that's appropriate.
417                        task.failed()
418
419                    task.postprocess()
420
421                    if self.tp.resultsQueue.empty():
422                        break
423
424            self.tp.cleanup()
425            self.taskmaster.cleanup()
426
427# Local Variables:
428# tab-width:4
429# indent-tabs-mode:nil
430# End:
431# vim: set expandtab tabstop=4 shiftwidth=4:
432