"""
slapdtest - module for spawning test instances of OpenLDAP's slapd server

See https://www.python-ldap.org/ for details.
"""
import os
import socket
import sys
import time
import subprocess
import logging
import atexit
from logging.handlers import SysLogHandler
import unittest
from shutil import which
from urllib.parse import quote_plus

# Switch off processing .ldaprc or ldap.conf before importing _ldap
os.environ['LDAPNOINIT'] = '1'

import ldap

HERE = os.path.abspath(os.path.dirname(__file__))

# a template string for generating simple slapd.d file
SLAPD_CONF_TEMPLATE = r"""dn: cn=config
objectClass: olcGlobal
cn: config
olcServerID: %(serverid)s
olcLogLevel: %(loglevel)s
olcAllows: bind_v2
olcAuthzRegexp: {0}"gidnumber=%(root_gid)s\+uidnumber=%(root_uid)s,cn=peercred,cn=external,cn=auth" "%(rootdn)s"
olcAuthzRegexp: {1}"C=DE, O=python-ldap, OU=slapd-test, CN=([A-Za-z]+)" "ldap://ou=people,dc=local???($1)"
olcTLSCACertificateFile: %(cafile)s
olcTLSCertificateFile: %(servercert)s
olcTLSCertificateKeyFile: %(serverkey)s
olcTLSVerifyClient: try

dn: cn=module,cn=config
objectClass: olcModuleList
cn: module
olcModuleLoad: back_%(database)s

dn: olcDatabase=%(database)s,cn=config
objectClass: olcDatabaseConfig
objectClass: olcMdbConfig
olcDatabase: %(database)s
olcSuffix: %(suffix)s
olcRootDN: %(rootdn)s
olcRootPW: %(rootpw)s
olcDbDirectory: %(directory)s
"""

LOCALHOST = '127.0.0.1'

# Colon-separated list of features that must be skipped even on CI.
CI_DISABLED = set(os.environ.get('CI_DISABLED', '').split(':'))
if 'LDAPI' in CI_DISABLED:
    HAVE_LDAPI = False
else:
    HAVE_LDAPI = hasattr(socket, 'AF_UNIX')


def identity(test_item):
    """Identity decorator

    Returns *test_item* unchanged; used when a test must not be skipped.
    """
    return test_item


def skip_unless_ci(reason, feature=None):
    """Skip test unless test case is executed on CI like Travis CI

    :param reason: Message passed to :func:`unittest.skip`.
    :param feature: Optional feature name; when it is listed in the
        ``CI_DISABLED`` environment variable the test is skipped on CI, too.
    :returns: A decorator, either ``unittest.skip(reason)`` or
        :func:`identity`.
    """
    if not os.environ.get('CI', False):
        return unittest.skip(reason)
    elif feature in CI_DISABLED:
        return unittest.skip(reason)
    else:
        # Don't skip on Travis
        return identity


def requires_tls():
    """Decorator for TLS tests

    Tests are not skipped on CI (e.g. Travis CI)
    """
    if not ldap.TLS_AVAIL:
        return skip_unless_ci("test needs ldap.TLS_AVAIL", feature='TLS')
    else:
        return identity


def requires_sasl():
    """Decorator for tests which need ``ldap.SASL_AVAIL``"""
    if not ldap.SASL_AVAIL:
        return skip_unless_ci(
            "test needs ldap.SASL_AVAIL", feature='SASL')
    else:
        return identity


def requires_ldapi():
    """Decorator for tests which need LDAPI, i.e. ``socket.AF_UNIX``"""
    if not HAVE_LDAPI:
        return skip_unless_ci(
            "test needs ldapi support (AF_UNIX)", feature='LDAPI')
    else:
        return identity


def requires_init_fd():
    """Decorator for tests which need ``ldap.INIT_FD_AVAIL``"""
    if not ldap.INIT_FD_AVAIL:
        return skip_unless_ci(
            "test needs ldap.INIT_FD", feature='INIT_FD')
    else:
        return identity


def _add_sbin(path):
    """Add /sbin and related directories to a command search path"""
    directories = path.split(os.pathsep)
    if sys.platform != 'win32':
        for sbin in '/usr/local/sbin', '/sbin', '/usr/sbin':
            if sbin not in directories:
                directories.append(sbin)
    return os.pathsep.join(directories)


