# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from __future__ import absolute_import, print_function, unicode_literals
import logging
import os
import copy
import attr
import six
from six import text_type, ensure_text

from . import filter_tasks
from .graph import Graph
from .taskgraph import TaskGraph
from .task import Task
from .optimize import optimize_task_graph
from .morph import morph
from .parameters import Parameters
from .util.python_path import find_object
from .transforms.base import TransformSequence, TransformConfig
from .util.verify import (
    verify_docs,
    verifications,
)
from .util.yaml import load_yaml
from .config import load_graph_config, GraphConfig

logger = logging.getLogger(__name__)


class KindNotFound(Exception):
    """
    Raised when trying to load kind from a directory without a kind.yml.
    """


@attr.s(frozen=True)
class Kind(object):
    """
    A single kind: a named subdirectory of the graph root whose ``kind.yml``
    (parsed into ``config``) names the loader and transforms for its tasks.
    """

    # Name of the kind (the subdirectory name).
    name = attr.ib(type=text_type)
    # Path to the kind's directory.
    path = attr.ib(type=text_type)
    # Parsed contents of the kind's kind.yml.
    config = attr.ib(type=dict)
    # Graph-wide configuration shared by every kind.
    graph_config = attr.ib(type=GraphConfig)

    def _get_loader(self):
        """
        Resolve the loader callable named by ``config["loader"]``.

        @raises KeyError: if the kind config does not define ``loader``.
        """
        try:
            loader = self.config["loader"]
        except KeyError:
            raise KeyError("{!r} does not define `loader`".format(self.path))
        return find_object(loader)

    def load_tasks(self, parameters, loaded_tasks, write_artifacts):
        """
        Load this kind's inputs, run them through its transforms and build Tasks.

        @param parameters: parameters for this graph generation
        @param loaded_tasks: tasks already loaded from previously-processed kinds;
            the ones whose kind is listed in ``kind-dependencies`` are handed to
            the transforms (keyed by label)
        @param write_artifacts: forwarded to `TransformConfig`
        @returns: list of `Task` objects, one per transformed task dict
        @raises KeyError: if ``loader`` or ``transforms`` is missing from the
            kind config
        """
        loader = self._get_loader()
        # Loaders and transforms receive a deep copy, so any changes they make
        # do not leak back into this (frozen) Kind's config.
        config = copy.deepcopy(self.config)

        kind_dependencies = config.get("kind-dependencies", [])
        kind_dependencies_tasks = {
            task.label: task for task in loaded_tasks if task.kind in kind_dependencies
        }

        inputs = loader(self.name, self.path, config, parameters, loaded_tasks)

        transforms = TransformSequence()
        for xform_path in config["transforms"]:
            transforms.add(find_object(xform_path))

        # perform the transformations on the loaded inputs
        trans_config = TransformConfig(
            self.name,
            self.path,
            config,
            parameters,
            kind_dependencies_tasks,
            self.graph_config,
            write_artifacts=write_artifacts,
        )
        return [
            Task(
                self.name,
                label=task_dict["label"],
                description=task_dict["description"],
                attributes=task_dict["attributes"],
                task=task_dict["task"],
                optimization=task_dict.get("optimization"),
                dependencies=task_dict.get("dependencies"),
                soft_dependencies=task_dict.get("soft-dependencies"),
                if_dependencies=task_dict.get("if-dependencies"),
                release_artifacts=task_dict.get("release-artifacts"),
            )
            for task_dict in transforms(trans_config, inputs)
        ]

    @classmethod
    def load(cls, root_dir, graph_config, kind_name):
        """
        Construct a Kind from ``<root_dir>/<kind_name>/kind.yml``.

        @raises KindNotFound: if that kind.yml does not exist.
        """
        path = os.path.join(root_dir, kind_name)
        kind_yml = os.path.join(path, "kind.yml")
        if not os.path.exists(kind_yml):
            raise KindNotFound(kind_yml)

        logger.debug("loading kind `%s` from `%s`", kind_name, path)
        config = load_yaml(kind_yml)

        return cls(kind_name, path, config, graph_config)


