1# This Source Code Form is subject to the terms of the Mozilla Public
2# License, v. 2.0. If a copy of the MPL was not distributed with this
3# file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
5import logging
6import os
7import copy
8
9import attr
10from taskgraph.config import GraphConfig
11from taskgraph.parameters import parameters_loader
12from taskgraph.util.yaml import load_yaml
13
14from . import filter_tasks
15from .graph import Graph
16from .morph import morph
17from .optimize import optimize_task_graph
18from .task import Task
19from .taskgraph import TaskGraph
20from .transforms.base import TransformSequence, TransformConfig
21from .util.python_path import find_object
22from .util.verify import (
23    verify_docs,
24    verifications,
25)
26from .config import load_graph_config
27
28logger = logging.getLogger(__name__)
29
30
class KindNotFound(Exception):
    """
    Signals that a directory was asked to provide a kind but holds no
    ``kind.yml``.  ``Kind.load`` raises it with the missing file's path as
    its argument.
    """
35
36
@attr.s(frozen=True)
class Kind:
    """
    One kind: a name, the directory it was loaded from, the parsed contents
    of its ``kind.yml`` and the graph configuration it belongs to.
    """

    # Name of the kind (the directory name passed to `load`).
    name = attr.ib(type=str)
    # Directory containing this kind's `kind.yml`.
    path = attr.ib(type=str)
    # Parsed contents of `kind.yml`.
    config = attr.ib(type=dict)
    # Graph configuration handed to the transforms.
    graph_config = attr.ib(type=GraphConfig)

    def _get_loader(self):
        """
        Resolve the object named by the ``loader`` key of the kind config.

        Raises KeyError when the config has no ``loader`` entry.
        """
        try:
            loader_path = self.config["loader"]
        except KeyError:
            raise KeyError(f"{self.path!r} does not define `loader`")
        else:
            return find_object(loader_path)

    def load_tasks(self, parameters, loaded_tasks, write_artifacts):
        """
        Build the ``Task`` objects for this kind.

        The loader produces the inputs, the transforms listed under
        ``transforms`` in the kind config are applied to them in order, and
        every resulting dict becomes a ``Task``.

        @param parameters: parameters passed to the loader and the transforms
        @param loaded_tasks: tasks already loaded for other kinds
        @param write_artifacts: forwarded to ``TransformConfig``
        @return: list of ``Task`` instances
        """
        loader = self._get_loader()
        # Work on a private copy so nothing downstream can alter self.config.
        config = copy.deepcopy(self.config)

        # Only tasks whose kind is listed in `kind-dependencies` are exposed
        # to the transforms, keyed by label.
        dependency_kinds = config.get("kind-dependencies", [])
        kind_dependencies_tasks = {}
        for loaded in loaded_tasks:
            if loaded.kind in dependency_kinds:
                kind_dependencies_tasks[loaded.label] = loaded

        inputs = loader(self.name, self.path, config, parameters, loaded_tasks)

        transforms = TransformSequence()
        for xform_path in config["transforms"]:
            transforms.add(find_object(xform_path))

        # perform the transformations on the loaded inputs
        trans_config = TransformConfig(
            self.name,
            self.path,
            config,
            parameters,
            kind_dependencies_tasks,
            self.graph_config,
            write_artifacts=write_artifacts,
        )

        tasks = []
        for task_dict in transforms(trans_config, inputs):
            tasks.append(
                Task(
                    self.name,
                    label=task_dict["label"],
                    description=task_dict["description"],
                    attributes=task_dict["attributes"],
                    task=task_dict["task"],
                    optimization=task_dict.get("optimization"),
                    dependencies=task_dict.get("dependencies"),
                    soft_dependencies=task_dict.get("soft-dependencies"),
                    if_dependencies=task_dict.get("if-dependencies"),
                )
            )
        return tasks

    @classmethod
    def load(cls, root_dir, graph_config, kind_name):
        """
        Create a ``Kind`` from ``<root_dir>/<kind_name>/kind.yml``.

        Raises KindNotFound (carrying the expected file path) when that file
        does not exist.
        """
        path = os.path.join(root_dir, kind_name)
        kind_yml = os.path.join(path, "kind.yml")
        if os.path.exists(kind_yml):
            logger.debug(f"loading kind `{kind_name}` from `{path}`")
            return cls(kind_name, path, load_yaml(kind_yml), graph_config)
        raise KindNotFound(kind_yml)
