1#
2# Copyright (c), 2016-2020, SISSA (International School for Advanced Studies).
3# All rights reserved.
4# This file is distributed under the terms of the MIT License.
5# See the file 'LICENSE' in the root directory of the present
6# distribution, or http://opensource.org/licenses/MIT.
7#
8# @author Davide Brunato <brunato@sissa.it>
9#
10import copy
11import os.path
12import pathlib
13import platform
14import re
15import string
16from io import StringIO, BytesIO
17from typing import cast, Any, AnyStr, Dict, Optional, IO, Iterator, List, \
18    MutableMapping, Union, Tuple
19from urllib.request import urlopen
20from urllib.parse import urlsplit, urlunsplit, unquote, quote_from_bytes
21from urllib.error import URLError
22
23from elementpath import iter_select, XPathContext, XPath2Parser
24from elementpath.protocols import ElementProtocol
25
26from .exceptions import XMLSchemaTypeError, XMLSchemaValueError, XMLResourceError
27from .names import XML_NAMESPACE
28from .etree import ElementTree, PyElementTree, SafeXMLParser, etree_tostring
29from .aliases import ElementType, ElementTreeType, NamespacesType, XMLSourceType, \
30    NormalizedLocationsType, LocationsType, NsmapType, ParentMapType
31from .helpers import get_namespace, is_etree_element, is_etree_document, \
32    etree_iter_location_hints
33
# Accepted values for the 'defuse' and 'allow' arguments of XMLResource.
DEFUSE_MODES = frozenset({'never', 'remote', 'always'})
SECURITY_MODES = frozenset({'all', 'remote', 'local', 'sandbox'})

###
# Restricted XPath parser for XML resources

# Symbols of the XPath 2.0 parser that are kept by the restricted parser
# used for selecting elements on lazy XML resources.
LAZY_XML_XPATH_SYMBOLS = frozenset([
    'position', 'last', 'not', 'and', 'or', '!=', '<=', '>=', '(', ')', 'text',
    '[', ']', '.', ',', '/', '|', '*', '=', '<', '>', ':', '@', '(end)',
    '(unknown)', '(invalid)', '(name)', '(string)', '(float)', '(decimal)',
    '(integer)',
])

# Single ASCII letters, used to recognize a Windows drive letter that
# urlsplit() reports as an URL scheme.
DRIVE_LETTERS = frozenset(string.ascii_lowercase + string.ascii_uppercase)
47
48
class LazyXPath2Parser(XPath2Parser):
    """
    XPath 2.0 parser restricted to the symbols listed in LAZY_XML_XPATH_SYMBOLS:
    its symbol table keeps only the matching entries of the base parser's table.
    """
    SYMBOLS = LAZY_XML_XPATH_SYMBOLS
    symbol_table = {
        symbol: token_class
        for symbol, token_class in XPath2Parser.symbol_table.items()  # type: ignore[misc]
        if symbol in LAZY_XML_XPATH_SYMBOLS
    }
55
56
class LazySelector:
    """
    A limited XPath selector class for lazy XML resources.

    :param path: the XPath expression, parsed once at construction time.
    :param namespaces: optional namespace map passed to the restricted parser.
    """

    def __init__(self, path: str, namespaces: Optional[NamespacesType] = None) -> None:
        self.parser = LazyXPath2Parser(namespaces, strict=False)
        self.path = path
        self.root_token = self.parser.parse(path)

    def __repr__(self) -> str:
        return '%s(path=%r)' % (self.__class__.__name__, self.path)

    def select(self, root: ElementProtocol, **kwargs: Any) -> List[ElementProtocol]:
        """
        Evaluates the expression on *root* and returns the selected elements.

        :param root: the element used as root of the XPath context.
        :param kwargs: additional keyword arguments for the XPath context.
        :raises XMLResourceError: if the result is not a list of elements.
        """
        results = self.root_token.get_results(XPathContext(root, **kwargs))
        if isinstance(results, list) and all(is_etree_element(x) for x in results):
            return results
        raise XMLResourceError(
            "XPath expressions on lazy resources can select only elements"
        )

    def iter_select(self, root: ElementProtocol, **kwargs: Any) -> Iterator[ElementProtocol]:
        """
        Like :meth:`select` but yields the selected elements one at a time.

        :raises XMLResourceError: as soon as a selected item is not an element.
        """
        for item in self.root_token.select_results(XPathContext(root, **kwargs)):
            if is_etree_element(item):
                yield cast(ElementProtocol, item)
            else:
                raise XMLResourceError(
                    "XPath expressions on lazy resources can select only elements"
                )
83
84
85###
86# URL normalization (that fixes many headaches :)
class _PurePath(pathlib.PurePath):
    """
    A version of pathlib.PurePath adapted for managing the creation
    from URIs and the simple normalization of paths.
    """
    # Private pathlib.PurePath members used below, declared only for typing.
    # NOTE(review): these are pathlib internals and presumably not available
    # on every Python version -- verify against the supported versions.
    _from_parts: Any
    _flavour: Any

    def __new__(cls, *args: str) -> '_PurePath':
        # Instantiating the base class directly selects the concrete
        # subclass that matches the running platform.
        if cls is _PurePath:
            cls = _WindowsPurePath if os.name == 'nt' else _PosixPurePath
        return cast('_PurePath', cls._from_parts(args))

    @classmethod
    def from_uri(cls, uri: str) -> '_PurePath':
        """
        Builds a path instance from an URI or from a plain path name.

        :param uri: an URI or a file path; surrounding whitespace is stripped.
        :return: a path instance; a Windows or POSIX variant is forced when \
        the format of the argument identifies it, otherwise *cls* is used.
        :raises XMLSchemaValueError: if the URI is empty or if it is a 'file' \
        URI that contains a colon but does not have a single-letter drive.
        """
        uri = uri.strip()
        if not uri:
            raise XMLSchemaValueError("Empty URI provided!")

        if uri.startswith(r'\\'):
            return _WindowsPurePath(uri)  # UNC path
        elif uri.startswith('/'):
            return cls(uri)

        parts = urlsplit(uri)
        if not parts.scheme:
            # No scheme: a plain relative path name
            return cls(uri)
        elif parts.scheme in DRIVE_LETTERS and len(parts.scheme) == 1:
            return _WindowsPurePath(uri)  # Eg. k:/Python/lib/....
        elif parts.scheme != 'file':
            # Other schemes: keep only the decoded path component as a POSIX path
            return _PosixPurePath(unquote(parts.path))

        # Get file URI path because urlsplit does not parse it well
        # ('file:///' keeps one leading slash, otherwise only 'file:' is removed)
        start = 7 if uri.startswith('file:///') else 5
        if parts.query:
            path = uri[start:uri.index('?')]
        elif parts.fragment:
            path = uri[start:uri.index('#')]
        else:
            path = uri[start:]

        if ':' in path:
            # Windows path with a drive
            pos = path.index(':')
            if pos == 2 and path[0] == '/' and path[1] in DRIVE_LETTERS:
                # Form '/C:...': drop the leading slash before the drive letter
                return _WindowsPurePath(unquote(path[1:]))

            obj = _WindowsPurePath(unquote(path))
            # A colon is accepted only as part of a two-character drive ('C:')
            if len(obj.drive) != 2 or obj.drive[1] != ':':
                raise XMLSchemaValueError("Invalid URI {!r}".format(uri))
            return obj

        if '\\' in path:
            return _WindowsPurePath(unquote(path))
        return cls(unquote(path))

    def as_uri(self) -> str:
        """
        Returns the path as a 'file' URI. For a relative path the slashes that
        follow the 'file:' prefix are removed; for an absolute Windows path in
        UNC format the URI is rewritten to start with 'file:////'.
        """
        if not self.is_absolute():
            uri: str = self._flavour.make_uri(self)
            # Strip, one at a time, the slashes that follow the scheme
            while uri.startswith('file:/'):
                uri = uri.replace('file:/', 'file:', 1)
            return uri

        uri = cast(str, self._flavour.make_uri(self))
        if isinstance(self, _WindowsPurePath) and str(self).startswith(r'\\'):
            # UNC format case: use the format where the host part is included
            # in the path part, to let urlopen() works.
            if not uri.startswith('file:////'):
                return uri.replace('file://', 'file:////')
        return uri

    def normalize(self) -> '_PurePath':
        """
        Returns a new path of the same class built from the result of the
        flavour's `normpath()` applied to the string form of this path.
        """
        normalized_path = self._flavour.pathmod.normpath(str(self))
        return cast('_PurePath', self._from_parts((normalized_path,)))
