#
# Copyright (c), 2016-2020, SISSA (International School for Advanced Studies).
# All rights reserved.
# This file is distributed under the terms of the MIT License.
# See the file 'LICENSE' in the root directory of the present
# distribution, or http://opensource.org/licenses/MIT.
#
# @author Davide Brunato <brunato@sissa.it>
#
"""
This module contains classes for XML Schema identity constraints.
"""
import re
import math
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Pattern, \
    Tuple, Union, Counter
from elementpath import XPath2Parser, ElementPathError, XPathToken, XPathContext, \
    translate_pattern, datatypes

from ..exceptions import XMLSchemaTypeError, XMLSchemaValueError
from ..names import XSD_QNAME, XSD_UNIQUE, XSD_KEY, XSD_KEYREF, XSD_SELECTOR, XSD_FIELD
from ..helpers import get_qname, get_extended_qname
from ..aliases import ElementType, SchemaType, NamespacesType, AtomicValueType
from ..xpath import iter_schema_nodes
from .xsdbase import XsdComponent
from .attributes import XsdAttribute

if TYPE_CHECKING:
    from .elements import XsdElement

IdentityFieldItemType = Union[AtomicValueType, XsdAttribute, Tuple[Any, ...], None]
IdentityCounterType = Tuple[IdentityFieldItemType, ...]
IdentityMapType = Dict[Union['XsdKey', 'XsdKeyref', str, None],
                       Union['IdentityCounter', 'KeyrefCounter']]

# Symbols of the XPath 2.0 parser that are kept in the restricted
# parser used for selectors and fields (see IdentityXPathParser).
XSD_IDENTITY_XPATH_SYMBOLS = frozenset((
    'processing-instruction', 'following-sibling', 'preceding-sibling',
    'ancestor-or-self', 'attribute', 'following', 'namespace', 'preceding',
    'ancestor', 'position', 'comment', 'parent', 'child', 'false', 'text', 'node',
    'true', 'last', 'not', 'and', 'mod', 'div', 'or', '..', '//', '!=', '<=', '>=',
    '(', ')', '[', ']', '.', '@', ',', '/', '|', '*', '-', '=', '+', '<', '>', ':',
    '(end)', '(unknown)', '(invalid)', '(name)', '(string)', '(float)', '(decimal)',
    '(integer)', '::', '{', '}',
))


def _hashable_field_value(value: Any) -> IdentityFieldItemType:
    """
    Convert a decoded field value into the item stored in a fields tuple.

    Lists become tuples, booleans and floats are paired with their type and
    NaN is replaced by the string ``'nan'`` paired with ``float``. Any other
    value is returned unchanged.

    :param value: the decoded value of an identity constraint field.
    :return: the item to append to the fields tuple.
    """
    if isinstance(value, list):
        return tuple(value)
    if isinstance(value, bool):
        # Checked before float/int: in Python True == 1, so the type is
        # attached, presumably to keep booleans distinct from numbers.
        return value, bool
    if not isinstance(value, float):
        return value
    if math.isnan(value):
        # NaN never compares equal to itself, so a fixed marker is used.
        return 'nan', float
    return value, float


# XSD identities use a restricted parser and a context for iterating element
# references. The XMLSchemaProxy is not used for the specific selection of
# fields and elements and the XSD fields are obtained at the first validation run.
class IdentityXPathContext(XPathContext):
    """XPath context that iterates nodes with :func:`iter_schema_nodes`."""
    _iter_nodes = staticmethod(iter_schema_nodes)


class IdentityXPathParser(XPath2Parser):
    """XPath 2.0 parser restricted to the symbols in XSD_IDENTITY_XPATH_SYMBOLS."""
    symbol_table = {
        k: v for k, v in XPath2Parser.symbol_table.items()  # type: ignore[misc]
        if k in XSD_IDENTITY_XPATH_SYMBOLS
    }
    SYMBOLS = XSD_IDENTITY_XPATH_SYMBOLS


