1# (C) 2013, James Cammarata <jcammarata@ansible.com>
2# Copyright: (c) 2019, Ansible Project
3# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
4
5from __future__ import (absolute_import, division, print_function)
6__metaclass__ = type
7
import functools
import hashlib
import json
import os
import tarfile
import time
import uuid
14
15from ansible import constants as C
16from ansible.errors import AnsibleError
17from ansible.galaxy.user_agent import user_agent
18from ansible.module_utils.api import retry_with_delays_and_condition
19from ansible.module_utils.api import generate_jittered_backoff
20from ansible.module_utils.six import string_types
21from ansible.module_utils.six.moves.urllib.error import HTTPError
22from ansible.module_utils.six.moves.urllib.parse import quote as urlquote, urlencode, urlparse, parse_qs, urljoin
23from ansible.module_utils._text import to_bytes, to_native, to_text
24from ansible.module_utils.urls import open_url, prepare_multipart
25from ansible.utils.display import Display
26from ansible.utils.hashing import secure_hash_s
27
28try:
29    from urllib.parse import urlparse
30except ImportError:
31    # Python 2
32    from urlparse import urlparse
33
# Module-level Display instance used for all console/verbosity output in this file.
display = Display()
# Page size requested when listing collection versions; sent as 'limit' on v3 endpoints and as 'page_size' on v2.
COLLECTION_PAGE_SIZE = 100
# HTTP status codes that is_rate_limit_exception() treats as rate limiting. A GalaxyError carrying one of these codes
# makes GalaxyAPI._call_galaxy retry with a jittered backoff.
RETRY_HTTP_ERROR_CODES = [  # TODO: Allow user-configuration
    429,  # Too Many Requests
    520,  # Galaxy rate limit error code (Cloudflare unknown error)
]
40
41
def is_rate_limit_exception(exception):
    """
    Decide whether a failed Galaxy request should be retried.

    Only GalaxyError instances whose HTTP status is listed in RETRY_HTTP_ERROR_CODES qualify.
    cloud.redhat.com masks rate limit errors with 403 (Forbidden) responses. A 403 can also signal a genuine
    problem such as an expired token, so it is deliberately not retried.

    :param exception: The exception raised by the request.
    :return: True when the request should be retried, otherwise False.
    """
    if not isinstance(exception, GalaxyError):
        return False
    return exception.http_code in RETRY_HTTP_ERROR_CODES
47
48
def g_connect(versions):
    """
    Wrapper to lazily initialize connection info to Galaxy and verify the API versions required are available on the
    endpoint.

    The first call of any wrapped method discovers the Galaxy API root: it tries the configured URL first, then the
    URL with '/api/' appended. It then points ``self.api_server`` at that root and caches the advertised API versions
    in ``self._available_api_versions``. Every call checks that at least one of ``versions`` is available.

    :param versions: A list of API versions that the function supports.
    :raises AnsibleError: If the API root cannot be found or none of ``versions`` is offered by the server.
    """
    def decorator(method):
        # functools.wraps keeps the wrapped method's __name__ and __doc__, so introspection and help() show the real
        # method (or property) instead of 'wrapped'.
        @functools.wraps(method)
        def wrapped(self, *args, **kwargs):
            if not self._available_api_versions:
                display.vvvv("Initial connection to galaxy_server: %s" % self.api_server)

                # Determine the type of Galaxy server we are talking to. First try it unauthenticated then with Bearer
                # auth for Automation Hub.
                n_url = self.api_server
                error_context_msg = 'Error when finding available api versions from %s (%s)' % (self.name, n_url)

                if self.api_server == 'https://galaxy.ansible.com' or self.api_server == 'https://galaxy.ansible.com/':
                    n_url = 'https://galaxy.ansible.com/api/'

                try:
                    data = self._call_galaxy(n_url, method='GET', error_context_msg=error_context_msg)
                except (AnsibleError, GalaxyError, ValueError, KeyError) as err:
                    # Either the URL doesnt exist, or other error. Or the URL exists, but isn't a galaxy API
                    # root (not JSON, no 'available_versions') so try appending '/api/'
                    if n_url.endswith('/api') or n_url.endswith('/api/'):
                        raise

                    # Let exceptions here bubble up but raise the original if this returns a 404 (/api/ wasn't found).
                    n_url = _urljoin(n_url, '/api/')
                    try:
                        data = self._call_galaxy(n_url, method='GET', error_context_msg=error_context_msg)
                    except GalaxyError as new_err:
                        if new_err.http_code == 404:
                            raise err
                        raise

                if 'available_versions' not in data:
                    raise AnsibleError("Tried to find galaxy API root at %s but no 'available_versions' are available "
                                       "on %s" % (n_url, self.api_server))

                # Update api_server to point to the "real" API root, which in this case could have been the configured
                # url + '/api/' appended.
                self.api_server = n_url

                # Default to only supporting v1, if only v1 is returned we also assume that v2 is available even though
                # it isn't returned in the available_versions dict.
                available_versions = data.get('available_versions', {u'v1': u'v1/'})
                if list(available_versions.keys()) == [u'v1']:
                    available_versions[u'v2'] = u'v2/'

                self._available_api_versions = available_versions
                display.vvvv("Found API version '%s' with Galaxy server %s (%s)"
                             % (', '.join(available_versions.keys()), self.name, self.api_server))

            # Verify that the API versions the function works with are available on the server specified.
            available_versions = set(self._available_api_versions.keys())
            common_versions = set(versions).intersection(available_versions)
            if not common_versions:
                raise AnsibleError("Galaxy action %s requires API versions '%s' but only '%s' are available on %s %s"
                                   % (method.__name__, ", ".join(versions), ", ".join(available_versions),
                                      self.name, self.api_server))

            return method(self, *args, **kwargs)
        return wrapped
    return decorator