161
162
class _PosixPurePath(_PurePath, pathlib.PurePosixPath):
    """The POSIX variant of _PurePath, combined with pathlib.PurePosixPath."""
    __slots__ = ()
165
166
class _WindowsPurePath(_PurePath, pathlib.PureWindowsPath):
    """The Windows variant of _PurePath, combined with pathlib.PureWindowsPath."""
    __slots__ = ()
169
170
def normalize_url(url: str, base_url: Optional[str] = None,
                  keep_relative: bool = False) -> str:
    """
    Normalizes an URL, joining it to a base URL when it is a relative path.
    Local path names are returned as 'file' scheme URLs, while URLs that have
    a non-local scheme are returned as rebuilt by *urlsplit()*.

    :param url: a relative or absolute URL.
    :param base_url: a reference base URL.
    :param keep_relative: if `True` a relative file path is kept relative \
    instead of being joined to the current working directory. The result is \
    not strictly conformant to RFC 8089 and is not accepted by *urlopen()*.
    :return: a normalized URL string.
    """
    url_parts = urlsplit(url)
    if not is_local_scheme(url_parts.scheme):
        return url_parts.geturl()

    path = _PurePath.from_uri(url)
    if path.is_absolute():
        return path.normalize().as_uri()

    if base_url is not None:
        base_parts = urlsplit(base_url)
        base_path = _PurePath.from_uri(base_url)
        if is_local_scheme(base_parts.scheme):
            path = base_path.joinpath(path)
        elif not url_parts.scheme:
            # Relative path on a remote base: rebuild the URL on the base location
            joined_path = base_path.joinpath(path).normalize()
            components = (
                base_parts.scheme,
                base_parts.netloc,
                quote_from_bytes(bytes(joined_path)),
                url_parts.query,
                url_parts.fragment,
            )
            return urlunsplit(components)

    if not (path.is_absolute() or keep_relative):
        # Still relative: resolve against the current working directory
        path = _PurePath(os.getcwd()).joinpath(path)
    return path.normalize().as_uri()
212
213
214###
215# Internal helper functions
216
def is_url(obj: Any) -> bool:
    """
    Checks if an object can be an URL, restricting to strings (or bytes) that
    cannot be XML data: the value must not contain a newline, must not start
    with '<' after leading whitespace and must be accepted by *urlsplit()*.
    """
    if isinstance(obj, str):
        newline, tag_start = '\n', '<'
    elif isinstance(obj, bytes):
        newline, tag_start = b'\n', b'<'
    else:
        return False

    if newline in obj or obj.lstrip().startswith(tag_start):
        return False

    try:
        urlsplit(obj.strip())
    except ValueError:
        return False
    return True
236
237
def is_local_scheme(scheme: str) -> bool:
    """
    Returns `True` if the URL scheme identifies a local resource: an empty
    scheme, the 'file' scheme or a single letter (a Windows drive letter).
    """
    if not scheme or scheme == 'file':
        return True
    return scheme in DRIVE_LETTERS
240
241
def is_remote_url(url: Any) -> bool:
    """Returns `True` if the argument can be an URL and its scheme is not local."""
    if not is_url(url):
        return False
    return not is_local_scheme(urlsplit(url.strip()).scheme)
244
245
def is_local_url(url: Any) -> bool:
    """Returns `True` if the argument can be an URL and its scheme is local."""
    if not is_url(url):
        return False
    return is_local_scheme(urlsplit(url.strip()).scheme)
248
249
def url_path_is_file(url: str) -> bool:
    """
    Returns `True` if the argument is a local URL or path that refers to an
    existing regular file. The argument is checked first as it is, then using
    the decoded path of its normalized URL.
    """
    if not is_local_url(url):
        return False
    if os.path.isfile(url):
        return True

    local_path = unquote(urlsplit(normalize_url(url)).path)
    # On Windows the URL path has a slash before the drive that must be dropped
    strip_slash = local_path.startswith('/') and platform.system() == 'Windows'
    return os.path.isfile(local_path[1:] if strip_slash else local_path)
259
260
261###
262# API for XML resources
263
def normalize_locations(locations: LocationsType,
                        base_url: Optional[str] = None,
                        keep_relative: bool = False) -> NormalizedLocationsType:
    """
    Returns a list of normalized locations. Each location is normalized with
    :func:`normalize_url` using the provided base URL.

    :param locations: a dictionary or a list of couples containing namespace \
    location hints. In a dictionary a value can also be a list of locations.
    :param base_url: the reference base URL used for building the normalized URLs.
    :param keep_relative: if set to `True` keeps relative file paths, which are \
    not strictly conformant to the URL format specification.
    :return: a list of couples containing normalized namespace location hints.
    """
    def normalized(location: str) -> str:
        return normalize_url(location, base_url, keep_relative)

    if not isinstance(locations, dict):
        return [(ns, normalized(url)) for ns, url in locations]

    normalized_locations = []
    for ns, value in locations.items():
        urls = value if isinstance(value, list) else [value]
        normalized_locations.extend([(ns, normalized(url)) for url in urls])
    return normalized_locations