class XsdSelector(XsdComponent):
    """Class for defining an XPath selector for an XSD identity constraint."""
    _ADMITTED_TAGS = {XSD_SELECTOR}
    xpath_default_namespace = ''

    # The pattern is stored as a string and replaced on the class by
    # the compiled regex the first time that _parse() needs it.
    pattern: Union[str, Pattern[str]] = translate_pattern(
        r"(\.//)?(((child::)?((\i\c*:)?(\i\c*|\*)))|\.)(/(((child::)?"
        r"((\i\c*:)?(\i\c*|\*)))|\.))*(\|(\.//)?(((child::)?((\i\c*:)?"
        r"(\i\c*|\*)))|\.)(/(((child::)?((\i\c*:)?(\i\c*|\*)))|\.))*)*",
        back_references=False,
        lazy_quantifiers=False,
        anchors=False
    )
    token = None  # type: XPathToken
    parser = None  # type: IdentityXPathParser

    def __init__(self, elem: ElementType, schema: SchemaType,
                 parent: Optional['XsdIdentity']) -> None:
        super().__init__(elem, schema, parent)

    def _parse(self) -> None:
        """
        Read the 'xpath' attribute, check it against the class pattern
        and parse it into an XPath token. Problems are reported through
        parse_error() and the path '*' is used as a fallback.
        """
        try:
            self.path = self.elem.attrib['xpath']
        except KeyError:
            self.parse_error("'xpath' attribute required")
            self.path = '*'
        else:
            path = self.path.replace(' ', '')
            try:
                _match = self.pattern.match(path)  # type: ignore[union-attr]
            except AttributeError:
                # The pattern is still a string: compile it once on the class
                self.__class__.pattern = re.compile(self.pattern)
                _match = self.pattern.match(path)  # type: ignore[union-attr]

            if not _match:
                msg = "invalid XPath expression for an {}"
                self.parse_error(msg.format(self.__class__.__name__))

        # XSD 1.1 xpathDefaultNamespace attribute
        if self.schema.XSD_VERSION > '1.0':
            if 'xpathDefaultNamespace' in self.elem.attrib:
                self.xpath_default_namespace = self._parse_xpath_default_namespace(self.elem)
            else:
                self.xpath_default_namespace = self.schema.xpath_default_namespace

        self.parser = IdentityXPathParser(
            namespaces=self.namespaces,
            strict=False,
            compatibility_mode=True,
            default_namespace=self.xpath_default_namespace,
        )

        try:
            self.token = self.parser.parse(self.path)
        except ElementPathError as err:
            # Keep a usable token also when the expression is not parsable
            self.token = self.parser.parse('*')
            self.parse_error(err)

    def __repr__(self) -> str:
        return '%s(path=%r)' % (self.__class__.__name__, self.path)

    @property
    def built(self) -> bool:
        """`True` once an XPath token has been set."""
        return self.token is not None

    @property
    def target_namespace(self) -> str:
        """
        The namespace of the prefixed name at the root of the XPath token,
        with a fallback to the XPath default namespace. For other kinds of
        tokens the target namespace of the schema is returned.
        """
        # TODO: implement a property in elementpath for getting XPath token's namespace
        if self.token is None:
            pass  # xpathDefaultNamespace="##targetNamespace"
        elif self.token.symbol == ':':
            return self.token[1].namespace or self.xpath_default_namespace
        elif self.token.symbol == '@' and self.token[0].symbol == ':':
            return self.token[0][1].namespace or self.xpath_default_namespace
        return self.schema.target_namespace


class XsdFieldSelector(XsdSelector):
    """Class for defining an XPath field selector for an XSD identity constraint."""
    _ADMITTED_TAGS = {XSD_FIELD}
    pattern = translate_pattern(
        r"(\.//)?((((child::)?((\i\c*:)?(\i\c*|\*)))|\.)/)*((((child::)?"
        r"((\i\c*:)?(\i\c*|\*)))|\.)|((attribute::|@)((\i\c*:)?(\i\c*|\*))))"
        r"(\|(\.//)?((((child::)?((\i\c*:)?(\i\c*|\*)))|\.)/)*"
        r"((((child::)?((\i\c*:)?(\i\c*|\*)))|\.)|"
        r"((attribute::|@)((\i\c*:)?(\i\c*|\*)))))*",
        back_references=False,
        lazy_quantifiers=False,
        anchors=False
    )


