1#!/usr/bin/env python
2# encoding: utf-8
3# Thomas Nagy, 2005-2018 (ita)
4
5"""
6Runner.py: Task scheduling and execution
7"""
8
9import heapq, traceback
# The queue module is named "queue" on Python 3 and "Queue" on Python 2
try:
	from queue import Queue, PriorityQueue
except ImportError:
	from Queue import Queue
	try:
		from Queue import PriorityQueue
	except ImportError:
		# Fallback when the Queue module provides no PriorityQueue:
		# a Queue subclass storing its items in a heapq-managed list,
		# so that the smallest item is the one returned first
		class PriorityQueue(Queue):
			def _init(self, maxsize):
				self.maxsize = maxsize
				self.queue = []
			def _put(self, item):
				heapq.heappush(self.queue, item)
			def _get(self):
				return heapq.heappop(self.queue)
25
26from waflib import Utils, Task, Errors, Logs
27
GAP = 5
"""
Throttling factor: while more than ``GAP * numjobs`` tasks are in flight,
:py:meth:`waflib.Runner.Parallel.refill_task_list` collects results instead of enqueuing more tasks to run
"""
32
class PriorityTasks(object):
	"""
	Priority heap based on :py:mod:`heapq`: :py:meth:`pop` returns the smallest
	item according to the comparison operators of the stored objects.
	Iterating yields the items in internal heap order, not in sorted order.
	"""
	def __init__(self):
		self.lst = []
	def __len__(self):
		return len(self.lst)
	def __iter__(self):
		return iter(self.lst)
	def clear(self):
		# rebind instead of emptying in place: extend() may have handed
		# the previous list object over to another heap
		self.lst = []
	def append(self, task):
		heapq.heappush(self.lst, task)
	def appendleft(self, task):
		"Deprecated, do not use"
		heapq.heappush(self.lst, task)
	def pop(self):
		return heapq.heappop(self.lst)
	def extend(self, lst):
		"""
		Adds all the items of *lst* to the heap.

		When the heap is empty, a list argument is adopted and heapified in place
		(the caller's list is modified), and the storage of another heap is taken
		over without copying. Any other iterable is copied.

		:param lst: items to add
		:type lst: list, :py:class:`PriorityTasks` or any iterable
		"""
		if self.lst:
			for x in lst:
				self.append(x)
		elif isinstance(lst, list):
			self.lst = lst
			heapq.heapify(lst)
		else:
			try:
				# another heap: its storage already satisfies the heap invariant
				self.lst = lst.lst
			except AttributeError:
				# tuple, set, generator, ...: same result as on a non-empty heap
				self.lst = list(lst)
				heapq.heapify(self.lst)
59
class Consumer(Utils.threading.Thread):
	"""
	Daemon thread object that executes a task. It shares a semaphore with
	the coordinator :py:class:`waflib.Runner.Spawner`. There is one
	instance per task to consume.
	"""
	def __init__(self, spawner, task):
		"""
		Stores the task and the coordinator, then starts the thread immediately.

		:param spawner: coordinator providing the semaphore and the producer
		:type spawner: :py:class:`waflib.Runner.Spawner`
		:param task: task to execute
		:type task: :py:class:`waflib.Task.Task`
		"""
		Utils.threading.Thread.__init__(self)
		self.task = task
		"""Task to execute"""
		self.spawner = spawner
		"""Coordinator object"""
		# Thread.setDaemon() is deprecated since Python 3.10,
		# the daemon attribute is the documented replacement
		self.daemon = True
		self.start()
	def run(self):
		"""
		Processes a single task, unless the producer is stopping the build.
		The semaphore slot is released and the task is handed back to the
		producer output queue in all cases.
		"""
		try:
			if not self.spawner.master.stop:
				self.spawner.master.process_task(self.task)
		finally:
			# free the slot first so that the spawner can start another consumer
			self.spawner.sem.release()
			self.spawner.master.out.put(self.task)
			# drop the references held by this thread object
			self.task = None
			self.spawner = None
