# -*- coding: utf-8 -*-
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import os

import datetime
import logging
import requests
from taskcluster.generated import _client_importer
from taskcluster.generated.aio import _client_importer as _async_client_importer
from taskcluster.utils import stringDate
import urllib.parse

logger = logging.getLogger(__name__)


class TaskclusterConfig(object):
    """
    Local configuration used to access Taskcluster service and objects

    Holds the client options built by `auth` (in `self.options`) and the last
    secrets loaded through `load_secrets` (in `self.secrets`).
    """

    def __init__(self, url=None):
        """
        Store the deployment root url.

        The url is taken from the `url` argument when provided, otherwise from
        the TASKCLUSTER_ROOT_URL environment variable. An AssertionError is
        raised when neither is available.
        """
        self.options = None
        self.secrets = None
        self.default_url = (
            url if url is not None else os.environ.get("TASKCLUSTER_ROOT_URL")
        )
        assert (
            self.default_url is not None
        ), "You must specify a Taskcluster deployment url"

    def auth(self, client_id=None, access_token=None, max_retries=12):
        """
        Build Taskcluster credentials options
        Supports, by order of preference:
         * directly provided credentials
         * credentials from environment variables
         * taskclusterProxy
         * no authentication

        The resulting options are stored in `self.options`; nothing is returned.
        Environment variables are only consulted when both `client_id` and
        `access_token` are None.
        """
        self.options = {"maxRetries": max_retries}

        if client_id is None and access_token is None:
            # Credentials preference: Use env. variables
            client_id = os.environ.get("TASKCLUSTER_CLIENT_ID")
            access_token = os.environ.get("TASKCLUSTER_ACCESS_TOKEN")
            logger.info("Using taskcluster credentials from environment")
        else:
            logger.info("Using taskcluster credentials from cli")

        if client_id is not None and access_token is not None:
            # Use provided credentials
            self.options["credentials"] = {
                "clientId": client_id,
                "accessToken": access_token,
            }
            self.options["rootUrl"] = self.default_url

        elif "TASK_ID" in os.environ:
            # Use Taskcluster Proxy when running in a task
            logger.info("Taskcluster Proxy enabled")
            self.options["rootUrl"] = os.environ.get(
                "TASKCLUSTER_PROXY_URL", "http://taskcluster"
            )

        else:
            logger.info("No Taskcluster authentication.")
            self.options["rootUrl"] = self.default_url

    def get_service(self, service_name, use_async=False):
        """
        Build a Taskcluster service instance using current authentication

        `service_name` is matched against the attributes of the generated
        client importer (the async one when `use_async` is true). Both
        lower-case ("queue") and camelCase ("workerManager") names are
        accepted. `auth()` is called with its defaults when no options have
        been built yet. An AssertionError is raised for an unknown service.
        """
        if self.options is None:
            self.auth()

        client_importer = _async_client_importer if use_async else _client_importer

        # str.capitalize() lower-cases every character after the first one, so
        # a camelCase name ("workerManager") would be turned into
        # "Workermanager" and never match. Try the name with only its first
        # letter upper-cased, then fall back to capitalize() so that inputs
        # like "queue" or "QUEUE" keep resolving as they did before.
        candidates = (
            service_name[:1].upper() + service_name[1:],
            service_name.capitalize(),
        )
        service = None
        for candidate in candidates:
            service = getattr(client_importer, candidate, None)
            if service is not None:
                break

        assert service is not None, "Invalid Taskcluster service {}".format(
            service_name
        )
        return service(self.options)

    def load_secrets(
        self, secret_name, prefixes=None, required=None, existing=None, local_secrets=None
    ):
        """Shortcut to use load_secrets helper with current authentication

        The loaded secrets are stored in `self.secrets` and returned.
        """
        self.secrets = load_secrets(
            self.get_service('secrets'),
            secret_name,
            prefixes,
            required,
            existing,
            local_secrets,
        )
        return self.secrets

    def upload_artifact(self, artifact_path, content, content_type, ttl):
        """Shortcut to use upload_artifact helper with current authentication

        Returns the artifact path joined onto the deployment root url.
        """
        path = upload_artifact(
            self.get_service('queue'),
            artifact_path,
            content,
            content_type,
            ttl,
        )

        return urllib.parse.urljoin(self.default_url, path)