105
106
107class TaskGraphGenerator:
108    """
    The central controller for taskgraph.  This handles all phases of graph
    generation.  The task graph is generated from all of the kinds defined in
    subdirectories of the generator's root directory.
112
113    Access to the results of this generation, as well as intermediate values at
114    various phases of generation, is available via properties.  This encourages
115    the provision of all generation inputs at instance construction time.
116    """
117
118    # Task-graph generation is implemented as a Python generator that yields
119    # each "phase" of generation.  This allows some mach subcommands to short-
120    # circuit generation of the entire graph by never completing the generator.
121
122    def __init__(
123        self,
124        root_dir,
125        parameters,
126        decision_task_id="DECISION-TASK",
127        write_artifacts=False,
128    ):
129        """
130        @param root_dir: root directory, with subdirectories for each kind
131        @param paramaters: parameters for this task-graph generation, or callable
132            taking a `GraphConfig` and returning parameters
133        @type parameters: Union[Parameters, Callable[[GraphConfig], Parameters]]
134        """
135        if root_dir is None:
136            root_dir = "taskcluster/ci"
137        self.root_dir = root_dir
138        self._parameters = parameters
139        self._decision_task_id = decision_task_id
140        self._write_artifacts = write_artifacts
141
142        # start the generator
143        self._run = self._run()
144        self._run_results = {}
145
146    @property
147    def parameters(self):
148        """
149        The properties used for this graph.
150
151        @type: Properties
152        """
153        return self._run_until("parameters")
154
155    @property
156    def full_task_set(self):
157        """
158        The full task set: all tasks defined by any kind (a graph without edges)
159
160        @type: TaskGraph
161        """
162        return self._run_until("full_task_set")
163
164    @property
165    def full_task_graph(self):
166        """
167        The full task graph: the full task set, with edges representing
168        dependencies.
169
170        @type: TaskGraph
171        """
172        return self._run_until("full_task_graph")
173
174    @property
175    def target_task_set(self):
176        """
177        The set of targetted tasks (a graph without edges)
178
179        @type: TaskGraph
180        """
181        return self._run_until("target_task_set")
182
183    @property
184    def target_task_graph(self):
185        """
186        The set of targetted tasks and all of their dependencies
187
188        @type: TaskGraph
189        """
190        return self._run_until("target_task_graph")
191
192    @property
193    def optimized_task_graph(self):
194        """
195        The set of targetted tasks and all of their dependencies; tasks that
196        have been optimized out are either omitted or replaced with a Task
197        instance containing only a task_id.
198
199        @type: TaskGraph
200        """
201        return self._run_until("optimized_task_graph")
202
203    @property
204    def label_to_taskid(self):
205        """
206        A dictionary mapping task label to assigned taskId.  This property helps
207        in interpreting `optimized_task_graph`.
208
209        @type: dictionary
210        """
211        return self._run_until("label_to_taskid")
212
213    @property
214    def morphed_task_graph(self):
215        """
216        The optimized task graph, with any subsequent morphs applied. This graph
217        will have the same meaning as the optimized task graph, but be in a form
218        more palatable to TaskCluster.
219
220        @type: TaskGraph
221        """
222        return self._run_until("morphed_task_graph")
223
224    @property
225    def graph_config(self):
226        """
227        The configuration for this graph.
228
229        @type: TaskGraph
230        """
231        return self._run_until("graph_config")
232
233    def _load_kinds(self, graph_config, target_kind=None):
234        if target_kind:
235            # docker-image is an implicit dependency that never appears in
236            # kind-dependencies.
237            queue = [target_kind, "docker-image"]
238            seen_kinds = set()
239            while queue:
240                kind_name = queue.pop()
241                if kind_name in seen_kinds:
242                    continue
243                seen_kinds.add(kind_name)
244                kind = Kind.load(self.root_dir, graph_config, kind_name)
245                yield kind
246                queue.extend(kind.config.get("kind-dependencies", []))
247        else:
248            for kind_name in os.listdir(self.root_dir):
249                try:
250                    yield Kind.load(self.root_dir, graph_config, kind_name)
251                except KindNotFound:
252                    continue
253
    def _run(self):
        """
        Generator that performs graph generation one phase at a time.

        Each ``yield`` hands back a ``(name, value)`` pair that ``_run_until``
        stores by name.  Phases are produced in this order: ``graph_config``,
        ``parameters``, ``full_task_set``, ``full_task_graph``,
        ``target_task_set``, ``target_task_graph``, ``optimized_task_graph``,
        ``label_to_taskid``, ``morphed_task_graph``.  Code after a ``yield``
        only runs when a later phase is requested.
        """
        logger.info("Loading graph configuration.")
        graph_config = load_graph_config(self.root_dir)

        yield ("graph_config", graph_config)

        # Only reached once something beyond `graph_config` is requested.
        graph_config.register()

        # Parameters may be given directly or as a callable taking the
        # graph config.
        if callable(self._parameters):
            parameters = self._parameters(graph_config)
        else:
            parameters = self._parameters
        self.verify_parameters(parameters)

        logger.info("Using {}".format(parameters))
        logger.debug("Dumping parameters:\n{}".format(repr(parameters)))

        # NOTE(review): when `parameters` already holds a "filters" list, the
        # insert below appears to modify that same list object in place --
        # confirm this is intended.
        filters = parameters.get("filters", [])
        # Always add legacy target tasks method until we deprecate that API.
        if "target_tasks_method" not in filters:
            filters.insert(0, "target_tasks_method")
        # Replace the filter names by the registered filter functions.
        filters = [filter_tasks.filter_task_functions[f] for f in filters]

        yield ("parameters", parameters)

        logger.info("Loading kinds")
        # put the kinds into a graph and sort topologically so that kinds are loaded
        # in post-order
        if parameters.get("target-kind"):
            # `target_kind` is only bound on this path; its later use is
            # guarded by the same condition.
            target_kind = parameters["target-kind"]
            logger.info(
                "Limiting kinds to {target_kind} and dependencies".format(
                    target_kind=target_kind
                )
            )
        kinds = {
            kind.name: kind
            for kind in self._load_kinds(graph_config, parameters.get("target-kind"))
        }
        self.verify_kinds(kinds)

        # One edge per declared kind dependency: (kind, dependency, edge name).
        edges = set()
        for kind in kinds.values():
            for dep in kind.config.get("kind-dependencies", []):
                edges.add((kind.name, dep, "kind-dependency"))
        kind_graph = Graph(set(kinds), edges)

        if parameters.get("target-kind"):
            kind_graph = kind_graph.transitive_closure({target_kind, "docker-image"})

        logger.info("Generating full task set")
        # label -> Task for every task loaded so far
        all_tasks = {}
        for kind_name in kind_graph.visit_postorder():
            logger.debug(f"Loading tasks for kind {kind_name}")
            kind = kinds[kind_name]
            try:
                # Each kind sees the tasks of all kinds loaded before it.
                new_tasks = kind.load_tasks(
                    parameters,
                    list(all_tasks.values()),
                    self._write_artifacts,
                )
            except Exception:
                # Log which kind failed, then let the error propagate.
                logger.exception(f"Error loading tasks for kind {kind_name}:")
                raise
            for task in new_tasks:
                # Labels must be unique across all kinds.
                if task.label in all_tasks:
                    raise Exception("duplicate tasks with label " + task.label)
                all_tasks[task.label] = task
            logger.info(f"Generated {len(new_tasks)} tasks for kind {kind_name}")
        full_task_set = TaskGraph(all_tasks, Graph(set(all_tasks), set()))
        self.verify_attributes(all_tasks)
        self.verify_run_using()
        # NOTE(review): `verifications` presumably runs the registered checks
        # and returns the (name, graph) pair that `_run_until` unpacks --
        # confirm in util.verify.
        yield verifications("full_task_set", full_task_set, graph_config, parameters)

        logger.info("Generating full task graph")
        # One edge per task dependency: (task label, dependency, dependency name).
        edges = set()
        for t in full_task_set:
            for depname, dep in t.dependencies.items():
                edges.add((t.label, dep, depname))

        full_task_graph = TaskGraph(all_tasks, Graph(full_task_set.graph.nodes, edges))
        logger.info(
            "Full task graph contains %d tasks and %d dependencies"
            % (len(full_task_set.graph.nodes), len(edges))
        )
        yield verifications(
            "full_task_graph", full_task_graph, graph_config, parameters
        )

        logger.info("Generating target task set")
        # Start from every task and let each filter narrow the set in turn.
        target_task_set = TaskGraph(
            dict(all_tasks), Graph(set(all_tasks.keys()), set())
        )
        for fltr in filters:
            old_len = len(target_task_set.graph.nodes)
            target_tasks = set(fltr(target_task_set, parameters, graph_config))
            target_task_set = TaskGraph(
                {l: all_tasks[l] for l in target_tasks}, Graph(target_tasks, set())
            )
            logger.info(
                "Filter %s pruned %d tasks (%d remain)"
                % (fltr.__name__, old_len - len(target_tasks), len(target_tasks))
            )

        yield verifications(
            "target_task_set", target_task_set, graph_config, parameters
        )

        logger.info("Generating target task graph")
        # `target_tasks` below is the output of the last filter; the filter
        # list is never empty because `target_tasks_method` is always present.
        # include all docker-image build tasks here, in case they are needed for a graph morph
        docker_image_tasks = {
            t.label
            for t in full_task_graph.tasks.values()
            if t.attributes["kind"] == "docker-image"
        }
        # include all tasks with `always_target` set
        # (only when `tasks_for` is "hg-push")
        if parameters["tasks_for"] == "hg-push":
            always_target_tasks = {
                t.label
                for t in full_task_graph.tasks.values()
                if t.attributes.get("always_target")
            }
        else:
            always_target_tasks = set()
        # Count only the always_target tasks that were not already targeted.
        logger.info(
            "Adding %d tasks with `always_target` attribute"
            % (len(always_target_tasks) - len(always_target_tasks & target_tasks))
        )
        requested_tasks = target_tasks | docker_image_tasks | always_target_tasks
        target_graph = full_task_graph.graph.transitive_closure(requested_tasks)
        target_task_graph = TaskGraph(
            {l: all_tasks[l] for l in target_graph.nodes}, target_graph
        )
        yield verifications(
            "target_task_graph", target_task_graph, graph_config, parameters
        )

        logger.info("Generating optimized task graph")
        existing_tasks = parameters.get("existing_tasks")
        do_not_optimize = set(parameters.get("do_not_optimize", []))
        # With `optimize_target_tasks` disabled, every target task is added
        # to the do-not-optimize set.
        if not parameters.get("optimize_target_tasks", True):
            do_not_optimize = set(target_task_set.graph.nodes).union(do_not_optimize)

        # this is used for testing experimental optimization strategies
        # (the environment variable takes precedence over the parameter)
        strategies = os.environ.get(
            "TASKGRAPH_OPTIMIZE_STRATEGIES", parameters.get("optimize_strategies")
        )
        if strategies:
            strategies = find_object(strategies)

        optimized_task_graph, label_to_taskid = optimize_task_graph(
            target_task_graph,
            requested_tasks,
            parameters,
            do_not_optimize,
            self._decision_task_id,
            existing_tasks=existing_tasks,
            strategy_override=strategies,
        )

        yield verifications(
            "optimized_task_graph", optimized_task_graph, graph_config, parameters
        )

        # Morphing returns a new graph and a new label -> taskId mapping.
        morphed_task_graph, label_to_taskid = morph(
            optimized_task_graph,
            label_to_taskid,
            parameters,
            graph_config,
            self._decision_task_id,
        )

        # `label_to_taskid` is published only here, after morphing.
        yield "label_to_taskid", label_to_taskid
        yield verifications(
            "morphed_task_graph", morphed_task_graph, graph_config, parameters
        )
