# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import logging
import os
import copy

import attr
from taskgraph.config import GraphConfig
from taskgraph.parameters import parameters_loader
from taskgraph.util.yaml import load_yaml

from . import filter_tasks
from .graph import Graph
from .morph import morph
from .optimize import optimize_task_graph
from .task import Task
from .taskgraph import TaskGraph
from .transforms.base import TransformSequence, TransformConfig
from .util.python_path import find_object
from .util.verify import (
    verify_docs,
    verifications,
)
from .config import load_graph_config

logger = logging.getLogger(__name__)


class KindNotFound(Exception):
    """
    Raised when trying to load kind from a directory without a kind.yml.
    """


@attr.s(frozen=True)
class Kind:
    """
    A single kind: a named subdirectory of the graph root whose ``kind.yml``
    names the loader and transforms used to produce that kind's tasks.
    """

    # Kind name; also the name of its directory under the root directory.
    name = attr.ib(type=str)
    # Path to the kind's directory.
    path = attr.ib(type=str)
    # Parsed contents of the kind's kind.yml.
    config = attr.ib(type=dict)
    # Graph-wide configuration this kind was loaded with.
    graph_config = attr.ib(type=GraphConfig)

    def _get_loader(self):
        """
        Resolve the loader callable named by the ``loader`` key of kind.yml.

        @raises KeyError: if kind.yml does not define ``loader``.
        """
        try:
            loader = self.config["loader"]
        except KeyError:
            raise KeyError(f"{self.path!r} does not define `loader`")
        return find_object(loader)

    def load_tasks(self, parameters, loaded_tasks, write_artifacts):
        """
        Run this kind's loader and transforms and build Task objects.

        @param parameters: parameters for this task-graph generation
        @param loaded_tasks: list of Task objects from previously loaded kinds
        @param write_artifacts: forwarded to the TransformConfig
        @returns: list of Task objects belonging to this kind
        """
        loader = self._get_loader()
        # Hand loaders/transforms a private copy so they cannot mutate the
        # (frozen) Kind's own config.
        config = copy.deepcopy(self.config)

        # Transforms only see tasks from the kinds this kind declares it
        # depends on.
        kind_dependencies = config.get("kind-dependencies", [])
        kind_dependencies_tasks = {
            task.label: task for task in loaded_tasks if task.kind in kind_dependencies
        }

        inputs = loader(self.name, self.path, config, parameters, loaded_tasks)

        transforms = TransformSequence()
        for xform_path in config["transforms"]:
            transforms.add(find_object(xform_path))

        # perform the transformations on the loaded inputs
        trans_config = TransformConfig(
            self.name,
            self.path,
            config,
            parameters,
            kind_dependencies_tasks,
            self.graph_config,
            write_artifacts=write_artifacts,
        )
        return [
            Task(
                self.name,
                label=task_dict["label"],
                description=task_dict["description"],
                attributes=task_dict["attributes"],
                task=task_dict["task"],
                optimization=task_dict.get("optimization"),
                dependencies=task_dict.get("dependencies"),
                soft_dependencies=task_dict.get("soft-dependencies"),
                if_dependencies=task_dict.get("if-dependencies"),
            )
            for task_dict in transforms(trans_config, inputs)
        ]

    @classmethod
    def load(cls, root_dir, graph_config, kind_name):
        """
        Load the kind `kind_name` from ``<root_dir>/<kind_name>/kind.yml``.

        @raises KindNotFound: if that kind.yml does not exist.
        """
        path = os.path.join(root_dir, kind_name)
        kind_yml = os.path.join(path, "kind.yml")
        if not os.path.exists(kind_yml):
            raise KindNotFound(kind_yml)

        logger.debug("loading kind `%s` from `%s`", kind_name, path)
        config = load_yaml(kind_yml)

        return cls(kind_name, path, config, graph_config)