def combined_logger(
        log_name,
        log_level=logging.WARN,
        sys_log_format='%(levelname)s %(message)s',
        console_log_format='%(asctime)s %(levelname)s %(message)s',
    ):
    """
    Returns a combined SysLogHandler/StreamHandler logging instance
    with formatters

    :param log_name: Name of the logger; also prefixed to syslog messages.
    :param log_level: Level to set on the logger. Overridden by the
        ``LOGLEVEL`` environment variable when that is set.
    :param sys_log_format: Format for the syslog handler; a false value
        disables it. The handler is only added if ``/dev/log`` exists.
    :param console_log_format: Format for the stream handler; a false value
        disables it.
    """
    if 'LOGLEVEL' in os.environ:
        log_level = os.environ['LOGLEVEL']
        try:
            log_level = int(log_level)
        except ValueError:
            # not numeric: hand the string to setLevel() as-is
            pass
    # for writing to syslog
    new_logger = logging.getLogger(log_name)
    if sys_log_format and os.path.exists('/dev/log'):
        my_syslog_formatter = logging.Formatter(
            fmt=' '.join((log_name, sys_log_format)))
        my_syslog_handler = SysLogHandler(
            address='/dev/log',
            facility=SysLogHandler.LOG_DAEMON,
        )
        my_syslog_handler.setFormatter(my_syslog_formatter)
        new_logger.addHandler(my_syslog_handler)
    if console_log_format:
        my_stream_formatter = logging.Formatter(fmt=console_log_format)
        my_stream_handler = logging.StreamHandler()
        my_stream_handler.setFormatter(my_stream_formatter)
        new_logger.addHandler(my_stream_handler)
    new_logger.setLevel(log_level)
    return new_logger  # end of combined_logger()


