"""
ldap.schema.subentry - subschema subentry handling

See https://www.python-ldap.org/ for details.
"""

import copy
from urllib.request import urlopen

import ldap.cidict
import ldap.schema
# The wildcard import is deliberate: the registry loop below scans this
# module's namespace for every class that carries a `schema_attribute`.
from ldap.schema.models import *

import ldapurl
import ldif


# Case-insensitive map: schema attribute name -> schema element class
SCHEMA_CLASS_MAPPING = ldap.cidict.cidict()
# Reverse map: schema element class -> schema attribute name
SCHEMA_ATTR_MAPPING = {}

# Snapshot the namespace with list() because the loop variable itself
# becomes a new module-level name while we iterate.
for o in list(vars().values()):
    if hasattr(o, 'schema_attribute'):
        SCHEMA_CLASS_MAPPING[o.schema_attribute] = o
        SCHEMA_ATTR_MAPPING[o] = o.schema_attribute

# Names of all schema attributes known to the registry
SCHEMA_ATTRS = list(SCHEMA_CLASS_MAPPING)


class SubschemaError(ValueError):
    """Base class for errors detected while building a SubSchema."""


class OIDNotUnique(SubschemaError):
    """Raised when a schema element ID is registered more than once."""

    def __init__(self, desc):
        # desc: the raw schema description value that caused the clash
        self.desc = desc

    def __str__(self):
        return 'OID not unique for %s' % (self.desc)


class NameNotUnique(SubschemaError):
    """Raised when a schema element NAME is registered more than once."""

    def __init__(self, desc):
        # desc: the raw schema description value that caused the clash
        self.desc = desc

    def __str__(self):
        return 'NAME not unique for %s' % (self.desc)


