# MIT License
#
# Copyright The SCons Foundation
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
# KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

"""Serial and Parallel classes to execute build tasks.

The Jobs class provides a higher level interface to start,
stop, and wait on jobs.
"""

import SCons.compat

import os
import signal

import SCons.Errors
import SCons.Warnings

# The default stack size (in kilobytes) of the threads used to execute
# jobs in parallel.
#
# We use a stack size of 256 kilobytes. The default on some platforms
# is too large and prevents us from creating enough threads to fully
# parallelized the build. For example, the default stack size on linux
# is 8 MBytes.

explicit_stack_size = None
default_stack_size = 256

interrupt_msg = 'Build interrupted.'
class InterruptState:
    """Callable flag recording whether the build has been interrupted.

    The flag starts out False, is switched on by set(), and is read by
    calling the instance. Nothing in this class ever clears it again.
    """

    def __init__(self):
        self.interrupted = False

    def set(self):
        """Mark the build as interrupted."""
        self.interrupted = True

    def __call__(self):
        """Return True once set() has been called, False before that."""
        return self.interrupted


class Jobs:
    """An instance of this class initializes N jobs, and provides
    methods for starting, stopping, and waiting on all N jobs.
    """

    def __init__(self, num, taskmaster):
        """
        Create 'num' jobs using the given taskmaster.

        If 'num' is 1 or less, then a serial job will be used,
        otherwise a parallel job with 'num' worker threads will
        be used.

        The 'num_jobs' attribute will be set to the actual number of jobs
        allocated.  If more than one job is requested but the Parallel
        class can't do it, it gets reset to 1.  Wrapping interfaces that
        care should check the value of 'num_jobs' after initialization.
        """

        self.job = None
        if num > 1:
            stack_size = explicit_stack_size
            if stack_size is None:
                stack_size = default_stack_size

            try:
                self.job = Parallel(taskmaster, num, stack_size)
                self.num_jobs = num
            except NameError:
                # Parallel is only defined when the 'queue' and
                # 'threading' imports succeeded; fall back to Serial.
                pass
        if self.job is None:
            self.job = Serial(taskmaster)
            self.num_jobs = 1

    def run(self, postfunc=lambda: None):
        """Run the jobs.

        postfunc() will be invoked after the jobs has run. It will be
        invoked even if the jobs are interrupted by a keyboard
        interrupt (well, in fact by a signal such as either SIGINT,
        SIGTERM or SIGHUP). The execution of postfunc() is protected
        against keyboard interrupts and is guaranteed to run to
        completion.

        The previous signal handlers are restored afterwards, even when
        the job or postfunc() raises; such an exception still propagates
        to the caller.
        """
        self._setup_sig_handler()
        try:
            self.job.start()
        finally:
            # postfunc() runs while our handlers are still installed, so
            # a signal arriving now only sets the interrupted flag.  The
            # nested finally makes sure a failing postfunc() cannot leave
            # those handlers installed.
            try:
                postfunc()
            finally:
                self._reset_sig_handler()

    def were_interrupted(self):
        """Returns whether the jobs were interrupted by a signal."""
        return self.job.interrupted()

    def _setup_sig_handler(self):
        """Setup an interrupt handler so that SCons can shutdown cleanly in
        various conditions:

          a) SIGINT: Keyboard interrupt
          b) SIGTERM: kill or system shutdown
          c) SIGHUP: Controlling shell exiting

        We handle all of these cases by stopping the taskmaster. It
        turns out that it's very difficult to stop the build process
        by throwing asynchronously an exception such as
        KeyboardInterrupt. For example, the python Condition
        variables (threading.Condition) and queues do not seem to be
        asynchronous-exception-safe. It would require adding a whole
        bunch of try/finally block and except KeyboardInterrupt all
        over the place.

        Note also that we have to be careful to handle the case when
        SCons forks before executing another process. In that case, we
        want the child to exit immediately.

        The handlers that were installed before are remembered in
        old_sigint, old_sigterm and (where SIGHUP exists) old_sighup.
        """
        def handler(signum, stack, self=self, parentpid=os.getpid()):
            # parentpid is captured when the handler is created, so a
            # forked child sees a different os.getpid() and exits.
            if os.getpid() == parentpid:
                self.job.taskmaster.stop()
                self.job.interrupted.set()
            else:
                os._exit(2)

        self.old_sigint = signal.signal(signal.SIGINT, handler)
        self.old_sigterm = signal.signal(signal.SIGTERM, handler)
        try:
            self.old_sighup = signal.signal(signal.SIGHUP, handler)
        except AttributeError:
            # The signal module has no SIGHUP on this platform.
            pass

    def _reset_sig_handler(self):
        """Restore the signal handlers to their previous state (before the
        call to _setup_sig_handler())."""

        signal.signal(signal.SIGINT, self.old_sigint)
        signal.signal(signal.SIGTERM, self.old_sigterm)
        try:
            signal.signal(signal.SIGHUP, self.old_sighup)
        except AttributeError:
            # Either signal.SIGHUP or self.old_sighup does not exist,
            # i.e. no SIGHUP handler was installed in the first place.
            pass


