1"""Manifest structure used to store paths that should be included in a test run.
2
3The manifest is represented by a tree of IncludeManifest objects, the root
4representing the file and each subnode representing a subdirectory that should
5be included or excluded.
6"""
7import glob
8import os
9from six import iteritems
10from six.moves.urllib.parse import urlparse, urlsplit
11
12from .wptmanifest.node import DataNode
13from .wptmanifest.backends import conditional
14from .wptmanifest.backends.conditional import ManifestItem
15
16
17class IncludeManifest(ManifestItem):
18    def __init__(self, node):
19        """Node in a tree structure representing the paths
20        that should be included or excluded from the test run.
21
22        :param node: AST Node corresponding to this Node.
23        """
24        ManifestItem.__init__(self, node)
25        self.child_map = {}
26
27    @classmethod
28    def create(cls):
29        """Create an empty IncludeManifest tree"""
30        node = DataNode(None)
31        return cls(node)
32
33    def set_defaults(self):
34        if not self.has_key("skip"):
35            self.set("skip", "False")
36
37    def append(self, child):
38        ManifestItem.append(self, child)
39        self.child_map[child.name] = child
40        assert len(self.child_map) == len(self.children)
41
42    def include(self, test):
43        """Return a boolean indicating whether a particular test should be
44        included in a test run, based on the IncludeManifest tree rooted on
45        this object.
46
47        :param test: The test object"""
48        path_components = self._get_components(test.url)
49        return self._include(test, path_components)
50
51    def _include(self, test, path_components):
52        if path_components:
53            next_path_part = path_components.pop()
54            if next_path_part in self.child_map:
55                return self.child_map[next_path_part]._include(test, path_components)
56
57        node = self
58        while node:
59            try:
60                skip_value = self.get("skip", {"test_type": test.item_type}).lower()
61                assert skip_value in ("true", "false")
62                return skip_value != "true"
63            except KeyError:
64                if node.parent is not None:
65                    node = node.parent
66                else:
67                    # Include by default
68                    return True
69
70    def _get_components(self, url):
71        rv = []
72        url_parts = urlsplit(url)
73        variant = ""
74        if url_parts.query:
75            variant += "?" + url_parts.query
76        if url_parts.fragment:
77            variant += "#" + url_parts.fragment
78        if variant:
79            rv.append(variant)
80        rv.extend([item for item in reversed(url_parts.path.split("/")) if item])
81        return rv
82
83    def _add_rule(self, test_manifests, url, direction):
84        maybe_path = os.path.join(os.path.abspath(os.curdir), url)
85        rest, last = os.path.split(maybe_path)
86        fragment = query = None
87        if "#" in last:
88            last, fragment = last.rsplit("#", 1)
89        if "?" in last:
90            last, query = last.rsplit("?", 1)
91
92        maybe_path = os.path.join(rest, last)
93        paths = glob.glob(maybe_path)
94
95        if paths:
96            urls = []
97            for path in paths:
98                for manifest, data in iteritems(test_manifests):
99                    found = False
100                    rel_path = os.path.relpath(path, data["tests_path"])
101                    iterator = manifest.iterpath if os.path.isfile(path) else manifest.iterdir
102                    for test in iterator(rel_path):
103                        if not hasattr(test, "url"):
104                            continue
105                        url = test.url
106                        if query or fragment:
107                            parsed = urlparse(url)
108                            if ((query and query != parsed.query) or
109                                (fragment and fragment != parsed.fragment)):
110                                continue
111                        urls.append(url)
112                        found = True
113                    if found:
114                        break
115        else:
116            urls = [url]
117
118        assert direction in ("include", "exclude")
119
120        for url in urls:
121            components = self._get_components(url)
122
123            node = self
124            while components:
125                component = components.pop()
126                if component not in node.child_map:
127                    new_node = IncludeManifest(DataNode(component))
128                    node.append(new_node)
129                    new_node.set("skip", node.get("skip", {}))
130
131                node = node.child_map[component]
132
133            skip = False if direction == "include" else True
134            node.set("skip", str(skip))
135
136    def add_include(self, test_manifests, url_prefix):
137        """Add a rule indicating that tests under a url path
138        should be included in test runs
139
140        :param url_prefix: The url prefix to include
141        """
142        return self._add_rule(test_manifests, url_prefix, "include")
143
144    def add_exclude(self, test_manifests, url_prefix):
145        """Add a rule indicating that tests under a url path
146        should be excluded from test runs
147
148        :param url_prefix: The url prefix to exclude
149        """
150        return self._add_rule(test_manifests, url_prefix, "exclude")
151
152
153def get_manifest(manifest_path):
154    with open(manifest_path, "rb") as f:
155        return conditional.compile(f, data_cls_getter=lambda x, y: IncludeManifest)
156