1# 2# Copyright (c), 2016-2020, SISSA (International School for Advanced Studies). 3# All rights reserved. 4# This file is distributed under the terms of the MIT License. 5# See the file 'LICENSE' in the root directory of the present 6# distribution, or http://opensource.org/licenses/MIT. 7# 8# @author Davide Brunato <brunato@sissa.it> 9# 10import copy 11import os.path 12import pathlib 13import platform 14import re 15import string 16from io import StringIO, BytesIO 17from typing import cast, Any, AnyStr, Dict, Optional, IO, Iterator, List, \ 18 MutableMapping, Union, Tuple 19from urllib.request import urlopen 20from urllib.parse import urlsplit, urlunsplit, unquote, quote_from_bytes 21from urllib.error import URLError 22 23from elementpath import iter_select, XPathContext, XPath2Parser 24from elementpath.protocols import ElementProtocol 25 26from .exceptions import XMLSchemaTypeError, XMLSchemaValueError, XMLResourceError 27from .names import XML_NAMESPACE 28from .etree import ElementTree, PyElementTree, SafeXMLParser, etree_tostring 29from .aliases import ElementType, ElementTreeType, NamespacesType, XMLSourceType, \ 30 NormalizedLocationsType, LocationsType, NsmapType, ParentMapType 31from .helpers import get_namespace, is_etree_element, is_etree_document, \ 32 etree_iter_location_hints 33 34DEFUSE_MODES = frozenset(('never', 'remote', 'always')) 35SECURITY_MODES = frozenset(('all', 'remote', 'local', 'sandbox')) 36 37### 38# Restricted XPath parser for XML resources 39LAZY_XML_XPATH_SYMBOLS = frozenset(( 40 'position', 'last', 'not', 'and', 'or', '!=', '<=', '>=', '(', ')', 'text', 41 '[', ']', '.', ',', '/', '|', '*', '=', '<', '>', ':', '@', '(end)', 42 '(unknown)', '(invalid)', '(name)', '(string)', '(float)', '(decimal)', 43 '(integer)' 44)) 45 46DRIVE_LETTERS = frozenset(string.ascii_letters) 47 48 49class LazyXPath2Parser(XPath2Parser): 50 symbol_table = { 51 k: v for k, v in XPath2Parser.symbol_table.items() # type: ignore[misc] 52 if k in LAZY_XML_XPATH_SYMBOLS 53 } 54 SYMBOLS = LAZY_XML_XPATH_SYMBOLS 55 56 57class LazySelector: 58 """A limited XPath selector class for lazy XML resources.""" 59 60 def __init__(self, path: str, namespaces: Optional[NamespacesType] = None) -> None: 61 self.parser = LazyXPath2Parser(namespaces, strict=False) 62 self.path = path 63 self.root_token = self.parser.parse(path) 64 65 def __repr__(self) -> str: 66 return '%s(path=%r)' % (self.__class__.__name__, self.path) 67 68 def select(self, root: ElementProtocol, **kwargs: Any) -> List[ElementProtocol]: 69 context = XPathContext(root, **kwargs) 70 results = self.root_token.get_results(context) 71 if not isinstance(results, list) or any(not is_etree_element(x) for x in results): 72 msg = "XPath expressions on lazy resources can select only elements" 73 raise XMLResourceError(msg) 74 return results 75 76 def iter_select(self, root: ElementProtocol, **kwargs: Any) -> Iterator[ElementProtocol]: 77 context = XPathContext(root, **kwargs) 78 for elem in self.root_token.select_results(context): 79 if not is_etree_element(elem): 80 msg = "XPath expressions on lazy resources can select only elements" 81 raise XMLResourceError(msg) 82 yield cast(ElementProtocol, elem) 83 84 85### 86# URL normalization (that fixes many headaches :) 87class _PurePath(pathlib.PurePath): 88 """ 89 A version of pathlib.PurePath adapted for managing the creation 90 from URIs and the simple normalization of paths. 91 """ 92 _from_parts: Any 93 _flavour: Any 94 95 def __new__(cls, *args: str) -> '_PurePath': 96 if cls is _PurePath: 97 cls = _WindowsPurePath if os.name == 'nt' else _PosixPurePath 98 return cast('_PurePath', cls._from_parts(args)) 99 100 @classmethod 101 def from_uri(cls, uri: str) -> '_PurePath': 102 uri = uri.strip() 103 if not uri: 104 raise XMLSchemaValueError("Empty URI provided!") 105 106 if uri.startswith(r'\\'): 107 return _WindowsPurePath(uri) # UNC path 108 elif uri.startswith('/'): 109 return cls(uri) 110 111 parts = urlsplit(uri) 112 if not parts.scheme: 113 return cls(uri) 114 elif parts.scheme in DRIVE_LETTERS and len(parts.scheme) == 1: 115 return _WindowsPurePath(uri) # Eg. k:/Python/lib/.... 116 elif parts.scheme != 'file': 117 return _PosixPurePath(unquote(parts.path)) 118 119 # Get file URI path because urlsplit does not parse it well 120 start = 7 if uri.startswith('file:///') else 5 121 if parts.query: 122 path = uri[start:uri.index('?')] 123 elif parts.fragment: 124 path = uri[start:uri.index('#')] 125 else: 126 path = uri[start:] 127 128 if ':' in path: 129 # Windows path with a drive 130 pos = path.index(':') 131 if pos == 2 and path[0] == '/' and path[1] in DRIVE_LETTERS: 132 return _WindowsPurePath(unquote(path[1:])) 133 134 obj = _WindowsPurePath(unquote(path)) 135 if len(obj.drive) != 2 or obj.drive[1] != ':': 136 raise XMLSchemaValueError("Invalid URI {!r}".format(uri)) 137 return obj 138 139 if '\\' in path: 140 return _WindowsPurePath(unquote(path)) 141 return cls(unquote(path)) 142 143 def as_uri(self) -> str: 144 if not self.is_absolute(): 145 uri: str = self._flavour.make_uri(self) 146 while uri.startswith('file:/'): 147 uri = uri.replace('file:/', 'file:', 1) 148 return uri 149 150 uri = cast(str, self._flavour.make_uri(self)) 151 if isinstance(self, _WindowsPurePath) and str(self).startswith(r'\\'): 152 # UNC format case: use the format where the host part is included 153 # in the path part, to let urlopen() works. 154 if not uri.startswith('file:////'): 155 return uri.replace('file://', 'file:////') 156 return uri 157 158 def normalize(self) -> '_PurePath': 159 normalized_path = self._flavour.pathmod.normpath(str(self)) 160 return cast('_PurePath', self._from_parts((normalized_path,))) 161 162 163class _PosixPurePath(_PurePath, pathlib.PurePosixPath): 164 __slots__ = () 165 166 167class _WindowsPurePath(_PurePath, pathlib.PureWindowsPath): 168 __slots__ = () 169 170 171def normalize_url(url: str, base_url: Optional[str] = None, 172 keep_relative: bool = False) -> str: 173 """ 174 Returns a normalized URL eventually joining it to a base URL if it's a relative path. 175 Path names are converted to 'file' scheme URLs. 176 177 :param url: a relative or absolute URL. 178 :param base_url: a reference base URL. 179 :param keep_relative: if set to `True` keeps relative file paths, which would \ 180 not strictly conformant to specification (RFC 8089), because *urlopen()* doesn't \ 181 accept a simple pathname. 182 :return: a normalized URL string. 183 """ 184 url_parts = urlsplit(url) 185 if not is_local_scheme(url_parts.scheme): 186 return url_parts.geturl() 187 188 path = _PurePath.from_uri(url) 189 if path.is_absolute(): 190 return path.normalize().as_uri() 191 192 if base_url is not None: 193 base_url_parts = urlsplit(base_url) 194 base_path = _PurePath.from_uri(base_url) 195 if is_local_scheme(base_url_parts.scheme): 196 path = base_path.joinpath(path) 197 elif not url_parts.scheme: 198 path = base_path.joinpath(path).normalize() 199 return urlunsplit(( 200 base_url_parts.scheme, 201 base_url_parts.netloc, 202 quote_from_bytes(bytes(path)), 203 url_parts.query, 204 url_parts.fragment 205 )) 206 207 if path.is_absolute() or keep_relative: 208 return path.normalize().as_uri() 209 210 base_path = _PurePath(os.getcwd()) 211 return base_path.joinpath(path).normalize().as_uri() 212 213 214### 215# Internal helper functions 216 217def is_url(obj: Any) -> bool: 218 """ 219 Checks if and object can be an URL, restricting to strings that cannot be XML data. 220 """ 221 if isinstance(obj, str): 222 if '\n' in obj or obj.lstrip().startswith('<'): 223 return False 224 elif isinstance(obj, bytes): 225 if b'\n' in obj or obj.lstrip().startswith(b'<'): 226 return False 227 else: 228 return False 229 230 try: 231 urlsplit(obj.strip()) 232 except ValueError: 233 return False 234 else: 235 return True 236 237 238def is_local_scheme(scheme: str) -> bool: 239 return not scheme or scheme == 'file' or scheme in DRIVE_LETTERS 240 241 242def is_remote_url(url: Any) -> bool: 243 return is_url(url) and not is_local_scheme(urlsplit(url.strip()).scheme) 244 245 246def is_local_url(url: Any) -> bool: 247 return is_url(url) and is_local_scheme(urlsplit(url.strip()).scheme) 248 249 250def url_path_is_file(url: str) -> bool: 251 if not is_local_url(url): 252 return False 253 if os.path.isfile(url): 254 return True 255 path = unquote(urlsplit(normalize_url(url)).path) 256 if path.startswith('/') and platform.system() == 'Windows': 257 path = path[1:] 258 return os.path.isfile(path) 259 260 261### 262# API for XML resources 263 264def normalize_locations(locations: LocationsType, 265 base_url: Optional[str] = None, 266 keep_relative: bool = False) -> NormalizedLocationsType: 267 """ 268 Returns a list of normalized locations. The locations are normalized using 269 the base URL of the instance. 270 271 :param locations: a dictionary or a list of couples containing namespace location hints. 272 :param base_url: the reference base URL for construct the normalized URL from the argument. 273 :param keep_relative: if set to `True` keeps relative file paths, which would not strictly \ 274 conformant to URL format specification. 275 :return: a list of couples containing normalized namespace location hints. 276 """ 277 normalized_locations = [] 278 if isinstance(locations, dict): 279 for ns, value in locations.items(): 280 if isinstance(value, list): 281 normalized_locations.extend( 282 [(ns, normalize_url(url, base_url, keep_relative)) for url in value] 283 ) 284 else: 285 normalized_locations.append((ns, normalize_url(value, base_url, keep_relative))) 286 else: 287 normalized_locations.extend( 288 [(ns, normalize_url(url, base_url, keep_relative)) for ns, url in locations] 289 ) 290 return normalized_locations 291 292 293def fetch_resource(location: str, base_url: Optional[str] = None, timeout: int = 30) -> str: 294 """ 295 Fetch a resource by trying to access it. If the resource is accessible 296 returns its URL, otherwise raises an :class:`XMLResourceError`. 297 298 :param location: an URL or a file path. 299 :param base_url: reference base URL for normalizing local and relative URLs. 300 :param timeout: the timeout in seconds for the connection attempt in case of remote data. 301 :return: a normalized URL. 302 """ 303 if not location: 304 raise XMLSchemaValueError("'location' argument must contain a not empty string") 305 306 url = normalize_url(location, base_url) 307 try: 308 with urlopen(url, timeout=timeout): 309 return url 310 except URLError as err: 311 # fallback joining the path without a base URL 312 alt_url = normalize_url(location) 313 if url == alt_url: 314 raise XMLResourceError("cannot access to resource %r: %s" % (url, err.reason)) 315 316 try: 317 with urlopen(alt_url, timeout=timeout): 318 return alt_url 319 except URLError: 320 raise XMLResourceError("cannot access to resource %r: %s" % (url, err.reason)) 321 322 323def fetch_schema_locations(source: Union['XMLResource', XMLSourceType], 324 locations: Optional[LocationsType] = None, 325 base_url: Optional[str] = None, 326 allow: str = 'all', 327 defuse: str = 'remote', 328 timeout: int = 30) -> Tuple[str, NormalizedLocationsType]: 329 """ 330 Fetches schema location hints from an XML data source and a list of location hints. 331 If an accessible schema location is not found raises a ValueError. 332 333 :param source: can be an :class:`XMLResource` instance, a file-like object a path \ 334 to a file or an URI of a resource or an Element instance or an ElementTree instance or \ 335 a string containing the XML data. If the passed argument is not an :class:`XMLResource` \ 336 instance a new one is built using this and *defuse*, *timeout* and *lazy* arguments. 337 :param locations: a dictionary or dictionary items with additional schema location hints. 338 :param base_url: the same argument of the :class:`XMLResource`. 339 :param allow: the same argument of the :class:`XMLResource`. 340 :param defuse: the same argument of the :class:`XMLResource`. 341 :param timeout: the same argument of the :class:`XMLResource` but with a reduced default. 342 :return: A 2-tuple with the URL referring to the first reachable schema resource \ 343 and a list of dictionary items with normalized location hints. 344 """ 345 if not isinstance(source, XMLResource): 346 resource = XMLResource(source, base_url, allow, defuse, timeout, lazy=True) 347 else: 348 resource = source 349 350 base_url = resource.base_url 351 namespace = resource.namespace 352 locations = resource.get_locations(locations, root_only=False) 353 if not locations: 354 msg = "{!r} does not contain any schema location hint" 355 raise XMLSchemaValueError(msg.format(source)) 356 357 for ns, url in sorted(locations, key=lambda x: x[0] != namespace): 358 try: 359 return fetch_resource(url, base_url, timeout), locations 360 except XMLResourceError: 361 pass 362 363 raise XMLSchemaValueError("not found a schema for XML data resource {!r}.".format(source)) 364 365 366def fetch_schema(source: Union['XMLResource', XMLSourceType], 367 locations: Optional[LocationsType] = None, 368 base_url: Optional[str] = None, 369 allow: str = 'all', 370 defuse: str = 'remote', 371 timeout: int = 30) -> str: 372 """ 373 Like :meth:`fetch_schema_locations` but returns only a reachable 374 location hint for a schema related to the source's namespace. 375 """ 376 return fetch_schema_locations(source, locations, base_url, allow, defuse, timeout)[0] 377 378 379def fetch_namespaces(source: XMLSourceType, 380 base_url: Optional[str] = None, 381 allow: str = 'all', 382 defuse: str = 'remote', 383 timeout: int = 30) -> NamespacesType: 384 """ 385 Fetches namespaces information from the XML data source. The argument *source* 386 can be a string containing the XML document or file path or an url or a file-like 387 object or an ElementTree instance or an Element instance. A dictionary with 388 namespace mappings is returned. 389 """ 390 resource = XMLResource(source, base_url, allow, defuse, timeout, lazy=True) 391 return resource.get_namespaces(root_only=False) 392 393 394class XMLResource: 395 """ 396 XML resource reader based on ElementTree and urllib. 397 398 :param source: a string containing the XML document or file path or an URL or a \ 399 file like object or an ElementTree or an Element. 400 :param base_url: is an optional base URL, used for the normalization of relative paths \ 401 when the URL of the resource can't be obtained from the source argument. For security \ 402 access to a local file resource is always denied if the *base_url* is a remote URL. 403 :param allow: defines the security mode for accessing resource locations. Can be \ 404 'all', 'remote', 'local' or 'sandbox'. Default is 'all' that means all types of \ 405 URLs are allowed. With 'remote' only remote resource URLs are allowed. With 'local' \ 406 only file paths and URLs are allowed. With 'sandbox' only file paths and URLs that \ 407 are under the directory path identified by the *base_url* argument are allowed. 408 :param defuse: defines when to defuse XML data using a `SafeXMLParser`. Can be \ 409 'always', 'remote' or 'never'. For default defuses only remote XML data. 410 :param timeout: the timeout in seconds for the connection attempt in case of remote data. 411 :param lazy: if a value `False` or 0 is provided the XML data is fully loaded into and \ 412 processed from memory. For default only the root element of the source is loaded, \ 413 except in case the *source* argument is an Element or an ElementTree instance. A \ 414 positive integer also defines the depth at which the lazy resource can be better \ 415 iterated (`True` means 1). 416 """ 417 # Protected attributes for data and resource location 418 _source: XMLSourceType 419 _root: ElementType 420 _text: Optional[str] = None 421 _url: Optional[str] = None 422 _nsmap: Optional[Dict[ElementType, List[Tuple[str, str]]]] = None 423 _parent_map: Optional[ParentMapType] = None 424 _lazy: Union[bool, int] = False 425 426 def __init__(self, source: XMLSourceType, 427 base_url: Optional[str] = None, 428 allow: str = 'all', 429 defuse: str = 'remote', 430 timeout: int = 300, 431 lazy: Union[bool, int] = False) -> None: 432 433 if base_url is not None and not isinstance(base_url, str): 434 msg = "invalid type {!r} for the attribute 'base_url'" 435 raise XMLSchemaTypeError(msg.format(type(base_url))) 436 self._base_url = base_url 437 438 if not isinstance(allow, str): 439 msg = "invalid type {!r} for the attribute 'allow'" 440 raise XMLSchemaTypeError(msg.format(type(allow))) 441 elif allow not in SECURITY_MODES: 442 msg = "'allow' attribute: {!r} is not a security mode" 443 raise XMLSchemaValueError(msg.format(allow)) 444 elif allow == 'sandbox' and self._base_url is None: 445 msg = "block access to files out of sandbox requires 'base_url' to be set" 446 raise XMLResourceError(msg) 447 self._allow = allow 448 449 if not isinstance(defuse, str): 450 msg = "invalid type {!r} for the attribute 'defuse'" 451 raise XMLSchemaTypeError(msg.format(type(defuse))) 452 elif defuse not in DEFUSE_MODES: 453 msg = "'defuse' attribute: {!r} is not a defuse mode" 454 raise XMLSchemaValueError(msg.format(defuse)) 455 self._defuse = defuse 456 457 if not isinstance(timeout, int): 458 msg = "invalid type {!r} for the attribute 'timeout'" 459 raise XMLSchemaTypeError(msg.format(type(timeout))) 460 elif timeout <= 0: 461 msg = "the attribute 'timeout' must be a positive integer" 462 raise XMLSchemaValueError(msg) 463 self._timeout = timeout 464 465 self.parse(source, lazy) 466 467 def __repr__(self) -> str: 468 return '%s(root=%r)' % (self.__class__.__name__, self._root) 469 470 @property 471 def source(self) -> XMLSourceType: 472 """The XML data source.""" 473 return self._source 474 475 @property 476 def root(self) -> ElementType: 477 """The XML tree root Element.""" 478 return self._root 479 480 @property 481 def text(self) -> Optional[str]: 482 """The XML text source, `None` if it's not available.""" 483 return self._text 484 485 @property 486 def name(self) -> Optional[str]: 487 """ 488 The source name, is `None` if the instance is created from an Element or a string. 489 """ 490 return None if self._url is None else os.path.basename(self._url) 491 492 @property 493 def url(self) -> Optional[str]: 494 """ 495 The source URL, `None` if the instance is created from an Element or a string. 496 """ 497 return self._url 498 499 @property 500 def base_url(self) -> Optional[str]: 501 """The effective base URL used for completing relative locations.""" 502 return os.path.dirname(self._url) if self._url else self._base_url 503 504 @property 505 def filepath(self) -> Optional[str]: 506 """ 507 The resource filepath if the instance is created from a local file, `None` otherwise. 508 """ 509 if self._url: 510 url_parts = urlsplit(self._url) 511 if url_parts.scheme in ('', 'file'): 512 return url_parts.path 513 return None 514 515 @property 516 def allow(self) -> str: 517 """The security mode for accessing resource locations.""" 518 return self._allow 519 520 @property 521 def defuse(self) -> str: 522 """When to defuse XML data.""" 523 return self._defuse 524 525 @property 526 def timeout(self) -> int: 527 """The timeout in seconds for accessing remote resources.""" 528 return self._timeout 529 530 def _access_control(self, url: str) -> None: 531 if self._allow == 'all': 532 return 533 elif self._allow == 'remote': 534 if is_local_url(url): 535 raise XMLResourceError("block access to local resource {}".format(url)) 536 elif is_remote_url(url): 537 raise XMLResourceError("block access to remote resource {}".format(url)) 538 elif self._allow == 'sandbox' and self._base_url is not None: 539 if not url.startswith(normalize_url(self._base_url)): 540 raise XMLResourceError("block access to out of sandbox file {}".format(url)) 541 542 def _update_nsmap(self, nsmap: MutableMapping[str, str], prefix: str, uri: str) -> None: 543 if not prefix: 544 if not uri: 545 return 546 elif '' not in nsmap: 547 if self.namespace: 548 nsmap[prefix] = uri 549 return 550 elif nsmap[''] == uri: 551 return 552 prefix = 'default' 553 554 while prefix in nsmap: 555 if nsmap[prefix] == uri: 556 return 557 match = re.search(r'(\d+)$', prefix) 558 if match: 559 index = int(match.group()) + 1 560 prefix = prefix[:match.span()[0]] + str(index) 561 else: 562 prefix += '0' 563 nsmap[prefix] = uri 564 565 def _lazy_iterparse(self, resource: IO[AnyStr], nsmap: Optional[NsmapType] = None) \ 566 -> Iterator[Tuple[str, ElementType]]: 567 events: Tuple[str, ...] 568 _nsmap: List[Tuple[str, str]] 569 570 if nsmap is None: 571 events = 'start', 'end' 572 _nsmap = [] 573 else: 574 events = 'start-ns', 'end-ns', 'start', 'end' 575 if isinstance(nsmap, list): 576 _nsmap = nsmap 577 _nsmap.clear() 578 else: 579 _nsmap = [] 580 581 if self._defuse == 'remote' and is_remote_url(self.base_url) \ 582 or self._defuse == 'always': 583 safe_parser = SafeXMLParser(target=PyElementTree.TreeBuilder()) 584 tree_iterator = PyElementTree.iterparse(resource, events, safe_parser) 585 else: 586 tree_iterator = ElementTree.iterparse(resource, events) 587 588 root_started = False 589 nsmap_update = False 590 591 _root: ElementType = getattr(self, '_root', None) 592 593 try: 594 for event, node in tree_iterator: 595 if event == 'start': 596 if not root_started: 597 self._root = node 598 root_started = True 599 if nsmap_update and isinstance(nsmap, dict): 600 for prefix, uri in _nsmap: 601 self._update_nsmap(nsmap, prefix, uri) 602 nsmap_update = False 603 yield event, node 604 605 elif event == 'end': 606 yield event, node 607 elif nsmap is not None: 608 if event == 'start-ns': 609 _nsmap.append(node) 610 else: 611 _nsmap.pop() 612 nsmap_update = isinstance(nsmap, dict) 613 614 except Exception as err: 615 self._root = _root 616 if isinstance(err, PyElementTree.ParseError): 617 raise ElementTree.ParseError(str(err)) from None 618 raise 619 620 def _parse(self, resource: IO[AnyStr]) -> None: 621 if self._defuse == 'remote' and is_remote_url(self.base_url) \ 622 or self._defuse == 'always': 623 624 if not hasattr(resource, 'seekable') or not resource.seekable(): 625 text = resource.read() 626 if isinstance(text, str): 627 resource = StringIO(text) 628 else: 629 resource = BytesIO(text) 630 631 safe_parser = SafeXMLParser(target=PyElementTree.TreeBuilder()) 632 try: 633 for _ in PyElementTree.iterparse(resource, ('start',), safe_parser): 634 break 635 except PyElementTree.ParseError as err: 636 raise ElementTree.ParseError(str(err)) 637 else: 638 resource.seek(0) 639 640 elem: Optional[ElementType] = None 641 nsmap: List[Tuple[str, str]] = [] 642 nsmap_changed = False 643 namespaces = {} 644 events = 'start-ns', 'end-ns', 'end' 645 646 for event, node in ElementTree.iterparse(resource, events): 647 if event == 'end': 648 if nsmap_changed or elem is None: 649 namespaces[node] = nsmap[:] 650 nsmap_changed = False 651 else: 652 namespaces[node] = namespaces[elem] 653 elem = node 654 elif event == 'start-ns': 655 nsmap.append(node) 656 nsmap_changed = True 657 else: 658 nsmap.pop() 659 nsmap_changed = True 660 661 assert elem is not None 662 self._root = elem 663 self._nsmap = namespaces 664 665 def parse(self, source: XMLSourceType, lazy: Union[bool, int] = False) -> None: 666 if isinstance(lazy, bool): 667 pass 668 elif not isinstance(lazy, int): 669 msg = "invalid type {!r} for the attribute 'lazy'" 670 raise XMLSchemaTypeError(msg.format(type(lazy))) 671 elif lazy < 0: 672 msg = "invalid value {!r} for the attribute 'lazy'" 673 raise XMLSchemaValueError(msg.format(lazy)) 674 675 url: Optional[str] 676 if isinstance(source, (str, bytes)): 677 if is_url(source): 678 # source is a string containing an URL or a file path 679 if isinstance(source, str): 680 url = normalize_url(source) 681 else: 682 url = normalize_url(source.decode()) 683 self._access_control(url) 684 685 _url, self._url = self._url, url 686 try: 687 with urlopen(url, timeout=self._timeout) as resource: 688 if not lazy: 689 self._parse(resource) 690 else: 691 for _ in self._lazy_iterparse(resource): # pragma: no cover 692 break 693 except Exception: 694 self._url = _url 695 raise 696 else: 697 self._text = None 698 self._lazy = lazy 699 700 else: 701 # source is a string containing an XML document 702 _url, self._url = self._url, None 703 if isinstance(source, str): 704 resource = StringIO(source) 705 else: 706 resource = BytesIO(source) 707 708 try: 709 if not lazy: 710 self._parse(resource) 711 else: 712 for _ in self._lazy_iterparse(resource): # pragma: no cover 713 break 714 except Exception: 715 self._url = _url 716 raise 717 else: 718 if isinstance(source, str): 719 self._text = source 720 else: 721 self._text = source.decode() 722 self._lazy = False 723 724 elif isinstance(source, StringIO): 725 _url, self._url = self._url, None 726 try: 727 if not lazy: 728 self._parse(source) 729 else: 730 for _ in self._lazy_iterparse(source): # pragma: no cover 731 break 732 except Exception: 733 self._url = _url 734 raise 735 else: 736 self._text = source.getvalue() 737 self._lazy = lazy 738 739 elif hasattr(source, 'read'): 740 # source is a readable resource (remote or local file) 741 url = getattr(source, 'url', None) 742 if url is not None: 743 # Save remote urls for open new resources (non seekable) 744 if is_remote_url(url): 745 self._access_control(url) 746 else: 747 url = None 748 749 _url, self._url = self._url, url 750 try: 751 if not lazy: 752 self._parse(cast(IO[str], source)) 753 else: 754 for _ in self._lazy_iterparse(cast(IO[str], source)): # pragma: no cover 755 break 756 except Exception: 757 self._url = _url 758 raise 759 else: 760 self._text = None 761 self._lazy = lazy 762 763 else: 764 # Source is already an Element or an ElementTree. 765 if hasattr(source, 'tag') and hasattr(source, 'attrib'): 766 # Source is already an Element --> nothing to parse 767 self._root = cast(ElementType, source) 768 elif is_etree_document(source): 769 # Could be only an ElementTree object at last 770 self._root = cast(ElementTreeType, source).getroot() 771 else: 772 raise XMLSchemaTypeError( 773 "wrong type %r for 'source' attribute: an ElementTree object or " 774 "an Element instance or a string containing XML data or an URL " 775 "or a file-like object is required." % type(source) 776 ) 777 778 self._text = self._url = None 779 self._lazy = False 780 781 # TODO for Python 3.8+: need a Protocol for checking this with isinstance() 782 if not hasattr(self._root, 'nsmap'): 783 self._nsmap = None 784 else: 785 self._nsmap = {} 786 787 nsmap: Any = [] 788 lxml_nsmap = None 789 for elem in cast(Any, self._root.iter()): 790 if lxml_nsmap != elem.nsmap: 791 lxml_nsmap = elem.nsmap 792 nsmap = [(k or '', v) for k, v in elem.nsmap.items()] 793 self._nsmap[elem] = nsmap 794 795 self._parent_map = None 796 self._source = source 797 798 @property 799 def namespace(self) -> str: 800 """The namespace of the XML resource.""" 801 return '' if self._root is None else get_namespace(self._root.tag) 802 803 @property 804 def parent_map(self) -> Dict[ElementType, Optional[ElementType]]: 805 if self._lazy: 806 raise XMLResourceError("cannot create the parent map of a lazy resource") 807 if self._parent_map is None: 808 assert self._root is not None 809 self._parent_map = {child: elem for elem in self._root.iter() for child in elem} 810 self._parent_map[self._root] = None 811 return self._parent_map 812 813 def get_absolute_path(self, path: Optional[str] = None) -> str: 814 if path is None: 815 if self._lazy: 816 return '/%s/%s' % (self._root.tag, '/'.join('*' * int(self._lazy))) 817 return '/%s' % self._root.tag 818 elif path.startswith('/'): 819 return path 820 else: 821 return '/%s/%s' % (self._root.tag, path) 822 823 def get_text(self) -> str: 824 """ 825 Gets the source text of the XML document. If the source text is not 826 available creates an encoded string representation of the XML tree. 827 Il the resource is lazy raises a resource error. 828 """ 829 if self._text is not None: 830 return self._text 831 elif self._url is not None: 832 self.load() 833 if self._text is not None: 834 return self._text 835 836 return self.tostring(xml_declaration=True) 837 838 def tostring(self, indent: str = '', max_lines: Optional[int] = None, 839 spaces_for_tab: int = 4, xml_declaration: bool = False) -> str: 840 """Generates a string representation of the XML resource.""" 841 if self._lazy: 842 raise XMLResourceError("cannot serialize a lazy resource") 843 844 elem = self._root 845 namespaces = self.get_namespaces(root_only=False) 846 _string = etree_tostring(elem, namespaces, indent, max_lines, 847 spaces_for_tab, xml_declaration) 848 if isinstance(_string, bytes): 849 return _string.decode('utf-8') 850 return _string 851 852 def subresource(self, elem: ElementType) -> 'XMLResource': 853 """Create an XMLResource instance from a subelement of a non-lazy XML tree.""" 854 if self._lazy: 855 raise XMLResourceError("cannot create a subresource from a lazy resource") 856 857 for e in self._root.iter(): # pragma: no cover 858 if e is elem: 859 break 860 else: 861 msg = "{!r} is not an element or the XML resource tree" 862 raise XMLResourceError(msg.format(elem)) 863 864 resource = XMLResource(elem, self.base_url, self._allow, self._defuse, self._timeout) 865 if not hasattr(elem, 'nsmap') and self._nsmap is not None: 866 namespaces = {} 867 _nsmap = self._nsmap[elem] 868 _nsmap_initial_len = len(_nsmap) 869 nsmap = list(dict(_nsmap).items()) 870 871 for e in elem.iter(): 872 if _nsmap is not self._nsmap[e]: 873 _nsmap = self._nsmap[e] 874 nsmap = nsmap[:] 875 nsmap.extend(_nsmap[_nsmap_initial_len:]) 876 namespaces[e] = nsmap 877 878 resource._nsmap = namespaces 879 880 return resource 881 882 def open(self) -> IO[AnyStr]: 883 """ 884 Returns a opened resource reader object for the instance URL. If the 885 source attribute is a seekable file-like object rewind the source and 886 return it. 887 """ 888 if self.seek(0) == 0: 889 return cast(IO[AnyStr], self._source) 890 elif self._url is None: 891 raise XMLResourceError("can't open, the resource has no URL associated.") 892 893 try: 894 return cast(IO[AnyStr], urlopen(self._url, timeout=self._timeout)) 895 except URLError as err: 896 raise XMLResourceError( 897 "cannot access to resource %r: %s" % (self._url, err.reason) 898 ) 899 900 def seek(self, position: int) -> Optional[int]: 901 """ 902 Change stream position if the XML resource was created with a seekable 903 file-like object. In the other cases this method has no effect. 904 """ 905 if not hasattr(self._source, 'read'): 906 return None 907 908 try: 909 if not self._source.seekable(): # type: ignore[union-attr] 910 return None 911 except AttributeError: 912 return None # pragma: no cover 913 except ValueError as err: 914 raise XMLResourceError(str(err)) from None 915 else: 916 return self._source.seek(position) # type: ignore[union-attr] 917 918 def close(self) -> None: 919 """ 920 Close the XML resource if it's created with a file-like object. 921 In other cases this method has no effect. 922 """ 923 try: 924 self._source.close() # type: ignore[union-attr] 925 except (AttributeError, TypeError): 926 pass 927 928 def load(self) -> None: 929 """ 930 Loads the XML text from the data source. If the data source is an Element 931 the source XML text can't be retrieved. 932 """ 933 if self._url is None and not hasattr(self._source, 'read'): 934 return # Created from Element or text source --> already loaded 935 elif self._lazy: 936 raise XMLResourceError("cannot load a lazy resource") 937 938 resource = self.open() 939 try: 940 data = resource.read() 941 finally: 942 # We don't want to close the file obj if it wasn't originally 943 # opened by `XMLResource`. That is the concern of the code 944 # where the file obj came from. 945 if resource is not self._source: 946 resource.close() 947 948 if isinstance(data, bytes): 949 try: 950 text = data.decode('utf-8') 951 except UnicodeDecodeError: 952 text = data.decode('iso-8859-1') 953 else: 954 text = data 955 956 self._text = text 957 958 def is_lazy(self) -> bool: 959 """Returns `True` if the XML resource is lazy.""" 960 return bool(self._lazy) 961 962 def is_remote(self) -> bool: 963 """Returns `True` if the resource is related with remote XML data.""" 964 return is_remote_url(self._url) 965 966 def is_local(self) -> bool: 967 """Returns `True` if the resource is related with local XML data.""" 968 return is_local_url(self._url) 969 970 @property 971 def lazy_depth(self) -> int: 972 """ 973 The optimal depth for validate this resource. Is a positive 974 integer for lazy resources and 0 for fully loaded XML trees. 975 """ 976 return int(self._lazy) 977 978 def is_loaded(self) -> bool: 979 """Returns `True` if the XML text of the data source is loaded.""" 980 return self._text is not None 981 982 def iter(self, tag: Optional[str] = None, 983 nsmap: Optional[MutableMapping[str, str]] = None) -> Iterator[ElementType]: 984 """ 985 XML resource tree iterator. The iteration of a lazy resource 986 is in reverse order (top level element is the last). If tag 987 is not None or '*', only elements whose tag equals tag are 988 returned from the iterator. Provide a *nsmap* list for 989 tracking the namespaces of yielded elements. If *nsmap* is 990 a dictionary the tracking of namespaces is cumulative on 991 the whole tree, renaming prefixes in case of conflicts. 992 """ 993 if self._lazy: 994 resource = self.open() 995 tag = '*' if tag is None else tag.strip() 996 try: 997 for event, node in self._lazy_iterparse(resource, nsmap): 998 if event == 'end': 999 if tag == '*' or node.tag == tag: 1000 yield node 1001 node.clear() 1002 finally: 1003 # Close the resource only if it was originally opened by XMLResource 1004 if resource is not self._source: 1005 resource.close() 1006 1007 elif not self._nsmap or nsmap is None: 1008 yield from self._root.iter(tag) 1009 else: 1010 _nsmap = None 1011 for elem in self._root.iter(tag): 1012 try: 1013 if _nsmap is not self._nsmap[elem]: 1014 _nsmap = self._nsmap[elem] 1015 if isinstance(nsmap, list): 1016 nsmap.clear() 1017 nsmap.extend(_nsmap) 1018 else: 1019 for prefix, uri in _nsmap: 1020 self._update_nsmap(nsmap, prefix, uri) 1021 except KeyError: 1022 pass 1023 1024 yield elem 1025 1026 def iter_location_hints(self, tag: Optional[str] = None) -> Iterator[Tuple[str, str]]: 1027 """ 1028 Yields all schema location hints of the XML resource. If tag 1029 is not None or '*', only location hints of elements whose tag 1030 equals tag are returned from the iterator. 1031 """ 1032 for elem in self.iter(tag): 1033 yield from etree_iter_location_hints(elem) 1034 1035 def iter_depth(self, mode: int = 1, nsmap: Optional[NsmapType] = None, 1036 ancestors: Optional[List[ElementType]] = None) -> Iterator[ElementType]: 1037 """ 1038 Iterates XML subtrees. For fully loaded resources yields the root element. 1039 On lazy resources the argument *mode* can change the sequence and the 1040 completeness of yielded elements. There are four possible modes, that 1041 generate different sequences of elements:\n 1042 1. Only the elements at *depth_level* level of the tree\n 1043 2. Only a root element pruned at *depth_level*\n 1044 3. The elements at *depth_level* and then a pruned root\n 1045 4. An incomplete root at start, the elements at *depth_level* and a pruned root 1046 1047 :param mode: an integer in range [1..4] that defines the iteration mode. 1048 :param nsmap: provide a list/dict for tracking the namespaces of yielded \ 1049 elements. If a list is passed the tracking is done at element level, otherwise \ 1050 the tracking is on the whole tree, renaming prefixes in case of conflicts. 1051 :param ancestors: provide a list for tracking the ancestors of yielded elements. 1052 """ 1053 if ancestors is not None: 1054 ancestors.clear() 1055 1056 if not self._lazy: 1057 if nsmap is not None and self._nsmap: 1058 if isinstance(nsmap, list): 1059 nsmap.clear() 1060 nsmap.extend(self._nsmap[self._root]) 1061 else: 1062 for elem in self._root.iter(): 1063 for prefix, uri in self._nsmap[elem]: 1064 self._update_nsmap(nsmap, prefix, uri) 1065 1066 yield self._root 1067 return 1068 1069 if mode not in (1, 2, 3, 4): 1070 raise XMLSchemaValueError("invalid argument mode={!r}".format(mode)) 1071 1072 resource = self.open() 1073 level = 0 1074 subtree_level = int(self._lazy) 1075 1076 try: 1077 for event, node in self._lazy_iterparse(resource, nsmap): 1078 if event == "start": 1079 if not level: 1080 if mode == 4: 1081 yield node 1082 if ancestors is not None and level < subtree_level: 1083 ancestors.append(node) 1084 level += 1 1085 else: 1086 level -= 1 1087 if not level: 1088 if mode != 1: 1089 yield node 1090 elif level != subtree_level: 1091 if ancestors is not None and level < subtree_level: 1092 ancestors.pop() 1093 continue # pragma: no cover 1094 elif mode != 2: 1095 yield node 1096 1097 del node[:] # delete children, keep attributes, text and tail. 1098 finally: 1099 if self._source is not resource: 1100 resource.close() 1101 1102 def iterfind(self, path: str, 1103 namespaces: Optional[NamespacesType] = None, 1104 nsmap: Optional[NsmapType] = None, 1105 ancestors: Optional[List[ElementType]] = None) -> Iterator[ElementType]: 1106 """ 1107 Apply XPath selection to XML resource that yields full subtrees. 1108 1109 :param path: an XPath expression to select element nodes. 1110 :param namespaces: an optional mapping from namespace prefixes to URIs \ 1111 used for parsing the XPath expression. 1112 :param nsmap: provide a list/dict for tracking the namespaces of yielded \ 1113 elements. If a list is passed the tracking is done at element level, otherwise \ 1114 the tracking is on the whole tree, renaming prefixes in case of conflicts. 1115 :param ancestors: provide a list for tracking the ancestors of yielded elements. 1116 """ 1117 selector: Any 1118 1119 if self._lazy: 1120 selector = LazySelector(path, namespaces) 1121 path = path.replace(' ', '').replace('./', '') 1122 resource = self.open() 1123 level = 0 1124 select_all = '*' in path and set(path).issubset({'*', '/'}) 1125 if path == '.': 1126 subtree_level = 0 1127 elif path.startswith('/'): 1128 subtree_level = path.count('/') - 1 1129 else: 1130 subtree_level = path.count('/') + 1 1131 1132 try: 1133 for event, node in self._lazy_iterparse(resource, nsmap): 1134 if event == "start": 1135 if ancestors is not None and level < subtree_level: 1136 ancestors.append(node) 1137 level += 1 1138 else: 1139 level -= 1 1140 if not level: 1141 if subtree_level: 1142 pass 1143 elif select_all or node in selector.select(self._root): 1144 yield node 1145 elif not subtree_level: 1146 continue 1147 elif level != subtree_level: 1148 if ancestors is not None and level < subtree_level: 1149 ancestors.pop() 1150 continue # pragma: no cover 1151 elif select_all or node in selector.select(self._root): 1152 yield node 1153 1154 del node[:] # delete children, keep attributes, text and tail. 1155 1156 finally: 1157 if self._source is not resource: 1158 resource.close() 1159 1160 else: 1161 if ancestors is None: 1162 selector = iter_select 1163 else: 1164 parent_map = self.parent_map 1165 ancestors.clear() 1166 1167 def selector(*args: Any, **kwargs: Any) -> Iterator[Any]: 1168 assert ancestors is not None 1169 for e in iter_select(*args, **kwargs): 1170 if e is self._root: 1171 ancestors.clear() 1172 else: 1173 _ancestors = [] 1174 parent = e 1175 try: 1176 while True: 1177 parent = parent_map[parent] 1178 if parent is not None: 1179 _ancestors.append(parent) 1180 except KeyError: 1181 pass 1182 1183 if _ancestors: 1184 ancestors.clear() 1185 ancestors.extend(reversed(_ancestors)) 1186 1187 yield e 1188 1189 if not self._nsmap or nsmap is None: 1190 yield from selector(self._root, path, namespaces, strict=False) 1191 else: 1192 _nsmap = None 1193 for elem in selector(self._root, path, namespaces, strict=False): 1194 try: 1195 if _nsmap is not self._nsmap[elem]: 1196 _nsmap = self._nsmap[elem] 1197 if isinstance(nsmap, list): 1198 nsmap.clear() 1199 nsmap.extend(_nsmap) 1200 else: 1201 for prefix, uri in _nsmap: 1202 self._update_nsmap(nsmap, prefix, uri) 1203 except KeyError: 1204 pass 1205 1206 yield elem 1207 1208 def find(self, path: str, 1209 namespaces: Optional[NamespacesType] = None, 1210 nsmap: Optional[NsmapType] = None, 1211 ancestors: Optional[List[ElementType]] = None) -> Optional[ElementType]: 1212 return next(self.iterfind(path, namespaces, nsmap, ancestors), None) 1213 1214 def findall(self, path: str, namespaces: Optional[NamespacesType] = None) \ 1215 -> List[ElementType]: 1216 return list(self.iterfind(path, namespaces)) 1217 1218 def get_namespaces(self, namespaces: Optional[NamespacesType] = None, 1219 root_only: Optional[bool] = None) -> NamespacesType: 1220 """ 1221 Extracts namespaces with related prefixes from the XML resource. If a duplicate 1222 prefix declaration is encountered and the prefix maps a different namespace, 1223 adds the namespace using a different generated prefix. The empty prefix '' is 1224 used only if it's declared at root level to avoid erroneous mapping of local 1225 names. In other cases uses 'default' prefix as substitute. 1226 1227 :param namespaces: builds the namespace map starting over the dictionary provided. 1228 :param root_only: if `True`, or `None` and the resource is lazy, extracts \ 1229 only the namespaces declared in the root element. 1230 :return: a dictionary for mapping namespace prefixes to full URI. 1231 """ 1232 if root_only is None: 1233 root_only = bool(self._lazy) 1234 1235 if namespaces is None: 1236 namespaces = {} 1237 elif namespaces.get('xml', XML_NAMESPACE) != XML_NAMESPACE: 1238 msg = "reserved prefix (xml) must not be bound to another namespace name" 1239 raise XMLSchemaValueError(msg) 1240 else: 1241 namespaces = copy.copy(namespaces) 1242 1243 try: 1244 for _ in self.iter(nsmap=namespaces): 1245 if root_only: 1246 break 1247 except (ElementTree.ParseError, PyElementTree.ParseError, UnicodeEncodeError): 1248 pass 1249 1250 return namespaces 1251 1252 def get_locations(self, locations: Optional[LocationsType] = None, 1253 root_only: Optional[bool] = None) -> NormalizedLocationsType: 1254 """ 1255 Extracts a list of schema location hints from the XML resource. 1256 The locations are normalized using the base URL of the instance. 1257 1258 :param locations: a sequence of schema location hints inserted \ 1259 before the ones extracted from the XML resource. Locations passed \ 1260 within a tuple container are not normalized. 1261 :param root_only: if `True`, or if `None` and the resource is lazy, \ 1262 extracts the location hints of the root element only. 1263 :returns: a list of couples containing normalized location hints. 1264 """ 1265 if root_only is None: 1266 root_only = bool(self._lazy) 1267 1268 if not locations: 1269 location_hints = [] 1270 elif isinstance(locations, tuple): 1271 location_hints = [x for x in locations] 1272 else: 1273 location_hints = normalize_locations(locations, self.base_url) 1274 1275 if root_only: 1276 location_hints.extend([ 1277 (ns, normalize_url(url, self.base_url)) 1278 for ns, url in etree_iter_location_hints(self._root) 1279 ]) 1280 else: 1281 location_hints.extend([ 1282 (ns, normalize_url(url, self.base_url)) 1283 for ns, url in self.iter_location_hints() 1284 ]) 1285 return location_hints 1286