1# -*- coding: ascii -*- 2""" 3web2ldap.app.schema.syntaxes: classes for known attribute types 4 5web2ldap - a web-based LDAP Client, 6see https://www.web2ldap.de for details 7 8(c) 1998-2021 by Michael Stroeder <michael@stroeder.com> 9 10This software is distributed under the terms of the 11Apache License Version 2.0 (Apache-2.0) 12https://www.apache.org/licenses/LICENSE-2.0 13""" 14 15import binascii 16import sys 17import os 18import re 19import imghdr 20import sndhdr 21import urllib.parse 22import uuid 23import datetime 24import time 25import json 26import inspect 27import warnings 28from typing import ( 29 Callable, 30 Dict, 31 List, 32 Optional, 33 Pattern, 34 Sequence, 35 Tuple, 36) 37 38try: 39 import defusedxml.ElementTree 40except ImportError: 41 DEFUSEDXML_AVAIL = False 42else: 43 DEFUSEDXML_AVAIL = True 44 45from collections import defaultdict 46from io import BytesIO 47 48# Detect Python Imaging Library (PIL) 49try: 50 from PIL import Image as PILImage 51except ImportError: 52 PIL_AVAIL = False 53else: 54 PIL_AVAIL = True 55 warnings.simplefilter('error', PILImage.DecompressionBombWarning) 56 57import ipaddress 58 59import ldap0 60import ldap0.ldapurl 61from ldap0.schema.models import AttributeType, ObjectClass, OBJECTCLASS_KIND_STR 62from ldap0.controls.deref import DereferenceControl 63from ldap0.dn import DNObj, is_dn 64from ldap0.res import SearchResultEntry 65from ldap0.schema.subentry import SubSchema 66 67import web2ldapcnf 68 69from ... import ETC_DIR 70from ...web import forms as web_forms 71from ...msbase import ascii_dump, chunks 72from ...utctime import repr2ts, ts2repr, strftimeiso8601 73from ...ldaputil.oidreg import OID_REG 74from ...log import logger 75from ... import cmp 76from . import schema_anchor 77from ..tmpl import get_variant_filename 78from ...utctime import strptime as utc_strptime 79from ..searchform import ( 80 SEARCH_OPT_ATTR_EXISTS, 81 SEARCH_OPT_IS_EQUAL, 82 SEARCH_SCOPE_STR_ONELEVEL, 83) 84 85 86class SyntaxRegistry: 87 """ 88 syntax registry used to register plugin classes 89 """ 90 __slots__ = ( 91 'at2syntax', 92 'oid2syntax', 93 ) 94 95 def __init__(self): 96 self.oid2syntax = ldap0.cidict.CIDict() 97 self.at2syntax = defaultdict(dict) 98 99 def reg_syntax(self, cls): 100 """ 101 register a syntax classes for an OID 102 """ 103 assert isinstance(cls.oid, str), ValueError( 104 'Expected %s.oid to be str, got %r' % (cls.__name__, cls.oid,) 105 ) 106 logger.debug('Register syntax class %r with OID %r', cls.__name__, cls.oid) 107 # FIX ME! 108 # A better approach for unique syntax plugin class registration which 109 # allows overriding older registration is needed. 110 if cls.oid in self.oid2syntax and cls != self.oid2syntax[cls.oid]: 111 raise ValueError( 112 ( 113 'Failed to register syntax class %s.%s with OID %s,' 114 ' already registered by %s.%s' 115 ) % ( 116 cls.__module__, 117 cls.__name__, 118 repr(cls.oid), 119 self.oid2syntax[cls.oid].__module__, 120 self.oid2syntax[cls.oid].__name__, 121 ) 122 ) 123 self.oid2syntax[cls.oid] = cls 124 125 def reg_syntaxes(self, modulename): 126 """ 127 register all syntax classes found in given module 128 """ 129 logger.debug('Register syntax classes from module %r', modulename) 130 for _, cls in inspect.getmembers(sys.modules[modulename], inspect.isclass): 131 if issubclass(cls, LDAPSyntax) and hasattr(cls, 'oid'): 132 self.reg_syntax(cls) 133 134 def reg_at(self, syntax_oid: str, attr_types, structural_oc_oids=None): 135 """ 136 register an attribute type (by OID) to explicitly use a certain LDAPSyntax class 137 """ 138 logger.debug( 139 'Register syntax OID %s for %r / %r', 140 syntax_oid, 141 attr_types, 142 structural_oc_oids, 143 ) 144 assert isinstance(syntax_oid, str), ValueError( 145 'Expected syntax_oid to be str, got %r' % (syntax_oid,) 146 ) 147 structural_oc_oids = list(filter(None, map(str.strip, structural_oc_oids or []))) or [None] 148 for atype in attr_types: 149 atype = atype.strip() 150 for oc_oid in structural_oc_oids: 151 # FIX ME! 152 # A better approach for unique attribute type registration which 153 # allows overriding older registration is needed. 154 if atype in self.at2syntax and oc_oid in self.at2syntax[atype]: 155 logger.warning( 156 ( 157 'Registering attribute type %r with syntax %r' 158 ' overrides existing registration with syntax %r' 159 ), 160 atype, 161 syntax_oid, 162 self.at2syntax[atype], 163 ) 164 self.at2syntax[atype][oc_oid] = syntax_oid 165 166 def get_syntax(self, schema, attrtype_nameoroid, structural_oc): 167 """ 168 returns LDAPSyntax class for given attribute type 169 """ 170 assert isinstance(attrtype_nameoroid, str), ValueError( 171 'Expected attrtype_nameoroid to be str, got %r' % (attrtype_nameoroid,) 172 ) 173 assert structural_oc is None or isinstance(structural_oc, str), ValueError( 174 'Expected structural_oc to be str or None, got %r' % (structural_oc,) 175 ) 176 attrtype_oid = schema.get_oid(AttributeType, attrtype_nameoroid) 177 if structural_oc: 178 structural_oc_oid = schema.get_oid(ObjectClass, structural_oc) 179 else: 180 structural_oc_oid = None 181 syntax_oid = LDAPSyntax.oid 182 try: 183 syntax_oid = self.at2syntax[attrtype_oid][structural_oc_oid] 184 except KeyError: 185 try: 186 syntax_oid = self.at2syntax[attrtype_oid][None] 187 except KeyError: 188 attrtype_se = schema.get_inheritedobj( 189 AttributeType, 190 attrtype_oid, 191 ['syntax'], 192 ) 193 if attrtype_se and attrtype_se.syntax: 194 syntax_oid = attrtype_se.syntax 195 try: 196 syntax_class = self.oid2syntax[syntax_oid] 197 except KeyError: 198 syntax_class = LDAPSyntax 199 return syntax_class 200 201 def get_at(self, app, dn, schema, attr_type, attr_value, entry=None): 202 """ 203 returns LDAPSyntax instance fully initialized for given attribute 204 """ 205 if entry: 206 structural_oc = entry.get_structural_oc() 207 else: 208 structural_oc = None 209 syntax_class = self.get_syntax(schema, attr_type, structural_oc) 210 attr_instance = syntax_class(app, dn, schema, attr_type, attr_value, entry) 211 return attr_instance 212 213 def check(self): 214 """ 215 check whether attribute registry dict contains references by OID 216 for which no LDAPSyntax class are registered 217 """ 218 logger.debug( 219 'Checking %d LDAPSyntax classes and %d attribute type mappings', 220 len(self.oid2syntax), 221 len(self.at2syntax), 222 ) 223 for atype in self.at2syntax: 224 for object_class in self.at2syntax[atype]: 225 if self.at2syntax[atype][object_class] not in self.oid2syntax: 226 logger.warning('No LDAPSyntax registered for (%r, %r)', atype, object_class) 227 228 229#################################################################### 230# Classes of known syntaxes 231#################################################################### 232 233 234class LDAPSyntaxValueError(ValueError): 235 """ 236 Exception raised in case a syntax check failed 237 """ 238 239 240class LDAPSyntaxRegexNoMatch(LDAPSyntaxValueError): 241 """ 242 Exception raised in case a regex pattern check failed 243 """ 244 245 246class LDAPSyntax: 247 """ 248 Base class for all LDAP syntax and attribute value plugin classes 249 """ 250 __slots__ = ( 251 '_app', 252 '_at', 253 '_av', 254 '_av_u', 255 '_dn', 256 '_entry', 257 '_schema', 258 ) 259 oid: str = '' 260 desc: str = 'Any LDAP syntax' 261 input_size: int = 50 262 max_len: int = web2ldapcnf.input_maxfieldlen 263 max_values: int = web2ldapcnf.input_maxattrs 264 mime_type: str = 'application/octet-stream' 265 file_ext: str = 'bin' 266 editable: bool = True 267 pattern: Optional[Pattern[str]] = None 268 input_pattern: Optional[str] = None 269 search_sep: str = '<br>' 270 read_sep: str = '<br>' 271 field_sep: str = '<br>' 272 sani_funcs: Sequence[Callable] = (()) 273 show_val_button: bool = True 274 275 def __init__( 276 self, 277 app, 278 dn: Optional[str], 279 schema: SubSchema, 280 attrType: Optional[str], 281 attr_value: Optional[bytes], 282 entry=None, 283 ): 284 if not entry: 285 entry = ldap0.schema.models.Entry(schema, dn, {}) 286 assert isinstance(dn, str), \ 287 TypeError("Argument 'dn' must be str, was %r" % (dn,)) 288 assert isinstance(attrType, str) or attrType is None, \ 289 TypeError("Argument 'attrType' must be str or None, was %r" % (attrType,)) 290 assert isinstance(attr_value, bytes) or attr_value is None, \ 291 TypeError("Argument 'attr_value' must be bytes or None, was %r" % (attr_value,)) 292 assert entry is None or isinstance(entry, ldap0.schema.models.Entry), \ 293 TypeError('entry must be ldaputil.schema.Entry, was %r' % (entry,)) 294 self._at = attrType 295 self._av = attr_value 296 self._av_u = None 297 self._app = app 298 self._schema = schema 299 self._dn = dn 300 self._entry = entry 301 302 @property 303 def dn(self): 304 return DNObj.from_str(self._dn) 305 306 @property 307 def av_u(self): 308 if (self._av is not None and self._av_u is None): 309 self._av_u = self._app.ls.uc_decode(self._av)[0] 310 return self._av_u 311 312 def sanitize(self, attr_value: bytes) -> bytes: 313 """ 314 Transforms the HTML form input field values into LDAP string 315 representations and returns raw binary string. 316 317 This is the inverse of LDAPSyntax.form_value(). 318 319 When using this method one MUST NOT assume that the whole entry is 320 present. 321 """ 322 for sani_func in self.sani_funcs: 323 attr_value = sani_func(attr_value) 324 return attr_value 325 326 def transmute(self, attr_values: List[bytes]) -> List[bytes]: 327 """ 328 This method can be implemented to transmute attribute values and has 329 to handle LDAP string representations (raw binary strings). 330 331 This method has access to the whole entry after processing all input. 332 333 Implementors should be prepared that this method could be called 334 more than once. If there's nothing to change then simply return the 335 same value list. 336 337 Exceptions KeyError or IndexError are caught by the calling code to 338 re-iterate invoking this method. 339 """ 340 return attr_values 341 342 def _validate(self, attr_value: bytes) -> bool: 343 """ 344 check the syntax of attr_value 345 346 Implementors can overload this method to apply arbitrary syntax checks. 347 """ 348 return True 349 350 def validate(self, attr_value: bytes): 351 if not attr_value: 352 return 353 if self.pattern and (self.pattern.match(attr_value.decode(self._app.ls.charset)) is None): 354 raise LDAPSyntaxRegexNoMatch( 355 "Class %s: %r does not match pattern %r." % ( 356 self.__class__.__name__, 357 attr_value, 358 self.pattern.pattern, 359 ) 360 ) 361 if not self._validate(attr_value): 362 raise LDAPSyntaxValueError( 363 "Class %s: %r does not comply to syntax (attr type %r)." % ( 364 self.__class__.__name__, 365 attr_value, 366 self._at, 367 ) 368 ) 369 # end of validate() 370 371 def value_button(self, command, row, mode, link_text=None) -> str: 372 """ 373 return HTML markup of [+] or [-] submit buttons for adding/removing 374 attribute values 375 376 row 377 row number in input table 378 mode 379 '+' or '-' 380 link_text 381 optionally override displayed link link_text 382 """ 383 link_text = link_text or mode 384 if ( 385 not self.show_val_button or 386 self.max_values <= 1 or 387 len(self._entry.get(self._at, [])) >= self.max_values 388 ): 389 return '' 390 se_obj = self._schema.get_obj(AttributeType, self._at) 391 if se_obj and se_obj.single_value: 392 return '' 393 return ( 394 '<button' 395 ' formaction="%s#in_a_%s"' 396 ' type="submit"' 397 ' name="in_mr"' 398 ' value="%s%d">%s' 399 '</button>' 400 ) % ( 401 self._app.form.action_url(command, self._app.sid), 402 self._app.form.s2d(self._at), 403 mode, row, link_text 404 ) 405 406 def form_value(self) -> str: 407 """ 408 Transform LDAP string representations to HTML form input field 409 values. Returns Unicode string to be encoded with the browser's 410 accepted charset. 411 412 This is the inverse of LDAPSyntax.sanitize(). 413 """ 414 try: 415 result = self.av_u or '' 416 except UnicodeDecodeError: 417 result = '!!!snipped because of UnicodeDecodeError!!!' 418 return result 419 420 def input_fields(self): 421 return (self.input_field(),) 422 423 def input_field(self) -> web_forms.Field: 424 input_field = web_forms.Input( 425 self._at, 426 ': '.join([self._at, self.desc]), 427 self.max_len, 428 self.max_values, 429 self.input_pattern, 430 default=None, 431 size=min(self.max_len, self.input_size), 432 ) 433 input_field.charset = self._app.form.accept_charset 434 input_field.set_default(self.form_value()) 435 return input_field 436 437 def display(self, vidx, links) -> str: 438 try: 439 res = self._app.form.s2d(self.av_u) 440 except UnicodeDecodeError: 441 res = self._app.form.s2d(repr(self._av)) 442 return res 443 444 445class Binary(LDAPSyntax): 446 """ 447 Plugin class for LDAP syntax 'Binary' (see RFC 2252) 448 """ 449 oid: str = '1.3.6.1.4.1.1466.115.121.1.5' 450 desc: str = 'Binary' 451 editable: bool = False 452 453 def input_field(self) -> web_forms.Field: 454 field = web_forms.File( 455 self._at, 456 ': '.join([self._at, self.desc]), 457 self.max_len, self.max_values, None, default=self._av, size=50 458 ) 459 field.mime_type = self.mime_type 460 return field 461 462 def display(self, vidx, links) -> str: 463 return '%d bytes | %s' % ( 464 len(self._av), 465 self._app.anchor( 466 'read', 'View/Load', 467 [ 468 ('dn', self._dn), 469 ('read_attr', self._at), 470 ('read_attrindex', str(vidx)), 471 ], 472 ) 473 ) 474 475 476class Audio(Binary): 477 """ 478 Plugin class for LDAP syntax 'Audio' (see RFC 2252) 479 """ 480 oid: str = '1.3.6.1.4.1.1466.115.121.1.4' 481 desc: str = 'Audio' 482 mime_type: str = 'audio/basic' 483 file_ext: str = 'au' 484 485 def _validate(self, attr_value: bytes) -> bool: 486 with BytesIO(attr_value) as fileobj: 487 res = sndhdr.test_au(attr_value, fileobj) 488 return res is not None 489 490 def display(self, vidx, links) -> str: 491 mimetype = self.mime_type 492 return ( 493 '<embed type="%s" autostart="false" ' 494 'src="%s/read/%s?dn=%s&read_attr=%s&read_attrindex=%d">' 495 '%d bytes of audio data (%s)' 496 ) % ( 497 mimetype, 498 self._app.form.script_name, self._app.sid, 499 urllib.parse.quote(self._dn.encode(self._app.form.accept_charset)), 500 urllib.parse.quote(self._at), 501 vidx, 502 len(self._av), 503 mimetype 504 ) 505 506 507class DirectoryString(LDAPSyntax): 508 """ 509 Plugin class for LDAP syntax 'Directory String' 510 (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.6) 511 """ 512 oid: str = '1.3.6.1.4.1.1466.115.121.1.15' 513 desc: str = 'Directory String' 514 html_tmpl = '{av}' 515 516 def _validate(self, attr_value: bytes) -> bool: 517 try: 518 self._app.ls.uc_decode(attr_value) 519 except UnicodeDecodeError: 520 return False 521 return True 522 523 def display(self, vidx, links) -> str: 524 return self.html_tmpl.format( 525 av=self._app.form.s2d(self.av_u) 526 ) 527 528 529class DistinguishedName(DirectoryString): 530 """ 531 Plugin class for LDAP syntax 'DN' 532 (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.9) 533 """ 534 oid: str = '1.3.6.1.4.1.1466.115.121.1.12' 535 desc: str = 'Distinguished Name' 536 isBindDN = False 537 hasSubordinates = False 538 ref_attrs: Optional[Sequence[Tuple[Optional[str], str, Optional[str], str]]] = None 539 540 def _validate(self, attr_value: bytes) -> bool: 541 return is_dn(self._app.ls.uc_decode(attr_value)[0]) 542 543 def _additional_links(self): 544 res = [] 545 if self._at.lower() != 'entrydn': 546 res.append( 547 self._app.anchor( 548 'read', 'Read', 549 [('dn', self.av_u)], 550 ) 551 ) 552 if self.hasSubordinates: 553 res.append(self._app.anchor( 554 'search', 'Down', 555 ( 556 ('dn', self.av_u), 557 ('scope', SEARCH_SCOPE_STR_ONELEVEL), 558 ('filterstr', '(objectClass=*)'), 559 ) 560 )) 561 if self.isBindDN: 562 ldap_url_obj = self._app.ls.ldap_url('', add_login=False) 563 res.append( 564 self._app.anchor( 565 'login', 566 'Bind as', 567 [ 568 ('ldapurl', str(ldap_url_obj)), 569 ('dn', self._dn), 570 ('login_who', self.av_u), 571 ], 572 title='Connect and bind new session as\r\n%s' % (self.av_u) 573 ), 574 ) 575 # If self.ref_attrs is not empty then add links for searching back-linking entries 576 for ref_attr_tuple in self.ref_attrs or tuple(): 577 try: 578 ref_attr, ref_text, ref_dn, ref_oc, ref_title = ref_attr_tuple 579 except ValueError: 580 ref_oc = None 581 ref_attr, ref_text, ref_dn, ref_title = ref_attr_tuple 582 ref_attr = ref_attr or self._at 583 ref_dn = ref_dn or self._dn 584 ref_title = ref_title or 'Search %s entries referencing entry %s in attribute %s' % ( 585 ref_oc, self.av_u, ref_attr, 586 ) 587 res.append(self._app.anchor( 588 'search', self._app.form.s2d(ref_text), 589 ( 590 ('dn', ref_dn), 591 ('search_root', str(self._app.naming_context)), 592 ('searchform_mode', 'adv'), 593 ('search_attr', 'objectClass'), 594 ( 595 'search_option', 596 { 597 True: SEARCH_OPT_ATTR_EXISTS, 598 False: SEARCH_OPT_IS_EQUAL, 599 }[ref_oc is None] 600 ), 601 ('search_string', ref_oc or ''), 602 ('search_attr', ref_attr), 603 ('search_option', SEARCH_OPT_IS_EQUAL), 604 ('search_string', self.av_u), 605 ), 606 title=ref_title, 607 )) 608 return res 609 610 def display(self, vidx, links) -> str: 611 res = [self._app.form.s2d(self.av_u or '- World -')] 612 if links: 613 res.extend(self._additional_links()) 614 return web2ldapcnf.command_link_separator.join(res) 615 616 617class BindDN(DistinguishedName): 618 """ 619 Plugin class for DNs probably usable as bind-DN 620 """ 621 oid: str = 'BindDN-oid' 622 desc: str = 'A Distinguished Name used to bind to a directory' 623 isBindDN = True 624 625 626class AuthzDN(DistinguishedName): 627 """ 628 Plugin class for DNs used for authorization 629 """ 630 oid: str = 'AuthzDN-oid' 631 desc: str = 'Authz Distinguished Name' 632 633 def display(self, vidx, links) -> str: 634 result = DistinguishedName.display(self, vidx, links) 635 if links: 636 simple_display_str = DistinguishedName.display( 637 self, 638 vidx, 639 links=False, 640 ) 641 whoami_display_str = self._app.display_authz_dn(who=self.av_u) 642 if whoami_display_str != simple_display_str: 643 result = '<br>'.join((whoami_display_str, result)) 644 return result 645 646 647class NameAndOptionalUID(DistinguishedName): 648 """ 649 Plugin class for LDAP syntax 'Name and Optional UID' 650 (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.21) 651 """ 652 oid: str = '1.3.6.1.4.1.1466.115.121.1.34' 653 desc: str = 'Name And Optional UID' 654 655 @staticmethod 656 def _split_dn_and_uid(val: str) -> Tuple[str, Optional[str]]: 657 try: 658 sep_ind = val.rindex('#') 659 except ValueError: 660 dn = val 661 uid = None 662 else: 663 dn = val[0:sep_ind] 664 uid = val[sep_ind+1:] 665 return dn, uid 666 667 def _validate(self, attr_value: bytes) -> bool: 668 dn, _ = self._split_dn_and_uid(self._app.ls.uc_decode(attr_value)[0]) 669 return is_dn(dn) 670 671 def display(self, vidx, links) -> str: 672 value = self.av_u.split('#') 673 dn_str = self._app.display_dn( 674 self.av_u, 675 links=links, 676 ) 677 if len(value) == 1 or not value[1]: 678 return dn_str 679 return web2ldapcnf.command_link_separator.join([ 680 self._app.form.s2d(value[1]), 681 dn_str, 682 ]) 683 684 685class BitString(DirectoryString): 686 """ 687 Plugin class for LDAP syntax 'Bit String' 688 (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.2) 689 """ 690 oid: str = '1.3.6.1.4.1.1466.115.121.1.6' 691 desc: str = 'Bit String' 692 pattern = re.compile("^'[01]+'B$") 693 694 695class IA5String(DirectoryString): 696 """ 697 Plugin class for LDAP syntax 'IA5 String' 698 (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.15) 699 """ 700 oid: str = '1.3.6.1.4.1.1466.115.121.1.26' 701 desc: str = 'IA5 String' 702 703 def _validate(self, attr_value: bytes) -> bool: 704 try: 705 _ = attr_value.decode('ascii').encode('ascii') 706 except UnicodeError: 707 return False 708 return True 709 710 711class GeneralizedTime(IA5String): 712 """ 713 Plugin class for LDAP syntax 'Generalized Time' 714 (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.13) 715 """ 716 oid: str = '1.3.6.1.4.1.1466.115.121.1.24' 717 desc: str = 'Generalized Time' 718 input_size: int = 24 719 max_len: int = 24 720 pattern = re.compile(r'^([0-9]){12,14}((\.|,)[0-9]+)*(Z|(\+|-)[0-9]{4})$') 721 timeDefault = None 722 notBefore = None 723 notAfter = None 724 form_value_fmt = '%Y-%m-%dT%H:%M:%SZ' 725 dtFormats = ( 726 '%Y%m%d%H%M%SZ', 727 '%Y-%m-%dT%H:%M:%SZ', 728 '%Y-%m-%dT%H:%MZ', 729 '%Y-%m-%dT%H:%M:%S+00:00', 730 '%Y-%m-%dT%H:%M:%S-00:00', 731 '%Y-%m-%d %H:%M:%SZ', 732 '%Y-%m-%d %H:%MZ', 733 '%Y-%m-%d %H:%M', 734 '%Y-%m-%d %H:%M:%S+00:00', 735 '%Y-%m-%d %H:%M:%S-00:00', 736 '%d.%m.%YT%H:%M:%SZ', 737 '%d.%m.%YT%H:%MZ', 738 '%d.%m.%YT%H:%M:%S+00:00', 739 '%d.%m.%YT%H:%M:%S-00:00', 740 '%d.%m.%Y %H:%M:%SZ', 741 '%d.%m.%Y %H:%MZ', 742 '%d.%m.%Y %H:%M', 743 '%d.%m.%Y %H:%M:%S+00:00', 744 '%d.%m.%Y %H:%M:%S-00:00', 745 ) 746 acceptable_formats = ( 747 '%Y-%m-%d', 748 '%d.%m.%Y', 749 '%m/%d/%Y', 750 ) 751 dt_display_format = ( 752 '<time datetime="%Y-%m-%dT%H:%M:%SZ">' 753 '%A (%W. week) %Y-%m-%d %H:%M:%S+00:00' 754 '</time>' 755 ) 756 757 def _validate(self, attr_value: bytes) -> bool: 758 try: 759 d_t = utc_strptime(attr_value) 760 except ValueError: 761 return False 762 return ( 763 (self.notBefore is None or self.notBefore <= d_t) 764 and (self.notAfter is None or self.notAfter >= d_t) 765 ) 766 767 def form_value(self) -> str: 768 if not self._av: 769 return '' 770 try: 771 d_t = datetime.datetime.strptime(self.av_u, r'%Y%m%d%H%M%SZ') 772 except ValueError: 773 result = IA5String.form_value(self) 774 else: 775 result = str(datetime.datetime.strftime(d_t, self.form_value_fmt)) 776 return result 777 778 def sanitize(self, attr_value: bytes) -> bytes: 779 av_u = self._app.ls.uc_decode(attr_value.strip().upper())[0] 780 # Special cases first 781 if av_u in {'N', 'NOW', '0'}: 782 return datetime.datetime.strftime( 783 datetime.datetime.utcnow(), 784 r'%Y%m%d%H%M%SZ', 785 ).encode('ascii') 786 # a single integer value is interpreted as seconds relative to now 787 try: 788 float_val = float(av_u) 789 except ValueError: 790 pass 791 else: 792 return datetime.datetime.strftime( 793 datetime.datetime.utcnow()+datetime.timedelta(seconds=float_val), 794 r'%Y%m%d%H%M%SZ', 795 ).encode('ascii') 796 if self.timeDefault: 797 date_format = r'%Y%m%d' + self.timeDefault + 'Z' 798 if av_u in ('T', 'TODAY'): 799 return datetime.datetime.strftime( 800 datetime.datetime.utcnow(), 801 date_format, 802 ).encode('ascii') 803 if av_u in ('Y', 'YESTERDAY'): 804 return datetime.datetime.strftime( 805 datetime.datetime.today()-datetime.timedelta(days=1), 806 date_format, 807 ).encode('ascii') 808 if av_u in ('T', 'TOMORROW'): 809 return datetime.datetime.strftime( 810 datetime.datetime.today()+datetime.timedelta(days=1), 811 date_format, 812 ).encode('ascii') 813 # Try to parse various datetime syntaxes 814 for time_format in self.dtFormats: 815 try: 816 d_t = datetime.datetime.strptime(av_u, time_format) 817 except ValueError: 818 result = None 819 else: 820 result = datetime.datetime.strftime(d_t, r'%Y%m%d%H%M%SZ') 821 break 822 if result is None: 823 if self.timeDefault: 824 for time_format in self.acceptable_formats or []: 825 try: 826 d_t = datetime.datetime.strptime(av_u, time_format) 827 except ValueError: 828 result = None 829 else: 830 result = datetime.datetime.strftime(d_t, r'%Y%m%d'+self.timeDefault+'Z') 831 break 832 else: 833 result = av_u 834 if result is None: 835 return IA5String.sanitize(self, attr_value) 836 return result.encode('ascii') 837 # end of GeneralizedTime.sanitize() 838 839 def display(self, vidx, links) -> str: 840 try: 841 dt_utc = utc_strptime(self.av_u) 842 except ValueError: 843 return IA5String.display(self, vidx, links) 844 try: 845 dt_utc_str = dt_utc.strftime(self.dt_display_format) 846 except ValueError: 847 return IA5String.display(self, vidx, links) 848 if not links: 849 return dt_utc_str 850 current_time = datetime.datetime.utcnow() 851 time_span = (current_time - dt_utc).total_seconds() 852 return '{dt_utc} ({av})<br>{timespan_disp} {timespan_comment}'.format( 853 dt_utc=dt_utc_str, 854 av=self._app.form.s2d(self.av_u), 855 timespan_disp=self._app.form.s2d( 856 ts2repr(Timespan.time_divisors, ' ', abs(time_span)) 857 ), 858 timespan_comment={ 859 1: 'ago', 860 0: '', 861 -1: 'ahead', 862 }[cmp(time_span, 0)] 863 ) 864 865 866class NotBefore(GeneralizedTime): 867 """ 868 Plugin class for attributes indicating start of a period 869 """ 870 oid: str = 'NotBefore-oid' 871 desc: str = 'A not-before timestamp by default starting at 00:00:00' 872 timeDefault = '000000' 873 874 875class NotAfter(GeneralizedTime): 876 """ 877 Plugin class for attributes indicating end of a period 878 """ 879 oid: str = 'NotAfter-oid' 880 desc: str = 'A not-after timestamp by default ending at 23:59:59' 881 timeDefault = '235959' 882 883 884class UTCTime(GeneralizedTime): 885 """ 886 Plugin class for LDAP syntax 'UTC Time' 887 (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.34) 888 """ 889 oid: str = '1.3.6.1.4.1.1466.115.121.1.53' 890 desc: str = 'UTC Time' 891 892 893class NullTerminatedDirectoryString(DirectoryString): 894 """ 895 Plugin class for strings terminated with null-byte 896 """ 897 oid: str = 'NullTerminatedDirectoryString-oid' 898 desc: str = 'Directory String terminated by null-byte' 899 900 def sanitize(self, attr_value: bytes) -> bytes: 901 return attr_value + b'\x00' 902 903 def _validate(self, attr_value: bytes) -> bool: 904 return attr_value.endswith(b'\x00') 905 906 def form_value(self) -> str: 907 return self._app.ls.uc_decode((self._av or b'\x00')[:-1])[0] 908 909 def display(self, vidx, links) -> str: 910 return self._app.form.s2d( 911 self._app.ls.uc_decode((self._av or b'\x00')[:-1])[0] 912 ) 913 914 915class OtherMailbox(DirectoryString): 916 """ 917 Plugin class for LDAP syntax 'Other Mailbox' 918 (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.27) 919 """ 920 oid: str = '1.3.6.1.4.1.1466.115.121.1.39' 921 desc: str = 'Other Mailbox' 922 charset = 'ascii' 923 924 925class Integer(IA5String): 926 """ 927 Plugin class for LDAP syntax 'Integer' 928 (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.16) 929 """ 930 oid: str = '1.3.6.1.4.1.1466.115.121.1.27' 931 desc: str = 'Integer' 932 input_size: int = 12 933 min_value = None 934 max_value = None 935 936 def __init__(self, app, dn: str, schema, attrType: str, attr_value: bytes, entry=None): 937 IA5String.__init__(self, app, dn, schema, attrType, attr_value, entry) 938 if self.max_value is not None: 939 self.max_len = len(str(self.max_value)) 940 941 def _maxlen(self, fval): 942 min_value_len = max_value_len = fval_len = 0 943 if self.min_value is not None: 944 min_value_len = len(str(self.min_value)) 945 if self.max_value is not None: 946 max_value_len = len(str(self.max_value)) 947 if fval is not None: 948 fval_len = len(fval.encode(self._app.ls.charset)) 949 return max(self.input_size, fval_len, min_value_len, max_value_len) 950 951 def _validate(self, attr_value: bytes) -> bool: 952 try: 953 val = int(attr_value) 954 except ValueError: 955 return False 956 min_value, max_value = self.min_value, self.max_value 957 return ( 958 (min_value is None or val >= min_value) and 959 (max_value is None or val <= max_value) 960 ) 961 962 def sanitize(self, attr_value: bytes) -> bytes: 963 try: 964 return str(int(attr_value)).encode('ascii') 965 except ValueError: 966 return attr_value 967 968 def input_field(self) -> web_forms.Field: 969 fval = self.form_value() 970 max_len = self._maxlen(fval) 971 input_field = web_forms.Input( 972 self._at, 973 ': '.join([self._at, self.desc]), 974 max_len, 975 self.max_values, 976 self.input_pattern, 977 default=fval, 978 size=min(self.input_size, max_len), 979 ) 980 input_field.input_type = 'number' 981 return input_field 982 983 984class IPHostAddress(IA5String): 985 """ 986 Plugin class for string representation of IPv4 or IPv6 host address 987 """ 988 oid: str = 'IPHostAddress-oid' 989 desc: str = 'string representation of IPv4 or IPv6 address' 990 # Class in module ipaddr which parses address/network values 991 addr_class = None 992 sani_funcs = ( 993 bytes.strip, 994 ) 995 996 def _validate(self, attr_value: bytes) -> bool: 997 try: 998 addr = ipaddress.ip_address(attr_value.decode('ascii')) 999 except ValueError: 1000 return False 1001 return self.addr_class is None or isinstance(addr, self.addr_class) 1002 1003 1004class IPv4HostAddress(IPHostAddress): 1005 """ 1006 Plugin class for string representation of IPv4 host address 1007 """ 1008 oid: str = 'IPv4HostAddress-oid' 1009 desc: str = 'string representation of IPv4 address' 1010 addr_class = ipaddress.IPv4Address 1011 1012 1013class IPv6HostAddress(IPHostAddress): 1014 """ 1015 Plugin class for string representation of IPv6 host address 1016 """ 1017 oid: str = 'IPv6HostAddress-oid' 1018 desc: str = 'string representation of IPv6 address' 1019 addr_class = ipaddress.IPv6Address 1020 1021 1022class IPNetworkAddress(IPHostAddress): 1023 """ 1024 Plugin class for string representation of IPv4 or IPv6 network address 1025 """ 1026 oid: str = 'IPNetworkAddress-oid' 1027 desc: str = 'string representation of IPv4 or IPv6 network address/mask' 1028 1029 def _validate(self, attr_value: bytes) -> bool: 1030 try: 1031 addr = ipaddress.ip_network(attr_value.decode('ascii'), strict=False) 1032 except ValueError: 1033 return False 1034 return self.addr_class is None or isinstance(addr, self.addr_class) 1035 1036 1037class IPv4NetworkAddress(IPNetworkAddress): 1038 """ 1039 Plugin class for string representation of IPv4 network address 1040 """ 1041 oid: str = 'IPv4NetworkAddress-oid' 1042 desc: str = 'string representation of IPv4 network address/mask' 1043 addr_class = ipaddress.IPv4Network 1044 1045 1046class IPv6NetworkAddress(IPNetworkAddress): 1047 """ 1048 Plugin class for string representation of IPv6 network address 1049 """ 1050 oid: str = 'IPv6NetworkAddress-oid' 1051 desc: str = 'string representation of IPv6 network address/mask' 1052 addr_class = ipaddress.IPv6Network 1053 1054 1055class IPServicePortNumber(Integer): 1056 """ 1057 Plugin class for service port number (see /etc/services) 1058 """ 1059 oid: str = 'IPServicePortNumber-oid' 1060 desc: str = 'Port number for an UDP- or TCP-based service' 1061 min_value = 0 1062 max_value = 65535 1063 1064 1065class MacAddress(IA5String): 1066 """ 1067 Plugin class for IEEEE MAC addresses of network devices 1068 """ 1069 oid: str = 'MacAddress-oid' 1070 desc: str = 'MAC address in hex-colon notation' 1071 min_len: int = 17 1072 max_len: int = 17 1073 pattern = re.compile(r'^([0-9a-f]{2}\:){5}[0-9a-f]{2}$') 1074 1075 def sanitize(self, attr_value: bytes) -> bytes: 1076 attr_value = attr_value.translate(None, b'.-: ').lower().strip() 1077 if len(attr_value) == 12: 1078 return b':'.join([attr_value[i*2:i*2+2] for i in range(6)]) 1079 return attr_value 1080 1081 1082class Uri(DirectoryString): 1083 """ 1084 Plugin class for Uniform Resource Identifiers (URIs, see RFC 2079) 1085 """ 1086 oid: str = 'Uri-OID' 1087 desc: str = 'URI' 1088 pattern = re.compile(r'^(ftp|http|https|news|snews|ldap|ldaps|mailto):(|//)[^ ]*') 1089 sani_funcs = ( 1090 bytes.strip, 1091 ) 1092 1093 def display(self, vidx, links) -> str: 1094 attr_value = self.av_u 1095 try: 1096 url, label = attr_value.split(' ', 1) 1097 except ValueError: 1098 url, label = attr_value, attr_value 1099 display_url = '' 1100 else: 1101 display_url = ' (%s)' % (url) 1102 if ldap0.ldapurl.is_ldapurl(url): 1103 return '<a href="%s?%s">%s%s</a>' % ( 1104 self._app.form.script_name, 1105 self._app.form.s2d(url), 1106 self._app.form.s2d(label), 1107 self._app.form.s2d(display_url), 1108 ) 1109 if url.lower().find('javascript:') >= 0: 1110 return '<code>%s</code>' % ( 1111 DirectoryString.display(self, vidx=False, links=False) 1112 ) 1113 return '<a href="%s/urlredirect/%s?%s">%s%s</a>' % ( 1114 self._app.form.script_name, 1115 self._app.sid, 1116 self._app.form.s2d(url), 1117 self._app.form.s2d(label), 1118 self._app.form.s2d(display_url), 1119 ) 1120 1121 1122class Image(Binary): 1123 """ 1124 Plugin base class for attributes containing image data. 1125 """ 1126 oid: str = 'Image-OID' 1127 desc: str = 'Image base class' 1128 mime_type: str = 'application/octet-stream' 1129 file_ext: str = 'bin' 1130 imageFormat = None 1131 inline_maxlen = 630 # max. number of bytes to use data: URI instead of external URL 1132 1133 def _validate(self, attr_value: bytes) -> bool: 1134 return imghdr.what(None, attr_value) == self.imageFormat.lower() 1135 1136 def sanitize(self, attr_value: bytes) -> bytes: 1137 if not self._validate(attr_value) and PIL_AVAIL: 1138 try: 1139 with BytesIO(attr_value) as imgfile: 1140 img = PILImage.open(imgfile) 1141 imgfile.seek(0) 1142 img.save(imgfile, self.imageFormat) 1143 attr_value = imgfile.getvalue() 1144 except Exception as err: 1145 logger.warning( 1146 'Error converting image data (%d bytes) to %s: %r', 1147 len(attr_value), 1148 self.imageFormat, 1149 err, 1150 ) 1151 return attr_value 1152 1153 def display(self, vidx, links) -> str: 1154 maxwidth, maxheight = 100, 150 1155 width, height = None, None 1156 size_attr_html = '' 1157 if PIL_AVAIL: 1158 try: 1159 with BytesIO(self._av) as imgfile: 1160 img = PILImage.open(imgfile) 1161 except IOError: 1162 pass 1163 else: 1164 width, height = img.size 1165 if width > maxwidth: 1166 size_attr_html = 'width="%d" height="%d"' % ( 1167 maxwidth, 1168 int(float(maxwidth)/width*height), 1169 ) 1170 elif height > maxheight: 1171 size_attr_html = 'width="%d" height="%d"' % ( 1172 int(float(maxheight)/height*width), 1173 maxheight, 1174 ) 1175 else: 1176 size_attr_html = 'width="%d" height="%d"' % (width, height) 1177 attr_value_len = len(self._av) 1178 img_link = ( 1179 '%s/read/%s' 1180 '?dn=%s&read_attr=%s&read_attrindex=%d' 1181 ) % ( 1182 self._app.form.script_name, self._app.sid, 1183 urllib.parse.quote(self._dn), 1184 urllib.parse.quote(self._at), 1185 vidx, 1186 ) 1187 if attr_value_len <= self.inline_maxlen: 1188 return ( 1189 '<a href="%s">' 1190 '<img src="data:%s;base64,\n%s" alt="%d bytes of image data" %s>' 1191 '</a>' 1192 ) % ( 1193 img_link, 1194 self.mime_type, 1195 self._av.encode('base64'), 1196 attr_value_len, 1197 size_attr_html, 1198 ) 1199 return '<a href="%s"><img src="%s" alt="%d bytes of image data" %s></a>' % ( 1200 img_link, 1201 img_link, 1202 attr_value_len, 1203 size_attr_html, 1204 ) 1205 1206 1207class JPEGImage(Image): 1208 """ 1209 Plugin class for LDAP syntax 'JPEG' 1210 (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.17) 1211 """ 1212 oid: str = '1.3.6.1.4.1.1466.115.121.1.28' 1213 desc: str = 'JPEG image' 1214 mime_type: str = 'image/jpeg' 1215 file_ext: str = 'jpg' 1216 imageFormat = 'JPEG' 1217 1218 1219class PhotoG3Fax(Binary): 1220 """ 1221 Plugin class for LDAP syntax 'Fax' 1222 (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.12) 1223 """ 1224 oid: str = '1.3.6.1.4.1.1466.115.121.1.23' 1225 desc: str = 'Photo (G3 fax)' 1226 mime_type: str = 'image/g3fax' 1227 file_ext: str = 'tif' 1228 1229 1230class OID(IA5String): 1231 """ 1232 Plugin class for LDAP syntax 'OID' 1233 (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.26) 1234 """ 1235 oid: str = '1.3.6.1.4.1.1466.115.121.1.38' 1236 desc: str = 'OID' 1237 pattern = re.compile(r'^([a-zA-Z]+[a-zA-Z0-9;-]*|[0-2]?\.([0-9]+\.)*[0-9]+)$') 1238 no_val_button_attrs = frozenset(( 1239 'objectclass', 1240 'structuralobjectclass', 1241 '2.5.4.0', 1242 '2.5.21.9', 1243 )) 1244 1245 def value_button(self, command, row, mode, link_text=None) -> str: 1246 if self._at.lower() in self.no_val_button_attrs: 1247 return '' 1248 return IA5String.value_button(self, command, row, mode, link_text=link_text) 1249 1250 def sanitize(self, attr_value: bytes) -> bytes: 1251 return attr_value.strip() 1252 1253 def display(self, vidx, links) -> str: 1254 try: 1255 name, description, reference = OID_REG[self.av_u] 1256 except (KeyError, ValueError): 1257 try: 1258 se_obj = self._schema.get_obj( 1259 ObjectClass, 1260 self.av_u, 1261 raise_keyerror=1, 1262 ) 1263 except KeyError: 1264 try: 1265 se_obj = self._schema.get_obj( 1266 AttributeType, 1267 self.av_u, 1268 raise_keyerror=1, 1269 ) 1270 except KeyError: 1271 return IA5String.display(self, vidx, links) 1272 return schema_anchor( 1273 self._app, 1274 self.av_u, 1275 AttributeType, 1276 name_template='{name}\n{anchor}', 1277 link_text='»', 1278 ) 1279 if self._at.lower() == 'structuralobjectclass': 1280 name_template = '{name}\n{anchor}' 1281 else: 1282 name_template = '{name}\n (%s){anchor}' % (OBJECTCLASS_KIND_STR[se_obj.kind],) 1283 # objectClass attribute is displayed with different function 1284 return schema_anchor( 1285 self._app, 1286 self.av_u, 1287 ObjectClass, 1288 name_template=name_template, 1289 link_text='»', 1290 ) 1291 return '<strong>%s</strong> (%s):<br>%s (see %s)' % ( 1292 self._app.form.s2d(name), 1293 IA5String.display(self, vidx, links), 1294 self._app.form.s2d(description), 1295 self._app.form.s2d(reference) 1296 ) 1297 1298 1299class LDAPUrl(Uri): 1300 """ 1301 Plugin class for attributes containing LDAP URLs 1302 """ 1303 oid: str = 'LDAPUrl-oid' 1304 desc: str = 'LDAP URL' 1305 1306 def _command_ldap_url(self, ldap_url): 1307 return ldap_url 1308 1309 def display(self, vidx, links) -> str: 1310 try: 1311 if links: 1312 linksstr = self._app.ldap_url_anchor( 1313 self._command_ldap_url(self.av_u), 1314 ) 1315 else: 1316 linksstr = '' 1317 except ValueError: 1318 return '<strong>Not a valid LDAP URL:</strong> %s' % ( 1319 self._app.form.s2d(repr(self._av)) 1320 ) 1321 return '<table><tr><td>%s</td><td><a href="%s">%s</a></td></tr></table>' % ( 1322 linksstr, 1323 self._app.form.s2d(self.av_u), 1324 self._app.form.s2d(self.av_u) 1325 ) 1326 1327 1328class OctetString(Binary): 1329 """ 1330 Plugin class for LDAP syntax 'Octet String' 1331 (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.25) 1332 """ 1333 oid: str = '1.3.6.1.4.1.1466.115.121.1.40' 1334 desc: str = 'Octet String' 1335 editable: bool = True 1336 min_input_rows = 1 # minimum number of rows for input field 1337 max_input_rows = 15 # maximum number of rows for in input field 1338 bytes_split = 16 1339 1340 def sanitize(self, attr_value: bytes) -> bytes: 1341 attr_value = attr_value.translate(None, b': ,\r\n') 1342 try: 1343 res = binascii.unhexlify(attr_value) 1344 except binascii.Error: 1345 res = attr_value 1346 return res 1347 1348 def display(self, vidx, links) -> str: 1349 lines = [ 1350 ( 1351 '<tr>' 1352 '<td><code>%0.6X</code></td>' 1353 '<td><code>%s</code></td>' 1354 '<td><code>%s</code></td>' 1355 '</tr>' 1356 ) % ( 1357 i*self.bytes_split, 1358 ':'.join(c[j:j+1].hex().upper() for j in range(len(c))), 1359 self._app.form.s2d(ascii_dump(c), 'ascii'), 1360 ) 1361 for i, c in enumerate(chunks(self._av, self.bytes_split)) 1362 ] 1363 return '\n<table class="HexDump">\n%s\n</table>\n' % ('\n'.join(lines)) 1364 1365 def form_value(self) -> str: 1366 hex_av = (self._av or b'').hex().upper() 1367 hex_range = range(0, len(hex_av), 2) 1368 return str('\r\n'.join( 1369 chunks( 1370 ':'.join([hex_av[i:i+2] for i in hex_range]), 1371 self.bytes_split*3 1372 ) 1373 )) 1374 1375 def input_field(self) -> web_forms.Field: 1376 fval = self.form_value() 1377 return web_forms.Textarea( 1378 self._at, 1379 ': '.join([self._at, self.desc]), 1380 10000, 1, 1381 None, 1382 default=fval, 1383 rows=max(self.min_input_rows, min(self.max_input_rows, fval.count('\r\n'))), 1384 cols=49 1385 ) 1386 1387 1388class MultilineText(DirectoryString): 1389 """ 1390 Plugin base class for multi-line text. 1391 """ 1392 oid: str = 'MultilineText-oid' 1393 desc: str = 'Multiple lines of text' 1394 pattern = re.compile('^.*$', re.S+re.M) 1395 lineSep = b'\r\n' 1396 mime_type: str = 'text/plain' 1397 cols = 66 1398 min_input_rows = 1 # minimum number of rows for input field 1399 max_input_rows = 30 # maximum number of rows for in input field 1400 1401 def _split_lines(self, value): 1402 if self.lineSep: 1403 return value.split(self.lineSep) 1404 return [value] 1405 1406 def sanitize(self, attr_value: bytes) -> bytes: 1407 return attr_value.replace( 1408 b'\r', b'' 1409 ).replace( 1410 b'\n', self.lineSep 1411 ) 1412 1413 def display(self, vidx, links) -> str: 1414 return '<br>'.join([ 1415 self._app.form.s2d(self._app.ls.uc_decode(line_b)[0]) 1416 for line_b in self._split_lines(self._av) 1417 ]) 1418 1419 def form_value(self) -> str: 1420 splitted_lines = [ 1421 self._app.ls.uc_decode(line_b)[0] 1422 for line_b in self._split_lines(self._av or b'') 1423 ] 1424 return '\r\n'.join(splitted_lines) 1425 1426 def input_field(self) -> web_forms.Field: 1427 fval = self.form_value() 1428 return web_forms.Textarea( 1429 self._at, 1430 ': '.join([self._at, self.desc]), 1431 self.max_len, self.max_values, 1432 None, 1433 default=fval, 1434 rows=max(self.min_input_rows, min(self.max_input_rows, fval.count('\r\n'))), 1435 cols=self.cols 1436 ) 1437 1438 1439class PreformattedMultilineText(MultilineText): 1440 """ 1441 Plugin base class for multi-line text displayed with mono-spaced font, 1442 e.g. program code, XML, JSON etc. 1443 """ 1444 oid: str = 'PreformattedMultilineText-oid' 1445 cols = 66 1446 tab_identiation = ' ' 1447 1448 def display(self, vidx, links) -> str: 1449 lines = [ 1450 self._app.form.s2d( 1451 self._app.ls.uc_decode(line_b)[0], 1452 self.tab_identiation, 1453 ) 1454 for line_b in self._split_lines(self._av) 1455 ] 1456 return '<code>%s</code>' % '<br>'.join(lines) 1457 1458 1459class PostalAddress(MultilineText): 1460 """ 1461 Plugin class for LDAP syntax 'Postal Address' 1462 (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.28) 1463 """ 1464 oid: str = '1.3.6.1.4.1.1466.115.121.1.41' 1465 desc: str = 'Postal Address' 1466 lineSep = b' $ ' 1467 cols = 40 1468 1469 def _split_lines(self, value): 1470 return [ 1471 v.strip() 1472 for v in value.split(self.lineSep.strip()) 1473 ] 1474 1475 def sanitize(self, attr_value: bytes) -> bytes: 1476 return attr_value.replace(b'\r', b'').replace(b'\n', self.lineSep) 1477 1478 1479class PrintableString(DirectoryString): 1480 """ 1481 Plugin class for LDAP syntax 'Printable String' 1482 (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.29) 1483 """ 1484 oid: str = '1.3.6.1.4.1.1466.115.121.1.44' 1485 desc: str = 'Printable String' 1486 pattern = re.compile("^[a-zA-Z0-9'()+,.=/:? -]*$") 1487 charset = 'ascii' 1488 1489 1490class NumericString(PrintableString): 1491 """ 1492 Plugin class for LDAP syntax 'Numeric String' 1493 (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.23) 1494 """ 1495 oid: str = '1.3.6.1.4.1.1466.115.121.1.36' 1496 desc: str = 'Numeric String' 1497 pattern = re.compile('^[ 0-9]+$') 1498 1499 1500class EnhancedGuide(PrintableString): 1501 """ 1502 Plugin class for LDAP syntax 'Enhanced Guide' 1503 (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.10) 1504 """ 1505 oid: str = '1.3.6.1.4.1.1466.115.121.1.21' 1506 desc: str = 'Enhanced Search Guide' 1507 1508 1509class Guide(EnhancedGuide): 1510 """ 1511 Plugin class for LDAP syntax 'Search Guide' 1512 (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.14) 1513 """ 1514 oid: str = '1.3.6.1.4.1.1466.115.121.1.25' 1515 desc: str = 'Search Guide' 1516 1517 1518class TelephoneNumber(PrintableString): 1519 """ 1520 Plugin class for LDAP syntax '' 1521 (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.31) 1522 """ 1523 oid: str = '1.3.6.1.4.1.1466.115.121.1.50' 1524 desc: str = 'Telephone Number' 1525 pattern = re.compile('^[0-9+x(). /-]+$') 1526 1527 1528class FacsimileTelephoneNumber(TelephoneNumber): 1529 """ 1530 Plugin class for LDAP syntax 'Facsimile Telephone Number' 1531 (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.11) 1532 """ 1533 oid: str = '1.3.6.1.4.1.1466.115.121.1.22' 1534 desc: str = 'Facsimile Number' 1535 pattern = re.compile( 1536 r'^[0-9+x(). /-]+' 1537 r'(\$' 1538 r'(twoDimensional|fineResolution|unlimitedLength|b4Length|a3Width|b4Width|uncompressed)' 1539 r')*$' 1540 ) 1541 1542 1543class TelexNumber(PrintableString): 1544 """ 1545 Plugin class for LDAP syntax 'Telex Number' 1546 (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.33) 1547 """ 1548 oid: str = '1.3.6.1.4.1.1466.115.121.1.52' 1549 desc: str = 'Telex Number' 1550 pattern = re.compile("^[a-zA-Z0-9'()+,.=/:?$ -]*$") 1551 1552 1553class TeletexTerminalIdentifier(PrintableString): 1554 """ 1555 Plugin class for LDAP syntax 'Teletex Terminal Identifier' 1556 (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.32) 1557 """ 1558 oid: str = '1.3.6.1.4.1.1466.115.121.1.51' 1559 desc: str = 'Teletex Terminal Identifier' 1560 1561 1562class ObjectGUID(LDAPSyntax): 1563 oid: str = 'ObjectGUID-oid' 1564 desc: str = 'Object GUID' 1565 charset = 'ascii' 1566 1567 def display(self, vidx, links) -> str: 1568 objectguid_str = ''.join([ 1569 '%02X' % ord(c) 1570 for c in self._av 1571 ]) 1572 return ldap0.ldapurl.LDAPUrl( 1573 ldapUrl=self._app.ls.uri, 1574 dn='GUID=%s' % (objectguid_str), 1575 who=None, cred=None 1576 ).htmlHREF( 1577 hrefText=objectguid_str, 1578 hrefTarget=None 1579 ) 1580 1581 1582class Date(IA5String): 1583 """ 1584 Plugin base class for a date without(!) time component. 1585 """ 1586 oid: str = 'Date-oid' 1587 desc: str = 'Date in syntax specified by class attribute storage_format' 1588 max_len: int = 10 1589 storage_format = '%Y-%m-%d' 1590 acceptable_formats = ( 1591 '%Y-%m-%d', 1592 '%d.%m.%Y', 1593 '%m/%d/%Y', 1594 ) 1595 1596 def _validate(self, attr_value: bytes) -> bool: 1597 try: 1598 datetime.datetime.strptime( 1599 self._app.ls.uc_decode(attr_value)[0], 1600 self.storage_format 1601 ) 1602 except (UnicodeDecodeError, ValueError): 1603 return False 1604 return True 1605 1606 def sanitize(self, attr_value: bytes) -> bytes: 1607 av_u = attr_value.strip().decode(self._app.ls.charset) 1608 result = attr_value 1609 for time_format in self.acceptable_formats: 1610 try: 1611 time_tuple = datetime.datetime.strptime(av_u, time_format) 1612 except ValueError: 1613 pass 1614 else: 1615 result = datetime.datetime.strftime(time_tuple, self.storage_format).encode('ascii') 1616 break 1617 return result # sanitize() 1618 1619 1620class NumstringDate(Date): 1621 """ 1622 Plugin class for a date using syntax YYYYMMDD typically 1623 using LDAP syntax Numstring. 1624 """ 1625 oid: str = 'NumstringDate-oid' 1626 desc: str = 'Date in syntax YYYYMMDD' 1627 pattern = re.compile('^[0-9]{4}[0-1][0-9][0-3][0-9]$') 1628 storage_format = '%Y%m%d' 1629 1630 1631class ISO8601Date(Date): 1632 """ 1633 Plugin class for a date using syntax YYYY-MM-DD (see ISO 8601). 1634 """ 1635 oid: str = 'ISO8601Date-oid' 1636 desc: str = 'Date in syntax YYYY-MM-DD (see ISO 8601)' 1637 pattern = re.compile('^[0-9]{4}-[0-1][0-9]-[0-3][0-9]$') 1638 storage_format = '%Y-%m-%d' 1639 1640 1641class DateOfBirth(ISO8601Date): 1642 """ 1643 Plugin class for date of birth syntax YYYY-MM-DD (see ISO 8601). 1644 1645 Displays the age based at current time. 1646 """ 1647 oid: str = 'DateOfBirth-oid' 1648 desc: str = 'Date of birth: syntax YYYY-MM-DD (see ISO 8601)' 1649 1650 @staticmethod 1651 def _age(birth_dt): 1652 birth_date = datetime.date( 1653 year=birth_dt.year, 1654 month=birth_dt.month, 1655 day=birth_dt.day, 1656 ) 1657 current_date = datetime.date.today() 1658 age = current_date.year - birth_date.year 1659 if birth_date.month > current_date.month or \ 1660 (birth_date.month == current_date.month and birth_date.day > current_date.day): 1661 age = age - 1 1662 return age 1663 1664 def _validate(self, attr_value: bytes) -> bool: 1665 try: 1666 birth_dt = datetime.datetime.strptime( 1667 self._app.ls.uc_decode(attr_value)[0], 1668 self.storage_format 1669 ) 1670 except ValueError: 1671 return False 1672 return self._age(birth_dt) >= 0 1673 1674 def display(self, vidx, links) -> str: 1675 raw_date = ISO8601Date.display(self, vidx, links) 1676 try: 1677 birth_dt = datetime.datetime.strptime(self.av_u, self.storage_format) 1678 except ValueError: 1679 return raw_date 1680 return '%s (%s years old)' % (raw_date, self._age(birth_dt)) 1681 1682 1683class SecondsSinceEpoch(Integer): 1684 """ 1685 Plugin class for seconds since epoch (1970-01-01 00:00:00). 1686 """ 1687 oid: str = 'SecondsSinceEpoch-oid' 1688 desc: str = 'Seconds since epoch (1970-01-01 00:00:00)' 1689 min_value = 0 1690 1691 def display(self, vidx, links) -> str: 1692 int_str = Integer.display(self, vidx, links) 1693 try: 1694 return '%s (%s)' % ( 1695 strftimeiso8601(time.gmtime(float(self._av))), 1696 int_str, 1697 ) 1698 except ValueError: 1699 return int_str 1700 1701 1702class DaysSinceEpoch(Integer): 1703 """ 1704 Plugin class for days since epoch (1970-01-01). 1705 """ 1706 oid: str = 'DaysSinceEpoch-oid' 1707 desc: str = 'Days since epoch (1970-01-01)' 1708 min_value = 0 1709 1710 def display(self, vidx, links) -> str: 1711 int_str = Integer.display(self, vidx, links) 1712 try: 1713 return '%s (%s)' % ( 1714 strftimeiso8601(time.gmtime(float(self._av)*86400)), 1715 int_str, 1716 ) 1717 except ValueError: 1718 return int_str 1719 1720 1721class Timespan(Integer): 1722 oid: str = 'Timespan-oid' 1723 desc: str = 'Time span in seconds' 1724 input_size: int = LDAPSyntax.input_size 1725 min_value = 0 1726 time_divisors = ( 1727 ('weeks', 604800), 1728 ('days', 86400), 1729 ('hours', 3600), 1730 ('mins', 60), 1731 ('secs', 1), 1732 ) 1733 sep = ',' 1734 1735 def sanitize(self, attr_value: bytes) -> bytes: 1736 if not attr_value: 1737 return attr_value 1738 try: 1739 result = repr2ts( 1740 self.time_divisors, 1741 self.sep, 1742 attr_value.decode('ascii') 1743 ).encode('ascii') 1744 except ValueError: 1745 result = Integer.sanitize(self, attr_value) 1746 return result 1747 1748 def form_value(self) -> str: 1749 if not self._av: 1750 return '' 1751 try: 1752 result = ts2repr(self.time_divisors, self.sep, self._av) 1753 except ValueError: 1754 result = Integer.form_value(self) 1755 return result 1756 1757 def input_field(self) -> web_forms.Field: 1758 return IA5String.input_field(self) 1759 1760 def display(self, vidx, links) -> str: 1761 try: 1762 result = self._app.form.s2d('%s (%s)' % ( 1763 ts2repr(self.time_divisors, self.sep, self.av_u), 1764 Integer.display(self, vidx, links) 1765 )) 1766 except ValueError: 1767 result = Integer.display(self, vidx, links) 1768 return result 1769 1770 1771class SelectList(DirectoryString): 1772 """ 1773 Base class for dictionary based select lists which 1774 should not be used directly 1775 """ 1776 oid: str = 'SelectList-oid' 1777 attr_value_dict: Dict[str, str] = {} # Mapping attribute value to attribute description 1778 input_fallback: bool = True # Fallback to normal input field if attr_value_dict is empty 1779 desc_sep: str = ' ' 1780 tag_tmpl: Dict[bool, str] = { 1781 False: '{attr_text}: {attr_value}', 1782 True: '<span title="{attr_title}">{attr_text}:{sep}{attr_value}</span>', 1783 } 1784 1785 def get_attr_value_dict(self) -> Dict[str, str]: 1786 # Enable empty value in any case 1787 attr_value_dict: Dict[str, str] = {'': '-/-'} 1788 attr_value_dict.update(self.attr_value_dict) 1789 return attr_value_dict 1790 1791 def _sorted_select_options(self): 1792 # First generate a set of all other currently available attribute values 1793 fval = DirectoryString.form_value(self) 1794 # Initialize a dictionary with all options 1795 vdict = self.get_attr_value_dict() 1796 # Remove other existing values from the options dict 1797 for val in self._entry.get(self._at, []): 1798 val = self._app.ls.uc_decode(val)[0] 1799 if val != fval: 1800 try: 1801 del vdict[val] 1802 except KeyError: 1803 pass 1804 # Add the current attribute value if needed 1805 if fval not in vdict: 1806 vdict[fval] = fval 1807 # Finally return the sorted option list 1808 result = [] 1809 for key, val in vdict.items(): 1810 if isinstance(val, str): 1811 result.append((key, val, None)) 1812 elif isinstance(val, tuple): 1813 result.append((key, val[0], val[1])) 1814 return sorted( 1815 result, 1816 key=lambda x: x[1].lower(), 1817 ) 1818 1819 def _validate(self, attr_value: bytes) -> bool: 1820 attr_value_dict: Dict[str, str] = self.get_attr_value_dict() 1821 return self._app.ls.uc_decode(attr_value)[0] in attr_value_dict 1822 1823 def display(self, vidx, links) -> str: 1824 attr_value_str = DirectoryString.display(self, vidx, links) 1825 attr_value_dict: Dict[str, str] = self.get_attr_value_dict() 1826 try: 1827 attr_value_desc = attr_value_dict[self.av_u] 1828 except KeyError: 1829 return attr_value_str 1830 try: 1831 attr_text, attr_title = attr_value_desc 1832 except ValueError: 1833 attr_text, attr_title = attr_value_desc, None 1834 if attr_text == attr_value_str: 1835 return attr_value_str 1836 return self.tag_tmpl[bool(attr_title)].format( 1837 attr_value=attr_value_str, 1838 sep=self.desc_sep, 1839 attr_text=self._app.form.s2d(attr_text), 1840 attr_title=self._app.form.s2d(attr_title or '') 1841 ) 1842 1843 def input_field(self) -> web_forms.Field: 1844 attr_value_dict: Dict[str, str] = self.get_attr_value_dict() 1845 if self.input_fallback and \ 1846 (not attr_value_dict or not list(filter(None, attr_value_dict.keys()))): 1847 return DirectoryString.input_field(self) 1848 field = web_forms.Select( 1849 self._at, 1850 ': '.join([self._at, self.desc]), 1851 1, 1852 options=self._sorted_select_options(), 1853 default=self.form_value(), 1854 required=0 1855 ) 1856 field.charset = self._app.form.accept_charset 1857 return field 1858 1859 1860class PropertiesSelectList(SelectList): 1861 """ 1862 Plugin base class for attribute value select lists of LDAP syntax DirectoryString 1863 constructed and validated by reading a properties file. 1864 """ 1865 oid: str = 'PropertiesSelectList-oid' 1866 properties_pathname: Optional[str] = None 1867 properties_charset: str = 'utf-8' 1868 properties_delimiter: str = '=' 1869 1870 def get_attr_value_dict(self) -> Dict[str, str]: 1871 attr_value_dict: Dict[str, str] = SelectList.get_attr_value_dict(self) 1872 real_path_name = get_variant_filename( 1873 self.properties_pathname, 1874 self._app.form.accept_language 1875 ) 1876 with open(real_path_name, 'rb') as prop_file: 1877 for line in prop_file.readlines(): 1878 line = line.decode(self.properties_charset).strip() 1879 if line and not line.startswith('#'): 1880 key, value = line.split(self.properties_delimiter, 1) 1881 attr_value_dict[key.strip()] = value.strip() 1882 return attr_value_dict 1883 # end of get_attr_value_dict() 1884 1885 1886class DynamicValueSelectList(SelectList, DirectoryString): 1887 """ 1888 Plugin base class for attribute value select lists of LDAP syntax DirectoryString 1889 constructed and validated by internal LDAP search. 1890 """ 1891 oid: str = 'DynamicValueSelectList-oid' 1892 ldap_url: Optional[str] = None 1893 value_prefix: str = '' 1894 value_suffix: str = '' 1895 ignored_errors = ( 1896 ldap0.NO_SUCH_OBJECT, 1897 ldap0.SIZELIMIT_EXCEEDED, 1898 ldap0.TIMELIMIT_EXCEEDED, 1899 ldap0.PARTIAL_RESULTS, 1900 ldap0.INSUFFICIENT_ACCESS, 1901 ldap0.CONSTRAINT_VIOLATION, 1902 ldap0.REFERRAL, 1903 ) 1904 1905 def __init__(self, app, dn: str, schema, attrType: str, attr_value: bytes, entry=None): 1906 self.lu_obj = ldap0.ldapurl.LDAPUrl(self.ldap_url) 1907 self.min_len = len(self.value_prefix)+len(self.value_suffix) 1908 SelectList.__init__(self, app, dn, schema, attrType, attr_value, entry) 1909 1910 def _filterstr(self): 1911 return self.lu_obj.filterstr or '(objectClass=*)' 1912 1913 def _search_ref(self, attr_value: str): 1914 attr_value = attr_value[len(self.value_prefix):-len(self.value_suffix) or None] 1915 search_filter = '(&%s(%s=%s))' % ( 1916 self._filterstr(), 1917 self.lu_obj.attrs[0], 1918 attr_value, 1919 ) 1920 try: 1921 ldap_result = self._app.ls.l.search_s( 1922 self._search_root(), 1923 self.lu_obj.scope, 1924 search_filter, 1925 attrlist=self.lu_obj.attrs, 1926 sizelimit=2, 1927 ) 1928 except ( 1929 ldap0.NO_SUCH_OBJECT, 1930 ldap0.CONSTRAINT_VIOLATION, 1931 ldap0.INSUFFICIENT_ACCESS, 1932 ldap0.REFERRAL, 1933 ldap0.SIZELIMIT_EXCEEDED, 1934 ldap0.TIMELIMIT_EXCEEDED, 1935 ): 1936 return None 1937 # Filter out LDAP referrals 1938 ldap_result = [ 1939 (sre.dn_s, sre.entry_s) 1940 for sre in ldap_result 1941 if isinstance(sre, SearchResultEntry) 1942 ] 1943 if ldap_result and len(ldap_result) == 1: 1944 return ldap_result[0] 1945 return None 1946 1947 def _validate(self, attr_value: bytes) -> bool: 1948 av_u = self._app.ls.uc_decode(attr_value)[0] 1949 if ( 1950 not av_u.startswith(self.value_prefix) or 1951 not av_u.endswith(self.value_suffix) or 1952 len(av_u) < self.min_len or 1953 (self.max_len is not None and len(av_u) > self.max_len) 1954 ): 1955 return False 1956 return self._search_ref(av_u) is not None 1957 1958 def display(self, vidx, links) -> str: 1959 if links and self.lu_obj.attrs: 1960 ref_result = self._search_ref(self.av_u) 1961 if ref_result: 1962 ref_dn, ref_entry = ref_result 1963 try: 1964 attr_value_desc = ref_entry[self.lu_obj.attrs[1]][0] 1965 except (KeyError, IndexError): 1966 display_text, link_html = '', '' 1967 else: 1968 if self.lu_obj.attrs[0].lower() == self.lu_obj.attrs[1].lower(): 1969 display_text = '' 1970 else: 1971 display_text = self._app.form.s2d(attr_value_desc+':') 1972 if links: 1973 link_html = self._app.anchor( 1974 'read', '»', 1975 [('dn', ref_dn)], 1976 ) 1977 else: 1978 link_html = '' 1979 else: 1980 display_text, link_html = '', '' 1981 else: 1982 display_text, link_html = '', '' 1983 return ' '.join(( 1984 display_text, 1985 DirectoryString.display(self, vidx, links), 1986 link_html, 1987 )) 1988 1989 def _search_root(self) -> str: 1990 ldap_url_dn = self.lu_obj.dn 1991 if ldap_url_dn == '_': 1992 result_dn = str(self._app.naming_context) 1993 elif ldap_url_dn == '.': 1994 result_dn = self._dn 1995 elif ldap_url_dn == '..': 1996 result_dn = str(self.dn.parent()) 1997 elif ldap_url_dn.endswith(',_'): 1998 result_dn = ','.join((ldap_url_dn[:-2], str(self._app.naming_context))) 1999 elif ldap_url_dn.endswith(',.'): 2000 result_dn = ','.join((ldap_url_dn[:-2], self._dn)) 2001 elif ldap_url_dn.endswith(',..'): 2002 result_dn = ','.join((ldap_url_dn[:-3], str(self.dn.parent()))) 2003 else: 2004 result_dn = ldap_url_dn 2005 if result_dn.endswith(','): 2006 result_dn = result_dn[:-1] 2007 return result_dn 2008 # end of _search_root() 2009 2010 def get_attr_value_dict(self) -> Dict[str, str]: 2011 attr_value_dict: Dict[str, str] = SelectList.get_attr_value_dict(self) 2012 if self.lu_obj.hostport: 2013 raise ValueError( 2014 'Connecting to other server not supported! hostport attribute was %r' % ( 2015 self.lu_obj.hostport 2016 ) 2017 ) 2018 search_scope = self.lu_obj.scope or ldap0.SCOPE_BASE 2019 search_attrs = (self.lu_obj.attrs or []) + ['description', 'info'] 2020 # Use the existing LDAP connection as current user 2021 try: 2022 ldap_result = self._app.ls.l.search_s( 2023 self._search_root(), 2024 search_scope, 2025 filterstr=self._filterstr(), 2026 attrlist=search_attrs, 2027 ) 2028 except self.ignored_errors: 2029 return {} 2030 if search_scope == ldap0.SCOPE_BASE: 2031 # When reading a single entry we build the map from a single multi-valued attribute 2032 assert len(self.lu_obj.attrs or []) == 1, ValueError( 2033 'attrlist in ldap_url must be of length 1 if scope is base, got %r' % ( 2034 self.lu_obj.attrs, 2035 ) 2036 ) 2037 list_attr = self.lu_obj.attrs[0] 2038 attr_values_u = [ 2039 ''.join(( 2040 self.value_prefix, 2041 attr_value, 2042 self.value_suffix, 2043 )) 2044 for attr_value in ldap_result[0].entry_s[list_attr] 2045 ] 2046 attr_value_dict: Dict[str, str] = { 2047 u: u 2048 for u in attr_values_u 2049 } 2050 else: 2051 if not self.lu_obj.attrs: 2052 option_value_map, option_text_map = (None, None) 2053 elif len(self.lu_obj.attrs) == 1: 2054 option_value_map, option_text_map = (None, self.lu_obj.attrs[0]) 2055 elif len(self.lu_obj.attrs) >= 2: 2056 option_value_map, option_text_map = self.lu_obj.attrs[:2] 2057 for sre in ldap_result: 2058 # Check whether it's a real search result (skip search continuations) 2059 if not isinstance(sre, SearchResultEntry): 2060 continue 2061 sre.entry_s[None] = [sre.dn_s] 2062 try: 2063 option_value = ''.join(( 2064 self.value_prefix, 2065 sre.entry_s[option_value_map][0], 2066 self.value_suffix, 2067 )) 2068 except KeyError: 2069 pass 2070 else: 2071 try: 2072 option_text = sre.entry_s[option_text_map][0] 2073 except KeyError: 2074 option_text = option_value 2075 option_title = sre.entry_s.get('description', sre.entry_s.get('info', ['']))[0] 2076 if option_title: 2077 attr_value_dict[option_value] = (option_text, option_title) 2078 else: 2079 attr_value_dict[option_value] = option_text 2080 return attr_value_dict 2081 # end of get_attr_value_dict() 2082 2083 2084class DynamicDNSelectList(DynamicValueSelectList, DistinguishedName): 2085 """ 2086 Plugin base class for attribute value select lists of LDAP syntax DN 2087 constructed and validated by internal LDAP search. 2088 """ 2089 oid: str = 'DynamicDNSelectList-oid' 2090 2091 def _get_ref_entry(self, dn: str, attrlist=None) -> dict: 2092 try: 2093 sre = self._app.ls.l.read_s( 2094 dn, 2095 attrlist=attrlist or self.lu_obj.attrs, 2096 filterstr=self._filterstr(), 2097 ) 2098 except ( 2099 ldap0.NO_SUCH_OBJECT, 2100 ldap0.CONSTRAINT_VIOLATION, 2101 ldap0.INSUFFICIENT_ACCESS, 2102 ldap0.INVALID_DN_SYNTAX, 2103 ldap0.REFERRAL, 2104 ): 2105 return None 2106 if sre is None: 2107 return None 2108 return sre.entry_s 2109 2110 def _validate(self, attr_value: bytes) -> bool: 2111 return SelectList._validate(self, attr_value) 2112 2113 def display(self, vidx, links) -> str: 2114 if links and self.lu_obj.attrs: 2115 ref_entry = self._get_ref_entry(self.av_u) or {} 2116 try: 2117 attr_value_desc = ref_entry[self.lu_obj.attrs[0]][0] 2118 except (KeyError, IndexError): 2119 display_text = '' 2120 else: 2121 display_text = self._app.form.s2d(attr_value_desc+': ') 2122 else: 2123 display_text = '' 2124 return self.desc_sep.join(( 2125 display_text, 2126 DistinguishedName.display(self, vidx, links) 2127 )) 2128 2129 2130class DerefDynamicDNSelectList(DynamicDNSelectList): 2131 """ 2132 Plugin base class for attribute value select lists of LDAP syntax DN 2133 constructed and validated by internal LDAP search. 2134 2135 Same as DynamicDNSelectList except that Dereference extended control is used. 2136 """ 2137 oid: str = 'DerefDynamicDNSelectList-oid' 2138 2139 def _get_ref_entry(self, dn: str, attrlist=None) -> dict: 2140 deref_crtl = DereferenceControl( 2141 True, 2142 {self._at: self.lu_obj.attrs or ['entryDN']} 2143 ) 2144 try: 2145 ldap_result = self._app.ls.l.search_s( 2146 self._dn, 2147 ldap0.SCOPE_BASE, 2148 filterstr='(objectClass=*)', 2149 attrlist=['1.1'], 2150 req_ctrls=[deref_crtl], 2151 )[0] 2152 except ( 2153 ldap0.NO_SUCH_OBJECT, 2154 ldap0.CONSTRAINT_VIOLATION, 2155 ldap0.INSUFFICIENT_ACCESS, 2156 ldap0.INVALID_DN_SYNTAX, 2157 ldap0.REFERRAL, 2158 ): 2159 return None 2160 if ldap_result is None or not ldap_result.ctrls: 2161 return None 2162 for ref in ldap_result.ctrls[0].derefRes[self._at]: 2163 if ref.dn_s == dn: 2164 return ref.entry_s 2165 return None 2166 2167 2168class Boolean(SelectList, IA5String): 2169 """ 2170 Plugin class for LDAP syntax 'Boolean' 2171 (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.3) 2172 """ 2173 oid: str = '1.3.6.1.4.1.1466.115.121.1.7' 2174 desc: str = 'Boolean' 2175 attr_value_dict: Dict[str, str] = { 2176 'TRUE': 'TRUE', 2177 'FALSE': 'FALSE', 2178 } 2179 2180 def get_attr_value_dict(self) -> Dict[str, str]: 2181 attr_value_dict: Dict[str, str] = SelectList.get_attr_value_dict(self) 2182 if self._av and self._av.lower() == self._av: 2183 for key, val in attr_value_dict.items(): 2184 del attr_value_dict[key] 2185 attr_value_dict[key.lower()] = val.lower() 2186 return attr_value_dict 2187 2188 def _validate(self, attr_value: bytes) -> bool: 2189 if not self._av and attr_value.lower() == attr_value: 2190 return SelectList._validate(self, attr_value.upper()) 2191 return SelectList._validate(self, attr_value) 2192 2193 def display(self, vidx, links) -> str: 2194 return IA5String.display(self, vidx, links) 2195 2196 2197class CountryString(PropertiesSelectList): 2198 """ 2199 Plugin class for LDAP syntax 'Country String' 2200 (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.4) 2201 """ 2202 oid: str = '1.3.6.1.4.1.1466.115.121.1.11' 2203 desc: str = 'Two letter country string as listed in ISO 3166-2' 2204 properties_pathname = os.path.join( 2205 ETC_DIR, 'properties', 'attribute_select_c.properties' 2206 ) 2207 sani_funcs = ( 2208 bytes.strip, 2209 ) 2210 2211 2212class DeliveryMethod(PrintableString): 2213 """ 2214 Plugin class for LDAP syntax 'Delivery Method' 2215 (see https://datatracker.ietf.org/doc/html/rfc4517#section-3.3.5) 2216 """ 2217 oid: str = '1.3.6.1.4.1.1466.115.121.1.14' 2218 desc: str = 'Delivery Method' 2219 pdm = '(any|mhs|physical|telex|teletex|g3fax|g4fax|ia5|videotex|telephone)' 2220 pattern = re.compile('^%s[ $]*%s$' % (pdm, pdm)) 2221 2222 2223class BitArrayInteger(MultilineText, Integer): 2224 """ 2225 Plugin class for attributes with Integer syntax where the integer 2226 value is interpreted as binary flags 2227 """ 2228 oid: str = 'BitArrayInteger-oid' 2229 flag_desc_table: Sequence[Tuple[str, int]] = tuple() 2230 true_false_desc: Dict[bool, str] = { 2231 False: '-', 2232 True: '+', 2233 } 2234 2235 def __init__(self, app, dn: str, schema, attrType: str, attr_value: bytes, entry=None): 2236 Integer.__init__(self, app, dn, schema, attrType, attr_value, entry) 2237 self.flag_desc2int = dict(self.flag_desc_table) 2238 self.flag_int2desc = { 2239 j: i 2240 for i, j in self.flag_desc_table 2241 } 2242 self.max_value = sum([j for i, j in self.flag_desc_table]) 2243 self.min_input_rows = self.max_input_rows = max(len(self.flag_desc_table), 1) 2244 2245 def sanitize(self, attr_value: bytes) -> bytes: 2246 try: 2247 av_u = attr_value.decode('ascii') 2248 except UnicodeDecodeError: 2249 return attr_value 2250 try: 2251 result = int(av_u) 2252 except ValueError: 2253 result = 0 2254 for row in av_u.split('\n'): 2255 row = row.strip() 2256 try: 2257 flag_set, flag_desc = row[0:1], row[1:] 2258 except IndexError: 2259 pass 2260 else: 2261 if flag_set == '+': 2262 try: 2263 result = result | self.flag_desc2int[flag_desc] 2264 except KeyError: 2265 pass 2266 return str(result).encode('ascii') 2267 2268 def form_value(self) -> str: 2269 attr_value_int = int(self.av_u or 0) 2270 flag_lines = [ 2271 ''.join(( 2272 self.true_false_desc[int((attr_value_int & flag_int) > 0)], 2273 flag_desc 2274 )) 2275 for flag_desc, flag_int in self.flag_desc_table 2276 ] 2277 return '\r\n'.join(flag_lines) 2278 2279 def input_field(self) -> web_forms.Field: 2280 fval = self.form_value() 2281 return web_forms.Textarea( 2282 self._at, 2283 ': '.join([self._at, self.desc]), 2284 self.max_len, self.max_values, 2285 None, 2286 default=fval, 2287 rows=max(self.min_input_rows, min(self.max_input_rows, fval.count('\n'))), 2288 cols=max([len(desc) for desc, _ in self.flag_desc_table])+1 2289 ) 2290 2291 def display(self, vidx, links) -> str: 2292 av_i = int(self._av) 2293 return ( 2294 '%s<br>' 2295 '<table summary="Flags">' 2296 '<tr><th>Property flag</th><th>Value</th><th>Status</th></tr>' 2297 '%s' 2298 '</table>' 2299 ) % ( 2300 Integer.display(self, vidx, links), 2301 '\n'.join([ 2302 '<tr><td>%s</td><td>%s</td><td>%s</td></tr>' % ( 2303 self._app.form.s2d(desc), 2304 hex(flag_value), 2305 {False: '-', True: 'on'}[(av_i & flag_value) > 0] 2306 ) 2307 for desc, flag_value in self.flag_desc_table 2308 ]) 2309 ) 2310 2311 2312class GSER(DirectoryString): 2313 """ 2314 Generic String Encoding Rules (GSER) for ASN.1 Types (see RFC 3641) 2315 """ 2316 oid: str = 'GSER-oid' 2317 desc: str = 'GSER syntax (see RFC 3641)' 2318 2319 2320class UUID(IA5String): 2321 """ 2322 Plugin class for Universally Unique IDentifier (UUID), see RFC 4122 2323 """ 2324 oid: str = '1.3.6.1.1.16.1' 2325 desc: str = 'UUID' 2326 pattern = re.compile( 2327 '^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$' 2328 ) 2329 2330 def sanitize(self, attr_value: bytes) -> bytes: 2331 try: 2332 return str(uuid.UUID(attr_value.decode('ascii').replace(':', ''))).encode('ascii') 2333 except ValueError: 2334 return attr_value 2335 2336 2337class DNSDomain(IA5String): 2338 """ 2339 Plugin class for fully-qualified DNS domain names 2340 """ 2341 oid: str = 'DNSDomain-oid' 2342 desc: str = 'DNS domain name (see RFC 1035)' 2343 pattern = re.compile(r'^(\*|[a-zA-Z0-9_-]+)(\.[a-zA-Z0-9_-]+)*$') 2344 # see https://datatracker.ietf.org/doc/html/rfc2181#section-11 2345 max_len: int = min(255, IA5String.max_len) 2346 sani_funcs = ( 2347 bytes.lower, 2348 bytes.strip, 2349 ) 2350 2351 def sanitize(self, attr_value: bytes) -> bytes: 2352 attr_value = IA5String.sanitize(self, attr_value) 2353 return b'.'.join([ 2354 dc.encode('idna') 2355 for dc in attr_value.decode(self._app.form.accept_charset).split('.') 2356 ]) 2357 2358 def form_value(self) -> str: 2359 try: 2360 result = '.'.join([ 2361 dc.decode('idna') 2362 for dc in (self._av or b'').split(b'.') 2363 ]) 2364 except UnicodeDecodeError: 2365 result = '!!!snipped because of UnicodeDecodeError!!!' 2366 return result 2367 2368 def display(self, vidx, links) -> str: 2369 if self.av_u != self._av.decode('idna'): 2370 return '%s (%s)' % ( 2371 IA5String.display(self, vidx, links), 2372 self._app.form.s2d(self.form_value()) 2373 ) 2374 return IA5String.display(self, vidx, links) 2375 2376 2377class RFC822Address(DNSDomain, IA5String): 2378 """ 2379 Plugin class for RFC 822 addresses 2380 """ 2381 oid: str = 'RFC822Address-oid' 2382 desc: str = 'RFC 822 mail address' 2383 pattern = re.compile(r'^[\w@.+=/_ ()-]+@[a-zA-Z0-9-]+(\.[a-zA-Z0-9-]+)*$') 2384 html_tmpl = '<a href="mailto:{av}">{av}</a>' 2385 2386 def __init__(self, app, dn: str, schema, attrType: str, attr_value: bytes, entry=None): 2387 IA5String.__init__(self, app, dn, schema, attrType, attr_value, entry) 2388 2389 def form_value(self) -> str: 2390 if not self._av: 2391 return IA5String.form_value(self) 2392 try: 2393 localpart, domainpart = self._av.rsplit(b'@') 2394 except ValueError: 2395 return IA5String.form_value(self) 2396 dns_domain = DNSDomain(self._app, self._dn, self._schema, None, domainpart) 2397 return '@'.join(( 2398 localpart.decode(self._app.ls.charset), 2399 dns_domain.form_value() 2400 )) 2401 2402 def sanitize(self, attr_value: bytes) -> bytes: 2403 try: 2404 localpart, domainpart = attr_value.rsplit(b'@') 2405 except ValueError: 2406 return attr_value 2407 else: 2408 return b'@'.join(( 2409 localpart, 2410 DNSDomain.sanitize(self, domainpart) 2411 )) 2412 2413 2414class DomainComponent(DNSDomain): 2415 """ 2416 Plugin class for a single DNS label 2417 (see https://datatracker.ietf.org/doc/html/rfc2181#section-11) 2418 """ 2419 oid: str = 'DomainComponent-oid' 2420 desc: str = 'DNS domain name component' 2421 pattern = re.compile(r'^(\*|[a-zA-Z0-9_-]+)$') 2422 max_len: int = min(63, DNSDomain.max_len) 2423 2424 2425class JSONValue(PreformattedMultilineText): 2426 """ 2427 Plugin class used for JSON data (see RFC 8259) 2428 """ 2429 oid: str = 'JSONValue-oid' 2430 desc: str = 'JSON data' 2431 lineSep = b'\n' 2432 mime_type: str = 'application/json' 2433 2434 def _validate(self, attr_value: bytes) -> bool: 2435 try: 2436 json.loads(attr_value) 2437 except ValueError: 2438 return False 2439 return True 2440 2441 def _split_lines(self, value): 2442 try: 2443 obj = json.loads(value) 2444 except ValueError: 2445 return PreformattedMultilineText._split_lines(self, value) 2446 return PreformattedMultilineText._split_lines( 2447 self, 2448 json.dumps( 2449 obj, 2450 indent=4, 2451 separators=(',', ': ') 2452 ).encode('utf-8') 2453 ) 2454 2455 def sanitize(self, attr_value: bytes) -> bytes: 2456 try: 2457 obj = json.loads(attr_value) 2458 except ValueError: 2459 return PreformattedMultilineText.sanitize(self, attr_value) 2460 return json.dumps( 2461 obj, 2462 separators=(',', ':') 2463 ).encode('utf-8') 2464 2465 2466class XmlValue(PreformattedMultilineText): 2467 """ 2468 Plugin class used for XML data 2469 """ 2470 oid: str = 'XmlValue-oid' 2471 desc: str = 'XML data' 2472 lineSep = b'\n' 2473 mime_type: str = 'text/xml' 2474 2475 def _validate(self, attr_value: bytes) -> bool: 2476 if not DEFUSEDXML_AVAIL: 2477 return PreformattedMultilineText._validate(self, attr_value) 2478 try: 2479 defusedxml.ElementTree.XML(attr_value) 2480 except defusedxml.ElementTree.ParseError: 2481 return False 2482 return True 2483 2484 2485class ASN1Object(Binary): 2486 """ 2487 Plugin class used for BER-encoded ASN.1 data 2488 """ 2489 oid: str = 'ASN1Object-oid' 2490 desc: str = 'BER-encoded ASN.1 data' 2491 2492 2493class AlgorithmOID(OID): 2494 """ 2495 This base-class class is used for OIDs of cryptographic algorithms 2496 """ 2497 oid: str = 'AlgorithmOID-oid' 2498 2499 2500class HashAlgorithmOID(SelectList, AlgorithmOID): 2501 """ 2502 Plugin class for selection of OIDs for hash algorithms 2503 (see https://www.iana.org/assignments/hash-function-text-names/). 2504 """ 2505 oid: str = 'HashAlgorithmOID-oid' 2506 desc: str = 'values from https://www.iana.org/assignments/hash-function-text-names/' 2507 attr_value_dict: Dict[str, str] = { 2508 '1.2.840.113549.2.2': 'md2', # [RFC3279] 2509 '1.2.840.113549.2.5': 'md5', # [RFC3279] 2510 '1.3.14.3.2.26': 'sha-1', # [RFC3279] 2511 '2.16.840.1.101.3.4.2.4': 'sha-224', # [RFC4055] 2512 '2.16.840.1.101.3.4.2.1': 'sha-256', # [RFC4055] 2513 '2.16.840.1.101.3.4.2.2': 'sha-384', # [RFC4055] 2514 '2.16.840.1.101.3.4.2.3': 'sha-512', # [RFC4055] 2515 } 2516 2517 2518class HMACAlgorithmOID(SelectList, AlgorithmOID): 2519 """ 2520 Plugin class for selection of OIDs for HMAC algorithms (see RFC 8018). 2521 """ 2522 oid: str = 'HMACAlgorithmOID-oid' 2523 desc: str = 'values from RFC 8018' 2524 attr_value_dict: Dict[str, str] = { 2525 # from RFC 8018 2526 '1.2.840.113549.2.7': 'hmacWithSHA1', 2527 '1.2.840.113549.2.8': 'hmacWithSHA224', 2528 '1.2.840.113549.2.9': 'hmacWithSHA256', 2529 '1.2.840.113549.2.10': 'hmacWithSHA384', 2530 '1.2.840.113549.2.11': 'hmacWithSHA512', 2531 } 2532 2533 2534class ComposedAttribute(LDAPSyntax): 2535 """ 2536 This mix-in plugin class composes attribute values from other attribute values. 2537 2538 One can define an ordered sequence of string templates in class 2539 attribute ComposedDirectoryString.compose_templates. 2540 See examples in module web2ldap.app.plugins.inetorgperson. 2541 2542 Obviously this only works for single-valued attributes, 2543 more precisely only the "first" attribute value is used. 2544 """ 2545 oid: str = 'ComposedDirectoryString-oid' 2546 compose_templates: Sequence[str] = () 2547 2548 class SingleValueDict(dict): 2549 """ 2550 dictionary-like class which only stores and returns the 2551 first value of an attribute value list 2552 """ 2553 2554 def __init__(self, entry, encoding): 2555 dict.__init__(self) 2556 self._encoding = encoding 2557 entry = entry or {} 2558 for key, val in entry.items(): 2559 self.__setitem__(key, val) 2560 2561 def __setitem__(self, key, val): 2562 if val and val[0]: 2563 dict.__setitem__(self, key, val[0].decode(self._encoding)) 2564 2565 def form_value(self) -> str: 2566 """ 2567 Return a dummy value that attribute is returned from input form and 2568 then seen by .transmute() 2569 """ 2570 return '' 2571 2572 def transmute(self, attr_values: List[bytes]) -> List[bytes]: 2573 """ 2574 always returns a list with a single value based on the first 2575 successfully applied compose template 2576 """ 2577 entry = self.SingleValueDict(self._entry, encoding=self._app.ls.charset) 2578 for template in self.compose_templates: 2579 try: 2580 attr_values = [template.format(**entry).encode(self._app.ls.charset)] 2581 except KeyError: 2582 continue 2583 else: 2584 break 2585 else: 2586 return attr_values 2587 return attr_values 2588 2589 def input_field(self) -> web_forms.Field: 2590 """ 2591 composed attributes must only have hidden input field 2592 """ 2593 input_field = web_forms.HiddenInput( 2594 self._at, 2595 ': '.join([self._at, self.desc]), 2596 self.max_len, 2597 self.max_values, 2598 None, 2599 default=self.form_value(), 2600 ) 2601 input_field.charset = self._app.form.accept_charset 2602 return input_field 2603 2604 2605class LDAPv3ResultCode(SelectList): 2606 """ 2607 Plugin base class for attributes with Integer syntax 2608 constrained to valid LDAP result code. 2609 """ 2610 oid: str = 'LDAPResultCode-oid' 2611 desc: str = 'LDAPv3 declaration of resultCode in (see RFC 4511)' 2612 attr_value_dict: Dict[str, str] = { 2613 '0': 'success', 2614 '1': 'operationsError', 2615 '2': 'protocolError', 2616 '3': 'timeLimitExceeded', 2617 '4': 'sizeLimitExceeded', 2618 '5': 'compareFalse', 2619 '6': 'compareTrue', 2620 '7': 'authMethodNotSupported', 2621 '8': 'strongerAuthRequired', 2622 '9': 'reserved', 2623 '10': 'referral', 2624 '11': 'adminLimitExceeded', 2625 '12': 'unavailableCriticalExtension', 2626 '13': 'confidentialityRequired', 2627 '14': 'saslBindInProgress', 2628 '16': 'noSuchAttribute', 2629 '17': 'undefinedAttributeType', 2630 '18': 'inappropriateMatching', 2631 '19': 'constraintViolation', 2632 '20': 'attributeOrValueExists', 2633 '21': 'invalidAttributeSyntax', 2634 '32': 'noSuchObject', 2635 '33': 'aliasProblem', 2636 '34': 'invalidDNSyntax', 2637 '35': 'reserved for undefined isLeaf', 2638 '36': 'aliasDereferencingProblem', 2639 '48': 'inappropriateAuthentication', 2640 '49': 'invalidCredentials', 2641 '50': 'insufficientAccessRights', 2642 '51': 'busy', 2643 '52': 'unavailable', 2644 '53': 'unwillingToPerform', 2645 '54': 'loopDetect', 2646 '64': 'namingViolation', 2647 '65': 'objectClassViolation', 2648 '66': 'notAllowedOnNonLeaf', 2649 '67': 'notAllowedOnRDN', 2650 '68': 'entryAlreadyExists', 2651 '69': 'objectClassModsProhibited', 2652 '70': 'reserved for CLDAP', 2653 '71': 'affectsMultipleDSAs', 2654 '80': 'other', 2655 } 2656 2657 2658class SchemaDescription(DirectoryString): 2659 oid: str = 'SchemaDescription-oid' 2660 schema_cls = None 2661 sani_funcs = ( 2662 bytes.strip, 2663 ) 2664 2665 def _validate(self, attr_value: bytes) -> bool: 2666 if self.schema_cls is None: 2667 return DirectoryString._validate(self, attr_value) 2668 try: 2669 _ = self.schema_cls(self._app.ls.uc_decode(attr_value)[0]) 2670 except (IndexError, ValueError): 2671 return False 2672 return True 2673 2674 2675class ObjectClassDescription(SchemaDescription): 2676 oid: str = '1.3.6.1.4.1.1466.115.121.1.37' 2677 schema_cls = ldap0.schema.models.ObjectClass 2678 2679 2680class AttributeTypeDescription(SchemaDescription): 2681 oid: str = '1.3.6.1.4.1.1466.115.121.1.3' 2682 schema_cls = ldap0.schema.models.AttributeType 2683 2684 2685class MatchingRuleDescription(SchemaDescription): 2686 oid: str = '1.3.6.1.4.1.1466.115.121.1.30' 2687 schema_cls = ldap0.schema.models.MatchingRule 2688 2689 2690class MatchingRuleUseDescription(SchemaDescription): 2691 oid: str = '1.3.6.1.4.1.1466.115.121.1.31' 2692 schema_cls = ldap0.schema.models.MatchingRuleUse 2693 2694 2695class LDAPSyntaxDescription(SchemaDescription): 2696 oid: str = '1.3.6.1.4.1.1466.115.121.1.54' 2697 schema_cls = ldap0.schema.models.LDAPSyntax 2698 2699 2700class DITContentRuleDescription(SchemaDescription): 2701 oid: str = '1.3.6.1.4.1.1466.115.121.1.16' 2702 schema_cls = ldap0.schema.models.DITContentRule 2703 2704 2705class DITStructureRuleDescription(SchemaDescription): 2706 oid: str = '1.3.6.1.4.1.1466.115.121.1.17' 2707 schema_cls = ldap0.schema.models.DITStructureRule 2708 2709 2710class NameFormDescription(SchemaDescription): 2711 oid: str = '1.3.6.1.4.1.1466.115.121.1.35' 2712 schema_cls = ldap0.schema.models.NameForm 2713 2714 2715# Set up the central syntax registry instance 2716syntax_registry = SyntaxRegistry() 2717 2718# Register all syntax classes in this module 2719syntax_registry.reg_syntaxes(__name__) 2720