1# (C) 2013, James Cammarata <jcammarata@ansible.com> 2# Copyright: (c) 2019, Ansible Project 3# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) 4 5from __future__ import (absolute_import, division, print_function) 6__metaclass__ = type 7 8import hashlib 9import json 10import os 11import tarfile 12import uuid 13import time 14 15from ansible import constants as C 16from ansible.errors import AnsibleError 17from ansible.galaxy.user_agent import user_agent 18from ansible.module_utils.api import retry_with_delays_and_condition 19from ansible.module_utils.api import generate_jittered_backoff 20from ansible.module_utils.six import string_types 21from ansible.module_utils.six.moves.urllib.error import HTTPError 22from ansible.module_utils.six.moves.urllib.parse import quote as urlquote, urlencode, urlparse, parse_qs, urljoin 23from ansible.module_utils._text import to_bytes, to_native, to_text 24from ansible.module_utils.urls import open_url, prepare_multipart 25from ansible.utils.display import Display 26from ansible.utils.hashing import secure_hash_s 27 28try: 29 from urllib.parse import urlparse 30except ImportError: 31 # Python 2 32 from urlparse import urlparse 33 34display = Display() 35COLLECTION_PAGE_SIZE = 100 36RETRY_HTTP_ERROR_CODES = [ # TODO: Allow user-configuration 37 429, # Too Many Requests 38 520, # Galaxy rate limit error code (Cloudflare unknown error) 39] 40 41 42def is_rate_limit_exception(exception): 43 # Note: cloud.redhat.com masks rate limit errors with 403 (Forbidden) error codes. 44 # Since 403 could reflect the actual problem (such as an expired token), we should 45 # not retry by default. 46 return isinstance(exception, GalaxyError) and exception.http_code in RETRY_HTTP_ERROR_CODES 47 48 49def g_connect(versions): 50 """ 51 Wrapper to lazily initialize connection info to Galaxy and verify the API versions required are available on the 52 endpoint. 53 54 :param versions: A list of API versions that the function supports. 55 """ 56 def decorator(method): 57 def wrapped(self, *args, **kwargs): 58 if not self._available_api_versions: 59 display.vvvv("Initial connection to galaxy_server: %s" % self.api_server) 60 61 # Determine the type of Galaxy server we are talking to. First try it unauthenticated then with Bearer 62 # auth for Automation Hub. 63 n_url = self.api_server 64 error_context_msg = 'Error when finding available api versions from %s (%s)' % (self.name, n_url) 65 66 if self.api_server == 'https://galaxy.ansible.com' or self.api_server == 'https://galaxy.ansible.com/': 67 n_url = 'https://galaxy.ansible.com/api/' 68 69 try: 70 data = self._call_galaxy(n_url, method='GET', error_context_msg=error_context_msg) 71 except (AnsibleError, GalaxyError, ValueError, KeyError) as err: 72 # Either the URL doesnt exist, or other error. Or the URL exists, but isn't a galaxy API 73 # root (not JSON, no 'available_versions') so try appending '/api/' 74 if n_url.endswith('/api') or n_url.endswith('/api/'): 75 raise 76 77 # Let exceptions here bubble up but raise the original if this returns a 404 (/api/ wasn't found). 78 n_url = _urljoin(n_url, '/api/') 79 try: 80 data = self._call_galaxy(n_url, method='GET', error_context_msg=error_context_msg) 81 except GalaxyError as new_err: 82 if new_err.http_code == 404: 83 raise err 84 raise 85 86 if 'available_versions' not in data: 87 raise AnsibleError("Tried to find galaxy API root at %s but no 'available_versions' are available " 88 "on %s" % (n_url, self.api_server)) 89 90 # Update api_server to point to the "real" API root, which in this case could have been the configured 91 # url + '/api/' appended. 92 self.api_server = n_url 93 94 # Default to only supporting v1, if only v1 is returned we also assume that v2 is available even though 95 # it isn't returned in the available_versions dict. 96 available_versions = data.get('available_versions', {u'v1': u'v1/'}) 97 if list(available_versions.keys()) == [u'v1']: 98 available_versions[u'v2'] = u'v2/' 99 100 self._available_api_versions = available_versions 101 display.vvvv("Found API version '%s' with Galaxy server %s (%s)" 102 % (', '.join(available_versions.keys()), self.name, self.api_server)) 103 104 # Verify that the API versions the function works with are available on the server specified. 105 available_versions = set(self._available_api_versions.keys()) 106 common_versions = set(versions).intersection(available_versions) 107 if not common_versions: 108 raise AnsibleError("Galaxy action %s requires API versions '%s' but only '%s' are available on %s %s" 109 % (method.__name__, ", ".join(versions), ", ".join(available_versions), 110 self.name, self.api_server)) 111 112 return method(self, *args, **kwargs) 113 return wrapped 114 return decorator 115 116 117def _urljoin(*args): 118 return '/'.join(to_native(a, errors='surrogate_or_strict').strip('/') for a in args + ('',) if a) 119 120 121class GalaxyError(AnsibleError): 122 """ Error for bad Galaxy server responses. """ 123 124 def __init__(self, http_error, message): 125 super(GalaxyError, self).__init__(message) 126 self.http_code = http_error.code 127 self.url = http_error.geturl() 128 129 try: 130 http_msg = to_text(http_error.read()) 131 err_info = json.loads(http_msg) 132 except (AttributeError, ValueError): 133 err_info = {} 134 135 url_split = self.url.split('/') 136 if 'v2' in url_split: 137 galaxy_msg = err_info.get('message', http_error.reason) 138 code = err_info.get('code', 'Unknown') 139 full_error_msg = u"%s (HTTP Code: %d, Message: %s Code: %s)" % (message, self.http_code, galaxy_msg, code) 140 elif 'v3' in url_split: 141 errors = err_info.get('errors', []) 142 if not errors: 143 errors = [{}] # Defaults are set below, we just need to make sure 1 error is present. 144 145 message_lines = [] 146 for error in errors: 147 error_msg = error.get('detail') or error.get('title') or http_error.reason 148 error_code = error.get('code') or 'Unknown' 149 message_line = u"(HTTP Code: %d, Message: %s Code: %s)" % (self.http_code, error_msg, error_code) 150 message_lines.append(message_line) 151 152 full_error_msg = "%s %s" % (message, ', '.join(message_lines)) 153 else: 154 # v1 and unknown API endpoints 155 galaxy_msg = err_info.get('default', http_error.reason) 156 full_error_msg = u"%s (HTTP Code: %d, Message: %s)" % (message, self.http_code, galaxy_msg) 157 158 self.message = to_native(full_error_msg) 159 160 161class CollectionVersionMetadata: 162 163 def __init__(self, namespace, name, version, download_url, artifact_sha256, dependencies): 164 """ 165 Contains common information about a collection on a Galaxy server to smooth through API differences for 166 Collection and define a standard meta info for a collection. 167 168 :param namespace: The namespace name. 169 :param name: The collection name. 170 :param version: The version that the metadata refers to. 171 :param download_url: The URL to download the collection. 172 :param artifact_sha256: The SHA256 of the collection artifact for later verification. 173 :param dependencies: A dict of dependencies of the collection. 174 """ 175 self.namespace = namespace 176 self.name = name 177 self.version = version 178 self.download_url = download_url 179 self.artifact_sha256 = artifact_sha256 180 self.dependencies = dependencies 181 182 183class GalaxyAPI: 184 """ This class is meant to be used as a API client for an Ansible Galaxy server """ 185 186 def __init__(self, galaxy, name, url, username=None, password=None, token=None, validate_certs=True): 187 self.galaxy = galaxy 188 self.name = name 189 self.username = username 190 self.password = password 191 self.token = token 192 self.api_server = url 193 self.validate_certs = validate_certs 194 self._available_api_versions = {} 195 196 display.debug('Validate TLS certificates for %s: %s' % (self.api_server, self.validate_certs)) 197 198 @property 199 @g_connect(['v1', 'v2', 'v3']) 200 def available_api_versions(self): 201 # Calling g_connect will populate self._available_api_versions 202 return self._available_api_versions 203 204 @retry_with_delays_and_condition( 205 backoff_iterator=generate_jittered_backoff(retries=6, delay_base=2, delay_threshold=40), 206 should_retry_error=is_rate_limit_exception 207 ) 208 def _call_galaxy(self, url, args=None, headers=None, method=None, auth_required=False, error_context_msg=None): 209 headers = headers or {} 210 self._add_auth_token(headers, url, required=auth_required) 211 212 try: 213 display.vvvv("Calling Galaxy at %s" % url) 214 resp = open_url(to_native(url), data=args, validate_certs=self.validate_certs, headers=headers, 215 method=method, timeout=20, http_agent=user_agent(), follow_redirects='safe') 216 except HTTPError as e: 217 raise GalaxyError(e, error_context_msg) 218 except Exception as e: 219 raise AnsibleError("Unknown error when attempting to call Galaxy at '%s': %s" % (url, to_native(e))) 220 221 resp_data = to_text(resp.read(), errors='surrogate_or_strict') 222 try: 223 data = json.loads(resp_data) 224 except ValueError: 225 raise AnsibleError("Failed to parse Galaxy response from '%s' as JSON:\n%s" 226 % (resp.url, to_native(resp_data))) 227 228 return data 229 230 def _add_auth_token(self, headers, url, token_type=None, required=False): 231 # Don't add the auth token if one is already present 232 if 'Authorization' in headers: 233 return 234 235 if not self.token and required: 236 raise AnsibleError("No access token or username set. A token can be set with --api-key " 237 "or at {0}.".format(to_native(C.GALAXY_TOKEN_PATH))) 238 239 if self.token: 240 headers.update(self.token.headers()) 241 242 @g_connect(['v1']) 243 def authenticate(self, github_token): 244 """ 245 Retrieve an authentication token 246 """ 247 url = _urljoin(self.api_server, self.available_api_versions['v1'], "tokens") + '/' 248 args = urlencode({"github_token": github_token}) 249 resp = open_url(url, data=args, validate_certs=self.validate_certs, method="POST", http_agent=user_agent()) 250 data = json.loads(to_text(resp.read(), errors='surrogate_or_strict')) 251 return data 252 253 @g_connect(['v1']) 254 def create_import_task(self, github_user, github_repo, reference=None, role_name=None): 255 """ 256 Post an import request 257 """ 258 url = _urljoin(self.api_server, self.available_api_versions['v1'], "imports") + '/' 259 args = { 260 "github_user": github_user, 261 "github_repo": github_repo, 262 "github_reference": reference if reference else "" 263 } 264 if role_name: 265 args['alternate_role_name'] = role_name 266 elif github_repo.startswith('ansible-role'): 267 args['alternate_role_name'] = github_repo[len('ansible-role') + 1:] 268 data = self._call_galaxy(url, args=urlencode(args), method="POST") 269 if data.get('results', None): 270 return data['results'] 271 return data 272 273 @g_connect(['v1']) 274 def get_import_task(self, task_id=None, github_user=None, github_repo=None): 275 """ 276 Check the status of an import task. 277 """ 278 url = _urljoin(self.api_server, self.available_api_versions['v1'], "imports") 279 if task_id is not None: 280 url = "%s?id=%d" % (url, task_id) 281 elif github_user is not None and github_repo is not None: 282 url = "%s?github_user=%s&github_repo=%s" % (url, github_user, github_repo) 283 else: 284 raise AnsibleError("Expected task_id or github_user and github_repo") 285 286 data = self._call_galaxy(url) 287 return data['results'] 288 289 @g_connect(['v1']) 290 def lookup_role_by_name(self, role_name, notify=True): 291 """ 292 Find a role by name. 293 """ 294 role_name = to_text(urlquote(to_bytes(role_name))) 295 296 try: 297 parts = role_name.split(".") 298 user_name = ".".join(parts[0:-1]) 299 role_name = parts[-1] 300 if notify: 301 display.display("- downloading role '%s', owned by %s" % (role_name, user_name)) 302 except Exception: 303 raise AnsibleError("Invalid role name (%s). Specify role as format: username.rolename" % role_name) 304 305 url = _urljoin(self.api_server, self.available_api_versions['v1'], "roles", 306 "?owner__username=%s&name=%s" % (user_name, role_name)) 307 data = self._call_galaxy(url) 308 if len(data["results"]) != 0: 309 return data["results"][0] 310 return None 311 312 @g_connect(['v1']) 313 def fetch_role_related(self, related, role_id): 314 """ 315 Fetch the list of related items for the given role. 316 The url comes from the 'related' field of the role. 317 """ 318 319 results = [] 320 try: 321 url = _urljoin(self.api_server, self.available_api_versions['v1'], "roles", role_id, related, 322 "?page_size=50") 323 data = self._call_galaxy(url) 324 results = data['results'] 325 done = (data.get('next_link', None) is None) 326 327 # https://github.com/ansible/ansible/issues/64355 328 # api_server contains part of the API path but next_link includes the /api part so strip it out. 329 url_info = urlparse(self.api_server) 330 base_url = "%s://%s/" % (url_info.scheme, url_info.netloc) 331 332 while not done: 333 url = _urljoin(base_url, data['next_link']) 334 data = self._call_galaxy(url) 335 results += data['results'] 336 done = (data.get('next_link', None) is None) 337 except Exception as e: 338 display.warning("Unable to retrieve role (id=%s) data (%s), but this is not fatal so we continue: %s" 339 % (role_id, related, to_text(e))) 340 return results 341 342 @g_connect(['v1']) 343 def get_list(self, what): 344 """ 345 Fetch the list of items specified. 346 """ 347 try: 348 url = _urljoin(self.api_server, self.available_api_versions['v1'], what, "?page_size") 349 data = self._call_galaxy(url) 350 if "results" in data: 351 results = data['results'] 352 else: 353 results = data 354 done = True 355 if "next" in data: 356 done = (data.get('next_link', None) is None) 357 while not done: 358 url = _urljoin(self.api_server, data['next_link']) 359 data = self._call_galaxy(url) 360 results += data['results'] 361 done = (data.get('next_link', None) is None) 362 return results 363 except Exception as error: 364 raise AnsibleError("Failed to download the %s list: %s" % (what, to_native(error))) 365 366 @g_connect(['v1']) 367 def search_roles(self, search, **kwargs): 368 369 search_url = _urljoin(self.api_server, self.available_api_versions['v1'], "search", "roles", "?") 370 371 if search: 372 search_url += '&autocomplete=' + to_text(urlquote(to_bytes(search))) 373 374 tags = kwargs.get('tags', None) 375 platforms = kwargs.get('platforms', None) 376 page_size = kwargs.get('page_size', None) 377 author = kwargs.get('author', None) 378 379 if tags and isinstance(tags, string_types): 380 tags = tags.split(',') 381 search_url += '&tags_autocomplete=' + '+'.join(tags) 382 383 if platforms and isinstance(platforms, string_types): 384 platforms = platforms.split(',') 385 search_url += '&platforms_autocomplete=' + '+'.join(platforms) 386 387 if page_size: 388 search_url += '&page_size=%s' % page_size 389 390 if author: 391 search_url += '&username_autocomplete=%s' % author 392 393 data = self._call_galaxy(search_url) 394 return data 395 396 @g_connect(['v1']) 397 def add_secret(self, source, github_user, github_repo, secret): 398 url = _urljoin(self.api_server, self.available_api_versions['v1'], "notification_secrets") + '/' 399 args = urlencode({ 400 "source": source, 401 "github_user": github_user, 402 "github_repo": github_repo, 403 "secret": secret 404 }) 405 data = self._call_galaxy(url, args=args, method="POST") 406 return data 407 408 @g_connect(['v1']) 409 def list_secrets(self): 410 url = _urljoin(self.api_server, self.available_api_versions['v1'], "notification_secrets") 411 data = self._call_galaxy(url, auth_required=True) 412 return data 413 414 @g_connect(['v1']) 415 def remove_secret(self, secret_id): 416 url = _urljoin(self.api_server, self.available_api_versions['v1'], "notification_secrets", secret_id) + '/' 417 data = self._call_galaxy(url, auth_required=True, method='DELETE') 418 return data 419 420 @g_connect(['v1']) 421 def delete_role(self, github_user, github_repo): 422 url = _urljoin(self.api_server, self.available_api_versions['v1'], "removerole", 423 "?github_user=%s&github_repo=%s" % (github_user, github_repo)) 424 data = self._call_galaxy(url, auth_required=True, method='DELETE') 425 return data 426 427 # Collection APIs # 428 429 @g_connect(['v2', 'v3']) 430 def publish_collection(self, collection_path): 431 """ 432 Publishes a collection to a Galaxy server and returns the import task URI. 433 434 :param collection_path: The path to the collection tarball to publish. 435 :return: The import task URI that contains the import results. 436 """ 437 display.display("Publishing collection artifact '%s' to %s %s" % (collection_path, self.name, self.api_server)) 438 439 b_collection_path = to_bytes(collection_path, errors='surrogate_or_strict') 440 if not os.path.exists(b_collection_path): 441 raise AnsibleError("The collection path specified '%s' does not exist." % to_native(collection_path)) 442 elif not tarfile.is_tarfile(b_collection_path): 443 raise AnsibleError("The collection path specified '%s' is not a tarball, use 'ansible-galaxy collection " 444 "build' to create a proper release artifact." % to_native(collection_path)) 445 446 with open(b_collection_path, 'rb') as collection_tar: 447 sha256 = secure_hash_s(collection_tar.read(), hash_func=hashlib.sha256) 448 449 content_type, b_form_data = prepare_multipart( 450 { 451 'sha256': sha256, 452 'file': { 453 'filename': b_collection_path, 454 'mime_type': 'application/octet-stream', 455 }, 456 } 457 ) 458 459 headers = { 460 'Content-type': content_type, 461 'Content-length': len(b_form_data), 462 } 463 464 if 'v3' in self.available_api_versions: 465 n_url = _urljoin(self.api_server, self.available_api_versions['v3'], 'artifacts', 'collections') + '/' 466 else: 467 n_url = _urljoin(self.api_server, self.available_api_versions['v2'], 'collections') + '/' 468 469 resp = self._call_galaxy(n_url, args=b_form_data, headers=headers, method='POST', auth_required=True, 470 error_context_msg='Error when publishing collection to %s (%s)' 471 % (self.name, self.api_server)) 472 473 return resp['task'] 474 475 @g_connect(['v2', 'v3']) 476 def wait_import_task(self, task_id, timeout=0): 477 """ 478 Waits until the import process on the Galaxy server has completed or the timeout is reached. 479 480 :param task_id: The id of the import task to wait for. This can be parsed out of the return 481 value for GalaxyAPI.publish_collection. 482 :param timeout: The timeout in seconds, 0 is no timeout. 483 """ 484 state = 'waiting' 485 data = None 486 487 # Construct the appropriate URL per version 488 if 'v3' in self.available_api_versions: 489 full_url = _urljoin(self.api_server, self.available_api_versions['v3'], 490 'imports/collections', task_id, '/') 491 else: 492 full_url = _urljoin(self.api_server, self.available_api_versions['v2'], 493 'collection-imports', task_id, '/') 494 495 display.display("Waiting until Galaxy import task %s has completed" % full_url) 496 start = time.time() 497 wait = 2 498 499 while timeout == 0 or (time.time() - start) < timeout: 500 data = self._call_galaxy(full_url, method='GET', auth_required=True, 501 error_context_msg='Error when getting import task results at %s' % full_url) 502 503 state = data.get('state', 'waiting') 504 505 if data.get('finished_at', None): 506 break 507 508 display.vvv('Galaxy import process has a status of %s, wait %d seconds before trying again' 509 % (state, wait)) 510 time.sleep(wait) 511 512 # poor man's exponential backoff algo so we don't flood the Galaxy API, cap at 30 seconds. 513 wait = min(30, wait * 1.5) 514 if state == 'waiting': 515 raise AnsibleError("Timeout while waiting for the Galaxy import process to finish, check progress at '%s'" 516 % to_native(full_url)) 517 518 for message in data.get('messages', []): 519 level = message['level'] 520 if level == 'error': 521 display.error("Galaxy import error message: %s" % message['message']) 522 elif level == 'warning': 523 display.warning("Galaxy import warning message: %s" % message['message']) 524 else: 525 display.vvv("Galaxy import message: %s - %s" % (level, message['message'])) 526 527 if state == 'failed': 528 code = to_native(data['error'].get('code', 'UNKNOWN')) 529 description = to_native( 530 data['error'].get('description', "Unknown error, see %s for more details" % full_url)) 531 raise AnsibleError("Galaxy import process failed: %s (Code: %s)" % (description, code)) 532 533 @g_connect(['v2', 'v3']) 534 def get_collection_version_metadata(self, namespace, name, version): 535 """ 536 Gets the collection information from the Galaxy server about a specific Collection version. 537 538 :param namespace: The collection namespace. 539 :param name: The collection name. 540 :param version: Version of the collection to get the information for. 541 :return: CollectionVersionMetadata about the collection at the version requested. 542 """ 543 api_path = self.available_api_versions.get('v3', self.available_api_versions.get('v2')) 544 url_paths = [self.api_server, api_path, 'collections', namespace, name, 'versions', version, '/'] 545 546 n_collection_url = _urljoin(*url_paths) 547 error_context_msg = 'Error when getting collection version metadata for %s.%s:%s from %s (%s)' \ 548 % (namespace, name, version, self.name, self.api_server) 549 data = self._call_galaxy(n_collection_url, error_context_msg=error_context_msg) 550 551 return CollectionVersionMetadata(data['namespace']['name'], data['collection']['name'], data['version'], 552 data['download_url'], data['artifact']['sha256'], 553 data['metadata']['dependencies']) 554 555 @g_connect(['v2', 'v3']) 556 def get_collection_versions(self, namespace, name): 557 """ 558 Gets a list of available versions for a collection on a Galaxy server. 559 560 :param namespace: The collection namespace. 561 :param name: The collection name. 562 :return: A list of versions that are available. 563 """ 564 relative_link = False 565 if 'v3' in self.available_api_versions: 566 api_path = self.available_api_versions['v3'] 567 results_key = 'data' 568 pagination_path = ['links', 'next'] 569 relative_link = True # AH pagination results are relative an not an absolute URI. 570 else: 571 api_path = self.available_api_versions['v2'] 572 results_key = 'results' 573 pagination_path = ['next'] 574 575 page_size_name = 'limit' if 'v3' in self.available_api_versions else 'page_size' 576 n_url = _urljoin(self.api_server, api_path, 'collections', namespace, name, 'versions', '/?%s=%d' % (page_size_name, COLLECTION_PAGE_SIZE)) 577 n_url_info = urlparse(n_url) 578 579 error_context_msg = 'Error when getting available collection versions for %s.%s from %s (%s)' \ 580 % (namespace, name, self.name, self.api_server) 581 data = self._call_galaxy(n_url, error_context_msg=error_context_msg) 582 583 versions = [] 584 while True: 585 versions += [v['version'] for v in data[results_key]] 586 587 next_link = data 588 for path in pagination_path: 589 next_link = next_link.get(path, {}) 590 591 if not next_link: 592 break 593 elif relative_link: 594 # TODO: This assumes the pagination result is relative to the root server. Will need to be verified 595 # with someone who knows the AH API. 596 597 # Remove the query string from the versions_url to use the next_link's query 598 n_url = urljoin(n_url, urlparse(n_url).path) 599 next_link = n_url.replace(n_url_info.path, next_link) 600 601 data = self._call_galaxy(to_native(next_link, errors='surrogate_or_strict'), 602 error_context_msg=error_context_msg) 603 604 return versions 605