1#
2# Copyright (c), 2016-2020, SISSA (International School for Advanced Studies).
3# All rights reserved.
4# This file is distributed under the terms of the MIT License.
5# See the file 'LICENSE' in the root directory of the present
6# distribution, or http://opensource.org/licenses/MIT.
7#
8# @author Davide Brunato <brunato@sissa.it>
9#
10"""
11This module contains classes for other XML Schema identity constraints.
12"""
13import re
14import math
15from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Pattern, \
16    Tuple, Union, Counter
17from elementpath import XPath2Parser, ElementPathError, XPathToken, XPathContext, \
18    translate_pattern, datatypes
19
20from ..exceptions import XMLSchemaTypeError, XMLSchemaValueError
21from ..names import XSD_QNAME, XSD_UNIQUE, XSD_KEY, XSD_KEYREF, XSD_SELECTOR, XSD_FIELD
22from ..helpers import get_qname, get_extended_qname
23from ..aliases import ElementType, SchemaType, NamespacesType, AtomicValueType
24from ..xpath import iter_schema_nodes
25from .xsdbase import XsdComponent
26from .attributes import XsdAttribute
27
28if TYPE_CHECKING:
29    from .elements import XsdElement
30
# Type of a single item of a tuple of identity field values. The items built
# by XsdIdentity.get_fields() are plain values, tuples (list values converted
# to tuples and bool/float values paired with their type) or None for an
# empty field.
IdentityFieldItemType = Union[AtomicValueType, XsdAttribute, Tuple[Any, ...], None]

# Type of a whole tuple of field values, used as key in the identity counters.
IdentityCounterType = Tuple[IdentityFieldItemType, ...]

# Type of the mapping from identity constraints to their counters.
IdentityMapType = Dict[Union['XsdKey', 'XsdKeyref', str, None],
                       Union['IdentityCounter', 'KeyrefCounter']]
35
# Symbols of the XPath parser that are kept in the restricted parser used
# for XSD identity constraints, grouped by kind.
XSD_IDENTITY_XPATH_SYMBOLS = frozenset({
    # Axes
    'ancestor', 'ancestor-or-self', 'attribute', 'child', 'following',
    'following-sibling', 'namespace', 'parent', 'preceding', 'preceding-sibling',

    # Kind tests
    'comment', 'node', 'processing-instruction', 'text',

    # Functions
    'false', 'last', 'not', 'position', 'true',

    # Word operators
    'and', 'div', 'mod', 'or',

    # Multi-character operators and delimiters
    '..', '//', '!=', '<=', '>=', '::',

    # Single-character operators and delimiters
    '(', ')', '[', ']', '{', '}', '.', '@', ',', '/', '|', '*', '-', '=',
    '+', '<', '>', ':',

    # Special tokens of the parser
    '(end)', '(unknown)', '(invalid)', '(name)', '(string)', '(float)',
    '(decimal)', '(integer)',
})
45
46
# XSD identities use a restricted XPath parser and a dedicated context that
# iterates over element references. The XMLSchemaProxy is not used for the
# selection of fields and elements, and the XSD fields are obtained at the
# first validation run.
class IdentityXPathContext(XPathContext):
    """
    XPath context for identity constraints on schema nodes: the nodes
    iterator of the base context is replaced by `iter_schema_nodes`.
    """
    _iter_nodes = staticmethod(iter_schema_nodes)
52
53
class IdentityXPathParser(XPath2Parser):
    """
    Restricted XPath parser for XSD identity constraints: its symbol table
    contains only the entries of the XPath 2.0 parser whose symbol is listed
    in `XSD_IDENTITY_XPATH_SYMBOLS`.
    """
    symbol_table = {
        symbol: token_class
        for symbol, token_class in XPath2Parser.symbol_table.items()  # type: ignore[misc]
        if symbol in XSD_IDENTITY_XPATH_SYMBOLS
    }
    SYMBOLS = XSD_IDENTITY_XPATH_SYMBOLS
