1# (c) 2019 Ansible Project
2# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
3
4from __future__ import (absolute_import, division, print_function)
5__metaclass__ = type
6
7import os
8
9from collections import defaultdict
10
11from ansible.errors import AnsibleError
12from ansible.collections import is_collection_path
13from ansible.module_utils._text import to_bytes
14from ansible.utils.collection_loader import AnsibleCollectionConfig
15from ansible.utils.display import Display
16
17display = Display()
18
19
def list_valid_collection_paths(search_paths=None, warn=False):
    """
    Filter out non existing or invalid search_paths for collections
    :param search_paths: iterable of text-string paths to check first; the paths from
        AnsibleCollectionConfig.collection_paths are always checked after them.
        The passed object is never modified.
    :param warn: display a warning for every path that does not exist or is not a directory
    :return: generator yielding, in order, the paths that exist and are directories
    """

    # Work on a private copy: extending the caller's list in place would leak the
    # configured paths into it and append them again on every later call that
    # reuses the same list object.
    candidate_paths = [] if search_paths is None else list(search_paths)
    candidate_paths.extend(AnsibleCollectionConfig.collection_paths)

    for path in candidate_paths:

        b_path = to_bytes(path)
        if not os.path.exists(b_path):
            # missing paths are silently skipped unless the caller asked for warnings
            if warn:
                display.warning("The configured collection path {0} does not exist.".format(path))
            continue

        if not os.path.isdir(b_path):
            # same policy for paths that exist but are not directories
            if warn:
                display.warning("The configured collection path {0}, exists, but it is not a directory.".format(path))
            continue

        yield path
48
49
def list_collection_dirs(search_paths=None, coll_filter=None):
    """
    Yield the directory of every collection found under the valid search paths
    :param search_paths: list of text-string paths, handed as is to list_valid_collection_paths
    :param coll_filter: 'namespace' or 'namespace.collection' to narrow the results, if None nothing is filtered
    :return: generator of collection directory paths (built from to_bytes() results)
    :raises AnsibleError: when coll_filter contains more than one '.'
    """

    wanted_ns = None
    wanted_coll = None
    if coll_filter is not None:
        if '.' not in coll_filter:
            wanted_ns = coll_filter
        else:
            try:
                wanted_ns, wanted_coll = coll_filter.split('.')
            except ValueError:
                # more than two dot separated parts cannot be unpacked
                raise AnsibleError("Invalid collection pattern supplied: %s" % coll_filter)

    # namespace -> {collection name -> directory} for everything already yielded
    seen = defaultdict(dict)
    for root in list_valid_collection_paths(search_paths):

        if not os.path.isdir(to_bytes(root)):
            continue

        b_coll_root = to_bytes(os.path.join(root, 'ansible_collections'))
        if not (os.path.exists(b_coll_root) and os.path.isdir(b_coll_root)):
            continue

        # without a namespace filter every entry of the collections root is a candidate
        ns_names = os.listdir(b_coll_root) if wanted_ns is None else [wanted_ns]

        for ns_name in ns_names:
            b_ns_dir = os.path.join(b_coll_root, to_bytes(ns_name))
            if not os.path.isdir(b_ns_dir):
                continue

            coll_names = os.listdir(b_ns_dir) if wanted_coll is None else [wanted_coll]

            for coll_name in coll_names:

                # a collection found in an earlier path masks same named ones found later
                if coll_name in seen[ns_name]:
                    continue

                b_coll_dir = os.path.join(b_ns_dir, to_bytes(coll_name))
                if is_collection_path(b_coll_dir):
                    seen[ns_name][coll_name] = b_coll_dir
                    yield b_coll_dir
102