class XsdIdentity(XsdComponent):
    """
    Common class for XSD identity constraints.

    :ivar selector: the XPath selector of the identity constraint.
    :ivar fields: a list containing the XPath field selectors of the identity constraint.
    """
    name: str
    local_name: str
    prefixed_name: str
    parent: 'XsdElement'
    ref: Optional['XsdIdentity']

    selector = None  # type: XsdSelector
    fields = ()  # type: Union[Tuple[()], List[XsdFieldSelector]]

    # XSD elements bound by selector (for speed-up and for lazy mode).
    # It remains an empty tuple until build() is executed.
    elements: Union[Tuple[()], Dict['XsdElement', Optional[IdentityCounterType]]] = ()

    def __init__(self, elem: ElementType, schema: SchemaType,
                 parent: Optional['XsdElement']) -> None:
        super().__init__(elem, schema, parent)

    def _parse(self) -> None:
        """
        Parse the name of the constraint, its first selector child
        and all its field children.
        """
        try:
            self.name = get_qname(self.target_namespace, self.elem.attrib['name'])
        except KeyError:
            self.parse_error("missing required attribute 'name'")
            self.name = ''

        for child in self.elem:
            if child.tag == XSD_SELECTOR:
                self.selector = XsdSelector(child, self.schema, self)
                break
        else:
            self.parse_error("missing 'selector' declaration.")

        self.fields = []
        for child in self.elem:
            if child.tag == XSD_FIELD:
                self.fields.append(XsdFieldSelector(child, self.schema, self))

    def build(self) -> None:
        """
        Resolve a pending reference, if any, and collect the XSD elements
        selected by the selector into the `elements` dictionary.
        """
        # The XSD 1.1 subclasses set `ref` to True during parsing as a
        # marker for a reference that has still to be resolved.
        if self.ref is True:  # type: ignore[comparison-overlap]
            try:
                ref = self.maps.identities[self.name]
            except KeyError:
                self.parse_error("unknown identity constraint {!r}".format(self.name))
                return
            else:
                if not isinstance(ref, self.__class__):
                    self.parse_error("attribute 'ref' points to a different kind constraint")
                self.selector = ref.selector
                self.fields = ref.fields
                self.ref = ref

        context = IdentityXPathContext(self.schema, item=self.parent)  # type: ignore

        self.elements = {}
        try:
            for e in self.selector.token.select_results(context):
                if not isinstance(e, XsdComponent) or isinstance(e, XsdAttribute):
                    self.parse_error("selector xpath expression can only select elements")
                elif e.name is not None:
                    self.elements[e] = None  # type: ignore[index]
        except AttributeError:
            # Deliberately ignored: e.g. the selector is missing (None)
            pass
        else:
            if not self.elements:
                # Try to detect target XSD elements extracting QNames
                # of the leaf elements from the XPath expression and
                # use them to match global elements.

                qname: Any
                for qname in self.selector.token.iter_leaf_elements():
                    xsd_element = self.maps.elements.get(
                        get_extended_qname(qname, self.namespaces)
                    )
                    if xsd_element is not None and \
                            not isinstance(xsd_element, tuple) and \
                            xsd_element not in self.elements:
                        self.elements[xsd_element] = None

    @property
    def built(self) -> bool:
        """`True` once build() has replaced the initial empty tuple of elements."""
        return not isinstance(self.elements, tuple)

    def get_fields(self, elem: Union[ElementType, 'XsdElement'],
                   namespaces: Optional[NamespacesType] = None,
                   decoders: Optional[Tuple[XsdAttribute, ...]] = None) -> IdentityCounterType:
        """
        Get fields for a schema or instance context element.

        :param elem: an Element or an XsdElement
        :param namespaces: is an optional mapping from namespace prefix to URI.
        :param decoders: context schema fields decoders.
        :return: a tuple with field values. An empty field is replaced by `None`.
        :raises XMLSchemaValueError: if a field of a key is missing or if a field \
        selects more than one value.
        :raises XMLSchemaTypeError: if the decoder of a selected field has a type \
        whose content is neither simple nor mixed.
        """
        fields: List[IdentityFieldItemType] = []

        # Schema components are visited with the schema-aware context
        if not isinstance(elem, XsdComponent):
            context_class = XPathContext
        else:
            context_class = IdentityXPathContext

        result: Any
        value: Union[AtomicValueType, None]
        for k, field in enumerate(self.fields):
            result = field.token.get_results(context_class(elem))  # type: ignore

            if not result:
                # Nothing selected: use the value constraint of the decoder, if any
                if decoders is not None and decoders[k] is not None:
                    value = decoders[k].value_constraint
                    if value is not None:
                        if decoders[k].type.root_type.name == XSD_QNAME:
                            value = get_extended_qname(value, namespaces)

                        fields.append(_hashable_field_value(value))
                        continue

                if not isinstance(self, XsdKey) or 'ref' in elem.attrib and \
                        self.schema.meta_schema is None and self.schema.XSD_VERSION != '1.0':
                    fields.append(None)
                elif field.target_namespace not in self.maps.namespaces:
                    fields.append(None)
                else:
                    msg = "missing key field {!r} for {!r}"
                    raise XMLSchemaValueError(msg.format(field.path, self))

            elif len(result) == 1:
                if decoders is None or decoders[k] is None:
                    fields.append(result[0])
                else:
                    if decoders[k].type.content_type_label not in ('simple', 'mixed'):
                        raise XMLSchemaTypeError("%r field doesn't have a simple type!" % field)

                    value = decoders[k].data_value(result[0])
                    if decoders[k].type.root_type.name == XSD_QNAME:
                        if isinstance(value, str):
                            value = get_extended_qname(value, namespaces)
                        elif isinstance(value, datatypes.QName):
                            value = value.expanded_name

                    fields.append(_hashable_field_value(value))
            else:
                raise XMLSchemaValueError("%r field selects multiple values!" % field)

        return tuple(fields)

    def get_counter(self, enabled: bool = True) -> 'IdentityCounter':
        """Return a new :class:`IdentityCounter` bound to this identity constraint."""
        return IdentityCounter(self, enabled)