115
116
def _urljoin(*args):
    """
    Join URL fragments with a single '/' between them.

    Falsy fragments are skipped. Every remaining fragment is converted to a native string and has its leading and
    trailing slashes stripped before joining. A trailing slash can therefore be produced by passing '/' as the final
    fragment.

    :param args: URL fragments (text, bytes or other values accepted by to_native).
    :return: The joined URL as a native string.
    """
    pieces = []
    for fragment in args:
        if not fragment:
            continue
        pieces.append(to_native(fragment, errors='surrogate_or_strict').strip('/'))
    return '/'.join(pieces)
119
120
class GalaxyError(AnsibleError):
    """ Error for bad Galaxy server responses. """

    def __init__(self, http_error, message):
        """
        Build a readable error message from a failed Galaxy HTTP response.

        :param http_error: The HTTPError raised for the request. Its body is read and, when it is a JSON object, used
            to enrich the message. The format depends on whether the URL contains a 'v2' or 'v3' path segment.
        :param message: Context describing what was being attempted; used as the message prefix.
        """
        super(GalaxyError, self).__init__(message)
        self.http_code = http_error.code
        self.url = http_error.geturl()

        try:
            http_msg = to_text(http_error.read())
            err_info = json.loads(http_msg)
        except (AttributeError, ValueError):
            err_info = {}

        # A body can be valid JSON without being an object (e.g. a list or a string). Only a dict supports the .get()
        # lookups below, so anything else is treated as "no extra details" rather than crashing while the error is
        # being built, which would hide the original HTTP failure.
        if not isinstance(err_info, dict):
            err_info = {}

        url_split = self.url.split('/')
        if 'v2' in url_split:
            galaxy_msg = err_info.get('message', http_error.reason)
            code = err_info.get('code', 'Unknown')
            full_error_msg = u"%s (HTTP Code: %d, Message: %s Code: %s)" % (message, self.http_code, galaxy_msg, code)
        elif 'v3' in url_split:
            errors = err_info.get('errors', [])
            if not isinstance(errors, list) or not errors:
                errors = [{}]  # Defaults are set below, we just need to make sure 1 error is present.

            message_lines = []
            for error in errors:
                if not isinstance(error, dict):
                    # Treat a bare (non-object) error entry as the error detail itself.
                    error = {'detail': error}
                error_msg = error.get('detail') or error.get('title') or http_error.reason
                error_code = error.get('code') or 'Unknown'
                message_line = u"(HTTP Code: %d, Message: %s Code: %s)" % (self.http_code, error_msg, error_code)
                message_lines.append(message_line)

            full_error_msg = "%s %s" % (message, ', '.join(message_lines))
        else:
            # v1 and unknown API endpoints
            galaxy_msg = err_info.get('default', http_error.reason)
            full_error_msg = u"%s (HTTP Code: %d, Message: %s)" % (message, self.http_code, galaxy_msg)

        self.message = to_native(full_error_msg)