60
61
class XsdSelector(XsdComponent):
    """
    XPath selector of an XSD identity constraint.

    The value of the 'xpath' attribute is checked against the restricted
    syntax admitted for selectors and then parsed with an instance of
    :class:`IdentityXPathParser`.

    :ivar path: the XPath expression, set to '*' if the attribute is missing.
    :ivar token: the root token of the parsed XPath expression.
    :ivar parser: the parser instance used for building the token.
    """
    _ADMITTED_TAGS = {XSD_SELECTOR}
    xpath_default_namespace = ''

    # Kept as a string until the first check, then replaced on the class
    # by the compiled regular expression (see _parse).
    pattern: Union[str, Pattern[str]] = translate_pattern(
        r"(\.//)?(((child::)?((\i\c*:)?(\i\c*|\*)))|\.)(/(((child::)?"
        r"((\i\c*:)?(\i\c*|\*)))|\.))*(\|(\.//)?(((child::)?((\i\c*:)?"
        r"(\i\c*|\*)))|\.)(/(((child::)?((\i\c*:)?(\i\c*|\*)))|\.))*)*",
        back_references=False,
        lazy_quantifiers=False,
        anchors=False
    )
    token = None   # type: XPathToken
    parser = None  # type: IdentityXPathParser

    def __init__(self, elem: ElementType, schema: SchemaType,
                 parent: Optional['XsdIdentity']) -> None:
        super().__init__(elem, schema, parent)

    def _parse(self) -> None:
        """Checks and parses the XPath expression of the selector."""
        attrib = self.elem.attrib

        if 'xpath' not in attrib:
            self.parse_error("'xpath' attribute required")
            self.path = '*'
        else:
            self.path = attrib['xpath']
            compact_path = self.path.replace(' ', '')

            if isinstance(self.pattern, str):
                # First use: store the compiled regex pattern on the class
                self.__class__.pattern = re.compile(self.pattern)

            if not self.pattern.match(compact_path):  # type: ignore[union-attr]
                msg = "invalid XPath expression for an {}"
                self.parse_error(msg.format(self.__class__.__name__))

        # XSD 1.1 xpathDefaultNamespace attribute
        if self.schema.XSD_VERSION > '1.0':
            if 'xpathDefaultNamespace' not in attrib:
                self.xpath_default_namespace = self.schema.xpath_default_namespace
            else:
                self.xpath_default_namespace = self._parse_xpath_default_namespace(self.elem)

        self.parser = IdentityXPathParser(
            namespaces=self.namespaces,
            strict=False,
            compatibility_mode=True,
            default_namespace=self.xpath_default_namespace,
        )

        try:
            self.token = self.parser.parse(self.path)
        except ElementPathError as err:
            # Fall back to a wildcard selection and report the error
            self.token = self.parser.parse('*')
            self.parse_error(err)

    def __repr__(self) -> str:
        return '%s(path=%r)' % (type(self).__name__, self.path)

    @property
    def built(self) -> bool:
        """`True` once the XPath expression has been parsed into a token."""
        return self.token is not None

    @property
    def target_namespace(self) -> str:
        """
        The namespace of the prefixed name at the root of the parsed
        expression (also when preceded by '@'), with the XPath default
        namespace as fallback for an empty one. In any other case it is
        the target namespace of the schema.
        """
        # TODO: implement a property in elementpath for getting XPath token's namespace
        root = self.token
        if root is not None:
            if root.symbol == ':':
                return root[1].namespace or self.xpath_default_namespace
            if root.symbol == '@' and root[0].symbol == ':':
                return root[0][1].namespace or self.xpath_default_namespace

        # Token not available or not a prefixed name
        # (xpathDefaultNamespace="##targetNamespace")
        return self.schema.target_namespace
137
138
class XsdFieldSelector(XsdSelector):
    """
    Class for defining an XPath field selector for an XSD identity constraint.

    It differs from :class:`XsdSelector` only for the admitted tag and for
    the pattern used to check the XPath expression.
    """
    _ADMITTED_TAGS = {XSD_FIELD}

    # Unlike the selector pattern, each alternative of the expression may
    # end with an attribute step ('attribute::' or '@').
    pattern = translate_pattern(
        r"(\.//)?((((child::)?((\i\c*:)?(\i\c*|\*)))|\.)/)*((((child::)?"
        r"((\i\c*:)?(\i\c*|\*)))|\.)|((attribute::|@)((\i\c*:)?(\i\c*|\*))))"
        r"(\|(\.//)?((((child::)?((\i\c*:)?(\i\c*|\*)))|\.)/)*"
        r"((((child::)?((\i\c*:)?(\i\c*|\*)))|\.)|"
        r"((attribute::|@)((\i\c*:)?(\i\c*|\*)))))*",
        back_references=False,
        lazy_quantifiers=False,
        anchors=False
    )