86
class Spawner(Utils.threading.Thread):
	"""
	Daemon thread that consumes tasks from :py:class:`waflib.Runner.Parallel` producer and
	spawns a consuming thread :py:class:`waflib.Runner.Consumer` for each
	:py:class:`waflib.Task.Task` instance.
	"""
	def __init__(self, master):
		"""
		Creates the semaphore sized after ``master.numjobs`` and starts the thread immediately.

		:param master: producer instance
		:type master: :py:class:`waflib.Runner.Parallel`
		"""
		Utils.threading.Thread.__init__(self)
		self.master = master
		""":py:class:`waflib.Runner.Parallel` producer instance"""
		self.sem = Utils.threading.Semaphore(master.numjobs)
		"""Semaphore that prevents spawning more than *n* concurrent consumers"""
		# Thread.setDaemon() is deprecated since Python 3.10,
		# the daemon attribute is the documented replacement
		self.daemon = True
		self.start()
	def run(self):
		"""
		Spawns new consumers to execute tasks by delegating to :py:meth:`waflib.Runner.Spawner.loop`
		"""
		try:
			self.loop()
		except Exception:
			# Python 2 prints unnecessary messages when shutting down
			# we also want to stop the thread properly
			pass
	def loop(self):
		"""
		Consumes task objects from the producer; ends when the producer has no more
		task to provide.

		There is no explicit exit condition: the producer puts ``None`` in its
		``ready`` queue when it is done, and the attribute error raised on that
		value below is what terminates the loop (it is silenced in :py:meth:`run`).
		When the producer ``stop`` flag is set, that value is passed to a consumer instead.
		"""
		master = self.master
		while 1:
			task = master.ready.get()
			# wait for a free slot, the consumer releases it when it is done
			self.sem.acquire()
			if not master.stop:
				task.log_display(task.generator.bld)
			Consumer(self, task)
123
124class Parallel(object):
125	"""
126	Schedule the tasks obtained from the build context for execution.
127	"""
128	def __init__(self, bld, j=2):
129		"""
130		The initialization requires a build context reference
131		for computing the total number of jobs.
132		"""
133
134		self.numjobs = j
135		"""
136		Amount of parallel consumers to use
137		"""
138
139		self.bld = bld
140		"""
141		Instance of :py:class:`waflib.Build.BuildContext`
142		"""
143
144		self.outstanding = PriorityTasks()
145		"""Heap of :py:class:`waflib.Task.Task` that may be ready to be executed"""
146
147		self.postponed = PriorityTasks()
148		"""Heap of :py:class:`waflib.Task.Task` which are not ready to run for non-DAG reasons"""
149
150		self.incomplete = set()
151		"""List of :py:class:`waflib.Task.Task` waiting for dependent tasks to complete (DAG)"""
152
153		self.ready = PriorityQueue(0)
154		"""List of :py:class:`waflib.Task.Task` ready to be executed by consumers"""
155
156		self.out = Queue(0)
157		"""List of :py:class:`waflib.Task.Task` returned by the task consumers"""
158
159		self.count = 0
160		"""Amount of tasks that may be processed by :py:class:`waflib.Runner.TaskConsumer`"""
161
162		self.processed = 0
163		"""Amount of tasks processed"""
164
165		self.stop = False
166		"""Error flag to stop the build"""
167
168		self.error = []
169		"""Tasks that could not be executed"""
170
171		self.biter = None
172		"""Task iterator which must give groups of parallelizable tasks when calling ``next()``"""
173
174		self.dirty = False
175		"""
176		Flag that indicates that the build cache must be saved when a task was executed
177		(calls :py:meth:`waflib.Build.BuildContext.store`)"""
178
179		self.revdeps = Utils.defaultdict(set)
180		"""
181		The reverse dependency graph of dependencies obtained from Task.run_after
182		"""
183
184		self.spawner = Spawner(self)
185		"""
186		Coordinating daemon thread that spawns thread consumers
187		"""
188
189	def get_next_task(self):
190		"""
191		Obtains the next Task instance to run
192
193		:rtype: :py:class:`waflib.Task.Task`
194		"""
195		if not self.outstanding:
196			return None
197		return self.outstanding.pop()
198
	def postpone(self, tsk):
		"""
		Adds the task to the heap :py:attr:`waflib.Runner.Parallel.postponed`.
		Postponed tasks are moved back to :py:attr:`waflib.Runner.Parallel.outstanding`
		by :py:meth:`waflib.Runner.Parallel.refill_task_list`.

		:param tsk: task instance
		:type tsk: :py:class:`waflib.Task.Task`
		"""
		self.postponed.append(tsk)
