# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""
Graph morphs are modifications to task-graphs that take place *after* the
optimization phase.

These graph morphs are largely invisible to developers running `./mach`
locally, so they should be limited to changes that do not modify the meaning of
the graph.
"""

# Note that the translation of `{'task-reference': '..'}` and
# `artifact-reference` are handled in the optimization phase (since
# optimization involves dealing with taskIds directly).  Similarly,
# `{'relative-datestamp': '..'}` is handled at the last possible moment during
# task creation.

from __future__ import absolute_import, print_function, unicode_literals

import copy
import logging
import os
import re

import six

from slugid import nice as slugid
from .task import Task
from .graph import Graph
from .taskgraph import TaskGraph
from .util.workertypes import get_worker_type

here = os.path.abspath(os.path.dirname(__file__))
logger = logging.getLogger(__name__)
MAX_ROUTES = 10


def amend_taskgraph(taskgraph, label_to_taskid, to_add):
    """Add the given tasks to the taskgraph, returning a new taskgraph.

    :param taskgraph: the existing graph; it is not modified, its tasks and
        edges are copied into the new graph.
    :param label_to_taskid: mapping of task label to taskId.  It is updated
        *in place* with the labels of the added tasks and is also returned.
    :param to_add: iterable of tasks to add; each must already carry a
        ``task_id`` and a ``dependencies`` mapping of dependency name to
        taskId.
    :returns: a ``(new_taskgraph, label_to_taskid)`` tuple.
    """
    new_tasks = taskgraph.tasks.copy()
    new_edges = set(taskgraph.graph.edges)
    for task in to_add:
        new_tasks[task.task_id] = task
        # Labels are expected to be unique across the whole graph.
        assert task.label not in label_to_taskid
        label_to_taskid[task.label] = task.task_id
        for depname, dep in six.iteritems(task.dependencies):
            new_edges.add((task.task_id, dep, depname))

    # The new tasks are keyed by taskId here, so the graph nodes are the
    # keys of `new_tasks`.
    taskgraph = TaskGraph(new_tasks, Graph(set(new_tasks), new_edges))
    return taskgraph, label_to_taskid


def derive_misc_task(
    target_task,
    purpose,
    image,
    taskgraph,
    label_to_taskid,
    parameters,
    graph_config,
    dependencies,
):
    """Create the shell of a task that depends on `dependencies` and on the
    given docker image.

    :param target_task: the task the new task is derived from; its label,
        description, deadline and owner/source metadata are reused.
    :param purpose: short string used as the label prefix
        (``"<purpose>-<target label>"``) and in the description.
    :param image: name of the docker image; ``"docker-image-" + image`` must
        be a key of `label_to_taskid`.
    :param taskgraph: the current graph, used only to check whether the image
        task is part of it.
    :param label_to_taskid: mapping of task label to taskId.
    :param parameters: decision parameters; ``parameters["level"]`` and
        ``parameters.release_level()`` select the worker type.
    :param graph_config: graph configuration passed to `get_worker_type`.
    :param dependencies: mapping of dependency name to taskId.  It is copied,
        never modified.
    :returns: a new ``Task`` of kind ``"misc"`` with a freshly generated
        ``task_id``.  Its payload has no command yet; callers fill that in.
    :raises KeyError: if the docker image label is not in `label_to_taskid`.
    """
    label = "{}-{}".format(purpose, target_task.label)

    # this is why all docker image tasks are included in the target task graph: we
    # need to find them in label_to_taskid, even if nothing else required them
    image_taskid = label_to_taskid["docker-image-" + image]

    provisioner_id, worker_type = get_worker_type(
        graph_config, "misc", parameters["level"], parameters.release_level()
    )

    # Work on a copy so the caller's mapping is left untouched.
    deps = copy.copy(dependencies)
    deps["docker-image"] = image_taskid

    task_def = {
        "provisionerId": provisioner_id,
        "workerType": worker_type,
        # Snapshot taken while the docker image is still in `deps`, so the
        # task definition always depends on the image task.
        "dependencies": list(deps.values()),
        "created": {"relative-datestamp": "0 seconds"},
        "deadline": target_task.task["deadline"],
        # no point existing past the parent task's deadline
        "expires": target_task.task["deadline"],
        "metadata": {
            "name": label,
            "description": "{} for {}".format(purpose, target_task.description),
            "owner": target_task.task["metadata"]["owner"],
            "source": target_task.task["metadata"]["source"],
        },
        "scopes": [],
        "payload": {
            "image": {
                "path": "public/image.tar.zst",
                "taskId": image_taskid,
                "type": "task-image",
            },
            "features": {"taskclusterProxy": True},
            "maxRunTime": 600,
        },
    }

    if image_taskid not in taskgraph.tasks:
        # The task above depends on the replaced docker-image not one in
        # this current graph.  Drop it from the graph-level dependencies only;
        # the task definition built above keeps the taskId.
        del deps["docker-image"]

    task = Task(
        kind="misc",
        label=label,
        attributes={},
        task=task_def,
        dependencies=deps,
    )

    # Decode only when `nice()` hands back bytes, so a text return value does
    # not raise AttributeError on Python 3.
    # NOTE(review): bytes-vs-text depends on the slugid release in use --
    # confirm against the pinned version.
    new_task_id = slugid()
    if isinstance(new_task_id, bytes):
        new_task_id = new_task_id.decode("ascii")
    task.task_id = new_task_id
    return task


# these regular expressions capture route prefixes for which we have a star
# scope, allowing them to be summarized.  Each should correspond to a star scope
# in each Gecko `assume:repo:hg.mozilla.org/...` role.
SCOPE_SUMMARY_REGEXPS = [
    re.compile(r"(index:insert-task:docker\.images\.v1\.[^.]*\.).*"),
    re.compile(r"(index:insert-task:gecko\.v2\.[^.]*\.).*"),
    re.compile(r"(index:insert-task:comm\.v2\.[^.]*\.).*"),
]


def _summarize_scope(scope):
    """Collapse `scope` to ``<prefix>*`` using the first matching summary
    regexp, or return it unchanged when none matches."""
    for pattern in SCOPE_SUMMARY_REGEXPS:
        found = pattern.match(scope)
        if found:
            return found.group(1) + "*"
    return scope


def make_index_task(
    parent_task,
    taskgraph,
    label_to_taskid,
    parameters,
    graph_config,
    index_paths,
    index_rank,
    purpose,
    dependencies,
):
    """Build a misc task that inserts `parent_task` into the index at each of
    `index_paths`.

    The task shell comes from `derive_misc_task` (using the ``index-task``
    docker image); this function fills in its scopes, command and environment.
    Returns the new task.
    """
    index_task = derive_misc_task(
        parent_task,
        purpose,
        "index-task",
        taskgraph,
        label_to_taskid,
        parameters,
        graph_config,
        dependencies,
    )
    definition = index_task.task

    # we need to "summarize" the scopes, otherwise a particularly
    # namespace-heavy index task might have more scopes than can fit in a
    # temporary credential.
    summarized = {
        _summarize_scope("index:insert-task:{}".format(index_path))
        for index_path in index_paths
    }
    definition["scopes"] = sorted(summarized)

    payload = definition["payload"]
    payload["command"] = ["insert-indexes.js"] + index_paths
    payload["env"] = {
        "TARGET_TASKID": parent_task.task_id,
        "INDEX_RANK": index_rank,
    }
    return index_task


def add_index_tasks(
    taskgraph, label_to_taskid, parameters, graph_config, decision_task_id
):
    """
    The TaskCluster queue only allows 10 routes on a task, but we have tasks
    with many more routes, for purposes of indexing.  This graph morph adds
    "index tasks" that depend on such tasks and do the index insertions
    directly, avoiding the limits on task.routes.
    """
    logger.debug("Morphing: adding index tasks")

    # Add indexes for tasks that exceed MAX_ROUTES.
    index_tasks = []
    for task in six.itervalues(taskgraph.tasks):
        routes = task.task.get("routes", [])
        if len(routes) <= MAX_ROUTES:
            continue

        # Split the routes in one pass: index routes lose their "index."
        # prefix and move to the index task, everything else stays put.
        index_paths = []
        remaining_routes = []
        for route in routes:
            if route.startswith("index."):
                index_paths.append(route.split(".", 1)[1])
            else:
                remaining_routes.append(route)
        task.task["routes"] = remaining_routes

        index_config = task.task.get("extra", {}).get("index", {})
        index_tasks.append(
            make_index_task(
                task,
                taskgraph,
                label_to_taskid,
                parameters,
                graph_config,
                index_paths=index_paths,
                index_rank=index_config.get("rank", 0),
                purpose="index-task",
                dependencies={"parent": task.task_id},
            )
        )

    if not index_tasks:
        return taskgraph, label_to_taskid

    taskgraph, label_to_taskid = amend_taskgraph(
        taskgraph, label_to_taskid, index_tasks
    )
    logger.info("Added {} index tasks".format(len(index_tasks)))
    return taskgraph, label_to_taskid


def add_eager_cache_index_tasks(
    taskgraph, label_to_taskid, parameters, graph_config, decision_task_id
):
    """
    Some tasks (e.g. cached tasks) we want to exist in the index before they even
    run/complete.  Our current use is to allow us to depend on an unfinished cached
    task in future pushes.  This graph morph adds "eager-index tasks" that depend on
    the decision task and do the index insertions directly, which does not need to
    wait on the pointed at task to complete.
    """
    logger.debug("Morphing: Adding eager cached index's")

    eager_tasks = [
        make_index_task(
            task,
            taskgraph,
            label_to_taskid,
            parameters,
            graph_config,
            index_paths=task.attributes["eager_indexes"],
            index_rank=0,  # Be sure complete tasks get priority
            purpose="eager-index",
            dependencies={},
        )
        for task in six.itervalues(taskgraph.tasks)
        if "eager_indexes" in task.attributes
    ]

    if not eager_tasks:
        return taskgraph, label_to_taskid

    taskgraph, label_to_taskid = amend_taskgraph(
        taskgraph, label_to_taskid, eager_tasks
    )
    logger.info("Added {} eager index tasks".format(len(eager_tasks)))
    return taskgraph, label_to_taskid


def add_try_task_duplicates(
    taskgraph, label_to_taskid, parameters, graph_config, decision_task_id
):
    """Set the ``task_duplicates`` attribute on the tasks named in the try
    config when its ``rebuild`` value is set.

    Tasks are modified in place; the graph and label mapping are returned
    unchanged.
    """
    try_config = parameters["try_task_config"]
    rebuild = try_config.get("rebuild")
    if not rebuild:
        return taskgraph, label_to_taskid

    requested_labels = try_config.get("tasks", [])
    for task in six.itervalues(taskgraph.tasks):
        if task.label in requested_labels:
            task.attributes["task_duplicates"] = rebuild
    return taskgraph, label_to_taskid


def morph(taskgraph, label_to_taskid, parameters, graph_config, decision_task_id):
    """Apply all morphs"""
    # Each morph receives the output of the previous one, so order matters.
    pipeline = (
        add_eager_cache_index_tasks,
        add_index_tasks,
        add_try_task_duplicates,
    )

    for apply_morph in pipeline:
        taskgraph, label_to_taskid = apply_morph(
            taskgraph, label_to_taskid, parameters, graph_config, decision_task_id
        )
    return taskgraph, label_to_taskid