class XsdUnique(XsdIdentity):
    """Identity constraint declared with an xs:unique element."""
    _ADMITTED_TAGS = {XSD_UNIQUE}


class XsdKey(XsdIdentity):
    """Identity constraint declared with an xs:key element."""
    _ADMITTED_TAGS = {XSD_KEY}


class XsdKeyref(XsdIdentity):
    """
    Implementation of xs:keyref.

    :ivar refer: reference to a *xs:key* declaration that must be in the same element \
    or in a descendant element.
    """
    _ADMITTED_TAGS = {XSD_KEYREF}
    refer: Optional[Union[str, XsdKey]] = None
    refer_path = '.'

    def _parse(self) -> None:
        """Parse the common parts and resolve the QName of the 'refer' attribute."""
        super()._parse()
        try:
            self.refer = self.schema.resolve_qname(self.elem.attrib['refer'])
        except (KeyError, ValueError, RuntimeError) as err:
            # A KeyError may come from the attribute lookup or from the
            # resolution of the QName, so check which one is the case.
            if 'refer' not in self.elem.attrib:
                self.parse_error("missing required attribute 'refer'")
            else:
                self.parse_error(err)

    def build(self) -> None:
        """
        Build the common parts, then replace the name stored in `refer`
        with the referred identity constraint and compute `refer_path`.
        """
        super().build()

        if isinstance(self.refer, (XsdKey, XsdUnique)):
            return  # referenced key/unique identity constraint already set
        elif isinstance(self.ref, XsdKeyref):
            self.refer = self.ref.refer

        if self.refer is None:
            return  # attribute or key/unique identity constraint missing
        elif isinstance(self.refer, str):
            # Look first among the identities of the parent element,
            # then among the identities of the global maps.
            refer = self.parent.identities.get(self.refer)
            if refer is not None and refer.ref is None:
                self.refer = refer  # type: ignore[assignment]
            else:
                try:
                    self.refer = self.maps.identities[self.refer]  # type: ignore[assignment]
                except KeyError:
                    self.parse_error("key/unique identity constraint %r is missing" % self.refer)
                    return

        if not isinstance(self.refer, (XsdKey, XsdUnique)):
            self.parse_error("reference to a non key/unique identity constraint %r" % self.refer)
        elif len(self.refer.fields) != len(self.fields):
            self.parse_error("field cardinality mismatch between %r and %r" % (self, self.refer))
        elif self.parent is not self.refer.parent:
            refer_path = self.refer.parent.get_path(ancestor=self.parent)
            if refer_path is None:
                # From a note in par. 3.11.5 Part 1 of XSD 1.0 spec: "keyref
                # identity-constraints may be defined on domains distinct from
                # the embedded domain of the identity-constraint they reference,
                # or the domains may be the same but self-embedding at some depth.
                # In either case the node table for the referenced identity-constraint
                # needs to propagate upwards, with conflict resolution."
                refer_path = self.parent.get_path(ancestor=self.refer.parent, reverse=True)
                if refer_path is None:
                    path1 = self.parent.get_path(reverse=True)
                    path2 = self.refer.parent.get_path()
                    assert path1 is not None
                    assert path2 is not None
                    refer_path = f'{path1}/{path2}'

            self.refer_path = refer_path

    @property
    def built(self) -> bool:
        """`True` once the elements are collected and `refer` is an identity constraint."""
        return not isinstance(self.elements, tuple) and isinstance(self.refer, XsdIdentity)

    def get_counter(self, enabled: bool = True) -> 'KeyrefCounter':
        """Return a new :class:`KeyrefCounter` bound to this keyref."""
        return KeyrefCounter(self, enabled)


