"""
Automatic tests for python-ldap's module ldap.ldapobject

See https://www.python-ldap.org/ for details.
"""
import errno
import linecache
import os
import socket
import unittest
import pickle

# Switch off processing .ldaprc or ldap.conf before importing _ldap
os.environ['LDAPNOINIT'] = '1'

import ldap
from ldap.ldapobject import SimpleLDAPObject, ReconnectLDAPObject

from slapdtest import SlapdTestCase
from slapdtest import requires_ldapi, requires_sasl, requires_tls
from slapdtest import requires_init_fd


# LDIF loaded into the test server by Test00_SimpleLDAPObject.setUpClass().
# The %(...)s placeholders are filled from the running server's settings.
LDIF_TEMPLATE = """dn: %(suffix)s
objectClass: dcObject
objectClass: organization
dc: %(dc)s
o: %(dc)s

dn: %(rootdn)s
objectClass: applicationProcess
objectClass: simpleSecurityObject
cn: %(rootcn)s
userPassword: %(rootpw)s

dn: cn=user1,%(suffix)s
objectClass: applicationProcess
objectClass: simpleSecurityObject
cn: user1
userPassword: user1_pw

dn: cn=Foo1,%(suffix)s
objectClass: organizationalRole
cn: Foo1

dn: cn=Foo2,%(suffix)s
objectClass: organizationalRole
cn: Foo2

dn: cn=Foo3,%(suffix)s
objectClass: organizationalRole
cn: Foo3

dn: ou=Container,%(suffix)s
objectClass: organizationalUnit
ou: Container

dn: cn=Foo4,ou=Container,%(suffix)s
objectClass: organizationalRole
cn: Foo4

"""

# Custom schema added to cn=config by test_slapadd().
# Continuation lines must start with whitespace (LDIF line folding).
SCHEMA_TEMPLATE = """dn: cn=mySchema,cn=schema,cn=config
objectClass: olcSchemaConfig
cn: mySchema
olcAttributeTypes: ( 1.3.6.1.4.1.56207.1.1.1 NAME 'myAttribute'
  DESC 'fobar attribute'
  EQUALITY caseExactMatch
  ORDERING caseExactOrderingMatch
  SUBSTR caseExactSubstringsMatch
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
  SINGLE-VALUE
  USAGE userApplications
  X-ORIGIN 'foobar' )
olcObjectClasses: ( 1.3.6.1.4.1.56207.1.2.2 NAME 'myClass'
  DESC 'foobar objectclass'
  SUP top
  STRUCTURAL
  MUST myAttribute
  X-ORIGIN 'foobar' )"""