class Serial:
    """This class is used to execute tasks in series, and is more efficient
    than Parallel, but is only appropriate for non-parallel builds. Only
    one instance of this class should be in existence at a time.

    This class is not thread safe.
    """

    def __init__(self, taskmaster):
        """Create a new serial job given a taskmaster.

        The taskmaster's next_task() method should return the next task
        that needs to be executed, or None if there are no more tasks. The
        taskmaster's executed() method will be called for each task when it
        is successfully executed, or failed() will be called if it failed to
        execute (e.g. execute() raised an exception)."""

        self.taskmaster = taskmaster
        self.interrupted = InterruptState()

    def start(self):
        """Start the job. This will begin pulling tasks from the taskmaster
        and executing them, and return when there are no more tasks. If a task
        fails to execute (i.e. execute() raises an exception), then the job will
        stop."""

        while True:
            task = self.taskmaster.next_task()

            if task is None:
                break

            try:
                task.prepare()
                if task.needs_execute():
                    task.execute()
            except Exception:
                if self.interrupted():
                    # Raise and immediately catch a BuildError so that
                    # exception_set() is called while the "interrupted"
                    # error is the exception being handled.
                    try:
                        raise SCons.Errors.BuildError(
                            task.targets[0], errstr=interrupt_msg)
                    except:
                        task.exception_set()
                else:
                    task.exception_set()

                # Let the failed() callback function arrange for the
                # build to stop if that's appropriate.
                task.failed()
            else:
                task.executed()

            task.postprocess()
        self.taskmaster.cleanup()


# Trap import failure so that everything in the Job module but the
# Parallel class (and its dependent classes) will work if the interpreter
# doesn't support threads.
try:
    import queue
    import threading
except ImportError:
    pass
