1#
2# Copyright (c), 2016-2020, SISSA (International School for Advanced Studies).
3# All rights reserved.
4# This file is distributed under the terms of the MIT License.
5# See the file 'LICENSE' in the root directory of the present
6# distribution, or http://opensource.org/licenses/MIT.
7#
8# @author Davide Brunato <brunato@sissa.it>
9#
10import threading
11from typing import TYPE_CHECKING, cast, Any, Dict, Iterator, Optional, Union
12from elementpath import XPath2Parser, XPathContext, XPathToken, ElementPathError
13
14from ..names import XSD_ASSERT
15from ..aliases import ElementType, SchemaType, SchemaElementType, NamespacesType
16from ..xpath import XMLSchemaProtocol, ElementProtocol, ElementPathMixin, XMLSchemaProxy
17
18from .exceptions import XMLSchemaNotBuiltError, XMLSchemaValidationError
19from .xsdbase import XsdComponent
20from .groups import XsdGroup
21
22
23if TYPE_CHECKING:
24    from ..resources import XMLResource
25    from .attributes import XsdAttributeGroup
26    from .complex_types import XsdComplexType
27    from .elements import XsdElement
28    from .wildcards import XsdAnyElement
29
30
31class XsdAssert(XsdComponent, ElementPathMixin[Union['XsdAssert', SchemaElementType]]):
32    """
33    Class for XSD *assert* constraint definitions.
34
35    ..  <assert
36          id = ID
37          test = an XPath expression
38          xpathDefaultNamespace = (anyURI | (##defaultNamespace | ##targetNamespace | ##local))
39          {any attributes with non-schema namespace . . .}>
40          Content: (annotation?)
41        </assert>
42    """
43    parent: 'XsdComplexType'
44    _ADMITTED_TAGS = {XSD_ASSERT}
45    token: Optional[XPathToken] = None
46    parser: Optional[XPath2Parser] = None
47    path = 'true()'
48
49    def __init__(self, elem: ElementType,
50                 schema: SchemaType,
51                 parent: 'XsdComplexType',
52                 base_type: 'XsdComplexType') -> None:
53
54        self._xpath_lock = threading.Lock()
55        self.base_type = base_type
56        super(XsdAssert, self).__init__(elem, schema, parent)
57
58    def __repr__(self) -> str:
59        if len(self.path) < 40:
60            return '%s(test=%r)' % (self.__class__.__name__, self.path)
61        else:
62            return '%s(test=%r)' % (self.__class__.__name__, self.path[:37] + '...')
63
64    def __getstate__(self) -> Dict[str, Any]:
65        state = self.__dict__.copy()
66        state.pop('_xpath_lock', None)
67        return state
68
69    def __setstate__(self, state: Any) -> None:
70        self.__dict__.update(state)
71        self._xpath_lock = threading.Lock()
72
73    def _parse(self) -> None:
74        if self.base_type.is_simple():
75            self.parse_error("base_type=%r is not a complexType definition" % self.base_type)
76        else:
77            try:
78                self.path = self.elem.attrib['test'].strip()
79            except KeyError as err:
80                self.parse_error(err)
81
82        if 'xpathDefaultNamespace' in self.elem.attrib:
83            self.xpath_default_namespace = self._parse_xpath_default_namespace(self.elem)
84        else:
85            self.xpath_default_namespace = self.schema.xpath_default_namespace
86
87    @property
88    def built(self) -> bool:
89        return self.parser is not None and self.token is not None
90
91    def build(self) -> None:
92        # Assert requires a schema bound parser because select
93        # is on XML elements and with XSD type decoded values
94        self.parser = XPath2Parser(
95            namespaces=self.namespaces,
96            variable_types={'value': self.base_type.sequence_type},
97            strict=False,
98            default_namespace=self.xpath_default_namespace,
99            schema=self.xpath_proxy,
100        )
101
102        try:
103            self.token = self.parser.parse(self.path)
104        except ElementPathError as err:
105            self.parse_error(err)
106            self.token = self.parser.parse('true()')
107        finally:
108            if self.parser.variable_types:
109                self.parser.variable_types.clear()
110
111    def __call__(self, elem: ElementType,
112                 value: Any = None,
113                 namespaces: Optional[NamespacesType] = None,
114                 source: Optional['XMLResource'] = None,
115                 **kwargs: Any) -> Iterator[XMLSchemaValidationError]:
116
117        if self.parser is None or self.token is None:
118            raise XMLSchemaNotBuiltError(self, "schema bound parser not set")
119
120        with self._xpath_lock:
121            if not self.parser.is_schema_bound() and self.parser.schema:
122                self.parser.schema.bind_parser(self.parser)
123
124        if namespaces is None or isinstance(namespaces, dict):
125            _namespaces = namespaces
126        else:
127            _namespaces = dict(namespaces)
128
129        variables = {'value': None if value is None else self.base_type.text_decode(value)}
130        if source is not None:
131            context = XPathContext(source.root, namespaces=_namespaces,
132                                   item=elem, variables=variables)
133        else:
134            # If validated from a component (could not work with rooted XPath expressions)
135            context = XPathContext(elem, variables=variables)
136
137        try:
138            if not self.token.evaluate(context):
139                yield XMLSchemaValidationError(self, obj=elem, reason="assertion test if false")
140        except ElementPathError as err:
141            yield XMLSchemaValidationError(self, obj=elem, reason=str(err))
142
143    # For implementing ElementPathMixin
144    def __iter__(self) -> Iterator[Union['XsdElement', 'XsdAnyElement']]:
145        if isinstance(self.parent.content, XsdGroup):
146            yield from self.parent.content.iter_elements()
147
148    @property
149    def attrib(self) -> 'XsdAttributeGroup':
150        return self.parent.attributes
151
152    @property
153    def type(self) -> 'XsdComplexType':
154        return self.parent
155
156    @property
157    def xpath_proxy(self) -> 'XMLSchemaProxy':
158        return XMLSchemaProxy(
159            schema=cast(XMLSchemaProtocol, self.schema),
160            base_element=cast(ElementProtocol, self)
161        )
162