1# -*- coding: ascii -*-
2"""
3web2ldap.app.schema.syntaxes: classes for known attribute types
4
5web2ldap - a web-based LDAP Client,
6see https://www.web2ldap.de for details
7
8(c) 1998-2021 by Michael Stroeder <michael@stroeder.com>
9
10This software is distributed under the terms of the
11Apache License Version 2.0 (Apache-2.0)
12https://www.apache.org/licenses/LICENSE-2.0
13"""
14
15import binascii
16import sys
17import os
18import re
19import imghdr
20import sndhdr
21import urllib.parse
22import uuid
23import datetime
24import time
25import json
26import inspect
27import warnings
28from typing import (
29    Callable,
30    Dict,
31    List,
32    Optional,
33    Pattern,
34    Sequence,
35    Tuple,
36)
37
38try:
39    import defusedxml.ElementTree
40except ImportError:
41    DEFUSEDXML_AVAIL = False
42else:
43    DEFUSEDXML_AVAIL = True
44
45from collections import defaultdict
46from io import BytesIO
47
48# Detect Python Imaging Library (PIL)
49try:
50    from PIL import Image as PILImage
51except ImportError:
52    PIL_AVAIL = False
53else:
54    PIL_AVAIL = True
55    warnings.simplefilter('error', PILImage.DecompressionBombWarning)
56
57import ipaddress
58
59import ldap0
60import ldap0.ldapurl
61from ldap0.schema.models import AttributeType, ObjectClass, OBJECTCLASS_KIND_STR
62from ldap0.controls.deref import DereferenceControl
63from ldap0.dn import DNObj, is_dn
64from ldap0.res import SearchResultEntry
65from ldap0.schema.subentry import SubSchema
66
67import web2ldapcnf
68
69from ... import ETC_DIR
70from ...web import forms as web_forms
71from ...msbase import ascii_dump, chunks
72from ...utctime import repr2ts, ts2repr, strftimeiso8601
73from ...ldaputil.oidreg import OID_REG
74from ...log import logger
75from ... import cmp
76from . import schema_anchor
77from ..tmpl import get_variant_filename
78from ...utctime import strptime as utc_strptime
79from ..searchform import (
80    SEARCH_OPT_ATTR_EXISTS,
81    SEARCH_OPT_IS_EQUAL,
82    SEARCH_SCOPE_STR_ONELEVEL,
83)
84
85
86class SyntaxRegistry:
87    """
88    syntax registry used to register plugin classes
89    """
90    __slots__ = (
91        'at2syntax',
92        'oid2syntax',
93    )
94
95    def __init__(self):
96        self.oid2syntax = ldap0.cidict.CIDict()
97        self.at2syntax = defaultdict(dict)
98
99    def reg_syntax(self, cls):
100        """
101        register a syntax classes for an OID
102        """
103        assert isinstance(cls.oid, str), ValueError(
104            'Expected %s.oid to be str, got %r' % (cls.__name__, cls.oid,)
105        )
106        logger.debug('Register syntax class %r with OID %r', cls.__name__, cls.oid)
107        # FIX ME!
108        # A better approach for unique syntax plugin class registration which
109        # allows overriding older registration is needed.
110        if cls.oid in self.oid2syntax and cls != self.oid2syntax[cls.oid]:
111            raise ValueError(
112                (
113                    'Failed to register syntax class %s.%s with OID %s,'
114                    ' already registered by %s.%s'
115                ) % (
116                    cls.__module__,
117                    cls.__name__,
118                    repr(cls.oid),
119                    self.oid2syntax[cls.oid].__module__,
120                    self.oid2syntax[cls.oid].__name__,
121                )
122            )
123        self.oid2syntax[cls.oid] = cls
124
125    def reg_syntaxes(self, modulename):
126        """
127        register all syntax classes found in given module
128        """
129        logger.debug('Register syntax classes from module %r', modulename)
130        for _, cls in inspect.getmembers(sys.modules[modulename], inspect.isclass):
131            if issubclass(cls, LDAPSyntax) and hasattr(cls, 'oid'):
132                self.reg_syntax(cls)
133
134    def reg_at(self, syntax_oid: str, attr_types, structural_oc_oids=None):
135        """
136        register an attribute type (by OID) to explicitly use a certain LDAPSyntax class
137        """
138        logger.debug(
139            'Register syntax OID %s for %r / %r',
140            syntax_oid,
141            attr_types,
142            structural_oc_oids,
143        )
144        assert isinstance(syntax_oid, str), ValueError(
145            'Expected syntax_oid to be str, got %r' % (syntax_oid,)
146        )
147        structural_oc_oids = list(filter(None, map(str.strip, structural_oc_oids or []))) or [None]
148        for atype in attr_types:
149            atype = atype.strip()
150            for oc_oid in structural_oc_oids:
151                # FIX ME!
152                # A better approach for unique attribute type registration which
153                # allows overriding older registration is needed.
154                if atype in self.at2syntax and oc_oid in self.at2syntax[atype]:
155                    logger.warning(
156                        (
157                            'Registering attribute type %r with syntax %r'
158                            ' overrides existing registration with syntax %r'
159                        ),
160                        atype,
161                        syntax_oid,
162                        self.at2syntax[atype],
163                    )
164                self.at2syntax[atype][oc_oid] = syntax_oid
165
166    def get_syntax(self, schema, attrtype_nameoroid, structural_oc):
167        """
168        returns LDAPSyntax class for given attribute type
169        """
170        assert isinstance(attrtype_nameoroid, str), ValueError(
171            'Expected attrtype_nameoroid to be str, got %r' % (attrtype_nameoroid,)
172        )
173        assert structural_oc is None or isinstance(structural_oc, str), ValueError(
174            'Expected structural_oc to be str or None, got %r' % (structural_oc,)
175        )
176        attrtype_oid = schema.get_oid(AttributeType, attrtype_nameoroid)
177        if structural_oc:
178            structural_oc_oid = schema.get_oid(ObjectClass, structural_oc)
179        else:
180            structural_oc_oid = None
181        syntax_oid = LDAPSyntax.oid
182        try:
183            syntax_oid = self.at2syntax[attrtype_oid][structural_oc_oid]
184        except KeyError:
185            try:
186                syntax_oid = self.at2syntax[attrtype_oid][None]
187            except KeyError:
188                attrtype_se = schema.get_inheritedobj(
189                    AttributeType,
190                    attrtype_oid,
191                    ['syntax'],
192                )
193                if attrtype_se and attrtype_se.syntax:
194                    syntax_oid = attrtype_se.syntax
195        try:
196            syntax_class = self.oid2syntax[syntax_oid]
197        except KeyError:
198            syntax_class = LDAPSyntax
199        return syntax_class
200
201    def get_at(self, app, dn, schema, attr_type, attr_value, entry=None):
202        """
203        returns LDAPSyntax instance fully initialized for given attribute
204        """
205        if entry:
206            structural_oc = entry.get_structural_oc()
207        else:
208            structural_oc = None
209        syntax_class = self.get_syntax(schema, attr_type, structural_oc)
210        attr_instance = syntax_class(app, dn, schema, attr_type, attr_value, entry)
211        return attr_instance
212
213    def check(self):
214        """
215        check whether attribute registry dict contains references by OID
216        for which no LDAPSyntax class are registered
217        """
218        logger.debug(
219            'Checking %d LDAPSyntax classes and %d attribute type mappings',
220            len(self.oid2syntax),
221            len(self.at2syntax),
222        )
223        for atype in self.at2syntax:
224            for object_class in self.at2syntax[atype]:
225                if self.at2syntax[atype][object_class] not in self.oid2syntax:
226                    logger.warning('No LDAPSyntax registered for (%r, %r)', atype, object_class)
227
228
229####################################################################
230# Classes of known syntaxes
231####################################################################
232
233
234class LDAPSyntaxValueError(ValueError):
235    """
236    Exception raised in case a syntax check failed
237    """
238
239
240class LDAPSyntaxRegexNoMatch(LDAPSyntaxValueError):
241    """
242    Exception raised in case a regex pattern check failed
243    """
244
245
246class LDAPSyntax:
247    """
248    Base class for all LDAP syntax and attribute value plugin classes
249    """
250    __slots__ = (
251        '_app',
252        '_at',
253        '_av',
254        '_av_u',
255        '_dn',
256        '_entry',
257        '_schema',
258    )
259    oid: str = ''
260    desc: str = 'Any LDAP syntax'
261    input_size: int = 50
262    max_len: int = web2ldapcnf.input_maxfieldlen
263    max_values: int = web2ldapcnf.input_maxattrs
264    mime_type: str = 'application/octet-stream'
265    file_ext: str = 'bin'
266    editable: bool = True
267    pattern: Optional[Pattern[str]] = None
268    input_pattern: Optional[str] = None
269    search_sep: str = '<br>'
270    read_sep: str = '<br>'
271    field_sep: str = '<br>'
272    sani_funcs: Sequence[Callable] = (())
273    show_val_button: bool = True
274
275    def __init__(
276            self,
277            app,
278            dn: Optional[str],
279            schema: SubSchema,
280            attrType: Optional[str],
281            attr_value: Optional[bytes],
282            entry=None,
283        ):
284        if not entry:
285            entry = ldap0.schema.models.Entry(schema, dn, {})
286        assert isinstance(dn, str), \
287            TypeError("Argument 'dn' must be str, was %r" % (dn,))
288        assert isinstance(attrType, str) or attrType is None, \
289            TypeError("Argument 'attrType' must be str or None, was %r" % (attrType,))
290        assert isinstance(attr_value, bytes) or attr_value is None, \
291            TypeError("Argument 'attr_value' must be bytes or None, was %r" % (attr_value,))
292        assert entry is None or isinstance(entry, ldap0.schema.models.Entry), \
293            TypeError('entry must be ldaputil.schema.Entry, was %r' % (entry,))
294        self._at = attrType
295        self._av = attr_value
296        self._av_u = None
297        self._app = app
298        self._schema = schema
299        self._dn = dn
300        self._entry = entry
301
302    @property
303    def dn(self):
304        return DNObj.from_str(self._dn)
305
306    @property
307    def av_u(self):
308        if (self._av is not None and self._av_u is None):
309            self._av_u = self._app.ls.uc_decode(self._av)[0]
310        return self._av_u
311
312    def sanitize(self, attr_value: bytes) -> bytes:
313        """
314        Transforms the HTML form input field values into LDAP string
315        representations and returns raw binary string.
316
317        This is the inverse of LDAPSyntax.form_value().
318
319        When using this method one MUST NOT assume that the whole entry is
320        present.
321        """
322        for sani_func in self.sani_funcs:
323            attr_value = sani_func(attr_value)
324        return attr_value
325
326    def transmute(self, attr_values: List[bytes]) -> List[bytes]:
327        """
328        This method can be implemented to transmute attribute values and has
329        to handle LDAP string representations (raw binary strings).
330
331        This method has access to the whole entry after processing all input.
332
333        Implementors should be prepared that this method could be called
334        more than once. If there's nothing to change then simply return the
335        same value list.
336
337        Exceptions KeyError or IndexError are caught by the calling code to
338        re-iterate invoking this method.
339        """
340        return attr_values
341
342    def _validate(self, attr_value: bytes) -> bool:
343        """
344        check the syntax of attr_value
345
346        Implementors can overload this method to apply arbitrary syntax checks.
347        """
348        return True
349
350    def validate(self, attr_value: bytes):
351        if not attr_value:
352            return
353        if self.pattern and (self.pattern.match(attr_value.decode(self._app.ls.charset)) is None):
354            raise LDAPSyntaxRegexNoMatch(
355                "Class %s: %r does not match pattern %r." % (
356                    self.__class__.__name__,
357                    attr_value,
358                    self.pattern.pattern,
359                )
360            )
361        if not self._validate(attr_value):
362            raise LDAPSyntaxValueError(
363                "Class %s: %r does not comply to syntax (attr type %r)." % (
364                    self.__class__.__name__,
365                    attr_value,
366                    self._at,
367                )
368            )
369        # end of validate()
370
371    def value_button(self, command, row, mode, link_text=None) -> str:
372        """
373        return HTML markup of [+] or [-] submit buttons for adding/removing
374        attribute values
375
376        row
377          row number in input table
378        mode
379          '+' or '-'
380        link_text
381          optionally override displayed link link_text
382        """
383        link_text = link_text or mode
384        if (
385                not self.show_val_button or
386                self.max_values <= 1 or
387                len(self._entry.get(self._at, [])) >= self.max_values
388            ):
389            return ''
390        se_obj = self._schema.get_obj(AttributeType, self._at)
391        if se_obj and se_obj.single_value:
392            return ''
393        return (
394            '<button'
395            ' formaction="%s#in_a_%s"'
396            ' type="submit"'
397            ' name="in_mr"'
398            ' value="%s%d">%s'
399            '</button>'
400        ) % (
401            self._app.form.action_url(command, self._app.sid),
402            self._app.form.s2d(self._at),
403            mode, row, link_text
404        )
405
406    def form_value(self) -> str:
407        """
408        Transform LDAP string representations to HTML form input field
409        values. Returns Unicode string to be encoded with the browser's
410        accepted charset.
411
412        This is the inverse of LDAPSyntax.sanitize().
413        """
414        try:
415            result = self.av_u or ''
416        except UnicodeDecodeError:
417            result = '!!!snipped because of UnicodeDecodeError!!!'
418        return result
419
420    def input_fields(self):
421        return (self.input_field(),)
422
423    def input_field(self) -> web_forms.Field:
424        input_field = web_forms.Input(
425            self._at,
426            ': '.join([self._at, self.desc]),
427            self.max_len,
428            self.max_values,
429            self.input_pattern,
430            default=None,
431            size=min(self.max_len, self.input_size),
432        )
433        input_field.charset = self._app.form.accept_charset
434        input_field.set_default(self.form_value())
435        return input_field
436
437    def display(self, vidx, links) -> str:
438        try:
439            res = self._app.form.s2d(self.av_u)
440        except UnicodeDecodeError:
441            res = self._app.form.s2d(repr(self._av))
442        return res
443
444
445class Binary(LDAPSyntax):
446    """
447    Plugin class for LDAP syntax 'Binary' (see RFC 2252)
448    """
449    oid: str = '1.3.6.1.4.1.1466.115.121.1.5'
450    desc: str = 'Binary'
451    editable: bool = False
452
453    def input_field(self) -> web_forms.Field:
454        field = web_forms.File(
455            self._at,
456            ': '.join([self._at, self.desc]),
457            self.max_len, self.max_values, None, default=self._av, size=50
458        )
459        field.mime_type = self.mime_type
460        return field
461
462    def display(self, vidx, links) -> str:
463        return '%d bytes | %s' % (
464            len(self._av),
465            self._app.anchor(
466                'read', 'View/Load',
467                [
468                    ('dn', self._dn),
469                    ('read_attr', self._at),
470                    ('read_attrindex', str(vidx)),
471                ],
472            )
473        )
474
475
476class Audio(Binary):
477    """
478    Plugin class for LDAP syntax 'Audio' (see RFC 2252)
479    """
480    oid: str = '1.3.6.1.4.1.1466.115.121.1.4'
481    desc: str = 'Audio'
482    mime_type: str = 'audio/basic'
483    file_ext: str = 'au'
484
485    def _validate(self, attr_value: bytes) -> bool:
486        with BytesIO(attr_value) as fileobj:
487            res = sndhdr.test_au(attr_value, fileobj)
488        return res is not None
489
490    def display(self, vidx, links) -> str:
491        mimetype = self.mime_type
492        return (
493            '<embed type="%s" autostart="false" '
494            'src="%s/read/%s?dn=%s&amp;read_attr=%s&amp;read_attrindex=%d">'
495            '%d bytes of audio data (%s)'
496        ) % (
497            mimetype,
498            self._app.form.script_name, self._app.sid,
499            urllib.parse.quote(self._dn.encode(self._app.form.accept_charset)),
500            urllib.parse.quote(self._at),
501            vidx,
502            len(self._av),
503            mimetype
504        )
505
506
507class DirectoryString(LDAPSyntax):
508    """
509    Plugin class for LDAP syntax 'Directory String'
510    (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.6)
511    """
512    oid: str = '1.3.6.1.4.1.1466.115.121.1.15'
513    desc: str = 'Directory String'
514    html_tmpl = '{av}'
515
516    def _validate(self, attr_value: bytes) -> bool:
517        try:
518            self._app.ls.uc_decode(attr_value)
519        except UnicodeDecodeError:
520            return False
521        return True
522
523    def display(self, vidx, links) -> str:
524        return self.html_tmpl.format(
525            av=self._app.form.s2d(self.av_u)
526        )
527
528
529class DistinguishedName(DirectoryString):
530    """
531    Plugin class for LDAP syntax 'DN'
532    (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.9)
533    """
534    oid: str = '1.3.6.1.4.1.1466.115.121.1.12'
535    desc: str = 'Distinguished Name'
536    isBindDN = False
537    hasSubordinates = False
538    ref_attrs: Optional[Sequence[Tuple[Optional[str], str, Optional[str], str]]] = None
539
540    def _validate(self, attr_value: bytes) -> bool:
541        return is_dn(self._app.ls.uc_decode(attr_value)[0])
542
543    def _additional_links(self):
544        res = []
545        if self._at.lower() != 'entrydn':
546            res.append(
547                self._app.anchor(
548                    'read', 'Read',
549                    [('dn', self.av_u)],
550                )
551            )
552        if self.hasSubordinates:
553            res.append(self._app.anchor(
554                'search', 'Down',
555                (
556                    ('dn', self.av_u),
557                    ('scope', SEARCH_SCOPE_STR_ONELEVEL),
558                    ('filterstr', '(objectClass=*)'),
559                )
560            ))
561        if self.isBindDN:
562            ldap_url_obj = self._app.ls.ldap_url('', add_login=False)
563            res.append(
564                self._app.anchor(
565                    'login',
566                    'Bind as',
567                    [
568                        ('ldapurl', str(ldap_url_obj)),
569                        ('dn', self._dn),
570                        ('login_who', self.av_u),
571                    ],
572                    title='Connect and bind new session as\r\n%s' % (self.av_u)
573                ),
574            )
575        # If self.ref_attrs is not empty then add links for searching back-linking entries
576        for ref_attr_tuple in self.ref_attrs or tuple():
577            try:
578                ref_attr, ref_text, ref_dn, ref_oc, ref_title = ref_attr_tuple
579            except ValueError:
580                ref_oc = None
581                ref_attr, ref_text, ref_dn, ref_title = ref_attr_tuple
582            ref_attr = ref_attr or self._at
583            ref_dn = ref_dn or self._dn
584            ref_title = ref_title or 'Search %s entries referencing entry %s in attribute %s' % (
585                ref_oc, self.av_u, ref_attr,
586            )
587            res.append(self._app.anchor(
588                'search', self._app.form.s2d(ref_text),
589                (
590                    ('dn', ref_dn),
591                    ('search_root', str(self._app.naming_context)),
592                    ('searchform_mode', 'adv'),
593                    ('search_attr', 'objectClass'),
594                    (
595                        'search_option',
596                        {
597                            True: SEARCH_OPT_ATTR_EXISTS,
598                            False: SEARCH_OPT_IS_EQUAL,
599                        }[ref_oc is None]
600                    ),
601                    ('search_string', ref_oc or ''),
602                    ('search_attr', ref_attr),
603                    ('search_option', SEARCH_OPT_IS_EQUAL),
604                    ('search_string', self.av_u),
605                ),
606                title=ref_title,
607            ))
608        return res
609
610    def display(self, vidx, links) -> str:
611        res = [self._app.form.s2d(self.av_u or '- World -')]
612        if links:
613            res.extend(self._additional_links())
614        return web2ldapcnf.command_link_separator.join(res)
615
616
617class BindDN(DistinguishedName):
618    """
619    Plugin class for DNs probably usable as bind-DN
620    """
621    oid: str = 'BindDN-oid'
622    desc: str = 'A Distinguished Name used to bind to a directory'
623    isBindDN = True
624
625
626class AuthzDN(DistinguishedName):
627    """
628    Plugin class for DNs used for authorization
629    """
630    oid: str = 'AuthzDN-oid'
631    desc: str = 'Authz Distinguished Name'
632
633    def display(self, vidx, links) -> str:
634        result = DistinguishedName.display(self, vidx, links)
635        if links:
636            simple_display_str = DistinguishedName.display(
637                self,
638                vidx,
639                links=False,
640            )
641            whoami_display_str = self._app.display_authz_dn(who=self.av_u)
642            if whoami_display_str != simple_display_str:
643                result = '<br>'.join((whoami_display_str, result))
644        return result
645
646
647class NameAndOptionalUID(DistinguishedName):
648    """
649    Plugin class for LDAP syntax 'Name and Optional UID'
650    (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.21)
651    """
652    oid: str = '1.3.6.1.4.1.1466.115.121.1.34'
653    desc: str = 'Name And Optional UID'
654
655    @staticmethod
656    def _split_dn_and_uid(val: str) -> Tuple[str, Optional[str]]:
657        try:
658            sep_ind = val.rindex('#')
659        except ValueError:
660            dn = val
661            uid = None
662        else:
663            dn = val[0:sep_ind]
664            uid = val[sep_ind+1:]
665        return dn, uid
666
667    def _validate(self, attr_value: bytes) -> bool:
668        dn, _ = self._split_dn_and_uid(self._app.ls.uc_decode(attr_value)[0])
669        return is_dn(dn)
670
671    def display(self, vidx, links) -> str:
672        value = self.av_u.split('#')
673        dn_str = self._app.display_dn(
674            self.av_u,
675            links=links,
676        )
677        if len(value) == 1 or not value[1]:
678            return dn_str
679        return web2ldapcnf.command_link_separator.join([
680            self._app.form.s2d(value[1]),
681            dn_str,
682        ])
683
684
685class BitString(DirectoryString):
686    """
687    Plugin class for LDAP syntax 'Bit String'
688    (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.2)
689    """
690    oid: str = '1.3.6.1.4.1.1466.115.121.1.6'
691    desc: str = 'Bit String'
692    pattern = re.compile("^'[01]+'B$")
693
694
695class IA5String(DirectoryString):
696    """
697    Plugin class for LDAP syntax 'IA5 String'
698    (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.15)
699    """
700    oid: str = '1.3.6.1.4.1.1466.115.121.1.26'
701    desc: str = 'IA5 String'
702
703    def _validate(self, attr_value: bytes) -> bool:
704        try:
705            _ = attr_value.decode('ascii').encode('ascii')
706        except UnicodeError:
707            return False
708        return True
709
710
711class GeneralizedTime(IA5String):
712    """
713    Plugin class for LDAP syntax 'Generalized Time'
714    (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.13)
715    """
716    oid: str = '1.3.6.1.4.1.1466.115.121.1.24'
717    desc: str = 'Generalized Time'
718    input_size: int = 24
719    max_len: int = 24
720    pattern = re.compile(r'^([0-9]){12,14}((\.|,)[0-9]+)*(Z|(\+|-)[0-9]{4})$')
721    timeDefault = None
722    notBefore = None
723    notAfter = None
724    form_value_fmt = '%Y-%m-%dT%H:%M:%SZ'
725    dtFormats = (
726        '%Y%m%d%H%M%SZ',
727        '%Y-%m-%dT%H:%M:%SZ',
728        '%Y-%m-%dT%H:%MZ',
729        '%Y-%m-%dT%H:%M:%S+00:00',
730        '%Y-%m-%dT%H:%M:%S-00:00',
731        '%Y-%m-%d %H:%M:%SZ',
732        '%Y-%m-%d %H:%MZ',
733        '%Y-%m-%d %H:%M',
734        '%Y-%m-%d %H:%M:%S+00:00',
735        '%Y-%m-%d %H:%M:%S-00:00',
736        '%d.%m.%YT%H:%M:%SZ',
737        '%d.%m.%YT%H:%MZ',
738        '%d.%m.%YT%H:%M:%S+00:00',
739        '%d.%m.%YT%H:%M:%S-00:00',
740        '%d.%m.%Y %H:%M:%SZ',
741        '%d.%m.%Y %H:%MZ',
742        '%d.%m.%Y %H:%M',
743        '%d.%m.%Y %H:%M:%S+00:00',
744        '%d.%m.%Y %H:%M:%S-00:00',
745    )
746    acceptable_formats = (
747        '%Y-%m-%d',
748        '%d.%m.%Y',
749        '%m/%d/%Y',
750    )
751    dt_display_format = (
752        '<time datetime="%Y-%m-%dT%H:%M:%SZ">'
753        '%A (%W. week) %Y-%m-%d %H:%M:%S+00:00'
754        '</time>'
755    )
756
757    def _validate(self, attr_value: bytes) -> bool:
758        try:
759            d_t = utc_strptime(attr_value)
760        except ValueError:
761            return False
762        return (
763            (self.notBefore is None or self.notBefore <= d_t)
764            and (self.notAfter is None or self.notAfter >= d_t)
765        )
766
767    def form_value(self) -> str:
768        if not self._av:
769            return ''
770        try:
771            d_t = datetime.datetime.strptime(self.av_u, r'%Y%m%d%H%M%SZ')
772        except ValueError:
773            result = IA5String.form_value(self)
774        else:
775            result = str(datetime.datetime.strftime(d_t, self.form_value_fmt))
776        return result
777
778    def sanitize(self, attr_value: bytes) -> bytes:
779        av_u = self._app.ls.uc_decode(attr_value.strip().upper())[0]
780        # Special cases first
781        if av_u in {'N', 'NOW', '0'}:
782            return datetime.datetime.strftime(
783                datetime.datetime.utcnow(),
784                r'%Y%m%d%H%M%SZ',
785            ).encode('ascii')
786        # a single integer value is interpreted as seconds relative to now
787        try:
788            float_val = float(av_u)
789        except ValueError:
790            pass
791        else:
792            return datetime.datetime.strftime(
793                datetime.datetime.utcnow()+datetime.timedelta(seconds=float_val),
794                r'%Y%m%d%H%M%SZ',
795            ).encode('ascii')
796        if self.timeDefault:
797            date_format = r'%Y%m%d' + self.timeDefault + 'Z'
798            if av_u in ('T', 'TODAY'):
799                return datetime.datetime.strftime(
800                    datetime.datetime.utcnow(),
801                    date_format,
802                ).encode('ascii')
803            if av_u in ('Y', 'YESTERDAY'):
804                return datetime.datetime.strftime(
805                    datetime.datetime.today()-datetime.timedelta(days=1),
806                    date_format,
807                ).encode('ascii')
808            if av_u in ('T', 'TOMORROW'):
809                return datetime.datetime.strftime(
810                    datetime.datetime.today()+datetime.timedelta(days=1),
811                    date_format,
812                ).encode('ascii')
813        # Try to parse various datetime syntaxes
814        for time_format in self.dtFormats:
815            try:
816                d_t = datetime.datetime.strptime(av_u, time_format)
817            except ValueError:
818                result = None
819            else:
820                result = datetime.datetime.strftime(d_t, r'%Y%m%d%H%M%SZ')
821                break
822        if result is None:
823            if self.timeDefault:
824                for time_format in self.acceptable_formats or []:
825                    try:
826                        d_t = datetime.datetime.strptime(av_u, time_format)
827                    except ValueError:
828                        result = None
829                    else:
830                        result = datetime.datetime.strftime(d_t, r'%Y%m%d'+self.timeDefault+'Z')
831                        break
832            else:
833                result = av_u
834        if result is None:
835            return IA5String.sanitize(self, attr_value)
836        return result.encode('ascii')
837        # end of GeneralizedTime.sanitize()
838
839    def display(self, vidx, links) -> str:
840        try:
841            dt_utc = utc_strptime(self.av_u)
842        except ValueError:
843            return IA5String.display(self, vidx, links)
844        try:
845            dt_utc_str = dt_utc.strftime(self.dt_display_format)
846        except ValueError:
847            return IA5String.display(self, vidx, links)
848        if not links:
849            return dt_utc_str
850        current_time = datetime.datetime.utcnow()
851        time_span = (current_time - dt_utc).total_seconds()
852        return '{dt_utc} ({av})<br>{timespan_disp} {timespan_comment}'.format(
853            dt_utc=dt_utc_str,
854            av=self._app.form.s2d(self.av_u),
855            timespan_disp=self._app.form.s2d(
856                ts2repr(Timespan.time_divisors, ' ', abs(time_span))
857            ),
858            timespan_comment={
859                1: 'ago',
860                0: '',
861                -1: 'ahead',
862            }[cmp(time_span, 0)]
863        )
864
865
866class NotBefore(GeneralizedTime):
867    """
868    Plugin class for attributes indicating start of a period
869    """
870    oid: str = 'NotBefore-oid'
871    desc: str = 'A not-before timestamp by default starting at 00:00:00'
872    timeDefault = '000000'
873
874
875class NotAfter(GeneralizedTime):
876    """
877    Plugin class for attributes indicating end of a period
878    """
879    oid: str = 'NotAfter-oid'
880    desc: str = 'A not-after timestamp by default ending at 23:59:59'
881    timeDefault = '235959'
882
883
884class UTCTime(GeneralizedTime):
885    """
886    Plugin class for LDAP syntax 'UTC Time'
887    (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.34)
888    """
889    oid: str = '1.3.6.1.4.1.1466.115.121.1.53'
890    desc: str = 'UTC Time'
891
892
893class NullTerminatedDirectoryString(DirectoryString):
894    """
895    Plugin class for strings terminated with null-byte
896    """
897    oid: str = 'NullTerminatedDirectoryString-oid'
898    desc: str = 'Directory String terminated by null-byte'
899
900    def sanitize(self, attr_value: bytes) -> bytes:
901        return attr_value + b'\x00'
902
903    def _validate(self, attr_value: bytes) -> bool:
904        return attr_value.endswith(b'\x00')
905
906    def form_value(self) -> str:
907        return self._app.ls.uc_decode((self._av or b'\x00')[:-1])[0]
908
909    def display(self, vidx, links) -> str:
910        return self._app.form.s2d(
911            self._app.ls.uc_decode((self._av or b'\x00')[:-1])[0]
912        )
913
914
915class OtherMailbox(DirectoryString):
916    """
917    Plugin class for LDAP syntax 'Other Mailbox'
918    (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.27)
919    """
920    oid: str = '1.3.6.1.4.1.1466.115.121.1.39'
921    desc: str = 'Other Mailbox'
922    charset = 'ascii'
923
924
925class Integer(IA5String):
926    """
927    Plugin class for LDAP syntax 'Integer'
928    (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.16)
929    """
930    oid: str = '1.3.6.1.4.1.1466.115.121.1.27'
931    desc: str = 'Integer'
932    input_size: int = 12
933    min_value = None
934    max_value = None
935
936    def __init__(self, app, dn: str, schema, attrType: str, attr_value: bytes, entry=None):
937        IA5String.__init__(self, app, dn, schema, attrType, attr_value, entry)
938        if self.max_value is not None:
939            self.max_len = len(str(self.max_value))
940
941    def _maxlen(self, fval):
942        min_value_len = max_value_len = fval_len = 0
943        if self.min_value is not None:
944            min_value_len = len(str(self.min_value))
945        if self.max_value is not None:
946            max_value_len = len(str(self.max_value))
947        if fval is not None:
948            fval_len = len(fval.encode(self._app.ls.charset))
949        return max(self.input_size, fval_len, min_value_len, max_value_len)
950
951    def _validate(self, attr_value: bytes) -> bool:
952        try:
953            val = int(attr_value)
954        except ValueError:
955            return False
956        min_value, max_value = self.min_value, self.max_value
957        return (
958            (min_value is None or val >= min_value) and
959            (max_value is None or val <= max_value)
960        )
961
962    def sanitize(self, attr_value: bytes) -> bytes:
963        try:
964            return str(int(attr_value)).encode('ascii')
965        except ValueError:
966            return attr_value
967
968    def input_field(self) -> web_forms.Field:
969        fval = self.form_value()
970        max_len = self._maxlen(fval)
971        input_field = web_forms.Input(
972            self._at,
973            ': '.join([self._at, self.desc]),
974            max_len,
975            self.max_values,
976            self.input_pattern,
977            default=fval,
978            size=min(self.input_size, max_len),
979        )
980        input_field.input_type = 'number'
981        return input_field
982
983
984class IPHostAddress(IA5String):
985    """
986    Plugin class for string representation of IPv4 or IPv6 host address
987    """
988    oid: str = 'IPHostAddress-oid'
989    desc: str = 'string representation of IPv4 or IPv6 address'
990    # Class in module ipaddr which parses address/network values
991    addr_class = None
992    sani_funcs = (
993        bytes.strip,
994    )
995
996    def _validate(self, attr_value: bytes) -> bool:
997        try:
998            addr = ipaddress.ip_address(attr_value.decode('ascii'))
999        except ValueError:
1000            return False
1001        return self.addr_class is None or isinstance(addr, self.addr_class)
1002
1003
1004class IPv4HostAddress(IPHostAddress):
1005    """
1006    Plugin class for string representation of IPv4 host address
1007    """
1008    oid: str = 'IPv4HostAddress-oid'
1009    desc: str = 'string representation of IPv4 address'
1010    addr_class = ipaddress.IPv4Address
1011
1012
1013class IPv6HostAddress(IPHostAddress):
1014    """
1015    Plugin class for string representation of IPv6 host address
1016    """
1017    oid: str = 'IPv6HostAddress-oid'
1018    desc: str = 'string representation of IPv6 address'
1019    addr_class = ipaddress.IPv6Address
1020
1021
1022class IPNetworkAddress(IPHostAddress):
1023    """
1024    Plugin class for string representation of IPv4 or IPv6 network address
1025    """
1026    oid: str = 'IPNetworkAddress-oid'
1027    desc: str = 'string representation of IPv4 or IPv6 network address/mask'
1028
1029    def _validate(self, attr_value: bytes) -> bool:
1030        try:
1031            addr = ipaddress.ip_network(attr_value.decode('ascii'), strict=False)
1032        except ValueError:
1033            return False
1034        return self.addr_class is None or isinstance(addr, self.addr_class)
1035
1036
1037class IPv4NetworkAddress(IPNetworkAddress):
1038    """
1039    Plugin class for string representation of IPv4 network address
1040    """
1041    oid: str = 'IPv4NetworkAddress-oid'
1042    desc: str = 'string representation of IPv4 network address/mask'
1043    addr_class = ipaddress.IPv4Network
1044
1045
1046class IPv6NetworkAddress(IPNetworkAddress):
1047    """
1048    Plugin class for string representation of IPv6 network address
1049    """
1050    oid: str = 'IPv6NetworkAddress-oid'
1051    desc: str = 'string representation of IPv6 network address/mask'
1052    addr_class = ipaddress.IPv6Network
1053
1054
1055class IPServicePortNumber(Integer):
1056    """
1057    Plugin class for service port number (see /etc/services)
1058    """
1059    oid: str = 'IPServicePortNumber-oid'
1060    desc: str = 'Port number for an UDP- or TCP-based service'
1061    min_value = 0
1062    max_value = 65535
1063
1064
1065class MacAddress(IA5String):
1066    """
1067    Plugin class for IEEEE MAC addresses of network devices
1068    """
1069    oid: str = 'MacAddress-oid'
1070    desc: str = 'MAC address in hex-colon notation'
1071    min_len: int = 17
1072    max_len: int = 17
1073    pattern = re.compile(r'^([0-9a-f]{2}\:){5}[0-9a-f]{2}$')
1074
1075    def sanitize(self, attr_value: bytes) -> bytes:
1076        attr_value = attr_value.translate(None, b'.-: ').lower().strip()
1077        if len(attr_value) == 12:
1078            return b':'.join([attr_value[i*2:i*2+2] for i in range(6)])
1079        return attr_value
1080
1081
1082class Uri(DirectoryString):
1083    """
1084    Plugin class for Uniform Resource Identifiers (URIs, see RFC 2079)
1085    """
1086    oid: str = 'Uri-OID'
1087    desc: str = 'URI'
1088    pattern = re.compile(r'^(ftp|http|https|news|snews|ldap|ldaps|mailto):(|//)[^ ]*')
1089    sani_funcs = (
1090        bytes.strip,
1091    )
1092
1093    def display(self, vidx, links) -> str:
1094        attr_value = self.av_u
1095        try:
1096            url, label = attr_value.split(' ', 1)
1097        except ValueError:
1098            url, label = attr_value, attr_value
1099            display_url = ''
1100        else:
1101            display_url = ' (%s)' % (url)
1102        if ldap0.ldapurl.is_ldapurl(url):
1103            return '<a href="%s?%s">%s%s</a>' % (
1104                self._app.form.script_name,
1105                self._app.form.s2d(url),
1106                self._app.form.s2d(label),
1107                self._app.form.s2d(display_url),
1108            )
1109        if url.lower().find('javascript:') >= 0:
1110            return '<code>%s</code>' % (
1111                DirectoryString.display(self, vidx=False, links=False)
1112            )
1113        return '<a href="%s/urlredirect/%s?%s">%s%s</a>' % (
1114            self._app.form.script_name,
1115            self._app.sid,
1116            self._app.form.s2d(url),
1117            self._app.form.s2d(label),
1118            self._app.form.s2d(display_url),
1119        )
1120
1121
1122class Image(Binary):
1123    """
1124    Plugin base class for attributes containing image data.
1125    """
1126    oid: str = 'Image-OID'
1127    desc: str = 'Image base class'
1128    mime_type: str = 'application/octet-stream'
1129    file_ext: str = 'bin'
1130    imageFormat = None
1131    inline_maxlen = 630  # max. number of bytes to use data: URI instead of external URL
1132
1133    def _validate(self, attr_value: bytes) -> bool:
1134        return imghdr.what(None, attr_value) == self.imageFormat.lower()
1135
1136    def sanitize(self, attr_value: bytes) -> bytes:
1137        if not self._validate(attr_value) and PIL_AVAIL:
1138            try:
1139                with BytesIO(attr_value) as imgfile:
1140                    img = PILImage.open(imgfile)
1141                    imgfile.seek(0)
1142                    img.save(imgfile, self.imageFormat)
1143                    attr_value = imgfile.getvalue()
1144            except Exception as err:
1145                logger.warning(
1146                    'Error converting image data (%d bytes) to %s: %r',
1147                    len(attr_value),
1148                    self.imageFormat,
1149                    err,
1150                )
1151        return attr_value
1152
1153    def display(self, vidx, links) -> str:
1154        maxwidth, maxheight = 100, 150
1155        width, height = None, None
1156        size_attr_html = ''
1157        if PIL_AVAIL:
1158            try:
1159                with BytesIO(self._av) as imgfile:
1160                    img = PILImage.open(imgfile)
1161            except IOError:
1162                pass
1163            else:
1164                width, height = img.size
1165                if width > maxwidth:
1166                    size_attr_html = 'width="%d" height="%d"' % (
1167                        maxwidth,
1168                        int(float(maxwidth)/width*height),
1169                    )
1170                elif height > maxheight:
1171                    size_attr_html = 'width="%d" height="%d"' % (
1172                        int(float(maxheight)/height*width),
1173                        maxheight,
1174                    )
1175                else:
1176                    size_attr_html = 'width="%d" height="%d"' % (width, height)
1177        attr_value_len = len(self._av)
1178        img_link = (
1179            '%s/read/%s'
1180            '?dn=%s&amp;read_attr=%s&amp;read_attrindex=%d'
1181        ) % (
1182            self._app.form.script_name, self._app.sid,
1183            urllib.parse.quote(self._dn),
1184            urllib.parse.quote(self._at),
1185            vidx,
1186        )
1187        if attr_value_len <= self.inline_maxlen:
1188            return (
1189                '<a href="%s">'
1190                '<img src="data:%s;base64,\n%s" alt="%d bytes of image data" %s>'
1191                '</a>'
1192            ) % (
1193                img_link,
1194                self.mime_type,
1195                self._av.encode('base64'),
1196                attr_value_len,
1197                size_attr_html,
1198            )
1199        return '<a href="%s"><img src="%s" alt="%d bytes of image data" %s></a>' % (
1200            img_link,
1201            img_link,
1202            attr_value_len,
1203            size_attr_html,
1204        )
1205
1206
1207class JPEGImage(Image):
1208    """
1209    Plugin class for LDAP syntax 'JPEG'
1210    (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.17)
1211    """
1212    oid: str = '1.3.6.1.4.1.1466.115.121.1.28'
1213    desc: str = 'JPEG image'
1214    mime_type: str = 'image/jpeg'
1215    file_ext: str = 'jpg'
1216    imageFormat = 'JPEG'
1217
1218
1219class PhotoG3Fax(Binary):
1220    """
1221    Plugin class for LDAP syntax 'Fax'
1222    (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.12)
1223    """
1224    oid: str = '1.3.6.1.4.1.1466.115.121.1.23'
1225    desc: str = 'Photo (G3 fax)'
1226    mime_type: str = 'image/g3fax'
1227    file_ext: str = 'tif'
1228
1229
1230class OID(IA5String):
1231    """
1232    Plugin class for LDAP syntax 'OID'
1233    (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.26)
1234    """
1235    oid: str = '1.3.6.1.4.1.1466.115.121.1.38'
1236    desc: str = 'OID'
1237    pattern = re.compile(r'^([a-zA-Z]+[a-zA-Z0-9;-]*|[0-2]?\.([0-9]+\.)*[0-9]+)$')
1238    no_val_button_attrs = frozenset((
1239        'objectclass',
1240        'structuralobjectclass',
1241        '2.5.4.0',
1242        '2.5.21.9',
1243    ))
1244
1245    def value_button(self, command, row, mode, link_text=None) -> str:
1246        if self._at.lower() in self.no_val_button_attrs:
1247            return ''
1248        return IA5String.value_button(self, command, row, mode, link_text=link_text)
1249
1250    def sanitize(self, attr_value: bytes) -> bytes:
1251        return attr_value.strip()
1252
1253    def display(self, vidx, links) -> str:
1254        try:
1255            name, description, reference = OID_REG[self.av_u]
1256        except (KeyError, ValueError):
1257            try:
1258                se_obj = self._schema.get_obj(
1259                    ObjectClass,
1260                    self.av_u,
1261                    raise_keyerror=1,
1262                )
1263            except KeyError:
1264                try:
1265                    se_obj = self._schema.get_obj(
1266                        AttributeType,
1267                        self.av_u,
1268                        raise_keyerror=1,
1269                    )
1270                except KeyError:
1271                    return IA5String.display(self, vidx, links)
1272                return schema_anchor(
1273                    self._app,
1274                    self.av_u,
1275                    AttributeType,
1276                    name_template='{name}\n{anchor}',
1277                    link_text='&raquo',
1278                )
1279            if self._at.lower() == 'structuralobjectclass':
1280                name_template = '{name}\n{anchor}'
1281            else:
1282                name_template = '{name}\n (%s){anchor}' % (OBJECTCLASS_KIND_STR[se_obj.kind],)
1283            # objectClass attribute is displayed with different function
1284            return schema_anchor(
1285                self._app,
1286                self.av_u,
1287                ObjectClass,
1288                name_template=name_template,
1289                link_text='&raquo',
1290            )
1291        return '<strong>%s</strong> (%s):<br>%s (see %s)' % (
1292            self._app.form.s2d(name),
1293            IA5String.display(self, vidx, links),
1294            self._app.form.s2d(description),
1295            self._app.form.s2d(reference)
1296        )
1297
1298
1299class LDAPUrl(Uri):
1300    """
1301    Plugin class for attributes containing LDAP URLs
1302    """
1303    oid: str = 'LDAPUrl-oid'
1304    desc: str = 'LDAP URL'
1305
1306    def _command_ldap_url(self, ldap_url):
1307        return ldap_url
1308
1309    def display(self, vidx, links) -> str:
1310        try:
1311            if links:
1312                linksstr = self._app.ldap_url_anchor(
1313                    self._command_ldap_url(self.av_u),
1314                )
1315            else:
1316                linksstr = ''
1317        except ValueError:
1318            return '<strong>Not a valid LDAP URL:</strong> %s' % (
1319                self._app.form.s2d(repr(self._av))
1320            )
1321        return '<table><tr><td>%s</td><td><a href="%s">%s</a></td></tr></table>' % (
1322            linksstr,
1323            self._app.form.s2d(self.av_u),
1324            self._app.form.s2d(self.av_u)
1325        )
1326
1327
1328class OctetString(Binary):
1329    """
1330    Plugin class for LDAP syntax 'Octet String'
1331    (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.25)
1332    """
1333    oid: str = '1.3.6.1.4.1.1466.115.121.1.40'
1334    desc: str = 'Octet String'
1335    editable: bool = True
1336    min_input_rows = 1  # minimum number of rows for input field
1337    max_input_rows = 15 # maximum number of rows for in input field
1338    bytes_split = 16
1339
1340    def sanitize(self, attr_value: bytes) -> bytes:
1341        attr_value = attr_value.translate(None, b': ,\r\n')
1342        try:
1343            res = binascii.unhexlify(attr_value)
1344        except binascii.Error:
1345            res = attr_value
1346        return res
1347
1348    def display(self, vidx, links) -> str:
1349        lines = [
1350            (
1351                '<tr>'
1352                '<td><code>%0.6X</code></td>'
1353                '<td><code>%s</code></td>'
1354                '<td><code>%s</code></td>'
1355                '</tr>'
1356            ) % (
1357                i*self.bytes_split,
1358                ':'.join(c[j:j+1].hex().upper() for j in range(len(c))),
1359                self._app.form.s2d(ascii_dump(c), 'ascii'),
1360            )
1361            for i, c in enumerate(chunks(self._av, self.bytes_split))
1362        ]
1363        return '\n<table class="HexDump">\n%s\n</table>\n' % ('\n'.join(lines))
1364
1365    def form_value(self) -> str:
1366        hex_av = (self._av or b'').hex().upper()
1367        hex_range = range(0, len(hex_av), 2)
1368        return str('\r\n'.join(
1369            chunks(
1370                ':'.join([hex_av[i:i+2] for i in hex_range]),
1371                self.bytes_split*3
1372            )
1373        ))
1374
1375    def input_field(self) -> web_forms.Field:
1376        fval = self.form_value()
1377        return web_forms.Textarea(
1378            self._at,
1379            ': '.join([self._at, self.desc]),
1380            10000, 1,
1381            None,
1382            default=fval,
1383            rows=max(self.min_input_rows, min(self.max_input_rows, fval.count('\r\n'))),
1384            cols=49
1385        )
1386
1387
1388class MultilineText(DirectoryString):
1389    """
1390    Plugin base class for multi-line text.
1391    """
1392    oid: str = 'MultilineText-oid'
1393    desc: str = 'Multiple lines of text'
1394    pattern = re.compile('^.*$', re.S+re.M)
1395    lineSep = b'\r\n'
1396    mime_type: str = 'text/plain'
1397    cols = 66
1398    min_input_rows = 1   # minimum number of rows for input field
1399    max_input_rows = 30  # maximum number of rows for in input field
1400
1401    def _split_lines(self, value):
1402        if self.lineSep:
1403            return value.split(self.lineSep)
1404        return [value]
1405
1406    def sanitize(self, attr_value: bytes) -> bytes:
1407        return attr_value.replace(
1408            b'\r', b''
1409        ).replace(
1410            b'\n', self.lineSep
1411        )
1412
1413    def display(self, vidx, links) -> str:
1414        return '<br>'.join([
1415            self._app.form.s2d(self._app.ls.uc_decode(line_b)[0])
1416            for line_b in self._split_lines(self._av)
1417        ])
1418
1419    def form_value(self) -> str:
1420        splitted_lines = [
1421            self._app.ls.uc_decode(line_b)[0]
1422            for line_b in self._split_lines(self._av or b'')
1423        ]
1424        return '\r\n'.join(splitted_lines)
1425
1426    def input_field(self) -> web_forms.Field:
1427        fval = self.form_value()
1428        return web_forms.Textarea(
1429            self._at,
1430            ': '.join([self._at, self.desc]),
1431            self.max_len, self.max_values,
1432            None,
1433            default=fval,
1434            rows=max(self.min_input_rows, min(self.max_input_rows, fval.count('\r\n'))),
1435            cols=self.cols
1436        )
1437
1438
1439class PreformattedMultilineText(MultilineText):
1440    """
1441    Plugin base class for multi-line text displayed with mono-spaced font,
1442    e.g. program code, XML, JSON etc.
1443    """
1444    oid: str = 'PreformattedMultilineText-oid'
1445    cols = 66
1446    tab_identiation = '&nbsp;&nbsp;&nbsp;&nbsp;'
1447
1448    def display(self, vidx, links) -> str:
1449        lines = [
1450            self._app.form.s2d(
1451                self._app.ls.uc_decode(line_b)[0],
1452                self.tab_identiation,
1453            )
1454            for line_b in self._split_lines(self._av)
1455        ]
1456        return '<code>%s</code>' % '<br>'.join(lines)
1457
1458
1459class PostalAddress(MultilineText):
1460    """
1461    Plugin class for LDAP syntax 'Postal Address'
1462    (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.28)
1463    """
1464    oid: str = '1.3.6.1.4.1.1466.115.121.1.41'
1465    desc: str = 'Postal Address'
1466    lineSep = b' $ '
1467    cols = 40
1468
1469    def _split_lines(self, value):
1470        return [
1471            v.strip()
1472            for v in value.split(self.lineSep.strip())
1473        ]
1474
1475    def sanitize(self, attr_value: bytes) -> bytes:
1476        return attr_value.replace(b'\r', b'').replace(b'\n', self.lineSep)
1477
1478
1479class PrintableString(DirectoryString):
1480    """
1481    Plugin class for LDAP syntax 'Printable String'
1482    (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.29)
1483    """
1484    oid: str = '1.3.6.1.4.1.1466.115.121.1.44'
1485    desc: str = 'Printable String'
1486    pattern = re.compile("^[a-zA-Z0-9'()+,.=/:? -]*$")
1487    charset = 'ascii'
1488
1489
1490class NumericString(PrintableString):
1491    """
1492    Plugin class for LDAP syntax 'Numeric String'
1493    (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.23)
1494    """
1495    oid: str = '1.3.6.1.4.1.1466.115.121.1.36'
1496    desc: str = 'Numeric String'
1497    pattern = re.compile('^[ 0-9]+$')
1498
1499
1500class EnhancedGuide(PrintableString):
1501    """
1502    Plugin class for LDAP syntax 'Enhanced Guide'
1503    (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.10)
1504    """
1505    oid: str = '1.3.6.1.4.1.1466.115.121.1.21'
1506    desc: str = 'Enhanced Search Guide'
1507
1508
1509class Guide(EnhancedGuide):
1510    """
1511    Plugin class for LDAP syntax 'Search Guide'
1512    (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.14)
1513    """
1514    oid: str = '1.3.6.1.4.1.1466.115.121.1.25'
1515    desc: str = 'Search Guide'
1516
1517
1518class TelephoneNumber(PrintableString):
1519    """
1520    Plugin class for LDAP syntax ''
1521    (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.31)
1522    """
1523    oid: str = '1.3.6.1.4.1.1466.115.121.1.50'
1524    desc: str = 'Telephone Number'
1525    pattern = re.compile('^[0-9+x(). /-]+$')
1526
1527
1528class FacsimileTelephoneNumber(TelephoneNumber):
1529    """
1530    Plugin class for LDAP syntax 'Facsimile Telephone Number'
1531    (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.11)
1532    """
1533    oid: str = '1.3.6.1.4.1.1466.115.121.1.22'
1534    desc: str = 'Facsimile Number'
1535    pattern = re.compile(
1536        r'^[0-9+x(). /-]+'
1537        r'(\$'
1538        r'(twoDimensional|fineResolution|unlimitedLength|b4Length|a3Width|b4Width|uncompressed)'
1539        r')*$'
1540    )
1541
1542
1543class TelexNumber(PrintableString):
1544    """
1545    Plugin class for LDAP syntax 'Telex Number'
1546    (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.33)
1547    """
1548    oid: str = '1.3.6.1.4.1.1466.115.121.1.52'
1549    desc: str = 'Telex Number'
1550    pattern = re.compile("^[a-zA-Z0-9'()+,.=/:?$ -]*$")
1551
1552
1553class TeletexTerminalIdentifier(PrintableString):
1554    """
1555    Plugin class for LDAP syntax 'Teletex Terminal Identifier'
1556    (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.32)
1557    """
1558    oid: str = '1.3.6.1.4.1.1466.115.121.1.51'
1559    desc: str = 'Teletex Terminal Identifier'
1560
1561
1562class ObjectGUID(LDAPSyntax):
1563    oid: str = 'ObjectGUID-oid'
1564    desc: str = 'Object GUID'
1565    charset = 'ascii'
1566
1567    def display(self, vidx, links) -> str:
1568        objectguid_str = ''.join([
1569            '%02X' % ord(c)
1570            for c in self._av
1571        ])
1572        return ldap0.ldapurl.LDAPUrl(
1573            ldapUrl=self._app.ls.uri,
1574            dn='GUID=%s' % (objectguid_str),
1575            who=None, cred=None
1576        ).htmlHREF(
1577            hrefText=objectguid_str,
1578            hrefTarget=None
1579        )
1580
1581
1582class Date(IA5String):
1583    """
1584    Plugin base class for a date without(!) time component.
1585    """
1586    oid: str = 'Date-oid'
1587    desc: str = 'Date in syntax specified by class attribute storage_format'
1588    max_len: int = 10
1589    storage_format = '%Y-%m-%d'
1590    acceptable_formats = (
1591        '%Y-%m-%d',
1592        '%d.%m.%Y',
1593        '%m/%d/%Y',
1594    )
1595
1596    def _validate(self, attr_value: bytes) -> bool:
1597        try:
1598            datetime.datetime.strptime(
1599                self._app.ls.uc_decode(attr_value)[0],
1600                self.storage_format
1601            )
1602        except (UnicodeDecodeError, ValueError):
1603            return False
1604        return True
1605
1606    def sanitize(self, attr_value: bytes) -> bytes:
1607        av_u = attr_value.strip().decode(self._app.ls.charset)
1608        result = attr_value
1609        for time_format in self.acceptable_formats:
1610            try:
1611                time_tuple = datetime.datetime.strptime(av_u, time_format)
1612            except ValueError:
1613                pass
1614            else:
1615                result = datetime.datetime.strftime(time_tuple, self.storage_format).encode('ascii')
1616                break
1617        return result # sanitize()
1618
1619
1620class NumstringDate(Date):
1621    """
1622    Plugin class for a date using syntax YYYYMMDD typically
1623    using LDAP syntax Numstring.
1624    """
1625    oid: str = 'NumstringDate-oid'
1626    desc: str = 'Date in syntax YYYYMMDD'
1627    pattern = re.compile('^[0-9]{4}[0-1][0-9][0-3][0-9]$')
1628    storage_format = '%Y%m%d'
1629
1630
1631class ISO8601Date(Date):
1632    """
1633    Plugin class for a date using syntax YYYY-MM-DD (see ISO 8601).
1634    """
1635    oid: str = 'ISO8601Date-oid'
1636    desc: str = 'Date in syntax YYYY-MM-DD (see ISO 8601)'
1637    pattern = re.compile('^[0-9]{4}-[0-1][0-9]-[0-3][0-9]$')
1638    storage_format = '%Y-%m-%d'
1639
1640
1641class DateOfBirth(ISO8601Date):
1642    """
1643    Plugin class for date of birth syntax YYYY-MM-DD (see ISO 8601).
1644
1645    Displays the age based at current time.
1646    """
1647    oid: str = 'DateOfBirth-oid'
1648    desc: str = 'Date of birth: syntax YYYY-MM-DD (see ISO 8601)'
1649
1650    @staticmethod
1651    def _age(birth_dt):
1652        birth_date = datetime.date(
1653            year=birth_dt.year,
1654            month=birth_dt.month,
1655            day=birth_dt.day,
1656        )
1657        current_date = datetime.date.today()
1658        age = current_date.year - birth_date.year
1659        if birth_date.month > current_date.month or \
1660           (birth_date.month == current_date.month and birth_date.day > current_date.day):
1661            age = age - 1
1662        return age
1663
1664    def _validate(self, attr_value: bytes) -> bool:
1665        try:
1666            birth_dt = datetime.datetime.strptime(
1667                self._app.ls.uc_decode(attr_value)[0],
1668                self.storage_format
1669            )
1670        except ValueError:
1671            return False
1672        return self._age(birth_dt) >= 0
1673
1674    def display(self, vidx, links) -> str:
1675        raw_date = ISO8601Date.display(self, vidx, links)
1676        try:
1677            birth_dt = datetime.datetime.strptime(self.av_u, self.storage_format)
1678        except ValueError:
1679            return raw_date
1680        return '%s (%s years old)' % (raw_date, self._age(birth_dt))
1681
1682
1683class SecondsSinceEpoch(Integer):
1684    """
1685    Plugin class for seconds since epoch (1970-01-01 00:00:00).
1686    """
1687    oid: str = 'SecondsSinceEpoch-oid'
1688    desc: str = 'Seconds since epoch (1970-01-01 00:00:00)'
1689    min_value = 0
1690
1691    def display(self, vidx, links) -> str:
1692        int_str = Integer.display(self, vidx, links)
1693        try:
1694            return '%s (%s)' % (
1695                strftimeiso8601(time.gmtime(float(self._av))),
1696                int_str,
1697            )
1698        except ValueError:
1699            return int_str
1700
1701
1702class DaysSinceEpoch(Integer):
1703    """
1704    Plugin class for days since epoch (1970-01-01).
1705    """
1706    oid: str = 'DaysSinceEpoch-oid'
1707    desc: str = 'Days since epoch (1970-01-01)'
1708    min_value = 0
1709
1710    def display(self, vidx, links) -> str:
1711        int_str = Integer.display(self, vidx, links)
1712        try:
1713            return '%s (%s)' % (
1714                strftimeiso8601(time.gmtime(float(self._av)*86400)),
1715                int_str,
1716            )
1717        except ValueError:
1718            return int_str
1719
1720
1721class Timespan(Integer):
1722    oid: str = 'Timespan-oid'
1723    desc: str = 'Time span in seconds'
1724    input_size: int = LDAPSyntax.input_size
1725    min_value = 0
1726    time_divisors = (
1727        ('weeks', 604800),
1728        ('days', 86400),
1729        ('hours', 3600),
1730        ('mins', 60),
1731        ('secs', 1),
1732    )
1733    sep = ','
1734
1735    def sanitize(self, attr_value: bytes) -> bytes:
1736        if not attr_value:
1737            return attr_value
1738        try:
1739            result = repr2ts(
1740                self.time_divisors,
1741                self.sep,
1742                attr_value.decode('ascii')
1743            ).encode('ascii')
1744        except ValueError:
1745            result = Integer.sanitize(self, attr_value)
1746        return result
1747
1748    def form_value(self) -> str:
1749        if not self._av:
1750            return ''
1751        try:
1752            result = ts2repr(self.time_divisors, self.sep, self._av)
1753        except ValueError:
1754            result = Integer.form_value(self)
1755        return result
1756
1757    def input_field(self) -> web_forms.Field:
1758        return IA5String.input_field(self)
1759
1760    def display(self, vidx, links) -> str:
1761        try:
1762            result = self._app.form.s2d('%s (%s)' % (
1763                ts2repr(self.time_divisors, self.sep, self.av_u),
1764                Integer.display(self, vidx, links)
1765            ))
1766        except ValueError:
1767            result = Integer.display(self, vidx, links)
1768        return result
1769
1770
1771class SelectList(DirectoryString):
1772    """
1773    Base class for dictionary based select lists which
1774    should not be used directly
1775    """
1776    oid: str = 'SelectList-oid'
1777    attr_value_dict: Dict[str, str] = {}   # Mapping attribute value to attribute description
1778    input_fallback: bool = True  # Fallback to normal input field if attr_value_dict is empty
1779    desc_sep: str = ' '
1780    tag_tmpl: Dict[bool, str] = {
1781        False: '{attr_text}: {attr_value}',
1782        True: '<span title="{attr_title}">{attr_text}:{sep}{attr_value}</span>',
1783    }
1784
1785    def get_attr_value_dict(self) -> Dict[str, str]:
1786        # Enable empty value in any case
1787        attr_value_dict: Dict[str, str] = {'': '-/-'}
1788        attr_value_dict.update(self.attr_value_dict)
1789        return attr_value_dict
1790
1791    def _sorted_select_options(self):
1792        # First generate a set of all other currently available attribute values
1793        fval = DirectoryString.form_value(self)
1794        # Initialize a dictionary with all options
1795        vdict = self.get_attr_value_dict()
1796        # Remove other existing values from the options dict
1797        for val in self._entry.get(self._at, []):
1798            val = self._app.ls.uc_decode(val)[0]
1799            if val != fval:
1800                try:
1801                    del vdict[val]
1802                except KeyError:
1803                    pass
1804        # Add the current attribute value if needed
1805        if fval not in vdict:
1806            vdict[fval] = fval
1807        # Finally return the sorted option list
1808        result = []
1809        for key, val in vdict.items():
1810            if isinstance(val, str):
1811                result.append((key, val, None))
1812            elif isinstance(val, tuple):
1813                result.append((key, val[0], val[1]))
1814        return sorted(
1815            result,
1816            key=lambda x: x[1].lower(),
1817        )
1818
1819    def _validate(self, attr_value: bytes) -> bool:
1820        attr_value_dict: Dict[str, str] = self.get_attr_value_dict()
1821        return self._app.ls.uc_decode(attr_value)[0] in attr_value_dict
1822
1823    def display(self, vidx, links) -> str:
1824        attr_value_str = DirectoryString.display(self, vidx, links)
1825        attr_value_dict: Dict[str, str] = self.get_attr_value_dict()
1826        try:
1827            attr_value_desc = attr_value_dict[self.av_u]
1828        except KeyError:
1829            return attr_value_str
1830        try:
1831            attr_text, attr_title = attr_value_desc
1832        except ValueError:
1833            attr_text, attr_title = attr_value_desc, None
1834        if attr_text == attr_value_str:
1835            return attr_value_str
1836        return self.tag_tmpl[bool(attr_title)].format(
1837            attr_value=attr_value_str,
1838            sep=self.desc_sep,
1839            attr_text=self._app.form.s2d(attr_text),
1840            attr_title=self._app.form.s2d(attr_title or '')
1841        )
1842
1843    def input_field(self) -> web_forms.Field:
1844        attr_value_dict: Dict[str, str] = self.get_attr_value_dict()
1845        if self.input_fallback and \
1846           (not attr_value_dict or not list(filter(None, attr_value_dict.keys()))):
1847            return DirectoryString.input_field(self)
1848        field = web_forms.Select(
1849            self._at,
1850            ': '.join([self._at, self.desc]),
1851            1,
1852            options=self._sorted_select_options(),
1853            default=self.form_value(),
1854            required=0
1855        )
1856        field.charset = self._app.form.accept_charset
1857        return field
1858
1859
1860class PropertiesSelectList(SelectList):
1861    """
1862    Plugin base class for attribute value select lists of LDAP syntax DirectoryString
1863    constructed and validated by reading a properties file.
1864    """
1865    oid: str = 'PropertiesSelectList-oid'
1866    properties_pathname: Optional[str] = None
1867    properties_charset: str = 'utf-8'
1868    properties_delimiter: str = '='
1869
1870    def get_attr_value_dict(self) -> Dict[str, str]:
1871        attr_value_dict: Dict[str, str] = SelectList.get_attr_value_dict(self)
1872        real_path_name = get_variant_filename(
1873            self.properties_pathname,
1874            self._app.form.accept_language
1875        )
1876        with open(real_path_name, 'rb') as prop_file:
1877            for line in prop_file.readlines():
1878                line = line.decode(self.properties_charset).strip()
1879                if line and not line.startswith('#'):
1880                    key, value = line.split(self.properties_delimiter, 1)
1881                    attr_value_dict[key.strip()] = value.strip()
1882        return attr_value_dict
1883        # end of get_attr_value_dict()
1884
1885
1886class DynamicValueSelectList(SelectList, DirectoryString):
1887    """
1888    Plugin base class for attribute value select lists of LDAP syntax DirectoryString
1889    constructed and validated by internal LDAP search.
1890    """
1891    oid: str = 'DynamicValueSelectList-oid'
1892    ldap_url: Optional[str] = None
1893    value_prefix: str = ''
1894    value_suffix: str = ''
1895    ignored_errors = (
1896        ldap0.NO_SUCH_OBJECT,
1897        ldap0.SIZELIMIT_EXCEEDED,
1898        ldap0.TIMELIMIT_EXCEEDED,
1899        ldap0.PARTIAL_RESULTS,
1900        ldap0.INSUFFICIENT_ACCESS,
1901        ldap0.CONSTRAINT_VIOLATION,
1902        ldap0.REFERRAL,
1903    )
1904
1905    def __init__(self, app, dn: str, schema, attrType: str, attr_value: bytes, entry=None):
1906        self.lu_obj = ldap0.ldapurl.LDAPUrl(self.ldap_url)
1907        self.min_len = len(self.value_prefix)+len(self.value_suffix)
1908        SelectList.__init__(self, app, dn, schema, attrType, attr_value, entry)
1909
1910    def _filterstr(self):
1911        return self.lu_obj.filterstr or '(objectClass=*)'
1912
1913    def _search_ref(self, attr_value: str):
1914        attr_value = attr_value[len(self.value_prefix):-len(self.value_suffix) or None]
1915        search_filter = '(&%s(%s=%s))' % (
1916            self._filterstr(),
1917            self.lu_obj.attrs[0],
1918            attr_value,
1919        )
1920        try:
1921            ldap_result = self._app.ls.l.search_s(
1922                self._search_root(),
1923                self.lu_obj.scope,
1924                search_filter,
1925                attrlist=self.lu_obj.attrs,
1926                sizelimit=2,
1927            )
1928        except (
1929                ldap0.NO_SUCH_OBJECT,
1930                ldap0.CONSTRAINT_VIOLATION,
1931                ldap0.INSUFFICIENT_ACCESS,
1932                ldap0.REFERRAL,
1933                ldap0.SIZELIMIT_EXCEEDED,
1934                ldap0.TIMELIMIT_EXCEEDED,
1935            ):
1936            return None
1937        # Filter out LDAP referrals
1938        ldap_result = [
1939            (sre.dn_s, sre.entry_s)
1940            for sre in ldap_result
1941            if isinstance(sre, SearchResultEntry)
1942        ]
1943        if ldap_result and len(ldap_result) == 1:
1944            return ldap_result[0]
1945        return None
1946
1947    def _validate(self, attr_value: bytes) -> bool:
1948        av_u = self._app.ls.uc_decode(attr_value)[0]
1949        if (
1950                not av_u.startswith(self.value_prefix) or
1951                not av_u.endswith(self.value_suffix) or
1952                len(av_u) < self.min_len or
1953                (self.max_len is not None and len(av_u) > self.max_len)
1954            ):
1955            return False
1956        return self._search_ref(av_u) is not None
1957
1958    def display(self, vidx, links) -> str:
1959        if links and self.lu_obj.attrs:
1960            ref_result = self._search_ref(self.av_u)
1961            if ref_result:
1962                ref_dn, ref_entry = ref_result
1963                try:
1964                    attr_value_desc = ref_entry[self.lu_obj.attrs[1]][0]
1965                except (KeyError, IndexError):
1966                    display_text, link_html = '', ''
1967                else:
1968                    if self.lu_obj.attrs[0].lower() == self.lu_obj.attrs[1].lower():
1969                        display_text = ''
1970                    else:
1971                        display_text = self._app.form.s2d(attr_value_desc+':')
1972                    if links:
1973                        link_html = self._app.anchor(
1974                            'read', '&raquo;',
1975                            [('dn', ref_dn)],
1976                        )
1977                    else:
1978                        link_html = ''
1979            else:
1980                display_text, link_html = '', ''
1981        else:
1982            display_text, link_html = '', ''
1983        return ' '.join((
1984            display_text,
1985            DirectoryString.display(self, vidx, links),
1986            link_html,
1987        ))
1988
1989    def _search_root(self) -> str:
1990        ldap_url_dn = self.lu_obj.dn
1991        if ldap_url_dn == '_':
1992            result_dn = str(self._app.naming_context)
1993        elif ldap_url_dn == '.':
1994            result_dn = self._dn
1995        elif ldap_url_dn == '..':
1996            result_dn = str(self.dn.parent())
1997        elif ldap_url_dn.endswith(',_'):
1998            result_dn = ','.join((ldap_url_dn[:-2], str(self._app.naming_context)))
1999        elif ldap_url_dn.endswith(',.'):
2000            result_dn = ','.join((ldap_url_dn[:-2], self._dn))
2001        elif ldap_url_dn.endswith(',..'):
2002            result_dn = ','.join((ldap_url_dn[:-3], str(self.dn.parent())))
2003        else:
2004            result_dn = ldap_url_dn
2005        if result_dn.endswith(','):
2006            result_dn = result_dn[:-1]
2007        return result_dn
2008        # end of _search_root()
2009
2010    def get_attr_value_dict(self) -> Dict[str, str]:
2011        attr_value_dict: Dict[str, str] = SelectList.get_attr_value_dict(self)
2012        if self.lu_obj.hostport:
2013            raise ValueError(
2014                'Connecting to other server not supported! hostport attribute was %r' % (
2015                    self.lu_obj.hostport
2016                )
2017            )
2018        search_scope = self.lu_obj.scope or ldap0.SCOPE_BASE
2019        search_attrs = (self.lu_obj.attrs or []) + ['description', 'info']
2020        # Use the existing LDAP connection as current user
2021        try:
2022            ldap_result = self._app.ls.l.search_s(
2023                self._search_root(),
2024                search_scope,
2025                filterstr=self._filterstr(),
2026                attrlist=search_attrs,
2027            )
2028        except self.ignored_errors:
2029            return {}
2030        if search_scope == ldap0.SCOPE_BASE:
2031            # When reading a single entry we build the map from a single multi-valued attribute
2032            assert len(self.lu_obj.attrs or []) == 1, ValueError(
2033                'attrlist in ldap_url must be of length 1 if scope is base, got %r' % (
2034                    self.lu_obj.attrs,
2035                )
2036            )
2037            list_attr = self.lu_obj.attrs[0]
2038            attr_values_u = [
2039                ''.join((
2040                    self.value_prefix,
2041                    attr_value,
2042                    self.value_suffix,
2043                ))
2044                for attr_value in ldap_result[0].entry_s[list_attr]
2045            ]
2046            attr_value_dict: Dict[str, str] = {
2047                u: u
2048                for u in attr_values_u
2049            }
2050        else:
2051            if not self.lu_obj.attrs:
2052                option_value_map, option_text_map = (None, None)
2053            elif len(self.lu_obj.attrs) == 1:
2054                option_value_map, option_text_map = (None, self.lu_obj.attrs[0])
2055            elif len(self.lu_obj.attrs) >= 2:
2056                option_value_map, option_text_map = self.lu_obj.attrs[:2]
2057            for sre in ldap_result:
2058                # Check whether it's a real search result (skip search continuations)
2059                if not isinstance(sre, SearchResultEntry):
2060                    continue
2061                sre.entry_s[None] = [sre.dn_s]
2062                try:
2063                    option_value = ''.join((
2064                        self.value_prefix,
2065                        sre.entry_s[option_value_map][0],
2066                        self.value_suffix,
2067                    ))
2068                except KeyError:
2069                    pass
2070                else:
2071                    try:
2072                        option_text = sre.entry_s[option_text_map][0]
2073                    except KeyError:
2074                        option_text = option_value
2075                    option_title = sre.entry_s.get('description', sre.entry_s.get('info', ['']))[0]
2076                    if option_title:
2077                        attr_value_dict[option_value] = (option_text, option_title)
2078                    else:
2079                        attr_value_dict[option_value] = option_text
2080        return attr_value_dict
2081        # end of get_attr_value_dict()
2082
2083
2084class DynamicDNSelectList(DynamicValueSelectList, DistinguishedName):
2085    """
2086    Plugin base class for attribute value select lists of LDAP syntax DN
2087    constructed and validated by internal LDAP search.
2088    """
2089    oid: str = 'DynamicDNSelectList-oid'
2090
2091    def _get_ref_entry(self, dn: str, attrlist=None) -> dict:
2092        try:
2093            sre = self._app.ls.l.read_s(
2094                dn,
2095                attrlist=attrlist or self.lu_obj.attrs,
2096                filterstr=self._filterstr(),
2097            )
2098        except (
2099                ldap0.NO_SUCH_OBJECT,
2100                ldap0.CONSTRAINT_VIOLATION,
2101                ldap0.INSUFFICIENT_ACCESS,
2102                ldap0.INVALID_DN_SYNTAX,
2103                ldap0.REFERRAL,
2104            ):
2105            return None
2106        if sre is None:
2107            return None
2108        return sre.entry_s
2109
2110    def _validate(self, attr_value: bytes) -> bool:
2111        return SelectList._validate(self, attr_value)
2112
2113    def display(self, vidx, links) -> str:
2114        if links and self.lu_obj.attrs:
2115            ref_entry = self._get_ref_entry(self.av_u) or {}
2116            try:
2117                attr_value_desc = ref_entry[self.lu_obj.attrs[0]][0]
2118            except (KeyError, IndexError):
2119                display_text = ''
2120            else:
2121                display_text = self._app.form.s2d(attr_value_desc+': ')
2122        else:
2123            display_text = ''
2124        return self.desc_sep.join((
2125            display_text,
2126            DistinguishedName.display(self, vidx, links)
2127        ))
2128
2129
2130class DerefDynamicDNSelectList(DynamicDNSelectList):
2131    """
2132    Plugin base class for attribute value select lists of LDAP syntax DN
2133    constructed and validated by internal LDAP search.
2134
2135    Same as DynamicDNSelectList except that Dereference extended control is used.
2136    """
2137    oid: str = 'DerefDynamicDNSelectList-oid'
2138
2139    def _get_ref_entry(self, dn: str, attrlist=None) -> dict:
2140        deref_crtl = DereferenceControl(
2141            True,
2142            {self._at: self.lu_obj.attrs or ['entryDN']}
2143        )
2144        try:
2145            ldap_result = self._app.ls.l.search_s(
2146                self._dn,
2147                ldap0.SCOPE_BASE,
2148                filterstr='(objectClass=*)',
2149                attrlist=['1.1'],
2150                req_ctrls=[deref_crtl],
2151            )[0]
2152        except (
2153                ldap0.NO_SUCH_OBJECT,
2154                ldap0.CONSTRAINT_VIOLATION,
2155                ldap0.INSUFFICIENT_ACCESS,
2156                ldap0.INVALID_DN_SYNTAX,
2157                ldap0.REFERRAL,
2158            ):
2159            return None
2160        if ldap_result is None or not ldap_result.ctrls:
2161            return None
2162        for ref in ldap_result.ctrls[0].derefRes[self._at]:
2163            if ref.dn_s == dn:
2164                return ref.entry_s
2165        return None
2166
2167
2168class Boolean(SelectList, IA5String):
2169    """
2170    Plugin class for LDAP syntax 'Boolean'
2171    (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.3)
2172    """
2173    oid: str = '1.3.6.1.4.1.1466.115.121.1.7'
2174    desc: str = 'Boolean'
2175    attr_value_dict: Dict[str, str] = {
2176        'TRUE': 'TRUE',
2177        'FALSE': 'FALSE',
2178    }
2179
2180    def get_attr_value_dict(self) -> Dict[str, str]:
2181        attr_value_dict: Dict[str, str] = SelectList.get_attr_value_dict(self)
2182        if self._av and self._av.lower() == self._av:
2183            for key, val in attr_value_dict.items():
2184                del attr_value_dict[key]
2185                attr_value_dict[key.lower()] = val.lower()
2186        return attr_value_dict
2187
2188    def _validate(self, attr_value: bytes) -> bool:
2189        if not self._av and attr_value.lower() == attr_value:
2190            return SelectList._validate(self, attr_value.upper())
2191        return SelectList._validate(self, attr_value)
2192
2193    def display(self, vidx, links) -> str:
2194        return IA5String.display(self, vidx, links)
2195
2196
2197class CountryString(PropertiesSelectList):
2198    """
2199    Plugin class for LDAP syntax 'Country String'
2200    (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.4)
2201    """
2202    oid: str = '1.3.6.1.4.1.1466.115.121.1.11'
2203    desc: str = 'Two letter country string as listed in ISO 3166-2'
2204    properties_pathname = os.path.join(
2205        ETC_DIR, 'properties', 'attribute_select_c.properties'
2206    )
2207    sani_funcs = (
2208        bytes.strip,
2209    )
2210
2211
2212class DeliveryMethod(PrintableString):
2213    """
2214    Plugin class for LDAP syntax 'Delivery Method'
2215    (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.5)
2216    """
2217    oid: str = '1.3.6.1.4.1.1466.115.121.1.14'
2218    desc: str = 'Delivery Method'
2219    pdm = '(any|mhs|physical|telex|teletex|g3fax|g4fax|ia5|videotex|telephone)'
2220    pattern = re.compile('^%s[ $]*%s$' % (pdm, pdm))
2221
2222
2223class BitArrayInteger(MultilineText, Integer):
2224    """
2225    Plugin class for attributes with Integer syntax where the integer
2226    value is interpreted as binary flags
2227    """
2228    oid: str = 'BitArrayInteger-oid'
2229    flag_desc_table: Sequence[Tuple[str, int]] = tuple()
2230    true_false_desc: Dict[bool, str] = {
2231        False: '-',
2232        True: '+',
2233    }
2234
2235    def __init__(self, app, dn: str, schema, attrType: str, attr_value: bytes, entry=None):
2236        Integer.__init__(self, app, dn, schema, attrType, attr_value, entry)
2237        self.flag_desc2int = dict(self.flag_desc_table)
2238        self.flag_int2desc = {
2239            j: i
2240            for i, j in self.flag_desc_table
2241        }
2242        self.max_value = sum([j for i, j in self.flag_desc_table])
2243        self.min_input_rows = self.max_input_rows = max(len(self.flag_desc_table), 1)
2244
2245    def sanitize(self, attr_value: bytes) -> bytes:
2246        try:
2247            av_u = attr_value.decode('ascii')
2248        except UnicodeDecodeError:
2249            return attr_value
2250        try:
2251            result = int(av_u)
2252        except ValueError:
2253            result = 0
2254            for row in av_u.split('\n'):
2255                row = row.strip()
2256                try:
2257                    flag_set, flag_desc = row[0:1], row[1:]
2258                except IndexError:
2259                    pass
2260                else:
2261                    if flag_set == '+':
2262                        try:
2263                            result = result | self.flag_desc2int[flag_desc]
2264                        except KeyError:
2265                            pass
2266        return str(result).encode('ascii')
2267
2268    def form_value(self) -> str:
2269        attr_value_int = int(self.av_u or 0)
2270        flag_lines = [
2271            ''.join((
2272                self.true_false_desc[int((attr_value_int & flag_int) > 0)],
2273                flag_desc
2274            ))
2275            for flag_desc, flag_int in self.flag_desc_table
2276        ]
2277        return '\r\n'.join(flag_lines)
2278
2279    def input_field(self) -> web_forms.Field:
2280        fval = self.form_value()
2281        return web_forms.Textarea(
2282            self._at,
2283            ': '.join([self._at, self.desc]),
2284            self.max_len, self.max_values,
2285            None,
2286            default=fval,
2287            rows=max(self.min_input_rows, min(self.max_input_rows, fval.count('\n'))),
2288            cols=max([len(desc) for desc, _ in self.flag_desc_table])+1
2289        )
2290
2291    def display(self, vidx, links) -> str:
2292        av_i = int(self._av)
2293        return (
2294            '%s<br>'
2295            '<table summary="Flags">'
2296            '<tr><th>Property flag</th><th>Value</th><th>Status</th></tr>'
2297            '%s'
2298            '</table>'
2299        ) % (
2300            Integer.display(self, vidx, links),
2301            '\n'.join([
2302                '<tr><td>%s</td><td>%s</td><td>%s</td></tr>' % (
2303                    self._app.form.s2d(desc),
2304                    hex(flag_value),
2305                    {False: '-', True: 'on'}[(av_i & flag_value) > 0]
2306                )
2307                for desc, flag_value in self.flag_desc_table
2308            ])
2309        )
2310
2311
2312class GSER(DirectoryString):
2313    """
2314    Generic String Encoding Rules (GSER) for ASN.1 Types (see RFC 3641)
2315    """
2316    oid: str = 'GSER-oid'
2317    desc: str = 'GSER syntax (see RFC 3641)'
2318
2319
2320class UUID(IA5String):
2321    """
2322    Plugin class for Universally Unique IDentifier (UUID), see RFC 4122
2323    """
2324    oid: str = '1.3.6.1.1.16.1'
2325    desc: str = 'UUID'
2326    pattern = re.compile(
2327        '^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$'
2328    )
2329
2330    def sanitize(self, attr_value: bytes) -> bytes:
2331        try:
2332            return str(uuid.UUID(attr_value.decode('ascii').replace(':', ''))).encode('ascii')
2333        except ValueError:
2334            return attr_value
2335
2336
2337class DNSDomain(IA5String):
2338    """
2339    Plugin class for fully-qualified DNS domain names
2340    """
2341    oid: str = 'DNSDomain-oid'
2342    desc: str = 'DNS domain name (see RFC 1035)'
2343    pattern = re.compile(r'^(\*|[a-zA-Z0-9_-]+)(\.[a-zA-Z0-9_-]+)*$')
2344    # see https://datatracker.ietf.org/doc/html/rfc2181#section-11
2345    max_len: int = min(255, IA5String.max_len)
2346    sani_funcs = (
2347        bytes.lower,
2348        bytes.strip,
2349    )
2350
2351    def sanitize(self, attr_value: bytes) -> bytes:
2352        attr_value = IA5String.sanitize(self, attr_value)
2353        return b'.'.join([
2354            dc.encode('idna')
2355            for dc in attr_value.decode(self._app.form.accept_charset).split('.')
2356        ])
2357
2358    def form_value(self) -> str:
2359        try:
2360            result = '.'.join([
2361                dc.decode('idna')
2362                for dc in (self._av or b'').split(b'.')
2363            ])
2364        except UnicodeDecodeError:
2365            result = '!!!snipped because of UnicodeDecodeError!!!'
2366        return result
2367
2368    def display(self, vidx, links) -> str:
2369        if self.av_u != self._av.decode('idna'):
2370            return '%s (%s)' % (
2371                IA5String.display(self, vidx, links),
2372                self._app.form.s2d(self.form_value())
2373            )
2374        return IA5String.display(self, vidx, links)
2375
2376
2377class RFC822Address(DNSDomain, IA5String):
2378    """
2379    Plugin class for RFC 822 addresses
2380    """
2381    oid: str = 'RFC822Address-oid'
2382    desc: str = 'RFC 822 mail address'
2383    pattern = re.compile(r'^[\w@.+=/_ ()-]+@[a-zA-Z0-9-]+(\.[a-zA-Z0-9-]+)*$')
2384    html_tmpl = '<a href="mailto:{av}">{av}</a>'
2385
2386    def __init__(self, app, dn: str, schema, attrType: str, attr_value: bytes, entry=None):
2387        IA5String.__init__(self, app, dn, schema, attrType, attr_value, entry)
2388
2389    def form_value(self) -> str:
2390        if not self._av:
2391            return IA5String.form_value(self)
2392        try:
2393            localpart, domainpart = self._av.rsplit(b'@')
2394        except ValueError:
2395            return IA5String.form_value(self)
2396        dns_domain = DNSDomain(self._app, self._dn, self._schema, None, domainpart)
2397        return '@'.join((
2398            localpart.decode(self._app.ls.charset),
2399            dns_domain.form_value()
2400        ))
2401
2402    def sanitize(self, attr_value: bytes) -> bytes:
2403        try:
2404            localpart, domainpart = attr_value.rsplit(b'@')
2405        except ValueError:
2406            return attr_value
2407        else:
2408            return b'@'.join((
2409                localpart,
2410                DNSDomain.sanitize(self, domainpart)
2411            ))
2412
2413
2414class DomainComponent(DNSDomain):
2415    """
2416    Plugin class for a single DNS label
2417    (see https://datatracker.ietf.org/doc/html/rfc2181#section-11)
2418    """
2419    oid: str = 'DomainComponent-oid'
2420    desc: str = 'DNS domain name component'
2421    pattern = re.compile(r'^(\*|[a-zA-Z0-9_-]+)$')
2422    max_len: int = min(63, DNSDomain.max_len)
2423
2424
2425class JSONValue(PreformattedMultilineText):
2426    """
2427    Plugin class used for JSON data (see RFC 8259)
2428    """
2429    oid: str = 'JSONValue-oid'
2430    desc: str = 'JSON data'
2431    lineSep = b'\n'
2432    mime_type: str = 'application/json'
2433
2434    def _validate(self, attr_value: bytes) -> bool:
2435        try:
2436            json.loads(attr_value)
2437        except ValueError:
2438            return False
2439        return True
2440
2441    def _split_lines(self, value):
2442        try:
2443            obj = json.loads(value)
2444        except ValueError:
2445            return PreformattedMultilineText._split_lines(self, value)
2446        return PreformattedMultilineText._split_lines(
2447            self,
2448            json.dumps(
2449                obj,
2450                indent=4,
2451                separators=(',', ': ')
2452            ).encode('utf-8')
2453        )
2454
2455    def sanitize(self, attr_value: bytes) -> bytes:
2456        try:
2457            obj = json.loads(attr_value)
2458        except ValueError:
2459            return PreformattedMultilineText.sanitize(self, attr_value)
2460        return json.dumps(
2461            obj,
2462            separators=(',', ':')
2463        ).encode('utf-8')
2464
2465
2466class XmlValue(PreformattedMultilineText):
2467    """
2468    Plugin class used for XML data
2469    """
2470    oid: str = 'XmlValue-oid'
2471    desc: str = 'XML data'
2472    lineSep = b'\n'
2473    mime_type: str = 'text/xml'
2474
2475    def _validate(self, attr_value: bytes) -> bool:
2476        if not DEFUSEDXML_AVAIL:
2477            return PreformattedMultilineText._validate(self, attr_value)
2478        try:
2479            defusedxml.ElementTree.XML(attr_value)
2480        except defusedxml.ElementTree.ParseError:
2481            return False
2482        return True
2483
2484
2485class ASN1Object(Binary):
2486    """
2487    Plugin class used for BER-encoded ASN.1 data
2488    """
2489    oid: str = 'ASN1Object-oid'
2490    desc: str = 'BER-encoded ASN.1 data'
2491
2492
2493class AlgorithmOID(OID):
2494    """
2495    This base-class class is used for OIDs of cryptographic algorithms
2496    """
2497    oid: str = 'AlgorithmOID-oid'
2498
2499
2500class HashAlgorithmOID(SelectList, AlgorithmOID):
2501    """
2502    Plugin class for selection of OIDs for hash algorithms
2503    (see https://www.iana.org/assignments/hash-function-text-names/).
2504    """
2505    oid: str = 'HashAlgorithmOID-oid'
2506    desc: str = 'values from https://www.iana.org/assignments/hash-function-text-names/'
2507    attr_value_dict: Dict[str, str] = {
2508        '1.2.840.113549.2.2': 'md2',          # [RFC3279]
2509        '1.2.840.113549.2.5': 'md5',          # [RFC3279]
2510        '1.3.14.3.2.26': 'sha-1',             # [RFC3279]
2511        '2.16.840.1.101.3.4.2.4': 'sha-224',  # [RFC4055]
2512        '2.16.840.1.101.3.4.2.1': 'sha-256',  # [RFC4055]
2513        '2.16.840.1.101.3.4.2.2': 'sha-384',  # [RFC4055]
2514        '2.16.840.1.101.3.4.2.3': 'sha-512',  # [RFC4055]
2515    }
2516
2517
2518class HMACAlgorithmOID(SelectList, AlgorithmOID):
2519    """
2520    Plugin class for selection of OIDs for HMAC algorithms (see RFC 8018).
2521    """
2522    oid: str = 'HMACAlgorithmOID-oid'
2523    desc: str = 'values from RFC 8018'
2524    attr_value_dict: Dict[str, str] = {
2525        # from RFC 8018
2526        '1.2.840.113549.2.7': 'hmacWithSHA1',
2527        '1.2.840.113549.2.8': 'hmacWithSHA224',
2528        '1.2.840.113549.2.9': 'hmacWithSHA256',
2529        '1.2.840.113549.2.10': 'hmacWithSHA384',
2530        '1.2.840.113549.2.11': 'hmacWithSHA512',
2531    }
2532
2533
2534class ComposedAttribute(LDAPSyntax):
2535    """
2536    This mix-in plugin class composes attribute values from other attribute values.
2537
2538    One can define an ordered sequence of string templates in class
2539    attribute ComposedDirectoryString.compose_templates.
2540    See examples in module web2ldap.app.plugins.inetorgperson.
2541
2542    Obviously this only works for single-valued attributes,
2543    more precisely only the "first" attribute value is used.
2544    """
2545    oid: str = 'ComposedDirectoryString-oid'
2546    compose_templates: Sequence[str] = ()
2547
2548    class SingleValueDict(dict):
2549        """
2550        dictionary-like class which only stores and returns the
2551        first value of an attribute value list
2552        """
2553
2554        def __init__(self, entry, encoding):
2555            dict.__init__(self)
2556            self._encoding = encoding
2557            entry = entry or {}
2558            for key, val in entry.items():
2559                self.__setitem__(key, val)
2560
2561        def __setitem__(self, key, val):
2562            if val and val[0]:
2563                dict.__setitem__(self, key, val[0].decode(self._encoding))
2564
2565    def form_value(self) -> str:
2566        """
2567        Return a dummy value that attribute is returned from input form and
2568        then seen by .transmute()
2569        """
2570        return ''
2571
2572    def transmute(self, attr_values: List[bytes]) -> List[bytes]:
2573        """
2574        always returns a list with a single value based on the first
2575        successfully applied compose template
2576        """
2577        entry = self.SingleValueDict(self._entry, encoding=self._app.ls.charset)
2578        for template in self.compose_templates:
2579            try:
2580                attr_values = [template.format(**entry).encode(self._app.ls.charset)]
2581            except KeyError:
2582                continue
2583            else:
2584                break
2585        else:
2586            return attr_values
2587        return attr_values
2588
2589    def input_field(self) -> web_forms.Field:
2590        """
2591        composed attributes must only have hidden input field
2592        """
2593        input_field = web_forms.HiddenInput(
2594            self._at,
2595            ': '.join([self._at, self.desc]),
2596            self.max_len,
2597            self.max_values,
2598            None,
2599            default=self.form_value(),
2600        )
2601        input_field.charset = self._app.form.accept_charset
2602        return input_field
2603
2604
2605class LDAPv3ResultCode(SelectList):
2606    """
2607    Plugin base class for attributes with Integer syntax
2608    constrained to valid LDAP result code.
2609    """
2610    oid: str = 'LDAPResultCode-oid'
2611    desc: str = 'LDAPv3 declaration of resultCode in (see RFC 4511)'
2612    attr_value_dict: Dict[str, str] = {
2613        '0': 'success',
2614        '1': 'operationsError',
2615        '2': 'protocolError',
2616        '3': 'timeLimitExceeded',
2617        '4': 'sizeLimitExceeded',
2618        '5': 'compareFalse',
2619        '6': 'compareTrue',
2620        '7': 'authMethodNotSupported',
2621        '8': 'strongerAuthRequired',
2622        '9': 'reserved',
2623        '10': 'referral',
2624        '11': 'adminLimitExceeded',
2625        '12': 'unavailableCriticalExtension',
2626        '13': 'confidentialityRequired',
2627        '14': 'saslBindInProgress',
2628        '16': 'noSuchAttribute',
2629        '17': 'undefinedAttributeType',
2630        '18': 'inappropriateMatching',
2631        '19': 'constraintViolation',
2632        '20': 'attributeOrValueExists',
2633        '21': 'invalidAttributeSyntax',
2634        '32': 'noSuchObject',
2635        '33': 'aliasProblem',
2636        '34': 'invalidDNSyntax',
2637        '35': 'reserved for undefined isLeaf',
2638        '36': 'aliasDereferencingProblem',
2639        '48': 'inappropriateAuthentication',
2640        '49': 'invalidCredentials',
2641        '50': 'insufficientAccessRights',
2642        '51': 'busy',
2643        '52': 'unavailable',
2644        '53': 'unwillingToPerform',
2645        '54': 'loopDetect',
2646        '64': 'namingViolation',
2647        '65': 'objectClassViolation',
2648        '66': 'notAllowedOnNonLeaf',
2649        '67': 'notAllowedOnRDN',
2650        '68': 'entryAlreadyExists',
2651        '69': 'objectClassModsProhibited',
2652        '70': 'reserved for CLDAP',
2653        '71': 'affectsMultipleDSAs',
2654        '80': 'other',
2655    }
2656
2657
2658class SchemaDescription(DirectoryString):
2659    oid: str = 'SchemaDescription-oid'
2660    schema_cls = None
2661    sani_funcs = (
2662        bytes.strip,
2663    )
2664
2665    def _validate(self, attr_value: bytes) -> bool:
2666        if self.schema_cls is None:
2667            return DirectoryString._validate(self, attr_value)
2668        try:
2669            _ = self.schema_cls(self._app.ls.uc_decode(attr_value)[0])
2670        except (IndexError, ValueError):
2671            return False
2672        return True
2673
2674
2675class ObjectClassDescription(SchemaDescription):
2676    oid: str = '1.3.6.1.4.1.1466.115.121.1.37'
2677    schema_cls = ldap0.schema.models.ObjectClass
2678
2679
2680class AttributeTypeDescription(SchemaDescription):
2681    oid: str = '1.3.6.1.4.1.1466.115.121.1.3'
2682    schema_cls = ldap0.schema.models.AttributeType
2683
2684
2685class MatchingRuleDescription(SchemaDescription):
2686    oid: str = '1.3.6.1.4.1.1466.115.121.1.30'
2687    schema_cls = ldap0.schema.models.MatchingRule
2688
2689
2690class MatchingRuleUseDescription(SchemaDescription):
2691    oid: str = '1.3.6.1.4.1.1466.115.121.1.31'
2692    schema_cls = ldap0.schema.models.MatchingRuleUse
2693
2694
2695class LDAPSyntaxDescription(SchemaDescription):
2696    oid: str = '1.3.6.1.4.1.1466.115.121.1.54'
2697    schema_cls = ldap0.schema.models.LDAPSyntax
2698
2699
2700class DITContentRuleDescription(SchemaDescription):
2701    oid: str = '1.3.6.1.4.1.1466.115.121.1.16'
2702    schema_cls = ldap0.schema.models.DITContentRule
2703
2704
2705class DITStructureRuleDescription(SchemaDescription):
2706    oid: str = '1.3.6.1.4.1.1466.115.121.1.17'
2707    schema_cls = ldap0.schema.models.DITStructureRule
2708
2709
2710class NameFormDescription(SchemaDescription):
2711    oid: str = '1.3.6.1.4.1.1466.115.121.1.35'
2712    schema_cls = ldap0.schema.models.NameForm
2713
2714
2715# Set up the central syntax registry instance
2716syntax_registry = SyntaxRegistry()
2717
2718# Register all syntax classes in this module
2719syntax_registry.reg_syntaxes(__name__)
2720