152
153
class XsdIdentity(XsdComponent):
    """
    Base class of the XSD identity constraints.

    :ivar selector: the XPath selector of the identity constraint.
    :ivar fields: a list containing the XPath field selectors of the identity constraint.
    :ivar elements: the XSD elements bound by the selector, an empty tuple \
    until the component is built.
    """
    name: str
    local_name: str
    prefixed_name: str
    parent: 'XsdElement'
    ref: Optional['XsdIdentity']

    selector = None  # type: XsdSelector
    fields = ()      # type: Union[Tuple[()], List[XsdFieldSelector]]

    # XSD elements bound by selector (for speed-up and for lazy mode)
    elements: Union[Tuple[()], Dict['XsdElement', Optional[IdentityCounterType]]] = ()

    def __init__(self, elem: ElementType, schema: SchemaType,
                 parent: Optional['XsdElement']) -> None:
        super().__init__(elem, schema, parent)

    def _parse(self) -> None:
        """Parses the name, the selector and the fields of the constraint."""
        try:
            self.name = get_qname(self.target_namespace, self.elem.attrib['name'])
        except KeyError:
            self.parse_error("missing required attribute 'name'")
            self.name = ''

        # Only the first selector child is considered
        selector_elem = next(
            (child for child in self.elem if child.tag == XSD_SELECTOR), None
        )
        if selector_elem is None:
            self.parse_error("missing 'selector' declaration.")
        else:
            self.selector = XsdSelector(selector_elem, self.schema, self)

        self.fields = field_selectors = []
        for child in self.elem:
            if child.tag != XSD_FIELD:
                continue
            field_selectors.append(XsdFieldSelector(child, self.schema, self))

    def build(self) -> None:
        """
        Resolves the reference, if any, and collects the XSD elements
        selected by the selector starting from the parent element.
        """
        if self.ref is True:  # type: ignore[comparison-overlap]
            # Reference still unresolved: take selector and fields from
            # the referred global identity constraint.
            try:
                ref = self.maps.identities[self.name]
            except KeyError:
                self.parse_error("unknown identity constraint {!r}".format(self.name))
                return

            if not isinstance(ref, self.__class__):
                self.parse_error("attribute 'ref' points to a different kind constraint")
            self.selector = ref.selector
            self.fields = ref.fields
            self.ref = ref

        context = IdentityXPathContext(self.schema, item=self.parent)  # type: ignore

        self.elements = {}
        try:
            for node in self.selector.token.select_results(context):
                if isinstance(node, XsdComponent) and not isinstance(node, XsdAttribute):
                    if node.name is not None:
                        self.elements[node] = None  # type: ignore[index]
                else:
                    self.parse_error("selector xpath expression can only select elements")
        except AttributeError:
            return

        if self.elements:
            return

        # Try to detect target XSD elements extracting QNames
        # of the leaf elements from the XPath expression and
        # use them to match global elements.
        qname: Any
        for qname in self.selector.token.iter_leaf_elements():
            xsd_element = self.maps.elements.get(
                get_extended_qname(qname, self.namespaces)
            )
            if xsd_element is None or isinstance(xsd_element, tuple):
                continue
            if xsd_element not in self.elements:
                self.elements[xsd_element] = None

    @property
    def built(self) -> bool:
        """`True` once the mapping of the selected elements has been created."""
        return not isinstance(self.elements, tuple)

    @staticmethod
    def _normalize_field_value(value: Any) -> IdentityFieldItemType:
        """
        Returns the form of a decoded value that is stored in the fields
        tuple: lists become tuples, booleans and floats are paired with
        their type and a NaN float is replaced by the string 'nan'.
        """
        if isinstance(value, list):
            return tuple(value)
        if isinstance(value, bool):
            return value, bool
        if not isinstance(value, float):
            return value
        if math.isnan(value):
            return 'nan', float
        return value, float

    def get_fields(self, elem: Union[ElementType, 'XsdElement'],
                   namespaces: Optional[NamespacesType] = None,
                   decoders: Optional[Tuple[XsdAttribute, ...]] = None) -> IdentityCounterType:
        """
        Get fields for a schema or instance context element.

        :param elem: an Element or an XsdElement
        :param namespaces: is an optional mapping from namespace prefix to URI.
        :param decoders: context schema fields decoders.
        :return: a tuple with field values. An empty field is replaced by `None`.
        :raises XMLSchemaValueError: if a field selects more than one value or \
        if a required field of a key is missing.
        :raises XMLSchemaTypeError: if the decoder of a selected value doesn't \
        have a simple or mixed content type.
        """
        fields: List[IdentityFieldItemType] = []
        context_class = IdentityXPathContext if isinstance(elem, XsdComponent) else XPathContext

        result: Any
        value: Union[AtomicValueType, None]
        for k, field in enumerate(self.fields):
            result = field.token.get_results(context_class(elem))  # type: ignore

            if not result:
                decoder = None if decoders is None else decoders[k]
                if decoder is not None:
                    # Empty selection: use the value constraint of the decoder, if any
                    value = decoder.value_constraint
                    if value is not None:
                        if decoder.type.root_type.name == XSD_QNAME:
                            value = get_extended_qname(value, namespaces)
                        fields.append(self._normalize_field_value(value))
                        continue

                if not isinstance(self, XsdKey):
                    missing_admitted = True
                elif 'ref' in elem.attrib and self.schema.meta_schema is None \
                        and self.schema.XSD_VERSION != '1.0':
                    missing_admitted = True
                else:
                    missing_admitted = field.target_namespace not in self.maps.namespaces

                if not missing_admitted:
                    msg = "missing key field {!r} for {!r}"
                    raise XMLSchemaValueError(msg.format(field.path, self))
                fields.append(None)

            elif len(result) == 1:
                decoder = None if decoders is None else decoders[k]
                if decoder is None:
                    fields.append(result[0])
                    continue

                if decoder.type.content_type_label not in ('simple', 'mixed'):
                    raise XMLSchemaTypeError("%r field doesn't have a simple type!" % field)

                value = decoder.data_value(result[0])
                if decoder.type.root_type.name == XSD_QNAME:
                    if isinstance(value, str):
                        value = get_extended_qname(value, namespaces)
                    elif isinstance(value, datatypes.QName):
                        value = value.expanded_name

                fields.append(self._normalize_field_value(value))
            else:
                raise XMLSchemaValueError("%r field selects multiple values!" % field)

        return tuple(fields)

    def get_counter(self, enabled: bool = True) -> 'IdentityCounter':
        """Returns a new counter bound to this identity constraint."""
        return IdentityCounter(self, enabled)
