#!/usr/bin/env python
# encoding: utf-8
# Thomas Nagy, 2005-2018 (ita)

"""
Runner.py: Task scheduling and execution
"""

import heapq
import traceback
try:
    from queue import Queue, PriorityQueue
except ImportError:
    from Queue import Queue
    try:
        from Queue import PriorityQueue
    except ImportError:
        # Fallback for interpreters whose Queue module lacks PriorityQueue:
        # a Queue whose storage is a heap
        class PriorityQueue(Queue):
            def _init(self, maxsize):
                self.maxsize = maxsize
                self.queue = []
            def _put(self, item):
                heapq.heappush(self.queue, item)
            def _get(self):
                return heapq.heappop(self.queue)

from waflib import Utils, Task, Errors, Logs

GAP = 5
"""
Wait for at least ``GAP * njobs`` before trying to enqueue more tasks to run
"""

class PriorityTasks(object):
    """
    Heap-backed task container exposing a small list/deque-like interface.
    The items are ordered by :py:mod:`heapq`, so they must be comparable.
    """
    def __init__(self):
        self.lst = []
    def __len__(self):
        return len(self.lst)
    def __iter__(self):
        # iterates in heap storage order, which is not a sorted order
        return iter(self.lst)
    def clear(self):
        # rebind instead of emptying in place: the old list may have been
        # adopted by another PriorityTasks object through extend()
        self.lst = []
    def append(self, task):
        heapq.heappush(self.lst, task)
    def appendleft(self, task):
        "Deprecated, do not use"
        heapq.heappush(self.lst, task)
    def pop(self):
        "Removes and returns the smallest item according to the heap order"
        return heapq.heappop(self.lst)
    def extend(self, lst):
        """
        Adds all the items from *lst*, which is either a plain list or another
        PriorityTasks object. When this container is empty, the given storage is
        adopted without copying (a plain list is heapified in place).
        """
        if self.lst:
            for x in lst:
                self.append(x)
        else:
            if isinstance(lst, list):
                self.lst = lst
                heapq.heapify(lst)
            else:
                # lst.lst is already a heap, share it as-is
                self.lst = lst.lst

class Consumer(Utils.threading.Thread):
    """
    Daemon thread object that executes a task. It shares a semaphore with
    the coordinator :py:class:`waflib.Runner.Spawner`. There is one
    instance per task to consume.
    """
    def __init__(self, spawner, task):
        Utils.threading.Thread.__init__(self)
        self.task = task
        """Task to execute"""
        self.spawner = spawner
        """Coordinator object"""
        # Thread.setDaemon() is deprecated since Python 3.10, the attribute
        # is available since Python 2.6
        self.daemon = True
        self.start()
    def run(self):
        """
        Processes a single task
        """
        try:
            if not self.spawner.master.stop:
                self.spawner.master.process_task(self.task)
        finally:
            # always free the slot and hand the task back to the producer,
            # even if the task processing raised an exception
            self.spawner.sem.release()
            self.spawner.master.out.put(self.task)
            self.task = None
            self.spawner = None

class Spawner(Utils.threading.Thread):
    """
    Daemon thread that consumes tasks from :py:class:`waflib.Runner.Parallel` producer and
    spawns a consuming thread :py:class:`waflib.Runner.Consumer` for each
    :py:class:`waflib.Task.Task` instance.
    """
    def __init__(self, master):
        Utils.threading.Thread.__init__(self)
        self.master = master
        """:py:class:`waflib.Runner.Parallel` producer instance"""
        self.sem = Utils.threading.Semaphore(master.numjobs)
        """Semaphore that prevents spawning more than *n* concurrent consumers"""
        # Thread.setDaemon() is deprecated since Python 3.10, the attribute
        # is available since Python 2.6
        self.daemon = True
        self.start()
    def run(self):
        """
        Spawns new consumers to execute tasks by delegating to :py:meth:`waflib.Runner.Spawner.loop`
        """
        try:
            self.loop()
        except Exception:
            # Python 2 prints unnecessary messages when shutting down
            # we also want to stop the thread properly
            pass
    def loop(self):
        """
        Consumes task objects from the producer; ends when the producer has no more
        task to provide.
        """
        master = self.master
        while 1:
            # Parallel.start() enqueues None when the build loop is over; when the
            # stop flag is unset, calling log_display on it raises an exception
            # that run() swallows, which ends this thread
            task = master.ready.get()
            self.sem.acquire()
            if not master.stop:
                task.log_display(task.generator.bld)
            Consumer(self, task)