else:
    class Worker(threading.Thread):
        """A worker thread waits on a task to be posted to its request queue,
        dequeues the task, executes it, and posts a tuple including the task
        and a boolean indicating whether the task executed successfully. """

        def __init__(self, requestQueue, resultsQueue, interrupted):
            """Store the queues and the interrupt flag, then start the
            thread right away as a daemon thread."""
            threading.Thread.__init__(self)
            # Thread.setDaemon() is deprecated; assign the attribute.
            self.daemon = True
            self.requestQueue = requestQueue
            self.resultsQueue = resultsQueue
            self.interrupted = interrupted
            self.start()

        def run(self):
            """Execute tasks from the request queue until a None sentinel
            is received, posting a (task, ok) tuple for every task."""
            while True:
                task = self.requestQueue.get()

                if task is None:
                    # The "None" value is used as a sentinel by
                    # ThreadPool.cleanup().  This indicates that there
                    # are no more tasks, so we should quit.
                    break

                try:
                    if self.interrupted():
                        raise SCons.Errors.BuildError(
                            task.targets[0], errstr=interrupt_msg)
                    task.execute()
                except:
                    # Catch everything: if an exception escaped here, no
                    # result would ever be posted for this task.
                    task.exception_set()
                    ok = False
                else:
                    ok = True

                self.resultsQueue.put((task, ok))

    class ThreadPool:
        """This class is responsible for spawning and managing worker threads."""

        def __init__(self, num, stack_size, interrupted):
            """Create the request and reply queues, and 'num' worker threads.

            One must specify the stack size of the worker threads. The
            stack size is specified in kilobytes.

            'interrupted' is handed to every worker, which calls it before
            executing each task.
            """
            self.requestQueue = queue.Queue(0)
            self.resultsQueue = queue.Queue(0)

            # Stays None when the stack size could not be changed, in
            # which case there is nothing to restore afterwards.
            prev_size = None
            try:
                prev_size = threading.stack_size(stack_size * 1024)
            except AttributeError as e:
                # Only print a warning if the stack size has been
                # explicitly set.
                if explicit_stack_size is not None:
                    msg = "Setting stack size is unsupported by this version of Python:\n    " + \
                        e.args[0]
                    SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
            except ValueError as e:
                msg = "Setting stack size failed:\n    " + str(e)
                SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)

            # Create worker threads
            self.workers = []
            for _ in range(num):
                worker = Worker(self.requestQueue, self.resultsQueue, interrupted)
                self.workers.append(worker)

            # Put back the stack size that was in effect before, now that
            # all workers have been started.
            if prev_size is not None:
                threading.stack_size(prev_size)

        def put(self, task):
            """Put task into request queue."""
            self.requestQueue.put(task)

        def get(self):
            """Remove and return a result tuple from the results queue."""
            return self.resultsQueue.get()

        def preparation_failed(self, task):
            """Post a failure result for 'task' directly on the results
            queue, without handing the task to a worker."""
            self.resultsQueue.put((task, False))

        def cleanup(self):
            """
            Shuts down the thread pool, giving each worker thread a
            chance to shut down gracefully.
            """
            # For each worker thread, put a sentinel "None" value
            # on the requestQueue (indicating that there's no work
            # to be done) so that each worker thread will get one and
            # terminate gracefully.
            for _ in self.workers:
                self.requestQueue.put(None)

            # Wait for all of the workers to terminate.
            #
            # If we don't do this, later Python versions (2.4, 2.5) often
            # seem to raise exceptions during shutdown.  This happens
            # in requestQueue.get(), as an assertion failure that
            # requestQueue.not_full is notified while not acquired,
            # seemingly because the main thread has shut down (or is
            # in the process of doing so) while the workers are still
            # trying to pull sentinels off the requestQueue.
            #
            # Normally these terminations should happen fairly quickly,
            # but we'll stick a one-second timeout on here just in case
            # someone gets hung.
            for worker in self.workers:
                worker.join(1.0)
            self.workers = []

    class Parallel:
        """This class is used to execute tasks in parallel, and is somewhat
        less efficient than Serial, but is appropriate for parallel builds.

        This class is thread safe.
        """

        def __init__(self, taskmaster, num, stack_size):
            """Create a new parallel job given a taskmaster.

            The taskmaster's next_task() method should return the next
            task that needs to be executed, or None if there are no more
            tasks. The taskmaster's executed() method will be called
            for each task when it is successfully executed, or failed()
            will be called if the task failed to execute (i.e. execute()
            raised an exception).

            Note: calls to taskmaster are serialized, but calls to
            execute() on distinct tasks are not serialized, because
            that is the whole point of parallel jobs: they can execute
            multiple tasks simultaneously.

            'num' is both the number of worker threads and the maximum
            number of tasks in flight; 'stack_size' is passed on to the
            ThreadPool.
            """

            self.taskmaster = taskmaster
            self.interrupted = InterruptState()
            self.tp = ThreadPool(num, stack_size, self.interrupted)

            self.maxjobs = num

        def start(self):
            """Start the job. This will begin pulling tasks from the
            taskmaster and executing them, and return when there are no
            more tasks. If a task fails to execute (i.e. execute() raises
            an exception), then the job will stop."""

            # Number of tasks handed to the pool whose result has not
            # been collected yet.
            jobs = 0

            while True:
                # Start up as many available tasks as we're
                # allowed to.
                while jobs < self.maxjobs:
                    task = self.taskmaster.next_task()
                    if task is None:
                        break

                    try:
                        # prepare task for execution
                        task.prepare()
                    except:
                        # Any exception at all from prepare() marks the
                        # task as failed.
                        task.exception_set()
                        task.failed()
                        task.postprocess()
                    else:
                        if task.needs_execute():
                            # dispatch task
                            self.tp.put(task)
                            jobs = jobs + 1
                        else:
                            task.executed()
                            task.postprocess()

                # The taskmaster is exhausted and nothing is in flight.
                if task is None and not jobs:
                    break

                # Let any/all completed tasks finish up before we go
                # back and put the next batch of tasks on the queue.
                while True:
                    task, ok = self.tp.get()
                    jobs = jobs - 1

                    if ok:
                        task.executed()
                    else:
                        if self.interrupted():
                            # Raise and immediately catch a BuildError so
                            # that exception_set() is called while the
                            # "interrupted" error is being handled.
                            try:
                                raise SCons.Errors.BuildError(
                                    task.targets[0], errstr=interrupt_msg)
                            except:
                                task.exception_set()

                        # Let the failed() callback function arrange
                        # for the build to stop if that's appropriate.
                        task.failed()

                    task.postprocess()

                    if self.tp.resultsQueue.empty():
                        break

            self.tp.cleanup()
            self.taskmaster.cleanup()

# Local Variables:
# tab-width:4
# indent-tabs-mode:nil
# End:
# vim: set expandtab tabstop=4 shiftwidth=4: