1#!/usr/bin/env python3
2#
3# Tombstone reanimation tests
4#
5# Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2014
6# Copyright (C) Nadezhda Ivanova <nivanova@symas.com> 2014
7#
8# This program is free software; you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation; either version 3 of the License, or
11# (at your option) any later version.
12#
13# This program is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the GNU General Public License
19# along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
21from __future__ import print_function
22import sys
23import unittest
24
25sys.path.insert(0, "bin/python")
26import samba
27
28from samba.ndr import ndr_unpack, ndr_print
29from samba.dcerpc import misc
30from samba.dcerpc import security
31from samba.dcerpc import drsblobs
32from samba.dcerpc.drsuapi import *
33from samba.tests.password_test import PasswordCommon
34from samba.compat import get_string
35
36import samba.tests
37from ldb import (SCOPE_BASE, FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE, Dn, Message,
38                 MessageElement, LdbError,
39                 ERR_ATTRIBUTE_OR_VALUE_EXISTS, ERR_NO_SUCH_OBJECT, ERR_ENTRY_ALREADY_EXISTS,
40                 ERR_OPERATIONS_ERROR, ERR_UNWILLING_TO_PERFORM)
41
42
43class RestoredObjectAttributesBaseTestCase(samba.tests.TestCase):
44    """ verify Samba restores required attributes when
45        user restores a Deleted object
46    """
47
48    def setUp(self):
49        super(RestoredObjectAttributesBaseTestCase, self).setUp()
50        self.samdb = samba.tests.connect_samdb_env("TEST_SERVER", "TEST_USERNAME", "TEST_PASSWORD")
51        self.base_dn = self.samdb.domain_dn()
52        self.schema_dn = self.samdb.get_schema_basedn().get_linearized()
53        self.configuration_dn = self.samdb.get_config_basedn().get_linearized()
54
55        # permit password changes during this test
56        PasswordCommon.allow_password_changes(self, self.samdb)
57
58    def tearDown(self):
59        super(RestoredObjectAttributesBaseTestCase, self).tearDown()
60
61    def GUID_string(self, guid):
62        return get_string(self.samdb.schema_format_value("objectGUID", guid))
63
64    def search_guid(self, guid, attrs=["*"]):
65        res = self.samdb.search(base="<GUID=%s>" % self.GUID_string(guid),
66                                scope=SCOPE_BASE, attrs=attrs,
67                                controls=["show_deleted:1"])
68        self.assertEquals(len(res), 1)
69        return res[0]
70
71    def search_dn(self, dn):
72        res = self.samdb.search(expression="(objectClass=*)",
73                                base=dn,
74                                scope=SCOPE_BASE,
75                                controls=["show_recycled:1"])
76        self.assertEquals(len(res), 1)
77        return res[0]
78
79    def _create_object(self, msg):
80        """:param msg: dict with dn and attributes to create an object from"""
81        # delete an object if leftover from previous test
82        samba.tests.delete_force(self.samdb, msg['dn'])
83        self.samdb.add(msg)
84        return self.search_dn(msg['dn'])
85
86    def assertNamesEqual(self, attrs_expected, attrs_extra):
87        self.assertEqual(attrs_expected, attrs_extra,
88                         "Actual object does not have expected attributes, missing from expected (%s), extra (%s)"
89                         % (str(attrs_expected.difference(attrs_extra)), str(attrs_extra.difference(attrs_expected))))
90
91    def assertAttributesEqual(self, obj_orig, attrs_orig, obj_restored, attrs_rest):
92        self.assertNamesEqual(attrs_orig, attrs_rest)
93        # remove volatile attributes, they can't be equal
94        attrs_orig -= set(["uSNChanged", "dSCorePropagationData", "whenChanged"])
95        for attr in attrs_orig:
96            # convert original attr value to ldif
97            orig_val = obj_orig.get(attr)
98            if orig_val is None:
99                continue
100            if not isinstance(orig_val, MessageElement):
101                orig_val = MessageElement(str(orig_val), 0, attr)
102            m = Message()
103            m.add(orig_val)
104            orig_ldif = self.samdb.write_ldif(m, 0)
105            # convert restored attr value to ldif
106            rest_val = obj_restored.get(attr)
107            self.assertFalse(rest_val is None)
108            m = Message()
109            if not isinstance(rest_val, MessageElement):
110                rest_val = MessageElement(str(rest_val), 0, attr)
111            m.add(rest_val)
112            rest_ldif = self.samdb.write_ldif(m, 0)
113            # compare generated ldif's
114            self.assertEqual(orig_ldif, rest_ldif)
115
116    def assertAttributesExists(self, attr_expected, obj_msg):
117        """Check object contains at least expected attrbigutes
118        :param attr_expected: dict of expected attributes with values. ** is any value
119        :param obj_msg: Ldb.Message for the object under test
120        """
121        actual_names = set(obj_msg.keys())
122        # Samba does not use 'dSCorePropagationData', so skip it
123        actual_names -= set(['dSCorePropagationData'])
124        expected_names = set(attr_expected.keys())
125        self.assertNamesEqual(expected_names, actual_names)
126        for name in attr_expected.keys():
127            expected_val = attr_expected[name]
128            actual_val = obj_msg.get(name)
129            self.assertFalse(actual_val is None, "No value for attribute '%s'" % name)
130            if expected_val == "**":
131                # "**" values means "any"
132                continue
133            # if expected_val is e.g. ldb.bytes we can't depend on
134            # str(actual_value) working, we may just get a decoding
135            # error. Better to just compare raw values
136            if not isinstance(expected_val, str):
137                actual_val = actual_val[0]
138            else:
139                actual_val = str(actual_val)
140            self.assertEqual(expected_val, actual_val,
141                             "Unexpected value (%s) for '%s', expected (%s)" % (
142                             repr(actual_val), name, repr(expected_val)))
143
144    def _check_metadata(self, metadata, expected):
145        repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, metadata[0])
146
147        repl_array = []
148        for o in repl.ctr.array:
149            repl_array.append((o.attid, o.version))
150        repl_set = set(repl_array)
151
152        expected_set = set(expected)
153        self.assertEqual(len(repl_set), len(expected),
154                         "Unexpected metadata, missing from expected (%s), extra (%s)), repl: \n%s" % (
155                         str(expected_set.difference(repl_set)),
156                         str(repl_set.difference(expected_set)),
157                         ndr_print(repl)))
158
159        i = 0
160        for o in repl.ctr.array:
161            e = expected[i]
162            (attid, version) = e
163            self.assertEquals(attid, o.attid,
164                              "(LDAP) Wrong attid "
165                              "for expected value %d, wanted 0x%08x got 0x%08x, "
166                              "repl: \n%s"
167                              % (i, attid, o.attid, ndr_print(repl)))
168            # Allow version to be skipped when it does not matter
169            if version is not None:
170                self.assertEquals(o.version, version,
171                                  "(LDAP) Wrong version for expected value %d, "
172                                  "attid 0x%08x, "
173                                  "wanted %d got %d, repl: \n%s"
174                                  % (i, o.attid,
175                                     version, o.version, ndr_print(repl)))
176            i = i + 1
177
178    @staticmethod
179    def restore_deleted_object(samdb, del_dn, new_dn, new_attrs=None):
180        """Restores a deleted object
181        :param samdb: SamDB connection to SAM
182        :param del_dn: str Deleted object DN
183        :param new_dn: str Where to restore the object
184        :param new_attrs: dict Additional attributes to set
185        """
186        msg = Message()
187        msg.dn = Dn(samdb, str(del_dn))
188        msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
189        msg["distinguishedName"] = MessageElement([str(new_dn)], FLAG_MOD_REPLACE, "distinguishedName")
190        if new_attrs is not None:
191            assert isinstance(new_attrs, dict)
192            for attr in new_attrs:
193                msg[attr] = MessageElement(new_attrs[attr], FLAG_MOD_REPLACE, attr)
194        samdb.modify(msg, ["show_deleted:1"])
195
196
197class BaseRestoreObjectTestCase(RestoredObjectAttributesBaseTestCase):
198    def setUp(self):
199        super(BaseRestoreObjectTestCase, self).setUp()
200
201    def enable_recycle_bin(self):
202        msg = Message()
203        msg.dn = Dn(self.samdb, "")
204        msg["enableOptionalFeature"] = MessageElement(
205            "CN=Partitions," + self.configuration_dn + ":766ddcd8-acd0-445e-f3b9-a7f9b6744f2a",
206            FLAG_MOD_ADD, "enableOptionalFeature")
207        try:
208            self.samdb.modify(msg)
209        except LdbError as e:
210            (num, _) = e.args
211            self.assertEquals(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS)
212
213    def test_undelete(self):
214        print("Testing standard undelete operation")
215        usr1 = "cn=testuser,cn=users," + self.base_dn
216        samba.tests.delete_force(self.samdb, usr1)
217        self.samdb.add({
218            "dn": usr1,
219            "objectclass": "user",
220            "description": "test user description",
221            "samaccountname": "testuser"})
222        objLive1 = self.search_dn(usr1)
223        guid1 = objLive1["objectGUID"][0]
224        self.samdb.delete(usr1)
225        objDeleted1 = self.search_guid(guid1)
226        self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1)
227        objLive2 = self.search_dn(usr1)
228        self.assertEqual(str(objLive2.dn).lower(), str(objLive1.dn).lower())
229        samba.tests.delete_force(self.samdb, usr1)
230
231    def test_rename(self):
232        print("Testing attempt to rename deleted object")
233        usr1 = "cn=testuser,cn=users," + self.base_dn
234        self.samdb.add({
235            "dn": usr1,
236            "objectclass": "user",
237            "description": "test user description",
238            "samaccountname": "testuser"})
239        objLive1 = self.search_dn(usr1)
240        guid1 = objLive1["objectGUID"][0]
241        self.samdb.delete(usr1)
242        objDeleted1 = self.search_guid(guid1)
243        # just to make sure we get the correct error if the show deleted is missing
244        try:
245            self.samdb.rename(str(objDeleted1.dn), usr1)
246            self.fail()
247        except LdbError as e1:
248            (num, _) = e1.args
249            self.assertEquals(num, ERR_NO_SUCH_OBJECT)
250
251        try:
252            self.samdb.rename(str(objDeleted1.dn), usr1, ["show_deleted:1"])
253            self.fail()
254        except LdbError as e2:
255            (num, _) = e2.args
256            self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
257
258    def test_undelete_with_mod(self):
259        print("Testing standard undelete operation with modification of additional attributes")
260        usr1 = "cn=testuser,cn=users," + self.base_dn
261        self.samdb.add({
262            "dn": usr1,
263            "objectclass": "user",
264            "description": "test user description",
265            "samaccountname": "testuser"})
266        objLive1 = self.search_dn(usr1)
267        guid1 = objLive1["objectGUID"][0]
268        self.samdb.delete(usr1)
269        objDeleted1 = self.search_guid(guid1)
270        self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1, {"url": "www.samba.org"})
271        objLive2 = self.search_dn(usr1)
272        self.assertEqual(str(objLive2["url"][0]), "www.samba.org")
273        samba.tests.delete_force(self.samdb, usr1)
274
275    def test_undelete_newuser(self):
276        print("Testing undelete user with a different dn")
277        usr1 = "cn=testuser,cn=users," + self.base_dn
278        usr2 = "cn=testuser2,cn=users," + self.base_dn
279        samba.tests.delete_force(self.samdb, usr1)
280        self.samdb.add({
281            "dn": usr1,
282            "objectclass": "user",
283            "description": "test user description",
284            "samaccountname": "testuser"})
285        objLive1 = self.search_dn(usr1)
286        guid1 = objLive1["objectGUID"][0]
287        self.samdb.delete(usr1)
288        objDeleted1 = self.search_guid(guid1)
289        self.restore_deleted_object(self.samdb, objDeleted1.dn, usr2)
290        objLive2 = self.search_dn(usr2)
291        samba.tests.delete_force(self.samdb, usr1)
292        samba.tests.delete_force(self.samdb, usr2)
293
294    def test_undelete_existing(self):
295        print("Testing undelete user after a user with the same dn has been created")
296        usr1 = "cn=testuser,cn=users," + self.base_dn
297        self.samdb.add({
298            "dn": usr1,
299            "objectclass": "user",
300            "description": "test user description",
301            "samaccountname": "testuser"})
302        objLive1 = self.search_dn(usr1)
303        guid1 = objLive1["objectGUID"][0]
304        self.samdb.delete(usr1)
305        self.samdb.add({
306            "dn": usr1,
307            "objectclass": "user",
308            "description": "test user description",
309            "samaccountname": "testuser"})
310        objDeleted1 = self.search_guid(guid1)
311        try:
312            self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1)
313            self.fail()
314        except LdbError as e3:
315            (num, _) = e3.args
316            self.assertEquals(num, ERR_ENTRY_ALREADY_EXISTS)
317
318    def test_undelete_cross_nc(self):
319        print("Cross NC undelete")
320        c1 = "cn=ldaptestcontainer," + self.base_dn
321        c2 = "cn=ldaptestcontainer2," + self.configuration_dn
322        c3 = "cn=ldaptestcontainer," + self.configuration_dn
323        c4 = "cn=ldaptestcontainer2," + self.base_dn
324        samba.tests.delete_force(self.samdb, c1)
325        samba.tests.delete_force(self.samdb, c2)
326        samba.tests.delete_force(self.samdb, c3)
327        samba.tests.delete_force(self.samdb, c4)
328        self.samdb.add({
329            "dn": c1,
330            "objectclass": "container"})
331        self.samdb.add({
332            "dn": c2,
333            "objectclass": "container"})
334        objLive1 = self.search_dn(c1)
335        objLive2 = self.search_dn(c2)
336        guid1 = objLive1["objectGUID"][0]
337        guid2 = objLive2["objectGUID"][0]
338        self.samdb.delete(c1)
339        self.samdb.delete(c2)
340        objDeleted1 = self.search_guid(guid1)
341        objDeleted2 = self.search_guid(guid2)
342        # try to undelete from base dn to config
343        try:
344            self.restore_deleted_object(self.samdb, objDeleted1.dn, c3)
345            self.fail()
346        except LdbError as e4:
347            (num, _) = e4.args
348            self.assertEquals(num, ERR_OPERATIONS_ERROR)
349        # try to undelete from config to base dn
350        try:
351            self.restore_deleted_object(self.samdb, objDeleted2.dn, c4)
352            self.fail()
353        except LdbError as e5:
354            (num, _) = e5.args
355            self.assertEquals(num, ERR_OPERATIONS_ERROR)
356        # assert undeletion will work in same nc
357        self.restore_deleted_object(self.samdb, objDeleted1.dn, c4)
358        self.restore_deleted_object(self.samdb, objDeleted2.dn, c3)
359
360
361class RestoreUserObjectTestCase(RestoredObjectAttributesBaseTestCase):
362    """Test cases for delete/reanimate user objects"""
363
364    def _expected_user_add_attributes(self, username, user_dn, category):
365        return {'dn': user_dn,
366                'objectClass': '**',
367                'cn': username,
368                'distinguishedName': user_dn,
369                'instanceType': '4',
370                'whenCreated': '**',
371                'whenChanged': '**',
372                'uSNCreated': '**',
373                'uSNChanged': '**',
374                'name': username,
375                'objectGUID': '**',
376                'userAccountControl': '546',
377                'badPwdCount': '0',
378                'badPasswordTime': '0',
379                'codePage': '0',
380                'countryCode': '0',
381                'lastLogon': '0',
382                'lastLogoff': '0',
383                'pwdLastSet': '0',
384                'primaryGroupID': '513',
385                'objectSid': '**',
386                'accountExpires': '9223372036854775807',
387                'logonCount': '0',
388                'sAMAccountName': username,
389                'sAMAccountType': '805306368',
390                'objectCategory': 'CN=%s,%s' % (category, self.schema_dn)
391                }
392
393    def _expected_user_add_metadata(self):
394        return [
395            (DRSUAPI_ATTID_objectClass, 1),
396            (DRSUAPI_ATTID_cn, 1),
397            (DRSUAPI_ATTID_instanceType, 1),
398            (DRSUAPI_ATTID_whenCreated, 1),
399            (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
400            (DRSUAPI_ATTID_name, 1),
401            (DRSUAPI_ATTID_userAccountControl, None),
402            (DRSUAPI_ATTID_codePage, 1),
403            (DRSUAPI_ATTID_countryCode, 1),
404            (DRSUAPI_ATTID_dBCSPwd, 1),
405            (DRSUAPI_ATTID_logonHours, 1),
406            (DRSUAPI_ATTID_unicodePwd, 1),
407            (DRSUAPI_ATTID_ntPwdHistory, 1),
408            (DRSUAPI_ATTID_pwdLastSet, 1),
409            (DRSUAPI_ATTID_primaryGroupID, 1),
410            (DRSUAPI_ATTID_objectSid, 1),
411            (DRSUAPI_ATTID_accountExpires, 1),
412            (DRSUAPI_ATTID_lmPwdHistory, 1),
413            (DRSUAPI_ATTID_sAMAccountName, 1),
414            (DRSUAPI_ATTID_sAMAccountType, 1),
415            (DRSUAPI_ATTID_objectCategory, 1)]
416
417    def _expected_user_del_attributes(self, username, _guid, _sid):
418        guid = ndr_unpack(misc.GUID, _guid)
419        dn = "CN=%s\\0ADEL:%s,CN=Deleted Objects,%s" % (username, guid, self.base_dn)
420        cn = "%s\nDEL:%s" % (username, guid)
421        return {'dn': dn,
422                'objectClass': '**',
423                'cn': cn,
424                'distinguishedName': dn,
425                'isDeleted': 'TRUE',
426                'isRecycled': 'TRUE',
427                'instanceType': '4',
428                'whenCreated': '**',
429                'whenChanged': '**',
430                'uSNCreated': '**',
431                'uSNChanged': '**',
432                'name': cn,
433                'objectGUID': _guid,
434                'userAccountControl': '546',
435                'objectSid': _sid,
436                'sAMAccountName': username,
437                'lastKnownParent': 'CN=Users,%s' % self.base_dn,
438                }
439
440    def _expected_user_del_metadata(self):
441        return [
442            (DRSUAPI_ATTID_objectClass, 1),
443            (DRSUAPI_ATTID_cn, 2),
444            (DRSUAPI_ATTID_instanceType, 1),
445            (DRSUAPI_ATTID_whenCreated, 1),
446            (DRSUAPI_ATTID_isDeleted, 1),
447            (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
448            (DRSUAPI_ATTID_name, 2),
449            (DRSUAPI_ATTID_userAccountControl, None),
450            (DRSUAPI_ATTID_codePage, 2),
451            (DRSUAPI_ATTID_countryCode, 2),
452            (DRSUAPI_ATTID_dBCSPwd, 1),
453            (DRSUAPI_ATTID_logonHours, 1),
454            (DRSUAPI_ATTID_unicodePwd, 1),
455            (DRSUAPI_ATTID_ntPwdHistory, 1),
456            (DRSUAPI_ATTID_pwdLastSet, 2),
457            (DRSUAPI_ATTID_primaryGroupID, 2),
458            (DRSUAPI_ATTID_objectSid, 1),
459            (DRSUAPI_ATTID_accountExpires, 2),
460            (DRSUAPI_ATTID_lmPwdHistory, 1),
461            (DRSUAPI_ATTID_sAMAccountName, 1),
462            (DRSUAPI_ATTID_sAMAccountType, 2),
463            (DRSUAPI_ATTID_lastKnownParent, 1),
464            (DRSUAPI_ATTID_objectCategory, 2),
465            (DRSUAPI_ATTID_isRecycled, 1)]
466
467    def _expected_user_restore_attributes(self, username, guid, sid, user_dn, category):
468        return {'dn': user_dn,
469                'objectClass': '**',
470                'cn': username,
471                'distinguishedName': user_dn,
472                'instanceType': '4',
473                'whenCreated': '**',
474                'whenChanged': '**',
475                'uSNCreated': '**',
476                'uSNChanged': '**',
477                'name': username,
478                'objectGUID': guid,
479                'userAccountControl': '546',
480                'badPwdCount': '0',
481                'badPasswordTime': '0',
482                'codePage': '0',
483                'countryCode': '0',
484                'lastLogon': '0',
485                'lastLogoff': '0',
486                'pwdLastSet': '0',
487                'primaryGroupID': '513',
488                'operatorCount': '0',
489                'objectSid': sid,
490                'adminCount': '0',
491                'accountExpires': '0',
492                'logonCount': '0',
493                'sAMAccountName': username,
494                'sAMAccountType': '805306368',
495                'lastKnownParent': 'CN=Users,%s' % self.base_dn,
496                'objectCategory': 'CN=%s,%s' % (category, self.schema_dn)
497                }
498
499    def _expected_user_restore_metadata(self):
500        return [
501            (DRSUAPI_ATTID_objectClass, 1),
502            (DRSUAPI_ATTID_cn, 3),
503            (DRSUAPI_ATTID_instanceType, 1),
504            (DRSUAPI_ATTID_whenCreated, 1),
505            (DRSUAPI_ATTID_isDeleted, 2),
506            (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
507            (DRSUAPI_ATTID_name, 3),
508            (DRSUAPI_ATTID_userAccountControl, None),
509            (DRSUAPI_ATTID_codePage, 3),
510            (DRSUAPI_ATTID_countryCode, 3),
511            (DRSUAPI_ATTID_dBCSPwd, 1),
512            (DRSUAPI_ATTID_logonHours, 1),
513            (DRSUAPI_ATTID_unicodePwd, 1),
514            (DRSUAPI_ATTID_ntPwdHistory, 1),
515            (DRSUAPI_ATTID_pwdLastSet, 3),
516            (DRSUAPI_ATTID_primaryGroupID, 3),
517            (DRSUAPI_ATTID_operatorCount, 1),
518            (DRSUAPI_ATTID_objectSid, 1),
519            (DRSUAPI_ATTID_adminCount, 1),
520            (DRSUAPI_ATTID_accountExpires, 3),
521            (DRSUAPI_ATTID_lmPwdHistory, 1),
522            (DRSUAPI_ATTID_sAMAccountName, 1),
523            (DRSUAPI_ATTID_sAMAccountType, 3),
524            (DRSUAPI_ATTID_lastKnownParent, 1),
525            (DRSUAPI_ATTID_objectCategory, 3),
526            (DRSUAPI_ATTID_isRecycled, 2)]
527
528    def test_restore_user(self):
529        print("Test restored user attributes")
530        username = "restore_user"
531        usr_dn = "CN=%s,CN=Users,%s" % (username, self.base_dn)
532        samba.tests.delete_force(self.samdb, usr_dn)
533        self.samdb.add({
534            "dn": usr_dn,
535            "objectClass": "user",
536            "sAMAccountName": username})
537        obj = self.search_dn(usr_dn)
538        guid = obj["objectGUID"][0]
539        sid = obj["objectSID"][0]
540        obj_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
541        self.assertAttributesExists(self._expected_user_add_attributes(username, usr_dn, "Person"), obj)
542        self._check_metadata(obj_rmd["replPropertyMetaData"],
543                             self._expected_user_add_metadata())
544        self.samdb.delete(usr_dn)
545        obj_del = self.search_guid(guid)
546        obj_del_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
547        orig_attrs = set(obj.keys())
548        del_attrs = set(obj_del.keys())
549        self.assertAttributesExists(self._expected_user_del_attributes(username, guid, sid), obj_del)
550        self._check_metadata(obj_del_rmd["replPropertyMetaData"],
551                             self._expected_user_del_metadata())
552        # restore the user and fetch what's restored
553        self.restore_deleted_object(self.samdb, obj_del.dn, usr_dn)
554        obj_restore = self.search_guid(guid)
555        obj_restore_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
556        # check original attributes and restored one are same
557        orig_attrs = set(obj.keys())
558        # windows restore more attributes that originally we have
559        orig_attrs.update(['adminCount', 'operatorCount', 'lastKnownParent'])
560        rest_attrs = set(obj_restore.keys())
561        self.assertAttributesExists(self._expected_user_restore_attributes(username, guid, sid, usr_dn, "Person"), obj_restore)
562        self._check_metadata(obj_restore_rmd["replPropertyMetaData"],
563                             self._expected_user_restore_metadata())
564
565
566class RestoreUserPwdObjectTestCase(RestoredObjectAttributesBaseTestCase):
567    """Test cases for delete/reanimate user objects with password"""
568
569    def _expected_userpw_add_attributes(self, username, user_dn, category):
570        return {'dn': user_dn,
571                'objectClass': '**',
572                'cn': username,
573                'distinguishedName': user_dn,
574                'instanceType': '4',
575                'whenCreated': '**',
576                'whenChanged': '**',
577                'uSNCreated': '**',
578                'uSNChanged': '**',
579                'name': username,
580                'objectGUID': '**',
581                'userAccountControl': '546',
582                'badPwdCount': '0',
583                'badPasswordTime': '0',
584                'codePage': '0',
585                'countryCode': '0',
586                'lastLogon': '0',
587                'lastLogoff': '0',
588                'pwdLastSet': '**',
589                'primaryGroupID': '513',
590                'objectSid': '**',
591                'accountExpires': '9223372036854775807',
592                'logonCount': '0',
593                'sAMAccountName': username,
594                'sAMAccountType': '805306368',
595                'objectCategory': 'CN=%s,%s' % (category, self.schema_dn)
596                }
597
598    def _expected_userpw_add_metadata(self):
599        return [
600            (DRSUAPI_ATTID_objectClass, 1),
601            (DRSUAPI_ATTID_cn, 1),
602            (DRSUAPI_ATTID_instanceType, 1),
603            (DRSUAPI_ATTID_whenCreated, 1),
604            (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
605            (DRSUAPI_ATTID_name, 1),
606            (DRSUAPI_ATTID_userAccountControl, None),
607            (DRSUAPI_ATTID_codePage, 1),
608            (DRSUAPI_ATTID_countryCode, 1),
609            (DRSUAPI_ATTID_dBCSPwd, 1),
610            (DRSUAPI_ATTID_logonHours, 1),
611            (DRSUAPI_ATTID_unicodePwd, 1),
612            (DRSUAPI_ATTID_ntPwdHistory, 1),
613            (DRSUAPI_ATTID_pwdLastSet, 1),
614            (DRSUAPI_ATTID_primaryGroupID, 1),
615            (DRSUAPI_ATTID_supplementalCredentials, 1),
616            (DRSUAPI_ATTID_objectSid, 1),
617            (DRSUAPI_ATTID_accountExpires, 1),
618            (DRSUAPI_ATTID_lmPwdHistory, 1),
619            (DRSUAPI_ATTID_sAMAccountName, 1),
620            (DRSUAPI_ATTID_sAMAccountType, 1),
621            (DRSUAPI_ATTID_objectCategory, 1)]
622
623    def _expected_userpw_del_attributes(self, username, _guid, _sid):
624        guid = ndr_unpack(misc.GUID, _guid)
625        dn = "CN=%s\\0ADEL:%s,CN=Deleted Objects,%s" % (username, guid, self.base_dn)
626        cn = "%s\nDEL:%s" % (username, guid)
627        return {'dn': dn,
628                'objectClass': '**',
629                'cn': cn,
630                'distinguishedName': dn,
631                'isDeleted': 'TRUE',
632                'isRecycled': 'TRUE',
633                'instanceType': '4',
634                'whenCreated': '**',
635                'whenChanged': '**',
636                'uSNCreated': '**',
637                'uSNChanged': '**',
638                'name': cn,
639                'objectGUID': _guid,
640                'userAccountControl': '546',
641                'objectSid': _sid,
642                'sAMAccountName': username,
643                'lastKnownParent': 'CN=Users,%s' % self.base_dn,
644                }
645
646    def _expected_userpw_del_metadata(self):
647        return [
648            (DRSUAPI_ATTID_objectClass, 1),
649            (DRSUAPI_ATTID_cn, 2),
650            (DRSUAPI_ATTID_instanceType, 1),
651            (DRSUAPI_ATTID_whenCreated, 1),
652            (DRSUAPI_ATTID_isDeleted, 1),
653            (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
654            (DRSUAPI_ATTID_name, 2),
655            (DRSUAPI_ATTID_userAccountControl, None),
656            (DRSUAPI_ATTID_codePage, 2),
657            (DRSUAPI_ATTID_countryCode, 2),
658            (DRSUAPI_ATTID_dBCSPwd, 1),
659            (DRSUAPI_ATTID_logonHours, 1),
660            (DRSUAPI_ATTID_unicodePwd, 2),
661            (DRSUAPI_ATTID_ntPwdHistory, 2),
662            (DRSUAPI_ATTID_pwdLastSet, 2),
663            (DRSUAPI_ATTID_primaryGroupID, 2),
664            (DRSUAPI_ATTID_supplementalCredentials, 2),
665            (DRSUAPI_ATTID_objectSid, 1),
666            (DRSUAPI_ATTID_accountExpires, 2),
667            (DRSUAPI_ATTID_lmPwdHistory, 2),
668            (DRSUAPI_ATTID_sAMAccountName, 1),
669            (DRSUAPI_ATTID_sAMAccountType, 2),
670            (DRSUAPI_ATTID_lastKnownParent, 1),
671            (DRSUAPI_ATTID_objectCategory, 2),
672            (DRSUAPI_ATTID_isRecycled, 1)]
673
674    def _expected_userpw_restore_attributes(self, username, guid, sid, user_dn, category):
675        return {'dn': user_dn,
676                'objectClass': '**',
677                'cn': username,
678                'distinguishedName': user_dn,
679                'instanceType': '4',
680                'whenCreated': '**',
681                'whenChanged': '**',
682                'uSNCreated': '**',
683                'uSNChanged': '**',
684                'name': username,
685                'objectGUID': guid,
686                'userAccountControl': '546',
687                'badPwdCount': '0',
688                'badPasswordTime': '0',
689                'codePage': '0',
690                'countryCode': '0',
691                'lastLogon': '0',
692                'lastLogoff': '0',
693                'pwdLastSet': '**',
694                'primaryGroupID': '513',
695                'operatorCount': '0',
696                'objectSid': sid,
697                'adminCount': '0',
698                'accountExpires': '0',
699                'logonCount': '0',
700                'sAMAccountName': username,
701                'sAMAccountType': '805306368',
702                'lastKnownParent': 'CN=Users,%s' % self.base_dn,
703                'objectCategory': 'CN=%s,%s' % (category, self.schema_dn)
704                }
705
706    def _expected_userpw_restore_metadata(self):
707        return [
708            (DRSUAPI_ATTID_objectClass, 1),
709            (DRSUAPI_ATTID_cn, 3),
710            (DRSUAPI_ATTID_instanceType, 1),
711            (DRSUAPI_ATTID_whenCreated, 1),
712            (DRSUAPI_ATTID_isDeleted, 2),
713            (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
714            (DRSUAPI_ATTID_name, 3),
715            (DRSUAPI_ATTID_userAccountControl, None),
716            (DRSUAPI_ATTID_codePage, 3),
717            (DRSUAPI_ATTID_countryCode, 3),
718            (DRSUAPI_ATTID_dBCSPwd, 2),
719            (DRSUAPI_ATTID_logonHours, 1),
720            (DRSUAPI_ATTID_unicodePwd, 3),
721            (DRSUAPI_ATTID_ntPwdHistory, 3),
722            (DRSUAPI_ATTID_pwdLastSet, 4),
723            (DRSUAPI_ATTID_primaryGroupID, 3),
724            (DRSUAPI_ATTID_supplementalCredentials, 3),
725            (DRSUAPI_ATTID_operatorCount, 1),
726            (DRSUAPI_ATTID_objectSid, 1),
727            (DRSUAPI_ATTID_adminCount, 1),
728            (DRSUAPI_ATTID_accountExpires, 3),
729            (DRSUAPI_ATTID_lmPwdHistory, 3),
730            (DRSUAPI_ATTID_sAMAccountName, 1),
731            (DRSUAPI_ATTID_sAMAccountType, 3),
732            (DRSUAPI_ATTID_lastKnownParent, 1),
733            (DRSUAPI_ATTID_objectCategory, 3),
734            (DRSUAPI_ATTID_isRecycled, 2)]
735
736    def test_restorepw_user(self):
737        print("Test restored user attributes")
738        username = "restorepw_user"
739        usr_dn = "CN=%s,CN=Users,%s" % (username, self.base_dn)
740        samba.tests.delete_force(self.samdb, usr_dn)
741        self.samdb.add({
742            "dn": usr_dn,
743            "objectClass": "user",
744            "userPassword": "thatsAcomplPASS0",
745            "sAMAccountName": username})
746        obj = self.search_dn(usr_dn)
747        guid = obj["objectGUID"][0]
748        sid = obj["objectSID"][0]
749        obj_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
750        self.assertAttributesExists(self._expected_userpw_add_attributes(username, usr_dn, "Person"), obj)
751        self._check_metadata(obj_rmd["replPropertyMetaData"],
752                             self._expected_userpw_add_metadata())
753        self.samdb.delete(usr_dn)
754        obj_del = self.search_guid(guid)
755        obj_del_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
756        orig_attrs = set(obj.keys())
757        del_attrs = set(obj_del.keys())
758        self.assertAttributesExists(self._expected_userpw_del_attributes(username, guid, sid), obj_del)
759        self._check_metadata(obj_del_rmd["replPropertyMetaData"],
760                             self._expected_userpw_del_metadata())
761        # restore the user and fetch what's restored
762        self.restore_deleted_object(self.samdb, obj_del.dn, usr_dn, {"userPassword": ["thatsAcomplPASS1"]})
763        obj_restore = self.search_guid(guid)
764        obj_restore_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
765        # check original attributes and restored one are same
766        orig_attrs = set(obj.keys())
767        # windows restore more attributes that originally we have
768        orig_attrs.update(['adminCount', 'operatorCount', 'lastKnownParent'])
769        rest_attrs = set(obj_restore.keys())
770        self.assertAttributesExists(self._expected_userpw_restore_attributes(username, guid, sid, usr_dn, "Person"), obj_restore)
771        self._check_metadata(obj_restore_rmd["replPropertyMetaData"],
772                             self._expected_userpw_restore_metadata())
773
774
775class RestoreGroupObjectTestCase(RestoredObjectAttributesBaseTestCase):
776    """Test different scenarios for delete/reanimate group objects"""
777
778    def _make_object_dn(self, name):
779        return "CN=%s,CN=Users,%s" % (name, self.base_dn)
780
781    def _create_test_user(self, user_name):
782        user_dn = self._make_object_dn(user_name)
783        ldif = {
784            "dn": user_dn,
785            "objectClass": "user",
786            "sAMAccountName": user_name,
787        }
788        # delete an object if leftover from previous test
789        samba.tests.delete_force(self.samdb, user_dn)
790        # finally, create the group
791        self.samdb.add(ldif)
792        return self.search_dn(user_dn)
793
794    def _create_test_group(self, group_name, members=None):
795        group_dn = self._make_object_dn(group_name)
796        ldif = {
797            "dn": group_dn,
798            "objectClass": "group",
799            "sAMAccountName": group_name,
800        }
801        try:
802            ldif["member"] = [str(usr_dn) for usr_dn in members]
803        except TypeError:
804            pass
805        # delete an object if leftover from previous test
806        samba.tests.delete_force(self.samdb, group_dn)
807        # finally, create the group
808        self.samdb.add(ldif)
809        return self.search_dn(group_dn)
810
811    def _expected_group_attributes(self, groupname, group_dn, category):
812        return {'dn': group_dn,
813                'groupType': '-2147483646',
814                'distinguishedName': group_dn,
815                'sAMAccountName': groupname,
816                'name': groupname,
817                'objectCategory': 'CN=%s,%s' % (category, self.schema_dn),
818                'objectClass': '**',
819                'objectGUID': '**',
820                'lastKnownParent': 'CN=Users,%s' % self.base_dn,
821                'whenChanged': '**',
822                'sAMAccountType': '268435456',
823                'objectSid': '**',
824                'whenCreated': '**',
825                'uSNCreated': '**',
826                'operatorCount': '0',
827                'uSNChanged': '**',
828                'instanceType': '4',
829                'adminCount': '0',
830                'cn': groupname}
831
832    def test_plain_group(self):
833        print("Test restored Group attributes")
834        # create test group
835        obj = self._create_test_group("r_group")
836        guid = obj["objectGUID"][0]
837        # delete the group
838        self.samdb.delete(str(obj.dn))
839        obj_del = self.search_guid(guid)
840        # restore the Group and fetch what's restored
841        self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
842        obj_restore = self.search_guid(guid)
843        # check original attributes and restored one are same
844        attr_orig = set(obj.keys())
845        # windows restore more attributes that originally we have
846        attr_orig.update(['adminCount', 'operatorCount', 'lastKnownParent'])
847        attr_rest = set(obj_restore.keys())
848        self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
849        self.assertAttributesExists(self._expected_group_attributes("r_group", str(obj.dn), "Group"), obj_restore)
850
851    def test_group_with_members(self):
852        print("Test restored Group with members attributes")
853        # create test group
854        usr1 = self._create_test_user("r_user_1")
855        usr2 = self._create_test_user("r_user_2")
856        obj = self._create_test_group("r_group", [usr1.dn, usr2.dn])
857        guid = obj["objectGUID"][0]
858        # delete the group
859        self.samdb.delete(str(obj.dn))
860        obj_del = self.search_guid(guid)
861        # restore the Group and fetch what's restored
862        self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
863        obj_restore = self.search_guid(guid)
864        # check original attributes and restored one are same
865        attr_orig = set(obj.keys())
866        # windows restore more attributes that originally we have
867        attr_orig.update(['adminCount', 'operatorCount', 'lastKnownParent'])
868        # and does not restore following attributes
869        attr_orig.remove("member")
870        attr_rest = set(obj_restore.keys())
871        self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
872        self.assertAttributesExists(self._expected_group_attributes("r_group", str(obj.dn), "Group"), obj_restore)
873
874
875class RestoreContainerObjectTestCase(RestoredObjectAttributesBaseTestCase):
876    """Test different scenarios for delete/reanimate OU/container objects"""
877
878    def _expected_container_attributes(self, rdn, name, dn, category):
879        if rdn == 'OU':
880            lastKnownParent = '%s' % self.base_dn
881        else:
882            lastKnownParent = 'CN=Users,%s' % self.base_dn
883        return {'dn': dn,
884                'distinguishedName': dn,
885                'name': name,
886                'objectCategory': 'CN=%s,%s' % (category, self.schema_dn),
887                'objectClass': '**',
888                'objectGUID': '**',
889                'lastKnownParent': lastKnownParent,
890                'whenChanged': '**',
891                'whenCreated': '**',
892                'uSNCreated': '**',
893                'uSNChanged': '**',
894                'instanceType': '4',
895                rdn.lower(): name}
896
897    def _create_test_ou(self, rdn, name=None, description=None):
898        ou_dn = "OU=%s,%s" % (rdn, self.base_dn)
899        # delete an object if leftover from previous test
900        samba.tests.delete_force(self.samdb, ou_dn)
901        # create ou and return created object
902        self.samdb.create_ou(ou_dn, name=name, description=description)
903        return self.search_dn(ou_dn)
904
905    def test_ou_with_name_description(self):
906        print("Test OU reanimation")
907        # create OU to test with
908        obj = self._create_test_ou(rdn="r_ou",
909                                   name="r_ou name",
910                                   description="r_ou description")
911        guid = obj["objectGUID"][0]
912        # delete the object
913        self.samdb.delete(str(obj.dn))
914        obj_del = self.search_guid(guid)
915        # restore the Object and fetch what's restored
916        self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
917        obj_restore = self.search_guid(guid)
918        # check original attributes and restored one are same
919        attr_orig = set(obj.keys())
920        attr_rest = set(obj_restore.keys())
921        # windows restore more attributes that originally we have
922        attr_orig.update(["lastKnownParent"])
923        # and does not restore following attributes
924        attr_orig -= set(["description"])
925        self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
926        expected_attrs = self._expected_container_attributes("OU", "r_ou", str(obj.dn), "Organizational-Unit")
927        self.assertAttributesExists(expected_attrs, obj_restore)
928
929    def test_container(self):
930        print("Test Container reanimation")
931        # create test Container
932        obj = self._create_object({
933            "dn": "CN=r_container,CN=Users,%s" % self.base_dn,
934            "objectClass": "container"
935        })
936        guid = obj["objectGUID"][0]
937        # delete the object
938        self.samdb.delete(str(obj.dn))
939        obj_del = self.search_guid(guid)
940        # restore the Object and fetch what's restored
941        self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
942        obj_restore = self.search_guid(guid)
943        # check original attributes and restored one are same
944        attr_orig = set(obj.keys())
945        attr_rest = set(obj_restore.keys())
946        # windows restore more attributes that originally we have
947        attr_orig.update(["lastKnownParent"])
948        # and does not restore following attributes
949        attr_orig -= set(["showInAdvancedViewOnly"])
950        self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
951        expected_attrs = self._expected_container_attributes("CN", "r_container",
952                                                             str(obj.dn), "Container")
953        self.assertAttributesExists(expected_attrs, obj_restore)
954
955
956if __name__ == '__main__':
957    unittest.main()
958