# -*- coding: utf-8 -*-
# Copyright (c) 2014 Rackspace
# Copyright (c) 2015 Ian Cordasco
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import namedtuple

from .compat import to_str
from .exceptions import InvalidAuthority, ResolutionError
from .misc import (
    ABSOLUTE_URI_MATCHER, FRAGMENT_MATCHER, IPv4_MATCHER, PATH_MATCHER,
    QUERY_MATCHER, SCHEME_MATCHER, SUBAUTHORITY_MATCHER, URI_MATCHER,
    URI_COMPONENTS, merge_paths
    )
from .normalizers import (
    encode_component, normalize_scheme, normalize_authority, normalize_path,
    normalize_query, normalize_fragment
    )

# Marker for "argument not supplied" in ``URIReference.copy_with``. ``None``
# cannot play that role because ``None`` is itself a meaningful component
# value (it means "component absent").
_USE_EXISTING = object()


class URIReference(namedtuple('URIReference', URI_COMPONENTS)):
    """Immutable five-component URI reference.

    The tuple fields come from ``URI_COMPONENTS``; the constructor fills them
    positionally with scheme, authority, path, query and fragment. The
    text encoding is stored as a regular instance attribute named
    ``encoding``.
    """

    # NOTE(review): this is a plain class attribute, not ``__slots__``.
    # Instances therefore keep a ``__dict__``, which ``__new__`` relies on to
    # store ``encoding``.
    slots = ()

    def __new__(cls, scheme, authority, path, query, fragment,
                encoding='utf-8'):
        """Create a reference, storing falsy components as ``None``.

        :param str encoding: Encoding remembered on the instance and used
            when reporting an invalid authority.
        """
        ref = super(URIReference, cls).__new__(
            cls,
            scheme or None,
            authority or None,
            path or None,
            query or None,
            fragment or None)
        ref.encoding = encoding
        return ref

    def __eq__(self, other):
        """Compare against another reference, a tuple, or a URI string.

        Two references are equal when their components match exactly or
        when they match after normalization.

        :raises TypeError: If ``other`` cannot be turned into a
            :class:`URIReference`.
        """
        other_ref = other
        if isinstance(other, tuple):
            other_ref = URIReference(*other)
        elif not isinstance(other, URIReference):
            try:
                other_ref = URIReference.from_string(other)
            except TypeError:
                raise TypeError(
                    'Unable to compare URIReference() to {0}()'.format(
                        type(other).__name__))

        # See http://tools.ietf.org/html/rfc3986#section-6.2
        naive_equality = tuple(self) == tuple(other_ref)
        return naive_equality or self.normalized_equality(other_ref)

    @classmethod
    def from_string(cls, uri_string, encoding='utf-8'):
        """Parse a URI reference from the given unicode URI string.

        :param str uri_string: Unicode URI to be parsed into a reference.
        :param str encoding: The encoding of the string provided
        :returns: :class:`URIReference` or subclass thereof
        """
        uri_string = to_str(uri_string, encoding)

        split_uri = URI_MATCHER.match(uri_string).groupdict()
        return cls(split_uri['scheme'], split_uri['authority'],
                   encode_component(split_uri['path'], encoding),
                   encode_component(split_uri['query'], encoding),
                   encode_component(split_uri['fragment'], encoding), encoding)

    def authority_info(self):
        """Returns a dictionary with the ``userinfo``, ``host``, and ``port``.

        If the authority is not valid, it will raise a ``InvalidAuthority``
        Exception.

        :returns:
            ``{'userinfo': 'username:password', 'host': 'www.example.com',
            'port': '80'}``
        :rtype: dict
        :raises InvalidAuthority: If the authority is not ``None`` and can not
            be parsed.
        """
        if not self.authority:
            return {'userinfo': None, 'host': None, 'port': None}

        match = SUBAUTHORITY_MATCHER.match(self.authority)

        if match is None:
            # In this case, we have an authority that was parsed from the URI
            # Reference, but it cannot be further parsed by our
            # SUBAUTHORITY_MATCHER. In this case it must not be a valid
            # authority.
            raise InvalidAuthority(self.authority.encode(self.encoding))

        # We had a match, now let's ensure that it is actually a valid host
        # address if it is IPv4
        matches = match.groupdict()
        host = matches.get('host')

        if (host and IPv4_MATCHER.match(host) and not
                valid_ipv4_host_address(host)):
            # If we have a host, it appears to be IPv4 and it does not have
            # valid bytes, it is an InvalidAuthority.
            raise InvalidAuthority(self.authority.encode(self.encoding))

        return matches

    @property
    def host(self):
        """If present, a string representing the host."""
        try:
            authority = self.authority_info()
        except InvalidAuthority:
            return None
        return authority['host']

    @property
    def port(self):
        """If present, the port (as a string) extracted from the authority."""
        try:
            authority = self.authority_info()
        except InvalidAuthority:
            return None
        return authority['port']

    @property
    def userinfo(self):
        """If present, the userinfo extracted from the authority."""
        try:
            authority = self.authority_info()
        except InvalidAuthority:
            return None
        return authority['userinfo']

    def is_absolute(self):
        """Determine if this URI Reference is an absolute URI.

        See http://tools.ietf.org/html/rfc3986#section-4.3 for explanation.

        :returns: ``True`` if it is an absolute URI, ``False`` otherwise.
        :rtype: bool
        """
        return bool(ABSOLUTE_URI_MATCHER.match(self.unsplit()))

    def is_valid(self, **kwargs):
        """Determines if the URI is valid.

        :param bool require_scheme: Set to ``True`` if you wish to require the
            presence of the scheme component.
        :param bool require_authority: Set to ``True`` if you wish to require
            the presence of the authority component.
        :param bool require_path: Set to ``True`` if you wish to require the
            presence of the path component.
        :param bool require_query: Set to ``True`` if you wish to require the
            presence of the query component.
        :param bool require_fragment: Set to ``True`` if you wish to require
            the presence of the fragment component.
        :returns: ``True`` if the URI is valid. ``False`` otherwise.
        :rtype: bool
        """
        validators = [
            (self.scheme_is_valid, kwargs.get('require_scheme', False)),
            (self.authority_is_valid, kwargs.get('require_authority', False)),
            (self.path_is_valid, kwargs.get('require_path', False)),
            (self.query_is_valid, kwargs.get('require_query', False)),
            (self.fragment_is_valid, kwargs.get('require_fragment', False)),
        ]
        return all(v(r) for v, r in validators)

    def _is_valid(self, value, matcher, require):
        """Check one component against its matcher.

        :param value: The component, or ``None`` when it is absent.
        :param matcher: Compiled pattern the component must match.
        :param bool require: Whether an absent component is a failure.
        :returns: ``True`` if the component is acceptable.
        :rtype: bool
        """
        if value is None:
            # An absent component is only a problem when it is required.
            return not require
        # Coerce the match object (or ``None``) so callers get a real bool,
        # as every public ``*_is_valid`` docstring promises.
        return bool(matcher.match(value))

    def authority_is_valid(self, require=False):
        """Determines if the authority component is valid.

        :param bool require: Set to ``True`` to require the presence of this
            component.
        :returns: ``True`` if the authority is valid. ``False`` otherwise.
        :rtype: bool
        """
        try:
            self.authority_info()
        except InvalidAuthority:
            return False

        is_valid = self._is_valid(self.authority,
                                  SUBAUTHORITY_MATCHER,
                                  require)

        # Ensure that IPv4 addresses have valid bytes
        if is_valid and self.host and IPv4_MATCHER.match(self.host):
            return valid_ipv4_host_address(self.host)

        # Perhaps the host didn't exist or if it did, it wasn't an IPv4-like
        # address. In either case, we want to rely on the `_is_valid` check,
        # so let's return that.
        return is_valid

    def scheme_is_valid(self, require=False):
        """Determines if the scheme component is valid.

        :param bool require: Set to ``True`` to require the presence of this
            component.
        :returns: ``True`` if the scheme is valid. ``False`` otherwise.
        :rtype: bool
        """
        return self._is_valid(self.scheme, SCHEME_MATCHER, require)

    def path_is_valid(self, require=False):
        """Determines if the path component is valid.

        :param bool require: Set to ``True`` to require the presence of this
            component.
        :returns: ``True`` if the path is valid. ``False`` otherwise.
        :rtype: bool
        """
        return self._is_valid(self.path, PATH_MATCHER, require)

    def query_is_valid(self, require=False):
        """Determines if the query component is valid.

        :param bool require: Set to ``True`` to require the presence of this
            component.
        :returns: ``True`` if the query is valid. ``False`` otherwise.
        :rtype: bool
        """
        return self._is_valid(self.query, QUERY_MATCHER, require)

    def fragment_is_valid(self, require=False):
        """Determines if the fragment component is valid.

        :param bool require: Set to ``True`` to require the presence of this
            component.
        :returns: ``True`` if the fragment is valid. ``False`` otherwise.
        :rtype: bool
        """
        return self._is_valid(self.fragment, FRAGMENT_MATCHER, require)

    def normalize(self):
        """Normalize this reference as described in Section 6.2.2

        This is not an in-place normalization. Instead this creates a new
        URIReference.

        :returns: A new reference object with normalized components.
        :rtype: URIReference
        """
        # See http://tools.ietf.org/html/rfc3986#section-6.2.2 for logic in
        # this method.
        return URIReference(normalize_scheme(self.scheme or ''),
                            normalize_authority(
                                (self.userinfo, self.host, self.port)),
                            normalize_path(self.path or ''),
                            normalize_query(self.query or ''),
                            normalize_fragment(self.fragment or ''),
                            # Keep the caller's encoding instead of silently
                            # falling back to the constructor default.
                            self.encoding)

    def normalized_equality(self, other_ref):
        """Compare this URIReference to another URIReference.

        :param URIReference other_ref: (required), The reference with which
            we're comparing.
        :returns: ``True`` if the references are equal, ``False`` otherwise.
        :rtype: bool
        """
        return tuple(self.normalize()) == tuple(other_ref.normalize())

    def resolve_with(self, base_uri, strict=False):
        """Use an absolute URI Reference to resolve this relative reference.

        Assuming this is a relative reference that you would like to resolve,
        use the provided base URI to resolve it.

        See http://tools.ietf.org/html/rfc3986#section-5 for more information.

        :param base_uri: Either a string or URIReference. It must be an
            absolute URI or it will raise an exception.
        :param bool strict: When ``False`` (the default), a reference whose
            scheme equals the base URI's scheme is resolved as if it had no
            scheme at all.
        :returns: A new URIReference which is the result of resolving this
            reference using ``base_uri``.
        :rtype: :class:`URIReference`
        :raises ResolutionError: If the ``base_uri`` is not an absolute URI.
        """
        if not isinstance(base_uri, URIReference):
            base_uri = URIReference.from_string(base_uri)

        if not base_uri.is_absolute():
            raise ResolutionError(base_uri)

        # This is optional per
        # http://tools.ietf.org/html/rfc3986#section-5.2.1
        base_uri = base_uri.normalize()

        # The reference we're resolving
        resolving = self

        if not strict and resolving.scheme == base_uri.scheme:
            # Drop the scheme so the relative-reference branches below apply.
            resolving = resolving.copy_with(scheme=None)

        # http://tools.ietf.org/html/rfc3986#page-32
        if resolving.scheme is not None:
            target = resolving.copy_with(path=normalize_path(resolving.path))
        elif resolving.authority is not None:
            target = resolving.copy_with(
                scheme=base_uri.scheme,
                path=normalize_path(resolving.path)
            )
        elif resolving.path is None:
            # Nothing but (maybe) a query/fragment: inherit the base path,
            # and the base query too when this reference has none.
            if resolving.query is not None:
                query = resolving.query
            else:
                query = base_uri.query
            target = resolving.copy_with(
                scheme=base_uri.scheme,
                authority=base_uri.authority,
                path=base_uri.path,
                query=query
            )
        else:
            if resolving.path.startswith('/'):
                path = normalize_path(resolving.path)
            else:
                path = normalize_path(
                    merge_paths(base_uri, resolving.path)
                )
            target = resolving.copy_with(
                scheme=base_uri.scheme,
                authority=base_uri.authority,
                path=path,
                query=resolving.query
            )
        return target

    def unsplit(self):
        """Create a URI string from the components.

        :returns: The URI Reference reconstituted as a string.
        :rtype: str
        """
        # See http://tools.ietf.org/html/rfc3986#section-5.3
        result_list = []
        if self.scheme:
            result_list.extend([self.scheme, ':'])
        if self.authority:
            result_list.extend(['//', self.authority])
        if self.path:
            result_list.append(self.path)
        if self.query:
            result_list.extend(['?', self.query])
        if self.fragment:
            result_list.extend(['#', self.fragment])
        return ''.join(result_list)

    def copy_with(self, scheme=_USE_EXISTING, authority=_USE_EXISTING,
                  path=_USE_EXISTING, query=_USE_EXISTING,
                  fragment=_USE_EXISTING):
        """Return a copy of this reference with some components replaced.

        Components that are not passed keep their current value. Passing
        ``None`` explicitly clears the component.

        :returns: A new reference carrying the same ``encoding``.
        :rtype: URIReference
        """
        requested = {
            'scheme': scheme,
            'authority': authority,
            'path': path,
            'query': query,
            'fragment': fragment,
        }
        changes = dict(
            (name, value) for name, value in requested.items()
            if value is not _USE_EXISTING
        )
        copied = self._replace(**changes)
        # ``_replace`` does not go through ``__new__``, so the encoding has to
        # be carried over by hand. The default mirrors ``__new__``.
        copied.encoding = getattr(self, 'encoding', 'utf-8')
        return copied


def valid_ipv4_host_address(host):
    """Check that every dot-separated part of ``host`` is within 0..255.

    :param str host: A dotted string whose parts are base-10 integers;
        callers in this module only pass hosts that matched ``IPv4_MATCHER``.
    :returns: ``True`` if all parts are in range, ``False`` otherwise.
    :rtype: bool
    """
    # If the host exists, and it might be IPv4, check each byte in the
    # address.
    return all(0 <= int(byte, base=10) <= 255 for byte in host.split('.'))