208
	def refill_task_list(self):
		"""
		Pulls a next group of tasks to execute in :py:attr:`waflib.Runner.Parallel.outstanding`.
		Ensures that all tasks in the current build group are complete before processing the next one.

		:raises: :py:class:`waflib.Errors.WafError` when a deadlock or inconsistent reverse dependencies are detected
		"""
		# throttle: too many tasks are in flight, collect results before adding more
		while self.count > self.numjobs * GAP:
			self.get_out()

		while not self.outstanding:
			if self.count:
				# tasks are in flight: collecting a result may make other tasks outstanding
				self.get_out()
				if self.outstanding:
					break
			elif self.postponed:
				# nothing is in flight and only postponed tasks remain: if no task
				# was processed since the previous visit, the build cannot progress
				try:
					cond = self.deadlock == self.processed
				except AttributeError:
					# first visit, self.deadlock is only assigned below
					pass
				else:
					if cond:
						# The most common reason is conflicting build order declaration
						# for example: "X run_after Y" and "Y run_after X"
						# Another can be changing "run_after" dependencies while the build is running
						# for example: updating "tsk.run_after" in the "runnable_status" method
						lst = []
						for tsk in self.postponed:
							deps = [id(x) for x in tsk.run_after if not x.hasrun]
							lst.append('%s\t-> %r' % (repr(tsk), deps))
							if not deps:
								lst.append('\n  task %r dependencies are done, check its *runnable_status*?' % id(tsk))
						raise Errors.WafError('Deadlock detected: check the task build order%s' % ''.join(lst))
				self.deadlock = self.processed

			if self.postponed:
				# give the postponed tasks another chance
				self.outstanding.extend(self.postponed)
				self.postponed.clear()
			elif not self.count:
				if self.incomplete:
					# look for one task whose dependencies have all been run;
					# the loop stops right after the removal, so the set is
					# never iterated after being modified
					for x in self.incomplete:
						for k in x.run_after:
							if not k.hasrun:
								break
						else:
							# dependency added after the build started without updating revdeps
							self.incomplete.remove(x)
							self.outstanding.append(x)
							break
					else:
						raise Errors.WafError('Broken revdeps detected on %r' % self.incomplete)
				else:
					# the current group is complete: fetch the next one
					# (StopIteration from an exhausted iterator is not handled here)
					tasks = next(self.biter)
					ready, waiting = self.prio_and_split(tasks)
					self.outstanding.extend(ready)
					self.incomplete.update(waiting)
					self.total = self.bld.total()
					break
265
266	def add_more_tasks(self, tsk):
267		"""
268		If a task provides :py:attr:`waflib.Task.Task.more_tasks`, then the tasks contained
269		in that list are added to the current build and will be processed before the next build group.
270
271		The priorities for dependent tasks are not re-calculated globally
272
273		:param tsk: task instance
274		:type tsk: :py:attr:`waflib.Task.Task`
275		"""
276		if getattr(tsk, 'more_tasks', None):
277			more = set(tsk.more_tasks)
278			groups_done = set()
279			def iteri(a, b):
280				for x in a:
281					yield x
282				for x in b:
283					yield x
284
285			# Update the dependency tree
286			# this assumes that task.run_after values were updated
287			for x in iteri(self.outstanding, self.incomplete):
288				for k in x.run_after:
289					if isinstance(k, Task.TaskGroup):
290						if k not in groups_done:
291							groups_done.add(k)
292							for j in k.prev & more:
293								self.revdeps[j].add(k)
294					elif k in more:
295						self.revdeps[k].add(x)
296
297			ready, waiting = self.prio_and_split(tsk.more_tasks)
298			self.outstanding.extend(ready)
299			self.incomplete.update(waiting)
300			self.total += len(tsk.more_tasks)
301
	def mark_finished(self, tsk):
		"""
		Updates the scheduler state once a task is done (executed, skipped or canceled):
		tasks of :py:attr:`waflib.Runner.Parallel.incomplete` that depend on it and have
		no pending dependency left are moved to :py:attr:`waflib.Runner.Parallel.outstanding`,
		and the task semaphore, if any, is released so that waiting tasks can be submitted.

		:param tsk: task instance
		:type tsk: :py:attr:`waflib.Task.Task`
		"""
		def try_unfreeze(x):
			# DAG ancestors are likely to be in the incomplete set
			# This assumes that the run_after contents have not changed
			# after the build starts, else a deadlock may occur
			if x in self.incomplete:
				# TODO remove dependencies to free some memory?
				# x.run_after.remove(tsk)
				for k in x.run_after:
					if not k.hasrun:
						break
				else:
					# all the dependencies have run
					self.incomplete.remove(x)
					self.outstanding.append(x)

		if tsk in self.revdeps:
			for x in self.revdeps[tsk]:
				if isinstance(x, Task.TaskGroup):
					# the group is complete once all its "prev" tasks are done
					x.prev.remove(tsk)
					if not x.prev:
						for k in x.next:
							# TODO necessary optimization?
							k.run_after.remove(x)
							try_unfreeze(k)
						# TODO necessary optimization?
						x.next = []
				else:
					try_unfreeze(x)
			del self.revdeps[tsk]

		if hasattr(tsk, 'semaphore'):
			sem = tsk.semaphore
			sem.release(tsk)
			while sem.waiting and not sem.is_locked():
				# take a frozen task, make it ready to run
				x = sem.waiting.pop()
				self._add_task(x)
