1"""
2Automatic tests for python-ldap's module ldap.ldapobject
3
4See https://www.python-ldap.org/ for details.
5"""
6import errno
7import linecache
8import os
9import socket
10import unittest
11import pickle
12
13# Switch off processing .ldaprc or ldap.conf before importing _ldap
14os.environ['LDAPNOINIT'] = '1'
15
16import ldap
17from ldap.ldapobject import SimpleLDAPObject, ReconnectLDAPObject
18
19from slapdtest import SlapdTestCase
20from slapdtest import requires_ldapi, requires_sasl, requires_tls
21from slapdtest import requires_init_fd
22
23
24LDIF_TEMPLATE = """dn: %(suffix)s
25objectClass: dcObject
26objectClass: organization
27dc: %(dc)s
28o: %(dc)s
29
30dn: %(rootdn)s
31objectClass: applicationProcess
32objectClass: simpleSecurityObject
33cn: %(rootcn)s
34userPassword: %(rootpw)s
35
36dn: cn=user1,%(suffix)s
37objectClass: applicationProcess
38objectClass: simpleSecurityObject
39cn: user1
40userPassword: user1_pw
41
42dn: cn=Foo1,%(suffix)s
43objectClass: organizationalRole
44cn: Foo1
45
46dn: cn=Foo2,%(suffix)s
47objectClass: organizationalRole
48cn: Foo2
49
50dn: cn=Foo3,%(suffix)s
51objectClass: organizationalRole
52cn: Foo3
53
54dn: ou=Container,%(suffix)s
55objectClass: organizationalUnit
56ou: Container
57
58dn: cn=Foo4,ou=Container,%(suffix)s
59objectClass: organizationalRole
60cn: Foo4
61
62"""
63
64SCHEMA_TEMPLATE = """dn: cn=mySchema,cn=schema,cn=config
65objectClass: olcSchemaConfig
66cn: mySchema
67olcAttributeTypes: ( 1.3.6.1.4.1.56207.1.1.1 NAME 'myAttribute'
68    DESC 'fobar attribute'
69    EQUALITY caseExactMatch
70    ORDERING caseExactOrderingMatch
71    SUBSTR caseExactSubstringsMatch
72    SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
73    SINGLE-VALUE
74    USAGE userApplications
75    X-ORIGIN 'foobar' )
76olcObjectClasses: ( 1.3.6.1.4.1.56207.1.2.2 NAME 'myClass'
77    DESC 'foobar objectclass'
78    SUP top
79    STRUCTURAL
80    MUST myAttribute
81    X-ORIGIN 'foobar' )"""
82
83
84class Test00_SimpleLDAPObject(SlapdTestCase):
85    """
86    test LDAP search operations
87    """
88
89    ldap_object_class = SimpleLDAPObject
90
91    @classmethod
92    def setUpClass(cls):
93        super().setUpClass()
94        # insert some Foo* objects via ldapadd
95        cls.server.ldapadd(
96            LDIF_TEMPLATE % {
97                'suffix':cls.server.suffix,
98                'rootdn':cls.server.root_dn,
99                'rootcn':cls.server.root_cn,
100                'rootpw':cls.server.root_pw,
101                'dc': cls.server.suffix.split(',')[0][3:],
102            }
103        )
104
105    def setUp(self):
106        try:
107            self._ldap_conn
108        except AttributeError:
109            # open local LDAP connection
110            self._ldap_conn = self._open_ldap_conn(bytes_mode=False)
111
112    def tearDown(self):
113        del self._ldap_conn
114
115    def reset_connection(self):
116        try:
117            del self._ldap_conn
118        except AttributeError:
119            pass
120
121        self._ldap_conn = self._open_ldap_conn(bytes_mode=False)
122
123    def test_reject_bytes_base(self):
124        base = self.server.suffix
125        l = self._ldap_conn
126
127        with self.assertRaises(TypeError) as e:
128            l.search_s(
129                base.encode('utf-8'), ldap.SCOPE_SUBTREE, '(cn=Foo*)', ['*']
130            )
131        # Python 3.4.x does not include 'search_ext()' in message
132        self.assertEqual(
133            "search_ext() argument 1 must be str, not bytes",
134            str(e.exception)
135        )
136
137        with self.assertRaises(TypeError) as e:
138            l.search_s(
139                base, ldap.SCOPE_SUBTREE, b'(cn=Foo*)', ['*']
140            )
141        self.assertEqual(
142            "search_ext() argument 3 must be str, not bytes",
143            str(e.exception)
144        )
145
146        with self.assertRaises(TypeError) as e:
147            l.search_s(
148                base, ldap.SCOPE_SUBTREE, '(cn=Foo*)', [b'*']
149            )
150        self.assertEqual(
151            ('attrs_from_List(): expected string in list', b'*'),
152            e.exception.args
153        )
154
155    def test_search_keys_are_text(self):
156        base = self.server.suffix
157        l = self._ldap_conn
158        result = l.search_s(base, ldap.SCOPE_SUBTREE, '(cn=Foo*)', ['*'])
159        result.sort()
160        dn, fields = result[0]
161        self.assertEqual(dn, 'cn=Foo1,%s' % base)
162        self.assertEqual(type(dn), str)
163        for key, values in fields.items():
164            self.assertEqual(type(key), str)
165            for value in values:
166                self.assertEqual(type(value), bytes)
167
168    def test_search_accepts_unicode_dn(self):
169        base = self.server.suffix
170        l = self._ldap_conn
171
172        with self.assertRaises(ldap.NO_SUCH_OBJECT):
173            result = l.search_s("CN=abc\U0001f498def", ldap.SCOPE_SUBTREE)
174
175    def test_filterstr_accepts_unicode(self):
176        l = self._ldap_conn
177        base = self.server.suffix
178        result = l.search_s(base, ldap.SCOPE_SUBTREE, '(cn=abc\U0001f498def)', ['*'])
179        self.assertEqual(result, [])
180
181    def test_attrlist_accepts_unicode(self):
182        base = self.server.suffix
183        result = self._ldap_conn.search_s(
184            base, ldap.SCOPE_SUBTREE,
185            '(cn=Foo*)', ['abc', 'abc\U0001f498def'])
186        result.sort()
187
188        for dn, attrs in result:
189            self.assertIsInstance(dn, str)
190            self.assertEqual(attrs, {})
191
192    def test001_search_subtree(self):
193        result = self._ldap_conn.search_s(
194            self.server.suffix,
195            ldap.SCOPE_SUBTREE,
196            '(cn=Foo*)',
197            attrlist=['*'],
198        )
199        result.sort()
200        self.assertEqual(
201            result,
202            [
203                (
204                    'cn=Foo1,'+self.server.suffix,
205                    {'cn': [b'Foo1'], 'objectClass': [b'organizationalRole']}
206                ),
207                (
208                    'cn=Foo2,'+self.server.suffix,
209                    {'cn': [b'Foo2'], 'objectClass': [b'organizationalRole']}
210                ),
211                (
212                    'cn=Foo3,'+self.server.suffix,
213                    {'cn': [b'Foo3'], 'objectClass': [b'organizationalRole']}
214                ),
215                (
216                    'cn=Foo4,ou=Container,'+self.server.suffix,
217                    {'cn': [b'Foo4'], 'objectClass': [b'organizationalRole']}
218                ),
219            ]
220        )
221
222    def test002_search_onelevel(self):
223        result = self._ldap_conn.search_s(
224            self.server.suffix,
225            ldap.SCOPE_ONELEVEL,
226            '(cn=Foo*)',
227            ['*'],
228        )
229        result.sort()
230        self.assertEqual(
231            result,
232            [
233                (
234                    'cn=Foo1,'+self.server.suffix,
235                    {'cn': [b'Foo1'], 'objectClass': [b'organizationalRole']}
236                ),
237                (
238                    'cn=Foo2,'+self.server.suffix,
239                    {'cn': [b'Foo2'], 'objectClass': [b'organizationalRole']}
240                ),
241                (
242                    'cn=Foo3,'+self.server.suffix,
243                    {'cn': [b'Foo3'], 'objectClass': [b'organizationalRole']}
244                ),
245            ]
246        )
247
248    def test003_search_oneattr(self):
249        result = self._ldap_conn.search_s(
250            self.server.suffix,
251            ldap.SCOPE_SUBTREE,
252            '(cn=Foo4)',
253            ['cn'],
254        )
255        result.sort()
256        self.assertEqual(
257            result,
258            [('cn=Foo4,ou=Container,'+self.server.suffix, {'cn': [b'Foo4']})]
259        )
260
261    def test_find_unique_entry(self):
262        result = self._ldap_conn.find_unique_entry(
263            self.server.suffix,
264            ldap.SCOPE_SUBTREE,
265            '(cn=Foo4)',
266            ['cn'],
267        )
268        self.assertEqual(
269            result,
270            ('cn=Foo4,ou=Container,'+self.server.suffix, {'cn': [b'Foo4']})
271        )
272        with self.assertRaises(ldap.SIZELIMIT_EXCEEDED):
273            # > 2 entries returned
274            self._ldap_conn.find_unique_entry(
275                self.server.suffix,
276                ldap.SCOPE_ONELEVEL,
277                '(cn=Foo*)',
278                ['*'],
279            )
280        with self.assertRaises(ldap.NO_UNIQUE_ENTRY):
281            # 0 entries returned
282            self._ldap_conn.find_unique_entry(
283                self.server.suffix,
284                ldap.SCOPE_ONELEVEL,
285                '(cn=Bar*)',
286                ['*'],
287            )
288
289    def test_search_subschema(self):
290        l = self._ldap_conn
291        dn = l.search_subschemasubentry_s()
292        self.assertIsInstance(dn, str)
293        self.assertEqual(dn, "cn=Subschema")
294        subschema = l.read_subschemasubentry_s(dn)
295        self.assertIsInstance(subschema, dict)
296        self.assertEqual(
297            sorted(subschema),
298            [
299                'attributeTypes',
300                'ldapSyntaxes',
301                'matchingRuleUse',
302                'matchingRules',
303                'objectClasses'
304            ]
305        )
306
307    def test004_enotconn(self):
308        l = self.ldap_object_class('ldap://127.0.0.1:42')
309        try:
310            m = l.simple_bind_s("", "")
311            r = l.result4(m, ldap.MSG_ALL, self.timeout)
312        except ldap.SERVER_DOWN as ldap_err:
313            errno_val = ldap_err.args[0]['errno']
314            if errno_val != errno.ENOTCONN:
315                self.fail("expected errno=%d, got %d"
316                          % (errno.ENOTCONN, errno_val))
317            info = ldap_err.args[0]['info']
318            expected_info = os.strerror(errno.ENOTCONN)
319            if info != expected_info:
320                self.fail(f"expected info={expected_info!r}, got {info!r}")
321        else:
322            self.fail("expected SERVER_DOWN, got %r" % r)
323
324    def test005_invalid_credentials(self):
325        l = self.ldap_object_class(self.server.ldap_uri)
326        # search with invalid filter
327        try:
328            m = l.simple_bind(self.server.root_dn, self.server.root_pw+'wrong')
329            r = l.result4(m, ldap.MSG_ALL)
330        except ldap.INVALID_CREDENTIALS:
331            pass
332        else:
333            self.fail("expected INVALID_CREDENTIALS, got %r" % r)
334
335    @requires_sasl()
336    @requires_ldapi()
337    def test006_sasl_external_bind_s(self):
338        l = self.ldap_object_class(self.server.ldapi_uri)
339        l.sasl_external_bind_s()
340        self.assertEqual(l.whoami_s(), 'dn:'+self.server.root_dn.lower())
341        authz_id = 'dn:cn=Foo2,%s' % (self.server.suffix)
342        l = self.ldap_object_class(self.server.ldapi_uri)
343        l.sasl_external_bind_s(authz_id=authz_id)
344        self.assertEqual(l.whoami_s(), authz_id.lower())
345
346    @requires_sasl()
347    @requires_ldapi()
348    def test006_sasl_options(self):
349        l = self.ldap_object_class(self.server.ldapi_uri)
350
351        minssf = l.get_option(ldap.OPT_X_SASL_SSF_MIN)
352        self.assertGreaterEqual(minssf, 0)
353        self.assertLessEqual(minssf, 256)
354        maxssf = l.get_option(ldap.OPT_X_SASL_SSF_MAX)
355        self.assertGreaterEqual(maxssf, 0)
356        # libldap sets SSF_MAX to INT_MAX
357        self.assertLessEqual(maxssf, 2**31 - 1)
358
359        l.set_option(ldap.OPT_X_SASL_SSF_MIN, 56)
360        l.set_option(ldap.OPT_X_SASL_SSF_MAX, 256)
361        self.assertEqual(l.get_option(ldap.OPT_X_SASL_SSF_MIN), 56)
362        self.assertEqual(l.get_option(ldap.OPT_X_SASL_SSF_MAX), 256)
363
364        l.sasl_external_bind_s()
365        with self.assertRaisesRegex(ValueError, "write-only option"):
366            l.get_option(ldap.OPT_X_SASL_SSF_EXTERNAL)
367        l.set_option(ldap.OPT_X_SASL_SSF_EXTERNAL, 256)
368        self.assertEqual(l.whoami_s(), 'dn:' + self.server.root_dn.lower())
369
370    def test007_timeout(self):
371        l = self.ldap_object_class(self.server.ldap_uri)
372        m = l.search_ext(self.server.suffix, ldap.SCOPE_SUBTREE, '(objectClass=*)')
373        l.abandon(m)
374        with self.assertRaises(ldap.TIMEOUT):
375            result = l.result(m, timeout=0.001)
376
377    def assertIsSubclass(self, cls, other):
378        self.assertTrue(
379            issubclass(cls, other),
380            cls.__mro__
381        )
382
383    def test_simple_bind_noarg(self):
384        l = self.ldap_object_class(self.server.ldap_uri)
385        l.simple_bind_s()
386        self.assertEqual(l.whoami_s(), '')
387        l = self.ldap_object_class(self.server.ldap_uri)
388        l.simple_bind_s(None, None)
389        self.assertEqual(l.whoami_s(), '')
390
391    def _check_byteswarning(self, warning, expected_message):
392        self.assertIs(warning.category, ldap.LDAPBytesWarning)
393        self.assertIn(expected_message, str(warning.message))
394
395        def _normalize(filename):
396            # Python 2 likes to report the ".pyc" file in warnings,
397            # tracebacks or __file__.
398            # Use the corresponding ".py" in that case.
399            if filename.endswith('.pyc'):
400                return filename[:-1]
401            return filename
402
403        # Assert warning points to a line marked CORRECT LINE in this file
404        self.assertEquals(_normalize(warning.filename), _normalize(__file__))
405        self.assertIn(
406            'CORRECT LINE',
407            linecache.getline(warning.filename, warning.lineno)
408        )
409
410    @requires_tls()
411    def test_multiple_starttls(self):
412        # Test for openldap does not re-register nss shutdown callbacks
413        # after nss_Shutdown is called
414        # https://github.com/python-ldap/python-ldap/issues/60
415        # https://bugzilla.redhat.com/show_bug.cgi?id=1520990
416        for _ in range(10):
417            l = self.ldap_object_class(self.server.ldap_uri)
418            l.set_option(ldap.OPT_X_TLS_CACERTFILE, self.server.cafile)
419            l.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
420            l.start_tls_s()
421            l.simple_bind_s(self.server.root_dn, self.server.root_pw)
422            self.assertEqual(l.whoami_s(), 'dn:' + self.server.root_dn)
423
424    def test_dse(self):
425        dse = self._ldap_conn.read_rootdse_s()
426        self.assertIsInstance(dse, dict)
427        self.assertEqual(dse['supportedLDAPVersion'], [b'3'])
428        keys = set(dse)
429        # SASL info may be missing in restricted build environments
430        keys.discard('supportedSASLMechanisms')
431        self.assertEqual(
432            keys,
433            {'configContext', 'entryDN', 'namingContexts', 'objectClass',
434             'structuralObjectClass', 'subschemaSubentry',
435             'supportedControl', 'supportedExtension', 'supportedFeatures',
436             'supportedLDAPVersion'}
437        )
438        self.assertEqual(
439            self._ldap_conn.get_naming_contexts(),
440            [self.server.suffix.encode('utf-8')]
441        )
442
443    def test_compare_s_true(self):
444        base = self.server.suffix
445        l = self._ldap_conn
446        result = l.compare_s('cn=Foo1,%s' % base, 'cn', b'Foo1')
447        self.assertIs(result, True)
448
449    def test_compare_s_false(self):
450        base = self.server.suffix
451        l = self._ldap_conn
452        result = l.compare_s('cn=Foo1,%s' % base, 'cn', b'Foo2')
453        self.assertIs(result, False)
454
455    def test_compare_s_notfound(self):
456        base = self.server.suffix
457        l = self._ldap_conn
458        with self.assertRaises(ldap.NO_SUCH_OBJECT):
459            result = l.compare_s('cn=invalid,%s' % base, 'cn', b'Foo2')
460
461    def test_compare_s_invalidattr(self):
462        base = self.server.suffix
463        l = self._ldap_conn
464        with self.assertRaises(ldap.UNDEFINED_TYPE):
465            result = l.compare_s('cn=Foo1,%s' % base, 'invalidattr', b'invalid')
466
467    def test_compare_true_exception_contains_message_id(self):
468        base = self.server.suffix
469        l = self._ldap_conn
470        msgid = l.compare('cn=Foo1,%s' % base, 'cn', b'Foo1')
471        with self.assertRaises(ldap.COMPARE_TRUE) as cm:
472            l.result()
473        self.assertEqual(cm.exception.args[0]["msgid"], msgid)
474
475    def test_async_search_no_such_object_exception_contains_message_id(self):
476        msgid = self._ldap_conn.search("CN=XXX", ldap.SCOPE_SUBTREE)
477        with self.assertRaises(ldap.NO_SUCH_OBJECT) as cm:
478            self._ldap_conn.result()
479        self.assertEqual(cm.exception.args[0]["msgid"], msgid)
480
481    def test_passwd_s(self):
482        l = self._ldap_conn
483
484        # first, create a user to change password on
485        dn = "cn=PasswordTest," + self.server.suffix
486        result, pmsg, msgid, ctrls = l.add_ext_s(
487            dn,
488            [
489                ('objectClass', b'person'),
490                ('sn', b'PasswordTest'),
491                ('cn', b'PasswordTest'),
492                ('userPassword', b'initial'),
493            ]
494        )
495        self.assertEqual(result, ldap.RES_ADD)
496        self.assertIsInstance(msgid, int)
497        self.assertEqual(pmsg, [])
498        self.assertEqual(ctrls, [])
499
500        # try changing password with a wrong old-pw
501        with self.assertRaises(ldap.UNWILLING_TO_PERFORM):
502            l.passwd_s(dn, "bogus", "ignored")
503
504        # have the server generate a new random pw
505        respoid, respvalue = l.passwd_s(dn, "initial", None, extract_newpw=True)
506        self.assertEqual(respoid, None)
507
508        password = respvalue.genPasswd
509        self.assertIsInstance(password, bytes)
510
511        # try changing password back
512        respoid, respvalue = l.passwd_s(dn, password, "initial")
513        self.assertEqual(respoid, None)
514        self.assertEqual(respvalue, None)
515
516        l.delete_s(dn)
517
518    def test_slapadd(self):
519        with self.assertRaises(ldap.INVALID_DN_SYNTAX):
520            self._ldap_conn.add_s("myAttribute=foobar,ou=Container,%s" % self.server.suffix, [
521                ("objectClass", b'myClass'),
522                ("myAttribute", b'foobar'),
523            ])
524
525        self.server.slapadd(SCHEMA_TEMPLATE, ["-n0"])
526        self.server.restart()
527        self.reset_connection()
528
529        self._ldap_conn.add_s("myAttribute=foobar,ou=Container,%s" % self.server.suffix, [
530            ("objectClass", b'myClass'),
531            ("myAttribute", b'foobar'),
532        ])
533
534
535class Test01_ReconnectLDAPObject(Test00_SimpleLDAPObject):
536    """
537    test ReconnectLDAPObject by restarting slapd
538    """
539
540    ldap_object_class = ReconnectLDAPObject
541
542    @requires_sasl()
543    @requires_ldapi()
544    def test101_reconnect_sasl_external(self):
545        l = self.ldap_object_class(self.server.ldapi_uri)
546        l.sasl_external_bind_s()
547        authz_id = l.whoami_s()
548        self.assertEqual(authz_id, 'dn:'+self.server.root_dn.lower())
549        self.server.restart()
550        self.assertEqual(l.whoami_s(), authz_id)
551
552    def test102_reconnect_simple_bind(self):
553        l = self.ldap_object_class(self.server.ldap_uri)
554        bind_dn = 'cn=user1,'+self.server.suffix
555        l.simple_bind_s(bind_dn, 'user1_pw')
556        self.assertEqual(l.whoami_s(), 'dn:'+bind_dn)
557        self.server.restart()
558        self.assertEqual(l.whoami_s(), 'dn:'+bind_dn)
559
560    def test103_reconnect_get_state(self):
561        l1 = self.ldap_object_class(self.server.ldap_uri)
562        bind_dn = 'cn=user1,'+self.server.suffix
563        l1.simple_bind_s(bind_dn, 'user1_pw')
564        self.assertEqual(l1.whoami_s(), 'dn:'+bind_dn)
565        self.assertEqual(
566            l1.__getstate__(),
567            {
568                '_last_bind': (
569                    'simple_bind_s',
570                    (bind_dn, 'user1_pw'),
571                    {}
572                ),
573                '_options': [(17, 3)],
574                '_reconnects_done': 0,
575                '_retry_delay': 60.0,
576                '_retry_max': 1,
577                '_start_tls': 0,
578                '_trace_level': ldap._trace_level,
579                '_trace_stack_limit': 5,
580                '_uri': self.server.ldap_uri,
581                'timeout': -1,
582            },
583        )
584
585    def test104_reconnect_restore(self):
586        l1 = self.ldap_object_class(self.server.ldap_uri)
587        bind_dn = 'cn=user1,'+self.server.suffix
588        l1.simple_bind_s(bind_dn, 'user1_pw')
589        self.assertEqual(l1.whoami_s(), 'dn:'+bind_dn)
590        l1_state = pickle.dumps(l1)
591        del l1
592        l2 = pickle.loads(l1_state)
593        self.assertEqual(l2.whoami_s(), 'dn:'+bind_dn)
594
595    def test105_reconnect_restore(self):
596        l1 = self.ldap_object_class(self.server.ldap_uri, retry_max=2, retry_delay=1)
597        bind_dn = 'cn=user1,'+self.server.suffix
598        l1.simple_bind_s(bind_dn, 'user1_pw')
599        self.assertEqual(l1.whoami_s(), 'dn:'+bind_dn)
600        self.server._proc.terminate()
601        self.server.wait()
602        try:
603            l1.whoami_s()
604        except ldap.SERVER_DOWN:
605            pass
606        else:
607            self.assertEqual(True, False)
608        finally:
609            self.server._start_slapd()
610        self.assertEqual(l1.whoami_s(), 'dn:'+bind_dn)
611
612
613@requires_init_fd()
614class Test03_SimpleLDAPObjectWithFileno(Test00_SimpleLDAPObject):
615    def _open_ldap_conn(self, who=None, cred=None, **kwargs):
616        if hasattr(self, '_sock'):
617            raise RuntimeError("socket already connected")
618        self._sock = socket.create_connection(
619            (self.server.hostname, self.server.port)
620        )
621        return super()._open_ldap_conn(
622            who=who, cred=cred, fileno=self._sock.fileno(), **kwargs
623        )
624
625    def tearDown(self):
626        self._sock.close()
627        del self._sock
628        super().tearDown()
629
630    def reset_connection(self):
631        self._sock.close()
632        del self._sock
633        super(Test03_SimpleLDAPObjectWithFileno, self).reset_connection()
634
635
636if __name__ == '__main__':
637    unittest.main()
638