1# This Source Code Form is subject to the terms of the Mozilla Public
2# License, v. 2.0. If a copy of the MPL was not distributed with this
3# file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
5"""
6Graph morphs are modifications to task-graphs that take place *after* the
7optimization phase.
8
9These graph morphs are largely invisible to developers running `./mach`
10locally, so they should be limited to changes that do not modify the meaning of
11the graph.
12"""
13
# Note that the translation of `{'task-reference': '..'}` and
# `artifact-reference` is handled in the optimization phase (since
# optimization involves dealing with taskIds directly).  Similarly,
# `{'relative-datestamp': '..'}` is handled at the last possible moment during
# task creation.
19
20from __future__ import absolute_import, print_function, unicode_literals
21
22import copy
23import logging
24import os
25import re
26
27import six
28
29from slugid import nice as slugid
30from .task import Task
31from .graph import Graph
32from .taskgraph import TaskGraph
33from .util.workertypes import get_worker_type
34
# Absolute path of the directory containing this module.
here = os.path.dirname(os.path.abspath(__file__))
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# A task with more routes than this gets its index routes moved to a
# separate index task (see add_index_tasks).
MAX_ROUTES = 10
38
39
def amend_taskgraph(taskgraph, label_to_taskid, to_add):
    """Return a new taskgraph made of the existing tasks plus those in `to_add`.

    Each added task is stored under its `task_id`, registered in
    `label_to_taskid` (which is updated in place and also returned), and
    contributes one `(task_id, dependency, name)` edge for every entry of its
    `dependencies` mapping.  The new graph is built from a copy of
    `taskgraph.tasks` and a fresh set of `taskgraph.graph.edges`.

    Returns a `(taskgraph, label_to_taskid)` pair.
    """
    tasks = taskgraph.tasks.copy()
    edges = set(taskgraph.graph.edges)

    for new_task in to_add:
        task_id = new_task.task_id
        tasks[task_id] = new_task
        # an added task must not reuse a label that is already mapped
        assert new_task.label not in label_to_taskid
        label_to_taskid[new_task.label] = task_id
        edges.update(
            (task_id, dep, depname)
            for depname, dep in six.iteritems(new_task.dependencies)
        )

    amended = TaskGraph(tasks, Graph(set(tasks), edges))
    return amended, label_to_taskid
53
54
def derive_misc_task(
    target_task,
    purpose,
    image,
    taskgraph,
    label_to_taskid,
    parameters,
    graph_config,
    dependencies,
):
    """Build the skeleton of a "misc" task derived from `target_task`.

    The new task is labelled `<purpose>-<target label>`, runs in the docker
    image task registered as `docker-image-<image>`, and reuses the target
    task's deadline and metadata owner/source.  Its definition depends on
    every taskId in `dependencies` plus the image task.

    `dependencies` maps dependency names to taskIds; it is copied, not
    modified.  `taskgraph` is only consulted to check whether the image task
    is part of the current graph.  `parameters` and `graph_config` are used
    to resolve the "misc" worker type.

    Returns a `Task` with a freshly generated `task_id`; callers are expected
    to fill in the remaining payload fields.
    """
    label = "{}-{}".format(purpose, target_task.label)

    # Every docker-image task is kept in the target task graph precisely so
    # that this lookup succeeds, even when nothing else required the image.
    image_taskid = label_to_taskid["docker-image-" + image]

    provisioner_id, worker_type = get_worker_type(
        graph_config, "misc", parameters["level"], parameters.release_level()
    )

    graph_deps = copy.copy(dependencies)
    graph_deps["docker-image"] = image_taskid

    # Snapshot the dependency taskIds now: the task definition always lists
    # the image task, even if it is dropped from `graph_deps` below.
    dependency_ids = list(graph_deps.values())

    deadline = target_task.task["deadline"]
    metadata = {
        "name": label,
        "description": "{} for {}".format(purpose, target_task.description),
        "owner": target_task.task["metadata"]["owner"],
        "source": target_task.task["metadata"]["source"],
    }
    payload = {
        "image": {
            "path": "public/image.tar.zst",
            "taskId": image_taskid,
            "type": "task-image",
        },
        "features": {"taskclusterProxy": True},
        "maxRunTime": 600,
    }
    task_def = {
        "provisionerId": provisioner_id,
        "workerType": worker_type,
        "dependencies": dependency_ids,
        "created": {"relative-datestamp": "0 seconds"},
        "deadline": deadline,
        # no point existing past the parent task's deadline
        "expires": deadline,
        "metadata": metadata,
        "scopes": [],
        "payload": payload,
    }

    if image_taskid not in taskgraph.tasks:
        # The definition above depends on the replaced docker-image task,
        # not on one in the current graph, so the Task object must not
        # carry it as a dependency.
        graph_deps.pop("docker-image")

    derived = Task(
        kind="misc",
        label=label,
        attributes={},
        task=task_def,
        dependencies=graph_deps,
    )
    derived.task_id = slugid().decode("ascii")
    return derived