class SubSchema:
    """
    Arguments:

    sub_schema_sub_entry
        Dictionary usually returned by LDAP search or the LDIF parser
        containing the sub schema sub entry

    check_uniqueness
        Defines whether uniqueness of OIDs and NAME is checked.

        0
          no check
        1
          check but add schema description with work-around
        2
          check and raise exception if non-unique OID or NAME is found

        NOTE: as implemented, a duplicate NAME raises NameNotUnique for
        every non-zero value, including 1; only duplicate OIDs get the
        suffix work-around.

    Class attributes:

    sed
        Dictionary holding the subschema information as pre-parsed
        SchemaElement objects (do not access directly!)
    name2oid
        Dictionary holding the mapping from NAMEs to OIDs
        (do not access directly!)
    non_unique_oids
        List of OIDs used at least twice in the subschema
    non_unique_names
        Dictionary mapping each schema element class to a
        ldap.cidict.cidict whose keys are the IDs of schema elements
        for which an already registered NAME was seen
    """

    def __init__(self, sub_schema_sub_entry, check_uniqueness=1):
        """
        Parse all schema description values found in sub_schema_sub_entry
        and register the resulting schema element instances.

        Raises OIDNotUnique if check_uniqueness >= 2 and an element ID is
        seen twice, and NameNotUnique if check_uniqueness is non-zero and
        a NAME is seen twice within one schema element class.
        """
        # Initialize all dictionaries, one sub-dictionary per element class
        self.name2oid = {}
        self.sed = {}
        self.non_unique_oids = {}
        self.non_unique_names = {}
        for se_class in SCHEMA_CLASS_MAPPING.values():
            self.name2oid[se_class] = ldap.cidict.cidict()
            self.sed[se_class] = {}
            self.non_unique_names[se_class] = ldap.cidict.cidict()

        # Transform entry dict to case-insensitive dict
        entry = ldap.cidict.cidict(sub_schema_sub_entry)

        # Build the schema registry in dictionaries
        for attr_type in SCHEMA_ATTRS:
            se_class = SCHEMA_CLASS_MAPPING[attr_type]
            registry = self.sed[se_class]
            names = self.name2oid[se_class]

            # filter(None, ...) skips empty values
            for attr_value in filter(None, entry.get(attr_type, [])):
                se_instance = se_class(attr_value)
                se_id = se_instance.get_id()

                if check_uniqueness and se_id in registry:
                    self.non_unique_oids[se_id] = None
                    if check_uniqueness == 1:
                        # Work-around: register the duplicate under the
                        # first free "<id>;<n>" key instead of failing
                        suffix_counter = 1
                        new_se_id = se_id
                        while new_se_id in registry:
                            new_se_id = ';'.join((se_id, str(suffix_counter)))
                            suffix_counter += 1
                        se_id = new_se_id
                    elif check_uniqueness >= 2:
                        raise OIDNotUnique(attr_value)

                # Store the schema element instance in the central registry
                registry[se_id] = se_instance

                if hasattr(se_instance, 'names'):
                    # Going through a cidict collapses NAMEs that differ
                    # only in case within this single element
                    for name in ldap.cidict.cidict({}.fromkeys(se_instance.names)):
                        if check_uniqueness and name in names:
                            self.non_unique_names[se_class][se_id] = None
                            raise NameNotUnique(attr_value)
                        names[name] = se_id

        # Turn dict into list maybe more handy for applications
        self.non_unique_oids = list(self.non_unique_oids)

    def ldap_entry(self):
        """
        Returns a dictionary containing the sub schema sub entry

        Keys are schema attribute names, values are lists with the
        str() representation of every registered schema element.
        """
        entry = {}
        for se_class, elements in self.sed.items():
            for se in elements.values():
                entry.setdefault(SCHEMA_ATTR_MAPPING[se_class], []).append(str(se))
        return entry

    def listall(self, schema_element_class, schema_element_filters=None):
        """
        Returns a list of OIDs of all available schema
        elements of a given schema element class.

        schema_element_filters
            Optional sequence of 2-tuples (class attribute name, container
            of accepted values). An element is listed once for every
            filter it matches; elements lacking the attribute are skipped
            for that filter.
        """
        avail_se = self.sed[schema_element_class]
        if not schema_element_filters:
            return list(avail_se)
        result = []
        for se_key, se in avail_se.items():
            for fk, fv in schema_element_filters:
                try:
                    if getattr(se, fk) in fv:
                        result.append(se_key)
                except AttributeError:
                    pass
        return result

    def tree(self, schema_element_class, schema_element_filters=None):
        """
        Returns a ldap.cidict.cidict dictionary representing the
        tree structure of the schema elements.

        Every key is an element ID and maps to the list of IDs of its
        direct subordinates. The key '_' collects the elements which
        have no superior.
        """
        assert schema_element_class in [ObjectClass, AttributeType]
        avail_se = self.listall(schema_element_class, schema_element_filters)
        top_node = '_'
        tree = ldap.cidict.cidict({top_node: []})
        # 1. Pass: Register all nodes
        for se in avail_se:
            tree[se] = []
        # 2. Pass: Register all sup references
        for se_oid in avail_se:
            se_obj = self.get_obj(schema_element_class, se_oid, None)
            if se_obj.__class__ != schema_element_class:
                # Ignore schema elements not matching schema_element_class.
                # This helps with falsely assigned OIDs.
                continue
            for s in se_obj.sup or (top_node,):
                sup_oid = self.getoid(schema_element_class, s)
                try:
                    tree[sup_oid].append(se_oid)
                except KeyError:
                    # The superior is not part of the (filtered) node set
                    pass
        return tree

    def getoid(self, se_class, nameoroid, raise_keyerror=0):
        """
        Get an OID by name or OID

        Anything after the first ';' in nameoroid is ignored. If nothing
        is registered under the remaining string it is returned unchanged,
        unless raise_keyerror is true, in which case KeyError is raised.
        """
        nameoroid_stripped = nameoroid.split(';')[0].strip()
        if nameoroid_stripped in self.sed[se_class]:
            # nameoroid is already a registered OID
            return nameoroid_stripped
        try:
            return self.name2oid[se_class][nameoroid_stripped]
        except KeyError:
            if raise_keyerror:
                raise KeyError('No registered {}-OID for nameoroid {}'.format(se_class.__name__,repr(nameoroid_stripped)))
            return nameoroid_stripped

    def get_inheritedattr(self, se_class, nameoroid, name):
        """
        Get a possibly inherited attribute specified by name
        of a schema element specified by nameoroid.
        Returns None if class attribute is not set at all.

        Only the first superior of each element is followed.

        Raises KeyError if no schema element is found by nameoroid.
        """
        se = self.sed[se_class][self.getoid(se_class, nameoroid)]
        result = getattr(se, name, None)
        if result is None and se.sup:
            result = self.get_inheritedattr(se_class, se.sup[0], name)
        return result

    def get_obj(self, se_class, nameoroid, default=None, raise_keyerror=0):
        """
        Get a schema element by name or OID

        Returns default if no element is found, unless raise_keyerror
        is true, in which case KeyError is raised.
        """
        se_oid = self.getoid(se_class, nameoroid)
        try:
            return self.sed[se_class][se_oid]
        except KeyError:
            if raise_keyerror:
                raise KeyError('No ldap.schema.{} instance with nameoroid {} and se_oid {}'.format(
                    se_class.__name__,repr(nameoroid),repr(se_oid))
                )
            return default

    def get_inheritedobj(self, se_class, nameoroid, inherited=None):
        """
        Get a schema element by name or OID with all class attributes
        set including inherited class attributes

        inherited
            Names of the class attributes to resolve through the chain
            of superiors.

        Returns a shallow copy of the registered element, or None if
        there is no such element.
        """
        inherited = inherited or []
        se = copy.copy(self.sed[se_class].get(self.getoid(se_class, nameoroid)))
        if se and hasattr(se, 'sup'):
            for class_attr_name in inherited:
                setattr(se, class_attr_name, self.get_inheritedattr(se_class, nameoroid, class_attr_name))
        return se

    def get_syntax(self, nameoroid):
        """
        Get the syntax of an attribute type specified by name or OID

        Returns None if the attribute type is not registered.
        """
        at_oid = self.getoid(AttributeType, nameoroid)
        try:
            at_obj = self.get_inheritedobj(AttributeType, at_oid)
        except KeyError:
            return None
        if at_obj is None:
            # get_inheritedobj() reports an unknown element by returning
            # None rather than raising KeyError
            return None
        return at_obj.syntax

    def get_structural_oc(self, oc_list):
        """
        Returns OID of structural object class in oc_list
        if any is present. Returns None else.

        A class qualifies if none of its direct subordinates in the
        tree of STRUCTURAL classes is itself part of oc_list.
        """
        # Get tree of all STRUCTURAL object classes
        oc_tree = self.tree(ObjectClass, [('kind', [0])])
        # Filter all STRUCTURAL object classes
        struct_ocs = {}
        for oc_nameoroid in oc_list:
            oc_se = self.get_obj(ObjectClass, oc_nameoroid, None)
            if oc_se and oc_se.kind == 0:
                struct_ocs[oc_se.oid] = None
        result = None
        # Build a copy of the oid list, to be cleaned as we go.
        struct_oc_list = list(struct_ocs)
        while struct_oc_list:
            oid = struct_oc_list.pop()
            for child_oid in oc_tree[oid]:
                if self.getoid(ObjectClass, child_oid) in struct_ocs:
                    break
            else:
                # No direct subordinate of this class is in oc_list
                result = oid
        return result

    def get_applicable_aux_classes(self, nameoroid):
        """
        Return a list of the applicable AUXILIARY object classes
        for a STRUCTURAL object class specified by 'nameoroid'
        if the object class is governed by a DIT content rule.
        If there's no DIT content rule all available AUXILIARY
        object classes are returned.
        """
        content_rule = self.get_obj(DITContentRule, nameoroid)
        if content_rule:
            # Return AUXILIARY object classes from DITContentRule instance
            return content_rule.aux
        # list all AUXILIARY object classes
        return self.listall(ObjectClass, [('kind', [2])])

    def attribute_types(
        self, object_class_list, attr_type_filter=None, raise_keyerror=1, ignore_dit_content_rule=0
    ):
        """
        Returns a 2-tuple of all must and may attributes including
        all inherited attributes of superior object classes
        by walking up classes along the SUP attribute.

        The attributes are stored in a ldap.cidict.cidict dictionary.

        object_class_list
            list of strings specifying object class names or OIDs
        attr_type_filter
            list of 2-tuples containing lists of class attributes
            which has to be matched
        raise_keyerror
            If true (the default) KeyError is raised for non-existent
            schema elements, otherwise such elements are ignored
        ignore_dit_content_rule
            A DIT content rule governing the structural object class
            is ignored
        """
        AttributeType = ldap.schema.AttributeType
        ObjectClass = ldap.schema.ObjectClass

        # Map object_class_list to object_class_oids (list of OIDs)
        object_class_oids = [
            self.getoid(ObjectClass, o)
            for o in object_class_list
        ]
        # OIDs of the object classes which were already processed
        oid_cache = {}

        r_must, r_may = ldap.cidict.cidict(), ldap.cidict.cidict()
        if '1.3.6.1.4.1.1466.101.120.111' in object_class_oids:
            # Object class 'extensibleObject' MAY carry every attribute type
            for at_obj in self.sed[AttributeType].values():
                r_may[at_obj.oid] = at_obj

        # Loop over OIDs of all given object classes; superiors are
        # appended to the work list while it is being consumed
        while object_class_oids:
            object_class_oid = object_class_oids.pop(0)
            # Check whether the objectClass with this OID
            # has already been processed
            if object_class_oid in oid_cache:
                continue
            # Cache this OID as already being processed
            oid_cache[object_class_oid] = None
            try:
                object_class = self.sed[ObjectClass][object_class_oid]
            except KeyError:
                if raise_keyerror:
                    raise
                # Ignore this object class
                continue
            assert isinstance(object_class, ObjectClass)
            assert hasattr(object_class, 'must'), ValueError(object_class_oid)
            assert hasattr(object_class, 'may'), ValueError(object_class_oid)
            for a in object_class.must:
                se_oid = self.getoid(AttributeType, a, raise_keyerror=raise_keyerror)
                r_must[se_oid] = self.get_obj(AttributeType, se_oid, raise_keyerror=raise_keyerror)
            for a in object_class.may:
                se_oid = self.getoid(AttributeType, a, raise_keyerror=raise_keyerror)
                r_may[se_oid] = self.get_obj(AttributeType, se_oid, raise_keyerror=raise_keyerror)

            object_class_oids.extend([
                self.getoid(ObjectClass, o)
                for o in object_class.sup
            ])

        # Process DIT content rules
        if not ignore_dit_content_rule:
            structural_oc = self.get_structural_oc(object_class_list)
            if structural_oc:
                # Process applicable DIT content rule
                try:
                    dit_content_rule = self.get_obj(DITContentRule, structural_oc, raise_keyerror=1)
                except KeyError:
                    # No DIT content rule found for structural objectclass
                    pass
                else:
                    for a in dit_content_rule.must:
                        se_oid = self.getoid(AttributeType, a, raise_keyerror=raise_keyerror)
                        r_must[se_oid] = self.get_obj(AttributeType, se_oid, raise_keyerror=raise_keyerror)
                    for a in dit_content_rule.may:
                        se_oid = self.getoid(AttributeType, a, raise_keyerror=raise_keyerror)
                        r_may[se_oid] = self.get_obj(AttributeType, se_oid, raise_keyerror=raise_keyerror)
                    for a in dit_content_rule.nots:
                        a_oid = self.getoid(AttributeType, a, raise_keyerror=raise_keyerror)
                        try:
                            del r_may[a_oid]
                        except KeyError:
                            pass

        # Remove all mandatory attribute types from
        # optional attribute type list
        for a in list(r_may):
            if a in r_must:
                del r_may[a]

        # Apply attr_type_filter to results
        if attr_type_filter:
            for attrs in [r_must, r_may]:
                # Iterate over a snapshot because entries are deleted
                for a in list(attrs):
                    for afk, afv in attr_type_filter:
                        try:
                            schema_attr_type = self.sed[AttributeType][a]
                        except KeyError:
                            if raise_keyerror:
                                raise KeyError('No attribute type found in sub schema by name %s' % (a))
                            # If there's no schema element for this attribute type
                            # but still KeyError is to be ignored we filter it away
                            del attrs[a]
                            break
                        else:
                            if not getattr(schema_attr_type, afk) in afv:
                                del attrs[a]
                                break

        return r_must, r_may