159
160
class CollectionVersionMetadata:
    """
    Standard description of one collection version on a Galaxy server. It smooths over the differences between
    Galaxy API versions.
    """

    def __init__(self, namespace, name, version, download_url, artifact_sha256, dependencies):
        """
        :param namespace: The namespace name.
        :param name: The collection name.
        :param version: The version that the metadata refers to.
        :param download_url: The URL to download the collection.
        :param artifact_sha256: The SHA256 of the collection artifact for later verification.
        :param dependencies: A dict of dependencies of the collection.
        """
        # Identity of the collection version.
        self.namespace, self.name, self.version = namespace, name, version
        # Where to fetch the artifact and how to verify it.
        self.download_url, self.artifact_sha256 = download_url, artifact_sha256
        self.dependencies = dependencies
181
182
class GalaxyAPI:
    """ API client for a single Ansible Galaxy (or Automation Hub) server. """

    def __init__(self, galaxy, name, url, username=None, password=None, token=None, validate_certs=True):
        """
        :param galaxy: The Galaxy context object, stored on the instance as-is.
        :param name: A name for the server, used in log and error messages.
        :param url: The configured server URL. It may be replaced by the discovered API root on first use.
        :param username: Optional username, stored on the instance.
        :param password: Optional password, stored on the instance.
        :param token: Optional token object; when set, its headers() are added to requests.
        :param validate_certs: Whether TLS certificates are validated.
        """
        self.galaxy = galaxy
        self.name = name
        self.api_server = url
        self.validate_certs = validate_certs
        self.username = username
        self.password = password
        self.token = token
        # Filled in lazily by the g_connect decorator on the first decorated call.
        self._available_api_versions = {}

        display.debug('Validate TLS certificates for %s: %s' % (self.api_server, self.validate_certs))