120
121
# Route prefixes covered by a star scope, which lets the matching index
# scopes be summarized as `<prefix>*`.  Group 1 of each pattern is the prefix
# to keep.  Each pattern should correspond to a star scope in each Gecko
# `assume:repo:hg.mozilla.org/...` role.
SCOPE_SUMMARY_REGEXPS = list(
    map(
        re.compile,
        (
            r"(index:insert-task:docker\.images\.v1\.[^.]*\.).*",
            r"(index:insert-task:gecko\.v2\.[^.]*\.).*",
            r"(index:insert-task:comm\.v2\.[^.]*\.).*",
        ),
    )
)
130
131
def make_index_task(
    parent_task,
    taskgraph,
    label_to_taskid,
    parameters,
    graph_config,
    index_paths,
    index_rank,
    purpose,
    dependencies,
):
    """Create a task that inserts `parent_task` into the index at `index_paths`.

    The task is derived via `derive_misc_task` using the "index-task" docker
    image, then given the `index:insert-task:` scopes it needs, the
    `insert-indexes.js` command with `index_paths` as arguments, and an
    environment holding the parent's taskId and `index_rank`.

    `index_paths` must be a list, since it is concatenated onto the command.
    Returns the new `Task`.
    """
    index_task = derive_misc_task(
        parent_task,
        purpose,
        "index-task",
        taskgraph,
        label_to_taskid,
        parameters,
        graph_config,
        dependencies,
    )

    def summarize(scope):
        # Collapse a scope onto its star-scope prefix when one is known.
        for pattern in SCOPE_SUMMARY_REGEXPS:
            found = pattern.match(scope)
            if found:
                return found.group(1) + "*"
        return scope

    # The scopes have to be "summarized": a particularly namespace-heavy
    # index task could otherwise need more scopes than fit in a temporary
    # credential.
    index_task.task["scopes"] = sorted(
        {summarize("index:insert-task:{}".format(path)) for path in index_paths}
    )

    payload = index_task.task["payload"]
    payload["command"] = ["insert-indexes.js"] + index_paths
    payload["env"] = {
        "TARGET_TASKID": parent_task.task_id,
        "INDEX_RANK": index_rank,
    }
    return index_task
174
175
def add_index_tasks(
    taskgraph,
    label_to_taskid,
    parameters,
    graph_config,
    decision_task_id,
):
    """
    The TaskCluster queue only allows 10 routes on a task, but some tasks
    carry many more, for indexing purposes.  For every task with more than
    MAX_ROUTES routes, this morph strips the `index.` routes from the task
    (modifying its definition in place) and adds an "index task" that
    depends on it and performs those index insertions directly, avoiding
    the limit on task.routes.

    `decision_task_id` is accepted for signature parity with the other
    morphs and is not used.  Returns `(taskgraph, label_to_taskid)`.
    """
    logger.debug("Morphing: adding index tasks")

    index_tasks = []
    for task in six.itervalues(taskgraph.tasks):
        routes = task.task.get("routes", [])
        if len(routes) <= MAX_ROUTES:
            continue

        # Split the routes: index routes (minus the "index." prefix) move to
        # the index task, everything else stays on the original task.
        index_paths = []
        remaining_routes = []
        for route in routes:
            if route.startswith("index."):
                index_paths.append(route.split(".", 1)[1])
            else:
                remaining_routes.append(route)
        task.task["routes"] = remaining_routes

        rank = task.task.get("extra", {}).get("index", {}).get("rank", 0)
        index_tasks.append(
            make_index_task(
                task,
                taskgraph,
                label_to_taskid,
                parameters,
                graph_config,
                index_paths=index_paths,
                index_rank=rank,
                purpose="index-task",
                dependencies={"parent": task.task_id},
            )
        )

    if not index_tasks:
        return taskgraph, label_to_taskid

    taskgraph, label_to_taskid = amend_taskgraph(
        taskgraph, label_to_taskid, index_tasks
    )
    logger.info("Added {} index tasks".format(len(index_tasks)))
    return taskgraph, label_to_taskid
