1# -*- coding: utf-8 -*-
2# This Source Code Form is subject to the terms of the Mozilla Public
3# License, v. 2.0. If a copy of the MPL was not distributed with this
4# file, You can obtain one at http://mozilla.org/MPL/2.0/.
5
6import os
7
8import datetime
9import logging
10import requests
11from taskcluster.generated import _client_importer
12from taskcluster.generated.aio import _client_importer as _async_client_importer
13from taskcluster.utils import stringDate
14import urllib.parse
15
16logger = logging.getLogger(__name__)
17
18
class TaskclusterConfig(object):
    """
    Local configuration used to access Taskcluster service and objects

    Keeps the deployment root url, the client options built by `auth`
    and the last secrets returned by `load_secrets`.
    """

    def __init__(self, url=None):
        """
        Store the Taskcluster deployment url.
        When `url` is not provided, the TASKCLUSTER_ROOT_URL environment
        variable is used instead.
        Raises AssertionError when no url is available.
        """
        # Client options, built by auth() (lazily triggered by get_service)
        self.options = None
        # Last value returned by load_secrets()
        self.secrets = None
        self.default_url = url if url is not None else os.environ.get("TASKCLUSTER_ROOT_URL")
        # Raised explicitly instead of using `assert` so the check is still
        # enforced under `python -O`; the exception type is unchanged.
        if self.default_url is None:
            raise AssertionError("You must specify a Taskcluster deployment url")

    def auth(self, client_id=None, access_token=None, max_retries=12):
        """
        Build Taskcluster credentials options
        Supports, by order of preference:
         * directly provided credentials
         * credentials from environment variables
         * taskclusterProxy
         * no authentication

        The result is stored in `self.options`. Credentials are only used
        when both the client id and the access token are available.
        """
        self.options = {"maxRetries": max_retries}

        if client_id is None and access_token is None:
            # Credentials preference: Use env. variables
            client_id = os.environ.get("TASKCLUSTER_CLIENT_ID")
            access_token = os.environ.get("TASKCLUSTER_ACCESS_TOKEN")
            logger.info("Using taskcluster credentials from environment")
        else:
            logger.info("Using taskcluster credentials from cli")

        if (client_id is None) != (access_token is None):
            # Only one half of the credentials is available: it cannot be
            # used, so make the fallback below visible instead of silent.
            logger.warning(
                "Incomplete Taskcluster credentials: both client id and "
                "access token are required, ignoring them"
            )

        if client_id is not None and access_token is not None:
            # Use provided credentials
            self.options["credentials"] = {
                "clientId": client_id,
                "accessToken": access_token,
            }
            self.options["rootUrl"] = self.default_url

        elif "TASK_ID" in os.environ:
            # Use Taskcluster Proxy when running in a task
            logger.info("Taskcluster Proxy enabled")
            self.options["rootUrl"] = os.environ.get("TASKCLUSTER_PROXY_URL", "http://taskcluster")

        else:
            logger.info("No Taskcluster authentication.")
            self.options["rootUrl"] = self.default_url

    @staticmethod
    def _resolve_service(client_importer, service_name):
        """
        Look up the client class matching `service_name` on `client_importer`.
        Returns None when no candidate attribute exists.
        """
        candidates = (
            # Historical lookup: "queue" -> "Queue"
            service_name.capitalize(),
            # capitalize() lowercases the rest of the name, so camel-cased
            # names ("workerManager" -> "WorkerManager") need a lookup that
            # only touches the first letter.
            service_name[:1].upper() + service_name[1:],
        )
        for candidate in candidates:
            service = getattr(client_importer, candidate, None)
            if service is not None:
                return service
        return None

    def get_service(self, service_name, use_async=False):
        """
        Build a Taskcluster service instance using current authentication

        `auth` is called with its default arguments when no options were
        built yet. `use_async` selects the asynchronous client importer.
        Raises AssertionError when the service name matches no client.
        """
        if self.options is None:
            self.auth()

        client_importer = _async_client_importer if use_async else _client_importer
        service = self._resolve_service(client_importer, service_name)
        # Explicit raise (same exception type as the former `assert`) so the
        # check is not stripped under `python -O`.
        if service is None:
            raise AssertionError(
                "Invalid Taskcluster service {}".format(service_name)
            )
        return service(self.options)

    def load_secrets(
        self, secret_name, prefixes=None, required=None, existing=None, local_secrets=None
    ):
        """
        Shortcut to use load_secrets helper with current authentication

        The loaded secrets are stored in `self.secrets` and returned.
        """
        self.secrets = load_secrets(
            self.get_service('secrets'),
            secret_name,
            # None means "not provided": hand fresh empty containers to the
            # helper instead of sharing mutable default arguments.
            [] if prefixes is None else prefixes,
            [] if required is None else required,
            {} if existing is None else existing,
            local_secrets,
        )
        return self.secrets

    def upload_artifact(self, artifact_path, content, content_type, ttl):
        """
        Shortcut to use upload_artifact helper with current authentication

        Returns the path given by the helper joined with the deployment url.
        """
        path = upload_artifact(
            self.get_service('queue'),
            artifact_path,
            content,
            content_type,
            ttl,
        )

        return urllib.parse.urljoin(self.default_url, path)
