1# This Source Code Form is subject to the terms of the Mozilla Public
2# License, v. 2.0. If a copy of the MPL was not distributed with this
3# file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
5from __future__ import absolute_import, print_function, unicode_literals
6import logging
7import os
8import copy
9import attr
10import six
11from six import text_type, ensure_text
12
13from . import filter_tasks
14from .graph import Graph
15from .taskgraph import TaskGraph
16from .task import Task
17from .optimize import optimize_task_graph
18from .morph import morph
19from .parameters import Parameters
20from .util.python_path import find_object
21from .transforms.base import TransformSequence, TransformConfig
22from .util.verify import (
23    verify_docs,
24    verifications,
25)
26from .util.yaml import load_yaml
27from .config import load_graph_config, GraphConfig
28
29logger = logging.getLogger(__name__)
30
31
class KindNotFound(Exception):
    """
    Signals that a kind could not be loaded because its directory does not
    contain a kind.yml file.
    """
36
37
@attr.s(frozen=True)
class Kind(object):
    """
    A single kind: a directory containing a `kind.yml`, together with the
    parsed contents of that file.

    Instances are frozen; `load_tasks` works on a deep copy of `config`.
    """

    # Name of the kind, as passed to `load`.
    name = attr.ib(type=text_type)
    # Directory of the kind (`root_dir` joined with the kind name in `load`).
    path = attr.ib(type=text_type)
    # Parsed contents of the kind's `kind.yml`.
    config = attr.ib(type=dict)
    # Graph configuration handed on to the transforms.
    graph_config = attr.ib(type=GraphConfig)

    def _get_loader(self):
        """
        Resolve the object named by the `loader` key of the kind's config.

        @raise KeyError: if the config has no `loader` key
        """
        try:
            loader = self.config["loader"]
        except KeyError:
            raise KeyError("{!r} does not define `loader`".format(self.path))
        return find_object(loader)

    def load_tasks(self, parameters, loaded_tasks, write_artifacts):
        """
        Run this kind's loader and transforms and return the resulting tasks.

        @param parameters: parameters handed to the loader and the transforms
        @param loaded_tasks: tasks already loaded from other kinds; those whose
            kind is listed in `kind-dependencies` are exposed to the transforms
        @param write_artifacts: forwarded to `TransformConfig`
        @return: list of `Task` objects
        @raise KeyError: if the config has no `loader` or `transforms` key
        """
        loader = self._get_loader()
        # Work on a private copy so that neither the loader nor the transforms
        # can modify the config stored on this (frozen) instance.
        config = copy.deepcopy(self.config)

        # A set gives constant-time membership tests while scanning the
        # already-loaded tasks.
        kind_dependencies = frozenset(config.get("kind-dependencies") or ())
        kind_dependencies_tasks = {
            task.label: task for task in loaded_tasks if task.kind in kind_dependencies
        }

        inputs = loader(self.name, self.path, config, parameters, loaded_tasks)

        # Report a missing `transforms` key the same way `_get_loader` reports
        # a missing `loader`, naming the offending kind.
        try:
            transform_paths = config["transforms"]
        except KeyError:
            raise KeyError("{!r} does not define `transforms`".format(self.path))

        transforms = TransformSequence()
        for xform_path in transform_paths:
            transforms.add(find_object(xform_path))

        # perform the transformations on the loaded inputs
        trans_config = TransformConfig(
            self.name,
            self.path,
            config,
            parameters,
            kind_dependencies_tasks,
            self.graph_config,
            write_artifacts=write_artifacts,
        )
        return [
            Task(
                self.name,
                label=task_dict["label"],
                description=task_dict["description"],
                attributes=task_dict["attributes"],
                task=task_dict["task"],
                optimization=task_dict.get("optimization"),
                dependencies=task_dict.get("dependencies"),
                soft_dependencies=task_dict.get("soft-dependencies"),
                if_dependencies=task_dict.get("if-dependencies"),
                release_artifacts=task_dict.get("release-artifacts"),
            )
            for task_dict in transforms(trans_config, inputs)
        ]

    @classmethod
    def load(cls, root_dir, graph_config, kind_name):
        """
        Build a `Kind` from the `kind.yml` in `<root_dir>/<kind_name>`.

        @raise KindNotFound: if that directory has no `kind.yml`
        """
        path = os.path.join(root_dir, kind_name)
        kind_yml = os.path.join(path, "kind.yml")
        if not os.path.exists(kind_yml):
            raise KindNotFound(kind_yml)

        logger.debug("loading kind `%s` from `%s`", kind_name, path)
        config = load_yaml(kind_yml)

        return cls(kind_name, path, config, graph_config)