324
325
class XsdUnique(XsdIdentity):
    """Implementation of xs:unique."""
    _ADMITTED_TAGS = {XSD_UNIQUE}
328
329
class XsdKey(XsdIdentity):
    """Implementation of xs:key."""
    _ADMITTED_TAGS = {XSD_KEY}
332
333
class XsdKeyref(XsdIdentity):
    """
    Implementation of xs:keyref.

    :ivar refer: reference to a *xs:key* declaration that must be in the same element \
    or in a descendant element. It's the resolved QName of the 'refer' attribute \
    until the component is built.
    :ivar refer_path: a path that relates the parent element of the keyref with the \
    parent element of the referred constraint, '.' if they are the same element.
    """
    _ADMITTED_TAGS = {XSD_KEYREF}
    refer: Optional[Union[str, XsdKey]] = None
    refer_path = '.'

    def _parse(self) -> None:
        """Parses the constraint and resolves the QName of the 'refer' attribute."""
        super()._parse()

        if 'refer' not in self.elem.attrib:
            self.parse_error("missing required attribute 'refer'")
            return

        try:
            self.refer = self.schema.resolve_qname(self.elem.attrib['refer'])
        except (KeyError, ValueError, RuntimeError) as err:
            self.parse_error(err)

    def build(self) -> None:
        """
        Builds the constraint and then replaces the name of the referred
        key/unique constraint with the component, computing the path
        between the two parent elements when they differ.
        """
        super().build()

        if isinstance(self.refer, (XsdKey, XsdUnique)):
            return  # referenced key/unique identity constraint already set
        if isinstance(self.ref, XsdKeyref):
            self.refer = self.ref.refer

        if self.refer is None:
            return  # attribute or key/unique identity constraint missing

        if isinstance(self.refer, str):
            # Local constraints of the parent take precedence, unless
            # the local one is itself a reference.
            local_identity = self.parent.identities.get(self.refer)
            if local_identity is not None and local_identity.ref is None:
                self.refer = local_identity  # type: ignore[assignment]
            else:
                try:
                    self.refer = self.maps.identities[self.refer]  # type: ignore[assignment]
                except KeyError:
                    self.parse_error("key/unique identity constraint %r is missing" % self.refer)
                    return

        if not isinstance(self.refer, (XsdKey, XsdUnique)):
            self.parse_error("reference to a non key/unique identity constraint %r" % self.refer)
            return
        if len(self.refer.fields) != len(self.fields):
            self.parse_error("field cardinality mismatch between %r and %r" % (self, self.refer))
            return
        if self.parent is self.refer.parent:
            return

        path = self.refer.parent.get_path(ancestor=self.parent)
        if path is None:
            # From a note in par. 3.11.5 Part 1 of XSD 1.0 spec: "keyref
            # identity-constraints may be defined on domains distinct from
            # the embedded domain of the identity-constraint they reference,
            # or the domains may be the same but self-embedding at some depth.
            # In either case the node table for the referenced identity-constraint
            # needs to propagate upwards, with conflict resolution."
            path = self.parent.get_path(ancestor=self.refer.parent, reverse=True)

        if path is None:
            path1 = self.parent.get_path(reverse=True)
            path2 = self.refer.parent.get_path()
            assert path1 is not None
            assert path2 is not None
            path = f'{path1}/{path2}'

        self.refer_path = path

    @property
    def built(self) -> bool:
        """`True` if the elements are collected and the referred constraint is resolved."""
        if isinstance(self.elements, tuple):
            return False
        return isinstance(self.refer, XsdIdentity)

    def get_counter(self, enabled: bool = True) -> 'KeyrefCounter':
        """Returns a new keyref counter bound to this constraint."""
        return KeyrefCounter(self, enabled)