339
340	def get_out(self):
341		"""
342		Waits for a Task that task consumers add to :py:attr:`waflib.Runner.Parallel.out` after execution.
343		Adds more Tasks if necessary through :py:attr:`waflib.Runner.Parallel.add_more_tasks`.
344
345		:rtype: :py:attr:`waflib.Task.Task`
346		"""
347		tsk = self.out.get()
348		if not self.stop:
349			self.add_more_tasks(tsk)
350		self.mark_finished(tsk)
351
352		self.count -= 1
353		self.dirty = True
354		return tsk
355
	def add_task(self, tsk):
		"""
		Enqueue a Task to :py:attr:`waflib.Runner.Parallel.ready` so that consumers can run them.
		The counters are not updated here, see :py:meth:`waflib.Runner.Parallel._add_task`.

		:param tsk: task instance
		:type tsk: :py:attr:`waflib.Task.Task`
		"""
		# TODO change in waf 2.1
		self.ready.put(tsk)
365
366	def _add_task(self, tsk):
367		if hasattr(tsk, 'semaphore'):
368			sem = tsk.semaphore
369			try:
370				sem.acquire(tsk)
371			except IndexError:
372				sem.waiting.add(tsk)
373				return
374
375		self.count += 1
376		self.processed += 1
377		if self.numjobs == 1:
378			tsk.log_display(tsk.generator.bld)
379			try:
380				self.process_task(tsk)
381			finally:
382				self.out.put(tsk)
383		else:
384			self.add_task(tsk)
385
386	def process_task(self, tsk):
387		"""
388		Processes a task and attempts to stop the build in case of errors
389		"""
390		tsk.process()
391		if tsk.hasrun != Task.SUCCESS:
392			self.error_handler(tsk)
393
	def skip(self, tsk):
		"""
		Mark a task as skipped/up-to-date, then let the tasks that were
		waiting for it progress (:py:meth:`waflib.Runner.Parallel.mark_finished`)

		:param tsk: task instance
		:type tsk: :py:attr:`waflib.Task.Task`
		"""
		tsk.hasrun = Task.SKIPPED
		self.mark_finished(tsk)
400
	def cancel(self, tsk):
		"""
		Mark a task as failed because of unsatisfiable dependencies, then update
		the tasks that were waiting for it (:py:meth:`waflib.Runner.Parallel.mark_finished`)

		:param tsk: task instance
		:type tsk: :py:attr:`waflib.Task.Task`
		"""
		tsk.hasrun = Task.CANCELED
		self.mark_finished(tsk)
407
408	def error_handler(self, tsk):
409		"""
410		Called when a task cannot be executed. The flag :py:attr:`waflib.Runner.Parallel.stop` is set,
411		unless the build is executed with::
412
413			$ waf build -k
414
415		:param tsk: task instance
416		:type tsk: :py:attr:`waflib.Task.Task`
417		"""
418		if not self.bld.keep:
419			self.stop = True
420		self.error.append(tsk)
421
422	def task_status(self, tsk):
423		"""
424		Obtains the task status to decide whether to run it immediately or not.
425
426		:return: the exit status, for example :py:attr:`waflib.Task.ASK_LATER`
427		:rtype: integer
428		"""
429		try:
430			return tsk.runnable_status()
431		except Exception:
432			self.processed += 1
433			tsk.err_msg = traceback.format_exc()
434			if not self.stop and self.bld.keep:
435				self.skip(tsk)
436				if self.bld.keep == 1:
437					# if -k stop on the first exception, if -kk try to go as far as possible
438					if Logs.verbose > 1 or not self.error:
439						self.error.append(tsk)
440					self.stop = True
441				else:
442					if Logs.verbose > 1:
443						self.error.append(tsk)
444				return Task.EXCEPTION
445
446			tsk.hasrun = Task.EXCEPTION
447			self.error_handler(tsk)
448
449			return Task.EXCEPTION
450
	def start(self):
		"""
		Obtains Task instances from the BuildContext instance and adds the ones that need to be executed to
		:py:class:`waflib.Runner.Parallel.ready` so that the :py:class:`waflib.Runner.Spawner` consumer thread
		has them executed. Obtains the executed Tasks back from :py:class:`waflib.Runner.Parallel.out`
		and marks the build as failed by setting the ``stop`` flag.
		If only one job is used, then executes the tasks one by one, without consumers.
		"""
		self.total = self.bld.total()

		while not self.stop:

			self.refill_task_list()

			# consider the next task
			tsk = self.get_next_task()
			if not tsk:
				if self.count:
					# tasks may add new ones after they are run
					continue
				else:
					# no tasks to run, no tasks running, time to exit
					break

			if tsk.hasrun:
				# if the task is marked as "run", just skip it
				self.processed += 1
				continue

			if self.stop: # stop immediately after a failure is detected
				break

			# dispatch on the status reported by the task
			st = self.task_status(tsk)
			if st == Task.RUN_ME:
				self._add_task(tsk)
			elif st == Task.ASK_LATER:
				self.postpone(tsk)
			elif st == Task.SKIP_ME:
				self.processed += 1
				self.skip(tsk)
				self.add_more_tasks(tsk)
			elif st == Task.CANCEL_ME:
				# A dependency problem has occurred, and the
				# build is most likely run with `waf -k`
				if Logs.verbose > 1:
					self.error.append(tsk)
				self.processed += 1
				self.cancel(tsk)

		# self.count represents the tasks that have been made available to the consumer threads
		# collect all the tasks after an error else the message may be incomplete
		while self.error and self.count:
			self.get_out()

		# end marker for the thread reading the ready queue
		self.ready.put(None)
		if not self.stop:
			# sanity checks: a build that was not interrupted leaves nothing behind
			assert not self.count
			assert not self.postponed
			assert not self.incomplete