291
292
def fetch_resource(location: str, base_url: Optional[str] = None, timeout: int = 30) -> str:
    """
    Fetches a resource by trying to open it. The location is first normalized
    using the base URL; if that URL cannot be opened a second attempt is made
    with the location normalized without the base URL.

    :param location: an URL or a file path.
    :param base_url: reference base URL for normalizing local and relative URLs.
    :param timeout: the timeout in seconds for the connection attempt in case of remote data.
    :return: the normalized URL that has been opened.
    :raises XMLSchemaValueError: if *location* is empty.
    :raises XMLResourceError: if the resource is not accessible.
    """
    if not location:
        raise XMLSchemaValueError("'location' argument must contain a not empty string")

    failure = "cannot access to resource %r: %s"
    url = normalize_url(location, base_url)
    try:
        with urlopen(url, timeout=timeout):
            return url
    except URLError as err:
        # fallback joining the path without a base URL
        fallback_url = normalize_url(location)
        if fallback_url == url:
            raise XMLResourceError(failure % (url, err.reason))

        try:
            with urlopen(fallback_url, timeout=timeout):
                return fallback_url
        except URLError:
            # The reported error is always the one of the first attempt
            raise XMLResourceError(failure % (url, err.reason))
321
322
def fetch_schema_locations(source: Union['XMLResource', XMLSourceType],
                           locations: Optional[LocationsType] = None,
                           base_url: Optional[str] = None,
                           allow: str = 'all',
                           defuse: str = 'remote',
                           timeout: int = 30) -> Tuple[str, NormalizedLocationsType]:
    """
    Fetches schema location hints from an XML data source and a list of location hints.
    The hints that refer to the namespace of the resource are tried first.

    :param source: can be an :class:`XMLResource` instance, a file-like object, a path \
    to a file or an URI of a resource or an Element instance or an ElementTree instance or \
    a string containing the XML data. If the passed argument is not an :class:`XMLResource` \
    instance a new lazy one is built using this and the *base_url*, *allow*, *defuse* \
    and *timeout* arguments.
    :param locations: a dictionary or dictionary items with additional schema location hints.
    :param base_url: the same argument of the :class:`XMLResource`.
    :param allow: the same argument of the :class:`XMLResource`.
    :param defuse: the same argument of the :class:`XMLResource`.
    :param timeout: the same argument of the :class:`XMLResource` but with a reduced default.
    :return: A 2-tuple with the URL referring to the first reachable schema resource \
    and a list of dictionary items with normalized location hints.
    :raises XMLSchemaValueError: if there are no location hints or none is reachable.
    """
    if isinstance(source, XMLResource):
        resource = source
    else:
        resource = XMLResource(source, base_url, allow, defuse, timeout, lazy=True)

    base_url = resource.base_url
    namespace = resource.namespace
    locations = resource.get_locations(locations, root_only=False)
    if not locations:
        raise XMLSchemaValueError(
            "{!r} does not contain any schema location hint".format(source)
        )

    # Hints for the resource's namespace first, the others after, both
    # groups in their original order.
    matching: list = []
    others: list = []
    for hint in locations:
        (others if hint[0] != namespace else matching).append(hint)

    for _, url in matching + others:
        try:
            return fetch_resource(url, base_url, timeout), locations
        except XMLResourceError:
            continue

    raise XMLSchemaValueError("not found a schema for XML data resource {!r}.".format(source))
364
365
def fetch_schema(source: Union['XMLResource', XMLSourceType],
                 locations: Optional[LocationsType] = None,
                 base_url: Optional[str] = None,
                 allow: str = 'all',
                 defuse: str = 'remote',
                 timeout: int = 30) -> str:
    """
    Like :meth:`fetch_schema_locations` but returns only the URL of the
    first reachable schema, discarding the list of normalized locations.
    """
    schema_url, _ = fetch_schema_locations(
        source, locations, base_url, allow, defuse, timeout
    )
    return schema_url
377
378
def fetch_namespaces(source: XMLSourceType,
                     base_url: Optional[str] = None,
                     allow: str = 'all',
                     defuse: str = 'remote',
                     timeout: int = 30) -> NamespacesType:
    """
    Fetches namespaces information from the XML data source.

    :param source: a string containing the XML document or file path or an URL \
    or a file-like object or an ElementTree instance or an Element instance.
    :param base_url: the same argument of the :class:`XMLResource`.
    :param allow: the same argument of the :class:`XMLResource`.
    :param defuse: the same argument of the :class:`XMLResource`.
    :param timeout: the same argument of the :class:`XMLResource` but with a reduced default.
    :return: a dictionary with namespace mappings, collected from a lazy \
    resource with `root_only=False`.
    """
    return XMLResource(
        source,
        base_url=base_url,
        allow=allow,
        defuse=defuse,
        timeout=timeout,
        lazy=True,
    ).get_namespaces(root_only=False)