def urlfetch(uri, trace_level=0):
    """
    Fetches a parsed schema entry by uri.

    If uri is a LDAP URL the LDAP server is queried directly.
    Otherwise uri is assumed to point to a LDIF file which
    is loaded with urllib.

    Returns a 2-tuple (DN of the subschema subentry, SubSchema instance).
    The second item is None if the DN is None.
    """
    uri = uri.strip()
    if uri.startswith(('ldap:', 'ldaps:', 'ldapi:')):
        ldap_url = ldapurl.LDAPUrl(uri)

        conn = ldap.initialize(ldap_url.initializeUrl(), trace_level)
        try:
            conn.protocol_version = ldap.VERSION3
            conn.simple_bind_s(ldap_url.who or '', ldap_url.cred or '')
            subschemasubentry_dn = conn.search_subschemasubentry_s(ldap_url.dn)
            if subschemasubentry_dn is None:
                s_temp = None
            else:
                if ldap_url.attrs is None:
                    schema_attrs = SCHEMA_ATTRS
                else:
                    schema_attrs = ldap_url.attrs
                s_temp = conn.read_subschemasubentry_s(
                    subschemasubentry_dn, attrs=schema_attrs
                )
        finally:
            # Release the connection even if bind, search or read failed
            conn.unbind_s()
            del conn
    else:
        # The context manager closes the response once parsing is done
        with urlopen(uri) as ldif_file:
            ldif_parser = ldif.LDIFRecordList(ldif_file, max_entries=1)
            ldif_parser.parse()
        subschemasubentry_dn, s_temp = ldif_parser.all_records[0]
    # Work-around for mixed-cased attribute names
    subschemasubentry_entry = ldap.cidict.cidict()
    s_temp = s_temp or {}
    for at, av in s_temp.items():
        if at in SCHEMA_CLASS_MAPPING:
            try:
                subschemasubentry_entry[at].extend(av)
            except KeyError:
                # Copy the list so that a later extend() cannot mutate
                # the value list owned by s_temp
                subschemasubentry_entry[at] = list(av)
    # Finally parse the schema
    if subschemasubentry_dn is not None:
        parsed_sub_schema = ldap.schema.SubSchema(subschemasubentry_entry)
    else:
        parsed_sub_schema = None
    return subschemasubentry_dn, parsed_sub_schema