class Test00_SimpleLDAPObject(SlapdTestCase):
    """
    test LDAP search operations
    """

    # Connection class under test; subclasses override this attribute.
    ldap_object_class = SimpleLDAPObject

    @classmethod
    def setUpClass(cls):
        """Start the test server and load the LDIF_TEMPLATE entries."""
        super().setUpClass()
        # insert some Foo* objects via ldapadd
        cls.server.ldapadd(
            LDIF_TEMPLATE % {
                'suffix': cls.server.suffix,
                'rootdn': cls.server.root_dn,
                'rootcn': cls.server.root_cn,
                'rootpw': cls.server.root_pw,
                # value of the first RDN of the suffix, without its
                # three-character "dc=" prefix
                'dc': cls.server.suffix.split(',')[0][3:],
            }
        )

    def setUp(self):
        """Open a text-mode connection unless one is already present."""
        try:
            self._ldap_conn
        except AttributeError:
            # open local LDAP connection
            self._ldap_conn = self._open_ldap_conn(bytes_mode=False)

    def tearDown(self):
        """Drop the connection opened by setUp()."""
        del self._ldap_conn

    def reset_connection(self):
        """Discard the current connection (if any) and open a fresh one."""
        try:
            del self._ldap_conn
        except AttributeError:
            pass

        self._ldap_conn = self._open_ldap_conn(bytes_mode=False)

    def test_reject_bytes_base(self):
        """bytes passed as base, filter or attribute name raise TypeError."""
        base = self.server.suffix
        l = self._ldap_conn

        with self.assertRaises(TypeError) as e:
            l.search_s(
                base.encode('utf-8'), ldap.SCOPE_SUBTREE, '(cn=Foo*)', ['*']
            )
        # Python 3.4.x does not include 'search_ext()' in message
        self.assertEqual(
            "search_ext() argument 1 must be str, not bytes",
            str(e.exception)
        )

        with self.assertRaises(TypeError) as e:
            l.search_s(
                base, ldap.SCOPE_SUBTREE, b'(cn=Foo*)', ['*']
            )
        self.assertEqual(
            "search_ext() argument 3 must be str, not bytes",
            str(e.exception)
        )

        with self.assertRaises(TypeError) as e:
            l.search_s(
                base, ldap.SCOPE_SUBTREE, '(cn=Foo*)', [b'*']
            )
        self.assertEqual(
            ('attrs_from_List(): expected string in list', b'*'),
            e.exception.args
        )

    def test_search_keys_are_text(self):
        """DNs and attribute names come back as str, values as bytes."""
        base = self.server.suffix
        l = self._ldap_conn
        result = l.search_s(base, ldap.SCOPE_SUBTREE, '(cn=Foo*)', ['*'])
        result.sort()
        dn, fields = result[0]
        self.assertEqual(dn, 'cn=Foo1,%s' % base)
        self.assertEqual(type(dn), str)
        for key, values in fields.items():
            self.assertEqual(type(key), str)
            for value in values:
                self.assertEqual(type(value), bytes)

    def test_search_accepts_unicode_dn(self):
        """A non-BMP character in the search base is accepted."""
        l = self._ldap_conn

        with self.assertRaises(ldap.NO_SUCH_OBJECT):
            l.search_s("CN=abc\U0001f498def", ldap.SCOPE_SUBTREE)

    def test_filterstr_accepts_unicode(self):
        """A non-BMP character in the filter string is accepted."""
        l = self._ldap_conn
        base = self.server.suffix
        result = l.search_s(
            base, ldap.SCOPE_SUBTREE, '(cn=abc\U0001f498def)', ['*']
        )
        self.assertEqual(result, [])

    def test_attrlist_accepts_unicode(self):
        """A non-BMP character in the attribute list is accepted."""
        base = self.server.suffix
        result = self._ldap_conn.search_s(
            base, ldap.SCOPE_SUBTREE,
            '(cn=Foo*)', ['abc', 'abc\U0001f498def'])
        result.sort()

        for dn, attrs in result:
            self.assertIsInstance(dn, str)
            self.assertEqual(attrs, {})

    def test001_search_subtree(self):
        """A subtree search finds Foo1..Foo3 and Foo4 in the container."""
        result = self._ldap_conn.search_s(
            self.server.suffix,
            ldap.SCOPE_SUBTREE,
            '(cn=Foo*)',
            attrlist=['*'],
        )
        result.sort()
        self.assertEqual(
            result,
            [
                (
                    'cn=Foo1,'+self.server.suffix,
                    {'cn': [b'Foo1'], 'objectClass': [b'organizationalRole']}
                ),
                (
                    'cn=Foo2,'+self.server.suffix,
                    {'cn': [b'Foo2'], 'objectClass': [b'organizationalRole']}
                ),
                (
                    'cn=Foo3,'+self.server.suffix,
                    {'cn': [b'Foo3'], 'objectClass': [b'organizationalRole']}
                ),
                (
                    'cn=Foo4,ou=Container,'+self.server.suffix,
                    {'cn': [b'Foo4'], 'objectClass': [b'organizationalRole']}
                ),
            ]
        )

    def test002_search_onelevel(self):
        """A one-level search finds Foo1..Foo3 but not the nested Foo4."""
        result = self._ldap_conn.search_s(
            self.server.suffix,
            ldap.SCOPE_ONELEVEL,
            '(cn=Foo*)',
            ['*'],
        )
        result.sort()
        self.assertEqual(
            result,
            [
                (
                    'cn=Foo1,'+self.server.suffix,
                    {'cn': [b'Foo1'], 'objectClass': [b'organizationalRole']}
                ),
                (
                    'cn=Foo2,'+self.server.suffix,
                    {'cn': [b'Foo2'], 'objectClass': [b'organizationalRole']}
                ),
                (
                    'cn=Foo3,'+self.server.suffix,
                    {'cn': [b'Foo3'], 'objectClass': [b'organizationalRole']}
                ),
            ]
        )

    def test003_search_oneattr(self):
        """Requesting only 'cn' returns only that attribute."""
        result = self._ldap_conn.search_s(
            self.server.suffix,
            ldap.SCOPE_SUBTREE,
            '(cn=Foo4)',
            ['cn'],
        )
        result.sort()
        self.assertEqual(
            result,
            [('cn=Foo4,ou=Container,'+self.server.suffix, {'cn': [b'Foo4']})]
        )

    def test_find_unique_entry(self):
        """find_unique_entry() returns one match and rejects 0 or many."""
        result = self._ldap_conn.find_unique_entry(
            self.server.suffix,
            ldap.SCOPE_SUBTREE,
            '(cn=Foo4)',
            ['cn'],
        )
        self.assertEqual(
            result,
            ('cn=Foo4,ou=Container,'+self.server.suffix, {'cn': [b'Foo4']})
        )
        with self.assertRaises(ldap.SIZELIMIT_EXCEEDED):
            # more than one entry returned (Foo1..Foo3)
            self._ldap_conn.find_unique_entry(
                self.server.suffix,
                ldap.SCOPE_ONELEVEL,
                '(cn=Foo*)',
                ['*'],
            )
        with self.assertRaises(ldap.NO_UNIQUE_ENTRY):
            # 0 entries returned
            self._ldap_conn.find_unique_entry(
                self.server.suffix,
                ldap.SCOPE_ONELEVEL,
                '(cn=Bar*)',
                ['*'],
            )

    def test_search_subschema(self):
        """The subschema subentry DN is text and its entry reads as a dict."""
        l = self._ldap_conn
        dn = l.search_subschemasubentry_s()
        self.assertIsInstance(dn, str)
        self.assertEqual(dn, "cn=Subschema")
        subschema = l.read_subschemasubentry_s(dn)
        self.assertIsInstance(subschema, dict)
        self.assertEqual(
            sorted(subschema),
            [
                'attributeTypes',
                'ldapSyntaxes',
                'matchingRuleUse',
                'matchingRules',
                'objectClasses'
            ]
        )

    def test004_enotconn(self):
        """Binding to 127.0.0.1:42 raises SERVER_DOWN carrying ENOTCONN."""
        l = self.ldap_object_class('ldap://127.0.0.1:42')
        try:
            m = l.simple_bind_s("", "")
            r = l.result4(m, ldap.MSG_ALL, self.timeout)
        except ldap.SERVER_DOWN as ldap_err:
            errno_val = ldap_err.args[0]['errno']
            if errno_val != errno.ENOTCONN:
                self.fail("expected errno=%d, got %d"
                          % (errno.ENOTCONN, errno_val))
            info = ldap_err.args[0]['info']
            expected_info = os.strerror(errno.ENOTCONN)
            if info != expected_info:
                self.fail(f"expected info={expected_info!r}, got {info!r}")
        else:
            self.fail("expected SERVER_DOWN, got %r" % r)

    def test005_invalid_credentials(self):
        """An asynchronous bind with a wrong password is rejected."""
        l = self.ldap_object_class(self.server.ldap_uri)
        # bind as the root DN with a wrong password
        try:
            m = l.simple_bind(self.server.root_dn, self.server.root_pw+'wrong')
            r = l.result4(m, ldap.MSG_ALL)
        except ldap.INVALID_CREDENTIALS:
            pass
        else:
            self.fail("expected INVALID_CREDENTIALS, got %r" % r)

    @requires_sasl()
    @requires_ldapi()
    def test006_sasl_external_bind_s(self):
        """SASL EXTERNAL over ldapi binds as root, optionally with authz_id."""
        l = self.ldap_object_class(self.server.ldapi_uri)
        l.sasl_external_bind_s()
        self.assertEqual(l.whoami_s(), 'dn:'+self.server.root_dn.lower())
        authz_id = 'dn:cn=Foo2,%s' % (self.server.suffix)
        l = self.ldap_object_class(self.server.ldapi_uri)
        l.sasl_external_bind_s(authz_id=authz_id)
        self.assertEqual(l.whoami_s(), authz_id.lower())

    @requires_sasl()
    @requires_ldapi()
    def test006_sasl_options(self):
        """SASL SSF options can be read and set; SSF_EXTERNAL is write-only."""
        l = self.ldap_object_class(self.server.ldapi_uri)

        minssf = l.get_option(ldap.OPT_X_SASL_SSF_MIN)
        self.assertGreaterEqual(minssf, 0)
        self.assertLessEqual(minssf, 256)
        maxssf = l.get_option(ldap.OPT_X_SASL_SSF_MAX)
        self.assertGreaterEqual(maxssf, 0)
        # libldap sets SSF_MAX to INT_MAX
        self.assertLessEqual(maxssf, 2**31 - 1)

        l.set_option(ldap.OPT_X_SASL_SSF_MIN, 56)
        l.set_option(ldap.OPT_X_SASL_SSF_MAX, 256)
        self.assertEqual(l.get_option(ldap.OPT_X_SASL_SSF_MIN), 56)
        self.assertEqual(l.get_option(ldap.OPT_X_SASL_SSF_MAX), 256)

        l.sasl_external_bind_s()
        with self.assertRaisesRegex(ValueError, "write-only option"):
            l.get_option(ldap.OPT_X_SASL_SSF_EXTERNAL)
        l.set_option(ldap.OPT_X_SASL_SSF_EXTERNAL, 256)
        self.assertEqual(l.whoami_s(), 'dn:' + self.server.root_dn.lower())

    def test007_timeout(self):
        """Waiting on an abandoned search with a tiny timeout raises TIMEOUT."""
        l = self.ldap_object_class(self.server.ldap_uri)
        m = l.search_ext(
            self.server.suffix, ldap.SCOPE_SUBTREE, '(objectClass=*)'
        )
        l.abandon(m)
        with self.assertRaises(ldap.TIMEOUT):
            l.result(m, timeout=0.001)

    def assertIsSubclass(self, cls, other):
        """Assert issubclass(cls, other); show cls.__mro__ on failure."""
        self.assertTrue(
            issubclass(cls, other),
            cls.__mro__
        )

    def test_simple_bind_noarg(self):
        """simple_bind_s() without credentials yields an empty whoami."""
        l = self.ldap_object_class(self.server.ldap_uri)
        l.simple_bind_s()
        self.assertEqual(l.whoami_s(), '')
        l = self.ldap_object_class(self.server.ldap_uri)
        l.simple_bind_s(None, None)
        self.assertEqual(l.whoami_s(), '')

    def _check_byteswarning(self, warning, expected_message):
        """Check a recorded LDAPBytesWarning and where it points.

        `warning` must expose `category`, `message`, `filename` and
        `lineno`. The warning has to contain `expected_message` and refer
        to a line of this file that contains the text 'CORRECT LINE'.
        """
        self.assertIs(warning.category, ldap.LDAPBytesWarning)
        self.assertIn(expected_message, str(warning.message))

        def _normalize(filename):
            # Python 2 likes to report the ".pyc" file in warnings,
            # tracebacks or __file__.
            # Use the corresponding ".py" in that case.
            if filename.endswith('.pyc'):
                return filename[:-1]
            return filename

        # Assert warning points to a line marked CORRECT LINE in this file
        # (assertEqual: the assertEquals alias was removed in Python 3.12)
        self.assertEqual(_normalize(warning.filename), _normalize(__file__))
        self.assertIn(
            'CORRECT LINE',
            linecache.getline(warning.filename, warning.lineno)
        )

    @requires_tls()
    def test_multiple_starttls(self):
        """Repeated StartTLS on fresh connections keeps working."""
        # Test for openldap does not re-register nss shutdown callbacks
        # after nss_Shutdown is called
        # https://github.com/python-ldap/python-ldap/issues/60
        # https://bugzilla.redhat.com/show_bug.cgi?id=1520990
        for _ in range(10):
            l = self.ldap_object_class(self.server.ldap_uri)
            l.set_option(ldap.OPT_X_TLS_CACERTFILE, self.server.cafile)
            l.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
            l.start_tls_s()
            l.simple_bind_s(self.server.root_dn, self.server.root_pw)
            self.assertEqual(l.whoami_s(), 'dn:' + self.server.root_dn)

    def test_dse(self):
        """The root DSE exposes the expected keys and naming context."""
        dse = self._ldap_conn.read_rootdse_s()
        self.assertIsInstance(dse, dict)
        self.assertEqual(dse['supportedLDAPVersion'], [b'3'])
        keys = set(dse)
        # SASL info may be missing in restricted build environments
        keys.discard('supportedSASLMechanisms')
        self.assertEqual(
            keys,
            {'configContext', 'entryDN', 'namingContexts', 'objectClass',
             'structuralObjectClass', 'subschemaSubentry',
             'supportedControl', 'supportedExtension', 'supportedFeatures',
             'supportedLDAPVersion'}
        )
        self.assertEqual(
            self._ldap_conn.get_naming_contexts(),
            [self.server.suffix.encode('utf-8')]
        )

    def test_compare_s_true(self):
        """compare_s() returns True for a matching value."""
        base = self.server.suffix
        l = self._ldap_conn
        result = l.compare_s('cn=Foo1,%s' % base, 'cn', b'Foo1')
        self.assertIs(result, True)

    def test_compare_s_false(self):
        """compare_s() returns False for a non-matching value."""
        base = self.server.suffix
        l = self._ldap_conn
        result = l.compare_s('cn=Foo1,%s' % base, 'cn', b'Foo2')
        self.assertIs(result, False)

    def test_compare_s_notfound(self):
        """compare_s() on a missing entry raises NO_SUCH_OBJECT."""
        base = self.server.suffix
        l = self._ldap_conn
        with self.assertRaises(ldap.NO_SUCH_OBJECT):
            l.compare_s('cn=invalid,%s' % base, 'cn', b'Foo2')

    def test_compare_s_invalidattr(self):
        """compare_s() on an unknown attribute raises UNDEFINED_TYPE."""
        base = self.server.suffix
        l = self._ldap_conn
        with self.assertRaises(ldap.UNDEFINED_TYPE):
            l.compare_s('cn=Foo1,%s' % base, 'invalidattr', b'invalid')

    def test_compare_true_exception_contains_message_id(self):
        """COMPARE_TRUE raised by result() carries the request's msgid."""
        base = self.server.suffix
        l = self._ldap_conn
        msgid = l.compare('cn=Foo1,%s' % base, 'cn', b'Foo1')
        with self.assertRaises(ldap.COMPARE_TRUE) as cm:
            l.result()
        self.assertEqual(cm.exception.args[0]["msgid"], msgid)

    def test_async_search_no_such_object_exception_contains_message_id(self):
        """NO_SUCH_OBJECT raised by result() carries the request's msgid."""
        msgid = self._ldap_conn.search("CN=XXX", ldap.SCOPE_SUBTREE)
        with self.assertRaises(ldap.NO_SUCH_OBJECT) as cm:
            self._ldap_conn.result()
        self.assertEqual(cm.exception.args[0]["msgid"], msgid)

    def test_passwd_s(self):
        """passwd_s() rejects a wrong old password and can generate one."""
        l = self._ldap_conn

        # first, create a user to change password on
        dn = "cn=PasswordTest," + self.server.suffix
        result, pmsg, msgid, ctrls = l.add_ext_s(
            dn,
            [
                ('objectClass', b'person'),
                ('sn', b'PasswordTest'),
                ('cn', b'PasswordTest'),
                ('userPassword', b'initial'),
            ]
        )
        self.assertEqual(result, ldap.RES_ADD)
        self.assertIsInstance(msgid, int)
        self.assertEqual(pmsg, [])
        self.assertEqual(ctrls, [])

        # try changing password with a wrong old-pw
        with self.assertRaises(ldap.UNWILLING_TO_PERFORM):
            l.passwd_s(dn, "bogus", "ignored")

        # have the server generate a new random pw
        respoid, respvalue = l.passwd_s(
            dn, "initial", None, extract_newpw=True
        )
        self.assertEqual(respoid, None)

        password = respvalue.genPasswd
        self.assertIsInstance(password, bytes)

        # try changing password back
        respoid, respvalue = l.passwd_s(dn, password, "initial")
        self.assertEqual(respoid, None)
        self.assertEqual(respvalue, None)

        l.delete_s(dn)

    def test_slapadd(self):
        """An entry using the custom schema is addable only after slapadd."""
        entry_dn = "myAttribute=foobar,ou=Container,%s" % self.server.suffix
        modlist = [
            ("objectClass", b'myClass'),
            ("myAttribute", b'foobar'),
        ]

        # the custom schema is not loaded yet
        with self.assertRaises(ldap.INVALID_DN_SYNTAX):
            self._ldap_conn.add_s(entry_dn, modlist)

        self.server.slapadd(SCHEMA_TEMPLATE, ["-n0"])
        self.server.restart()
        self.reset_connection()

        self._ldap_conn.add_s(entry_dn, modlist)


