# -*- coding: utf-8 -*-

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from __future__ import absolute_import, print_function, unicode_literals

import concurrent.futures as futures
import copy
import logging
import os
import re
from functools import reduce

import jsone
import requests
from requests.exceptions import HTTPError
from six import text_type, string_types
from slugid import nice as slugid

from taskgraph import create
from taskgraph.decision import read_artifact, write_artifact, rename_artifact
from taskgraph.taskgraph import TaskGraph
from taskgraph.optimize import optimize_task_graph
from taskgraph.util.taskcluster import (
    find_task_id,
    get_artifact,
    get_task_definition,
    get_session,
    list_tasks,
    parse_time,
    trigger_hook,
    CONCURRENCY,
)
from taskgraph.util.taskgraph import find_decision_task

logger = logging.getLogger(__name__)

INDEX_TMPL = "gecko.v2.{}.pushlog-id.{}.decision"
PUSHLOG_TMPL = "{}/json-pushes?version=2&startID={}&endID={}"


def _tags_within_context(tags, context=None):
    """Determine whether a task's tags match any of the tag-sets in context.

    A context of [] means that the action applies *only* to the task group,
    never to an individual task, so an empty context always yields False here.
    """
    context = context or []
    return any(
        all(tag in tags and tags[tag] == tag_set[tag] for tag in tag_set)
        for tag_set in context
    )


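# A minimal sketch of how _tags_within_context matches (tag values are
# hypothetical):
#
#     tags = {"kind": "test", "platform": "linux64"}
#     _tags_within_context(tags, [{"kind": "test"}])   # True: tags are a superset
#     _tags_within_context(tags, [{"kind": "build"}])  # False: no tag-set matches
#     _tags_within_context(tags, [])                   # False: group-only action

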
def _extract_applicable_action(actions_json, action_name, task_group_id, task_id):
    """Extract the action that applies to the given task or task group.

    A task (as defined by its tags) is said to match a tag-set if its
    tags are a super-set of the tag-set. A tag-set is a set of key-value pairs.

    An action (as defined by its context) is said to be relevant for
    a given task if the task's tags match one of the tag-sets given
    in the context property of the action.

    The order of the actions is significant: when multiple actions apply to a
    task, the first one takes precedence.

    For more details visit:
    https://docs.taskcluster.net/docs/manual/design/conventions/actions/spec
    """
    tags = get_task_definition(task_id).get("tags") if task_id else None

    for _action in actions_json["actions"]:
        if action_name != _action["name"]:
            continue

        context = _action.get("context", [])
        # Ensure the task is within the context of the action
        if task_id and tags and _tags_within_context(tags, context):
            return _action
        elif context == []:
            return _action

    available_actions = ", ".join(sorted({a["name"] for a in actions_json["actions"]}))
    raise LookupError(
        "{} action is not available for this task. Available: {}".format(
            action_name, available_actions
        )
    )


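# A minimal sketch of the actions.json shape _extract_applicable_action
# consumes (field values here are hypothetical; see the spec linked above):
#
#     actions_json = {
#         "version": 1,
#         "variables": {},
#         "actions": [
#             {
#                 "name": "retrigger",
#                 "kind": "hook",
#                 "context": [{"retrigger": "true"}],
#                 "hookGroupId": "...",
#                 "hookId": "...",
#                 "hookPayload": {},
#             },
#         ],
#     }
#
# A task tagged {"retrigger": "true"} matches the action above; if no action
# of the requested name matches, LookupError is raised.

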
def trigger_action(action_name, decision_task_id, task_id=None, input=None):
    if not decision_task_id:
        raise ValueError("No decision task. We can't find the actions artifact.")
    input = input or {}
    actions_json = get_artifact(decision_task_id, "public/actions.json")
    if actions_json["version"] != 1:
        raise RuntimeError("Wrong version of actions.json, unable to continue")

    # These values are substituted into $eval expressions in the template
    context = {
        "input": input,
        "taskId": task_id,
        "taskGroupId": decision_task_id,
    }
    # https://docs.taskcluster.net/docs/manual/design/conventions/actions/spec#variables
    context.update(actions_json["variables"])
    action = _extract_applicable_action(
        actions_json, action_name, decision_task_id, task_id
    )
    kind = action["kind"]
    if kind == "hook":
        hook_payload = jsone.render(action["hookPayload"], context)
        trigger_hook(action["hookGroupId"], action["hookId"], hook_payload)
    else:
        raise NotImplementedError("Unable to submit actions with {} kind.".format(kind))


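# Example use of trigger_action (the action name, task IDs, and input are
# hypothetical; valid values depend on the decision task's actions.json):
#
#     trigger_action(
#         "retrigger",
#         decision_task_id="abc123...",
#         task_id="def456...",
#         input={"times": 3},
#     )
#
# This renders the matching action's hookPayload with JSON-e and fires the hook.

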
def get_pushes_from_params_input(parameters, input):
    inclusive_tweak = 1 if input.get("inclusive") else 0
    return get_pushes(
        project=parameters["head_repository"],
        end_id=int(parameters["pushlog_id"]) - (1 - inclusive_tweak),
        depth=input.get("depth", 9) + inclusive_tweak,
    )


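# Worked example of get_pushes_from_params_input's inclusive tweak (pushlog_id
# is hypothetical): with pushlog_id=100 and the default depth of 9, a
# non-inclusive request fetches 9 pushes ending at push 99 (the pushes before
# the current one), while input={"inclusive": True} fetches 10 pushes ending
# at push 100 (the same 9 plus the current push).

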
def get_pushes(project, end_id, depth):
    """Return the newest `depth` push IDs (as strings, ascending) ending at `end_id`."""
    pushes = []
    while True:
        start_id = max(end_id - depth, 0)
        pushlog_url = PUSHLOG_TMPL.format(project, start_id, end_id)
        logger.debug(pushlog_url)
        r = requests.get(pushlog_url)
        r.raise_for_status()
        pushes = pushes + list(r.json()["pushes"].keys())
        if len(pushes) >= depth:
            break

        # walk further back through the pushlog until enough pushes are
        # collected or the history is exhausted
        end_id = start_id - 1
        start_id -= depth
        if start_id < 0:
            break

    # sort numerically (push IDs are numeric strings) and keep the newest `depth`
    pushes = sorted(pushes, key=int)[-depth:]
    return pushes


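# Paging sketch for get_pushes (IDs are hypothetical): with end_id=30 and
# depth=5, the first request uses startID=25&endID=30. If that window yields
# fewer than 5 pushes, the loop retries with endID=24 and so on, until 5
# pushes are collected or the pushlog is exhausted; the newest 5 push IDs are
# returned in ascending order.

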
def get_decision_task_id(project, push_id):
    return find_task_id(INDEX_TMPL.format(project, push_id))


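# For example, get_decision_task_id("autoland", "12345") looks up the index
# path "gecko.v2.autoland.pushlog-id.12345.decision" (push ID hypothetical).

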
def get_parameters(decision_task_id):
    return get_artifact(decision_task_id, "public/parameters.yml")


def get_tasks_with_downstream(labels, full_task_graph, label_to_taskid):
    # Used to gather tasks when downstream tasks need to run as well.
    # Intersecting with label_to_taskid restricts the result to tasks that
    # were actually scheduled.
    return full_task_graph.graph.transitive_closure(
        set(labels), reverse=True
    ).nodes & set(label_to_taskid.keys())


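# Closure sketch for get_tasks_with_downstream (labels hypothetical): in a
# graph where "test" depends on "build" and "report" depends on "test",
# transitive_closure({"build"}, reverse=True) yields {"build", "test",
# "report"}, i.e. the requested label plus everything downstream of it,
# which is then intersected with the tasks present in label_to_taskid.

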
def get_downstream_browsertime_tasks(labels, full_task_graph, label_to_taskid):
    # Used to gather tasks when downstream tasks need to run as well. This
    # function is specific to browsertime: it doesn't take an intersection
    # with existing tasks, making it possible to schedule these without
    # having previously scheduled them.
    return full_task_graph.graph.transitive_closure(set(labels), reverse=True).nodes


def rename_browsertime_vismet_task(label):
    # Vismet task labels are derived from the label of the task that produced
    # the data, so undo that transformation here.
    return label.replace("-vismet", "") + "-e10s"


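# Example (label is hypothetical):
#
#     rename_browsertime_vismet_task("test-linux64/opt-browsertime-tp6-vismet")
#     # -> "test-linux64/opt-browsertime-tp6-e10s"

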
def fetch_graph_and_labels(parameters, graph_config):
    decision_task_id = find_decision_task(parameters, graph_config)

    # First grab the graph and labels generated during the initial decision task
    full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
    logger.info("Loading task graph from JSON.")
    _, full_task_graph = TaskGraph.from_json(full_task_graph)
    label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")

    logger.info("Fetching additional tasks from action and cron tasks.")
    # fetch everything in parallel; this avoids serializing any delay in downloading
    # each artifact (such as waiting for the artifact to be mirrored locally)
    with futures.ThreadPoolExecutor(CONCURRENCY) as e:
        fetches = []

        # fetch any modifications made by action and cron tasks and swap out
        # new tasks for old ones
        def fetch_label_to_taskid(kind, task_id):
            logger.info(
                "fetching label-to-taskid.json for {} task {}".format(kind, task_id)
            )
            try:
                run_label_to_id = get_artifact(task_id, "public/label-to-taskid.json")
                label_to_taskid.update(run_label_to_id)
            except HTTPError as exc:
                if exc.response.status_code != 404:
                    raise
                logger.debug(
                    "No label-to-taskid.json found for {}: {}".format(task_id, exc)
                )

        head_rev_param = "{}head_rev".format(graph_config["project-repo-param-prefix"])

        namespace = "{}.v2.{}.revision.{}.taskgraph.actions".format(
            graph_config["trust-domain"],
            parameters["project"],
            parameters[head_rev_param],
        )
        for task_id in list_tasks(namespace):
            fetches.append(e.submit(fetch_label_to_taskid, "action", task_id))

        # Similarly for cron tasks...
        namespace = "{}.v2.{}.revision.{}.cron".format(
            graph_config["trust-domain"],
            parameters["project"],
            parameters[head_rev_param],
        )
        for task_id in list_tasks(namespace):
            fetches.append(e.submit(fetch_label_to_taskid, "cron", task_id))

        # now wait for each fetch to complete, raising an exception if there
        # were any issues
        for f in futures.as_completed(fetches):
            f.result()

    return (decision_task_id, full_task_graph, label_to_taskid)


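# Typical action-callback usage of fetch_graph_and_labels (parameters and
# graph_config come from the action's environment):
#
#     decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(
#         parameters, graph_config
#     )
#
# label_to_taskid then reflects the decision task plus any action/cron task reruns.

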
def create_task_from_def(task_def, level):
    """Create a new task from a definition rather than from a label
    that is already in the full-task-graph. The task definition will
    have {'relative-datestamp': '..'} rendered just like in a decision task.
    Use this for entirely new tasks or ones that change internals of the task.
    It is useful if you want to "edit" the full_task_graph and then hand
    it to this function. No dependencies will be scheduled; you must handle
    them yourself. Seeing how create_tasks handles them might prove helpful."""
    task_def["schedulerId"] = "gecko-level-{}".format(level)
    label = task_def["metadata"]["name"]
    task_id = slugid().decode("ascii")
    session = get_session()
    create.create_task(session, task_id, label, task_def)


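# Sketch of typical create_task_from_def use (the label is hypothetical; a real
# task definition carries many more fields than shown here):
#
#     task_def = relativize_datestamps(get_task_definition(task_id))
#     task_def["metadata"]["name"] = "my-one-off-task"
#     create_task_from_def(task_def, level="3")
#
# relativize_datestamps (defined below) re-renders the timestamps so the new
# task gets fresh created/deadline times.

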
def update_parent(task, graph):
    # for_each_task callback: record the currently running (action) task as
    # the parent of each task in the graph
    task.task.setdefault("extra", {})["parent"] = os.environ.get("TASK_ID", "")
    return task


def update_dependencies(task, graph):
    # for_each_task callback: make each task depend on the currently running
    # (action) task, if any
    if os.environ.get("TASK_ID"):
        task.task.setdefault("dependencies", []).append(os.environ["TASK_ID"])
    return task


def create_tasks(
    graph_config,
    to_run,
    full_task_graph,
    label_to_taskid,
    params,
    decision_task_id,
    suffix="",
    modifier=lambda t: t,
):
    """Create new tasks.  The task definition will have {'relative-datestamp':
    '..'} rendered just like in a decision task.  Action callbacks should use
    this function to create new tasks, allowing easy debugging with
    `mach taskgraph action-callback --test`.  This builds up all of the tasks
    required in order to run the requested tasks.

    Optionally this function takes a `modifier` function that is passed each
    task before it is put into a new graph. It should return a valid task. Note
    that this is passed _all_ tasks in the graph, not just the set in to_run. You
    may want to skip modifying tasks not in your to_run list.

    If `suffix` is given, then it is used to give unique names to the resulting
    artifacts.  If you call this function multiple times in the same action,
    pass a different suffix each time to avoid overwriting artifacts.

    The new tasks are created in the task group of `decision_task_id`.

    Returns an updated label_to_taskid containing the new tasks"""
    if suffix != "":
        suffix = "-{}".format(suffix)
    to_run = set(to_run)

    # Copy to avoid side-effects later
    full_task_graph = copy.deepcopy(full_task_graph)
    label_to_taskid = label_to_taskid.copy()

    target_graph = full_task_graph.graph.transitive_closure(to_run)
    target_task_graph = TaskGraph(
        {l: modifier(full_task_graph[l]) for l in target_graph.nodes}, target_graph
    )
    target_task_graph.for_each_task(update_parent)
    if decision_task_id and decision_task_id != os.environ.get("TASK_ID"):
        target_task_graph.for_each_task(update_dependencies)
    # `to_run` is passed both as the requested tasks and as do_not_optimize,
    # so the explicitly requested tasks are never optimized away.
    optimized_task_graph, label_to_taskid = optimize_task_graph(
        target_task_graph,
        to_run,
        params,
        to_run,
        decision_task_id,
        existing_tasks=label_to_taskid,
    )
    write_artifact("task-graph{}.json".format(suffix), optimized_task_graph.to_json())
    write_artifact("label-to-taskid{}.json".format(suffix), label_to_taskid)
    write_artifact("to-run{}.json".format(suffix), list(to_run))
    create.create_tasks(
        graph_config,
        optimized_task_graph,
        label_to_taskid,
        params,
        decision_task_id,
    )
    return label_to_taskid


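# Typical create_tasks usage from an action callback (the test label is
# hypothetical):
#
#     decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(
#         parameters, graph_config
#     )
#     to_run = get_tasks_with_downstream(
#         ["test-linux64/opt-xpcshell"], full_task_graph, label_to_taskid
#     )
#     create_tasks(
#         graph_config,
#         to_run,
#         full_task_graph,
#         label_to_taskid,
#         parameters,
#         decision_task_id,
#     )

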
def _update_reducer(accumulator, new_value):
    "similar to the set or dict `update` method, but returns the modified object"
    accumulator.update(new_value)
    return accumulator


def combine_task_graph_files(suffixes):
    """Combine task-graph-{suffix}.json files into a single task-graph.json file.

    Since Chain of Trust verification requires a task-graph.json file that
    contains all children tasks, we can combine the various task-graph-0.json
    style files into a single task-graph.json file at the end.

    Actions also look for various artifacts, so we combine those in a similar
    fashion.

    In the case where there is only one suffix, we simply rename it to avoid the
    additional cost of uploading two copies of the same data.
    """

    if len(suffixes) == 1:
        for filename in ["task-graph", "label-to-taskid", "to-run"]:
            rename_artifact(
                "{}-{}.json".format(filename, suffixes[0]), "{}.json".format(filename)
            )
        return

    def combine(file_contents, base):
        return reduce(_update_reducer, file_contents, base)

    files = [read_artifact("task-graph-{}.json".format(suffix)) for suffix in suffixes]
    write_artifact("task-graph.json", combine(files, dict()))

    files = [
        read_artifact("label-to-taskid-{}.json".format(suffix)) for suffix in suffixes
    ]
    write_artifact("label-to-taskid.json", combine(files, dict()))

    files = [read_artifact("to-run-{}.json".format(suffix)) for suffix in suffixes]
    write_artifact("to-run.json", list(combine(files, set())))


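# For example, after two create_tasks calls with suffixes "0" and "1",
# combine_task_graph_files(["0", "1"]) merges task-graph-0.json and
# task-graph-1.json into task-graph.json (and likewise for label-to-taskid
# and to-run); with a single suffix the per-suffix files are simply renamed.

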
def relativize_datestamps(task_def):
    """
    Given a task definition as received from the queue, convert all datestamps
    to {'relative-datestamp': ..} format, with the task creation time as "now".
    The result is useful for handing to ``create_task``.
    """
    base = parse_time(task_def["created"])
    # borrowed from https://github.com/epoberezkin/ajv/blob/master/lib/compile/formats.js
    ts_pattern = re.compile(
        r"^\d\d\d\d-[0-1]\d-[0-3]\d[t\s]"
        r"(?:[0-2]\d:[0-5]\d:[0-5]\d|23:59:60)(?:\.\d+)?"
        r"(?:z|[+-]\d\d:\d\d)$",
        re.I,
    )

    def recurse(value):
        if isinstance(value, text_type):
            if ts_pattern.match(value):
                value = parse_time(value)
                diff = value - base
                return {
                    "relative-datestamp": "{} seconds".format(int(diff.total_seconds()))
                }
        if isinstance(value, list):
            return [recurse(e) for e in value]
        if isinstance(value, dict):
            return {k: recurse(v) for k, v in value.items()}
        return value

    return recurse(task_def)


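# Worked example for relativize_datestamps (timestamps are hypothetical): in a
# task with created="2021-01-01T00:00:00Z" and deadline="2021-01-02T00:00:00Z",
# the deadline becomes {"relative-datestamp": "86400 seconds"}, i.e. one day
# after whenever the new task is created.

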
def add_args_to_command(cmd_parts, extra_args=None):
    """
    Add custom command line args to a given command.

    args:
      cmd_parts: the raw command as seen by taskcluster
      extra_args: array of args we want to add
    """
    extra_args = extra_args or []
    # Prevent modification of the caller's copy of cmd_parts
    cmd_parts = copy.deepcopy(cmd_parts)
    cmd_type = "default"
    if len(cmd_parts) == 1 and isinstance(cmd_parts[0], dict):
        # windows has a single cmd part as a 'task-reference' dict holding a long string
        cmd_parts = cmd_parts[0]["task-reference"].split(" ")
        cmd_type = "dict"
    elif len(cmd_parts) == 1 and isinstance(cmd_parts[0], string_types):
        # windows has a single cmd part as a long string
        cmd_parts = cmd_parts[0].split(" ")
        cmd_type = "unicode"
    elif len(cmd_parts) == 1 and isinstance(cmd_parts[0], list):
        # osx has a single-element array with an array inside
        cmd_parts = cmd_parts[0]
        cmd_type = "subarray"
    elif len(cmd_parts) == 2 and isinstance(cmd_parts[1], list):
        # osx has a two-element array with an array inside each element.
        # The first element is a prerequisite command while the second
        # is the actual test command.
        cmd_type = "subarray2"

    if cmd_type == "subarray2":
        cmd_parts[1].extend(extra_args)
    else:
        cmd_parts.extend(extra_args)

    if cmd_type == "dict":
        cmd_parts = [{"task-reference": " ".join(cmd_parts)}]
    elif cmd_type == "unicode":
        cmd_parts = [" ".join(cmd_parts)]
    elif cmd_type == "subarray":
        cmd_parts = [cmd_parts]
    return cmd_parts

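# Shape examples for add_args_to_command (commands are hypothetical):
#
#     add_args_to_command(["mach", "test"], ["--verbose"])
#     # -> ["mach", "test", "--verbose"]                    (default)
#     add_args_to_command(["mach test"], ["--verbose"])
#     # -> ["mach test --verbose"]                          (unicode)
#     add_args_to_command([{"task-reference": "mach test"}], ["--verbose"])
#     # -> [{"task-reference": "mach test --verbose"}]      (dict)
#     add_args_to_command([["mach", "test"]], ["--verbose"])
#     # -> [["mach", "test", "--verbose"]]                  (subarray)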