1# -*- coding: utf-8 -*- 2 3# This Source Code Form is subject to the terms of the Mozilla Public 4# License, v. 2.0. If a copy of the MPL was not distributed with this 5# file, You can obtain one at http://mozilla.org/MPL/2.0/. 6 7from __future__ import absolute_import, print_function, unicode_literals 8 9import concurrent.futures as futures 10import copy 11import logging 12import os 13import re 14from functools import reduce 15 16import jsone 17import requests 18from requests.exceptions import HTTPError 19from six import text_type, string_types 20from slugid import nice as slugid 21 22from taskgraph import create 23from taskgraph.decision import read_artifact, write_artifact, rename_artifact 24from taskgraph.taskgraph import TaskGraph 25from taskgraph.optimize import optimize_task_graph 26from taskgraph.util.taskcluster import ( 27 find_task_id, 28 get_artifact, 29 get_task_definition, 30 get_session, 31 list_tasks, 32 parse_time, 33 trigger_hook, 34 CONCURRENCY, 35) 36from taskgraph.util.taskgraph import find_decision_task 37 38logger = logging.getLogger(__name__) 39 40INDEX_TMPL = "gecko.v2.{}.pushlog-id.{}.decision" 41PUSHLOG_TMPL = "{}/json-pushes?version=2&startID={}&endID={}" 42 43 44def _tags_within_context(tags, context=[]): 45 """A context of [] means that it *only* applies to a task group""" 46 return any( 47 all(tag in tags and tags[tag] == tag_set[tag] for tag in tag_set.keys()) 48 for tag_set in context 49 ) 50 51 52def _extract_applicable_action(actions_json, action_name, task_group_id, task_id): 53 """Extract action that applies to the given task or task group. 54 55 A task (as defined by its tags) is said to match a tag-set if its 56 tags are a super-set of the tag-set. A tag-set is a set of key-value pairs. 57 58 An action (as defined by its context) is said to be relevant for 59 a given task, if the task's tags match one of the tag-sets given 60 in the context property of the action. 61 62 The order of the actions is significant. When multiple actions apply to a 63 task the first one takes precedence. 64 65 For more details visit: 66 https://docs.taskcluster.net/docs/manual/design/conventions/actions/spec 67 """ 68 if task_id: 69 tags = get_task_definition(task_id).get("tags") 70 71 for _action in actions_json["actions"]: 72 if action_name != _action["name"]: 73 continue 74 75 context = _action.get("context", []) 76 # Ensure the task is within the context of the action 77 if task_id and tags and _tags_within_context(tags, context): 78 return _action 79 elif context == []: 80 return _action 81 82 available_actions = ", ".join(sorted({a["name"] for a in actions_json["actions"]})) 83 raise LookupError( 84 "{} action is not available for this task. Available: {}".format( 85 action_name, available_actions 86 ) 87 ) 88 89 90def trigger_action(action_name, decision_task_id, task_id=None, input={}): 91 if not decision_task_id: 92 raise ValueError("No decision task. We can't find the actions artifact.") 93 actions_json = get_artifact(decision_task_id, "public/actions.json") 94 if actions_json["version"] != 1: 95 raise RuntimeError("Wrong version of actions.json, unable to continue") 96 97 # These values substitute $eval in the template 98 context = { 99 "input": input, 100 "taskId": task_id, 101 "taskGroupId": decision_task_id, 102 } 103 # https://docs.taskcluster.net/docs/manual/design/conventions/actions/spec#variables 104 context.update(actions_json["variables"]) 105 action = _extract_applicable_action( 106 actions_json, action_name, decision_task_id, task_id 107 ) 108 kind = action["kind"] 109 if kind == "hook": 110 hook_payload = jsone.render(action["hookPayload"], context) 111 trigger_hook(action["hookGroupId"], action["hookId"], hook_payload) 112 else: 113 raise NotImplementedError("Unable to submit actions with {} kind.".format(kind)) 114 115 116def get_pushes_from_params_input(parameters, input): 117 inclusive_tweak = 1 if input.get("inclusive") else 0 118 return get_pushes( 119 project=parameters["head_repository"], 120 end_id=int(parameters["pushlog_id"]) - (1 - inclusive_tweak), 121 depth=input.get("depth", 9) + inclusive_tweak, 122 ) 123 124 125def get_pushes(project, end_id, depth): 126 pushes = [] 127 while True: 128 start_id = max(end_id - depth, 0) 129 pushlog_url = PUSHLOG_TMPL.format(project, start_id, end_id) 130 logger.debug(pushlog_url) 131 r = requests.get(pushlog_url) 132 r.raise_for_status() 133 pushes = pushes + list(r.json()["pushes"].keys()) 134 if len(pushes) >= depth: 135 break 136 137 end_id = start_id - 1 138 start_id -= depth 139 if start_id < 0: 140 break 141 142 pushes = sorted(pushes)[-depth:] 143 return pushes 144 145 146def get_decision_task_id(project, push_id): 147 return find_task_id(INDEX_TMPL.format(project, push_id)) 148 149 150def get_parameters(decision_task_id): 151 return get_artifact(decision_task_id, "public/parameters.yml") 152 153 154def get_tasks_with_downstream(labels, full_task_graph, label_to_taskid): 155 # Used to gather tasks when downstream tasks need to run as well 156 return full_task_graph.graph.transitive_closure( 157 set(labels), reverse=True 158 ).nodes & set(label_to_taskid.keys()) 159 160 161def get_downstream_browsertime_tasks(labels, full_task_graph, label_to_taskid): 162 # Used to gather tasks when downstream tasks need to run as well. This 163 # function is specific to browsertime as it doesn't take an intersection 164 # with existing tasks, making it possible to schedule these without 165 # previously having scheduled them. 166 return full_task_graph.graph.transitive_closure(set(labels), reverse=True).nodes 167 168 169def rename_browsertime_vismet_task(label): 170 # Vismet tasks have labels which are modified from 171 # the task label which created the data so we can undo it here 172 return label.replace("-vismet", "") + "-e10s" 173 174 175def fetch_graph_and_labels(parameters, graph_config): 176 decision_task_id = find_decision_task(parameters, graph_config) 177 178 # First grab the graph and labels generated during the initial decision task 179 full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json") 180 logger.info("Load taskgraph from JSON.") 181 _, full_task_graph = TaskGraph.from_json(full_task_graph) 182 label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json") 183 184 logger.info("Fetching additional tasks from action and cron tasks.") 185 # fetch everything in parallel; this avoids serializing any delay in downloading 186 # each artifact (such as waiting for the artifact to be mirrored locally) 187 with futures.ThreadPoolExecutor(CONCURRENCY) as e: 188 fetches = [] 189 190 # fetch any modifications made by action tasks and swap out new tasks 191 # for old ones 192 def fetch_action(task_id): 193 logger.info( 194 "fetching label-to-taskid.json for action task {}".format(task_id) 195 ) 196 try: 197 run_label_to_id = get_artifact(task_id, "public/label-to-taskid.json") 198 label_to_taskid.update(run_label_to_id) 199 except HTTPError as e: 200 if e.response.status_code != 404: 201 raise 202 logger.debug( 203 "No label-to-taskid.json found for {}: {}".format(task_id, e) 204 ) 205 206 head_rev_param = "{}head_rev".format(graph_config["project-repo-param-prefix"]) 207 208 namespace = "{}.v2.{}.revision.{}.taskgraph.actions".format( 209 graph_config["trust-domain"], 210 parameters["project"], 211 parameters[head_rev_param], 212 ) 213 for task_id in list_tasks(namespace): 214 fetches.append(e.submit(fetch_action, task_id)) 215 216 # Similarly for cron tasks.. 217 def fetch_cron(task_id): 218 logger.info( 219 "fetching label-to-taskid.json for cron task {}".format(task_id) 220 ) 221 try: 222 run_label_to_id = get_artifact(task_id, "public/label-to-taskid.json") 223 label_to_taskid.update(run_label_to_id) 224 except HTTPError as e: 225 if e.response.status_code != 404: 226 raise 227 logger.debug( 228 "No label-to-taskid.json found for {}: {}".format(task_id, e) 229 ) 230 231 namespace = "{}.v2.{}.revision.{}.cron".format( 232 graph_config["trust-domain"], 233 parameters["project"], 234 parameters[head_rev_param], 235 ) 236 for task_id in list_tasks(namespace): 237 fetches.append(e.submit(fetch_cron, task_id)) 238 239 # now wait for each fetch to complete, raising an exception if there 240 # were any issues 241 for f in futures.as_completed(fetches): 242 f.result() 243 244 return (decision_task_id, full_task_graph, label_to_taskid) 245 246 247def create_task_from_def(task_def, level): 248 """Create a new task from a definition rather than from a label 249 that is already in the full-task-graph. The task definition will 250 have {relative-datestamp': '..'} rendered just like in a decision task. 251 Use this for entirely new tasks or ones that change internals of the task. 252 It is useful if you want to "edit" the full_task_graph and then hand 253 it to this function. No dependencies will be scheduled. You must handle 254 this yourself. Seeing how create_tasks handles it might prove helpful.""" 255 task_def["schedulerId"] = "gecko-level-{}".format(level) 256 label = task_def["metadata"]["name"] 257 task_id = slugid().decode("ascii") 258 session = get_session() 259 create.create_task(session, task_id, label, task_def) 260 261 262def update_parent(task, graph): 263 task.task.setdefault("extra", {})["parent"] = os.environ.get("TASK_ID", "") 264 return task 265 266 267def update_dependencies(task, graph): 268 if os.environ.get("TASK_ID"): 269 task.task.setdefault("dependencies", []).append(os.environ["TASK_ID"]) 270 return task 271 272 273def create_tasks( 274 graph_config, 275 to_run, 276 full_task_graph, 277 label_to_taskid, 278 params, 279 decision_task_id, 280 suffix="", 281 modifier=lambda t: t, 282): 283 """Create new tasks. The task definition will have {relative-datestamp': 284 '..'} rendered just like in a decision task. Action callbacks should use 285 this function to create new tasks, 286 allowing easy debugging with `mach taskgraph action-callback --test`. 287 This builds up all required tasks to run in order to run the tasks requested. 288 289 Optionally this function takes a `modifier` function that is passed in each 290 task before it is put into a new graph. It should return a valid task. Note 291 that this is passed _all_ tasks in the graph, not just the set in to_run. You 292 may want to skip modifying tasks not in your to_run list. 293 294 If `suffix` is given, then it is used to give unique names to the resulting 295 artifacts. If you call this function multiple times in the same action, 296 pass a different suffix each time to avoid overwriting artifacts. 297 298 If you wish to create the tasks in a new group, leave out decision_task_id. 299 300 Returns an updated label_to_taskid containing the new tasks""" 301 if suffix != "": 302 suffix = "-{}".format(suffix) 303 to_run = set(to_run) 304 305 # Copy to avoid side-effects later 306 full_task_graph = copy.deepcopy(full_task_graph) 307 label_to_taskid = label_to_taskid.copy() 308 309 target_graph = full_task_graph.graph.transitive_closure(to_run) 310 target_task_graph = TaskGraph( 311 {l: modifier(full_task_graph[l]) for l in target_graph.nodes}, target_graph 312 ) 313 target_task_graph.for_each_task(update_parent) 314 if decision_task_id and decision_task_id != os.environ.get("TASK_ID"): 315 target_task_graph.for_each_task(update_dependencies) 316 optimized_task_graph, label_to_taskid = optimize_task_graph( 317 target_task_graph, 318 to_run, 319 params, 320 to_run, 321 decision_task_id, 322 existing_tasks=label_to_taskid, 323 ) 324 write_artifact("task-graph{}.json".format(suffix), optimized_task_graph.to_json()) 325 write_artifact("label-to-taskid{}.json".format(suffix), label_to_taskid) 326 write_artifact("to-run{}.json".format(suffix), list(to_run)) 327 create.create_tasks( 328 graph_config, 329 optimized_task_graph, 330 label_to_taskid, 331 params, 332 decision_task_id, 333 ) 334 return label_to_taskid 335 336 337def _update_reducer(accumulator, new_value): 338 "similar to set or dict `update` method, but returning the modified object" 339 accumulator.update(new_value) 340 return accumulator 341 342 343def combine_task_graph_files(suffixes): 344 """Combine task-graph-{suffix}.json files into a single task-graph.json file. 345 346 Since Chain of Trust verification requires a task-graph.json file that 347 contains all children tasks, we can combine the various task-graph-0.json 348 type files into a master task-graph.json file at the end. 349 350 Actions also look for various artifacts, so we combine those in a similar 351 fashion. 352 353 In the case where there is only one suffix, we simply rename it to avoid the 354 additional cost of uploading two copies of the same data. 355 """ 356 357 if len(suffixes) == 1: 358 for filename in ["task-graph", "label-to-taskid", "to-run"]: 359 rename_artifact( 360 "{}-{}.json".format(filename, suffixes[0]), "{}.json".format(filename) 361 ) 362 return 363 364 def combine(file_contents, base): 365 return reduce(_update_reducer, file_contents, base) 366 367 files = [read_artifact("task-graph-{}.json".format(suffix)) for suffix in suffixes] 368 write_artifact("task-graph.json", combine(files, dict())) 369 370 files = [ 371 read_artifact("label-to-taskid-{}.json".format(suffix)) for suffix in suffixes 372 ] 373 write_artifact("label-to-taskid.json", combine(files, dict())) 374 375 files = [read_artifact("to-run-{}.json".format(suffix)) for suffix in suffixes] 376 write_artifact("to-run.json", list(combine(files, set()))) 377 378 379def relativize_datestamps(task_def): 380 """ 381 Given a task definition as received from the queue, convert all datestamps 382 to {relative_datestamp: ..} format, with the task creation time as "now". 383 The result is useful for handing to ``create_task``. 384 """ 385 base = parse_time(task_def["created"]) 386 # borrowed from https://github.com/epoberezkin/ajv/blob/master/lib/compile/formats.js 387 ts_pattern = re.compile( 388 r"^\d\d\d\d-[0-1]\d-[0-3]\d[t\s]" 389 r"(?:[0-2]\d:[0-5]\d:[0-5]\d|23:59:60)(?:\.\d+)?" 390 r"(?:z|[+-]\d\d:\d\d)$", 391 re.I, 392 ) 393 394 def recurse(value): 395 if isinstance(value, text_type): 396 if ts_pattern.match(value): 397 value = parse_time(value) 398 diff = value - base 399 return { 400 "relative-datestamp": "{} seconds".format(int(diff.total_seconds())) 401 } 402 if isinstance(value, list): 403 return [recurse(e) for e in value] 404 if isinstance(value, dict): 405 return {k: recurse(v) for k, v in value.items()} 406 return value 407 408 return recurse(task_def) 409 410 411def add_args_to_command(cmd_parts, extra_args=[]): 412 """ 413 Add custom command line args to a given command. 414 args: 415 cmd_parts: the raw command as seen by taskcluster 416 extra_args: array of args we want to add 417 """ 418 # Prevent modification of the caller's copy of cmd_parts 419 cmd_parts = copy.deepcopy(cmd_parts) 420 cmd_type = "default" 421 if len(cmd_parts) == 1 and isinstance(cmd_parts[0], dict): 422 # windows has single cmd part as dict: 'task-reference', with long string 423 cmd_parts = cmd_parts[0]["task-reference"].split(" ") 424 cmd_type = "dict" 425 elif len(cmd_parts) == 1 and isinstance(cmd_parts[0], string_types): 426 # windows has single cmd part as a long string 427 cmd_parts = cmd_parts[0].split(" ") 428 cmd_type = "unicode" 429 elif len(cmd_parts) == 1 and isinstance(cmd_parts[0], list): 430 # osx has an single value array with an array inside 431 cmd_parts = cmd_parts[0] 432 cmd_type = "subarray" 433 elif len(cmd_parts) == 2 and isinstance(cmd_parts[1], list): 434 # osx has an double value array with an array inside each element. 435 # The first element is a pre-requisite command while the second 436 # is the actual test command. 437 cmd_type = "subarray2" 438 439 if cmd_type == "subarray2": 440 cmd_parts[1].extend(extra_args) 441 else: 442 cmd_parts.extend(extra_args) 443 444 if cmd_type == "dict": 445 cmd_parts = [{"task-reference": " ".join(cmd_parts)}] 446 elif cmd_type == "unicode": 447 cmd_parts = [" ".join(cmd_parts)] 448 elif cmd_type == "subarray": 449 cmd_parts = [cmd_parts] 450 return cmd_parts 451