class Test01_ReconnectLDAPObject(Test00_SimpleLDAPObject):
    """
    test ReconnectLDAPObject by restarting slapd
    """

    ldap_object_class = ReconnectLDAPObject

    @requires_sasl()
    @requires_ldapi()
    def test101_reconnect_sasl_external(self):
        """A SASL EXTERNAL bind is still in effect after a server restart."""
        l = self.ldap_object_class(self.server.ldapi_uri)
        l.sasl_external_bind_s()
        authz_id = l.whoami_s()
        self.assertEqual(authz_id, 'dn:'+self.server.root_dn.lower())
        self.server.restart()
        self.assertEqual(l.whoami_s(), authz_id)

    def test102_reconnect_simple_bind(self):
        """A simple bind is still in effect after a server restart."""
        l = self.ldap_object_class(self.server.ldap_uri)
        bind_dn = 'cn=user1,'+self.server.suffix
        l.simple_bind_s(bind_dn, 'user1_pw')
        self.assertEqual(l.whoami_s(), 'dn:'+bind_dn)
        self.server.restart()
        self.assertEqual(l.whoami_s(), 'dn:'+bind_dn)

    def test103_reconnect_get_state(self):
        """__getstate__() returns the expected state dict after a bind."""
        l1 = self.ldap_object_class(self.server.ldap_uri)
        bind_dn = 'cn=user1,'+self.server.suffix
        l1.simple_bind_s(bind_dn, 'user1_pw')
        self.assertEqual(l1.whoami_s(), 'dn:'+bind_dn)
        self.assertEqual(
            l1.__getstate__(),
            {
                '_last_bind': (
                    'simple_bind_s',
                    (bind_dn, 'user1_pw'),
                    {}
                ),
                '_options': [(17, 3)],
                '_reconnects_done': 0,
                '_retry_delay': 60.0,
                '_retry_max': 1,
                '_start_tls': 0,
                '_trace_level': ldap._trace_level,
                '_trace_stack_limit': 5,
                '_uri': self.server.ldap_uri,
                'timeout': -1,
            },
        )

    def test104_reconnect_restore(self):
        """A pickled and unpickled connection reports the same identity."""
        l1 = self.ldap_object_class(self.server.ldap_uri)
        bind_dn = 'cn=user1,'+self.server.suffix
        l1.simple_bind_s(bind_dn, 'user1_pw')
        self.assertEqual(l1.whoami_s(), 'dn:'+bind_dn)
        l1_state = pickle.dumps(l1)
        del l1
        l2 = pickle.loads(l1_state)
        self.assertEqual(l2.whoami_s(), 'dn:'+bind_dn)

    def test105_reconnect_restore(self):
        """SERVER_DOWN while slapd is stopped; works again once restarted."""
        l1 = self.ldap_object_class(
            self.server.ldap_uri, retry_max=2, retry_delay=1
        )
        bind_dn = 'cn=user1,'+self.server.suffix
        l1.simple_bind_s(bind_dn, 'user1_pw')
        self.assertEqual(l1.whoami_s(), 'dn:'+bind_dn)
        self.server._proc.terminate()
        self.server.wait()
        try:
            l1.whoami_s()
        except ldap.SERVER_DOWN:
            pass
        else:
            self.fail("expected SERVER_DOWN while slapd is stopped")
        finally:
            # always bring slapd back so later tests have a server
            self.server._start_slapd()
        self.assertEqual(l1.whoami_s(), 'dn:'+bind_dn)


@requires_init_fd()
class Test03_SimpleLDAPObjectWithFileno(Test00_SimpleLDAPObject):
    """
    run the Test00 tests over a connection built from a socket's fileno
    """

    def _open_ldap_conn(self, who=None, cred=None, **kwargs):
        """Connect a TCP socket and hand its fileno to the parent opener.

        Raises RuntimeError if this test instance already holds a socket.
        """
        if hasattr(self, '_sock'):
            raise RuntimeError("socket already connected")
        self._sock = socket.create_connection(
            (self.server.hostname, self.server.port)
        )
        return super()._open_ldap_conn(
            who=who, cred=cred, fileno=self._sock.fileno(), **kwargs
        )

    def tearDown(self):
        """Close and forget the socket, then run the parent tearDown()."""
        self._sock.close()
        del self._sock
        super().tearDown()

    def reset_connection(self):
        """Close and forget the socket so the parent can reconnect."""
        self._sock.close()
        del self._sock
        super().reset_connection()


if __name__ == '__main__':
    unittest.main()