class Parallel(object):
    """
    Schedule the tasks obtained from the build context for execution.
    """
    def __init__(self, bld, j=2):
        """
        The initialization requires a build context reference
        for computing the total number of jobs.
        """

        self.numjobs = j
        """
        Amount of parallel consumers to use
        """

        self.bld = bld
        """
        Instance of :py:class:`waflib.Build.BuildContext`
        """

        self.outstanding = PriorityTasks()
        """Heap of :py:class:`waflib.Task.Task` that may be ready to be executed"""

        self.postponed = PriorityTasks()
        """Heap of :py:class:`waflib.Task.Task` which are not ready to run for non-DAG reasons"""

        self.incomplete = set()
        """Set of :py:class:`waflib.Task.Task` waiting for dependent tasks to complete (DAG)"""

        self.ready = PriorityQueue(0)
        """Queue of :py:class:`waflib.Task.Task` ready to be executed by consumers"""

        self.out = Queue(0)
        """Queue of :py:class:`waflib.Task.Task` returned by the task consumers"""

        self.count = 0
        """Amount of tasks that may be processed by :py:class:`waflib.Runner.Consumer`"""

        self.processed = 0
        """Amount of tasks processed"""

        self.stop = False
        """Error flag to stop the build"""

        self.error = []
        """Tasks that could not be executed"""

        self.biter = None
        """Task iterator which must give groups of parallelizable tasks when calling ``next()``"""

        self.dirty = False
        """
        Flag that indicates that the build cache must be saved when a task was executed
        (calls :py:meth:`waflib.Build.BuildContext.store`)"""

        self.revdeps = Utils.defaultdict(set)
        """
        The reverse dependency graph of dependencies obtained from Task.run_after
        """

        # created last: the thread starts immediately and reads self.ready
        self.spawner = Spawner(self)
        """
        Coordinating daemon thread that spawns thread consumers
        """

    def get_next_task(self):
        """
        Obtains the next Task instance to run

        :return: the next task, or None when no task is outstanding
        :rtype: :py:class:`waflib.Task.Task`
        """
        if not self.outstanding:
            return None
        return self.outstanding.pop()

    def postpone(self, tsk):
        """
        Adds the task to the heap :py:attr:`waflib.Runner.Parallel.postponed`.
        The postponed tasks are moved back to :py:attr:`waflib.Runner.Parallel.outstanding`
        by :py:meth:`waflib.Runner.Parallel.refill_task_list`.

        :param tsk: task instance
        :type tsk: :py:class:`waflib.Task.Task`
        """
        self.postponed.append(tsk)

    def refill_task_list(self):
        """
        Pulls a next group of tasks to execute in :py:attr:`waflib.Runner.Parallel.outstanding`.
        Ensures that all tasks in the current build group are complete before processing the next one.

        :raises: :py:class:`waflib.Errors.WafError` when a deadlock or broken reverse dependencies are detected
        """
        # throttle: do not let too many tasks accumulate on the consumer side
        while self.count > self.numjobs * GAP:
            self.get_out()

        while not self.outstanding:
            if self.count:
                self.get_out()
                if self.outstanding:
                    break
            elif self.postponed:
                # nothing is running and only postponed tasks remain: if no task was
                # processed since the previous pass here, the build cannot progress
                try:
                    cond = self.deadlock == self.processed
                except AttributeError:
                    # first pass, self.deadlock is not set yet
                    pass
                else:
                    if cond:
                        # The most common reason is conflicting build order declaration
                        # for example: "X run_after Y" and "Y run_after X"
                        # Another can be changing "run_after" dependencies while the build is running
                        # for example: updating "tsk.run_after" in the "runnable_status" method
                        lst = []
                        for tsk in self.postponed:
                            deps = [id(x) for x in tsk.run_after if not x.hasrun]
                            lst.append('%s\t-> %r' % (repr(tsk), deps))
                            if not deps:
                                lst.append('\n task %r dependencies are done, check its *runnable_status*?' % id(tsk))
                        raise Errors.WafError('Deadlock detected: check the task build order%s' % ''.join(lst))
                self.deadlock = self.processed

            if self.postponed:
                # extend() may adopt the postponed storage, clear() then rebinds it
                self.outstanding.extend(self.postponed)
                self.postponed.clear()
            elif not self.count:
                if self.incomplete:
                    for x in self.incomplete:
                        for k in x.run_after:
                            if not k.hasrun:
                                break
                        else:
                            # dependency added after the build started without updating revdeps
                            # (the set is modified, so the iteration must stop right away)
                            self.incomplete.remove(x)
                            self.outstanding.append(x)
                            break
                    else:
                        raise Errors.WafError('Broken revdeps detected on %r' % self.incomplete)
                else:
                    # the current group is complete, move on to the next one
                    tasks = next(self.biter)
                    ready, waiting = self.prio_and_split(tasks)
                    self.outstanding.extend(ready)
                    self.incomplete.update(waiting)
                    self.total = self.bld.total()
                    break

    def add_more_tasks(self, tsk):
        """
        If a task provides :py:attr:`waflib.Task.Task.more_tasks`, then the tasks contained
        in that list are added to the current build and will be processed before the next build group.

        The priorities for dependent tasks are not re-calculated globally

        :param tsk: task instance
        :type tsk: :py:attr:`waflib.Task.Task`
        """
        if getattr(tsk, 'more_tasks', None):
            more = set(tsk.more_tasks)
            groups_done = set()
            def iteri(a, b):
                for x in a:
                    yield x
                for x in b:
                    yield x

            # Update the dependency tree
            # this assumes that task.run_after values were updated
            for x in iteri(self.outstanding, self.incomplete):
                for k in x.run_after:
                    if isinstance(k, Task.TaskGroup):
                        if k not in groups_done:
                            groups_done.add(k)
                            for j in k.prev & more:
                                self.revdeps[j].add(k)
                    elif k in more:
                        self.revdeps[k].add(x)

            ready, waiting = self.prio_and_split(tsk.more_tasks)
            self.outstanding.extend(ready)
            self.incomplete.update(waiting)
            self.total += len(tsk.more_tasks)

    def mark_finished(self, tsk):
        """
        Propagates the completion of a task: the tasks from
        :py:attr:`waflib.Runner.Parallel.incomplete` that were waiting on it and have
        no pending dependency left are moved to :py:attr:`waflib.Runner.Parallel.outstanding`,
        and the task semaphore is released if the task has one.

        :param tsk: task instance
        :type tsk: :py:attr:`waflib.Task.Task`
        """
        def try_unfreeze(x):
            # DAG ancestors are likely to be in the incomplete set
            # This assumes that the run_after contents have not changed
            # after the build starts, else a deadlock may occur
            if x in self.incomplete:
                # TODO remove dependencies to free some memory?
                # x.run_after.remove(tsk)
                for k in x.run_after:
                    if not k.hasrun:
                        break
                else:
                    self.incomplete.remove(x)
                    self.outstanding.append(x)

        if tsk in self.revdeps:
            for x in self.revdeps[tsk]:
                if isinstance(x, Task.TaskGroup):
                    x.prev.remove(tsk)
                    if not x.prev:
                        # the group is complete, release the tasks that follow it
                        for k in x.next:
                            # TODO necessary optimization?
                            k.run_after.remove(x)
                            try_unfreeze(k)
                        # TODO necessary optimization?
                        x.next = []
                else:
                    try_unfreeze(x)
            del self.revdeps[tsk]

        if hasattr(tsk, 'semaphore'):
            sem = tsk.semaphore
            # NOTE(review): this is also reached through skip() and cancel() for tasks that
            # never went through _add_task(); confirm that release() tolerates such tasks
            sem.release(tsk)
            while sem.waiting and not sem.is_locked():
                # take a frozen task, make it ready to run
                x = sem.waiting.pop()
                self._add_task(x)

    def get_out(self):
        """
        Waits for a Task that task consumers add to :py:attr:`waflib.Runner.Parallel.out` after execution.
        Adds more Tasks if necessary through :py:attr:`waflib.Runner.Parallel.add_more_tasks`.

        :rtype: :py:attr:`waflib.Task.Task`
        """
        tsk = self.out.get()
        if not self.stop:
            self.add_more_tasks(tsk)
        self.mark_finished(tsk)

        self.count -= 1
        self.dirty = True
        return tsk

    def add_task(self, tsk):
        """
        Enqueue a Task to :py:attr:`waflib.Runner.Parallel.ready` so that consumers can run them.

        :param tsk: task instance
        :type tsk: :py:attr:`waflib.Task.Task`
        """
        # TODO change in waf 2.1
        self.ready.put(tsk)

    def _add_task(self, tsk):
        """
        Accounts for a task and has it executed: inline when only one job is used,
        else through :py:meth:`waflib.Runner.Parallel.add_task`. A task whose semaphore
        cannot be acquired is parked in the semaphore waiting set instead.

        :param tsk: task instance
        :type tsk: :py:attr:`waflib.Task.Task`
        """
        if hasattr(tsk, 'semaphore'):
            sem = tsk.semaphore
            try:
                sem.acquire(tsk)
            except IndexError:
                # no slot available; mark_finished() picks the task up later
                sem.waiting.add(tsk)
                return

        self.count += 1
        self.processed += 1
        if self.numjobs == 1:
            tsk.log_display(tsk.generator.bld)
            try:
                self.process_task(tsk)
            finally:
                # same contract as the consumer threads: always return the task
                self.out.put(tsk)
        else:
            self.add_task(tsk)

    def process_task(self, tsk):
        """
        Processes a task and attempts to stop the build in case of errors
        """
        tsk.process()
        if tsk.hasrun != Task.SUCCESS:
            self.error_handler(tsk)

    def skip(self, tsk):
        """
        Mark a task as skipped/up-to-date
        """
        tsk.hasrun = Task.SKIPPED
        self.mark_finished(tsk)

    def cancel(self, tsk):
        """
        Mark a task as failed because of unsatisfiable dependencies
        """
        tsk.hasrun = Task.CANCELED
        self.mark_finished(tsk)

    def error_handler(self, tsk):
        """
        Called when a task cannot be executed. The flag :py:attr:`waflib.Runner.Parallel.stop` is set,
        unless the build is executed with::

            $ waf build -k

        :param tsk: task instance
        :type tsk: :py:attr:`waflib.Task.Task`
        """
        if not self.bld.keep:
            self.stop = True
        self.error.append(tsk)

    def task_status(self, tsk):
        """
        Obtains the task status to decide whether to run it immediately or not.
        An exception raised by ``runnable_status`` is recorded on the task as ``err_msg``.

        :return: the exit status, for example :py:attr:`waflib.Task.ASK_LATER`
        :rtype: integer
        """
        try:
            return tsk.runnable_status()
        except Exception:
            self.processed += 1
            tsk.err_msg = traceback.format_exc()
            if not self.stop and self.bld.keep:
                self.skip(tsk)
                if self.bld.keep == 1:
                    # if -k stop on the first exception, if -kk try to go as far as possible
                    if Logs.verbose > 1 or not self.error:
                        self.error.append(tsk)
                    self.stop = True
                else:
                    if Logs.verbose > 1:
                        self.error.append(tsk)
                return Task.EXCEPTION

            tsk.hasrun = Task.EXCEPTION
            self.error_handler(tsk)

            return Task.EXCEPTION

    def start(self):
        """
        Obtains Task instances from the BuildContext instance and adds the ones that need to be executed to
        :py:class:`waflib.Runner.Parallel.ready` so that the :py:class:`waflib.Runner.Spawner` consumer thread
        has them executed. Obtains the executed Tasks back from :py:class:`waflib.Runner.Parallel.out`
        and marks the build as failed by setting the ``stop`` flag.
        If only one job is used, then executes the tasks one by one, without consumers.
        """
        self.total = self.bld.total()

        while not self.stop:

            self.refill_task_list()

            # consider the next task
            tsk = self.get_next_task()
            if not tsk:
                if self.count:
                    # tasks may add new ones after they are run
                    continue
                else:
                    # no tasks to run, no tasks running, time to exit
                    break

            if tsk.hasrun:
                # if the task is marked as "run", just skip it
                self.processed += 1
                continue

            if self.stop: # stop immediately after a failure is detected
                break

            st = self.task_status(tsk)
            if st == Task.RUN_ME:
                self._add_task(tsk)
            elif st == Task.ASK_LATER:
                self.postpone(tsk)
            elif st == Task.SKIP_ME:
                self.processed += 1
                self.skip(tsk)
                self.add_more_tasks(tsk)
            elif st == Task.CANCEL_ME:
                # A dependency problem has occurred, and the
                # build is most likely run with `waf -k`
                if Logs.verbose > 1:
                    self.error.append(tsk)
                self.processed += 1
                self.cancel(tsk)

        # self.count represents the tasks that have been made available to the consumer threads
        # collect all the tasks after an error else the message may be incomplete
        while self.error and self.count:
            self.get_out()

        # sentinel for the spawner thread, see Spawner.loop
        self.ready.put(None)
        if not self.stop:
            assert not self.count
            assert not self.postponed
            assert not self.incomplete

    def prio_and_split(self, tasks):
        """
        Label input tasks with priority values, and return a pair containing
        the tasks that are ready to run and the tasks that are necessarily
        waiting for other tasks to complete.

        The priority system is really meant as an optional layer for optimization:
        dependency cycles are found quickly, and builds should be more efficient.
        A high priority number means that a task is processed first.

        This method can be overridden to disable the priority system::

            def prio_and_split(self, tasks):
                return tasks, []

        :return: A pair of task lists
        :rtype: tuple
        """
        # to disable:
        #return tasks, []
        for x in tasks:
            x.visited = 0

        reverse = self.revdeps

        # build the reverse dependency graph; each task group is expanded only once
        groups_done = set()
        for x in tasks:
            for k in x.run_after:
                if isinstance(k, Task.TaskGroup):
                    if k not in groups_done:
                        groups_done.add(k)
                        for j in k.prev:
                            reverse[j].add(k)
                else:
                    reverse[k].add(x)

        # the priority number is not the tree depth
        def visit(n):
            if isinstance(n, Task.TaskGroup):
                return sum(visit(k) for k in n.next)

            # visited: 0 = not seen, 1 = being visited, 2 = done
            if n.visited == 0:
                n.visited = 1

                if n in reverse:
                    rev = reverse[n]
                    n.prio_order = n.tree_weight + len(rev) + sum(visit(k) for k in rev)
                else:
                    n.prio_order = n.tree_weight

                n.visited = 2
            elif n.visited == 1:
                # reached again while still being visited
                raise Errors.WafError('Dependency cycle found!')
            return n.prio_order

        for x in tasks:
            if x.visited != 0:
                # must visit all to detect cycles
                continue
            try:
                visit(x)
            except Errors.WafError:
                # find the cycle again to raise a more detailed error message
                self.debug_cycles(tasks, reverse)

        ready = []
        waiting = []
        for x in tasks:
            for k in x.run_after:
                if not k.hasrun:
                    waiting.append(x)
                    break
            else:
                ready.append(x)
        return (ready, waiting)

    def debug_cycles(self, tasks, reverse):
        """
        Walks the reverse dependency graph to locate a dependency cycle and report
        the tasks that are part of it.

        :param tasks: tasks to start the traversal from
        :param reverse: reverse dependency mapping, as built by :py:meth:`waflib.Runner.Parallel.prio_and_split`
        :raises: :py:class:`waflib.Errors.WafError` listing the tasks when a cycle is found
        """
        # 0 = not seen, 1 = being visited, 2 = done
        tmp = {}
        for x in tasks:
            tmp[x] = 0

        def visit(n, acc):
            # acc: the tasks on the current traversal path, the most recent first
            if isinstance(n, Task.TaskGroup):
                for k in n.next:
                    visit(k, acc)
                return
            if tmp[n] == 0:
                tmp[n] = 1
                for k in reverse.get(n, []):
                    visit(k, [n] + acc)
                tmp[n] = 2
            elif tmp[n] == 1:
                lst = []
                for tsk in acc:
                    lst.append(repr(tsk))
                    if tsk is n:
                        # exclude prior nodes, we want the minimum cycle
                        break
                raise Errors.WafError('Task dependency cycle in "run_after" constraints: %s' % ''.join(lst))
        for x in tasks:
            visit(x, [])