392
393
394class XMLResource:
395    """
396    XML resource reader based on ElementTree and urllib.
397
398    :param source: a string containing the XML document or file path or an URL or a \
399    file like object or an ElementTree or an Element.
400    :param base_url: is an optional base URL, used for the normalization of relative paths \
401    when the URL of the resource can't be obtained from the source argument. For security \
402    access to a local file resource is always denied if the *base_url* is a remote URL.
403    :param allow: defines the security mode for accessing resource locations. Can be \
404    'all', 'remote', 'local' or 'sandbox'. Default is 'all' that means all types of \
405    URLs are allowed. With 'remote' only remote resource URLs are allowed. With 'local' \
406    only file paths and URLs are allowed. With 'sandbox' only file paths and URLs that \
407    are under the directory path identified by the *base_url* argument are allowed.
408    :param defuse: defines when to defuse XML data using a `SafeXMLParser`. Can be \
409    'always', 'remote' or 'never'. For default defuses only remote XML data.
410    :param timeout: the timeout in seconds for the connection attempt in case of remote data.
411    :param lazy: if a value `False` or 0 is provided the XML data is fully loaded into and \
412    processed from memory. For default only the root element of the source is loaded, \
413    except in case the *source* argument is an Element or an ElementTree instance. A \
414    positive integer also defines the depth at which the lazy resource can be better \
415    iterated (`True` means 1).
416    """
417    # Protected attributes for data and resource location
418    _source: XMLSourceType
419    _root: ElementType
420    _text: Optional[str] = None
421    _url: Optional[str] = None
422    _nsmap: Optional[Dict[ElementType, List[Tuple[str, str]]]] = None
423    _parent_map: Optional[ParentMapType] = None
424    _lazy: Union[bool, int] = False
425
426    def __init__(self, source: XMLSourceType,
427                 base_url: Optional[str] = None,
428                 allow: str = 'all',
429                 defuse: str = 'remote',
430                 timeout: int = 300,
431                 lazy: Union[bool, int] = False) -> None:
432
433        if base_url is not None and not isinstance(base_url, str):
434            msg = "invalid type {!r} for the attribute 'base_url'"
435            raise XMLSchemaTypeError(msg.format(type(base_url)))
436        self._base_url = base_url
437
438        if not isinstance(allow, str):
439            msg = "invalid type {!r} for the attribute 'allow'"
440            raise XMLSchemaTypeError(msg.format(type(allow)))
441        elif allow not in SECURITY_MODES:
442            msg = "'allow' attribute: {!r} is not a security mode"
443            raise XMLSchemaValueError(msg.format(allow))
444        elif allow == 'sandbox' and self._base_url is None:
445            msg = "block access to files out of sandbox requires 'base_url' to be set"
446            raise XMLResourceError(msg)
447        self._allow = allow
448
449        if not isinstance(defuse, str):
450            msg = "invalid type {!r} for the attribute 'defuse'"
451            raise XMLSchemaTypeError(msg.format(type(defuse)))
452        elif defuse not in DEFUSE_MODES:
453            msg = "'defuse' attribute: {!r} is not a defuse mode"
454            raise XMLSchemaValueError(msg.format(defuse))
455        self._defuse = defuse
456
457        if not isinstance(timeout, int):
458            msg = "invalid type {!r} for the attribute 'timeout'"
459            raise XMLSchemaTypeError(msg.format(type(timeout)))
460        elif timeout <= 0:
461            msg = "the attribute 'timeout' must be a positive integer"
462            raise XMLSchemaValueError(msg)
463        self._timeout = timeout
464
465        self.parse(source, lazy)
466
467    def __repr__(self) -> str:
468        return '%s(root=%r)' % (self.__class__.__name__, self._root)
469
    @property
    def source(self) -> XMLSourceType:
        """The XML data source, as passed to the last completed parse() call."""
        return self._source
474
    @property
    def root(self) -> ElementType:
        """The root Element of the XML tree, set when the source is parsed."""
        return self._root
479
    @property
    def text(self) -> Optional[str]:
        """
        The XML text source, `None` if it's not available. The text is stored
        only when the source is a string of XML data or a StringIO object.
        """
        return self._text
484
485    @property
486    def name(self) -> Optional[str]:
487        """
488        The source name, is `None` if the instance is created from an Element or a string.
489        """
490        return None if self._url is None else os.path.basename(self._url)
491
    @property
    def url(self) -> Optional[str]:
        """
        The source URL, `None` if the instance is created from an Element or a string.
        For a file-like source the URL is kept only when it is a remote URL.
        """
        return self._url
498
499    @property
500    def base_url(self) -> Optional[str]:
501        """The effective base URL used for completing relative locations."""
502        return os.path.dirname(self._url) if self._url else self._base_url
503
504    @property
505    def filepath(self) -> Optional[str]:
506        """
507        The resource filepath if the instance is created from a local file, `None` otherwise.
508        """
509        if self._url:
510            url_parts = urlsplit(self._url)
511            if url_parts.scheme in ('', 'file'):
512                return url_parts.path
513        return None
514
    @property
    def allow(self) -> str:
        """
        The security mode for accessing resource locations, one of the
        values of SECURITY_MODES as validated at initialization.
        """
        return self._allow
519
    @property
    def defuse(self) -> str:
        """
        When to defuse XML data, one of the values of DEFUSE_MODES as
        validated at initialization.
        """
        return self._defuse
524
    @property
    def timeout(self) -> int:
        """
        The timeout in seconds for accessing remote resources, a positive
        integer as validated at initialization.
        """
        return self._timeout
529
530    def _access_control(self, url: str) -> None:
531        if self._allow == 'all':
532            return
533        elif self._allow == 'remote':
534            if is_local_url(url):
535                raise XMLResourceError("block access to local resource {}".format(url))
536        elif is_remote_url(url):
537            raise XMLResourceError("block access to remote resource {}".format(url))
538        elif self._allow == 'sandbox' and self._base_url is not None:
539            if not url.startswith(normalize_url(self._base_url)):
540                raise XMLResourceError("block access to out of sandbox file {}".format(url))
541
542    def _update_nsmap(self, nsmap: MutableMapping[str, str], prefix: str, uri: str) -> None:
543        if not prefix:
544            if not uri:
545                return
546            elif '' not in nsmap:
547                if self.namespace:
548                    nsmap[prefix] = uri
549                    return
550            elif nsmap[''] == uri:
551                return
552            prefix = 'default'
553
554        while prefix in nsmap:
555            if nsmap[prefix] == uri:
556                return
557            match = re.search(r'(\d+)$', prefix)
558            if match:
559                index = int(match.group()) + 1
560                prefix = prefix[:match.span()[0]] + str(index)
561            else:
562                prefix += '0'
563        nsmap[prefix] = uri
564
    def _lazy_iterparse(self, resource: IO[AnyStr], nsmap: Optional[NsmapType] = None) \
            -> Iterator[Tuple[str, ElementType]]:
        """
        Incrementally parses a readable resource, yielding the 'start' and 'end'
        events with their elements. The element of the first 'start' event is
        saved as the root of the resource.

        :param resource: the readable object that provides the XML data.
        :param nsmap: an optional namespace map, kept up to date during the \
        parsing. If it's a list it is cleared and then used as the stack of the \
        (prefix, URI) couples in scope; otherwise the declarations in scope are \
        merged into it by `_update_nsmap()` before each 'start' event that \
        follows a change. If `None` the namespace events are not requested.
        :raises ElementTree.ParseError: also in place of a `PyElementTree.ParseError`.
        """
        events: Tuple[str, ...]
        _nsmap: List[Tuple[str, str]]

        if nsmap is None:
            events = 'start', 'end'
            _nsmap = []
        else:
            events = 'start-ns', 'end-ns', 'start', 'end'
            if isinstance(nsmap, list):
                # Use the caller's list itself as the stack of declarations
                _nsmap = nsmap
                _nsmap.clear()
            else:
                _nsmap = []

        # Defused parsing: always, or only when the effective base URL is remote
        if self._defuse == 'remote' and is_remote_url(self.base_url) \
                or self._defuse == 'always':
            safe_parser = SafeXMLParser(target=PyElementTree.TreeBuilder())
            tree_iterator = PyElementTree.iterparse(resource, events, safe_parser)
        else:
            tree_iterator = ElementTree.iterparse(resource, events)

        root_started = False
        nsmap_update = False

        # Previous root (if any), restored when the parsing fails
        _root: ElementType = getattr(self, '_root', None)

        try:
            for event, node in tree_iterator:
                if event == 'start':
                    if not root_started:
                        self._root = node
                        root_started = True
                    if nsmap_update and isinstance(nsmap, dict):
                        # Apply the pending declarations before exposing the element
                        for prefix, uri in _nsmap:
                            self._update_nsmap(nsmap, prefix, uri)
                        nsmap_update = False
                    yield event, node

                elif event == 'end':
                    yield event, node
                elif nsmap is not None:
                    # For 'start-ns' the node is the declared (prefix, URI) couple
                    if event == 'start-ns':
                        _nsmap.append(node)
                    else:
                        _nsmap.pop()
                    nsmap_update = isinstance(nsmap, dict)

        except Exception as err:
            self._root = _root
            if isinstance(err, PyElementTree.ParseError):
                raise ElementTree.ParseError(str(err)) from None
            raise