197
198    @property
199    @g_connect(['v1', 'v2', 'v3'])
200    def available_api_versions(self):
201        # Calling g_connect will populate self._available_api_versions
202        return self._available_api_versions
203
204    @retry_with_delays_and_condition(
205        backoff_iterator=generate_jittered_backoff(retries=6, delay_base=2, delay_threshold=40),
206        should_retry_error=is_rate_limit_exception
207    )
208    def _call_galaxy(self, url, args=None, headers=None, method=None, auth_required=False, error_context_msg=None):
209        headers = headers or {}
210        self._add_auth_token(headers, url, required=auth_required)
211
212        try:
213            display.vvvv("Calling Galaxy at %s" % url)
214            resp = open_url(to_native(url), data=args, validate_certs=self.validate_certs, headers=headers,
215                            method=method, timeout=20, http_agent=user_agent(), follow_redirects='safe')
216        except HTTPError as e:
217            raise GalaxyError(e, error_context_msg)
218        except Exception as e:
219            raise AnsibleError("Unknown error when attempting to call Galaxy at '%s': %s" % (url, to_native(e)))
220
221        resp_data = to_text(resp.read(), errors='surrogate_or_strict')
222        try:
223            data = json.loads(resp_data)
224        except ValueError:
225            raise AnsibleError("Failed to parse Galaxy response from '%s' as JSON:\n%s"
226                               % (resp.url, to_native(resp_data)))
227
228        return data
229
230    def _add_auth_token(self, headers, url, token_type=None, required=False):
231        # Don't add the auth token if one is already present
232        if 'Authorization' in headers:
233            return
234
235        if not self.token and required:
236            raise AnsibleError("No access token or username set. A token can be set with --api-key "
237                               "or at {0}.".format(to_native(C.GALAXY_TOKEN_PATH)))
238
239        if self.token:
240            headers.update(self.token.headers())
241
242    @g_connect(['v1'])
243    def authenticate(self, github_token):
244        """
245        Retrieve an authentication token
246        """
247        url = _urljoin(self.api_server, self.available_api_versions['v1'], "tokens") + '/'
248        args = urlencode({"github_token": github_token})
249        resp = open_url(url, data=args, validate_certs=self.validate_certs, method="POST", http_agent=user_agent())
250        data = json.loads(to_text(resp.read(), errors='surrogate_or_strict'))
251        return data
252
253    @g_connect(['v1'])
254    def create_import_task(self, github_user, github_repo, reference=None, role_name=None):
255        """
256        Post an import request
257        """
258        url = _urljoin(self.api_server, self.available_api_versions['v1'], "imports") + '/'
259        args = {
260            "github_user": github_user,
261            "github_repo": github_repo,
262            "github_reference": reference if reference else ""
263        }
264        if role_name:
265            args['alternate_role_name'] = role_name
266        elif github_repo.startswith('ansible-role'):
267            args['alternate_role_name'] = github_repo[len('ansible-role') + 1:]
268        data = self._call_galaxy(url, args=urlencode(args), method="POST")
269        if data.get('results', None):
270            return data['results']
271        return data
272
273    @g_connect(['v1'])
274    def get_import_task(self, task_id=None, github_user=None, github_repo=None):
275        """
276        Check the status of an import task.
277        """
278        url = _urljoin(self.api_server, self.available_api_versions['v1'], "imports")
279        if task_id is not None:
280            url = "%s?id=%d" % (url, task_id)
281        elif github_user is not None and github_repo is not None:
282            url = "%s?github_user=%s&github_repo=%s" % (url, github_user, github_repo)
283        else:
284            raise AnsibleError("Expected task_id or github_user and github_repo")
285
286        data = self._call_galaxy(url)
287        return data['results']
288
289    @g_connect(['v1'])
290    def lookup_role_by_name(self, role_name, notify=True):
291        """
292        Find a role by name.
293        """
294        role_name = to_text(urlquote(to_bytes(role_name)))
295
296        try:
297            parts = role_name.split(".")
298            user_name = ".".join(parts[0:-1])
299            role_name = parts[-1]
300            if notify:
301                display.display("- downloading role '%s', owned by %s" % (role_name, user_name))
302        except Exception:
303            raise AnsibleError("Invalid role name (%s). Specify role as format: username.rolename" % role_name)
304
305        url = _urljoin(self.api_server, self.available_api_versions['v1'], "roles",
306                       "?owner__username=%s&name=%s" % (user_name, role_name))
307        data = self._call_galaxy(url)
308        if len(data["results"]) != 0:
309            return data["results"][0]
310        return None
311
312    @g_connect(['v1'])
313    def fetch_role_related(self, related, role_id):
314        """
315        Fetch the list of related items for the given role.
316        The url comes from the 'related' field of the role.
317        """
318
319        results = []
320        try:
321            url = _urljoin(self.api_server, self.available_api_versions['v1'], "roles", role_id, related,
322                           "?page_size=50")
323            data = self._call_galaxy(url)
324            results = data['results']
325            done = (data.get('next_link', None) is None)
326
327            # https://github.com/ansible/ansible/issues/64355
328            # api_server contains part of the API path but next_link includes the /api part so strip it out.
329            url_info = urlparse(self.api_server)
330            base_url = "%s://%s/" % (url_info.scheme, url_info.netloc)
331
332            while not done:
333                url = _urljoin(base_url, data['next_link'])
334                data = self._call_galaxy(url)
335                results += data['results']
336                done = (data.get('next_link', None) is None)
337        except Exception as e:
338            display.warning("Unable to retrieve role (id=%s) data (%s), but this is not fatal so we continue: %s"
339                            % (role_id, related, to_text(e)))
340        return results
341
342    @g_connect(['v1'])
343    def get_list(self, what):
344        """
345        Fetch the list of items specified.
346        """
347        try:
348            url = _urljoin(self.api_server, self.available_api_versions['v1'], what, "?page_size")
349            data = self._call_galaxy(url)
350            if "results" in data:
351                results = data['results']
352            else:
353                results = data
354            done = True
355            if "next" in data:
356                done = (data.get('next_link', None) is None)
357            while not done:
358                url = _urljoin(self.api_server, data['next_link'])
359                data = self._call_galaxy(url)
360                results += data['results']
361                done = (data.get('next_link', None) is None)
362            return results
363        except Exception as error:
364            raise AnsibleError("Failed to download the %s list: %s" % (what, to_native(error)))
365
366    @g_connect(['v1'])
367    def search_roles(self, search, **kwargs):
368
369        search_url = _urljoin(self.api_server, self.available_api_versions['v1'], "search", "roles", "?")
370
371        if search:
372            search_url += '&autocomplete=' + to_text(urlquote(to_bytes(search)))
373
374        tags = kwargs.get('tags', None)
375        platforms = kwargs.get('platforms', None)
376        page_size = kwargs.get('page_size', None)
377        author = kwargs.get('author', None)
378
379        if tags and isinstance(tags, string_types):
380            tags = tags.split(',')
381            search_url += '&tags_autocomplete=' + '+'.join(tags)
382
383        if platforms and isinstance(platforms, string_types):
384            platforms = platforms.split(',')
385            search_url += '&platforms_autocomplete=' + '+'.join(platforms)
386
387        if page_size:
388            search_url += '&page_size=%s' % page_size
389
390        if author:
391            search_url += '&username_autocomplete=%s' % author
392
393        data = self._call_galaxy(search_url)
394        return data
395
396    @g_connect(['v1'])
397    def add_secret(self, source, github_user, github_repo, secret):
398        url = _urljoin(self.api_server, self.available_api_versions['v1'], "notification_secrets") + '/'
399        args = urlencode({
400            "source": source,
401            "github_user": github_user,
402            "github_repo": github_repo,
403            "secret": secret
404        })
405        data = self._call_galaxy(url, args=args, method="POST")
406        return data
407
408    @g_connect(['v1'])
409    def list_secrets(self):
410        url = _urljoin(self.api_server, self.available_api_versions['v1'], "notification_secrets")
411        data = self._call_galaxy(url, auth_required=True)
412        return data
413
414    @g_connect(['v1'])
415    def remove_secret(self, secret_id):
416        url = _urljoin(self.api_server, self.available_api_versions['v1'], "notification_secrets", secret_id) + '/'
417        data = self._call_galaxy(url, auth_required=True, method='DELETE')
418        return data
419
420    @g_connect(['v1'])
421    def delete_role(self, github_user, github_repo):
422        url = _urljoin(self.api_server, self.available_api_versions['v1'], "removerole",
423                       "?github_user=%s&github_repo=%s" % (github_user, github_repo))
424        data = self._call_galaxy(url, auth_required=True, method='DELETE')
425        return data
426
427    # Collection APIs #
428
429    @g_connect(['v2', 'v3'])
430    def publish_collection(self, collection_path):
431        """
432        Publishes a collection to a Galaxy server and returns the import task URI.
433
434        :param collection_path: The path to the collection tarball to publish.
435        :return: The import task URI that contains the import results.
436        """
437        display.display("Publishing collection artifact '%s' to %s %s" % (collection_path, self.name, self.api_server))
438
439        b_collection_path = to_bytes(collection_path, errors='surrogate_or_strict')
440        if not os.path.exists(b_collection_path):
441            raise AnsibleError("The collection path specified '%s' does not exist." % to_native(collection_path))
442        elif not tarfile.is_tarfile(b_collection_path):
443            raise AnsibleError("The collection path specified '%s' is not a tarball, use 'ansible-galaxy collection "
444                               "build' to create a proper release artifact." % to_native(collection_path))
445
446        with open(b_collection_path, 'rb') as collection_tar:
447            sha256 = secure_hash_s(collection_tar.read(), hash_func=hashlib.sha256)
448
449        content_type, b_form_data = prepare_multipart(
450            {
451                'sha256': sha256,
452                'file': {
453                    'filename': b_collection_path,
454                    'mime_type': 'application/octet-stream',
455                },
456            }
457        )
458
459        headers = {
460            'Content-type': content_type,
461            'Content-length': len(b_form_data),
462        }
463
464        if 'v3' in self.available_api_versions:
465            n_url = _urljoin(self.api_server, self.available_api_versions['v3'], 'artifacts', 'collections') + '/'
466        else:
467            n_url = _urljoin(self.api_server, self.available_api_versions['v2'], 'collections') + '/'
468
469        resp = self._call_galaxy(n_url, args=b_form_data, headers=headers, method='POST', auth_required=True,
470                                 error_context_msg='Error when publishing collection to %s (%s)'
471                                                   % (self.name, self.api_server))
472
473        return resp['task']
474
475    @g_connect(['v2', 'v3'])
476    def wait_import_task(self, task_id, timeout=0):
477        """
478        Waits until the import process on the Galaxy server has completed or the timeout is reached.
479
480        :param task_id: The id of the import task to wait for. This can be parsed out of the return
481            value for GalaxyAPI.publish_collection.
482        :param timeout: The timeout in seconds, 0 is no timeout.
483        """
484        state = 'waiting'
485        data = None
486
487        # Construct the appropriate URL per version
488        if 'v3' in self.available_api_versions:
489            full_url = _urljoin(self.api_server, self.available_api_versions['v3'],
490                                'imports/collections', task_id, '/')
491        else:
492            full_url = _urljoin(self.api_server, self.available_api_versions['v2'],
493                                'collection-imports', task_id, '/')
494
495        display.display("Waiting until Galaxy import task %s has completed" % full_url)
496        start = time.time()
497        wait = 2
498
499        while timeout == 0 or (time.time() - start) < timeout:
500            data = self._call_galaxy(full_url, method='GET', auth_required=True,
501                                     error_context_msg='Error when getting import task results at %s' % full_url)
502
503            state = data.get('state', 'waiting')
504
505            if data.get('finished_at', None):
506                break
507
508            display.vvv('Galaxy import process has a status of %s, wait %d seconds before trying again'
509                        % (state, wait))
510            time.sleep(wait)
511
512            # poor man's exponential backoff algo so we don't flood the Galaxy API, cap at 30 seconds.
513            wait = min(30, wait * 1.5)
514        if state == 'waiting':
515            raise AnsibleError("Timeout while waiting for the Galaxy import process to finish, check progress at '%s'"
516                               % to_native(full_url))
517
518        for message in data.get('messages', []):
519            level = message['level']
520            if level == 'error':
521                display.error("Galaxy import error message: %s" % message['message'])
522            elif level == 'warning':
523                display.warning("Galaxy import warning message: %s" % message['message'])
524            else:
525                display.vvv("Galaxy import message: %s - %s" % (level, message['message']))
526
527        if state == 'failed':
528            code = to_native(data['error'].get('code', 'UNKNOWN'))
529            description = to_native(
530                data['error'].get('description', "Unknown error, see %s for more details" % full_url))
531            raise AnsibleError("Galaxy import process failed: %s (Code: %s)" % (description, code))
532
533    @g_connect(['v2', 'v3'])
534    def get_collection_version_metadata(self, namespace, name, version):
535        """
536        Gets the collection information from the Galaxy server about a specific Collection version.
537
538        :param namespace: The collection namespace.
539        :param name: The collection name.
540        :param version: Version of the collection to get the information for.
541        :return: CollectionVersionMetadata about the collection at the version requested.
542        """
543        api_path = self.available_api_versions.get('v3', self.available_api_versions.get('v2'))
544        url_paths = [self.api_server, api_path, 'collections', namespace, name, 'versions', version, '/']
545
546        n_collection_url = _urljoin(*url_paths)
547        error_context_msg = 'Error when getting collection version metadata for %s.%s:%s from %s (%s)' \
548                            % (namespace, name, version, self.name, self.api_server)
549        data = self._call_galaxy(n_collection_url, error_context_msg=error_context_msg)
550
551        return CollectionVersionMetadata(data['namespace']['name'], data['collection']['name'], data['version'],
552                                         data['download_url'], data['artifact']['sha256'],
553                                         data['metadata']['dependencies'])
554
555    @g_connect(['v2', 'v3'])
556    def get_collection_versions(self, namespace, name):
557        """
558        Gets a list of available versions for a collection on a Galaxy server.
559
560        :param namespace: The collection namespace.
561        :param name: The collection name.
562        :return: A list of versions that are available.
563        """
564        relative_link = False
565        if 'v3' in self.available_api_versions:
566            api_path = self.available_api_versions['v3']
567            results_key = 'data'
568            pagination_path = ['links', 'next']
569            relative_link = True  # AH pagination results are relative an not an absolute URI.
570        else:
571            api_path = self.available_api_versions['v2']
572            results_key = 'results'
573            pagination_path = ['next']
574
575        page_size_name = 'limit' if 'v3' in self.available_api_versions else 'page_size'
576        n_url = _urljoin(self.api_server, api_path, 'collections', namespace, name, 'versions', '/?%s=%d' % (page_size_name, COLLECTION_PAGE_SIZE))
577        n_url_info = urlparse(n_url)
578
579        error_context_msg = 'Error when getting available collection versions for %s.%s from %s (%s)' \
580                            % (namespace, name, self.name, self.api_server)
581        data = self._call_galaxy(n_url, error_context_msg=error_context_msg)
582
583        versions = []
584        while True:
585            versions += [v['version'] for v in data[results_key]]
586
587            next_link = data
588            for path in pagination_path:
589                next_link = next_link.get(path, {})
590
591            if not next_link:
592                break
593            elif relative_link:
594                # TODO: This assumes the pagination result is relative to the root server. Will need to be verified
595                # with someone who knows the AH API.
596
597                # Remove the query string from the versions_url to use the next_link's query
598                n_url = urljoin(n_url, urlparse(n_url).path)
599                next_link = n_url.replace(n_url_info.path, next_link)
600
601            data = self._call_galaxy(to_native(next_link, errors='surrogate_or_strict'),
602                                     error_context_msg=error_context_msg)
603
604        return versions
605