#
# Copyright (c), 2016-2020, SISSA (International School for Advanced Studies).
# All rights reserved.
# This file is distributed under the terms of the MIT License.
# See the file 'LICENSE' in the root directory of the present
# distribution, or http://opensource.org/licenses/MIT.
#
# @author Davide Brunato <brunato@sissa.it>
#
import threading
from typing import TYPE_CHECKING, cast, Any, Dict, Iterator, Optional, Union
from elementpath import XPath2Parser, XPathContext, XPathToken, ElementPathError

from ..names import XSD_ASSERT
from ..aliases import ElementType, SchemaType, SchemaElementType, NamespacesType
from ..xpath import XMLSchemaProtocol, ElementProtocol, ElementPathMixin, XMLSchemaProxy

from .exceptions import XMLSchemaNotBuiltError, XMLSchemaValidationError
from .xsdbase import XsdComponent
from .groups import XsdGroup


if TYPE_CHECKING:
    from ..resources import XMLResource
    from .attributes import XsdAttributeGroup
    from .complex_types import XsdComplexType
    from .elements import XsdElement
    from .wildcards import XsdAnyElement


class XsdAssert(XsdComponent, ElementPathMixin[Union['XsdAssert', SchemaElementType]]):
    """
    Class for XSD *assert* constraint definitions.

    The XPath expression of the *test* attribute is compiled by :meth:`build`
    and evaluated by calling the instance on an element.

    ..  <assert
          id = ID
          test = an XPath expression
          xpathDefaultNamespace = (anyURI | (##defaultNamespace | ##targetNamespace | ##local))
          {any attributes with non-schema namespace . . .}>
          Content: (annotation?)
        </assert>
    """
    parent: 'XsdComplexType'
    _ADMITTED_TAGS = {XSD_ASSERT}

    # Compiled XPath expression and its parser, both set by build().
    token: Optional[XPathToken] = None
    parser: Optional[XPath2Parser] = None

    # Default test expression, kept when the 'test' attribute cannot be read.
    path = 'true()'

    def __init__(self, elem: ElementType,
                 schema: SchemaType,
                 parent: 'XsdComplexType',
                 base_type: 'XsdComplexType') -> None:
        """
        :param elem: the element that defines the assertion.
        :param schema: the schema the assertion belongs to.
        :param parent: the complex type that contains the assertion.
        :param base_type: the type checked by _parse() and used for typing \
        and decoding the ``$value`` variable.
        """
        # NOTE(review): these attributes are assigned before the base
        # initializer runs, presumably because it triggers _parse(), which
        # reads base_type -- confirm against XsdComponent.
        self._xpath_lock = threading.Lock()
        self.base_type = base_type
        super().__init__(elem, schema, parent)

    def __repr__(self) -> str:
        # Expressions of 40 or more characters are cut to 37 plus an ellipsis.
        path = self.path if len(self.path) < 40 else self.path[:37] + '...'
        return '%s(test=%r)' % (self.__class__.__name__, path)

    def __getstate__(self) -> Dict[str, Any]:
        """Return the instance state without the lock, which can't be pickled."""
        state = self.__dict__.copy()
        state.pop('_xpath_lock', None)
        return state

    def __setstate__(self, state: Any) -> None:
        """Restore the instance state and create a fresh lock."""
        self.__dict__.update(state)
        self._xpath_lock = threading.Lock()

    def _parse(self) -> None:
        """
        Read the test expression and the XPath default namespace from the
        element. Problems are reported through parse_error().
        """
        if self.base_type.is_simple():
            self.parse_error("base_type=%r is not a complexType definition" % self.base_type)
        else:
            try:
                self.path = self.elem.attrib['test'].strip()
            except KeyError as err:
                # Missing 'test' attribute: the class default path is kept.
                self.parse_error(err)

        # Always set, since build() reads it unconditionally.
        if 'xpathDefaultNamespace' in self.elem.attrib:
            self.xpath_default_namespace = self._parse_xpath_default_namespace(self.elem)
        else:
            self.xpath_default_namespace = self.schema.xpath_default_namespace

    @property
    def built(self) -> bool:
        """`True` if both the parser and the compiled token are set."""
        return self.parser is not None and self.token is not None

    def build(self) -> None:
        """
        Create the XPath parser and compile the test expression. If the
        expression is invalid the error is reported through parse_error()
        and the token falls back to the expression ``true()``.
        """
        # Assert requires a schema bound parser because select
        # is on XML elements and with XSD type decoded values
        self.parser = XPath2Parser(
            namespaces=self.namespaces,
            variable_types={'value': self.base_type.sequence_type},
            strict=False,
            default_namespace=self.xpath_default_namespace,
            schema=self.xpath_proxy,
        )

        try:
            self.token = self.parser.parse(self.path)
        except ElementPathError as err:
            self.parse_error(err)
            self.token = self.parser.parse('true()')
        finally:
            # The variable types are supplied for parsing only.
            if self.parser.variable_types:
                self.parser.variable_types.clear()

    def __call__(self, elem: ElementType,
                 value: Any = None,
                 namespaces: Optional[NamespacesType] = None,
                 source: Optional['XMLResource'] = None,
                 **kwargs: Any) -> Iterator[XMLSchemaValidationError]:
        """
        Evaluate the assertion on an element, yielding a validation error
        if the test is false or if its evaluation fails.

        This is a generator: nothing runs, including the check that the
        assertion has been built, until the result is iterated.

        :param elem: the element to check.
        :param value: the raw value bound to ``$value`` after decoding with \
        the base type; `None` is passed through as is.
        :param namespaces: optional namespace map, used for the XPath context \
        only when *source* is provided.
        :param source: optional XML resource; when provided the context is \
        rooted at its root with *elem* as the context item.
        :param kwargs: accepted but not used by this method.
        :raises XMLSchemaNotBuiltError: if build() has not set the parser \
        and the token.
        """
        if self.parser is None or self.token is None:
            raise XMLSchemaNotBuiltError(self, "schema bound parser not set")

        # Bind the parser to its schema under the lock, if not already bound.
        with self._xpath_lock:
            if not self.parser.is_schema_bound() and self.parser.schema:
                self.parser.schema.bind_parser(self.parser)

        if namespaces is None or isinstance(namespaces, dict):
            _namespaces = namespaces
        else:
            _namespaces = dict(namespaces)

        variables = {'value': None if value is None else self.base_type.text_decode(value)}
        if source is not None:
            context = XPathContext(source.root, namespaces=_namespaces,
                                   item=elem, variables=variables)
        else:
            # If validated from a component (could not work with rooted XPath expressions)
            context = XPathContext(elem, variables=variables)

        try:
            if not self.token.evaluate(context):
                yield XMLSchemaValidationError(self, obj=elem, reason="assertion test is false")
        except ElementPathError as err:
            yield XMLSchemaValidationError(self, obj=elem, reason=str(err))

    # For implementing ElementPathMixin
    def __iter__(self) -> Iterator[Union['XsdElement', 'XsdAnyElement']]:
        """Iterate the elements of the parent's content, if it is a group."""
        if isinstance(self.parent.content, XsdGroup):
            yield from self.parent.content.iter_elements()

    @property
    def attrib(self) -> 'XsdAttributeGroup':
        """The attributes of the parent complex type."""
        return self.parent.attributes

    @property
    def type(self) -> 'XsdComplexType':
        """The parent complex type."""
        return self.parent

    @property
    def xpath_proxy(self) -> 'XMLSchemaProxy':
        """A new schema proxy that uses this assertion as base element."""
        return XMLSchemaProxy(
            schema=cast(XMLSchemaProtocol, self.schema),
            base_element=cast(ElementProtocol, self)
        )