619
    def _parse(self, resource: IO[AnyStr]) -> None:
        """
        Fully parses a readable resource, saving the root element and a map
        that associates each element with the list of the namespace
        declarations that are in scope at the end of the element.

        :param resource: the readable object that provides the XML data.
        :raises ElementTree.ParseError: also in place of a \
        `PyElementTree.ParseError` raised by the defused pre-scan.
        """
        # Defused parsing: always, or only when the effective base URL is remote
        if self._defuse == 'remote' and is_remote_url(self.base_url) \
                or self._defuse == 'always':

            if not hasattr(resource, 'seekable') or not resource.seekable():
                # The data is read twice, so a non-seekable resource is
                # replaced by an in-memory buffer of its content.
                text = resource.read()
                if isinstance(text, str):
                    resource = StringIO(text)
                else:
                    resource = BytesIO(text)

            safe_parser = SafeXMLParser(target=PyElementTree.TreeBuilder())
            try:
                # Pre-scan with the safe parser, stopping at the first 'start' event
                for _ in PyElementTree.iterparse(resource, ('start',), safe_parser):
                    break
            except PyElementTree.ParseError as err:
                raise ElementTree.ParseError(str(err))
            else:
                # Rewind for the effective parsing
                resource.seek(0)

        elem: Optional[ElementType] = None
        nsmap: List[Tuple[str, str]] = []
        nsmap_changed = False
        namespaces = {}
        events = 'start-ns', 'end-ns', 'end'

        for event, node in ElementTree.iterparse(resource, events):
            if event == 'end':
                if nsmap_changed or elem is None:
                    # Declarations changed: store a snapshot of the current stack
                    namespaces[node] = nsmap[:]
                    nsmap_changed = False
                else:
                    # No change: share the list of the previously ended element
                    namespaces[node] = namespaces[elem]
                elem = node
            elif event == 'start-ns':
                nsmap.append(node)
                nsmap_changed = True
            else:
                nsmap.pop()
                nsmap_changed = True

        # The element of the last 'end' event is used as the root
        assert elem is not None
        self._root = elem
        self._nsmap = namespaces
664
    def parse(self, source: XMLSourceType, lazy: Union[bool, int] = False) -> None:
        """
        Parses an XML data source, replacing the data of the resource. If the
        parsing of a string or of a readable source fails the previous URL of
        the resource is restored before the exception is propagated.

        :param source: a string or bytes containing an URL, a file path or an XML \
        document, or a StringIO object, or another object with a `read` attribute, \
        or an Element, or an ElementTree.
        :param lazy: a false value requests a full parsing of the data, otherwise \
        the parsing is stopped after the first event. Must be a boolean or a \
        non-negative integer.
        :raises XMLSchemaTypeError: if *lazy* or *source* has an invalid type.
        :raises XMLSchemaValueError: if *lazy* is a negative integer.
        """
        if isinstance(lazy, bool):
            pass
        elif not isinstance(lazy, int):
            msg = "invalid type {!r} for the attribute 'lazy'"
            raise XMLSchemaTypeError(msg.format(type(lazy)))
        elif lazy < 0:
            msg = "invalid value {!r} for the attribute 'lazy'"
            raise XMLSchemaValueError(msg.format(lazy))

        url: Optional[str]
        if isinstance(source, (str, bytes)):
            if is_url(source):
                # source is a string containing an URL or a file path
                if isinstance(source, str):
                    url = normalize_url(source)
                else:
                    url = normalize_url(source.decode())
                self._access_control(url)

                # The new URL is set before parsing and rolled back on failure
                _url, self._url = self._url, url
                try:
                    with urlopen(url, timeout=self._timeout) as resource:
                        if not lazy:
                            self._parse(resource)
                        else:
                            # Lazy mode: stop after the first event
                            for _ in self._lazy_iterparse(resource):  # pragma: no cover
                                break
                except Exception:
                    self._url = _url
                    raise
                else:
                    self._text = None
                    self._lazy = lazy

            else:
                # source is a string containing an XML document
                _url, self._url = self._url, None
                if isinstance(source, str):
                    resource = StringIO(source)
                else:
                    resource = BytesIO(source)

                try:
                    if not lazy:
                        self._parse(resource)
                    else:
                        for _ in self._lazy_iterparse(resource):  # pragma: no cover
                            break
                except Exception:
                    self._url = _url
                    raise
                else:
                    if isinstance(source, str):
                        self._text = source
                    else:
                        self._text = source.decode()
                    # NOTE(review): the lazy flag is reset here even when a lazy
                    # parsing has been requested -- confirm that it's intended.
                    self._lazy = False

        elif isinstance(source, StringIO):
            _url, self._url = self._url, None
            try:
                if not lazy:
                    self._parse(source)
                else:
                    for _ in self._lazy_iterparse(source):  # pragma: no cover
                        break
            except Exception:
                self._url = _url
                raise
            else:
                self._text = source.getvalue()
                self._lazy = lazy

        elif hasattr(source, 'read'):
            # source is a readable resource (remote or local file)
            url = getattr(source, 'url', None)
            if url is not None:
                # Save remote urls for open new resources (non seekable)
                if is_remote_url(url):
                    self._access_control(url)
                else:
                    url = None

            _url, self._url = self._url, url
            try:
                if not lazy:
                    self._parse(cast(IO[str], source))
                else:
                    for _ in self._lazy_iterparse(cast(IO[str], source)):  # pragma: no cover
                        break
            except Exception:
                self._url = _url
                raise
            else:
                self._text = None
                self._lazy = lazy

        else:
            # Source is already an Element or an ElementTree.
            if hasattr(source, 'tag') and hasattr(source, 'attrib'):
                # Source is already an Element --> nothing to parse
                self._root = cast(ElementType, source)
            elif is_etree_document(source):
                # Could be only an ElementTree object at last
                self._root = cast(ElementTreeType, source).getroot()
            else:
                raise XMLSchemaTypeError(
                    "wrong type %r for 'source' attribute: an ElementTree object or "
                    "an Element instance or a string containing XML data or an URL "
                    "or a file-like object is required." % type(source)
                )

            self._text = self._url = None
            self._lazy = False

            # TODO for Python 3.8+: need a Protocol for checking this with isinstance()
            if not hasattr(self._root, 'nsmap'):
                self._nsmap = None
            else:
                # The root exposes an 'nsmap' attribute: build the map of the
                # elements' namespaces from it, sharing the same list between
                # consecutive elements that have an equal nsmap.
                self._nsmap = {}

                nsmap: Any = []
                lxml_nsmap = None
                for elem in cast(Any, self._root.iter()):
                    if lxml_nsmap != elem.nsmap:
                        lxml_nsmap = elem.nsmap
                        nsmap = [(k or '', v) for k, v in elem.nsmap.items()]
                    self._nsmap[elem] = nsmap

        # A cached parent map refers to the previous tree: discard it
        self._parent_map = None
        self._source = source
