1# -*- coding: utf-8 -*-
2# Copyright (c) 2014 Rackspace
3# Copyright (c) 2015 Ian Cordasco
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#    http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13# implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16from collections import namedtuple
17
18from .compat import to_str
19from .exceptions import InvalidAuthority, ResolutionError
20from .misc import (
21    ABSOLUTE_URI_MATCHER, FRAGMENT_MATCHER, IPv4_MATCHER, PATH_MATCHER,
22    QUERY_MATCHER, SCHEME_MATCHER, SUBAUTHORITY_MATCHER, URI_MATCHER,
23    URI_COMPONENTS, merge_paths
24    )
25from .normalizers import (
26    encode_component, normalize_scheme, normalize_authority, normalize_path,
27    normalize_query, normalize_fragment
28    )
29
30
class URIReference(namedtuple('URIReference', URI_COMPONENTS)):
    """Immutable URI reference split into its five top-level components.

    The tuple fields are ``scheme``, ``authority``, ``path``, ``query`` and
    ``fragment``. Components that are absent (or empty) when the reference
    is constructed are stored as ``None``. In addition to the tuple fields,
    every instance carries an ``encoding`` attribute naming the encoding
    used when parts of the reference have to be turned into bytes.
    """

    # NOTE: this is a plain class attribute called ``slots``, not
    # ``__slots__``. Instances therefore keep a ``__dict__``, which is what
    # allows ``__new__`` to attach the ``encoding`` attribute.
    slots = ()

    # Fallback for instances that were created without going through
    # ``__new__`` (e.g. by namedtuple's ``_make``/``_replace``), which would
    # otherwise have no ``encoding`` attribute at all.
    encoding = 'utf-8'

    def __new__(cls, scheme, authority, path, query, fragment,
                encoding='utf-8'):
        """Create a reference, storing falsy components as ``None``.

        :param scheme: The scheme component, or a falsy value if absent.
        :param authority: The authority component, or a falsy value.
        :param path: The path component, or a falsy value.
        :param query: The query component, or a falsy value.
        :param fragment: The fragment component, or a falsy value.
        :param str encoding: Encoding remembered on the instance.
        """
        ref = super(URIReference, cls).__new__(
            cls,
            scheme or None,
            authority or None,
            path or None,
            query or None,
            fragment or None)
        ref.encoding = encoding
        return ref

    def __eq__(self, other):
        """Compare with another reference, a tuple of components or a string.

        Two references are equal if their components are identical or if
        they are identical after normalization.

        :raises TypeError: If ``other`` cannot be turned into a
            ``URIReference``.
        """
        other_ref = other
        if isinstance(other, tuple):
            # This branch also covers URIReference instances, which are
            # tuples themselves.
            other_ref = URIReference(*other)
        elif not isinstance(other, URIReference):
            try:
                other_ref = URIReference.from_string(other)
            except TypeError:
                raise TypeError(
                    'Unable to compare URIReference() to {0}()'.format(
                        type(other).__name__))

        # See http://tools.ietf.org/html/rfc3986#section-6.2
        naive_equality = tuple(self) == tuple(other_ref)
        return naive_equality or self.normalized_equality(other_ref)

    def __ne__(self, other):
        """Return the negation of ``__eq__``.

        Defined explicitly because on Python 2 ``!=`` would otherwise fall
        back to the plain tuple comparison and could disagree with ``==``.
        """
        return not self == other

    @classmethod
    def from_string(cls, uri_string, encoding='utf-8'):
        """Parse a URI reference from the given unicode URI string.

        :param str uri_string: Unicode URI to be parsed into a reference.
        :param str encoding: The encoding of the string provided
        :returns: :class:`URIReference` or subclass thereof
        """
        uri_string = to_str(uri_string, encoding)

        split_uri = URI_MATCHER.match(uri_string).groupdict()
        return cls(split_uri['scheme'], split_uri['authority'],
                   encode_component(split_uri['path'], encoding),
                   encode_component(split_uri['query'], encoding),
                   encode_component(split_uri['fragment'], encoding), encoding)

    def authority_info(self):
        """Returns a dictionary with the ``userinfo``, ``host``, and ``port``.

        If the authority is not valid, it will raise a ``InvalidAuthority``
        Exception.

        :returns:
            ``{'userinfo': 'username:password', 'host': 'www.example.com',
            'port': '80'}``
        :rtype: dict
        :raises InvalidAuthority: If the authority is not ``None`` and can not
            be parsed.
        """
        if not self.authority:
            return {'userinfo': None, 'host': None, 'port': None}

        match = SUBAUTHORITY_MATCHER.match(self.authority)

        if match is None:
            # In this case, we have an authority that was parsed from the URI
            # Reference, but it cannot be further parsed by our
            # SUBAUTHORITY_MATCHER. In this case it must not be a valid
            # authority.
            raise InvalidAuthority(self.authority.encode(self.encoding))

        # We had a match, now let's ensure that it is actually a valid host
        # address if it is IPv4
        matches = match.groupdict()
        host = matches.get('host')

        if (host and IPv4_MATCHER.match(host) and not
                valid_ipv4_host_address(host)):
            # If we have a host, it appears to be IPv4 and it does not have
            # valid bytes, it is an InvalidAuthority.
            raise InvalidAuthority(self.authority.encode(self.encoding))

        return matches

    def _authority_component(self, name):
        """Return one entry of ``authority_info()``, or ``None`` if invalid."""
        try:
            return self.authority_info()[name]
        except InvalidAuthority:
            return None

    @property
    def host(self):
        """If present, a string representing the host."""
        return self._authority_component('host')

    @property
    def port(self):
        """If present, the port (as a string) extracted from the authority."""
        return self._authority_component('port')

    @property
    def userinfo(self):
        """If present, the userinfo extracted from the authority."""
        return self._authority_component('userinfo')

    def is_absolute(self):
        """Determine if this URI Reference is an absolute URI.

        See http://tools.ietf.org/html/rfc3986#section-4.3 for explanation.

        :returns: ``True`` if it is an absolute URI, ``False`` otherwise.
        :rtype: bool
        """
        return bool(ABSOLUTE_URI_MATCHER.match(self.unsplit()))

    def is_valid(self, **kwargs):
        """Determines if the URI is valid.

        :param bool require_scheme: Set to ``True`` if you wish to require the
            presence of the scheme component.
        :param bool require_authority: Set to ``True`` if you wish to require
            the presence of the authority component.
        :param bool require_path: Set to ``True`` if you wish to require the
            presence of the path component.
        :param bool require_query: Set to ``True`` if you wish to require the
            presence of the query component.
        :param bool require_fragment: Set to ``True`` if you wish to require
            the presence of the fragment component.
        :returns: ``True`` if the URI is valid. ``False`` otherwise.
        :rtype: bool
        """
        validators = [
            (self.scheme_is_valid, kwargs.get('require_scheme', False)),
            (self.authority_is_valid, kwargs.get('require_authority', False)),
            (self.path_is_valid, kwargs.get('require_path', False)),
            (self.query_is_valid, kwargs.get('require_query', False)),
            (self.fragment_is_valid, kwargs.get('require_fragment', False)),
            ]
        return all(v(r) for v, r in validators)

    def _is_valid(self, value, matcher, require):
        """Check a single component against its matcher.

        :param value: The component to check; ``None`` means it is absent.
        :param matcher: Compiled pattern the component has to match.
        :param require: Whether an absent component counts as invalid.
        :returns: ``True`` if the component is acceptable.
        :rtype: bool
        """
        if value is None:
            # An absent component is only a problem when it is required.
            return not require
        # Coerce the match object (or None) so callers get a real bool.
        return bool(matcher.match(value))

    def authority_is_valid(self, require=False):
        """Determines if the authority component is valid.

        :param str require: Set to ``True`` to require the presence of this
            component.
        :returns: ``True`` if the authority is valid. ``False`` otherwise.
        :rtype: bool
        """
        try:
            # This raises for an authority that cannot be parsed as well as
            # for an IPv4-looking host whose bytes are out of range, so no
            # separate IPv4 check is needed afterwards.
            self.authority_info()
        except InvalidAuthority:
            return False

        return self._is_valid(self.authority, SUBAUTHORITY_MATCHER, require)

    def scheme_is_valid(self, require=False):
        """Determines if the scheme component is valid.

        :param str require: Set to ``True`` to require the presence of this
            component.
        :returns: ``True`` if the scheme is valid. ``False`` otherwise.
        :rtype: bool
        """
        return self._is_valid(self.scheme, SCHEME_MATCHER, require)

    def path_is_valid(self, require=False):
        """Determines if the path component is valid.

        :param str require: Set to ``True`` to require the presence of this
            component.
        :returns: ``True`` if the path is valid. ``False`` otherwise.
        :rtype: bool
        """
        return self._is_valid(self.path, PATH_MATCHER, require)

    def query_is_valid(self, require=False):
        """Determines if the query component is valid.

        :param str require: Set to ``True`` to require the presence of this
            component.
        :returns: ``True`` if the query is valid. ``False`` otherwise.
        :rtype: bool
        """
        return self._is_valid(self.query, QUERY_MATCHER, require)

    def fragment_is_valid(self, require=False):
        """Determines if the fragment component is valid.

        :param str require: Set to ``True`` to require the presence of this
            component.
        :returns: ``True`` if the fragment is valid. ``False`` otherwise.
        :rtype: bool
        """
        return self._is_valid(self.fragment, FRAGMENT_MATCHER, require)

    def normalize(self):
        """Normalize this reference as described in Section 6.2.2

        This is not an in-place normalization. Instead this creates a new
        URIReference that keeps this reference's ``encoding``.

        :returns: A new reference object with normalized components.
        :rtype: URIReference
        """
        # See http://tools.ietf.org/html/rfc3986#section-6.2.2 for logic in
        # this method.
        return URIReference(normalize_scheme(self.scheme or ''),
                            normalize_authority(
                                (self.userinfo, self.host, self.port)),
                            normalize_path(self.path or ''),
                            normalize_query(self.query or ''),
                            normalize_fragment(self.fragment or ''),
                            self.encoding)

    def normalized_equality(self, other_ref):
        """Compare this URIReference to another URIReference.

        :param URIReference other_ref: (required), The reference with which
            we're comparing.
        :returns: ``True`` if the references are equal, ``False`` otherwise.
        :rtype: bool
        """
        return tuple(self.normalize()) == tuple(other_ref.normalize())

    def resolve_with(self, base_uri, strict=False):
        """Use an absolute URI Reference to resolve this relative reference.

        Assuming this is a relative reference that you would like to resolve,
        use the provided base URI to resolve it.

        See http://tools.ietf.org/html/rfc3986#section-5 for more information.

        :param base_uri: Either a string or URIReference. It must be an
            absolute URI or it will raise an exception.
        :param bool strict: When ``False`` (the default), a reference whose
            scheme equals the scheme of ``base_uri`` is resolved as if it
            had no scheme at all. When ``True`` the scheme is always kept.
        :returns: A new URIReference which is the result of resolving this
            reference using ``base_uri``.
        :rtype: :class:`URIReference`
        :raises ResolutionError: If the ``base_uri`` is not an absolute URI.
        """
        if not isinstance(base_uri, URIReference):
            base_uri = URIReference.from_string(base_uri)

        if not base_uri.is_absolute():
            raise ResolutionError(base_uri)

        # This is optional per
        # http://tools.ietf.org/html/rfc3986#section-5.2.1
        base_uri = base_uri.normalize()

        # The reference we're resolving
        resolving = self

        if not strict and resolving.scheme == base_uri.scheme:
            # ``copy_with`` treats ``None`` as "keep the current value", so
            # it cannot be used to drop the scheme; replace it directly.
            resolving = resolving._copy_replacing(scheme=None)

        # http://tools.ietf.org/html/rfc3986#page-32
        if resolving.scheme is not None:
            target = resolving.copy_with(path=normalize_path(resolving.path))
        elif resolving.authority is not None:
            target = resolving.copy_with(
                scheme=base_uri.scheme,
                path=normalize_path(resolving.path)
            )
        elif resolving.path is None:
            # Without a path of its own the reference inherits the base
            # path, and the base query too unless it brings its own.
            if resolving.query is not None:
                query = resolving.query
            else:
                query = base_uri.query
            target = resolving.copy_with(
                scheme=base_uri.scheme,
                authority=base_uri.authority,
                path=base_uri.path,
                query=query
            )
        else:
            if resolving.path.startswith('/'):
                path = normalize_path(resolving.path)
            else:
                path = normalize_path(
                    merge_paths(base_uri, resolving.path)
                )
            target = resolving.copy_with(
                scheme=base_uri.scheme,
                authority=base_uri.authority,
                path=path,
                query=resolving.query
            )
        return target

    def unsplit(self):
        """Create a URI string from the components.

        :returns: The URI Reference reconstituted as a string.
        :rtype: str
        """
        # See http://tools.ietf.org/html/rfc3986#section-5.3
        result_list = []
        if self.scheme:
            result_list.extend([self.scheme, ':'])
        if self.authority:
            result_list.extend(['//', self.authority])
        if self.path:
            result_list.append(self.path)
        if self.query:
            result_list.extend(['?', self.query])
        if self.fragment:
            result_list.extend(['#', self.fragment])
        return ''.join(result_list)

    def _copy_replacing(self, **changes):
        """Return a copy with the given fields replaced verbatim.

        Unlike ``copy_with`` a value of ``None`` is applied as-is, which
        makes it possible to clear a component. ``_replace`` does not go
        through ``__new__``, so the encoding is carried over by hand.
        """
        replaced = self._replace(**changes)
        replaced.encoding = self.encoding
        return replaced

    def copy_with(self, scheme=None, authority=None, path=None, query=None,
                  fragment=None):
        """Create a copy of this reference with some components replaced.

        Every argument that is left as ``None`` keeps the value of this
        reference; as a consequence this method cannot be used to reset a
        component to ``None``. The copy keeps this reference's ``encoding``.

        :param scheme: New scheme, or ``None`` to keep the current one.
        :param authority: New authority, or ``None`` to keep the current one.
        :param path: New path, or ``None`` to keep the current one.
        :param query: New query, or ``None`` to keep the current one.
        :param fragment: New fragment, or ``None`` to keep the current one.
        :returns: The new reference.
        :rtype: URIReference
        """
        candidates = (
            ('scheme', scheme),
            ('authority', authority),
            ('path', path),
            ('query', query),
            ('fragment', fragment),
        )
        changes = dict(
            (name, value) for name, value in candidates if value is not None
        )
        return self._copy_replacing(**changes)
380
381
def valid_ipv4_host_address(host):
    """Check that every dot-separated part of ``host`` fits in one byte.

    :param str host: Host string made of decimal numbers separated by dots.
    :returns: ``True`` if each part lies within 0..255, ``False`` otherwise.
    :rtype: bool
    :raises ValueError: If a part cannot be parsed as a base-10 integer.
    """
    # Convert every part up front so that a malformed part raises regardless
    # of whether an earlier part was already out of range.
    octets = [int(part, base=10) for part in host.split('.')]
    for octet in octets:
        if octet < 0 or octet > 255:
            return False
    return True
386