class TaskGraphGenerator:
    """
    The central controller for taskgraph. This handles all phases of graph
    generation. The task graph is generated from all of the kinds defined in
    subdirectories of the generator's root directory.

    Access to the results of this generation, as well as intermediate values at
    various phases of generation, is available via properties. This encourages
    the provision of all generation inputs at instance construction time.
    """

    # Task-graph generation is implemented as a Python generator that yields
    # each "phase" of generation. This allows some mach subcommands to short-
    # circuit generation of the entire graph by never completing the generator.

    def __init__(
        self,
        root_dir,
        parameters,
        decision_task_id="DECISION-TASK",
        write_artifacts=False,
    ):
        """
        @param root_dir: root directory, with subdirectories for each kind;
            ``None`` means "taskcluster/ci"
        @param parameters: parameters for this task-graph generation, or callable
            taking a `GraphConfig` and returning parameters
        @type parameters: Union[Parameters, Callable[[GraphConfig], Parameters]]
        @param decision_task_id: task id passed to optimization and morphing
        @param write_artifacts: forwarded to each kind's transforms
        """
        if root_dir is None:
            root_dir = "taskcluster/ci"
        self.root_dir = root_dir
        self._parameters = parameters
        self._decision_task_id = decision_task_id
        self._write_artifacts = write_artifacts

        # start the generator; note this rebinds `_run` on the instance from
        # the method to the generator object it returns.
        self._run = self._run()
        # phase name -> result, filled lazily by _run_until
        self._run_results = {}

    @property
    def parameters(self):
        """
        The parameters used for this graph.

        @type: Parameters
        """
        return self._run_until("parameters")

    @property
    def full_task_set(self):
        """
        The full task set: all tasks defined by any kind (a graph without edges)

        @type: TaskGraph
        """
        return self._run_until("full_task_set")

    @property
    def full_task_graph(self):
        """
        The full task graph: the full task set, with edges representing
        dependencies.

        @type: TaskGraph
        """
        return self._run_until("full_task_graph")

    @property
    def target_task_set(self):
        """
        The set of targeted tasks (a graph without edges)

        @type: TaskGraph
        """
        return self._run_until("target_task_set")

    @property
    def target_task_graph(self):
        """
        The set of targeted tasks and all of their dependencies

        @type: TaskGraph
        """
        return self._run_until("target_task_graph")

    @property
    def optimized_task_graph(self):
        """
        The set of targeted tasks and all of their dependencies; tasks that
        have been optimized out are either omitted or replaced with a Task
        instance containing only a task_id.

        @type: TaskGraph
        """
        return self._run_until("optimized_task_graph")

    @property
    def label_to_taskid(self):
        """
        A dictionary mapping task label to assigned taskId. This property helps
        in interpreting `optimized_task_graph`.

        @type: dictionary
        """
        return self._run_until("label_to_taskid")

    @property
    def morphed_task_graph(self):
        """
        The optimized task graph, with any subsequent morphs applied. This graph
        will have the same meaning as the optimized task graph, but be in a form
        more palatable to TaskCluster.

        @type: TaskGraph
        """
        return self._run_until("morphed_task_graph")

    @property
    def graph_config(self):
        """
        The configuration for this graph.

        @type: GraphConfig
        """
        return self._run_until("graph_config")

    def _load_kinds(self, graph_config, target_kind=None):
        """
        Yield the Kind objects to generate tasks from.

        With `target_kind`, only that kind, "docker-image" and their transitive
        kind-dependencies are loaded; a missing kind.yml raises KindNotFound.
        Otherwise every entry of the root directory is tried and entries
        without a kind.yml are skipped.
        """
        if target_kind:
            # docker-image is an implicit dependency that never appears in
            # kind-dependencies.
            queue = [target_kind, "docker-image"]
            seen_kinds = set()
            while queue:
                kind_name = queue.pop()
                if kind_name in seen_kinds:
                    continue
                seen_kinds.add(kind_name)
                kind = Kind.load(self.root_dir, graph_config, kind_name)
                yield kind
                queue.extend(kind.config.get("kind-dependencies", []))
        else:
            for kind_name in os.listdir(self.root_dir):
                try:
                    yield Kind.load(self.root_dir, graph_config, kind_name)
                except KindNotFound:
                    continue

    def _run(self):
        """
        Generator performing each generation phase in turn.

        Yields ``(name, value)`` pairs consumed by `_run_until`, in this order:
        graph_config, parameters, full_task_set, full_task_graph,
        target_task_set, target_task_graph, optimized_task_graph,
        label_to_taskid, morphed_task_graph. The ``verifications(...)`` calls
        are expected to return such a pair after running their checks.
        """
        logger.info("Loading graph configuration.")
        graph_config = load_graph_config(self.root_dir)

        yield ("graph_config", graph_config)

        graph_config.register()

        if callable(self._parameters):
            parameters = self._parameters(graph_config)
        else:
            parameters = self._parameters
        self.verify_parameters(parameters)

        logger.info("Using %s", parameters)
        logger.debug("Dumping parameters:\n%r", parameters)

        # Copy the list: inserting into the list stored in `parameters` would
        # silently change the parameters seen (and dumped) by everyone else.
        filters = list(parameters.get("filters", []))
        # Always add legacy target tasks method until we deprecate that API.
        if "target_tasks_method" not in filters:
            filters.insert(0, "target_tasks_method")
        filters = [filter_tasks.filter_task_functions[f] for f in filters]

        yield ("parameters", parameters)

        logger.info("Loading kinds")
        # put the kinds into a graph and sort topologically so that kinds are loaded
        # in post-order
        if parameters.get("target-kind"):
            target_kind = parameters["target-kind"]
            logger.info("Limiting kinds to %s and dependencies", target_kind)
        kinds = {
            kind.name: kind
            for kind in self._load_kinds(graph_config, parameters.get("target-kind"))
        }
        self.verify_kinds(kinds)

        edges = set()
        for kind in kinds.values():
            for dep in kind.config.get("kind-dependencies", []):
                edges.add((kind.name, dep, "kind-dependency"))
        kind_graph = Graph(set(kinds), edges)

        if parameters.get("target-kind"):
            kind_graph = kind_graph.transitive_closure({target_kind, "docker-image"})

        logger.info("Generating full task set")
        all_tasks = {}
        for kind_name in kind_graph.visit_postorder():
            logger.debug("Loading tasks for kind %s", kind_name)
            kind = kinds[kind_name]
            try:
                new_tasks = kind.load_tasks(
                    parameters,
                    list(all_tasks.values()),
                    self._write_artifacts,
                )
            except Exception:
                logger.exception("Error loading tasks for kind %s:", kind_name)
                raise
            for task in new_tasks:
                if task.label in all_tasks:
                    raise Exception("duplicate tasks with label " + task.label)
                all_tasks[task.label] = task
            logger.info("Generated %d tasks for kind %s", len(new_tasks), kind_name)
        full_task_set = TaskGraph(all_tasks, Graph(set(all_tasks), set()))
        self.verify_attributes(all_tasks)
        self.verify_run_using()
        yield verifications("full_task_set", full_task_set, graph_config, parameters)

        logger.info("Generating full task graph")
        edges = set()
        for t in full_task_set:
            for depname, dep in t.dependencies.items():
                edges.add((t.label, dep, depname))

        full_task_graph = TaskGraph(all_tasks, Graph(full_task_set.graph.nodes, edges))
        logger.info(
            "Full task graph contains %d tasks and %d dependencies",
            len(full_task_set.graph.nodes),
            len(edges),
        )
        yield verifications(
            "full_task_graph", full_task_graph, graph_config, parameters
        )

        logger.info("Generating target task set")
        target_task_set = TaskGraph(
            dict(all_tasks), Graph(set(all_tasks.keys()), set())
        )
        # `filters` always contains target_tasks_method, so this loop runs at
        # least once and `target_tasks` is always bound afterwards.
        for fltr in filters:
            old_len = len(target_task_set.graph.nodes)
            target_tasks = set(fltr(target_task_set, parameters, graph_config))
            target_task_set = TaskGraph(
                {label: all_tasks[label] for label in target_tasks},
                Graph(target_tasks, set()),
            )
            logger.info(
                "Filter %s pruned %d tasks (%d remain)",
                fltr.__name__,
                old_len - len(target_tasks),
                len(target_tasks),
            )

        yield verifications(
            "target_task_set", target_task_set, graph_config, parameters
        )

        logger.info("Generating target task graph")
        # include all docker-image build tasks here, in case they are needed for a graph morph
        docker_image_tasks = {
            t.label
            for t in full_task_graph.tasks.values()
            if t.attributes["kind"] == "docker-image"
        }
        # include all tasks with `always_target` set
        if parameters["tasks_for"] == "hg-push":
            always_target_tasks = {
                t.label
                for t in full_task_graph.tasks.values()
                if t.attributes.get("always_target")
            }
        else:
            always_target_tasks = set()
        logger.info(
            "Adding %d tasks with `always_target` attribute",
            len(always_target_tasks) - len(always_target_tasks & target_tasks),
        )
        requested_tasks = target_tasks | docker_image_tasks | always_target_tasks
        target_graph = full_task_graph.graph.transitive_closure(requested_tasks)
        target_task_graph = TaskGraph(
            {label: all_tasks[label] for label in target_graph.nodes}, target_graph
        )
        yield verifications(
            "target_task_graph", target_task_graph, graph_config, parameters
        )

        logger.info("Generating optimized task graph")
        existing_tasks = parameters.get("existing_tasks")
        do_not_optimize = set(parameters.get("do_not_optimize", []))
        if not parameters.get("optimize_target_tasks", True):
            do_not_optimize = set(target_task_set.graph.nodes).union(do_not_optimize)

        # this is used for testing experimental optimization strategies
        strategies = os.environ.get(
            "TASKGRAPH_OPTIMIZE_STRATEGIES", parameters.get("optimize_strategies")
        )
        if strategies:
            strategies = find_object(strategies)

        optimized_task_graph, label_to_taskid = optimize_task_graph(
            target_task_graph,
            requested_tasks,
            parameters,
            do_not_optimize,
            self._decision_task_id,
            existing_tasks=existing_tasks,
            strategy_override=strategies,
        )

        yield verifications(
            "optimized_task_graph", optimized_task_graph, graph_config, parameters
        )

        morphed_task_graph, label_to_taskid = morph(
            optimized_task_graph,
            label_to_taskid,
            parameters,
            graph_config,
            self._decision_task_id,
        )

        # label_to_taskid is published after morphing, so it reflects the
        # mapping returned by morph().
        yield "label_to_taskid", label_to_taskid
        yield verifications(
            "morphed_task_graph", morphed_task_graph, graph_config, parameters
        )

    def _run_until(self, name):
        """
        Advance the generation generator until phase `name` has been produced.

        @raises AttributeError: if generation finishes without yielding `name`.
        """
        while name not in self._run_results:
            try:
                k, v = next(self._run)
            except StopIteration:
                raise AttributeError(f"No such run result {name}") from None
            self._run_results[k] = v
        return self._run_results[name]

    def verify_parameters(self, parameters):
        """Check that every parameter name is documented (strict parameters only)."""
        if not parameters.strict:
            return

        parameters_dict = dict(**parameters)
        verify_docs(
            filename="parameters.rst",
            identifiers=list(parameters_dict),
            appearing_as="inline-literal",
        )

    def verify_kinds(self, kinds):
        """Check that every loaded kind name is documented in kinds.rst."""
        verify_docs(
            filename="kinds.rst", identifiers=kinds.keys(), appearing_as="heading"
        )

    def verify_attributes(self, all_tasks):
        """Check that every task attribute name is documented in attributes.rst."""
        attribute_set = set()
        for task in all_tasks.values():
            attribute_set.update(task.attributes.keys())
        verify_docs(
            filename="attributes.rst",
            identifiers=list(attribute_set),
            appearing_as="heading",
        )

    def verify_run_using(self):
        """Check that every registered job `run-using` key is documented."""
        # Imported here rather than at module level — presumably to avoid an
        # import cycle; verify before moving it.
        from .transforms.job import registry

        verify_docs(
            filename="transforms.rst",
            identifiers=registry.keys(),
            appearing_as="inline-literal",
        )


def load_tasks_for_kind(parameters, kind, root_dir=None):
    """
    Get all the tasks of a given kind.

    This function is designed to be called from outside of taskgraph.

    @param parameters: mapping of parameter overrides; it is copied, not mutated
    @param kind: name of the kind to load (its kind-dependencies are loaded too)
    @param root_dir: graph root directory; ``None`` means "taskcluster/ci"
    @returns: dict mapping each task's ``metadata.name`` to its Task
    """
    # make parameters read-write
    parameters = dict(parameters)
    parameters["target-kind"] = kind
    parameters = parameters_loader(spec=None, strict=False, overrides=parameters)
    tgg = TaskGraphGenerator(root_dir=root_dir, parameters=parameters)
    return {
        task.task["metadata"]["name"]: task
        for task in tgg.full_task_set
        if task.kind == kind
    }