1# 2# Copyright (c), 2016-2020, SISSA (International School for Advanced Studies). 3# All rights reserved. 4# This file is distributed under the terms of the MIT License. 5# See the file 'LICENSE' in the root directory of the present 6# distribution, or http://opensource.org/licenses/MIT. 7# 8# @author Davide Brunato <brunato@sissa.it> 9# 10""" 11This module contains classes for XML Schema model groups. 12""" 13import warnings 14from collections.abc import MutableMapping 15from typing import TYPE_CHECKING, overload, Any, Iterable, Iterator, List, \ 16 MutableSequence, Optional, Tuple, Union 17 18from .. import limits 19from ..exceptions import XMLSchemaValueError 20from ..names import XSD_GROUP, XSD_SEQUENCE, XSD_ALL, XSD_CHOICE, XSD_ELEMENT, \ 21 XSD_ANY, XSI_TYPE, XSD_ANY_TYPE, XSD_ANNOTATION 22from ..etree import etree_element, ElementData 23from ..aliases import ElementType, NamespacesType, SchemaType, IterDecodeType, \ 24 IterEncodeType, ModelParticleType, SchemaElementType, ComponentClassType 25from ..helpers import get_qname, local_name, raw_xml_encode 26 27from .exceptions import XMLSchemaModelError, XMLSchemaModelDepthError, \ 28 XMLSchemaValidationError, XMLSchemaChildrenValidationError, \ 29 XMLSchemaTypeTableWarning 30from .xsdbase import ValidationMixin, XsdComponent, XsdType 31from .particles import ParticleMixin, OccursCalculator 32from .elements import XsdElement, XsdAlternative 33from .wildcards import XsdAnyElement, Xsd11AnyElement 34from .models import ModelVisitor, distinguishable_paths 35 36if TYPE_CHECKING: 37 from ..resources import XMLResource 38 from .complex_types import XsdComplexType 39 40ANY_ELEMENT = etree_element( 41 XSD_ANY, 42 attrib={ 43 'namespace': '##any', 44 'processContents': 'lax', 45 'minOccurs': '0', 46 'maxOccurs': 'unbounded' 47 }) 48 49GroupDecodeType = List[Tuple[Union[str, int], Any, Optional[SchemaElementType]]] 50GroupEncodeType = Tuple[Optional[str], List[ElementType]] 51 52 53class XsdGroup(XsdComponent, MutableSequence[ModelParticleType], 54 ParticleMixin, ValidationMixin[ElementType, GroupDecodeType]): 55 """ 56 Class for XSD 1.0 *model group* definitions. 57 58 .. <group 59 id = ID 60 maxOccurs = (nonNegativeInteger | unbounded) : 1 61 minOccurs = nonNegativeInteger : 1 62 name = NCName 63 ref = QName 64 {any attributes with non-schema namespace . . .}> 65 Content: (annotation?, (all | choice | sequence)?) 66 </group> 67 68 .. <all 69 id = ID 70 maxOccurs = 1 : 1 71 minOccurs = (0 | 1) : 1 72 {any attributes with non-schema namespace . . .}> 73 Content: (annotation?, element*) 74 </all> 75 76 .. <choice 77 id = ID 78 maxOccurs = (nonNegativeInteger | unbounded) : 1 79 minOccurs = nonNegativeInteger : 1 80 {any attributes with non-schema namespace . . .}> 81 Content: (annotation?, (element | group | choice | sequence | any)*) 82 </choice> 83 84 .. <sequence 85 id = ID 86 maxOccurs = (nonNegativeInteger | unbounded) : 1 87 minOccurs = nonNegativeInteger : 1 88 {any attributes with non-schema namespace . . .}> 89 Content: (annotation?, (element | group | choice | sequence | any)*) 90 </sequence> 91 """ 92 parent: Optional[Union['XsdComplexType', 'XsdGroup']] 93 model: str 94 mixed: bool = False 95 ref: Optional['XsdGroup'] 96 restriction: Optional['XsdGroup'] = None 97 98 # For XSD 1.1 openContent processing 99 interleave: Optional[Xsd11AnyElement] = None # if openContent with mode='interleave' 100 suffix: Optional[Xsd11AnyElement] = None # if openContent with mode='suffix'/'interleave' 101 102 _ADMITTED_TAGS = {XSD_GROUP, XSD_SEQUENCE, XSD_ALL, XSD_CHOICE} 103 104 def __init__(self, elem: ElementType, 105 schema: SchemaType, 106 parent: Optional[Union['XsdComplexType', 'XsdGroup']] = None) -> None: 107 108 self._group: List[ModelParticleType] = [] 109 if parent is not None and parent.mixed: 110 self.mixed = parent.mixed 111 super(XsdGroup, self).__init__(elem, schema, parent) 112 113 def __repr__(self) -> str: 114 if self.name is None: 115 return '%s(model=%r, occurs=%r)' % ( 116 self.__class__.__name__, self.model, list(self.occurs) 117 ) 118 elif self.ref is None: 119 return '%s(name=%r, model=%r, occurs=%r)' % ( 120 self.__class__.__name__, self.prefixed_name, self.model, list(self.occurs) 121 ) 122 else: 123 return '%s(ref=%r, model=%r, occurs=%r)' % ( 124 self.__class__.__name__, self.prefixed_name, self.model, list(self.occurs) 125 ) 126 127 @overload 128 def __getitem__(self, i: int) -> ModelParticleType: ... 129 130 @overload 131 def __getitem__(self, s: slice) -> MutableSequence[ModelParticleType]: ... 132 133 def __getitem__(self, i: Union[int, slice]) \ 134 -> Union[ModelParticleType, MutableSequence[ModelParticleType]]: 135 return self._group[i] 136 137 def __setitem__(self, i: Union[int, slice], o: Any) -> None: 138 self._group[i] = o 139 140 def __delitem__(self, i: Union[int, slice]) -> None: 141 del self._group[i] 142 143 def __len__(self) -> int: 144 return len(self._group) 145 146 def insert(self, i: int, item: ModelParticleType) -> None: 147 self._group.insert(i, item) 148 149 def clear(self) -> None: 150 del self._group[:] 151 152 def is_emptiable(self) -> bool: 153 if self.model == 'choice': 154 return self.min_occurs == 0 or not self or any(item.is_emptiable() for item in self) 155 else: 156 return self.min_occurs == 0 or not self or all(item.is_emptiable() for item in self) 157 158 def is_single(self) -> bool: 159 if self.max_occurs != 1 or not self: 160 return False 161 elif len(self) > 1 or not isinstance(self[0], XsdGroup): 162 return True 163 else: 164 return self[0].is_single() 165 166 def is_pointless(self, parent: 'XsdGroup') -> bool: 167 """ 168 Returns `True` if the group may be eliminated without affecting the model, 169 `False` otherwise. A group is pointless if one of those conditions is verified: 170 171 - the group is empty 172 - minOccurs == maxOccurs == 1 and the group has one child 173 - minOccurs == maxOccurs == 1 and the group and its parent have a sequence model 174 - minOccurs == maxOccurs == 1 and the group and its parent have a choice model 175 176 Ref: https://www.w3.org/TR/2004/REC-xmlschema-1-20041028/#coss-particle 177 178 :param parent: effective parent of the model group. 179 """ 180 if not self: 181 return True 182 elif self.min_occurs != 1 or self.max_occurs != 1: 183 return False 184 elif len(self) == 1: 185 return True 186 elif self.model == 'sequence' and parent.model != 'sequence': 187 return False 188 elif self.model == 'choice' and parent.model != 'choice': 189 return False 190 else: 191 return True 192 193 @property 194 def effective_min_occurs(self) -> int: 195 if not self.min_occurs or not self: 196 return 0 197 elif self.model == 'choice': 198 if any(not e.effective_min_occurs for e in self.iter_model()): 199 return 0 200 else: 201 if all(not e.effective_min_occurs for e in self.iter_model()): 202 return 0 203 return self.min_occurs 204 205 @property 206 def effective_max_occurs(self) -> Optional[int]: 207 if self.max_occurs == 0 or not self: 208 return 0 209 210 effective_items: List[Any] 211 value: int 212 213 effective_items = [e for e in self.iter_model() if e.effective_max_occurs != 0] 214 if not effective_items: 215 return 0 216 elif self.max_occurs is None: 217 return None 218 elif self.model == 'choice': 219 try: 220 value = max(e.effective_max_occurs for e in effective_items) 221 except TypeError: 222 return None 223 else: 224 return self.max_occurs * value 225 226 not_emptiable_items = [e for e in effective_items if e.effective_min_occurs] 227 if not not_emptiable_items: 228 try: 229 value = max(e.effective_max_occurs for e in effective_items) 230 except TypeError: 231 return None 232 else: 233 return self.max_occurs * value 234 235 elif len(not_emptiable_items) > 1: 236 return self.max_occurs 237 238 value = not_emptiable_items[0].effective_max_occurs 239 return None if value is None else self.max_occurs * value 240 241 def has_occurs_restriction( 242 self, other: Union[ModelParticleType, ParticleMixin, 'OccursCalculator']) -> bool: 243 244 if not self: 245 return True 246 elif isinstance(other, XsdGroup): 247 return super(XsdGroup, self).has_occurs_restriction(other) 248 249 # Group particle compared to element particle 250 if self.max_occurs is None or any(e.max_occurs is None for e in self): 251 if other.max_occurs is not None: 252 return False 253 elif self.model == 'choice': 254 return self.min_occurs * min(e.min_occurs for e in self) >= other.min_occurs 255 else: 256 return self.min_occurs * sum(e.min_occurs for e in self) >= other.min_occurs 257 258 elif self.model == 'choice': 259 if self.min_occurs * min(e.min_occurs for e in self) < other.min_occurs: 260 return False 261 elif other.max_occurs is None: 262 return True 263 else: 264 value: int 265 try: 266 value = max(e.max_occurs for e in self) # type: ignore[type-var, assignment] 267 except TypeError: 268 return False 269 else: 270 return self.max_occurs * value <= other.max_occurs 271 272 else: 273 if self.min_occurs * sum(e.min_occurs for e in self) < other.min_occurs: 274 return False 275 elif other.max_occurs is None: 276 return True 277 else: 278 try: 279 value = sum(e.max_occurs for e in self) # type: ignore[misc] 280 except TypeError: 281 return False 282 else: 283 return self.max_occurs * value <= other.max_occurs 284 285 def iter_model(self, depth: int = 0) -> Iterator[ModelParticleType]: 286 """ 287 A generator function iterating elements and groups of a model group. 288 Skips pointless groups, iterating deeper through them. 289 Raises `XMLSchemaModelDepthError` if the argument *depth* is over 290 `limits.MAX_MODEL_DEPTH` value. 291 292 :param depth: guard for protect model nesting bombs, incremented at each deepest recursion. 293 """ 294 if depth > limits.MAX_MODEL_DEPTH: 295 raise XMLSchemaModelDepthError(self) 296 for item in self: 297 if isinstance(item, XsdGroup) and item.is_pointless(parent=self): 298 yield from item.iter_model(depth + 1) 299 else: 300 yield item 301 302 def iter_elements(self, depth: int = 0) -> Iterator[SchemaElementType]: 303 """ 304 A generator function iterating model's elements. Raises `XMLSchemaModelDepthError` 305 if the argument *depth* is over `limits.MAX_MODEL_DEPTH` value. 306 307 :param depth: guard for protect model nesting bombs, incremented at each deepest recursion. 308 """ 309 if depth > limits.MAX_MODEL_DEPTH: 310 raise XMLSchemaModelDepthError(self) 311 if self.max_occurs != 0: 312 for item in self: 313 if isinstance(item, XsdGroup): 314 yield from item.iter_elements(depth + 1) 315 else: 316 yield item 317 318 def get_subgroups(self, item: ModelParticleType) -> List['XsdGroup']: 319 """ 320 Returns a list of the groups that represent the path to the enclosed particle. 321 Raises an `XMLSchemaModelError` if *item* is not a particle of the model group. 322 """ 323 subgroups: List[Tuple[XsdGroup, Iterator[ModelParticleType]]] = [] 324 group, children = self, iter(self) 325 326 while True: 327 try: 328 child = next(children) 329 except StopIteration: 330 try: 331 group, children = subgroups.pop() 332 except IndexError: 333 msg = '{!r} is not a particle of the model group' 334 raise XMLSchemaModelError(self, msg.format(item)) from None 335 else: 336 continue 337 338 if child is item: 339 _subgroups = [x[0] for x in subgroups] 340 _subgroups.append(group) 341 return _subgroups 342 elif isinstance(child, XsdGroup): 343 if len(subgroups) > limits.MAX_MODEL_DEPTH: 344 raise XMLSchemaModelDepthError(self) 345 subgroups.append((group, children)) 346 group, children = child, iter(child) 347 348 def overall_min_occurs(self, item: ModelParticleType) -> int: 349 """Returns the overall min occurs of a particle in the model.""" 350 min_occurs = item.min_occurs 351 352 for group in self.get_subgroups(item): 353 if group.model == 'choice' and len(group) > 1: 354 return 0 355 min_occurs *= group.min_occurs 356 357 return min_occurs 358 359 def overall_max_occurs(self, item: ModelParticleType) -> Optional[int]: 360 """Returns the overall max occurs of a particle in the model.""" 361 max_occurs = item.max_occurs 362 363 for group in self.get_subgroups(item): 364 if max_occurs == 0: 365 return 0 366 elif max_occurs is None: 367 continue 368 elif group.max_occurs is None: 369 max_occurs = None 370 else: 371 max_occurs *= group.max_occurs 372 373 return max_occurs 374 375 def copy(self) -> 'XsdGroup': 376 group: XsdGroup = object.__new__(self.__class__) 377 group.__dict__.update(self.__dict__) 378 group.errors = self.errors[:] 379 group._group = self._group[:] 380 return group 381 382 __copy__ = copy 383 384 def _parse(self) -> None: 385 self.clear() 386 self._parse_particle(self.elem) 387 388 if self.elem.tag != XSD_GROUP: 389 # Local group (sequence|all|choice) 390 if 'name' in self.elem.attrib: 391 self.parse_error("attribute 'name' not allowed for a local group") 392 self._parse_content_model(self.elem) 393 394 elif self._parse_reference(): 395 assert self.name is not None 396 try: 397 xsd_group = self.maps.lookup_group(self.name) 398 except KeyError: 399 self.parse_error("missing group %r" % self.prefixed_name) 400 xsd_group = self.schema.create_any_content_group(parent=self) 401 402 if isinstance(xsd_group, XsdGroup): 403 self.model = xsd_group.model 404 if self.model == 'all': 405 if self.max_occurs != 1: 406 self.parse_error("maxOccurs must be 1 for 'all' model groups") 407 if self.min_occurs not in (0, 1): 408 self.parse_error("minOccurs must be (0 | 1) for 'all' model groups") 409 if self.xsd_version == '1.0' and isinstance(self.parent, XsdGroup): 410 self.parse_error("in XSD 1.0 the 'all' model group cannot be nested") 411 self._group.append(xsd_group) 412 self.ref = xsd_group 413 else: 414 # Disallowed circular definition, substitute with any content group. 415 self.parse_error("Circular definitions detected for group %r:" % self.name, 416 xsd_group[0]) 417 self.model = 'sequence' 418 self.mixed = True 419 self._group.append(self.schema.xsd_any_class(ANY_ELEMENT, self.schema, self)) 420 421 else: 422 attrib = self.elem.attrib 423 try: 424 self.name = get_qname(self.target_namespace, attrib['name']) 425 except KeyError: 426 pass 427 else: 428 if self.parent is not None: 429 self.parse_error("attribute 'name' not allowed for a local group") 430 else: 431 if 'minOccurs' in attrib: 432 self.parse_error("attribute 'minOccurs' not allowed for a global group") 433 if 'maxOccurs' in attrib: 434 self.parse_error("attribute 'maxOccurs' not allowed for a global group") 435 436 content_model = self._parse_child_component(self.elem, strict=True) 437 if content_model is not None: 438 if self.parent is None: 439 if 'minOccurs' in content_model.attrib: 440 self.parse_error("attribute 'minOccurs' not allowed for the model " 441 "of a global group", content_model) 442 if 'maxOccurs' in content_model.attrib: 443 self.parse_error("attribute 'maxOccurs' not allowed for the model " 444 "of a global group", content_model) 445 446 if content_model.tag in {XSD_SEQUENCE, XSD_ALL, XSD_CHOICE}: 447 self._parse_content_model(content_model) 448 else: 449 self.parse_error('unexpected tag %r' % content_model.tag, content_model) 450 451 def _parse_content_model(self, content_model: ElementType) -> None: 452 self.model = local_name(content_model.tag) 453 if self.model == 'all': 454 if self.max_occurs != 1: 455 self.parse_error("maxOccurs must be 1 for 'all' model groups") 456 if self.min_occurs not in (0, 1): 457 self.parse_error("minOccurs must be (0 | 1) for 'all' model groups") 458 459 child: ElementType 460 for child in content_model: 461 if child.tag == XSD_ANNOTATION or callable(child.tag): 462 continue 463 elif child.tag == XSD_ELEMENT: 464 # Builds inner elements later, for avoid circularity. 465 self.append(self.schema.xsd_element_class(child, self.schema, self, False)) 466 elif content_model.tag == XSD_ALL: 467 self.parse_error("'all' model can contains only elements.") 468 elif child.tag == XSD_ANY: 469 self._group.append(XsdAnyElement(child, self.schema, self)) 470 elif child.tag in (XSD_SEQUENCE, XSD_CHOICE): 471 self._group.append(XsdGroup(child, self.schema, self)) 472 elif child.tag == XSD_GROUP: 473 try: 474 ref = self.schema.resolve_qname(child.attrib['ref']) 475 except (KeyError, ValueError, RuntimeError) as err: 476 if 'ref' not in child.attrib: 477 self.parse_error("missing attribute 'ref' in local group", child) 478 else: 479 self.parse_error(err, child) 480 continue 481 482 if ref != self.name: 483 xsd_group = XsdGroup(child, self.schema, self) 484 if xsd_group.model == 'all': 485 self.parse_error("'all' model can appears only at 1st level " 486 "of a model group") 487 else: 488 self._group.append(xsd_group) 489 elif self.redefine is None: 490 self.parse_error("Circular definition detected for group %r:" % self.name) 491 else: 492 if child.get('minOccurs', '1') != '1' or child.get('maxOccurs', '1') != '1': 493 self.parse_error("Redefined group reference cannot have " 494 "minOccurs/maxOccurs other than 1:") 495 self._group.append(self.redefine) 496 497 def children_validation_error(self, validation: str, 498 elem: ElementType, 499 index: int, 500 particle: ModelParticleType, 501 occurs: int = 0, 502 expected: Optional[List[SchemaElementType]] = None, 503 source: Optional['XMLResource'] = None, 504 namespaces: Optional[NamespacesType] = None, 505 **_kwargs: Any) -> XMLSchemaChildrenValidationError: 506 """ 507 Helper method for generating model validation errors. If validation mode 508 is 'lax' or 'skip' returns the error, otherwise raise the error. 509 510 :param validation: the validation mode. Can be 'lax' or 'strict'. 511 :param elem: the instance Element. 512 :param index: the child index. 513 :param particle: the XSD component (subgroup or element) associated to the child. 514 :param occurs: the child tag occurs. 515 :param expected: the expected element tags/object names. 516 :param source: the XML resource related to the validation process. 517 :param namespaces: is an optional mapping from namespace prefix to URI. 518 :param _kwargs: keyword arguments of the validation process that are not used. 519 """ 520 error = XMLSchemaChildrenValidationError(self, elem, index, particle, occurs, 521 expected, source, namespaces) 522 if validation == 'strict': 523 raise error 524 return error 525 526 def build(self) -> None: 527 for item in self._group: 528 if isinstance(item, XsdElement): 529 item.build() 530 531 if self.redefine is not None: 532 for group in self.redefine.iter_components(XsdGroup): 533 group.build() 534 535 @property 536 def built(self) -> bool: 537 for item in self: 538 if isinstance(item, XsdElement): 539 if not item.built: 540 return False 541 elif isinstance(item, XsdAnyElement): 542 continue 543 elif item.parent is None: 544 continue 545 elif item.parent is not self.parent and \ 546 isinstance(item.parent, XsdType) and item.parent.parent is None: 547 continue 548 elif not item.ref and not item.built: 549 return False 550 551 return True if self.model else False 552 553 @property 554 def validation_attempted(self) -> str: 555 if self.built: 556 return 'full' 557 elif any(item.validation_attempted == 'partial' for item in self): 558 return 'partial' 559 else: 560 return 'none' 561 562 @property 563 def schema_elem(self) -> ElementType: 564 return self.parent.elem if self.parent is not None else self.elem 565 566 def iter_components(self, xsd_classes: Optional[ComponentClassType] = None) \ 567 -> Iterator[XsdComponent]: 568 if xsd_classes is None or isinstance(self, xsd_classes): 569 yield self 570 for item in self: 571 if item.parent is None: 572 continue 573 elif item.parent is not self.parent and isinstance(item.parent, XsdType) \ 574 and item.parent.parent is None: 575 continue 576 yield from item.iter_components(xsd_classes) 577 578 if self.redefine is not None and self.redefine not in self: 579 yield from self.redefine.iter_components(xsd_classes) 580 581 def admits_restriction(self, model: str) -> bool: 582 if self.model == model: 583 return True 584 elif self.model == 'all': 585 return model == 'sequence' 586 elif self.model == 'choice': 587 return model == 'sequence' or len(self.ref or self) <= 1 588 else: 589 return model == 'choice' or len(self.ref or self) <= 1 590 591 def is_empty(self) -> bool: 592 return not self.mixed and (not self._group or self.max_occurs == 0) 593 594 def is_restriction(self, other: ModelParticleType, check_occurs: bool = True) -> bool: 595 if not self._group: 596 return True 597 elif not isinstance(other, ParticleMixin): 598 raise XMLSchemaValueError("the argument 'other' must be an XSD particle") 599 elif not isinstance(other, XsdGroup): 600 return self.is_element_restriction(other) 601 elif not other: 602 return False 603 elif len(other) == other.min_occurs == other.max_occurs == 1: 604 if len(self) > 1: 605 return self.is_restriction(other[0], check_occurs) 606 elif self.ref is None and isinstance(self[0], XsdGroup) \ 607 and self[0].is_pointless(parent=self): 608 return self[0].is_restriction(other[0], check_occurs) 609 610 # Compare model with model 611 if self.model != other.model and self.model != 'sequence' and \ 612 (len(self) > 1 or self.ref is not None and len(self[0]) > 1): 613 return False 614 elif self.model == other.model or other.model == 'sequence': 615 return self.is_sequence_restriction(other) 616 elif other.model == 'all': 617 return self.is_all_restriction(other) 618 else: # other.model == 'choice': 619 return self.is_choice_restriction(other) 620 621 def is_element_restriction(self, other: ModelParticleType) -> bool: 622 if self.xsd_version == '1.0' and isinstance(other, XsdElement) and \ 623 not other.ref and other.name not in self.schema.substitution_groups: 624 return False 625 elif not self.has_occurs_restriction(other): 626 return False 627 elif self.model == 'choice': 628 if other.name in self.maps.substitution_groups and \ 629 all(isinstance(e, XsdElement) and e.substitution_group == other.name 630 for e in self): 631 return True 632 return any(e.is_restriction(other, False) for e in self) 633 else: 634 min_occurs = 0 635 max_occurs: Optional[int] = 0 636 for item in self.iter_model(): 637 if isinstance(item, XsdGroup): 638 return False 639 elif item.min_occurs == 0 or item.is_restriction(other, False): 640 min_occurs += item.min_occurs 641 if max_occurs is not None: 642 if item.max_occurs is None: 643 max_occurs = None 644 else: 645 max_occurs += item.max_occurs 646 continue 647 return False 648 649 if min_occurs < other.min_occurs: 650 return False 651 elif max_occurs is None: 652 return other.max_occurs is None 653 elif other.max_occurs is None: 654 return True 655 else: 656 return max_occurs <= other.max_occurs 657 658 def is_sequence_restriction(self, other: 'XsdGroup') -> bool: 659 if not self.has_occurs_restriction(other): 660 return False 661 662 check_occurs = other.max_occurs != 0 663 664 # Same model: declarations must simply preserve order 665 other_iterator = iter(other.iter_model()) 666 for item in self.iter_model(): 667 while True: 668 try: 669 other_item = next(other_iterator) 670 except StopIteration: 671 return False 672 if other_item is item or item.is_restriction(other_item, check_occurs): 673 break 674 elif other.model == 'choice': 675 if item.max_occurs != 0: 676 continue 677 elif not other_item.is_matching(item.name, self.default_namespace): 678 continue 679 elif all(e.max_occurs == 0 for e in self.iter_model()): 680 return False 681 else: 682 break 683 elif not other_item.is_emptiable(): 684 return False 685 686 if other.model == 'choice': 687 return True 688 689 while True: 690 try: 691 other_item = next(other_iterator) 692 except StopIteration: 693 return True 694 else: 695 if not other_item.is_emptiable(): 696 return False 697 698 def is_all_restriction(self, other: 'XsdGroup') -> bool: 699 if not self.has_occurs_restriction(other): 700 return False 701 702 check_occurs = other.max_occurs != 0 703 if self.ref is None: 704 restriction_items = [x for x in self] 705 else: 706 restriction_items = [x for x in self[0]] 707 708 for other_item in other.iter_model(): 709 for item in restriction_items: 710 if other_item is item or item.is_restriction(other_item, check_occurs): 711 break 712 else: 713 if not other_item.is_emptiable(): 714 return False 715 continue 716 restriction_items.remove(item) 717 718 return not bool(restriction_items) 719 720 def is_choice_restriction(self, other: 'XsdGroup') -> bool: 721 if self.ref is None: 722 if self.parent is None and other.parent is not None: 723 return False # not allowed restriction in XSD 1.0 724 restriction_items = [x for x in self] 725 elif other.parent is None: 726 restriction_items = [x for x in self[0]] 727 else: 728 return False # not allowed restriction in XSD 1.0 729 730 check_occurs = other.max_occurs != 0 731 max_occurs: Optional[int] = 0 732 other_max_occurs: Optional[int] = 0 733 734 for other_item in other.iter_model(): 735 for item in restriction_items: 736 if other_item is item or item.is_restriction(other_item, check_occurs): 737 if max_occurs is not None: 738 if item.max_occurs is None: 739 max_occurs = None 740 else: 741 max_occurs += item.max_occurs 742 743 if other_max_occurs is not None: 744 if other_item.max_occurs is None: 745 other_max_occurs = None 746 else: 747 other_max_occurs = max(other_max_occurs, other_item.max_occurs) 748 break 749 else: 750 continue 751 restriction_items.remove(item) 752 753 if restriction_items: 754 return False 755 elif other_max_occurs is None: 756 if other.max_occurs != 0: 757 return True 758 other_max_occurs = 0 759 elif other.max_occurs is None: 760 if other_max_occurs != 0: 761 return True 762 other_max_occurs = 0 763 else: 764 other_max_occurs *= other.max_occurs 765 766 if max_occurs is None: 767 return self.max_occurs == 0 768 elif self.max_occurs is None: 769 return max_occurs == 0 770 else: 771 return other_max_occurs >= max_occurs * self.max_occurs 772 773 def check_model(self) -> None: 774 """ 775 Checks if the model group is deterministic. Element Declarations Consistent and 776 Unique Particle Attribution constraints are checked. 777 :raises: an `XMLSchemaModelError` at first violated constraint. 778 """ 779 def safe_iter_path(group: XsdGroup, depth: int) -> Iterator[SchemaElementType]: 780 if not depth: 781 raise XMLSchemaModelDepthError(group) 782 for item in group: 783 if isinstance(item, XsdGroup): 784 current_path.append(item) 785 yield from safe_iter_path(item, depth - 1) 786 current_path.pop() 787 else: 788 yield item 789 790 paths: Any = {} 791 current_path: List[ModelParticleType] = [self] 792 try: 793 any_element = self.parent.open_content.any_element # type: ignore[union-attr] 794 except AttributeError: 795 any_element = None 796 797 for e in safe_iter_path(self, limits.MAX_MODEL_DEPTH): 798 799 previous_path: List[ModelParticleType] 800 for pe, previous_path in paths.values(): 801 # EDC check 802 if not e.is_consistent(pe) or any_element and not any_element.is_consistent(pe): 803 msg = "Element Declarations Consistent violation between %r and %r: " \ 804 "match the same name but with different types" % (e, pe) 805 raise XMLSchemaModelError(self, msg) 806 807 # UPA check 808 if pe is e or not pe.is_overlap(e): 809 continue 810 elif pe.parent is e.parent: 811 if pe.parent.model in {'all', 'choice'}: 812 if isinstance(pe, Xsd11AnyElement) and not isinstance(e, XsdAnyElement): 813 pe.add_precedence(e, self) 814 elif isinstance(e, Xsd11AnyElement) and not isinstance(pe, XsdAnyElement): 815 e.add_precedence(pe, self) 816 else: 817 msg = "{!r} and {!r} overlap and are in the same {!r} group" 818 raise XMLSchemaModelError(self, msg.format(pe, e, pe.parent.model)) 819 elif pe.min_occurs == pe.max_occurs: 820 continue 821 822 if distinguishable_paths(previous_path + [pe], current_path + [e]): 823 continue 824 elif isinstance(pe, Xsd11AnyElement) and not isinstance(e, XsdAnyElement): 825 pe.add_precedence(e, self) 826 elif isinstance(e, Xsd11AnyElement) and not isinstance(pe, XsdAnyElement): 827 e.add_precedence(pe, self) 828 else: 829 msg = "Unique Particle Attribution violation between {!r} and {!r}" 830 raise XMLSchemaModelError(self, msg.format(pe, e)) 831 832 paths[e.name] = e, current_path[:] 833 834 def check_dynamic_context(self, elem: ElementType, 835 xsd_element: SchemaElementType, 836 model_element: SchemaElementType, 837 namespaces: NamespacesType) -> None: 838 839 if model_element is not xsd_element and isinstance(model_element, XsdElement): 840 if 'substitution' in model_element.block \ 841 or xsd_element.type and xsd_element.type.is_blocked(model_element): 842 raise XMLSchemaValidationError( 843 model_element, elem, "substitution of %r is blocked" % model_element 844 ) 845 846 alternatives: Union[Tuple[()], List[XsdAlternative]] = [] 847 if isinstance(xsd_element, XsdAnyElement): 848 if xsd_element.process_contents == 'skip': 849 return 850 851 try: 852 xsd_element = self.maps.lookup_element(elem.tag) 853 except LookupError: 854 try: 855 type_name = elem.attrib[XSI_TYPE].strip() 856 except KeyError: 857 return 858 else: 859 xsd_type = self.maps.get_instance_type( 860 type_name, self.any_type, namespaces 861 ) 862 else: 863 alternatives = xsd_element.alternatives 864 try: 865 type_name = elem.attrib[XSI_TYPE].strip() 866 except KeyError: 867 xsd_type = xsd_element.type 868 else: 869 xsd_type = self.maps.get_instance_type( 870 type_name, xsd_element.type, namespaces 871 ) 872 873 else: 874 if XSI_TYPE not in elem.attrib: 875 xsd_type = xsd_element.type 876 else: 877 alternatives = xsd_element.alternatives 878 try: 879 type_name = elem.attrib[XSI_TYPE].strip() 880 except KeyError: 881 xsd_type = xsd_element.type 882 else: 883 xsd_type = self.maps.get_instance_type( 884 type_name, xsd_element.type, namespaces 885 ) 886 887 if model_element is not xsd_element and \ 888 isinstance(model_element, XsdElement) and model_element.block: 889 for derivation in model_element.block.split(): 890 if xsd_type is not model_element.type and \ 891 xsd_type.is_derived(model_element.type, derivation): 892 reason = "usage of %r with type %s is blocked by head element" 893 raise XMLSchemaValidationError( 894 self, elem, reason % (xsd_element, derivation) 895 ) 896 897 if XSI_TYPE not in elem.attrib: 898 return 899 900 # If it's a restriction the context is the base_type's group 901 group = self.restriction if self.restriction is not None else self 902 903 # Dynamic EDC check of matched element 904 for e in group.iter_elements(): 905 if not isinstance(e, XsdElement): 906 continue 907 elif e.name == elem.tag: 908 other = e 909 else: 910 for other in e.iter_substitutes(): 911 if other.name == elem.tag: 912 break 913 else: 914 continue 915 916 if len(other.alternatives) != len(alternatives) or \ 917 not xsd_type.is_dynamic_consistent(other.type): 918 reason = "%r that matches %r is not consistent with local declaration %r" 919 raise XMLSchemaValidationError(self, reason % (elem, xsd_element, other)) 920 921 if not all(any(a == x for x in alternatives) for a in other.alternatives) or \ 922 not all(any(a == x for x in other.alternatives) for a in alternatives): 923 msg = "Maybe a not equivalent type table between elements %r and %r." 924 warnings.warn(msg % (self, xsd_element), XMLSchemaTypeTableWarning, stacklevel=3) 925 926 def match_element(self, name: str, default_namespace: Optional[str] = None) \ 927 -> Optional[SchemaElementType]: 928 """ 929 Try a model-less match of a child element. Returns the 930 matched element, or `None` if there is no match. 931 """ 932 for xsd_element in self.iter_elements(): 933 if xsd_element.is_matching(name, default_namespace, group=self): 934 return xsd_element 935 return None 936 937 def iter_decode(self, obj: ElementType, validation: str = 'lax', **kwargs: Any) \ 938 -> IterDecodeType[GroupDecodeType]: 939 """ 940 Creates an iterator for decoding an Element content. 941 942 :param obj: an Element. 943 :param validation: the validation mode, can be 'lax', 'strict' or 'skip'. 944 :param kwargs: keyword arguments for the decoding process. 945 :return: yields a list of 3-tuples (key, decoded data, decoder), \ 946 eventually preceded by a sequence of validation or decoding errors. 947 """ 948 result_list: GroupDecodeType = [] 949 cdata_index = 1 # keys for CDATA sections are positive integers 950 951 if not self._group and self.model == 'choice' and self.min_occurs: 952 reason = "an empty 'choice' group with minOccurs > 0 cannot validate any content" 953 yield self.validation_error(validation, reason, obj, **kwargs) 954 yield result_list 955 return 956 957 if not self.mixed: 958 # Check element CDATA 959 if obj.text and obj.text.strip() or \ 960 any(child.tail and child.tail.strip() for child in obj): 961 if len(self) == 1 and isinstance(self[0], XsdAnyElement): 962 pass # [XsdAnyElement()] equals to an empty complexType declaration 963 else: 964 reason = "character data between child elements not allowed" 965 yield self.validation_error(validation, reason, obj, **kwargs) 966 cdata_index = 0 # Do not decode CDATA 967 968 if cdata_index and obj.text is not None: 969 text = str(obj.text.strip()) 970 if text: 971 result_list.append((cdata_index, text, None)) 972 cdata_index += 1 973 974 level = kwargs['level'] = kwargs.pop('level', 0) + 1 975 over_max_depth = 'max_depth' in kwargs and kwargs['max_depth'] <= level 976 if level > limits.MAX_XML_DEPTH: 977 reason = "XML data depth exceeded (MAX_XML_DEPTH=%r)" % limits.MAX_XML_DEPTH 978 self.validation_error('strict', reason, obj, **kwargs) 979 980 try: 981 namespaces = kwargs['namespaces'] 982 except KeyError: 983 namespaces = default_namespace = None 984 else: 985 try: 986 default_namespace = namespaces.get('') 987 except AttributeError: 988 default_namespace = None 989 990 errors: List[Tuple[int, ModelParticleType, int, Optional[List[SchemaElementType]]]] 991 xsd_element: Optional[SchemaElementType] 992 expected: Optional[List[SchemaElementType]] 993 994 model = ModelVisitor(self) 995 errors = [] 996 broken_model = False 997 998 for index, child in enumerate(obj): 999 if callable(child.tag): 1000 continue # child is a <class 'lxml.etree._Comment'> 1001 1002 while model.element is not None: 1003 xsd_element = model.element.match( 1004 child.tag, default_namespace, group=self, occurs=model.occurs 1005 ) 1006 if xsd_element is None: 1007 if self.interleave is not None and self.interleave.is_matching( 1008 child.tag, default_namespace, self, model.occurs): 1009 xsd_element = self.interleave 1010 break 1011 1012 for particle, occurs, expected in model.advance(False): 1013 errors.append((index, particle, occurs, expected)) 1014 model.clear() 1015 broken_model = True # the model is broken, continues with raw decoding. 1016 xsd_element = self.match_element(child.tag, default_namespace) 1017 break 1018 else: 1019 continue 1020 break 1021 1022 try: 1023 self.check_dynamic_context(child, xsd_element, model.element, namespaces) 1024 except XMLSchemaValidationError as err: 1025 yield self.validation_error(validation, err, obj, **kwargs) 1026 1027 for particle, occurs, expected in model.advance(True): 1028 errors.append((index, particle, occurs, expected)) 1029 break 1030 else: 1031 if self.suffix is not None and \ 1032 self.suffix.is_matching(child.tag, default_namespace, self): 1033 xsd_element = self.suffix 1034 else: 1035 xsd_element = self.match_element(child.tag, default_namespace) 1036 if xsd_element is None: 1037 errors.append((index, self, 0, None)) 1038 broken_model = True 1039 elif not broken_model: 1040 errors.append((index, xsd_element, 0, [])) 1041 broken_model = True 1042 1043 if xsd_element is None: 1044 if kwargs.get('keep_unknown') and 'converter' in kwargs: 1045 for result in self.any_type.iter_decode(child, validation, **kwargs): 1046 result_list.append((child.tag, result, None)) 1047 continue 1048 elif 'converter' not in kwargs: 1049 # Validation-only mode: do not append results 1050 for result in xsd_element.iter_decode(child, validation, **kwargs): 1051 if isinstance(result, XMLSchemaValidationError): 1052 yield result 1053 continue 1054 elif over_max_depth: 1055 if 'depth_filler' in kwargs: 1056 func = kwargs['depth_filler'] 1057 result_list.append((child.tag, func(xsd_element), xsd_element)) 1058 continue 1059 1060 for result in xsd_element.iter_decode(child, validation, **kwargs): 1061 if isinstance(result, XMLSchemaValidationError): 1062 yield result 1063 else: 1064 result_list.append((child.tag, result, xsd_element)) 1065 1066 if cdata_index and child.tail is not None: 1067 tail = str(child.tail.strip()) 1068 if tail: 1069 if result_list and isinstance(result_list[-1][0], int): 1070 tail = result_list[-1][1] + ' ' + tail 1071 result_list[-1] = result_list[-1][0], tail, None 1072 else: 1073 result_list.append((cdata_index, tail, None)) 1074 cdata_index += 1 1075 1076 if model.element is not None: 1077 index = len(obj) 1078 for particle, occurs, expected in model.stop(): 1079 errors.append((index, particle, occurs, expected)) 1080 1081 if errors: 1082 for index, particle, occurs, expected in errors: 1083 yield self.children_validation_error( 1084 validation, obj, index, particle, occurs, expected, **kwargs 1085 ) 1086 1087 yield result_list 1088 1089 def iter_encode(self, obj: ElementData, validation: str = 'lax', **kwargs: Any) \ 1090 -> IterEncodeType[GroupEncodeType]: 1091 """ 1092 Creates an iterator for encoding data to a list containing Element data. 1093 1094 :param obj: an ElementData instance. 1095 :param validation: the validation mode: can be 'lax', 'strict' or 'skip'. 1096 :param kwargs: keyword arguments for the encoding process. 1097 :return: yields a couple with the text of the Element and a list of child \ 1098 elements, eventually preceded by a sequence of validation errors. 1099 """ 1100 level = kwargs['level'] = kwargs.get('level', 0) + 1 1101 errors = [] 1102 text = raw_xml_encode(obj.text) 1103 children: List[ElementType] = [] 1104 try: 1105 indent = kwargs['indent'] 1106 except KeyError: 1107 indent = 4 1108 1109 padding = '\n' + ' ' * indent * level 1110 1111 try: 1112 converter = kwargs['converter'] 1113 except KeyError: 1114 converter = kwargs['converter'] = self.schema.get_converter(**kwargs) 1115 1116 default_namespace = converter.get('') 1117 model = ModelVisitor(self) 1118 index = cdata_index = 0 1119 wrong_content_type = False 1120 over_max_depth = 'max_depth' in kwargs and kwargs['max_depth'] <= level 1121 1122 content: Iterable[Any] 1123 if obj.content is None: 1124 content = [] 1125 elif isinstance(obj.content, MutableMapping) or kwargs.get('unordered'): 1126 content = ModelVisitor(self).iter_unordered_content( 1127 obj.content, default_namespace 1128 ) 1129 elif not isinstance(obj.content, MutableSequence): 1130 wrong_content_type = True 1131 content = [] 1132 elif converter.losslessly: 1133 content = obj.content 1134 else: 1135 content = ModelVisitor(self).iter_collapsed_content( 1136 obj.content, default_namespace 1137 ) 1138 1139 for index, (name, value) in enumerate(content): 1140 if isinstance(name, int): 1141 if not children: 1142 text = padding + value if text is None else text + value + padding 1143 elif children[-1].tail is None: 1144 children[-1].tail = padding + value 1145 else: 1146 children[-1].tail += value + padding 1147 cdata_index += 1 1148 continue 1149 1150 xsd_element: Optional[SchemaElementType] 1151 if self.interleave and self.interleave.is_matching(name, default_namespace, group=self): 1152 xsd_element = self.interleave 1153 value = get_qname(default_namespace, name), value 1154 else: 1155 while model.element is not None: 1156 xsd_element = model.element.match( 1157 name, default_namespace, group=self, occurs=model.occurs 1158 ) 1159 if xsd_element is None: 1160 for particle, occurs, expected in model.advance(): 1161 errors.append((index - cdata_index, particle, occurs, expected)) 1162 continue 1163 elif isinstance(xsd_element, XsdAnyElement): 1164 value = get_qname(default_namespace, name), value 1165 1166 for particle, occurs, expected in model.advance(True): 1167 errors.append((index - cdata_index, particle, occurs, expected)) 1168 break 1169 else: 1170 if self.suffix and self.suffix.is_matching(name, default_namespace, group=self): 1171 xsd_element = self.suffix 1172 value = get_qname(default_namespace, name), value 1173 else: 1174 errors.append((index - cdata_index, self, 0, [])) 1175 xsd_element = self.match_element(name, default_namespace) 1176 if isinstance(xsd_element, XsdAnyElement): 1177 value = get_qname(default_namespace, name), value 1178 elif xsd_element is None: 1179 if name.startswith('{') or ':' not in name: 1180 reason = '{!r} does not match any declared element ' \ 1181 'of the model group.'.format(name) 1182 else: 1183 reason = '{} has an unknown prefix {!r}'.format( 1184 name, name.split(':')[0] 1185 ) 1186 yield self.validation_error(validation, reason, value, **kwargs) 1187 continue 1188 1189 if over_max_depth: 1190 continue 1191 1192 for result in xsd_element.iter_encode(value, validation, **kwargs): 1193 if isinstance(result, XMLSchemaValidationError): 1194 yield result 1195 else: 1196 children.append(result) 1197 1198 if model.element is not None: 1199 for particle, occurs, expected in model.stop(): 1200 errors.append((index - cdata_index + 1, particle, occurs, expected)) 1201 1202 if children: 1203 if children[-1].tail is None: 1204 children[-1].tail = padding[:-indent] or '\n' 1205 else: 1206 children[-1].tail = children[-1].tail.strip() + (padding[:-indent] or '\n') 1207 1208 cdata_not_allowed = not self.mixed and text and text.strip() and self and \ 1209 (len(self) > 1 or not isinstance(self[0], XsdAnyElement)) 1210 1211 if errors or cdata_not_allowed or wrong_content_type: 1212 attrib = {k: str(v) for k, v in obj.attributes.items()} 1213 if validation != 'strict' and converter.etree_element_class is not etree_element: 1214 child_tags = [converter.etree_element(e.tag, attrib=e.attrib) for e in children] 1215 elem = converter.etree_element(obj.tag, text, child_tags, attrib) 1216 else: 1217 elem = converter.etree_element(obj.tag, text, children, attrib) 1218 1219 if wrong_content_type: 1220 reason = "wrong content type {!r}".format(type(obj.content)) 1221 yield self.validation_error(validation, reason, elem, **kwargs) 1222 1223 if cdata_not_allowed: 1224 reason = "character data between child elements not allowed" 1225 yield self.validation_error(validation, reason, elem, **kwargs) 1226 1227 for index, particle, occurs, expected in errors: 1228 yield self.children_validation_error( 1229 validation, elem, index, particle, occurs, expected, **kwargs 1230 ) 1231 1232 yield text, children 1233 1234 1235class Xsd11Group(XsdGroup): 1236 """ 1237 Class for XSD 1.1 *model group* definitions. 1238 1239 .. The XSD 1.1 model groups differ from XSD 1.0 groups for the 'all' model, 1240 .. that can contains also other groups. 1241 .. <all 1242 id = ID 1243 maxOccurs = (0 | 1) : 1 1244 minOccurs = (0 | 1) : 1 1245 {any attributes with non-schema namespace . . .}> 1246 Content: (annotation?, (element | any | group)*) 1247 </all> 1248 """ 1249 def _parse_content_model(self, content_model: ElementType) -> None: 1250 self.model = local_name(content_model.tag) 1251 if self.model == 'all': 1252 if self.max_occurs not in (0, 1): 1253 self.parse_error("maxOccurs must be (0 | 1) for 'all' model groups") 1254 if self.min_occurs not in (0, 1): 1255 self.parse_error("minOccurs must be (0 | 1) for 'all' model groups") 1256 1257 for child in content_model: 1258 if child.tag == XSD_ELEMENT: 1259 # Builds inner elements later, for avoid circularity. 1260 self.append(self.schema.xsd_element_class(child, self.schema, self, False)) 1261 elif child.tag == XSD_ANY: 1262 self._group.append(Xsd11AnyElement(child, self.schema, self)) 1263 elif child.tag in (XSD_SEQUENCE, XSD_CHOICE, XSD_ALL): 1264 self._group.append(Xsd11Group(child, self.schema, self)) 1265 elif child.tag == XSD_GROUP: 1266 try: 1267 ref = self.schema.resolve_qname(child.attrib['ref']) 1268 except (KeyError, ValueError, RuntimeError) as err: 1269 if 'ref' not in child.attrib: 1270 self.parse_error("missing attribute 'ref' in local group", child) 1271 else: 1272 self.parse_error(err, child) 1273 continue 1274 1275 if ref != self.name: 1276 xsd_group = Xsd11Group(child, self.schema, self) 1277 self._group.append(xsd_group) 1278 if (self.model != 'all') ^ (xsd_group.model != 'all'): 1279 msg = "an xs:%s group cannot include a reference to an xs:%s group" 1280 self.parse_error(msg % (self.model, xsd_group.model)) 1281 self.pop() 1282 1283 elif self.redefine is None: 1284 self.parse_error("Circular definition detected for group %r:" % self.name) 1285 else: 1286 if child.get('minOccurs', '1') != '1' or child.get('maxOccurs', '1') != '1': 1287 self.parse_error("Redefined group reference cannot have " 1288 "minOccurs/maxOccurs other than 1:") 1289 self._group.append(self.redefine) 1290 1291 def admits_restriction(self, model: str) -> bool: 1292 if self.model == model or self.model == 'all': 1293 return True 1294 elif self.model == 'choice': 1295 return model == 'sequence' or len(self.ref or self) <= 1 1296 else: 1297 return model == 'choice' or len(self.ref or self) <= 1 1298 1299 def is_restriction(self, other: ModelParticleType, check_occurs: bool = True) -> bool: 1300 if not self._group: 1301 return True 1302 elif not isinstance(other, ParticleMixin): 1303 raise XMLSchemaValueError("the argument 'base' must be a %r instance" % ParticleMixin) 1304 elif not isinstance(other, XsdGroup): 1305 return self.is_element_restriction(other) 1306 elif not other: 1307 return False 1308 elif len(other) == other.min_occurs == other.max_occurs == 1: 1309 if len(self) > 1: 1310 return self.is_restriction(other[0], check_occurs) 1311 elif self.ref is None and isinstance(self[0], XsdGroup) \ 1312 and self[0].is_pointless(parent=self): 1313 return self[0].is_restriction(other[0], check_occurs) 1314 1315 if other.model == 'sequence': 1316 return self.is_sequence_restriction(other) 1317 elif other.model == 'all': 1318 return self.is_all_restriction(other) 1319 else: # other.model == 'choice': 1320 return self.is_choice_restriction(other) 1321 1322 def is_sequence_restriction(self, other: XsdGroup) -> bool: 1323 if not self.has_occurs_restriction(other): 1324 return False 1325 1326 check_occurs = other.max_occurs != 0 1327 1328 item_iterator = iter(self.iter_model()) 1329 item = next(item_iterator, None) 1330 1331 for other_item in other.iter_model(): 1332 if item is not None and item.is_restriction(other_item, check_occurs): 1333 item = next(item_iterator, None) 1334 elif not other_item.is_emptiable(): 1335 break 1336 else: 1337 if item is None: 1338 return True 1339 1340 # Restriction check failed: try another check without removing pointless groups 1341 item_iterator = iter(self) 1342 item = next(item_iterator, None) 1343 1344 for other_item in other.iter_model(): 1345 if item is not None and item.is_restriction(other_item, check_occurs): 1346 item = next(item_iterator, None) 1347 elif not other_item.is_emptiable(): 1348 return False 1349 return item is None 1350 1351 def is_all_restriction(self, other: XsdGroup) -> bool: 1352 if not self.has_occurs_restriction(other): 1353 return False 1354 restriction_items = [x for x in self.iter_model()] 1355 1356 base_items = [x for x in other.iter_model()] 1357 1358 # If the base includes more wildcard, calculates and appends a 1359 # wildcard union for validating wildcard unions in restriction 1360 wildcards: List[XsdAnyElement] = [] 1361 for w1 in base_items: 1362 if isinstance(w1, XsdAnyElement): 1363 for w2 in wildcards: 1364 if w1.process_contents == w2.process_contents and w1.occurs == w2.occurs: 1365 w2.union(w1) 1366 w2.extended = True 1367 break 1368 else: 1369 wildcards.append(w1.copy()) 1370 1371 base_items.extend(w for w in wildcards if hasattr(w, 'extended')) 1372 1373 if self.model != 'choice': 1374 restriction_wildcards = [e for e in restriction_items if isinstance(e, XsdAnyElement)] 1375 1376 for other_item in base_items: 1377 min_occurs, max_occurs = 0, other_item.max_occurs 1378 for k in range(len(restriction_items) - 1, -1, -1): 1379 item = restriction_items[k] 1380 1381 if item.is_restriction(other_item, check_occurs=False): 1382 if max_occurs is None: 1383 min_occurs += item.min_occurs 1384 elif item.max_occurs is None or max_occurs < item.max_occurs or \ 1385 min_occurs + item.min_occurs > max_occurs: 1386 continue 1387 else: 1388 min_occurs += item.min_occurs 1389 max_occurs -= item.max_occurs 1390 1391 restriction_items.remove(item) 1392 if not min_occurs or max_occurs == 0: 1393 break 1394 else: 1395 if self.model == 'all' and restriction_wildcards: 1396 if not isinstance(other_item, XsdGroup) and other_item.type \ 1397 and other_item.type.name != XSD_ANY_TYPE: 1398 1399 for w in restriction_wildcards: 1400 if w.is_matching(other_item.name, self.target_namespace): 1401 return False 1402 1403 if min_occurs < other_item.min_occurs: 1404 break 1405 else: 1406 if not restriction_items: 1407 return True 1408 return False 1409 1410 # Restriction with a choice model: this a more complex case 1411 # because the not emptiable elements of the base group have 1412 # to be included in each item of the choice group. 1413 not_emptiable_items = {x for x in base_items if x.min_occurs} 1414 1415 for other_item in base_items: 1416 min_occurs, max_occurs = 0, other_item.max_occurs 1417 for k in range(len(restriction_items) - 1, -1, -1): 1418 item = restriction_items[k] 1419 1420 if item.is_restriction(other_item, check_occurs=False): 1421 if max_occurs is None: 1422 min_occurs += item.min_occurs 1423 elif item.max_occurs is None or max_occurs < item.max_occurs or \ 1424 min_occurs + item.min_occurs > max_occurs: 1425 continue 1426 else: 1427 min_occurs += item.min_occurs 1428 max_occurs -= item.max_occurs 1429 1430 if not_emptiable_items: 1431 if len(not_emptiable_items) > 1: 1432 continue 1433 if other_item not in not_emptiable_items: 1434 continue 1435 1436 restriction_items.remove(item) 1437 if not min_occurs or max_occurs == 0: 1438 break 1439 1440 if min_occurs < other_item.min_occurs: 1441 break 1442 else: 1443 if not restriction_items: 1444 return True 1445 1446 if any(not isinstance(x, XsdGroup) for x in restriction_items): 1447 return False 1448 1449 # If the remaining items are groups try to verify if they are all 1450 # restrictions of the 'all' group and if each group contains all 1451 # not emptiable elements. 1452 for group in restriction_items: 1453 if not group.is_restriction(other): 1454 return False 1455 1456 for item in not_emptiable_items: 1457 for e in group: 1458 if e.name == item.name: 1459 break 1460 else: 1461 return False 1462 else: 1463 return True 1464 1465 def is_choice_restriction(self, other: XsdGroup) -> bool: 1466 restriction_items = [x for x in self.iter_model()] 1467 has_not_empty_item = any(e.max_occurs != 0 for e in restriction_items) 1468 1469 check_occurs = other.max_occurs != 0 1470 max_occurs: Optional[int] = 0 1471 other_max_occurs: Optional[int] = 0 1472 1473 for other_item in other.iter_model(): 1474 for item in restriction_items: 1475 if other_item is item or item.is_restriction(other_item, check_occurs): 1476 if max_occurs is not None: 1477 effective_max_occurs = item.effective_max_occurs 1478 if effective_max_occurs is None: 1479 max_occurs = None 1480 elif self.model == 'choice': 1481 max_occurs = max(max_occurs, effective_max_occurs) 1482 else: 1483 max_occurs += effective_max_occurs 1484 1485 if other_max_occurs is not None: 1486 effective_max_occurs = other_item.effective_max_occurs 1487 if effective_max_occurs is None: 1488 other_max_occurs = None 1489 else: 1490 other_max_occurs = max(other_max_occurs, effective_max_occurs) 1491 break 1492 elif item.max_occurs != 0: 1493 continue 1494 elif not other_item.is_matching(item.name, self.default_namespace): 1495 continue 1496 elif has_not_empty_item: 1497 break 1498 else: 1499 return False 1500 else: 1501 continue 1502 restriction_items.remove(item) 1503 1504 if restriction_items: 1505 return False 1506 elif other_max_occurs is None: 1507 if other.max_occurs != 0: 1508 return True 1509 other_max_occurs = 0 1510 elif other.max_occurs is None: 1511 if other_max_occurs != 0: 1512 return True 1513 other_max_occurs = 0 1514 else: 1515 other_max_occurs *= other.max_occurs 1516 1517 if max_occurs is None: 1518 return self.max_occurs == 0 1519 elif self.max_occurs is None: 1520 return max_occurs == 0 1521 else: 1522 return other_max_occurs >= max_occurs * self.max_occurs 1523