797
798    @property
799    def namespace(self) -> str:
800        """The namespace of the XML resource."""
801        return '' if self._root is None else get_namespace(self._root.tag)
802
803    @property
804    def parent_map(self) -> Dict[ElementType, Optional[ElementType]]:
805        if self._lazy:
806            raise XMLResourceError("cannot create the parent map of a lazy resource")
807        if self._parent_map is None:
808            assert self._root is not None
809            self._parent_map = {child: elem for elem in self._root.iter() for child in elem}
810            self._parent_map[self._root] = None
811        return self._parent_map
812
813    def get_absolute_path(self, path: Optional[str] = None) -> str:
814        if path is None:
815            if self._lazy:
816                return '/%s/%s' % (self._root.tag, '/'.join('*' * int(self._lazy)))
817            return '/%s' % self._root.tag
818        elif path.startswith('/'):
819            return path
820        else:
821            return '/%s/%s' % (self._root.tag, path)
822
823    def get_text(self) -> str:
824        """
825        Gets the source text of the XML document. If the source text is not
826        available creates an encoded string representation of the XML tree.
827        Il the resource is lazy raises a resource error.
828        """
829        if self._text is not None:
830            return self._text
831        elif self._url is not None:
832            self.load()
833            if self._text is not None:
834                return self._text
835
836        return self.tostring(xml_declaration=True)
837
838    def tostring(self, indent: str = '', max_lines: Optional[int] = None,
839                 spaces_for_tab: int = 4, xml_declaration: bool = False) -> str:
840        """Generates a string representation of the XML resource."""
841        if self._lazy:
842            raise XMLResourceError("cannot serialize a lazy resource")
843
844        elem = self._root
845        namespaces = self.get_namespaces(root_only=False)
846        _string = etree_tostring(elem, namespaces, indent, max_lines,
847                                 spaces_for_tab, xml_declaration)
848        if isinstance(_string, bytes):
849            return _string.decode('utf-8')
850        return _string
851
852    def subresource(self, elem: ElementType) -> 'XMLResource':
853        """Create an XMLResource instance from a subelement of a non-lazy XML tree."""
854        if self._lazy:
855            raise XMLResourceError("cannot create a subresource from a lazy resource")
856
857        for e in self._root.iter():  # pragma: no cover
858            if e is elem:
859                break
860        else:
861            msg = "{!r} is not an element or the XML resource tree"
862            raise XMLResourceError(msg.format(elem))
863
864        resource = XMLResource(elem, self.base_url, self._allow, self._defuse, self._timeout)
865        if not hasattr(elem, 'nsmap') and self._nsmap is not None:
866            namespaces = {}
867            _nsmap = self._nsmap[elem]
868            _nsmap_initial_len = len(_nsmap)
869            nsmap = list(dict(_nsmap).items())
870
871            for e in elem.iter():
872                if _nsmap is not self._nsmap[e]:
873                    _nsmap = self._nsmap[e]
874                    nsmap = nsmap[:]
875                    nsmap.extend(_nsmap[_nsmap_initial_len:])
876                namespaces[e] = nsmap
877
878            resource._nsmap = namespaces
879
880        return resource
881
882    def open(self) -> IO[AnyStr]:
883        """
884        Returns a opened resource reader object for the instance URL. If the
885        source attribute is a seekable file-like object rewind the source and
886        return it.
887        """
888        if self.seek(0) == 0:
889            return cast(IO[AnyStr], self._source)
890        elif self._url is None:
891            raise XMLResourceError("can't open, the resource has no URL associated.")
892
893        try:
894            return cast(IO[AnyStr], urlopen(self._url, timeout=self._timeout))
895        except URLError as err:
896            raise XMLResourceError(
897                "cannot access to resource %r: %s" % (self._url, err.reason)
898            )
899
900    def seek(self, position: int) -> Optional[int]:
901        """
902        Change stream position if the XML resource was created with a seekable
903        file-like object. In the other cases this method has no effect.
904        """
905        if not hasattr(self._source, 'read'):
906            return None
907
908        try:
909            if not self._source.seekable():  # type: ignore[union-attr]
910                return None
911        except AttributeError:
912            return None  # pragma: no cover
913        except ValueError as err:
914            raise XMLResourceError(str(err)) from None
915        else:
916            return self._source.seek(position)  # type: ignore[union-attr]
917
918    def close(self) -> None:
919        """
920        Close the XML resource if it's created with a file-like object.
921        In other cases this method has no effect.
922        """
923        try:
924            self._source.close()  # type: ignore[union-attr]
925        except (AttributeError, TypeError):
926            pass
927
928    def load(self) -> None:
929        """
930        Loads the XML text from the data source. If the data source is an Element
931        the source XML text can't be retrieved.
932        """
933        if self._url is None and not hasattr(self._source, 'read'):
934            return  # Created from Element or text source --> already loaded
935        elif self._lazy:
936            raise XMLResourceError("cannot load a lazy resource")
937
938        resource = self.open()
939        try:
940            data = resource.read()
941        finally:
942            # We don't want to close the file obj if it wasn't originally
943            # opened by `XMLResource`. That is the concern of the code
944            # where the file obj came from.
945            if resource is not self._source:
946                resource.close()
947
948        if isinstance(data, bytes):
949            try:
950                text = data.decode('utf-8')
951            except UnicodeDecodeError:
952                text = data.decode('iso-8859-1')
953        else:
954            text = data
955
956        self._text = text
957
    def is_lazy(self) -> bool:
        """
        Returns `True` if the XML resource is lazy, that is when the
        lazy setting stored on the instance has a true value.
        """
        return bool(self._lazy)