430
431    def _run_until(self, name):
432        while name not in self._run_results:
433            try:
434                k, v = next(self._run)
435            except StopIteration:
436                raise AttributeError(f"No such run result {name}")
437            self._run_results[k] = v
438        return self._run_results[name]
439
440    def verify_parameters(self, parameters):
441        if not parameters.strict:
442            return
443
444        parameters_dict = dict(**parameters)
445        verify_docs(
446            filename="parameters.rst",
447            identifiers=list(parameters_dict),
448            appearing_as="inline-literal",
449        )
450
451    def verify_kinds(self, kinds):
452        verify_docs(
453            filename="kinds.rst", identifiers=kinds.keys(), appearing_as="heading"
454        )
455
456    def verify_attributes(self, all_tasks):
457        attribute_set = set()
458        for label, task in all_tasks.items():
459            attribute_set.update(task.attributes.keys())
460        verify_docs(
461            filename="attributes.rst",
462            identifiers=list(attribute_set),
463            appearing_as="heading",
464        )
465
466    def verify_run_using(self):
467        from .transforms.job import registry
468
469        verify_docs(
470            filename="transforms.rst",
471            identifiers=registry.keys(),
472            appearing_as="inline-literal",
473        )
474
475
def load_tasks_for_kind(parameters, kind, root_dir=None):
    """
    Return the tasks of one kind, keyed by their ``metadata.name``.

    Generation is limited to ``kind`` by setting ``target-kind`` in a copy
    of ``parameters`` and is only run as far as the full task set.

    This function is designed to be called from outside of taskgraph.
    """
    # Copy first so the caller's parameters are left untouched.
    overrides = dict(parameters)
    overrides["target-kind"] = kind
    loaded_parameters = parameters_loader(spec=None, strict=False, overrides=overrides)
    generator = TaskGraphGenerator(root_dir=root_dir, parameters=loaded_parameters)

    tasks_by_name = {}
    for task in generator.full_task_set:
        # The full task set also holds tasks of dependency kinds; drop those.
        if task.kind == kind:
            tasks_by_name[task.task["metadata"]["name"]] = task
    return tasks_by_name
492