def load_secrets(
    secrets_service, secret_name, prefixes=None, required=None, existing=None, local_secrets=None
):
    """
    Fetch a specific set of secrets by name and verify that the required
    secrets exist.
    Also supports providing local secrets to avoid using remote Taskcluster service
    for local development (or contributor onboarding)
    A user can specify prefixes to limit the part of secrets used (useful when a secret
    is shared amongst several services)

    Values are merged in this order, later ones overriding earlier ones:
    `existing`, then either the content found under each prefix (in the order
    given) or, without prefixes, the whole secret.

    `prefixes`, `required` and `existing` default to None, which is treated
    as empty. `local_secrets` is only used when it is a dict; any other value
    triggers a call to `secrets_service.get(secret_name)`.

    Raises an AssertionError when the remote service is needed but
    `secret_name` is None, and an Exception when a required key is missing
    from the merged result.
    """
    secrets = {}
    if existing:
        secrets.update(existing)

    if isinstance(local_secrets, dict):
        # Use local secrets file to avoid using Taskcluster secrets
        logger.info("Using provided local secrets")
        all_secrets = local_secrets
    else:
        # Use Taskcluster secret service
        assert secret_name is not None, "Missing Taskcluster secret secret_name"
        all_secrets = secrets_service.get(secret_name).get("secret", dict())
        logger.info("Loaded Taskcluster secret {}".format(secret_name))

    if prefixes:
        # Use secrets behind supported prefixes
        for prefix in prefixes:
            secrets.update(all_secrets.get(prefix, dict()))

    else:
        # Use all secrets available
        secrets.update(all_secrets)

    # Check required secrets (None means nothing is required)
    for required_secret in required or ():
        if required_secret not in secrets:
            raise Exception("Missing value {} in secrets.".format(required_secret))

    return secrets


def upload_artifact(queue_service, artifact_path, content, content_type, ttl):
    """
    DEPRECATED. Do not use.

    Declare an "s3" artifact for the current task run through
    `queue_service.createArtifact`, then PUT `content` to the url returned by
    that call.

    TASK_ID, RUN_ID and TASKCLUSTER_PROXY_URL must be set in the environment,
    `content` must be a str and `ttl` a datetime.timedelta; an AssertionError
    is raised otherwise, or when the createArtifact response is not the
    expected s3 response. An HTTP error on the upload is raised by
    `raise_for_status()`.

    Returns the root-relative path of the artifact on the queue API (not an
    absolute url).
    """
    task_id = os.environ.get("TASK_ID")
    run_id = os.environ.get("RUN_ID")
    proxy = os.environ.get("TASKCLUSTER_PROXY_URL")
    assert task_id and run_id and proxy, "Can only run in Taskcluster tasks with proxy"
    assert isinstance(content, str)
    assert isinstance(ttl, datetime.timedelta)

    # Create S3 artifact on Taskcluster
    resp = queue_service.createArtifact(
        task_id,
        run_id,
        artifact_path,
        {
            "storageType": "s3",
            "expires": stringDate(datetime.datetime.utcnow() + ttl),
            "contentType": content_type,
        },
    )
    assert resp["storageType"] == "s3", "Not an s3 storage"
    assert "putUrl" in resp, "Missing putUrl"
    assert "contentType" in resp, "Missing contentType"

    # Push the artifact on storage service
    headers = {"Content-Type": resp["contentType"]}
    push = requests.put(url=resp["putUrl"], headers=headers, data=content)
    push.raise_for_status()

    # Build the root-relative path of the artifact; the caller is expected to
    # join it with the deployment root url
    return "/api/queue/v1/task/{task_id}/runs/{run_id}/artifacts/{path}".format(
        task_id=task_id,
        run_id=run_id,
        path=artifact_path,
    )