class SlapdObject:
    """
    Controller class for a slapd instance, OpenLDAP's server.

    This class creates a temporary data store for slapd, runs it
    listening on a private Unix domain socket and TCP port,
    and initializes it with a top-level entry and the root user.

    When a reference to an instance of this class is lost, the slapd
    server is shut down.

    An instance can be used as a context manager. When exiting the context
    manager, the slapd server is shut down and the temporary data store is
    removed.

    :param openldap_schema_files: A list of schema names or schema paths to
        load at startup. By default this only contains `core`.

    .. versionchanged:: 3.1

        Added context manager functionality
    """
    slapd_conf_template = SLAPD_CONF_TEMPLATE
    database = 'mdb'
    suffix = 'dc=slapd-test,dc=python-ldap,dc=org'
    root_cn = 'Manager'
    root_pw = 'password'
    slapd_loglevel = 'stats stats2'
    local_host = LOCALHOST
    testrunsubdirs = (
        'slapd.d',
    )
    openldap_schema_files = (
        'core.ldif',
    )

    TMPDIR = os.environ.get('TMP', os.getcwd())
    if 'SCHEMA' in os.environ:
        SCHEMADIR = os.environ['SCHEMA']
    elif os.path.isdir("/etc/openldap/schema"):
        SCHEMADIR = "/etc/openldap/schema"
    elif os.path.isdir("/etc/ldap/schema"):
        SCHEMADIR = "/etc/ldap/schema"
    else:
        SCHEMADIR = None

    BIN_PATH = os.environ.get('BIN', os.environ.get('PATH', os.defpath))
    SBIN_PATH = os.environ.get('SBIN', _add_sbin(BIN_PATH))

    # create loggers once, multiple calls mess up refleak tests
    _log = combined_logger('python-ldap-test')

    def __init__(self):
        self._proc = None
        self._port = self._avail_tcp_port()
        self.server_id = self._port % 4096
        self.testrundir = os.path.join(
            self.TMPDIR, 'python-ldap-test-%d' % self._port
        )
        self._slapd_conf = os.path.join(self.testrundir, 'slapd.d')
        self._db_directory = os.path.join(self.testrundir, "openldap-data")
        self.ldap_uri = "ldap://%s:%d/" % (self.local_host, self._port)
        if HAVE_LDAPI:
            ldapi_path = os.path.join(self.testrundir, 'ldapi')
            self.ldapi_uri = "ldapi://%s" % quote_plus(ldapi_path)
            self.default_ldap_uri = self.ldapi_uri
            # use SASL/EXTERNAL via LDAPI when invoking OpenLDAP CLI tools
            self.cli_sasl_external = ldap.SASL_AVAIL
        else:
            self.ldapi_uri = None
            self.default_ldap_uri = self.ldap_uri
            # Use simple bind via LDAP uri
            self.cli_sasl_external = False

        self._find_commands()

        if self.SCHEMADIR is None:
            raise ValueError('SCHEMADIR is None, ldap schemas are missing.')

        # TLS certs
        self.cafile = os.path.join(HERE, 'certs/ca.pem')
        self.servercert = os.path.join(HERE, 'certs/server.pem')
        self.serverkey = os.path.join(HERE, 'certs/server.key')
        self.clientcert = os.path.join(HERE, 'certs/client.pem')
        self.clientkey = os.path.join(HERE, 'certs/client.key')

    @property
    def root_dn(self):
        """DN of the root user, built from ``root_cn`` and ``suffix``"""
        return 'cn={self.root_cn},{self.suffix}'.format(self=self)

    @property
    def hostname(self):
        """Host name/address used in ``ldap_uri``"""
        return self.local_host

    @property
    def port(self):
        """TCP port used in ``ldap_uri``"""
        return self._port

    def _find_commands(self):
        """
        Locate the OpenLDAP command-line tools and the slapd binary

        The ``SLAPD`` environment variable, when set and non-empty,
        overrides the search for slapd.
        """
        self.PATH_LDAPADD = self._find_command('ldapadd')
        self.PATH_LDAPDELETE = self._find_command('ldapdelete')
        self.PATH_LDAPMODIFY = self._find_command('ldapmodify')
        self.PATH_LDAPWHOAMI = self._find_command('ldapwhoami')
        self.PATH_SLAPADD = self._find_command('slapadd')

        self.PATH_SLAPD = os.environ.get('SLAPD', None)
        if not self.PATH_SLAPD:
            self.PATH_SLAPD = self._find_command('slapd', in_sbin=True)

    def _find_command(self, cmd, in_sbin=False):
        """
        Return the full path of `cmd`

        :param cmd: Name of the executable.
        :param in_sbin: Search ``SBIN_PATH`` instead of ``BIN_PATH``.
        :raises ValueError: if the command is not found.
        """
        if in_sbin:
            path = self.SBIN_PATH
            var_name = 'SBIN'
        else:
            path = self.BIN_PATH
            var_name = 'BIN'
        command = which(cmd, path=path)
        if command is None:
            raise ValueError(
                "Command '{}' not found. Set the {} environment variable to "
                "override slapdtest's search path.".format(cmd, var_name)
            )
        return command

    def setup_rundir(self):
        """
        creates rundir structure

        for setting up a custom directory structure you have to override
        this method
        """
        os.mkdir(self.testrundir)
        os.mkdir(self._db_directory)
        self._create_sub_dirs(self.testrunsubdirs)

    def _cleanup_rundir(self):
        """
        Recursively delete the whole directory `self.testrundir`

        Does nothing if the directory does not exist.
        """
        # cleanup_rundir() is called in atexit handler. Until Python 3.4,
        # the rest of the world is already destroyed.
        import os, os.path
        if not os.path.exists(self.testrundir):
            return
        self._log.debug('clean-up %s', self.testrundir)
        # bottom-up walk so that directories are empty when removed
        for dirpath, dirnames, filenames in os.walk(
                self.testrundir,
                topdown=False
            ):
            for filename in filenames:
                self._log.debug('remove %s', os.path.join(dirpath, filename))
                os.remove(os.path.join(dirpath, filename))
            for dirname in dirnames:
                self._log.debug('rmdir %s', os.path.join(dirpath, dirname))
                os.rmdir(os.path.join(dirpath, dirname))
        os.rmdir(self.testrundir)
        self._log.info('cleaned-up %s', self.testrundir)

    def _avail_tcp_port(self):
        """
        find an available port for TCP connection
        """
        # binding to port 0 lets the OS pick a free port; the socket is
        # closed again before the port number is returned
        with socket.socket() as sock:
            sock.bind((self.local_host, 0))
            port = sock.getsockname()[1]
        self._log.info('Found available port %d', port)
        return port

    def gen_config(self):
        """
        generates the slapd.d configuration (LDIF) and returns it as one
        string

        for generating specific static configuration files you have to
        override this method
        """
        config_dict = {
            'serverid': hex(self.server_id),
            'loglevel': self.slapd_loglevel,
            'database': self.database,
            'directory': self._db_directory,
            'suffix': self.suffix,
            'rootdn': self.root_dn,
            'rootpw': self.root_pw,
            'root_uid': os.getuid(),
            'root_gid': os.getgid(),
            'cafile': self.cafile,
            'servercert': self.servercert,
            'serverkey': self.serverkey,
        }
        return self.slapd_conf_template % config_dict

    def _create_sub_dirs(self, dir_names):
        """
        create sub-directories beneath self.testrundir
        """
        for dname in dir_names:
            dir_name = os.path.join(self.testrundir, dname)
            self._log.debug('Create directory %s', dir_name)
            os.mkdir(dir_name)

    def _write_config(self):
        """Loads the slapd.d configuration."""
        self._log.debug("importing configuration: %s", self._slapd_conf)

        self.slapadd(self.gen_config(), ["-n0"])
        # schema entries which are not existing paths are looked up in
        # SCHEMADIR
        ldif_paths = [
            schema
            if os.path.exists(schema)
            else os.path.join(self.SCHEMADIR, schema)
            for schema in self.openldap_schema_files
        ]
        for ldif_path in ldif_paths:
            self.slapadd(None, ["-n0", "-l", ldif_path])

        self._log.debug("import ok: %s", self._slapd_conf)

    def _test_config(self):
        """
        Runs ``slapd -Ttest`` on the generated configuration

        :raises RuntimeError: if slapd exits with a non-zero status.
        """
        self._log.debug('testing config %s', self._slapd_conf)
        popen_list = [
            self.PATH_SLAPD,
            "-Ttest",
            "-F", self._slapd_conf,
            "-u",
            "-v",
            "-d", "config"
        ]
        p = subprocess.run(
            popen_list,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT
        )
        if p.returncode != 0:
            # errors="replace": undecodable output must not hide the
            # actual configuration failure behind a UnicodeDecodeError
            self._log.error(p.stdout.decode("utf-8", errors="replace"))
            raise RuntimeError("configuration test failed")
        self._log.info("config ok: %s", self._slapd_conf)

    def _start_slapd(self):
        """
        Spawns/forks the slapd process

        :raises RuntimeError: if slapd exits early or does not answer
            ``ldapwhoami`` before the deadline.
        """
        urls = [self.ldap_uri]
        if self.ldapi_uri:
            urls.append(self.ldapi_uri)
        slapd_args = [
            self.PATH_SLAPD,
            '-F', self._slapd_conf,
            '-h', ' '.join(urls),
        ]
        if self._log.isEnabledFor(logging.DEBUG):
            slapd_args.extend(['-d', '-1'])
        else:
            slapd_args.extend(['-d', '0'])
        self._log.info('starting slapd: %r', ' '.join(slapd_args))
        self._proc = subprocess.Popen(slapd_args)
        # Waits until the LDAP server socket is open, or slapd crashed
        deadline = time.monotonic() + 10
        # no cover to avoid spurious coverage changes, see
        # https://github.com/python-ldap/python-ldap/issues/127
        while True:  # pragma: no cover
            if self._proc.poll() is not None:
                self._stopped()
                raise RuntimeError("slapd exited before opening port")
            try:
                self._log.debug(
                    "slapd connection check to %s", self.default_ldap_uri
                )
                self.ldapwhoami()
            except RuntimeError:
                if time.monotonic() >= deadline:
                    break
                time.sleep(0.2)
            else:
                return
        raise RuntimeError("slapd did not start properly")

    def start(self):
        """
        Starts the slapd server process running, and waits for it to come up.

        Does nothing if a slapd process is already being tracked.
        """

        if self._proc is None:
            # prepare directory structure
            atexit.register(self.stop)
            self._cleanup_rundir()
            self.setup_rundir()
            self._write_config()
            self._test_config()
            self._start_slapd()
            self._log.debug(
                'slapd with pid=%d listening on %s and %s',
                self._proc.pid, self.ldap_uri, self.ldapi_uri
            )

    def stop(self):
        """
        Stops the slapd server, and waits for it to terminate and cleans up
        """
        if self._proc is not None:
            self._log.debug('stopping slapd with pid %d', self._proc.pid)
            self._proc.terminate()
            self.wait()
        self._cleanup_rundir()
        atexit.unregister(self.stop)

    def restart(self):
        """
        Restarts the slapd server with same data

        If no slapd process is currently tracked, the termination step is
        skipped and only the start is attempted.
        """
        if self._proc is not None:
            self._proc.terminate()
            self.wait()
        self._start_slapd()

    def wait(self):
        """Waits for the slapd process to terminate by itself."""
        if self._proc:
            self._proc.wait()
            self._stopped()

    def _stopped(self):
        """Called when the slapd server is known to have terminated"""
        if self._proc is not None:
            self._log.info('slapd[%d] terminated', self._proc.pid)
            self._proc = None

    def _cli_auth_args(self):
        """
        Returns the authentication arguments for the OpenLDAP CLI tools:
        SASL/EXTERNAL if ``cli_sasl_external`` is set, simple bind as the
        root user otherwise
        """
        if self.cli_sasl_external:
            authc_args = [
                '-Y', 'EXTERNAL',
            ]
            if not self._log.isEnabledFor(logging.DEBUG):
                authc_args.append('-Q')
        else:
            authc_args = [
                '-x',
                '-D', self.root_dn,
                '-w', self.root_pw,
            ]
        return authc_args

    # no cover to avoid spurious coverage changes
    def _cli_popen(self, ldapcommand, extra_args=None, ldap_uri=None,
                   stdin_data=None):  # pragma: no cover
        """
        Runs a command-line tool against this slapd instance

        :param ldapcommand: Path of the executable. Commands whose file name
            starts with ``ldap`` get ``-H <uri>`` plus authentication
            arguments, all others get ``-F <config dir>``.
        :param extra_args: Additional command-line arguments.
        :param ldap_uri: LDAP URI to use; defaults to ``default_ldap_uri``.
        :param stdin_data: Bytes written to the command's standard input.
        :returns: Tuple ``(stdout_data, stderr_data)``.
        :raises RuntimeError: if the command exits with a non-zero status.
        """
        if ldap_uri is None:
            ldap_uri = self.default_ldap_uri

        # basename() instead of split("/"): which() returns OS-native paths
        if os.path.basename(ldapcommand).startswith("ldap"):
            args = [ldapcommand, '-H', ldap_uri] + self._cli_auth_args()
        else:
            args = [ldapcommand, '-F', self._slapd_conf]

        args.extend(extra_args or [])

        self._log.debug('Run command: %r', ' '.join(args))
        proc = subprocess.Popen(
            args, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        self._log.debug('stdin_data=%r', stdin_data)
        stdout_data, stderr_data = proc.communicate(stdin_data)
        if stdout_data is not None:
            self._log.debug('stdout_data=%r', stdout_data)
        if stderr_data is not None:
            self._log.debug('stderr_data=%r', stderr_data)
        # communicate() has already waited for the process to exit
        if proc.returncode != 0:
            raise RuntimeError(
                '{!r} process failed:\n{!r}\n{!r}'.format(
                    args, stdout_data, stderr_data
                )
            )
        return stdout_data, stderr_data

    def ldapwhoami(self, extra_args=None):
        """
        Runs ldapwhoami on this slapd instance
        """
        self._cli_popen(self.PATH_LDAPWHOAMI, extra_args=extra_args)

    def ldapadd(self, ldif, extra_args=None):
        """
        Runs ldapadd on this slapd instance, passing it the ldif content
        """
        self._cli_popen(self.PATH_LDAPADD, extra_args=extra_args,
                        stdin_data=ldif.encode('utf-8'))

    def ldapmodify(self, ldif, extra_args=None):
        """
        Runs ldapmodify on this slapd instance, passing it the ldif content
        """
        self._cli_popen(self.PATH_LDAPMODIFY, extra_args=extra_args,
                        stdin_data=ldif.encode('utf-8'))

    def ldapdelete(self, dn, recursive=False, extra_args=None):
        """
        Runs ldapdelete on this slapd instance, deleting 'dn'

        :param dn: DN of the entry to delete.
        :param recursive: Pass ``-r`` to ldapdelete.
        :param extra_args: Additional command-line arguments; the passed
            sequence is not modified.
        """
        # work on a copy so the caller's list is never mutated
        args = list(extra_args) if extra_args is not None else []
        if recursive:
            args.append('-r')
        args.append(dn)
        self._cli_popen(self.PATH_LDAPDELETE, extra_args=args)

    def slapadd(self, ldif, extra_args=None):
        """
        Runs slapadd on this slapd instance, passing it the ldif content

        A false `ldif` (``None`` or empty) sends nothing on standard input.
        """
        self._cli_popen(
            self.PATH_SLAPADD,
            stdin_data=ldif.encode("utf-8") if ldif else None,
            extra_args=extra_args,
        )

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop()


class SlapdTestCase(unittest.TestCase):
    """
    test class which also clones or initializes a running slapd

    Subclasses have to set ``ldap_object_class`` before
    ``_open_ldap_conn()`` can be used.
    """

    server_class = SlapdObject
    server = None
    ldap_object_class = None

    def _open_ldap_conn(self, who=None, cred=None, **kwargs):
        """
        return a LDAPObject instance after simple bind

        `who` and `cred` default to the server's root DN and root password.
        """
        ldap_conn = self.ldap_object_class(self.server.ldap_uri, **kwargs)
        ldap_conn.protocol_version = 3
        #ldap_conn.set_option(ldap.OPT_REFERRALS, 0)
        ldap_conn.simple_bind_s(
            who or self.server.root_dn, cred or self.server.root_pw
        )
        return ldap_conn

    @classmethod
    def setUpClass(cls):
        cls.server = cls.server_class()
        cls.server.start()

    @classmethod
    def tearDownClass(cls):
        cls.server.stop()