961
    def is_remote(self) -> bool:
        """
        Returns `True` if the resource is related with remote XML data.
        The check is delegated to is_remote_url(), applied to the URL
        of the resource.
        """
        return is_remote_url(self._url)
965
    def is_local(self) -> bool:
        """
        Returns `True` if the resource is related with local XML data.
        The check is delegated to is_local_url(), applied to the URL
        of the resource.
        """
        return is_local_url(self._url)
969
    @property
    def lazy_depth(self) -> int:
        """
        The optimal depth for validating this resource, obtained by
        converting the lazy setting to an integer. It's a positive
        integer for lazy resources and 0 for fully loaded XML trees.
        """
        return int(self._lazy)
977
    def is_loaded(self) -> bool:
        """
        Returns `True` if the XML text of the data source is loaded,
        that is when the text stored on the instance is not `None`.
        """
        return self._text is not None
981
982    def iter(self, tag: Optional[str] = None,
983             nsmap: Optional[MutableMapping[str, str]] = None) -> Iterator[ElementType]:
984        """
985        XML resource tree iterator. The iteration of a lazy resource
986        is in reverse order (top level element is the last). If tag
987        is not None or '*', only elements whose tag equals tag are
988        returned from the iterator. Provide a *nsmap* list for
989        tracking the namespaces of yielded elements. If *nsmap* is
990        a dictionary the tracking of namespaces is cumulative on
991        the whole tree, renaming prefixes in case of conflicts.
992        """
993        if self._lazy:
994            resource = self.open()
995            tag = '*' if tag is None else tag.strip()
996            try:
997                for event, node in self._lazy_iterparse(resource, nsmap):
998                    if event == 'end':
999                        if tag == '*' or node.tag == tag:
1000                            yield node
1001                        node.clear()
1002            finally:
1003                # Close the resource only if it was originally opened by XMLResource
1004                if resource is not self._source:
1005                    resource.close()
1006
1007        elif not self._nsmap or nsmap is None:
1008            yield from self._root.iter(tag)
1009        else:
1010            _nsmap = None
1011            for elem in self._root.iter(tag):
1012                try:
1013                    if _nsmap is not self._nsmap[elem]:
1014                        _nsmap = self._nsmap[elem]
1015                        if isinstance(nsmap, list):
1016                            nsmap.clear()
1017                            nsmap.extend(_nsmap)
1018                        else:
1019                            for prefix, uri in _nsmap:
1020                                self._update_nsmap(nsmap, prefix, uri)
1021                except KeyError:
1022                    pass
1023
1024                yield elem
1025
    def iter_location_hints(self, tag: Optional[str] = None) -> Iterator[Tuple[str, str]]:
        """
        Yields all schema location hints of the XML resource. If tag
        is not None or '*', only location hints of elements whose tag
        equals tag are returned from the iterator.

        :param tag: an optional tag filter, passed unchanged to the tree iterator.
        :return: an iterator on the items produced by etree_iter_location_hints() \
        for each iterated element.
        """
        # The hints follow the order of the tree iterator of the resource.
        for elem in self.iter(tag):
            yield from etree_iter_location_hints(elem)
1034
    def iter_depth(self, mode: int = 1, nsmap: Optional[NsmapType] = None,
                   ancestors: Optional[List[ElementType]] = None) -> Iterator[ElementType]:
        """
        Iterates XML subtrees. For fully loaded resources yields the root element.
        On lazy resources the argument *mode* can change the sequence and the
        completeness of yielded elements. The depth level of the subtrees is the
        lazy depth of the resource. There are four possible modes, that generate
        different sequences of elements:\n
          1. Only the elements at the depth level of the tree\n
          2. Only a root element pruned at the depth level\n
          3. The elements at the depth level and then a pruned root\n
          4. An incomplete root at start, the elements at the depth level and a pruned root

        :param mode: an integer in range [1..4] that defines the iteration mode. \
        It's checked only for lazy resources.
        :param nsmap: provide a list/dict for tracking the namespaces of yielded \
        elements. If a list is passed the tracking is done at element level, otherwise \
        the tracking is on the whole tree, renaming prefixes in case of conflicts.
        :param ancestors: provide a list for tracking the ancestors of yielded elements. \
        The list is emptied when the iteration starts.
        :raises XMLSchemaValueError: if the resource is lazy and *mode* is not \
        an integer in range [1..4].
        """
        if ancestors is not None:
            ancestors.clear()

        if not self._lazy:
            # Fully loaded tree: the only subtree is the root element, after the
            # caller's namespace container has been filled, if requested.
            if nsmap is not None and self._nsmap:
                if isinstance(nsmap, list):
                    nsmap.clear()
                    nsmap.extend(self._nsmap[self._root])
                else:
                    for elem in self._root.iter():
                        for prefix, uri in self._nsmap[elem]:
                            self._update_nsmap(nsmap, prefix, uri)

            yield self._root
            return

        if mode not in (1, 2, 3, 4):
            raise XMLSchemaValueError("invalid argument mode={!r}".format(mode))

        resource = self.open()
        level = 0  # depth of the element related to the current event
        subtree_level = int(self._lazy)

        try:
            for event, node in self._lazy_iterparse(resource, nsmap):
                if event == "start":
                    if not level:
                        if mode == 4:
                            yield node  # the root at start, still incomplete
                    if ancestors is not None and level < subtree_level:
                        ancestors.append(node)
                    level += 1
                else:
                    level -= 1
                    if not level:
                        # End of the root element
                        if mode != 1:
                            yield node
                    elif level != subtree_level:
                        # Not a subtree root: drop it from the ancestors if it's
                        # above the depth level and skip the pruning below.
                        if ancestors is not None and level < subtree_level:
                            ancestors.pop()
                        continue  # pragma: no cover
                    elif mode != 2:
                        yield node

                    del node[:]  # delete children, keep attributes, text and tail.
        finally:
            # Close the resource only if it's not the source provided by the caller
            if self._source is not resource:
                resource.close()