class Xsd11Unique(XsdUnique):
    """XSD 1.1 xs:unique, which can also be a reference to another constraint."""

    def _parse(self) -> None:
        if self._parse_reference():
            # Marker for a reference that is resolved later by build()
            self.ref = True  # type: ignore[assignment]
        else:
            super()._parse()


class Xsd11Key(XsdKey):
    """XSD 1.1 xs:key, which can also be a reference to another constraint."""

    def _parse(self) -> None:
        if self._parse_reference():
            # Marker for a reference that is resolved later by build()
            self.ref = True  # type: ignore[assignment]
        else:
            super()._parse()


class Xsd11Keyref(XsdKeyref):
    """XSD 1.1 xs:keyref, which can also be a reference to another constraint."""

    def _parse(self) -> None:
        if self._parse_reference():
            # Marker for a reference that is resolved later by build()
            self.ref = True  # type: ignore[assignment]
        else:
            super()._parse()


class IdentityCounter:
    """
    Counter of the field tuples found for an identity constraint.

    :param identity: the identity constraint the values belong to.
    :param enabled: initial value of the `enabled` flag.
    """

    def __init__(self, identity: XsdIdentity, enabled: bool = True) -> None:
        self.counter: Counter[IdentityCounterType] = Counter[IdentityCounterType]()
        self.identity = identity
        self.enabled = enabled

    def __repr__(self) -> str:
        # Strip the 'Counter' suffix from the class name, so that the
        # result is e.g. 'Identity' followed by the repr of the counter.
        return "%s%r" % (self.__class__.__name__[:-7], self.counter)

    def clear(self) -> None:
        """Remove all the counted values and set the `enabled` flag."""
        self.counter.clear()
        self.enabled = True

    def increase(self, fields: IdentityCounterType) -> None:
        """
        Count an occurrence of a tuple of field values.

        :raises XMLSchemaValueError: when the tuple is counted for the second time.
        """
        self.counter[fields] += 1
        if self.counter[fields] == 2:
            msg = "duplicated value {!r} for {!r}"
            raise XMLSchemaValueError(msg.format(fields, self.identity))


class KeyrefCounter(IdentityCounter):
    """Counter for keyref values, for which duplicates are not an error."""
    identity: XsdKeyref

    def increase(self, fields: IdentityCounterType) -> None:
        """Count an occurrence of a tuple of field values."""
        self.counter[fields] += 1

    def iter_errors(self, identities: IdentityMapType) -> Iterator[XMLSchemaValueError]:
        """
        Yield an error for each counted value that is not found among the
        values of the referred identity constraint.

        :param identities: a mapping from identity constraints to their counters.
        """
        refer_values = identities[self.identity.refer].counter

        for v in filter(lambda x: x not in refer_values, self.counter):
            # A single-field value is also accepted if its only item
            # matches an entry of the referred counter.
            if len(v) == 1 and v[0] in refer_values:
                continue
            elif self.counter[v] > 1:
                msg = "value {} not found for {!r} ({} times)"
                yield XMLSchemaValueError(msg.format(v, self.identity.refer, self.counter[v]))
            else:
                msg = "value {} not found for {!r}"
                yield XMLSchemaValueError(msg.format(v, self.identity.refer))