217
218
def add_eager_cache_index_tasks(
    taskgraph,
    label_to_taskid,
    parameters,
    graph_config,
    decision_task_id,
):
    """
    Some tasks (e.g. cached tasks) should exist in the index before they even
    run or complete, so that future pushes can depend on an unfinished cached
    task.  For every task carrying an `eager_indexes` attribute, this morph
    adds an "eager-index" task that inserts those index paths directly.  The
    index task is created with an empty `dependencies` mapping, so it does
    not wait on the task it points at.

    `decision_task_id` is accepted for signature parity with the other
    morphs and is not used.  Returns `(taskgraph, label_to_taskid)`.
    """
    logger.debug("Morphing: Adding eager cached index's")

    eager_tasks = [
        make_index_task(
            task,
            taskgraph,
            label_to_taskid,
            parameters,
            graph_config,
            index_paths=task.attributes["eager_indexes"],
            # rank 0, to be sure complete tasks get priority
            index_rank=0,
            purpose="eager-index",
            dependencies={},
        )
        for task in six.itervalues(taskgraph.tasks)
        if "eager_indexes" in task.attributes
    ]

    if not eager_tasks:
        return taskgraph, label_to_taskid

    taskgraph, label_to_taskid = amend_taskgraph(
        taskgraph, label_to_taskid, eager_tasks
    )
    logger.info("Added {} eager index tasks".format(len(eager_tasks)))
    return taskgraph, label_to_taskid
254
255
def add_try_task_duplicates(
    taskgraph, label_to_taskid, parameters, graph_config, decision_task_id
):
    """Flag try-selected tasks for duplication when a rebuild was requested.

    Reads `parameters["try_task_config"]`.  If it has a truthy `rebuild`
    value, every task in `taskgraph` whose label appears in the config's
    `tasks` list gets `attributes["task_duplicates"]` set to that value.
    Task attributes are modified in place; the graph structure is untouched.

    A `None` try config, or a missing/`None` `tasks` entry, is treated as
    "nothing selected".  `label_to_taskid`, `graph_config` and
    `decision_task_id` are not used beyond being passed through.

    Returns `(taskgraph, label_to_taskid)` unchanged.
    """
    try_config = parameters["try_task_config"] or {}
    rebuild = try_config.get("rebuild")
    if not rebuild:
        return taskgraph, label_to_taskid

    # Resolve the selected labels once, as a set, rather than re-reading and
    # linearly scanning the list for every task in the graph.
    selected_labels = frozenset(try_config.get("tasks") or ())
    for task in taskgraph.tasks.values():
        if task.label in selected_labels:
            task.attributes["task_duplicates"] = rebuild
    return taskgraph, label_to_taskid
266
267
def morph(taskgraph, label_to_taskid, parameters, graph_config, decision_task_id):
    """Apply every graph morph in order and return the resulting
    `(taskgraph, label_to_taskid)` pair.

    Each morph receives the graph and label map produced by the previous one.
    """
    pipeline = (
        add_eager_cache_index_tasks,
        add_index_tasks,
        add_try_task_duplicates,
    )

    state = (taskgraph, label_to_taskid)
    for step in pipeline:
        graph, labels = step(
            state[0], state[1], parameters, graph_config, decision_task_id
        )
        state = (graph, labels)
    return state
281