107
108
class TaskGraphGenerator(object):
    """
    The central controller for taskgraph.  This handles all phases of graph
    generation.  The task graph is generated from all of the kinds defined in
    subdirectories of the generator's root directory.

    Access to the results of this generation, as well as intermediate values at
    various phases of generation, is available via properties.  This encourages
    the provision of all generation inputs at instance construction time.
    """

    # Task-graph generation is implemented as a Python generator that yields
    # each "phase" of generation.  This allows some mach subcommands to short-
    # circuit generation of the entire graph by never completing the generator.

    def __init__(
        self,
        root_dir,
        parameters,
        decision_task_id="DECISION-TASK",
        write_artifacts=False,
    ):
        """
        @param root_dir: root directory, with subdirectories for each kind;
            `None` selects "taskcluster/ci"
        @param parameters: parameters for this task-graph generation, or callable
            taking a `GraphConfig` and returning parameters
        @type parameters: Union[Parameters, Callable[[GraphConfig], Parameters]]
        @param decision_task_id: forwarded to the optimization and morph phases
        @param write_artifacts: forwarded to each kind's `load_tasks`
        """
        if root_dir is None:
            root_dir = "taskcluster/ci"
        self.root_dir = ensure_text(root_dir)
        self._parameters = parameters
        self._decision_task_id = decision_task_id
        self._write_artifacts = write_artifacts

        # Create the generator.  This replaces the bound `_run` method on the
        # instance with the generator object; no phase executes until
        # `_run_until` first advances it.
        self._run = self._run()
        self._run_results = {}

    @property
    def parameters(self):
        """
        The parameters used for this graph.

        @type: Parameters
        """
        return self._run_until("parameters")

    @property
    def full_task_set(self):
        """
        The full task set: all tasks defined by any kind (a graph without edges)

        @type: TaskGraph
        """
        return self._run_until("full_task_set")

    @property
    def full_task_graph(self):
        """
        The full task graph: the full task set, with edges representing
        dependencies.

        @type: TaskGraph
        """
        return self._run_until("full_task_graph")

    @property
    def target_task_set(self):
        """
        The set of targeted tasks (a graph without edges)

        @type: TaskGraph
        """
        return self._run_until("target_task_set")

    @property
    def target_task_graph(self):
        """
        The set of targeted tasks and all of their dependencies

        @type: TaskGraph
        """
        return self._run_until("target_task_graph")

    @property
    def optimized_task_graph(self):
        """
        The set of targeted tasks and all of their dependencies; tasks that
        have been optimized out are either omitted or replaced with a Task
        instance containing only a task_id.

        @type: TaskGraph
        """
        return self._run_until("optimized_task_graph")

    @property
    def label_to_taskid(self):
        """
        A dictionary mapping task label to assigned taskId.  This property helps
        in interpreting `optimized_task_graph`.

        @type: dictionary
        """
        return self._run_until("label_to_taskid")

    @property
    def morphed_task_graph(self):
        """
        The optimized task graph, with any subsequent morphs applied. This graph
        will have the same meaning as the optimized task graph, but be in a form
        more palatable to TaskCluster.

        @type: TaskGraph
        """
        return self._run_until("morphed_task_graph")

    @property
    def graph_config(self):
        """
        The configuration for this graph.

        @type: GraphConfig
        """
        return self._run_until("graph_config")

    def _load_kinds(self, graph_config, target_kind=None):
        """
        Yield `Kind` objects loaded from `self.root_dir`.

        With `target_kind` set, only that kind, "docker-image", and everything
        reachable through their `kind-dependencies` are loaded.  Otherwise every
        entry of the root directory is tried and entries raising `KindNotFound`
        are skipped.
        """
        if target_kind:
            # docker-image is an implicit dependency that never appears in
            # kind-dependencies.
            queue = [target_kind, "docker-image"]
            seen_kinds = set()
            while queue:
                kind_name = queue.pop()
                if kind_name in seen_kinds:
                    continue
                seen_kinds.add(kind_name)
                kind = Kind.load(self.root_dir, graph_config, kind_name)
                yield kind
                queue.extend(kind.config.get("kind-dependencies", []))
        else:
            for kind_name in os.listdir(self.root_dir):
                try:
                    yield Kind.load(self.root_dir, graph_config, kind_name)
                except KindNotFound:
                    continue

    @staticmethod
    def _filter_names(parameters):
        """
        Return the ordered list of filter names to apply for `parameters`.

        The list is a copy, so the caller's `filters` value is never modified.
        """
        names = list(parameters.get("filters", []))
        # Always add legacy target tasks method until we deprecate that API.
        if "target_tasks_method" not in names:
            names.insert(0, "target_tasks_method")
        return names

    def _run(self):
        """
        Generator implementing every phase of graph generation.

        Each phase yields a `(name, value)` pair, which `_run_until` stores
        under `name`.
        """
        logger.info("Loading graph configuration.")
        graph_config = load_graph_config(self.root_dir)

        yield ("graph_config", graph_config)

        graph_config.register()

        if callable(self._parameters):
            parameters = self._parameters(graph_config)
        else:
            parameters = self._parameters
        self.verify_parameters(parameters)

        filters = [
            filter_tasks.filter_task_functions[name]
            for name in self._filter_names(parameters)
        ]

        yield ("parameters", parameters)

        logger.info("Loading kinds")
        # put the kinds into a graph and sort topologically so that kinds are loaded
        # in post-order
        target_kind = parameters.get("target-kind")
        if target_kind:
            logger.info("Limiting kinds to %s and dependencies", target_kind)
        kinds = {
            kind.name: kind for kind in self._load_kinds(graph_config, target_kind)
        }
        self.verify_kinds(kinds)

        edges = set()
        for kind in six.itervalues(kinds):
            for dep in kind.config.get("kind-dependencies", []):
                edges.add((kind.name, dep, "kind-dependency"))
        kind_graph = Graph(set(kinds), edges)

        if target_kind:
            kind_graph = kind_graph.transitive_closure({target_kind, "docker-image"})

        logger.info("Generating full task set")
        all_tasks = {}
        for kind_name in kind_graph.visit_postorder():
            logger.debug("Loading tasks for kind %s", kind_name)
            kind = kinds[kind_name]
            try:
                new_tasks = kind.load_tasks(
                    parameters,
                    list(all_tasks.values()),
                    self._write_artifacts,
                )
            except Exception:
                logger.exception("Error loading tasks for kind %s:", kind_name)
                raise
            for task in new_tasks:
                if task.label in all_tasks:
                    raise Exception("duplicate tasks with label " + task.label)
                all_tasks[task.label] = task
            logger.info("Generated %d tasks for kind %s", len(new_tasks), kind_name)
        full_task_set = TaskGraph(all_tasks, Graph(set(all_tasks), set()))
        self.verify_attributes(all_tasks)
        self.verify_run_using()
        yield verifications("full_task_set", full_task_set, graph_config, parameters)

        logger.info("Generating full task graph")
        edges = set()
        for t in full_task_set:
            for depname, dep in six.iteritems(t.dependencies):
                edges.add((t.label, dep, depname))

        full_task_graph = TaskGraph(all_tasks, Graph(full_task_set.graph.nodes, edges))
        logger.info(
            "Full task graph contains %d tasks and %d dependencies",
            len(full_task_set.graph.nodes),
            len(edges),
        )
        yield verifications(
            "full_task_graph", full_task_graph, graph_config, parameters
        )

        logger.info("Generating target task set")
        target_task_set = TaskGraph(
            dict(all_tasks), Graph(set(all_tasks.keys()), set())
        )
        # Bound before the loop so the later phases never rely on the loop body
        # having run at least once.
        target_tasks = set(all_tasks)
        for fltr in filters:
            old_len = len(target_task_set.graph.nodes)
            target_tasks = set(fltr(target_task_set, parameters, graph_config))
            target_task_set = TaskGraph(
                {l: all_tasks[l] for l in target_tasks}, Graph(target_tasks, set())
            )
            logger.info(
                "Filter %s pruned %d tasks (%d remain)",
                fltr.__name__,
                old_len - len(target_tasks),
                len(target_tasks),
            )

        yield verifications(
            "target_task_set", target_task_set, graph_config, parameters
        )

        logger.info("Generating target task graph")
        # include all docker-image build tasks here, in case they are needed for a graph morph
        docker_image_tasks = {
            t.label
            for t in six.itervalues(full_task_graph.tasks)
            if t.attributes["kind"] == "docker-image"
        }
        # include all tasks with `always_target` set
        if parameters["tasks_for"] == "hg-push":
            always_target_tasks = {
                t.label
                for t in six.itervalues(full_task_graph.tasks)
                if t.attributes.get("always_target")
            }
        else:
            always_target_tasks = set()
        # Only count the tasks that were not already targeted.
        logger.info(
            "Adding %d tasks with `always_target` attribute",
            len(always_target_tasks - target_tasks),
        )
        requested_tasks = target_tasks | docker_image_tasks | always_target_tasks
        target_graph = full_task_graph.graph.transitive_closure(requested_tasks)
        target_task_graph = TaskGraph(
            {l: all_tasks[l] for l in target_graph.nodes}, target_graph
        )
        yield verifications(
            "target_task_graph", target_task_graph, graph_config, parameters
        )

        logger.info("Generating optimized task graph")
        existing_tasks = parameters.get("existing_tasks")
        do_not_optimize = set(parameters.get("do_not_optimize", []))
        if not parameters.get("optimize_target_tasks", True):
            do_not_optimize = set(target_task_set.graph.nodes).union(do_not_optimize)

        # this is used for testing experimental optimization strategies
        strategies = os.environ.get(
            "TASKGRAPH_OPTIMIZE_STRATEGIES", parameters.get("optimize_strategies")
        )
        if strategies:
            strategies = find_object(strategies)

        optimized_task_graph, label_to_taskid = optimize_task_graph(
            target_task_graph,
            requested_tasks,
            parameters,
            do_not_optimize,
            self._decision_task_id,
            existing_tasks=existing_tasks,
            strategy_override=strategies,
        )

        yield verifications(
            "optimized_task_graph", optimized_task_graph, graph_config, parameters
        )

        morphed_task_graph, label_to_taskid = morph(
            optimized_task_graph,
            label_to_taskid,
            parameters,
            graph_config,
            self._decision_task_id,
        )

        yield "label_to_taskid", label_to_taskid
        yield verifications(
            "morphed_task_graph", morphed_task_graph, graph_config, parameters
        )

    def _run_until(self, name):
        """
        Advance the generation until the result called `name` is available,
        caching every intermediate result, and return it.

        @raise AttributeError: if the generator finishes without producing `name`
        """
        while name not in self._run_results:
            try:
                k, v = next(self._run)
            except StopIteration:
                raise AttributeError("No such run result {}".format(name))
            self._run_results[k] = v
        return self._run_results[name]

    def verify_parameters(self, parameters):
        """
        Pass the parameter names to `verify_docs` for `parameters.rst`.

        Does nothing when `parameters.strict` is false.
        """
        if not parameters.strict:
            return

        parameters_dict = dict(**parameters)
        verify_docs(
            filename="parameters.rst",
            identifiers=list(parameters_dict),
            appearing_as="inline-literal",
        )

    def verify_kinds(self, kinds):
        """Pass the kind names to `verify_docs` for `kinds.rst`."""
        verify_docs(
            filename="kinds.rst", identifiers=list(kinds), appearing_as="heading"
        )

    def verify_attributes(self, all_tasks):
        """
        Pass every attribute name used by any task to `verify_docs` for
        `attributes.rst`.
        """
        attribute_set = set()
        for task in six.itervalues(all_tasks):
            attribute_set.update(task.attributes.keys())
        verify_docs(
            filename="attributes.rst",
            identifiers=list(attribute_set),
            appearing_as="heading",
        )

    def verify_run_using(self):
        """
        Pass the keys of the job transforms' `registry` to `verify_docs` for
        `transforms.rst`.
        """
        from .transforms.job import registry

        verify_docs(
            filename="transforms.rst",
            identifiers=list(registry),
            appearing_as="inline-literal",
        )
475
476
def load_tasks_for_kind(parameters, kind, root_dir=None):
    """
    Return the tasks belonging to `kind`, keyed by the `name` entry of each
    task's `metadata`.

    Intended as an entry point for callers outside of taskgraph.
    """
    # Parameters are read-only, so apply the override on a plain dict copy.
    overrides = dict(parameters)
    overrides["target-kind"] = kind
    generator = TaskGraphGenerator(
        root_dir=root_dir,
        parameters=Parameters(strict=False, **overrides),
    )

    tasks_by_name = {}
    for task in generator.full_task_set:
        if task.kind != kind:
            continue
        tasks_by_name[task.task["metadata"]["name"]] = task
    return tasks_by_name
493