510
	def prio_and_split(self, tasks):
		"""
		Label input tasks with priority values, and return a pair containing
		the tasks that are ready to run and the tasks that are necessarily
		waiting for other tasks to complete.

		The priority system is really meant as an optional layer for optimization:
		dependency cycles are found quickly, and builds should be more efficient.
		A high priority number means that a task is processed first.

		This method can be overridden to disable the priority system::

			def prio_and_split(self, tasks):
				return tasks, []

		As a side effect, the reverse dependencies of the input tasks are
		recorded in :py:attr:`waflib.Runner.Parallel.revdeps`.

		:param tasks: tasks to process; the sequence is iterated several times
		:return: A pair of task lists
		:rtype: tuple
		"""
		# to disable:
		#return tasks, []
		# visit states: 0 = not visited, 1 = visit in progress, 2 = visit complete
		for x in tasks:
			x.visited = 0

		reverse = self.revdeps

		# record for each dependency the tasks or task groups waiting for it
		groups_done = set()
		for x in tasks:
			for k in x.run_after:
				if isinstance(k, Task.TaskGroup):
					if k not in groups_done:
						groups_done.add(k)
						for j in k.prev:
							reverse[j].add(k)
				else:
					reverse[k].add(x)

		# the priority number is not the tree depth
		def visit(n):
			if isinstance(n, Task.TaskGroup):
				# groups carry no priority of their own
				return sum(visit(k) for k in n.next)

			if n.visited == 0:
				n.visited = 1

				if n in reverse:
					rev = reverse[n]
					n.prio_order = n.tree_weight + len(rev) + sum(visit(k) for k in rev)
				else:
					n.prio_order = n.tree_weight

				n.visited = 2
			elif n.visited == 1:
				# the task was reached again while its own visit is in progress
				raise Errors.WafError('Dependency cycle found!')
			return n.prio_order

		for x in tasks:
			if x.visited != 0:
				# must visit all to detect cycles
				continue
			try:
				visit(x)
			except Errors.WafError:
				# search again to raise an error that lists the tasks involved
				self.debug_cycles(tasks, reverse)

		ready = []
		waiting = []
		for x in tasks:
			for k in x.run_after:
				if not k.hasrun:
					# at least one dependency is pending
					waiting.append(x)
					break
			else:
				ready.append(x)
		return (ready, waiting)
585
586	def debug_cycles(self, tasks, reverse):
587		tmp = {}
588		for x in tasks:
589			tmp[x] = 0
590
591		def visit(n, acc):
592			if isinstance(n, Task.TaskGroup):
593				for k in n.next:
594					visit(k, acc)
595				return
596			if tmp[n] == 0:
597				tmp[n] = 1
598				for k in reverse.get(n, []):
599					visit(k, [n] + acc)
600				tmp[n] = 2
601			elif tmp[n] == 1:
602				lst = []
603				for tsk in acc:
604					lst.append(repr(tsk))
605					if tsk is n:
606						# exclude prior nodes, we want the minimum cycle
607						break
608				raise Errors.WafError('Task dependency cycle in "run_after" constraints: %s' % ''.join(lst))
609		for x in tasks:
610			visit(x, [])
611
612