105
106
def load_secrets(
    secrets_service, secret_name, prefixes=None, required=None, existing=None, local_secrets=None
):
    """
    Fetch a specific set of secrets by name and verify that the required
    secrets exist.
    Also supports providing local secrets to avoid using remote Taskcluster service
    for local development (or contributor onboarding)
    A user can specify prefixes to limit the part of secrets used (useful when a secret
    is shared amongst several services)

    The returned dict starts from `existing`, then is updated with the
    loaded values, so loaded values win over existing ones. When several
    prefixes are given they are applied in order, later ones overriding
    earlier ones. `prefixes`, `required` and `existing` may be None or empty.

    Raises AssertionError when the remote service must be used and
    `secret_name` is None, and Exception on the first required key missing
    from the result.
    """
    secrets = {}
    if existing:
        secrets.update(existing)

    if isinstance(local_secrets, dict):
        # Use local secrets file to avoid using Taskcluster secrets
        logger.info("Using provided local secrets")
        all_secrets = local_secrets
    else:
        # Use Taskcluster secret service
        # Explicit raise (same exception type as the former `assert`) so the
        # check is not stripped under `python -O`.
        if secret_name is None:
            raise AssertionError("Missing Taskcluster secret secret_name")
        all_secrets = secrets_service.get(secret_name).get("secret", dict())
        logger.info("Loaded Taskcluster secret %s", secret_name)

    if prefixes:
        # Use secrets behind supported prefixes
        for prefix in prefixes:
            secrets.update(all_secrets.get(prefix, dict()))

    else:
        # Use all secrets available
        secrets.update(all_secrets)

    # Check required secrets (nothing to check when none are required)
    for required_secret in required or ():
        if required_secret not in secrets:
            raise Exception("Missing value {} in secrets.".format(required_secret))

    return secrets
147
148
def upload_artifact(queue_service, artifact_path, content, content_type, ttl):
    """
    DEPRECATED. Do not use.

    Declares an S3 artifact on the current task run through the queue
    service, pushes `content` to the upload url received in the response
    and returns the artifact API path (without host).

    Needs the TASK_ID, RUN_ID and TASKCLUSTER_PROXY_URL environment
    variables. `content` must be a str and `ttl` a datetime.timedelta,
    which is added to the current UTC time to compute the expiration.
    """
    environ = os.environ
    task_id = environ.get("TASK_ID")
    run_id = environ.get("RUN_ID")
    proxy = environ.get("TASKCLUSTER_PROXY_URL")
    assert task_id and run_id and proxy, "Can only run in Taskcluster tasks with proxy"
    assert isinstance(content, str)
    assert isinstance(ttl, datetime.timedelta)

    # Declare the artifact on Taskcluster, requesting S3 storage
    create_artifact = queue_service.createArtifact
    expiration = stringDate(datetime.datetime.utcnow() + ttl)
    artifact = create_artifact(
        task_id,
        run_id,
        artifact_path,
        {"storageType": "s3", "expires": expiration, "contentType": content_type},
    )
    assert artifact["storageType"] == "s3", "Not an s3 storage"
    assert "putUrl" in artifact, "Missing putUrl"
    assert "contentType" in artifact, "Missing contentType"

    # Send the content to the storage service, with the content type
    # returned by the queue
    upload = requests.put(
        url=artifact["putUrl"],
        headers={"Content-Type": artifact["contentType"]},
        data=content,
    )
    upload.raise_for_status()

    # Path of the artifact on the queue API
    location = "/api/queue/v1/task/{task_id}/runs/{run_id}/artifacts/{path}"
    return location.format(path=artifact_path, run_id=run_id, task_id=task_id)
186