1"""Code for finding content."""
2from __future__ import (absolute_import, division, print_function)
3__metaclass__ = type
4
5import abc
6import collections
7import os
8
9from ... import types as t
10
11from ...util import (
12    ANSIBLE_SOURCE_ROOT,
13)
14
15from .. import (
16    PathProvider,
17)
18
19
class Layout:
    """A content root together with its known paths, indexed as trees for directory based lookups."""
    def __init__(self,
                 root,  # type: str
                 paths,  # type: t.List[str]
                 ):  # type: (...) -> None
        self.root = root

        # entries ending with os.path.sep are symlinked directories, everything else is treated as a file
        file_paths = [entry for entry in paths if not entry.endswith(os.path.sep)]

        self.__paths = paths  # file paths as well as symlinked directory paths
        self.__files = file_paths  # file paths only
        self.__paths_tree = paths_to_tree(paths)
        self.__files_tree = paths_to_tree(file_paths)

    @staticmethod
    def __find(tree, directory):  # type: (t.Tuple[t.Dict[str, t.Any], t.List[str]], str) -> t.Optional[t.Tuple[t.Dict[str, t.Any], t.List[str]]]
        """Return the item of the given tree for the given directory, or None if the tree has no such directory."""
        # trailing separators are dropped so 'a/b' and 'a/b/' resolve to the same item
        return get_tree_item(tree, directory.rstrip(os.path.sep).split(os.path.sep))

    def all_files(self, include_symlinked_directories=False):  # type: (bool) -> t.List[str]
        """Return every known file path, optionally together with the symlinked directory paths."""
        return self.__paths if include_symlinked_directories else self.__files

    def walk_files(self, directory, include_symlinked_directories=False):  # type: (str, bool) -> t.List[str]
        """Return the file paths found at any depth under the given directory, or an empty list if it is unknown."""
        tree = self.__paths_tree if include_symlinked_directories else self.__files_tree
        start = self.__find(tree, directory)

        if not start:
            return []

        # copy the list so the stored tree is not modified while results are collected
        found = list(start[1])
        pending = collections.deque(start[0].values())

        while pending:
            children, entries = pending.pop()
            pending.extend(children.values())
            found.extend(entries)

        return found

    def get_dirs(self, directory):  # type: (str) -> t.List[str]
        """Return a list of directory paths found directly under the given directory."""
        item = self.__find(self.__files_tree, directory)

        if not item:
            return []

        return [os.path.join(directory, name) for name in item[0]]

    def get_files(self, directory):  # type: (str) -> t.List[str]
        """Return the file paths found directly under the given directory, without descending into subdirectories."""
        item = self.__find(self.__files_tree, directory)

        if not item:
            return []

        return item[1]
75
76
class ContentLayout(Layout):
    """Layout of the Ansible content under test, including its plugin, test and results locations."""
    def __init__(self,
                 root,  # type: str
                 paths,  # type: t.List[str]
                 plugin_paths,  # type: t.Dict[str, str]
                 collection,  # type: t.Optional[CollectionDetail]
                 test_path,  # type: str
                 results_path,  # type: str
                 sanity_path,  # type: str
                 sanity_messages,  # type: t.Optional[LayoutMessages]
                 integration_path,  # type: str
                 integration_targets_path,  # type: str
                 integration_vars_path,  # type: str
                 integration_messages,  # type: t.Optional[LayoutMessages]
                 unit_path,  # type: str
                 unit_module_path,  # type: str
                 unit_module_utils_path,  # type: str
                 unit_messages,  # type: t.Optional[LayoutMessages]
                 ):  # type: (...) -> None
        super(ContentLayout, self).__init__(root, paths)

        # plugin locations keyed by plugin type, and the collection (if any) the content belongs to
        self.plugin_paths = plugin_paths
        self.collection = collection

        # general test locations
        self.test_path = test_path
        self.results_path = results_path

        # sanity tests
        self.sanity_path = sanity_path
        self.sanity_messages = sanity_messages

        # integration tests
        self.integration_path = integration_path
        self.integration_targets_path = integration_targets_path
        self.integration_vars_path = integration_vars_path
        self.integration_messages = integration_messages

        # unit tests
        self.unit_path = unit_path
        self.unit_module_path = unit_module_path
        self.unit_module_utils_path = unit_module_utils_path
        self.unit_messages = unit_messages

        # True when the content root is the Ansible source tree itself
        self.is_ansible = (root == ANSIBLE_SOURCE_ROOT)

    @property
    def prefix(self):  # type: () -> str
        """Return the prefix of the collection, or an empty string when there is no collection."""
        return self.collection.prefix if self.collection else ''

    @property
    def module_path(self):  # type: () -> t.Optional[str]
        """Return the 'modules' entry of the plugin paths, or None if there is no such entry."""
        return self.plugin_paths.get('modules')

    @property
    def module_utils_path(self):  # type: () -> t.Optional[str]
        """Return the 'module_utils' entry of the plugin paths, or None if there is no such entry."""
        return self.plugin_paths.get('module_utils')

    @property
    def module_utils_powershell_path(self):  # type: () -> t.Optional[str]
        """Return the path where powershell module_utils are found, if any."""
        if not self.is_ansible:
            return self.plugin_paths.get('module_utils')

        # for the Ansible source tree the 'powershell' subdirectory of module_utils is used
        return os.path.join(self.plugin_paths['module_utils'], 'powershell')

    @property
    def module_utils_csharp_path(self):  # type: () -> t.Optional[str]
        """Return the path where csharp module_utils are found, if any."""
        if not self.is_ansible:
            return self.plugin_paths.get('module_utils')

        # for the Ansible source tree the 'csharp' subdirectory of module_utils is used
        return os.path.join(self.plugin_paths['module_utils'], 'csharp')
