1"""Manifest structure used to store paths that should be included in a test run.
2
3The manifest is represented by a tree of IncludeManifest objects, the root
4representing the file and each subnode representing a subdirectory that should
5be included or excluded.
6"""
7import glob
8import os
9from urllib.parse import urlparse, urlsplit
10
11from .wptmanifest.node import DataNode
12from .wptmanifest.backends import conditional
13from .wptmanifest.backends.conditional import ManifestItem
14
15
class IncludeManifest(ManifestItem):
    """Tree node recording whether tests under a URL path component run.

    The root node stands for the manifest as a whole; every other node is
    named after one URL path component (or a ``?query#fragment`` variant
    suffix) and may carry a ``skip`` value of ``"true"`` or ``"false"``.
    """

    def __init__(self, node):
        """Node in a tree structure representing the paths
        that should be included or excluded from the test run.

        :param node: AST Node corresponding to this Node.
        """
        ManifestItem.__init__(self, node)
        # Child nodes keyed by their name, for direct lookup by path component.
        self.child_map = {}

    @classmethod
    def create(cls):
        """Create an empty IncludeManifest tree.

        :returns: A new instance wrapping a ``DataNode`` whose data is None.
        """
        node = DataNode(None)
        return cls(node)

    def set_defaults(self):
        """Set ``skip`` to ``"False"`` on this node unless a value exists."""
        if not self.has_key("skip"):
            self.set("skip", "False")

    def append(self, child):
        """Attach ``child`` to this node and index it by name.

        :param child: The node to add; its ``name`` becomes the lookup key.
        """
        ManifestItem.append(self, child)
        self.child_map[child.name] = child
        # A mismatch means two children shared a name and one index entry
        # was overwritten.
        assert len(self.child_map) == len(self.children)

    def include(self, test):
        """Return a boolean indicating whether a particular test should be
        included in a test run, based on the IncludeManifest tree rooted on
        this object.

        :param test: The test object; its ``url`` and ``item_type``
                     attributes are read.
        :returns: True if the test should run, False if it is skipped.
        """
        path_components = self._get_components(test.url)
        return self._include(test, path_components)

    def _include(self, test, path_components):
        """Resolve the include decision for ``test`` starting at this node.

        Descends through children matching successive path components; once
        no child matches, the nearest node (this one, then its ancestors)
        that provides a ``skip`` value decides the outcome.

        :param test: The test object; ``item_type`` is passed to the lookup
                     as ``test_type``.
        :param path_components: Remaining components as produced by
                                ``_get_components``; consumed with ``pop()``.
        :returns: True if the test should be included.
        """
        if path_components:
            next_path_part = path_components.pop()
            if next_path_part in self.child_map:
                return self.child_map[next_path_part]._include(test, path_components)

        node = self
        while node is not None:
            try:
                # Query the node currently being visited (not always
                # ``self``) so that an ancestor's skip value can apply.
                skip_value = node.get("skip", {"test_type": test.item_type}).lower()
            except KeyError:
                node = node.parent
                continue
            assert skip_value in ("true", "false")
            return skip_value != "true"

        # Include by default
        return True

    def _get_components(self, url):
        """Split ``url`` into tree lookup keys, ordered for ``pop()``.

        The returned list is reversed: the last element is the first path
        component, so repeated ``pop()`` calls walk from the root downwards.
        A ``?query`` and/or ``#fragment`` suffix, when present, is stored as
        a single element at index 0 and is therefore popped last.

        :param url: URL or URL path to split.
        :returns: List of non-empty components in reverse order.
        """
        rv = []
        url_parts = urlsplit(url)
        variant = ""
        if url_parts.query:
            variant += "?" + url_parts.query
        if url_parts.fragment:
            variant += "#" + url_parts.fragment
        if variant:
            rv.append(variant)
        rv.extend([item for item in reversed(url_parts.path.split("/")) if item])
        return rv

    def _add_rule(self, test_manifests, url, direction):
        """Record an include or exclude rule for ``url``.

        ``url`` is first tried as a filesystem glob relative to the current
        working directory (after removing any ``?query`` / ``#fragment``
        from its last segment). If the glob matches, the rule is applied to
        the URL of every test found at those paths, filtered by the query
        and fragment when given. Otherwise ``url`` itself is used.

        :param test_manifests: Mapping of manifest object to a dict with a
                               ``"tests_path"`` entry; each manifest must
                               offer ``iterpath`` and ``iterdir``.
        :param url: URL prefix, or path/glob, that the rule applies to.
        :param direction: Either ``"include"`` or ``"exclude"``.
        """
        maybe_path = os.path.join(os.path.abspath(os.curdir), url)
        rest, last = os.path.split(maybe_path)
        fragment = query = None
        if "#" in last:
            last, fragment = last.rsplit("#", 1)
        if "?" in last:
            last, query = last.rsplit("?", 1)

        maybe_path = os.path.join(rest, last)
        paths = glob.glob(maybe_path)

        if paths:
            urls = []
            for path in paths:
                for manifest, data in test_manifests.items():
                    found = False
                    rel_path = os.path.relpath(path, data["tests_path"])
                    iterator = manifest.iterpath if os.path.isfile(path) else manifest.iterdir
                    for test in iterator(rel_path):
                        if not hasattr(test, "url"):
                            continue
                        test_url = test.url
                        if query or fragment:
                            parsed = urlparse(test_url)
                            if ((query and query != parsed.query) or
                                (fragment and fragment != parsed.fragment)):
                                continue
                        urls.append(test_url)
                        found = True
                    # Stop at the first manifest that yields a matching test
                    # for this path.
                    if found:
                        break
        else:
            urls = [url]

        assert direction in ("include", "exclude")

        for rule_url in urls:
            components = self._get_components(rule_url)

            node = self
            while components:
                component = components.pop()
                if component not in node.child_map:
                    new_node = IncludeManifest(DataNode(component))
                    node.append(new_node)
                    # A freshly created node starts with the skip value
                    # visible from its parent.
                    new_node.set("skip", node.get("skip", {}))

                node = node.child_map[component]

            skip = direction != "include"
            node.set("skip", str(skip))

    def add_include(self, test_manifests, url_prefix):
        """Add a rule indicating that tests under a url path
        should be included in test runs

        :param test_manifests: Mapping of manifest object to a dict with a
                               ``"tests_path"`` entry, used to turn
                               filesystem paths into test URLs.
        :param url_prefix: The url prefix to include
        """
        return self._add_rule(test_manifests, url_prefix, "include")

    def add_exclude(self, test_manifests, url_prefix):
        """Add a rule indicating that tests under a url path
        should be excluded from test runs

        :param test_manifests: Mapping of manifest object to a dict with a
                               ``"tests_path"`` entry, used to turn
                               filesystem paths into test URLs.
        :param url_prefix: The url prefix to exclude
        """
        return self._add_rule(test_manifests, url_prefix, "exclude")
151
def get_manifest(manifest_path):
    """Compile the manifest file at ``manifest_path``.

    The file is opened in binary mode and handed to ``conditional.compile``
    with a data-class getter that always answers ``IncludeManifest``.

    :param manifest_path: Filesystem path of the manifest to read.
    :returns: Whatever ``conditional.compile`` produces for the file.
    """
    def pick_data_cls(x, y):
        # Both arguments are ignored: the same class is used every time.
        return IncludeManifest

    with open(manifest_path, "rb") as manifest_file:
        compiled = conditional.compile(manifest_file, data_cls_getter=pick_data_cls)
    return compiled
155