1101
1102    def iterfind(self, path: str,
1103                 namespaces: Optional[NamespacesType] = None,
1104                 nsmap: Optional[NsmapType] = None,
1105                 ancestors: Optional[List[ElementType]] = None) -> Iterator[ElementType]:
1106        """
1107        Apply XPath selection to XML resource that yields full subtrees.
1108
1109        :param path: an XPath expression to select element nodes.
1110        :param namespaces: an optional mapping from namespace prefixes to URIs \
1111        used for parsing the XPath expression.
1112        :param nsmap: provide a list/dict for tracking the namespaces of yielded \
1113        elements. If a list is passed the tracking is done at element level, otherwise \
1114        the tracking is on the whole tree, renaming prefixes in case of conflicts.
1115        :param ancestors: provide a list for tracking the ancestors of yielded elements.
1116        """
1117        selector: Any
1118
1119        if self._lazy:
1120            selector = LazySelector(path, namespaces)
1121            path = path.replace(' ', '').replace('./', '')
1122            resource = self.open()
1123            level = 0
1124            select_all = '*' in path and set(path).issubset({'*', '/'})
1125            if path == '.':
1126                subtree_level = 0
1127            elif path.startswith('/'):
1128                subtree_level = path.count('/') - 1
1129            else:
1130                subtree_level = path.count('/') + 1
1131
1132            try:
1133                for event, node in self._lazy_iterparse(resource, nsmap):
1134                    if event == "start":
1135                        if ancestors is not None and level < subtree_level:
1136                            ancestors.append(node)
1137                        level += 1
1138                    else:
1139                        level -= 1
1140                        if not level:
1141                            if subtree_level:
1142                                pass
1143                            elif select_all or node in selector.select(self._root):
1144                                yield node
1145                        elif not subtree_level:
1146                            continue
1147                        elif level != subtree_level:
1148                            if ancestors is not None and level < subtree_level:
1149                                ancestors.pop()
1150                            continue  # pragma: no cover
1151                        elif select_all or node in selector.select(self._root):
1152                            yield node
1153
1154                        del node[:]  # delete children, keep attributes, text and tail.
1155
1156            finally:
1157                if self._source is not resource:
1158                    resource.close()
1159
1160        else:
1161            if ancestors is None:
1162                selector = iter_select
1163            else:
1164                parent_map = self.parent_map
1165                ancestors.clear()
1166
1167                def selector(*args: Any, **kwargs: Any) -> Iterator[Any]:
1168                    assert ancestors is not None
1169                    for e in iter_select(*args, **kwargs):
1170                        if e is self._root:
1171                            ancestors.clear()
1172                        else:
1173                            _ancestors = []
1174                            parent = e
1175                            try:
1176                                while True:
1177                                    parent = parent_map[parent]
1178                                    if parent is not None:
1179                                        _ancestors.append(parent)
1180                            except KeyError:
1181                                pass
1182
1183                            if _ancestors:
1184                                ancestors.clear()
1185                                ancestors.extend(reversed(_ancestors))
1186
1187                        yield e
1188
1189            if not self._nsmap or nsmap is None:
1190                yield from selector(self._root, path, namespaces, strict=False)
1191            else:
1192                _nsmap = None
1193                for elem in selector(self._root, path, namespaces, strict=False):
1194                    try:
1195                        if _nsmap is not self._nsmap[elem]:
1196                            _nsmap = self._nsmap[elem]
1197                            if isinstance(nsmap, list):
1198                                nsmap.clear()
1199                                nsmap.extend(_nsmap)
1200                            else:
1201                                for prefix, uri in _nsmap:
1202                                    self._update_nsmap(nsmap, prefix, uri)
1203                    except KeyError:
1204                        pass
1205
1206                    yield elem
1207
    def find(self, path: str,
             namespaces: Optional[NamespacesType] = None,
             nsmap: Optional[NsmapType] = None,
             ancestors: Optional[List[ElementType]] = None) -> Optional[ElementType]:
        """
        Returns the first element yielded by :meth:`iterfind` for the provided
        arguments, or `None` if the selection is empty.
        """
        return next(self.iterfind(path, namespaces, nsmap, ancestors), None)
1213
    def findall(self, path: str, namespaces: Optional[NamespacesType] = None) \
            -> List[ElementType]:
        """
        Returns a list with all the elements yielded by :meth:`iterfind` for
        the provided path and namespaces, without the tracking of namespaces
        and ancestors.
        """
        return list(self.iterfind(path, namespaces))
1217
1218    def get_namespaces(self, namespaces: Optional[NamespacesType] = None,
1219                       root_only: Optional[bool] = None) -> NamespacesType:
1220        """
1221        Extracts namespaces with related prefixes from the XML resource. If a duplicate
1222        prefix declaration is encountered and the prefix maps a different namespace,
1223        adds the namespace using a different generated prefix. The empty prefix '' is
1224        used only if it's declared at root level to avoid erroneous mapping of local
1225        names. In other cases uses 'default' prefix as substitute.
1226
1227        :param namespaces: builds the namespace map starting over the dictionary provided.
1228        :param root_only: if `True`, or `None` and the resource is lazy, extracts \
1229        only the namespaces declared in the root element.
1230        :return: a dictionary for mapping namespace prefixes to full URI.
1231        """
1232        if root_only is None:
1233            root_only = bool(self._lazy)
1234
1235        if namespaces is None:
1236            namespaces = {}
1237        elif namespaces.get('xml', XML_NAMESPACE) != XML_NAMESPACE:
1238            msg = "reserved prefix (xml) must not be bound to another namespace name"
1239            raise XMLSchemaValueError(msg)
1240        else:
1241            namespaces = copy.copy(namespaces)
1242
1243        try:
1244            for _ in self.iter(nsmap=namespaces):
1245                if root_only:
1246                    break
1247        except (ElementTree.ParseError, PyElementTree.ParseError, UnicodeEncodeError):
1248            pass
1249
1250        return namespaces
1251
1252    def get_locations(self, locations: Optional[LocationsType] = None,
1253                      root_only: Optional[bool] = None) -> NormalizedLocationsType:
1254        """
1255        Extracts a list of schema location hints from the XML resource.
1256        The locations are normalized using the base URL of the instance.
1257
1258        :param locations: a sequence of schema location hints inserted \
1259        before the ones extracted from the XML resource. Locations passed \
1260        within a tuple container are not normalized.
1261        :param root_only: if `True`, or if `None` and the resource is lazy, \
1262        extracts the location hints of the root element only.
1263        :returns: a list of couples containing normalized location hints.
1264        """
1265        if root_only is None:
1266            root_only = bool(self._lazy)
1267
1268        if not locations:
1269            location_hints = []
1270        elif isinstance(locations, tuple):
1271            location_hints = [x for x in locations]
1272        else:
1273            location_hints = normalize_locations(locations, self.base_url)
1274
1275        if root_only:
1276            location_hints.extend([
1277                (ns, normalize_url(url, self.base_url))
1278                for ns, url in etree_iter_location_hints(self._root)
1279            ])
1280        else:
1281            location_hints.extend([
1282                (ns, normalize_url(url, self.base_url))
1283                for ns, url in self.iter_location_hints()
1284            ])
1285        return location_hints
1286