149
150
class LayoutMessages:
    """Container for messages produced while creating a layout, kept so they can be displayed later."""
    def __init__(self):  # type: () -> None
        # one independent list per severity, all initially empty
        self.info = []  # type: t.List[str]
        self.warning = []  # type: t.List[str]
        self.error = []  # type: t.List[str]
157
158
class CollectionDetail:
    """Name, namespace and root of the current collection, plus the values derived from them."""
    def __init__(self,
                 name,  # type: str
                 namespace,  # type: str
                 root,  # type: str
                 ):  # type: (...) -> None
        # fully qualified name in the form "namespace.name"
        qualified = '%s.%s' % (namespace, name)

        self.name = name
        self.namespace = namespace
        self.root = root
        self.full_name = qualified
        self.prefix = qualified + '.'  # the fully qualified name followed by a dot
        self.directory = os.path.join('ansible_collections', namespace, name)
172
173
class LayoutProvider(PathProvider):
    """Base class for path providers which create a ContentLayout."""
    PLUGIN_TYPES = (
        'action', 'become', 'cache', 'callback', 'cliconf', 'connection', 'doc_fragments',
        'filter', 'httpapi', 'inventory', 'lookup', 'module_utils', 'modules', 'netconf',
        'shell', 'strategy', 'terminal', 'test', 'vars',
    ) + (
        # The following are plugin directories not directly supported by ansible-core, but used in collections
        # (https://github.com/ansible-collections/overview/blob/main/collection_requirements.rst#modules--plugins)
        'plugin_utils', 'sub_plugins',
    )

    @abc.abstractmethod
    def create(self, root, paths):  # type: (str, t.List[str]) -> ContentLayout
        """Create and return a layout for the given root and the paths found under it."""
205
206
def paths_to_tree(paths):  # type: (t.List[str]) -> t.Tuple[t.Dict[str, t.Any], t.List[str]]
    """
    Build a filesystem tree from the given list of paths.

    Every tree item is a tuple of (directories, paths): a dict mapping a directory name to its own tree item,
    and a list of the paths located directly in that item.
    """
    tree = {}, []

    for path in paths:
        node = tree

        # every part except the last names a directory, which is created the first time it is seen
        for name in path.split(os.path.sep)[:-1]:
            node = node[0].setdefault(name, ({}, []))

        # the complete path, not only its last part, is stored in the containing directory
        node[1].append(path)

    return tree
224
225
def get_tree_item(tree, parts):  # type: (t.Tuple[t.Dict[str, t.Any], t.List[str]], t.List[str]) -> t.Optional[t.Tuple[t.Dict[str, t.Any], t.List[str]]]
    """Descend through the tree one part at a time and return the item reached, or None if any part is not found."""
    node = tree

    for name in parts:
        children = node[0]  # the first element of an item maps directory names to child items
        node = children.get(name)

        if not node:
            return None

    return node
237