1#!/usr/bin/env python3 2# 3# Tombstone reanimation tests 4# 5# Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2014 6# Copyright (C) Nadezhda Ivanova <nivanova@symas.com> 2014 7# 8# This program is free software; you can redistribute it and/or modify 9# it under the terms of the GNU General Public License as published by 10# the Free Software Foundation; either version 3 of the License, or 11# (at your option) any later version. 12# 13# This program is distributed in the hope that it will be useful, 14# but WITHOUT ANY WARRANTY; without even the implied warranty of 15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 16# GNU General Public License for more details. 17# 18# You should have received a copy of the GNU General Public License 19# along with this program. If not, see <http://www.gnu.org/licenses/>. 20 21from __future__ import print_function 22import sys 23import unittest 24 25sys.path.insert(0, "bin/python") 26import samba 27 28from samba.ndr import ndr_unpack, ndr_print 29from samba.dcerpc import misc 30from samba.dcerpc import security 31from samba.dcerpc import drsblobs 32from samba.dcerpc.drsuapi import * 33from samba.tests.password_test import PasswordCommon 34from samba.compat import get_string 35 36import samba.tests 37from ldb import (SCOPE_BASE, FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE, Dn, Message, 38 MessageElement, LdbError, 39 ERR_ATTRIBUTE_OR_VALUE_EXISTS, ERR_NO_SUCH_OBJECT, ERR_ENTRY_ALREADY_EXISTS, 40 ERR_OPERATIONS_ERROR, ERR_UNWILLING_TO_PERFORM) 41 42 43class RestoredObjectAttributesBaseTestCase(samba.tests.TestCase): 44 """ verify Samba restores required attributes when 45 user restores a Deleted object 46 """ 47 48 def setUp(self): 49 super(RestoredObjectAttributesBaseTestCase, self).setUp() 50 self.samdb = samba.tests.connect_samdb_env("TEST_SERVER", "TEST_USERNAME", "TEST_PASSWORD") 51 self.base_dn = self.samdb.domain_dn() 52 self.schema_dn = self.samdb.get_schema_basedn().get_linearized() 53 self.configuration_dn = self.samdb.get_config_basedn().get_linearized() 54 55 # permit password changes during this test 56 PasswordCommon.allow_password_changes(self, self.samdb) 57 58 def tearDown(self): 59 super(RestoredObjectAttributesBaseTestCase, self).tearDown() 60 61 def GUID_string(self, guid): 62 return get_string(self.samdb.schema_format_value("objectGUID", guid)) 63 64 def search_guid(self, guid, attrs=["*"]): 65 res = self.samdb.search(base="<GUID=%s>" % self.GUID_string(guid), 66 scope=SCOPE_BASE, attrs=attrs, 67 controls=["show_deleted:1"]) 68 self.assertEquals(len(res), 1) 69 return res[0] 70 71 def search_dn(self, dn): 72 res = self.samdb.search(expression="(objectClass=*)", 73 base=dn, 74 scope=SCOPE_BASE, 75 controls=["show_recycled:1"]) 76 self.assertEquals(len(res), 1) 77 return res[0] 78 79 def _create_object(self, msg): 80 """:param msg: dict with dn and attributes to create an object from""" 81 # delete an object if leftover from previous test 82 samba.tests.delete_force(self.samdb, msg['dn']) 83 self.samdb.add(msg) 84 return self.search_dn(msg['dn']) 85 86 def assertNamesEqual(self, attrs_expected, attrs_extra): 87 self.assertEqual(attrs_expected, attrs_extra, 88 "Actual object does not have expected attributes, missing from expected (%s), extra (%s)" 89 % (str(attrs_expected.difference(attrs_extra)), str(attrs_extra.difference(attrs_expected)))) 90 91 def assertAttributesEqual(self, obj_orig, attrs_orig, obj_restored, attrs_rest): 92 self.assertNamesEqual(attrs_orig, attrs_rest) 93 # remove volatile attributes, they can't be equal 94 attrs_orig -= set(["uSNChanged", "dSCorePropagationData", "whenChanged"]) 95 for attr in attrs_orig: 96 # convert original attr value to ldif 97 orig_val = obj_orig.get(attr) 98 if orig_val is None: 99 continue 100 if not isinstance(orig_val, MessageElement): 101 orig_val = MessageElement(str(orig_val), 0, attr) 102 m = Message() 103 m.add(orig_val) 104 orig_ldif = self.samdb.write_ldif(m, 0) 105 # convert restored attr value to ldif 106 rest_val = obj_restored.get(attr) 107 self.assertFalse(rest_val is None) 108 m = Message() 109 if not isinstance(rest_val, MessageElement): 110 rest_val = MessageElement(str(rest_val), 0, attr) 111 m.add(rest_val) 112 rest_ldif = self.samdb.write_ldif(m, 0) 113 # compare generated ldif's 114 self.assertEqual(orig_ldif, rest_ldif) 115 116 def assertAttributesExists(self, attr_expected, obj_msg): 117 """Check object contains at least expected attrbigutes 118 :param attr_expected: dict of expected attributes with values. ** is any value 119 :param obj_msg: Ldb.Message for the object under test 120 """ 121 actual_names = set(obj_msg.keys()) 122 # Samba does not use 'dSCorePropagationData', so skip it 123 actual_names -= set(['dSCorePropagationData']) 124 expected_names = set(attr_expected.keys()) 125 self.assertNamesEqual(expected_names, actual_names) 126 for name in attr_expected.keys(): 127 expected_val = attr_expected[name] 128 actual_val = obj_msg.get(name) 129 self.assertFalse(actual_val is None, "No value for attribute '%s'" % name) 130 if expected_val == "**": 131 # "**" values means "any" 132 continue 133 # if expected_val is e.g. ldb.bytes we can't depend on 134 # str(actual_value) working, we may just get a decoding 135 # error. Better to just compare raw values 136 if not isinstance(expected_val, str): 137 actual_val = actual_val[0] 138 else: 139 actual_val = str(actual_val) 140 self.assertEqual(expected_val, actual_val, 141 "Unexpected value (%s) for '%s', expected (%s)" % ( 142 repr(actual_val), name, repr(expected_val))) 143 144 def _check_metadata(self, metadata, expected): 145 repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, metadata[0]) 146 147 repl_array = [] 148 for o in repl.ctr.array: 149 repl_array.append((o.attid, o.version)) 150 repl_set = set(repl_array) 151 152 expected_set = set(expected) 153 self.assertEqual(len(repl_set), len(expected), 154 "Unexpected metadata, missing from expected (%s), extra (%s)), repl: \n%s" % ( 155 str(expected_set.difference(repl_set)), 156 str(repl_set.difference(expected_set)), 157 ndr_print(repl))) 158 159 i = 0 160 for o in repl.ctr.array: 161 e = expected[i] 162 (attid, version) = e 163 self.assertEquals(attid, o.attid, 164 "(LDAP) Wrong attid " 165 "for expected value %d, wanted 0x%08x got 0x%08x, " 166 "repl: \n%s" 167 % (i, attid, o.attid, ndr_print(repl))) 168 # Allow version to be skipped when it does not matter 169 if version is not None: 170 self.assertEquals(o.version, version, 171 "(LDAP) Wrong version for expected value %d, " 172 "attid 0x%08x, " 173 "wanted %d got %d, repl: \n%s" 174 % (i, o.attid, 175 version, o.version, ndr_print(repl))) 176 i = i + 1 177 178 @staticmethod 179 def restore_deleted_object(samdb, del_dn, new_dn, new_attrs=None): 180 """Restores a deleted object 181 :param samdb: SamDB connection to SAM 182 :param del_dn: str Deleted object DN 183 :param new_dn: str Where to restore the object 184 :param new_attrs: dict Additional attributes to set 185 """ 186 msg = Message() 187 msg.dn = Dn(samdb, str(del_dn)) 188 msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted") 189 msg["distinguishedName"] = MessageElement([str(new_dn)], FLAG_MOD_REPLACE, "distinguishedName") 190 if new_attrs is not None: 191 assert isinstance(new_attrs, dict) 192 for attr in new_attrs: 193 msg[attr] = MessageElement(new_attrs[attr], FLAG_MOD_REPLACE, attr) 194 samdb.modify(msg, ["show_deleted:1"]) 195 196 197class BaseRestoreObjectTestCase(RestoredObjectAttributesBaseTestCase): 198 def setUp(self): 199 super(BaseRestoreObjectTestCase, self).setUp() 200 201 def enable_recycle_bin(self): 202 msg = Message() 203 msg.dn = Dn(self.samdb, "") 204 msg["enableOptionalFeature"] = MessageElement( 205 "CN=Partitions," + self.configuration_dn + ":766ddcd8-acd0-445e-f3b9-a7f9b6744f2a", 206 FLAG_MOD_ADD, "enableOptionalFeature") 207 try: 208 self.samdb.modify(msg) 209 except LdbError as e: 210 (num, _) = e.args 211 self.assertEquals(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS) 212 213 def test_undelete(self): 214 print("Testing standard undelete operation") 215 usr1 = "cn=testuser,cn=users," + self.base_dn 216 samba.tests.delete_force(self.samdb, usr1) 217 self.samdb.add({ 218 "dn": usr1, 219 "objectclass": "user", 220 "description": "test user description", 221 "samaccountname": "testuser"}) 222 objLive1 = self.search_dn(usr1) 223 guid1 = objLive1["objectGUID"][0] 224 self.samdb.delete(usr1) 225 objDeleted1 = self.search_guid(guid1) 226 self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1) 227 objLive2 = self.search_dn(usr1) 228 self.assertEqual(str(objLive2.dn).lower(), str(objLive1.dn).lower()) 229 samba.tests.delete_force(self.samdb, usr1) 230 231 def test_rename(self): 232 print("Testing attempt to rename deleted object") 233 usr1 = "cn=testuser,cn=users," + self.base_dn 234 self.samdb.add({ 235 "dn": usr1, 236 "objectclass": "user", 237 "description": "test user description", 238 "samaccountname": "testuser"}) 239 objLive1 = self.search_dn(usr1) 240 guid1 = objLive1["objectGUID"][0] 241 self.samdb.delete(usr1) 242 objDeleted1 = self.search_guid(guid1) 243 # just to make sure we get the correct error if the show deleted is missing 244 try: 245 self.samdb.rename(str(objDeleted1.dn), usr1) 246 self.fail() 247 except LdbError as e1: 248 (num, _) = e1.args 249 self.assertEquals(num, ERR_NO_SUCH_OBJECT) 250 251 try: 252 self.samdb.rename(str(objDeleted1.dn), usr1, ["show_deleted:1"]) 253 self.fail() 254 except LdbError as e2: 255 (num, _) = e2.args 256 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 257 258 def test_undelete_with_mod(self): 259 print("Testing standard undelete operation with modification of additional attributes") 260 usr1 = "cn=testuser,cn=users," + self.base_dn 261 self.samdb.add({ 262 "dn": usr1, 263 "objectclass": "user", 264 "description": "test user description", 265 "samaccountname": "testuser"}) 266 objLive1 = self.search_dn(usr1) 267 guid1 = objLive1["objectGUID"][0] 268 self.samdb.delete(usr1) 269 objDeleted1 = self.search_guid(guid1) 270 self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1, {"url": "www.samba.org"}) 271 objLive2 = self.search_dn(usr1) 272 self.assertEqual(str(objLive2["url"][0]), "www.samba.org") 273 samba.tests.delete_force(self.samdb, usr1) 274 275 def test_undelete_newuser(self): 276 print("Testing undelete user with a different dn") 277 usr1 = "cn=testuser,cn=users," + self.base_dn 278 usr2 = "cn=testuser2,cn=users," + self.base_dn 279 samba.tests.delete_force(self.samdb, usr1) 280 self.samdb.add({ 281 "dn": usr1, 282 "objectclass": "user", 283 "description": "test user description", 284 "samaccountname": "testuser"}) 285 objLive1 = self.search_dn(usr1) 286 guid1 = objLive1["objectGUID"][0] 287 self.samdb.delete(usr1) 288 objDeleted1 = self.search_guid(guid1) 289 self.restore_deleted_object(self.samdb, objDeleted1.dn, usr2) 290 objLive2 = self.search_dn(usr2) 291 samba.tests.delete_force(self.samdb, usr1) 292 samba.tests.delete_force(self.samdb, usr2) 293 294 def test_undelete_existing(self): 295 print("Testing undelete user after a user with the same dn has been created") 296 usr1 = "cn=testuser,cn=users," + self.base_dn 297 self.samdb.add({ 298 "dn": usr1, 299 "objectclass": "user", 300 "description": "test user description", 301 "samaccountname": "testuser"}) 302 objLive1 = self.search_dn(usr1) 303 guid1 = objLive1["objectGUID"][0] 304 self.samdb.delete(usr1) 305 self.samdb.add({ 306 "dn": usr1, 307 "objectclass": "user", 308 "description": "test user description", 309 "samaccountname": "testuser"}) 310 objDeleted1 = self.search_guid(guid1) 311 try: 312 self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1) 313 self.fail() 314 except LdbError as e3: 315 (num, _) = e3.args 316 self.assertEquals(num, ERR_ENTRY_ALREADY_EXISTS) 317 318 def test_undelete_cross_nc(self): 319 print("Cross NC undelete") 320 c1 = "cn=ldaptestcontainer," + self.base_dn 321 c2 = "cn=ldaptestcontainer2," + self.configuration_dn 322 c3 = "cn=ldaptestcontainer," + self.configuration_dn 323 c4 = "cn=ldaptestcontainer2," + self.base_dn 324 samba.tests.delete_force(self.samdb, c1) 325 samba.tests.delete_force(self.samdb, c2) 326 samba.tests.delete_force(self.samdb, c3) 327 samba.tests.delete_force(self.samdb, c4) 328 self.samdb.add({ 329 "dn": c1, 330 "objectclass": "container"}) 331 self.samdb.add({ 332 "dn": c2, 333 "objectclass": "container"}) 334 objLive1 = self.search_dn(c1) 335 objLive2 = self.search_dn(c2) 336 guid1 = objLive1["objectGUID"][0] 337 guid2 = objLive2["objectGUID"][0] 338 self.samdb.delete(c1) 339 self.samdb.delete(c2) 340 objDeleted1 = self.search_guid(guid1) 341 objDeleted2 = self.search_guid(guid2) 342 # try to undelete from base dn to config 343 try: 344 self.restore_deleted_object(self.samdb, objDeleted1.dn, c3) 345 self.fail() 346 except LdbError as e4: 347 (num, _) = e4.args 348 self.assertEquals(num, ERR_OPERATIONS_ERROR) 349 # try to undelete from config to base dn 350 try: 351 self.restore_deleted_object(self.samdb, objDeleted2.dn, c4) 352 self.fail() 353 except LdbError as e5: 354 (num, _) = e5.args 355 self.assertEquals(num, ERR_OPERATIONS_ERROR) 356 # assert undeletion will work in same nc 357 self.restore_deleted_object(self.samdb, objDeleted1.dn, c4) 358 self.restore_deleted_object(self.samdb, objDeleted2.dn, c3) 359 360 361class RestoreUserObjectTestCase(RestoredObjectAttributesBaseTestCase): 362 """Test cases for delete/reanimate user objects""" 363 364 def _expected_user_add_attributes(self, username, user_dn, category): 365 return {'dn': user_dn, 366 'objectClass': '**', 367 'cn': username, 368 'distinguishedName': user_dn, 369 'instanceType': '4', 370 'whenCreated': '**', 371 'whenChanged': '**', 372 'uSNCreated': '**', 373 'uSNChanged': '**', 374 'name': username, 375 'objectGUID': '**', 376 'userAccountControl': '546', 377 'badPwdCount': '0', 378 'badPasswordTime': '0', 379 'codePage': '0', 380 'countryCode': '0', 381 'lastLogon': '0', 382 'lastLogoff': '0', 383 'pwdLastSet': '0', 384 'primaryGroupID': '513', 385 'objectSid': '**', 386 'accountExpires': '9223372036854775807', 387 'logonCount': '0', 388 'sAMAccountName': username, 389 'sAMAccountType': '805306368', 390 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn) 391 } 392 393 def _expected_user_add_metadata(self): 394 return [ 395 (DRSUAPI_ATTID_objectClass, 1), 396 (DRSUAPI_ATTID_cn, 1), 397 (DRSUAPI_ATTID_instanceType, 1), 398 (DRSUAPI_ATTID_whenCreated, 1), 399 (DRSUAPI_ATTID_ntSecurityDescriptor, 1), 400 (DRSUAPI_ATTID_name, 1), 401 (DRSUAPI_ATTID_userAccountControl, None), 402 (DRSUAPI_ATTID_codePage, 1), 403 (DRSUAPI_ATTID_countryCode, 1), 404 (DRSUAPI_ATTID_dBCSPwd, 1), 405 (DRSUAPI_ATTID_logonHours, 1), 406 (DRSUAPI_ATTID_unicodePwd, 1), 407 (DRSUAPI_ATTID_ntPwdHistory, 1), 408 (DRSUAPI_ATTID_pwdLastSet, 1), 409 (DRSUAPI_ATTID_primaryGroupID, 1), 410 (DRSUAPI_ATTID_objectSid, 1), 411 (DRSUAPI_ATTID_accountExpires, 1), 412 (DRSUAPI_ATTID_lmPwdHistory, 1), 413 (DRSUAPI_ATTID_sAMAccountName, 1), 414 (DRSUAPI_ATTID_sAMAccountType, 1), 415 (DRSUAPI_ATTID_objectCategory, 1)] 416 417 def _expected_user_del_attributes(self, username, _guid, _sid): 418 guid = ndr_unpack(misc.GUID, _guid) 419 dn = "CN=%s\\0ADEL:%s,CN=Deleted Objects,%s" % (username, guid, self.base_dn) 420 cn = "%s\nDEL:%s" % (username, guid) 421 return {'dn': dn, 422 'objectClass': '**', 423 'cn': cn, 424 'distinguishedName': dn, 425 'isDeleted': 'TRUE', 426 'isRecycled': 'TRUE', 427 'instanceType': '4', 428 'whenCreated': '**', 429 'whenChanged': '**', 430 'uSNCreated': '**', 431 'uSNChanged': '**', 432 'name': cn, 433 'objectGUID': _guid, 434 'userAccountControl': '546', 435 'objectSid': _sid, 436 'sAMAccountName': username, 437 'lastKnownParent': 'CN=Users,%s' % self.base_dn, 438 } 439 440 def _expected_user_del_metadata(self): 441 return [ 442 (DRSUAPI_ATTID_objectClass, 1), 443 (DRSUAPI_ATTID_cn, 2), 444 (DRSUAPI_ATTID_instanceType, 1), 445 (DRSUAPI_ATTID_whenCreated, 1), 446 (DRSUAPI_ATTID_isDeleted, 1), 447 (DRSUAPI_ATTID_ntSecurityDescriptor, 1), 448 (DRSUAPI_ATTID_name, 2), 449 (DRSUAPI_ATTID_userAccountControl, None), 450 (DRSUAPI_ATTID_codePage, 2), 451 (DRSUAPI_ATTID_countryCode, 2), 452 (DRSUAPI_ATTID_dBCSPwd, 1), 453 (DRSUAPI_ATTID_logonHours, 1), 454 (DRSUAPI_ATTID_unicodePwd, 1), 455 (DRSUAPI_ATTID_ntPwdHistory, 1), 456 (DRSUAPI_ATTID_pwdLastSet, 2), 457 (DRSUAPI_ATTID_primaryGroupID, 2), 458 (DRSUAPI_ATTID_objectSid, 1), 459 (DRSUAPI_ATTID_accountExpires, 2), 460 (DRSUAPI_ATTID_lmPwdHistory, 1), 461 (DRSUAPI_ATTID_sAMAccountName, 1), 462 (DRSUAPI_ATTID_sAMAccountType, 2), 463 (DRSUAPI_ATTID_lastKnownParent, 1), 464 (DRSUAPI_ATTID_objectCategory, 2), 465 (DRSUAPI_ATTID_isRecycled, 1)] 466 467 def _expected_user_restore_attributes(self, username, guid, sid, user_dn, category): 468 return {'dn': user_dn, 469 'objectClass': '**', 470 'cn': username, 471 'distinguishedName': user_dn, 472 'instanceType': '4', 473 'whenCreated': '**', 474 'whenChanged': '**', 475 'uSNCreated': '**', 476 'uSNChanged': '**', 477 'name': username, 478 'objectGUID': guid, 479 'userAccountControl': '546', 480 'badPwdCount': '0', 481 'badPasswordTime': '0', 482 'codePage': '0', 483 'countryCode': '0', 484 'lastLogon': '0', 485 'lastLogoff': '0', 486 'pwdLastSet': '0', 487 'primaryGroupID': '513', 488 'operatorCount': '0', 489 'objectSid': sid, 490 'adminCount': '0', 491 'accountExpires': '0', 492 'logonCount': '0', 493 'sAMAccountName': username, 494 'sAMAccountType': '805306368', 495 'lastKnownParent': 'CN=Users,%s' % self.base_dn, 496 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn) 497 } 498 499 def _expected_user_restore_metadata(self): 500 return [ 501 (DRSUAPI_ATTID_objectClass, 1), 502 (DRSUAPI_ATTID_cn, 3), 503 (DRSUAPI_ATTID_instanceType, 1), 504 (DRSUAPI_ATTID_whenCreated, 1), 505 (DRSUAPI_ATTID_isDeleted, 2), 506 (DRSUAPI_ATTID_ntSecurityDescriptor, 1), 507 (DRSUAPI_ATTID_name, 3), 508 (DRSUAPI_ATTID_userAccountControl, None), 509 (DRSUAPI_ATTID_codePage, 3), 510 (DRSUAPI_ATTID_countryCode, 3), 511 (DRSUAPI_ATTID_dBCSPwd, 1), 512 (DRSUAPI_ATTID_logonHours, 1), 513 (DRSUAPI_ATTID_unicodePwd, 1), 514 (DRSUAPI_ATTID_ntPwdHistory, 1), 515 (DRSUAPI_ATTID_pwdLastSet, 3), 516 (DRSUAPI_ATTID_primaryGroupID, 3), 517 (DRSUAPI_ATTID_operatorCount, 1), 518 (DRSUAPI_ATTID_objectSid, 1), 519 (DRSUAPI_ATTID_adminCount, 1), 520 (DRSUAPI_ATTID_accountExpires, 3), 521 (DRSUAPI_ATTID_lmPwdHistory, 1), 522 (DRSUAPI_ATTID_sAMAccountName, 1), 523 (DRSUAPI_ATTID_sAMAccountType, 3), 524 (DRSUAPI_ATTID_lastKnownParent, 1), 525 (DRSUAPI_ATTID_objectCategory, 3), 526 (DRSUAPI_ATTID_isRecycled, 2)] 527 528 def test_restore_user(self): 529 print("Test restored user attributes") 530 username = "restore_user" 531 usr_dn = "CN=%s,CN=Users,%s" % (username, self.base_dn) 532 samba.tests.delete_force(self.samdb, usr_dn) 533 self.samdb.add({ 534 "dn": usr_dn, 535 "objectClass": "user", 536 "sAMAccountName": username}) 537 obj = self.search_dn(usr_dn) 538 guid = obj["objectGUID"][0] 539 sid = obj["objectSID"][0] 540 obj_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"]) 541 self.assertAttributesExists(self._expected_user_add_attributes(username, usr_dn, "Person"), obj) 542 self._check_metadata(obj_rmd["replPropertyMetaData"], 543 self._expected_user_add_metadata()) 544 self.samdb.delete(usr_dn) 545 obj_del = self.search_guid(guid) 546 obj_del_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"]) 547 orig_attrs = set(obj.keys()) 548 del_attrs = set(obj_del.keys()) 549 self.assertAttributesExists(self._expected_user_del_attributes(username, guid, sid), obj_del) 550 self._check_metadata(obj_del_rmd["replPropertyMetaData"], 551 self._expected_user_del_metadata()) 552 # restore the user and fetch what's restored 553 self.restore_deleted_object(self.samdb, obj_del.dn, usr_dn) 554 obj_restore = self.search_guid(guid) 555 obj_restore_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"]) 556 # check original attributes and restored one are same 557 orig_attrs = set(obj.keys()) 558 # windows restore more attributes that originally we have 559 orig_attrs.update(['adminCount', 'operatorCount', 'lastKnownParent']) 560 rest_attrs = set(obj_restore.keys()) 561 self.assertAttributesExists(self._expected_user_restore_attributes(username, guid, sid, usr_dn, "Person"), obj_restore) 562 self._check_metadata(obj_restore_rmd["replPropertyMetaData"], 563 self._expected_user_restore_metadata()) 564 565 566class RestoreUserPwdObjectTestCase(RestoredObjectAttributesBaseTestCase): 567 """Test cases for delete/reanimate user objects with password""" 568 569 def _expected_userpw_add_attributes(self, username, user_dn, category): 570 return {'dn': user_dn, 571 'objectClass': '**', 572 'cn': username, 573 'distinguishedName': user_dn, 574 'instanceType': '4', 575 'whenCreated': '**', 576 'whenChanged': '**', 577 'uSNCreated': '**', 578 'uSNChanged': '**', 579 'name': username, 580 'objectGUID': '**', 581 'userAccountControl': '546', 582 'badPwdCount': '0', 583 'badPasswordTime': '0', 584 'codePage': '0', 585 'countryCode': '0', 586 'lastLogon': '0', 587 'lastLogoff': '0', 588 'pwdLastSet': '**', 589 'primaryGroupID': '513', 590 'objectSid': '**', 591 'accountExpires': '9223372036854775807', 592 'logonCount': '0', 593 'sAMAccountName': username, 594 'sAMAccountType': '805306368', 595 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn) 596 } 597 598 def _expected_userpw_add_metadata(self): 599 return [ 600 (DRSUAPI_ATTID_objectClass, 1), 601 (DRSUAPI_ATTID_cn, 1), 602 (DRSUAPI_ATTID_instanceType, 1), 603 (DRSUAPI_ATTID_whenCreated, 1), 604 (DRSUAPI_ATTID_ntSecurityDescriptor, 1), 605 (DRSUAPI_ATTID_name, 1), 606 (DRSUAPI_ATTID_userAccountControl, None), 607 (DRSUAPI_ATTID_codePage, 1), 608 (DRSUAPI_ATTID_countryCode, 1), 609 (DRSUAPI_ATTID_dBCSPwd, 1), 610 (DRSUAPI_ATTID_logonHours, 1), 611 (DRSUAPI_ATTID_unicodePwd, 1), 612 (DRSUAPI_ATTID_ntPwdHistory, 1), 613 (DRSUAPI_ATTID_pwdLastSet, 1), 614 (DRSUAPI_ATTID_primaryGroupID, 1), 615 (DRSUAPI_ATTID_supplementalCredentials, 1), 616 (DRSUAPI_ATTID_objectSid, 1), 617 (DRSUAPI_ATTID_accountExpires, 1), 618 (DRSUAPI_ATTID_lmPwdHistory, 1), 619 (DRSUAPI_ATTID_sAMAccountName, 1), 620 (DRSUAPI_ATTID_sAMAccountType, 1), 621 (DRSUAPI_ATTID_objectCategory, 1)] 622 623 def _expected_userpw_del_attributes(self, username, _guid, _sid): 624 guid = ndr_unpack(misc.GUID, _guid) 625 dn = "CN=%s\\0ADEL:%s,CN=Deleted Objects,%s" % (username, guid, self.base_dn) 626 cn = "%s\nDEL:%s" % (username, guid) 627 return {'dn': dn, 628 'objectClass': '**', 629 'cn': cn, 630 'distinguishedName': dn, 631 'isDeleted': 'TRUE', 632 'isRecycled': 'TRUE', 633 'instanceType': '4', 634 'whenCreated': '**', 635 'whenChanged': '**', 636 'uSNCreated': '**', 637 'uSNChanged': '**', 638 'name': cn, 639 'objectGUID': _guid, 640 'userAccountControl': '546', 641 'objectSid': _sid, 642 'sAMAccountName': username, 643 'lastKnownParent': 'CN=Users,%s' % self.base_dn, 644 } 645 646 def _expected_userpw_del_metadata(self): 647 return [ 648 (DRSUAPI_ATTID_objectClass, 1), 649 (DRSUAPI_ATTID_cn, 2), 650 (DRSUAPI_ATTID_instanceType, 1), 651 (DRSUAPI_ATTID_whenCreated, 1), 652 (DRSUAPI_ATTID_isDeleted, 1), 653 (DRSUAPI_ATTID_ntSecurityDescriptor, 1), 654 (DRSUAPI_ATTID_name, 2), 655 (DRSUAPI_ATTID_userAccountControl, None), 656 (DRSUAPI_ATTID_codePage, 2), 657 (DRSUAPI_ATTID_countryCode, 2), 658 (DRSUAPI_ATTID_dBCSPwd, 1), 659 (DRSUAPI_ATTID_logonHours, 1), 660 (DRSUAPI_ATTID_unicodePwd, 2), 661 (DRSUAPI_ATTID_ntPwdHistory, 2), 662 (DRSUAPI_ATTID_pwdLastSet, 2), 663 (DRSUAPI_ATTID_primaryGroupID, 2), 664 (DRSUAPI_ATTID_supplementalCredentials, 2), 665 (DRSUAPI_ATTID_objectSid, 1), 666 (DRSUAPI_ATTID_accountExpires, 2), 667 (DRSUAPI_ATTID_lmPwdHistory, 2), 668 (DRSUAPI_ATTID_sAMAccountName, 1), 669 (DRSUAPI_ATTID_sAMAccountType, 2), 670 (DRSUAPI_ATTID_lastKnownParent, 1), 671 (DRSUAPI_ATTID_objectCategory, 2), 672 (DRSUAPI_ATTID_isRecycled, 1)] 673 674 def _expected_userpw_restore_attributes(self, username, guid, sid, user_dn, category): 675 return {'dn': user_dn, 676 'objectClass': '**', 677 'cn': username, 678 'distinguishedName': user_dn, 679 'instanceType': '4', 680 'whenCreated': '**', 681 'whenChanged': '**', 682 'uSNCreated': '**', 683 'uSNChanged': '**', 684 'name': username, 685 'objectGUID': guid, 686 'userAccountControl': '546', 687 'badPwdCount': '0', 688 'badPasswordTime': '0', 689 'codePage': '0', 690 'countryCode': '0', 691 'lastLogon': '0', 692 'lastLogoff': '0', 693 'pwdLastSet': '**', 694 'primaryGroupID': '513', 695 'operatorCount': '0', 696 'objectSid': sid, 697 'adminCount': '0', 698 'accountExpires': '0', 699 'logonCount': '0', 700 'sAMAccountName': username, 701 'sAMAccountType': '805306368', 702 'lastKnownParent': 'CN=Users,%s' % self.base_dn, 703 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn) 704 } 705 706 def _expected_userpw_restore_metadata(self): 707 return [ 708 (DRSUAPI_ATTID_objectClass, 1), 709 (DRSUAPI_ATTID_cn, 3), 710 (DRSUAPI_ATTID_instanceType, 1), 711 (DRSUAPI_ATTID_whenCreated, 1), 712 (DRSUAPI_ATTID_isDeleted, 2), 713 (DRSUAPI_ATTID_ntSecurityDescriptor, 1), 714 (DRSUAPI_ATTID_name, 3), 715 (DRSUAPI_ATTID_userAccountControl, None), 716 (DRSUAPI_ATTID_codePage, 3), 717 (DRSUAPI_ATTID_countryCode, 3), 718 (DRSUAPI_ATTID_dBCSPwd, 2), 719 (DRSUAPI_ATTID_logonHours, 1), 720 (DRSUAPI_ATTID_unicodePwd, 3), 721 (DRSUAPI_ATTID_ntPwdHistory, 3), 722 (DRSUAPI_ATTID_pwdLastSet, 4), 723 (DRSUAPI_ATTID_primaryGroupID, 3), 724 (DRSUAPI_ATTID_supplementalCredentials, 3), 725 (DRSUAPI_ATTID_operatorCount, 1), 726 (DRSUAPI_ATTID_objectSid, 1), 727 (DRSUAPI_ATTID_adminCount, 1), 728 (DRSUAPI_ATTID_accountExpires, 3), 729 (DRSUAPI_ATTID_lmPwdHistory, 3), 730 (DRSUAPI_ATTID_sAMAccountName, 1), 731 (DRSUAPI_ATTID_sAMAccountType, 3), 732 (DRSUAPI_ATTID_lastKnownParent, 1), 733 (DRSUAPI_ATTID_objectCategory, 3), 734 (DRSUAPI_ATTID_isRecycled, 2)] 735 736 def test_restorepw_user(self): 737 print("Test restored user attributes") 738 username = "restorepw_user" 739 usr_dn = "CN=%s,CN=Users,%s" % (username, self.base_dn) 740 samba.tests.delete_force(self.samdb, usr_dn) 741 self.samdb.add({ 742 "dn": usr_dn, 743 "objectClass": "user", 744 "userPassword": "thatsAcomplPASS0", 745 "sAMAccountName": username}) 746 obj = self.search_dn(usr_dn) 747 guid = obj["objectGUID"][0] 748 sid = obj["objectSID"][0] 749 obj_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"]) 750 self.assertAttributesExists(self._expected_userpw_add_attributes(username, usr_dn, "Person"), obj) 751 self._check_metadata(obj_rmd["replPropertyMetaData"], 752 self._expected_userpw_add_metadata()) 753 self.samdb.delete(usr_dn) 754 obj_del = self.search_guid(guid) 755 obj_del_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"]) 756 orig_attrs = set(obj.keys()) 757 del_attrs = set(obj_del.keys()) 758 self.assertAttributesExists(self._expected_userpw_del_attributes(username, guid, sid), obj_del) 759 self._check_metadata(obj_del_rmd["replPropertyMetaData"], 760 self._expected_userpw_del_metadata()) 761 # restore the user and fetch what's restored 762 self.restore_deleted_object(self.samdb, obj_del.dn, usr_dn, {"userPassword": ["thatsAcomplPASS1"]}) 763 obj_restore = self.search_guid(guid) 764 obj_restore_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"]) 765 # check original attributes and restored one are same 766 orig_attrs = set(obj.keys()) 767 # windows restore more attributes that originally we have 768 orig_attrs.update(['adminCount', 'operatorCount', 'lastKnownParent']) 769 rest_attrs = set(obj_restore.keys()) 770 self.assertAttributesExists(self._expected_userpw_restore_attributes(username, guid, sid, usr_dn, "Person"), obj_restore) 771 self._check_metadata(obj_restore_rmd["replPropertyMetaData"], 772 self._expected_userpw_restore_metadata()) 773 774 775class RestoreGroupObjectTestCase(RestoredObjectAttributesBaseTestCase): 776 """Test different scenarios for delete/reanimate group objects""" 777 778 def _make_object_dn(self, name): 779 return "CN=%s,CN=Users,%s" % (name, self.base_dn) 780 781 def _create_test_user(self, user_name): 782 user_dn = self._make_object_dn(user_name) 783 ldif = { 784 "dn": user_dn, 785 "objectClass": "user", 786 "sAMAccountName": user_name, 787 } 788 # delete an object if leftover from previous test 789 samba.tests.delete_force(self.samdb, user_dn) 790 # finally, create the group 791 self.samdb.add(ldif) 792 return self.search_dn(user_dn) 793 794 def _create_test_group(self, group_name, members=None): 795 group_dn = self._make_object_dn(group_name) 796 ldif = { 797 "dn": group_dn, 798 "objectClass": "group", 799 "sAMAccountName": group_name, 800 } 801 try: 802 ldif["member"] = [str(usr_dn) for usr_dn in members] 803 except TypeError: 804 pass 805 # delete an object if leftover from previous test 806 samba.tests.delete_force(self.samdb, group_dn) 807 # finally, create the group 808 self.samdb.add(ldif) 809 return self.search_dn(group_dn) 810 811 def _expected_group_attributes(self, groupname, group_dn, category): 812 return {'dn': group_dn, 813 'groupType': '-2147483646', 814 'distinguishedName': group_dn, 815 'sAMAccountName': groupname, 816 'name': groupname, 817 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn), 818 'objectClass': '**', 819 'objectGUID': '**', 820 'lastKnownParent': 'CN=Users,%s' % self.base_dn, 821 'whenChanged': '**', 822 'sAMAccountType': '268435456', 823 'objectSid': '**', 824 'whenCreated': '**', 825 'uSNCreated': '**', 826 'operatorCount': '0', 827 'uSNChanged': '**', 828 'instanceType': '4', 829 'adminCount': '0', 830 'cn': groupname} 831 832 def test_plain_group(self): 833 print("Test restored Group attributes") 834 # create test group 835 obj = self._create_test_group("r_group") 836 guid = obj["objectGUID"][0] 837 # delete the group 838 self.samdb.delete(str(obj.dn)) 839 obj_del = self.search_guid(guid) 840 # restore the Group and fetch what's restored 841 self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn) 842 obj_restore = self.search_guid(guid) 843 # check original attributes and restored one are same 844 attr_orig = set(obj.keys()) 845 # windows restore more attributes that originally we have 846 attr_orig.update(['adminCount', 'operatorCount', 'lastKnownParent']) 847 attr_rest = set(obj_restore.keys()) 848 self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest) 849 self.assertAttributesExists(self._expected_group_attributes("r_group", str(obj.dn), "Group"), obj_restore) 850 851 def test_group_with_members(self): 852 print("Test restored Group with members attributes") 853 # create test group 854 usr1 = self._create_test_user("r_user_1") 855 usr2 = self._create_test_user("r_user_2") 856 obj = self._create_test_group("r_group", [usr1.dn, usr2.dn]) 857 guid = obj["objectGUID"][0] 858 # delete the group 859 self.samdb.delete(str(obj.dn)) 860 obj_del = self.search_guid(guid) 861 # restore the Group and fetch what's restored 862 self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn) 863 obj_restore = self.search_guid(guid) 864 # check original attributes and restored one are same 865 attr_orig = set(obj.keys()) 866 # windows restore more attributes that originally we have 867 attr_orig.update(['adminCount', 'operatorCount', 'lastKnownParent']) 868 # and does not restore following attributes 869 attr_orig.remove("member") 870 attr_rest = set(obj_restore.keys()) 871 self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest) 872 self.assertAttributesExists(self._expected_group_attributes("r_group", str(obj.dn), "Group"), obj_restore) 873 874 875class RestoreContainerObjectTestCase(RestoredObjectAttributesBaseTestCase): 876 """Test different scenarios for delete/reanimate OU/container objects""" 877 878 def _expected_container_attributes(self, rdn, name, dn, category): 879 if rdn == 'OU': 880 lastKnownParent = '%s' % self.base_dn 881 else: 882 lastKnownParent = 'CN=Users,%s' % self.base_dn 883 return {'dn': dn, 884 'distinguishedName': dn, 885 'name': name, 886 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn), 887 'objectClass': '**', 888 'objectGUID': '**', 889 'lastKnownParent': lastKnownParent, 890 'whenChanged': '**', 891 'whenCreated': '**', 892 'uSNCreated': '**', 893 'uSNChanged': '**', 894 'instanceType': '4', 895 rdn.lower(): name} 896 897 def _create_test_ou(self, rdn, name=None, description=None): 898 ou_dn = "OU=%s,%s" % (rdn, self.base_dn) 899 # delete an object if leftover from previous test 900 samba.tests.delete_force(self.samdb, ou_dn) 901 # create ou and return created object 902 self.samdb.create_ou(ou_dn, name=name, description=description) 903 return self.search_dn(ou_dn) 904 905 def test_ou_with_name_description(self): 906 print("Test OU reanimation") 907 # create OU to test with 908 obj = self._create_test_ou(rdn="r_ou", 909 name="r_ou name", 910 description="r_ou description") 911 guid = obj["objectGUID"][0] 912 # delete the object 913 self.samdb.delete(str(obj.dn)) 914 obj_del = self.search_guid(guid) 915 # restore the Object and fetch what's restored 916 self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn) 917 obj_restore = self.search_guid(guid) 918 # check original attributes and restored one are same 919 attr_orig = set(obj.keys()) 920 attr_rest = set(obj_restore.keys()) 921 # windows restore more attributes that originally we have 922 attr_orig.update(["lastKnownParent"]) 923 # and does not restore following attributes 924 attr_orig -= set(["description"]) 925 self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest) 926 expected_attrs = self._expected_container_attributes("OU", "r_ou", str(obj.dn), "Organizational-Unit") 927 self.assertAttributesExists(expected_attrs, obj_restore) 928 929 def test_container(self): 930 print("Test Container reanimation") 931 # create test Container 932 obj = self._create_object({ 933 "dn": "CN=r_container,CN=Users,%s" % self.base_dn, 934 "objectClass": "container" 935 }) 936 guid = obj["objectGUID"][0] 937 # delete the object 938 self.samdb.delete(str(obj.dn)) 939 obj_del = self.search_guid(guid) 940 # restore the Object and fetch what's restored 941 self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn) 942 obj_restore = self.search_guid(guid) 943 # check original attributes and restored one are same 944 attr_orig = set(obj.keys()) 945 attr_rest = set(obj_restore.keys()) 946 # windows restore more attributes that originally we have 947 attr_orig.update(["lastKnownParent"]) 948 # and does not restore following attributes 949 attr_orig -= set(["showInAdvancedViewOnly"]) 950 self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest) 951 expected_attrs = self._expected_container_attributes("CN", "r_container", 952 str(obj.dn), "Container") 953 self.assertAttributesExists(expected_attrs, obj_restore) 954 955 956if __name__ == '__main__': 957 unittest.main() 958