class TaskGraphGenerator(object):
    """
    The central controller for taskgraph.  This handles all phases of graph
    generation.  The task graph is generated from all of the kinds defined in
    subdirectories of the generator's root directory.

    Access to the results of this generation, as well as intermediate values at
    various phases of generation, is available via properties.  This encourages
    the provision of all generation inputs at instance construction time.
    """

    # Task-graph generation is implemented as a Python generator that yields
    # each "phase" of generation.  This allows some mach subcommands to short-
    # circuit generation of the entire graph by never completing the generator.

    def __init__(
        self,
        root_dir,
        parameters,
        decision_task_id="DECISION-TASK",
        write_artifacts=False,
    ):
        """
        @param root_dir: root directory, with subdirectories for each kind;
            "taskcluster/ci" is used when this is None
        @param parameters: parameters for this task-graph generation, or callable
            taking a `GraphConfig` and returning parameters
        @type parameters: Union[Parameters, Callable[[GraphConfig], Parameters]]
        @param decision_task_id: task id passed to optimization and morphing
        @param write_artifacts: forwarded to every kind's `TransformConfig`
        """
        if root_dir is None:
            root_dir = "taskcluster/ci"
        self.root_dir = ensure_text(root_dir)
        self._parameters = parameters
        self._decision_task_id = decision_task_id
        self._write_artifacts = write_artifacts

        # start the generator; this shadows the bound `_run` method on this
        # instance with the generator object it returns.
        self._run = self._run()
        self._run_results = {}

    @property
    def parameters(self):
        """
        The parameters used for this graph.

        @type: Parameters
        """
        return self._run_until("parameters")

    @property
    def full_task_set(self):
        """
        The full task set: all tasks defined by any kind (a graph without edges)

        @type: TaskGraph
        """
        return self._run_until("full_task_set")

    @property
    def full_task_graph(self):
        """
        The full task graph: the full task set, with edges representing
        dependencies.

        @type: TaskGraph
        """
        return self._run_until("full_task_graph")

    @property
    def target_task_set(self):
        """
        The set of targeted tasks (a graph without edges)

        @type: TaskGraph
        """
        return self._run_until("target_task_set")

    @property
    def target_task_graph(self):
        """
        The set of targeted tasks and all of their dependencies

        @type: TaskGraph
        """
        return self._run_until("target_task_graph")

    @property
    def optimized_task_graph(self):
        """
        The set of targeted tasks and all of their dependencies; tasks that
        have been optimized out are either omitted or replaced with a Task
        instance containing only a task_id.

        @type: TaskGraph
        """
        return self._run_until("optimized_task_graph")

    @property
    def label_to_taskid(self):
        """
        A dictionary mapping task label to assigned taskId.  This property helps
        in interpreting `optimized_task_graph`.

        @type: dict
        """
        return self._run_until("label_to_taskid")

    @property
    def morphed_task_graph(self):
        """
        The optimized task graph, with any subsequent morphs applied.  This graph
        will have the same meaning as the optimized task graph, but be in a form
        more palatable to TaskCluster.

        @type: TaskGraph
        """
        return self._run_until("morphed_task_graph")

    @property
    def graph_config(self):
        """
        The configuration for this graph.

        @type: GraphConfig
        """
        return self._run_until("graph_config")

    def _load_kinds(self, graph_config, target_kind=None):
        """
        Yield the `Kind` objects that take part in generation.

        With ``target_kind``, yield that kind, ``docker-image``, and everything
        reachable from them through ``kind-dependencies``, each at most once
        (a missing kind.yml raises `KindNotFound`).  Without it, yield every
        subdirectory of ``root_dir`` that has a kind.yml, skipping the others.
        """
        if target_kind:
            # docker-image is an implicit dependency that never appears in
            # kind-dependencies.
            queue = [target_kind, "docker-image"]
            seen_kinds = set()
            while queue:
                kind_name = queue.pop()
                if kind_name in seen_kinds:
                    continue
                seen_kinds.add(kind_name)
                kind = Kind.load(self.root_dir, graph_config, kind_name)
                yield kind
                queue.extend(kind.config.get("kind-dependencies", []))
        else:
            for kind_name in os.listdir(self.root_dir):
                try:
                    yield Kind.load(self.root_dir, graph_config, kind_name)
                except KindNotFound:
                    continue

    def _run(self):
        """
        Perform every generation phase in order, yielding one ``(name, value)``
        pair per phase for `_run_until` to record.
        """
        logger.info("Loading graph configuration.")
        graph_config = load_graph_config(self.root_dir)

        yield ("graph_config", graph_config)

        graph_config.register()

        if callable(self._parameters):
            parameters = self._parameters(graph_config)
        else:
            parameters = self._parameters
        self.verify_parameters(parameters)

        # Copy the list so the caller's parameters are not mutated in place.
        filter_names = list(parameters.get("filters", []))
        # Always add legacy target tasks method until we deprecate that API.
        if "target_tasks_method" not in filter_names:
            filter_names.insert(0, "target_tasks_method")
        filters = [filter_tasks.filter_task_functions[f] for f in filter_names]

        yield ("parameters", parameters)

        logger.info("Loading kinds")
        # put the kinds into a graph and sort topologically so that kinds are loaded
        # in post-order
        target_kind = parameters.get("target-kind")
        if target_kind:
            logger.info("Limiting kinds to %s and dependencies", target_kind)
        kinds = {
            kind.name: kind for kind in self._load_kinds(graph_config, target_kind)
        }
        self.verify_kinds(kinds)

        edges = set()
        for kind in six.itervalues(kinds):
            for dep in kind.config.get("kind-dependencies", []):
                edges.add((kind.name, dep, "kind-dependency"))
        kind_graph = Graph(set(kinds), edges)

        if target_kind:
            kind_graph = kind_graph.transitive_closure({target_kind, "docker-image"})

        logger.info("Generating full task set")
        all_tasks = {}
        for kind_name in kind_graph.visit_postorder():
            logger.debug("Loading tasks for kind %s", kind_name)
            kind = kinds[kind_name]
            try:
                new_tasks = kind.load_tasks(
                    parameters,
                    list(all_tasks.values()),
                    self._write_artifacts,
                )
            except Exception:
                logger.exception("Error loading tasks for kind %s:", kind_name)
                raise
            for task in new_tasks:
                if task.label in all_tasks:
                    raise Exception("duplicate tasks with label " + task.label)
                all_tasks[task.label] = task
            logger.info("Generated %d tasks for kind %s", len(new_tasks), kind_name)
        full_task_set = TaskGraph(all_tasks, Graph(set(all_tasks), set()))
        self.verify_attributes(all_tasks)
        self.verify_run_using()
        yield verifications("full_task_set", full_task_set, graph_config, parameters)

        logger.info("Generating full task graph")
        edges = set()
        for t in full_task_set:
            for depname, dep in six.iteritems(t.dependencies):
                edges.add((t.label, dep, depname))

        full_task_graph = TaskGraph(all_tasks, Graph(full_task_set.graph.nodes, edges))
        logger.info(
            "Full task graph contains %d tasks and %d dependencies",
            len(full_task_set.graph.nodes),
            len(edges),
        )
        yield verifications(
            "full_task_graph", full_task_graph, graph_config, parameters
        )

        logger.info("Generating target task set")
        target_task_set = TaskGraph(
            dict(all_tasks), Graph(set(all_tasks.keys()), set())
        )
        # Each filter narrows the set produced by the previous one.
        for fltr in filters:
            old_len = len(target_task_set.graph.nodes)
            target_tasks = set(fltr(target_task_set, parameters, graph_config))
            target_task_set = TaskGraph(
                {l: all_tasks[l] for l in target_tasks}, Graph(target_tasks, set())
            )
            logger.info(
                "Filter %s pruned %d tasks (%d remain)",
                fltr.__name__,
                old_len - len(target_tasks),
                len(target_tasks),
            )

        yield verifications(
            "target_task_set", target_task_set, graph_config, parameters
        )

        logger.info("Generating target task graph")
        # include all docker-image build tasks here, in case they are needed for a graph morph
        docker_image_tasks = set(
            t.label
            for t in six.itervalues(full_task_graph.tasks)
            if t.attributes["kind"] == "docker-image"
        )
        # include all tasks with `always_target` set
        if parameters["tasks_for"] == "hg-push":
            always_target_tasks = set(
                t.label
                for t in six.itervalues(full_task_graph.tasks)
                if t.attributes.get("always_target")
            )
        else:
            always_target_tasks = set()
        logger.info(
            "Adding %d tasks with `always_target` attribute",
            len(always_target_tasks) - len(always_target_tasks & target_tasks),
        )
        requested_tasks = target_tasks | docker_image_tasks | always_target_tasks
        target_graph = full_task_graph.graph.transitive_closure(requested_tasks)
        target_task_graph = TaskGraph(
            {l: all_tasks[l] for l in target_graph.nodes}, target_graph
        )
        yield verifications(
            "target_task_graph", target_task_graph, graph_config, parameters
        )

        logger.info("Generating optimized task graph")
        existing_tasks = parameters.get("existing_tasks")
        do_not_optimize = set(parameters.get("do_not_optimize", []))
        if not parameters.get("optimize_target_tasks", True):
            do_not_optimize = set(target_task_set.graph.nodes).union(do_not_optimize)

        # this is used for testing experimental optimization strategies
        strategies = os.environ.get(
            "TASKGRAPH_OPTIMIZE_STRATEGIES", parameters.get("optimize_strategies")
        )
        if strategies:
            strategies = find_object(strategies)

        optimized_task_graph, label_to_taskid = optimize_task_graph(
            target_task_graph,
            requested_tasks,
            parameters,
            do_not_optimize,
            self._decision_task_id,
            existing_tasks=existing_tasks,
            strategy_override=strategies,
        )

        yield verifications(
            "optimized_task_graph", optimized_task_graph, graph_config, parameters
        )

        morphed_task_graph, label_to_taskid = morph(
            optimized_task_graph,
            label_to_taskid,
            parameters,
            graph_config,
            self._decision_task_id,
        )

        yield "label_to_taskid", label_to_taskid
        yield verifications(
            "morphed_task_graph", morphed_task_graph, graph_config, parameters
        )

    def _run_until(self, name):
        """
        Advance the generation generator until the phase ``name`` has been
        recorded, then return its value.

        @raises AttributeError: if the generator finishes without producing
            ``name``.
        """
        while name not in self._run_results:
            try:
                k, v = next(self._run)
            except StopIteration:
                raise AttributeError("No such run result {}".format(name))
            self._run_results[k] = v
        return self._run_results[name]

    def verify_parameters(self, parameters):
        """
        Check parameter names against parameters.rst via `verify_docs`; skipped
        when ``parameters.strict`` is false.
        """
        if not parameters.strict:
            return

        parameters_dict = dict(**parameters)
        verify_docs(
            filename="parameters.rst",
            identifiers=list(parameters_dict),
            appearing_as="inline-literal",
        )

    def verify_kinds(self, kinds):
        """Check kind names against kinds.rst via `verify_docs`."""
        verify_docs(
            filename="kinds.rst", identifiers=kinds.keys(), appearing_as="heading"
        )

    def verify_attributes(self, all_tasks):
        """Check every attribute name used by any task against attributes.rst."""
        attribute_set = set()
        for task in six.itervalues(all_tasks):
            attribute_set.update(task.attributes.keys())
        verify_docs(
            filename="attributes.rst",
            identifiers=list(attribute_set),
            appearing_as="heading",
        )

    def verify_run_using(self):
        """Check the job-transform registry keys against transforms.rst."""
        from .transforms.job import registry

        verify_docs(
            filename="transforms.rst",
            identifiers=registry.keys(),
            appearing_as="inline-literal",
        )


def load_tasks_for_kind(parameters, kind, root_dir=None):
    """
    Get all the tasks of a given kind.

    This function is designed to be called from outside of taskgraph.

    @param parameters: mapping of parameters; copied, so the caller's object is
        not modified
    @param kind: name of the kind whose tasks are returned
    @param root_dir: passed to `TaskGraphGenerator` (None selects its default)
    @returns: dict mapping each task's ``task["metadata"]["name"]`` to the Task
    """
    # make parameters read-write
    parameters = dict(parameters)
    parameters["target-kind"] = kind
    parameters = Parameters(strict=False, **parameters)
    tgg = TaskGraphGenerator(root_dir=root_dir, parameters=parameters)
    return {
        task.task["metadata"]["name"]: task
        for task in tgg.full_task_set
        if task.kind == kind
    }