405
406
class Xsd11Unique(XsdUnique):
    """XSD 1.1 xs:unique, that can be a reference to another constraint."""

    def _parse(self) -> None:
        if self._parse_reference():
            # Placeholder replaced by the referred constraint at build
            self.ref = True  # type: ignore[assignment]
            return
        super()._parse()
413
414
class Xsd11Key(XsdKey):
    """XSD 1.1 xs:key, that can be a reference to another constraint."""

    def _parse(self) -> None:
        if self._parse_reference():
            # Placeholder replaced by the referred constraint at build
            self.ref = True  # type: ignore[assignment]
            return
        super()._parse()
421
422
class Xsd11Keyref(XsdKeyref):
    """XSD 1.1 xs:keyref, that can be a reference to another constraint."""

    def _parse(self) -> None:
        if self._parse_reference():
            # Placeholder replaced by the referred constraint at build
            self.ref = True  # type: ignore[assignment]
            return
        super()._parse()
429
430
class IdentityCounter:
    """
    Counter of the tuples of field values found for an identity constraint.

    :param identity: the identity constraint bound to the counter.
    :param enabled: the initial value of the `enabled` flag.
    """

    def __init__(self, identity: XsdIdentity, enabled: bool = True) -> None:
        self.counter: Counter[IdentityCounterType] = Counter[IdentityCounterType]()
        self.identity = identity
        self.enabled = enabled

    def __repr__(self) -> str:
        # The repr of the counter already starts with 'Counter', so the
        # same suffix (7 characters) is cut from the class name.
        prefix = self.__class__.__name__[:-7]
        return "%s%r" % (prefix, self.counter)

    def clear(self) -> None:
        """Removes all the counted values and enables the counter."""
        self.counter.clear()
        self.enabled = True

    def increase(self, fields: IdentityCounterType) -> None:
        """
        Counts an occurrence of a tuple of field values.

        :raises XMLSchemaValueError: when the second occurrence of the \
        same tuple is counted.
        """
        occurrences = self.counter[fields] + 1
        self.counter[fields] = occurrences
        if occurrences == 2:
            msg = "duplicated value {!r} for {!r}"
            raise XMLSchemaValueError(msg.format(fields, self.identity))
450
451
class KeyrefCounter(IdentityCounter):
    """
    Counter for keyref constraints: repeated values are admitted and the
    check against the referred constraint is done by `iter_errors`.
    """
    identity: XsdKeyref

    def increase(self, fields: IdentityCounterType) -> None:
        """Counts an occurrence of a tuple of field values, without checks."""
        self.counter[fields] += 1

    def iter_errors(self, identities: IdentityMapType) -> Iterator[XMLSchemaValueError]:
        """
        Yields an error for each counted value that is not found among the
        values of the referred constraint.

        :param identities: the mapping from identity constraints to counters.
        """
        refer_values = identities[self.identity.refer].counter

        for v, occurrences in self.counter.items():
            if v in refer_values:
                continue
            if len(v) == 1 and v[0] in refer_values:
                continue

            if occurrences > 1:
                msg = "value {} not found for {!r} ({} times)"
                yield XMLSchemaValueError(msg.format(v, self.identity.refer, occurrences))
            else:
                msg = "value {} not found for {!r}"
                yield XMLSchemaValueError(msg.format(v, self.identity.refer))
470