#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Tests various schema replication scenarios
#
# Copyright (C) Catalyst.Net Ltd. 2017
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

#
# Usage:
#  export DC1=dc1_dns_name
#  export DC2=dc2_dns_name
#  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
#  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN \
#       getncchanges -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
#

from __future__ import print_function
import drs_base
import samba.tests
import ldb
from ldb import SCOPE_BASE
import random

from samba.dcerpc import drsuapi


class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
    """
    GetNCChanges tests that check no objects or links are lost when the
    data being replicated is modified, linked, deleted or re-ordered while
    a replication cycle is still in progress.
    """

    def setUp(self):
        """Creates the test OU on DC2 and connects the test to that DC"""
        super(DrsReplicaSyncIntegrityTestCase, self).setUp()

        self.init_test_state()

        # Note that DC2 is the DC with the testenv-specific quirks (e.g. it's
        # the vampire_dc), so we point this test directly at that DC
        self.set_test_ldb_dc(self.ldb_dc2)

        self.ou = str(samba.tests.create_test_ou(self.test_ldb_dc,
                                                 "getncchanges"))
        self.base_dn = self.test_ldb_dc.get_default_basedn()

        self.default_conn = DcConnection(self, self.ldb_dc2, self.dnsname_dc2)
        self.set_dc_connection(self.default_conn)

    def tearDown(self):
        """Deletes the test OU (and everything underneath it) from DC2"""
        super(DrsReplicaSyncIntegrityTestCase, self).tearDown()
        # tidyup groups and users
        try:
            self.ldb_dc2.delete(self.ou, ["tree_delete:1"])
        except ldb.LdbError as e:
            (enum, _) = e.args
            if enum != ldb.ERR_NO_SUCH_OBJECT:
                # the cleanup is best-effort, so don't fail the test here,
                # but don't hide an unexpected error completely either
                print("Note: failed to delete %s: %s" % (self.ou, e))

    def init_test_state(self):
        """Resets everything we have learnt from previous replication"""
        self.rxd_dn_list = []
        self.rxd_links = []
        self.rxd_guids = []
        self.last_ctr = None

        # 100 is the minimum max_objects that Microsoft seems to honour
        # (the max honoured is 400ish), so we use that in these tests
        self.max_objects = 100

        # store whether we used GET_TGT/GET_ANC flags in the requests
        self.used_get_tgt = False
        self.used_get_anc = False

    def add_object(self, dn, objectclass="organizationalunit"):
        """Adds an OU object"""
        self.test_ldb_dc.add({"dn": dn, "objectclass": objectclass})
        res = self.test_ldb_dc.search(base=dn, scope=SCOPE_BASE)
        self.assertEqual(len(res), 1)

    def modify_object(self, dn, attr, value):
        """Modifies an object's USN by adding an attribute value to it"""
        m = ldb.Message()
        m.dn = ldb.Dn(self.test_ldb_dc, dn)
        m[attr] = ldb.MessageElement(value, ldb.FLAG_MOD_ADD, attr)
        self.test_ldb_dc.modify(m)

    def delete_attribute(self, dn, attr, value):
        """Deletes an attribute from an object"""
        m = ldb.Message()
        m.dn = ldb.Dn(self.test_ldb_dc, dn)
        m[attr] = ldb.MessageElement(value, ldb.FLAG_MOD_DELETE, attr)
        self.test_ldb_dc.modify(m)

    def start_new_repl_cycle(self):
        """Resets enough state info to start a new replication cycle"""
        # reset rxd_links, but leave rxd_guids and rxd_dn_list alone so we know
        # whether a parent/target is unknown and needs GET_ANC/GET_TGT to
        # resolve
        self.rxd_links = []

        self.used_get_tgt = False
        self.used_get_anc = False
        # mostly preserve self.last_ctr, so that we use the last HWM
        if self.last_ctr is not None:
            self.last_ctr.more_data = True

    def create_object_range(self, start, end, prefix="",
                            children=None, parent_list=None):
        """
        Creates a block of objects. Object names are numbered sequentially,
        using the optional prefix supplied. If the children parameter is
        supplied it will create a parent-child hierarchy and return the
        top-level parents separately.

        Returns the list of DNs created (parents first, then children). The
        top-level parents are also appended to parent_list, if supplied.
        Note the children are looked up via parent_list[x], so parent_list
        must already hold the parents for indexes below 'start'.
        """
        dn_list = []

        # Use dummy/empty lists if we're not creating a parent/child hierarchy
        if children is None:
            children = []

        if parent_list is None:
            parent_list = []

        # Create the parents first, then the children.
        # This makes it easier to see in debug when GET_ANC takes effect
        # because the parent/children become interleaved (by default,
        # this approach means the objects are organized into blocks of
        # parents and blocks of children together)
        for x in range(start, end):
            ou = "OU=test_ou_%s%d,%s" % (prefix, x, self.ou)
            self.add_object(ou)
            dn_list.append(ou)

            # keep track of the top-level parents (if needed)
            parent_list.append(ou)

        # create the block of children (if needed)
        for x in range(start, end):
            for child in children:
                ou = "OU=test_ou_child%s%d,%s" % (child, x, parent_list[x])
                self.add_object(ou)
                dn_list.append(ou)

        return dn_list

    def assert_expected_data(self, expected_list):
        """
        Asserts that we received all the DNs that we expected and
        none are missing.
        """
        received_list = self.rxd_dn_list

        # Note that with GET_ANC Windows can end up sending the same parent
        # object multiple times, so this might be noteworthy but doesn't
        # warrant failing the test
        num_received = len(received_list)
        num_expected = len(expected_list)
        if num_received != num_expected:
            print("Note: received %d objects but expected %d" % (num_received,
                                                                 num_expected))

        # Check that we received every object that we were expecting
        for dn in expected_list:
            self.assertTrue(dn in received_list,
                            "DN '%s' missing from replication." % dn)

    def test_repl_integrity(self):
        """
        Modify the objects being replicated while the replication is still
        in progress and check that no object loss occurs.
        """

        # The server behaviour differs between samba and Windows. Samba returns
        # the objects in the original order (up to the pre-modify HWM). Windows
        # incorporates the modified objects and returns them in the new order
        # (i.e. modified objects last), up to the post-modify HWM. The
        # Microsoft docs state the Windows behaviour is optional.

        # Create a range of objects to replicate.
        expected_dn_list = self.create_object_range(0, 400)
        (orig_hwm, _) = self._get_highest_hwm_utdv(self.test_ldb_dc)

        # We ask for the first page of 100 objects.
        # For this test, we don't care what order we receive the objects in,
        # so long as by the end we've received everything
        self.repl_get_next()

        # Modify some of the second page of objects. This should bump the
        # highwatermark
        for x in range(100, 200):
            self.modify_object(expected_dn_list[x], "displayName", "OU%d" % x)

        (post_modify_hwm, _) = self._get_highest_hwm_utdv(self.test_ldb_dc)
        self.assertTrue(post_modify_hwm.highest_usn > orig_hwm.highest_usn)

        # Get the remaining blocks of data
        while not self.replication_complete():
            self.repl_get_next()

        # Check we still receive all the objects we're expecting
        self.assert_expected_data(expected_dn_list)

    def is_parent_known(self, dn, known_dn_list):
        """
        Returns True if the parent of the dn specified is in known_dn_list
        """

        # we can sometimes get system objects like the RID Manager returned.
        # Ignore anything that is not under the test OU we created
        if self.ou not in dn:
            return True

        # Remove the child portion from the name to get the parent's DN
        # (i.e. everything after the first comma)
        parent_dn = dn.partition(",")[2]

        # check either this object is a parent (its parent is the top-level
        # test object), or its parent has been seen previously
        return parent_dn == self.ou or parent_dn in known_dn_list

    def _repl_send_request(self, get_anc=False, get_tgt=False):
        """
        Sends a GetNCChanges request for the next block of replication data.

        Sets the GET_ANC/GET_TGT flags in the request if asked to (and
        records that they were used). Returns the response from the DC.
        """

        # we're just trying to mimic regular client behaviour here, so just
        # use the highwatermark in the last response we received
        if self.last_ctr:
            highwatermark = self.last_ctr.new_highwatermark
            uptodateness_vector = self.last_ctr.uptodateness_vector
        else:
            # this is the first replication chunk
            highwatermark = None
            uptodateness_vector = None

        # Ask for the next block of replication data
        replica_flags = drsuapi.DRSUAPI_DRS_WRIT_REP
        more_flags = 0

        if get_anc:
            replica_flags |= drsuapi.DRSUAPI_DRS_GET_ANC
            self.used_get_anc = True

        if get_tgt:
            more_flags = drsuapi.DRSUAPI_DRS_GET_TGT
            self.used_get_tgt = True

        # return the response from the DC
        return self._get_replication(replica_flags,
                                     max_objects=self.max_objects,
                                     highwatermark=highwatermark,
                                     uptodateness_vector=uptodateness_vector,
                                     more_flags=more_flags)

    def repl_get_next(self, get_anc=False, get_tgt=False, assert_links=False):
        """
        Requests the next block of replication data. This tries to simulate
        client behaviour - if we receive a replicated object that we don't know
        the parent of, then re-request the block with the GET_ANC flag set.
        If we don't know the target object for a linked attribute, then
        re-request with GET_TGT.

        If assert_links is set, the response must contain at least one link.
        Returns the response that was finally accepted, after storing its
        objects, GUIDs and links (and remembering it as the last response).
        """

        # send a request to the DC and get the response
        ctr6 = self._repl_send_request(get_anc=get_anc, get_tgt=get_tgt)

        # extract the object DNs and their GUIDs from the response
        rxd_dn_list = self._get_ctr6_dn_list(ctr6)
        rxd_guid_list = self._get_ctr6_object_guids(ctr6)

        # we'll add new objects as we discover them, so take a copy of the
        # ones we already know about, so we can modify these lists safely
        known_objects = self.rxd_dn_list[:]
        known_guids = self.rxd_guids[:]

        # check that we know the parent for every object received
        for dn, guid in zip(rxd_dn_list, rxd_guid_list):

            if self.is_parent_known(dn, known_objects):

                # the new DN is now known so add it to the list.
                # It may be the parent of another child in this block
                known_objects.append(dn)
                known_guids.append(guid)
            else:
                # If we've already set the GET_ANC flag then it should mean
                # we receive the parents before the child
                self.assertFalse(get_anc, "Unknown parent for object %s" % dn)

                print("Unknown parent for %s - try GET_ANC" % dn)

                # try the same thing again with the GET_ANC flag set this time
                return self.repl_get_next(get_anc=True, get_tgt=get_tgt,
                                          assert_links=assert_links)

        # check we know about references to any objects in the linked attrs
        received_links = self._get_ctr6_links(ctr6)

        # This is so that older versions of Samba fail - we want the links to
        # be sent roughly with the objects, rather than getting all links at
        # the end
        if assert_links:
            self.assertTrue(len(received_links) > 0,
                            "Links were expected in the GetNCChanges response")

        for link in received_links:

            # skip any links that aren't part of the test
            if self.ou not in link.targetDN:
                continue

            # check the source object is known (Windows can actually send links
            # where we don't know the source object yet). Samba shouldn't ever
            # hit this case because it gets the links based on the source
            if link.identifier not in known_guids:

                # If we've already set the GET_ANC flag then it should mean
                # this case doesn't happen
                self.assertFalse(get_anc, "Unknown source object for GUID %s"
                                 % link.identifier)

                print("Unknown source GUID %s - try GET_ANC" % link.identifier)

                # try the same thing again with the GET_ANC flag set this time
                return self.repl_get_next(get_anc=True, get_tgt=get_tgt,
                                          assert_links=assert_links)

            # check we know the target object
            if link.targetGUID not in known_guids:

                # If we've already set the GET_TGT flag then we should have
                # already received any objects we need to know about
                self.assertFalse(get_tgt, "Unknown linked target for object %s"
                                 % link.targetDN)

                print("Unknown target for %s - try GET_TGT" % link.targetDN)

                # try the same thing again with the GET_TGT flag set this time
                return self.repl_get_next(get_anc=get_anc, get_tgt=True,
                                          assert_links=assert_links)

        # store the last successful result so we know what HWM to request next
        self.last_ctr = ctr6

        # store the objects, GUIDs, and links we received
        self.rxd_dn_list += self._get_ctr6_dn_list(ctr6)
        self.rxd_links += self._get_ctr6_links(ctr6)
        self.rxd_guids += self._get_ctr6_object_guids(ctr6)

        return ctr6

    def replication_complete(self):
        """Returns True if the current/last replication cycle is complete"""

        # we're only done once we've received at least one response, and the
        # last response said there's no more data to come
        return self.last_ctr is not None and not self.last_ctr.more_data

    def test_repl_integrity_get_anc(self):
        """
        Modify the parent objects being replicated while the replication is
        still in progress (using GET_ANC) and check that no object loss occurs.
        """

        # Note that GET_ANC behaviour varies between Windows and Samba.
        # On Samba GET_ANC results in the replication restarting from the very
        # beginning. After that, Samba remembers GET_ANC and also sends the
        # parents in subsequent requests (regardless of whether GET_ANC is
        # specified in the later request).
        # Windows only sends the parents if GET_ANC was specified in the last
        # request. It will also resend a parent, even if it's already sent the
        # parent in a previous response (whereas Samba doesn't).

        # Create a small block of 50 parents, each with 2 children (A and B)
        # This is so that we receive some children in the first block, so we
        # can resend with GET_ANC before we learn too many parents
        parent_dn_list = []
        expected_dn_list = self.create_object_range(0, 50, prefix="parent",
                                                    children=("A", "B"),
                                                    parent_list=parent_dn_list)

        # create the remaining parents and children
        expected_dn_list += self.create_object_range(50, 150, prefix="parent",
                                                     children=("A", "B"),
                                                     parent_list=parent_dn_list)

        # We've now got objects in the following order:
        # [50 parents][100 children][100 parents][200 children]

        # Modify the first parent so that it's now ordered last by USN
        # This means we set the GET_ANC flag pretty much straight away
        # because we receive the first child before the first parent
        self.modify_object(parent_dn_list[0], "displayName", "OU0")

        # modify a later block of parents so they also get reordered
        for x in range(50, 100):
            self.modify_object(parent_dn_list[x], "displayName", "OU%d" % x)

        # Get the first block of objects - this should resend the request with
        # GET_ANC set because we won't know about the first child's parent.
        # On samba GET_ANC essentially starts the sync from scratch again, so
        # we get this over with early before we learn too many parents
        self.repl_get_next()

        # modify the last chunk of parents. They should now have a USN higher
        # than the highwater-mark for the replication cycle
        for x in range(100, 150):
            self.modify_object(parent_dn_list[x], "displayName", "OU%d" % x)

        # Get the remaining blocks of data - this will resend the request with
        # GET_ANC if it encounters an object it doesn't have the parent for.
        while not self.replication_complete():
            self.repl_get_next()

        # The way the test objects have been created should force
        # self.repl_get_next() to use the GET_ANC flag. If this doesn't
        # actually happen, then the test isn't doing its job properly
        self.assertTrue(self.used_get_anc,
                        "Test didn't use the GET_ANC flag as expected")

        # Check we get all the objects we're expecting
        self.assert_expected_data(expected_dn_list)

    def assert_expected_links(self, objects_with_links, link_attr="managedBy",
                              num_expected=None):
        """
        Asserts that a GetNCChanges response contains any expected links
        for the objects it contains.

        num_expected is the total number of links we should have received
        (by default, one link per object in objects_with_links).
        """
        received_links = self.rxd_links

        if num_expected is None:
            num_expected = len(objects_with_links)

        self.assertTrue(len(received_links) == num_expected,
                        "Received %d links but expected %d"
                        % (len(received_links), num_expected))

        for dn in objects_with_links:
            self.assert_object_has_link(dn, link_attr, received_links)

    def assert_object_has_link(self, dn, link_attr, received_links):
        """
        Queries the object in the DB and asserts there is a link in the
        GetNCChanges response that matches.
        """

        # Look up the link attribute in the DB
        # The extended_dn option will dump the GUID info for the link
        # attribute (as a hex blob)
        res = self.test_ldb_dc.search(ldb.Dn(self.test_ldb_dc, dn),
                                      attrs=[link_attr],
                                      controls=['extended_dn:1:0'],
                                      scope=ldb.SCOPE_BASE)

        # We didn't find the expected link attribute in the DB for the object.
        # Something has gone wrong somewhere...
        self.assertTrue(link_attr in res[0],
                        "%s in DB doesn't have attribute %s" % (dn, link_attr))

        # the source of every link is the object itself, so we only need to
        # work out the expected source GUID once
        sourceGUID_blob = res[0].dn.get_extended_component("GUID")

        # find the received link in the list and assert that the target and
        # source GUIDs match what's in the DB
        for val in [str(val) for val in res[0][link_attr]]:
            # Work out the expected target GUID for the DB link
            target_dn = ldb.Dn(self.test_ldb_dc, val)
            targetGUID_blob = target_dn.get_extended_component("GUID")

            found = False

            for link in received_links:
                if link.selfGUID_blob == sourceGUID_blob and \
                   link.targetGUID_blob == targetGUID_blob:

                    found = True

                    if self._debug:
                        print("Link %s --> %s" % (dn[:25], link.targetDN[:25]))
                    break

            self.assertTrue(found,
                            "Did not receive expected link for DN %s" % dn)

    def test_repl_get_tgt(self):
        """
        Creates a scenario where we should receive the linked attribute before
        we know about the target object, and therefore need to use GET_TGT.
        Note: Samba currently avoids this problem by sending all its links last
        """

        # create the test objects
        reportees = self.create_object_range(0, 100, prefix="reportee")
        managers = self.create_object_range(0, 100, prefix="manager")
        all_objects = managers + reportees
        expected_links = reportees

        # add a link attribute to each reportee object that points to the
        # corresponding manager object as the target
        for i in range(0, 100):
            self.modify_object(reportees[i], "managedBy", managers[i])

        # touch the managers (the link-target objects) again to make sure the
        # reportees (link source objects) get returned first by the replication
        for i in range(0, 100):
            self.modify_object(managers[i], "displayName", "OU%d" % i)

        links_expected = True

        # Get all the replication data - this code should resend the requests
        # with GET_TGT
        while not self.replication_complete():

            # get the next block of replication data (this sets GET_TGT
            # if needed)
            self.repl_get_next(assert_links=links_expected)
            links_expected = len(self.rxd_links) < len(expected_links)

        # The way the test objects have been created should force
        # self.repl_get_next() to use the GET_TGT flag. If this doesn't
        # actually happen, then the test isn't doing its job properly
        self.assertTrue(self.used_get_tgt,
                        "Test didn't use the GET_TGT flag as expected")

        # Check we get all the objects we're expecting
        self.assert_expected_data(all_objects)

        # Check we received links for all the reportees
        self.assert_expected_links(expected_links)

    def test_repl_get_tgt_chain(self):
        """
        Tests the behaviour of GET_TGT with a more complicated scenario.
        Here we create a chain of objects linked together, so if we follow
        the link target, then we'd traverse ~200 objects each time.
        """

        # create the test objects
        objectsA = self.create_object_range(0, 100, prefix="AAA")
        objectsB = self.create_object_range(0, 100, prefix="BBB")
        objectsC = self.create_object_range(0, 100, prefix="CCC")

        # create a complex set of object links:
        #   A0-->B0-->C1-->B2-->C3-->B4-->and so on...
        # Basically each object-A should link to a circular chain of 200 B/C
        # objects. We create the links in separate chunks here, as it makes it
        # clearer what happens with the USN (links on Windows have their own
        # USN, so this approach means the A->B/B->C links aren't interleaved)
        for i in range(0, 100):
            self.modify_object(objectsA[i], "managedBy", objectsB[i])

        for i in range(0, 100):
            self.modify_object(objectsB[i], "managedBy",
                               objectsC[(i + 1) % 100])

        for i in range(0, 100):
            self.modify_object(objectsC[i], "managedBy",
                               objectsB[(i + 1) % 100])

        all_objects = objectsA + objectsB + objectsC
        expected_links = all_objects

        # the default order the objects now get returned in should be:
        # [A0-A99][B0-B99][C0-C99]

        links_expected = True

        # Get all the replication data - this code should resend the requests
        # with GET_TGT
        while not self.replication_complete():

            # get the next block of replication data (this sets GET_TGT
            # if needed)
            self.repl_get_next(assert_links=links_expected)
            links_expected = len(self.rxd_links) < len(expected_links)

        # The way the test objects have been created should force
        # self.repl_get_next() to use the GET_TGT flag. If this doesn't
        # actually happen, then the test isn't doing its job properly
        self.assertTrue(self.used_get_tgt,
                        "Test didn't use the GET_TGT flag as expected")

        # Check we get all the objects we're expecting
        self.assert_expected_data(all_objects)

        # Check we received links for all the objects (every A, B and C
        # object is a link source)
        self.assert_expected_links(expected_links)

    def test_repl_integrity_link_attr(self):
        """
        Tests adding links to new objects while a replication is in progress.
        """

        # create some source objects for the linked attributes, sandwiched
        # between 2 blocks of filler objects
        filler = self.create_object_range(0, 100, prefix="filler")
        reportees = self.create_object_range(0, 100, prefix="reportee")
        filler += self.create_object_range(100, 200, prefix="filler")

        # Start the replication and get the first block of filler objects
        # (We're being mean here and setting the GET_TGT flag right from the
        # start. On earlier Samba versions, if the client encountered an
        # unknown target object and retried with GET_TGT, it would restart the
        # replication cycle from scratch, which avoids the problem).
        self.repl_get_next(get_tgt=True)

        # create the target objects and add the links. These objects should be
        # outside the scope of the Samba replication cycle, but the links
        # should still get sent with the source object
        managers = self.create_object_range(0, 100, prefix="manager")

        for i in range(0, 100):
            self.modify_object(reportees[i], "managedBy", managers[i])

        expected_objects = managers + reportees + filler
        expected_links = reportees

        # complete the replication
        while not self.replication_complete():
            self.repl_get_next(get_tgt=True)

        # If we didn't receive the most recently created objects in the last
        # replication cycle, then kick off another replication to get them
        if len(self.rxd_dn_list) < len(expected_objects):
            self.repl_get_next()

            while not self.replication_complete():
                self.repl_get_next()

        # Check we get all the objects we're expecting
        self.assert_expected_data(expected_objects)

        # Check we received links for all the reportees (the link sources)
        self.assert_expected_links(expected_links)

    def test_repl_get_anc_link_attr(self):
        """
        A basic GET_ANC test where the parents have linked attributes
        """

        # Create a block of 100 parents and 100 children
        parent_dn_list = []
        expected_dn_list = self.create_object_range(0, 100, prefix="parent",
                                                    children=("A",),
                                                    parent_list=parent_dn_list)

        # Add links from the parents to the children
        for x in range(0, 100):
            self.modify_object(parent_dn_list[x], "managedBy",
                               expected_dn_list[x + 100])

        # add some filler objects at the end. This allows us to easily see
        # which chunk the links get sent in
        expected_dn_list += self.create_object_range(0, 100, prefix="filler")

        # We've now got objects in the following order:
        # [100 x children][100 x parents][100 x filler]

        # Get the replication data - because the block of children come first,
        # this should retry the request with GET_ANC
        while not self.replication_complete():
            self.repl_get_next()

        self.assertTrue(self.used_get_anc,
                        "Test didn't use the GET_ANC flag as expected")

        # Check we get all the objects we're expecting
        self.assert_expected_data(expected_dn_list)

        # Check we received links for all the parents
        self.assert_expected_links(parent_dn_list)

    def test_repl_get_tgt_and_anc(self):
        """
        Check we can resolve an unknown ancestor when fetching the link target,
        i.e. tests using GET_TGT and GET_ANC in combination
        """

        # Create some parent/child objects (the child will be the link target)
        parents = []
        all_objects = self.create_object_range(0, 100, prefix="parent",
                                               children=["la_tgt"],
                                               parent_list=parents)

        children = [item for item in all_objects if item not in parents]

        # create the link source objects and link them to the child/target
        la_sources = self.create_object_range(0, 100, prefix="la_src")
        all_objects += la_sources

        for i in range(0, 100):
            self.modify_object(la_sources[i], "managedBy", children[i])

        expected_links = la_sources

        # modify the children/targets so they come after the link source
        for x in range(0, 100):
            self.modify_object(children[x], "displayName", "OU%d" % x)

        # modify the parents, so they now come last in the replication
        for x in range(0, 100):
            self.modify_object(parents[x], "displayName", "OU%d" % x)

        # We've now got objects in the following order:
        # [100 la_source][100 la_target][100 parents (of la_target)]

        links_expected = True

        # Get all the replication data - this code should resend the requests
        # with GET_TGT and GET_ANC
        while not self.replication_complete():

            # get the next block of replication data (this sets
            # GET_TGT/GET_ANC)
            self.repl_get_next(assert_links=links_expected)
            links_expected = len(self.rxd_links) < len(expected_links)

        # The way the test objects have been created should force
        # self.repl_get_next() to use the GET_TGT/GET_ANC flags. If this
        # doesn't actually happen, then the test isn't doing its job properly
        self.assertTrue(self.used_get_tgt,
                        "Test didn't use the GET_TGT flag as expected")
        self.assertTrue(self.used_get_anc,
                        "Test didn't use the GET_ANC flag as expected")

        # Check we get all the objects we're expecting
        self.assert_expected_data(all_objects)

        # Check we received links for all the link sources
        self.assert_expected_links(expected_links)

        # Second part of test. Add some extra objects and kick off another
        # replication. The test code will use the HWM from the last replication
        # so we'll only receive the objects we modify below
        self.start_new_repl_cycle()

        # add an extra level of grandchildren that hang off a child
        # that got created last time
        new_parent = "OU=test_new_parent,%s" % children[0]
        self.add_object(new_parent)
        new_children = []

        for x in range(0, 50):
            dn = "OU=test_new_la_tgt%d,%s" % (x, new_parent)
            self.add_object(dn)
            new_children.append(dn)

        # replace half of the links to point to the new children
        for x in range(0, 50):
            self.delete_attribute(la_sources[x], "managedBy", children[x])
            self.modify_object(la_sources[x], "managedBy", new_children[x])

        # add some filler objects to fill up the 1st chunk
        filler = self.create_object_range(0, 100, prefix="filler")

        # modify the new children/targets so they come after the link source
        for x in range(0, 50):
            self.modify_object(new_children[x], "displayName", "OU-%d" % x)

        # modify the parent, so it now comes last in the replication
        # (x is just left over from the loop above - any value would do)
        self.modify_object(new_parent, "displayName", "OU%d" % x)

        # We should now get the modified objects in the following order:
        # [50 links (x 2)][100 filler][50 new children][new parent]
        # Note that the link sources aren't actually sent (their new linked
        # attributes are sent, but apart from that, nothing has changed)
        all_objects = filler + new_children + [new_parent]
        expected_links = la_sources[:50]

        links_expected = True

        while not self.replication_complete():
            self.repl_get_next(assert_links=links_expected)
            links_expected = len(self.rxd_links) < len(expected_links)

        self.assertTrue(self.used_get_tgt,
                        "Test didn't use the GET_TGT flag as expected")
        self.assertTrue(self.used_get_anc,
                        "Test didn't use the GET_ANC flag as expected")

        # Check we get all the objects we're expecting
        self.assert_expected_data(all_objects)

        # Check we received links (50 deleted links and 50 new)
        self.assert_expected_links(expected_links, num_expected=100)

    def _repl_integrity_obj_deletion(self, delete_link_source=True):
        """
        Tests deleting link objects while a replication is in progress.

        Deletes the link source objects if delete_link_source is True,
        otherwise deletes the link target objects.
        """

        # create some objects and link them together, with some filler
        # object in between the link sources
        la_sources = self.create_object_range(0, 100, prefix="la_source")
        la_targets = self.create_object_range(0, 100, prefix="la_targets")

        for i in range(0, 50):
            self.modify_object(la_sources[i], "managedBy", la_targets[i])

        filler = self.create_object_range(0, 100, prefix="filler")

        for i in range(50, 100):
            self.modify_object(la_sources[i], "managedBy", la_targets[i])

        # touch the targets so that the sources get replicated first
        for i in range(0, 100):
            self.modify_object(la_targets[i], "displayName", "OU%d" % i)

        # objects should now be in the following USN order:
        # [50 la_source][100 filler][50 la_source][100 la_target]

        # Get the first block containing 50 link sources
        self.repl_get_next()

        # delete either the link targets or link source objects
        if delete_link_source:
            objects_to_delete = la_sources
            # in GET_TGT testenvs we only receive the first 50 source objects
            expected_objects = la_sources[:50] + la_targets + filler
        else:
            objects_to_delete = la_targets
            expected_objects = la_sources + filler

        for obj in objects_to_delete:
            self.ldb_dc2.delete(obj)

        # complete the replication
        while not self.replication_complete():
            self.repl_get_next()

        # Check we get all the objects we're expecting
        self.assert_expected_data(expected_objects)

        # we can't use assert_expected_links() here because it tries to check
        # against the deleted objects on the DC. (Although we receive some
        # links from the first block processed, the Samba client should end up
        # deleting these, as the source/target object involved is deleted)
        self.assertTrue(len(self.rxd_links) == 50,
                        "Expected 50 links, not %d" % len(self.rxd_links))

    def test_repl_integrity_src_obj_deletion(self):
        """Deletes the link source objects while replication is in progress"""
        self._repl_integrity_obj_deletion(delete_link_source=True)

    def test_repl_integrity_tgt_obj_deletion(self):
        """Deletes the link target objects while replication is in progress"""
        self._repl_integrity_obj_deletion(delete_link_source=False)

    def restore_deleted_object(self, guid, new_dn):
        """Re-animates a deleted object"""

        guid_str = self._GUID_string(guid)
        res = self.test_ldb_dc.search(base="<GUID=%s>" % guid_str,
                                      attrs=["isDeleted"],
                                      controls=['show_deleted:1'],
                                      scope=ldb.SCOPE_BASE)
        # nothing to do unless we found exactly the one object to restore
        if len(res) != 1:
            return

        # clear the isDeleted flag and move the object back to its new DN
        msg = ldb.Message()
        msg.dn = res[0].dn
        msg["isDeleted"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE,
                                              "isDeleted")
        msg["distinguishedName"] = ldb.MessageElement([new_dn],
                                                      ldb.FLAG_MOD_REPLACE,
                                                      "distinguishedName")
        self.test_ldb_dc.modify(msg, ["show_deleted:1"])

    def sync_DCs(self, nc_dn=None):
        """Replicates from DC2 to DC1 (optionally for a specific NC)"""
        # make sure DC1 has all the changes we've made to DC2
        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                                nc_dn=nc_dn)

    def get_object_guid(self, dn):
        """Returns the objectGUID value stored in the DB for the DN"""
        res = self.test_ldb_dc.search(base=dn, attrs=["objectGUID"],
                                      scope=ldb.SCOPE_BASE)
        return res[0]['objectGUID'][0]

    def set_dc_connection(self, conn):
        """
        Switches over the connection state info that the underlying drs_base
        class uses so that we replicate with a different DC.
        """
        self.default_hwm = conn.default_hwm
        self.default_utdv = conn.default_utdv
        self.drs = conn.drs
        self.drs_handle = conn.drs_handle
        self.set_test_ldb_dc(conn.ldb_dc)

    def assert_DCs_replication_is_consistent(self, peer_conn, all_objects,
                                             expected_links):
        """
        Replicates against both the primary and secondary DCs in the testenv
        and checks that both return the expected results.
        """
        print("Checking replication against primary test DC...")

        # get the replication data from the test DC first
        while not self.replication_complete():
            self.repl_get_next()

        # Check we get all the objects and links we're expecting
        self.assert_expected_data(all_objects)
        self.assert_expected_links(expected_links)

        # switch over the DC state info so we now talk to the peer DC
        self.set_dc_connection(peer_conn)
        self.init_test_state()

        print("Checking replication against secondary test DC...")

        # check that we get the same information from the 2nd DC
        while not self.replication_complete():
            self.repl_get_next()

        self.assert_expected_data(all_objects)
        self.assert_expected_links(expected_links)

        # switch back to using the default connection
        self.set_dc_connection(self.default_conn)

    def test_repl_integrity_obj_reanimation(self):
        """
        Checks receiving links for a re-animated object doesn't lose links.
        We test this against the peer DC to make sure it doesn't drop links.
        """

        # This test is a little different in that we're particularly interested
        # in exercising the replmd client code on the second DC.
        # First, make sure the peer DC has the base OU, then connect to it (so
        # we store its initial HWM)
        self.sync_DCs()
        peer_conn = DcConnection(self, self.ldb_dc1, self.dnsname_dc1)

        # create the link source/target objects
        la_sources = self.create_object_range(0, 100, prefix="la_src")
        la_targets = self.create_object_range(0, 100, prefix="la_tgt")

        # store the target object's GUIDs (we need to know these to
        # reanimate them)
        target_guids = []

        for dn in la_targets:
            target_guids.append(self.get_object_guid(dn))

        # delete the link target
        for x in range(0, 100):
            self.ldb_dc2.delete(la_targets[x])

        # sync the DCs, then disable replication. We want the peer DC to get
        # all the following changes in a single replication cycle
        self.sync_DCs()
        self._disable_all_repl(self.dnsname_dc2)

        # restore the target objects for the linked attributes again
        for x in range(0, 100):
            self.restore_deleted_object(target_guids[x], la_targets[x])

        # add the links
        for x in range(0, 100):
            self.modify_object(la_sources[x], "managedBy", la_targets[x])

        # create some additional filler objects
        filler = self.create_object_range(0, 100, prefix="filler")

        # modify the targets so they now come last
        for x in range(0, 100):
            self.modify_object(la_targets[x], "displayName", "OU-%d" % x)

        # the objects should now be sent in the following order:
        # [la sources + links][filler][la targets]
        all_objects = la_sources + la_targets + filler
        expected_links = la_sources

        # Enable replication again make sure the 2 DCs are back in sync
        self._enable_all_repl(self.dnsname_dc2)
        self.sync_DCs()

        # Get the replication data from each DC in turn.
        # Check that both give us all the objects and links we're expecting,
        # i.e.
no links were lost 994 self.assert_DCs_replication_is_consistent(peer_conn, all_objects, 995 expected_links) 996 997 def _test_repl_integrity_cross_partition_links(self, get_tgt=False): 998 """ 999 Checks that a cross-partition link to an unknown target object does 1000 not result in missing links. 1001 """ 1002 1003 # check the peer DC is up-to-date, then connect (storing its HWM) 1004 self.sync_DCs() 1005 peer_conn = DcConnection(self, self.ldb_dc1, self.dnsname_dc1) 1006 1007 # stop replication so the peer gets the following objects in one go 1008 self._disable_all_repl(self.dnsname_dc2) 1009 1010 # optionally force the client-side to use GET_TGT locally, by adding a 1011 # one-way link to a missing/deleted target object 1012 if get_tgt: 1013 missing_target = "OU=missing_tgt,%s" % self.ou 1014 self.add_object(missing_target) 1015 get_tgt_source = "CN=get_tgt_src,%s" % self.ou 1016 self.add_object(get_tgt_source, 1017 objectclass="msExchConfigurationContainer") 1018 self.modify_object(get_tgt_source, "addressBookRoots2", 1019 missing_target) 1020 self.test_ldb_dc.delete(missing_target) 1021 1022 # create a link source object in the main NC 1023 la_source = "OU=cross_nc_src,%s" % self.ou 1024 self.add_object(la_source) 1025 1026 # create the link target (a server object) in the config NC 1027 sites_dn = "CN=Sites,%s" % self.config_dn 1028 servers_dn = "CN=Servers,CN=Default-First-Site-Name,%s" % sites_dn 1029 rand = random.randint(1, 10000000) 1030 la_target = "CN=getncchanges-%d,%s" % (rand, servers_dn) 1031 self.add_object(la_target, objectclass="server") 1032 1033 # add a cross-partition link between the two 1034 self.modify_object(la_source, "managedBy", la_target) 1035 1036 # First, sync to the peer the NC containing the link source object 1037 self.sync_DCs() 1038 1039 # Now, before the peer has received the partition containing the target 1040 # object, try replicating from the peer. 
It will only know about half 1041 # of the link at this point, but it should be a valid scenario 1042 self.set_dc_connection(peer_conn) 1043 1044 while not self.replication_complete(): 1045 # pretend we've received other link targets out of order and that's 1046 # forced us to use GET_TGT. This checks the peer doesn't fail 1047 # trying to fetch a cross-partition target object that doesn't 1048 # exist 1049 self.repl_get_next(get_tgt=True) 1050 1051 self.set_dc_connection(self.default_conn) 1052 1053 # delete the GET_TGT test object. We're not interested in asserting its 1054 # links - it was just there to make the client use GET_TGT (and it 1055 # creates an inconsistency because one DC correctly ignores the link, 1056 # because it points to a deleted object) 1057 if get_tgt: 1058 self.test_ldb_dc.delete(get_tgt_source) 1059 1060 self.init_test_state() 1061 1062 # Now sync across the partition containing the link target object 1063 self.sync_DCs(nc_dn=self.config_dn) 1064 self._enable_all_repl(self.dnsname_dc2) 1065 1066 # Get the replication data from each DC in turn. 1067 # Check that both return the cross-partition link (note we're not 1068 # checking the config domain NC here for simplicity) 1069 self.assert_DCs_replication_is_consistent(peer_conn, 1070 all_objects=[la_source], 1071 expected_links=[la_source]) 1072 1073 # the cross-partition linked attribute has a missing backlink. 
Check 1074 # that we can still delete it successfully 1075 self.delete_attribute(la_source, "managedBy", la_target) 1076 self.sync_DCs() 1077 1078 res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc1, la_source), 1079 attrs=["managedBy"], 1080 controls=['extended_dn:1:0'], 1081 scope=ldb.SCOPE_BASE) 1082 self.assertFalse("managedBy" in res[0], 1083 "%s in DB still has managedBy attribute" % la_source) 1084 res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc2, la_source), 1085 attrs=["managedBy"], 1086 controls=['extended_dn:1:0'], 1087 scope=ldb.SCOPE_BASE) 1088 self.assertFalse("managedBy" in res[0], 1089 "%s in DB still has managedBy attribute" % la_source) 1090 1091 # Check receiving a cross-partition link to a deleted target. 1092 # Delete the target and make sure the deletion is sync'd between DCs 1093 target_guid = self.get_object_guid(la_target) 1094 self.test_ldb_dc.delete(la_target) 1095 self.sync_DCs(nc_dn=self.config_dn) 1096 self._disable_all_repl(self.dnsname_dc2) 1097 1098 # re-animate the target 1099 self.restore_deleted_object(target_guid, la_target) 1100 self.modify_object(la_source, "managedBy", la_target) 1101 1102 # now sync the link - because the target is in another partition, the 1103 # peer DC receives a link for a deleted target, which it should accept 1104 self.sync_DCs() 1105 res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc1, la_source), 1106 attrs=["managedBy"], 1107 controls=['extended_dn:1:0'], 1108 scope=ldb.SCOPE_BASE) 1109 self.assertTrue("managedBy" in res[0], 1110 "%s in DB missing managedBy attribute" % la_source) 1111 1112 # cleanup the server object we created in the Configuration partition 1113 self.test_ldb_dc.delete(la_target) 1114 self._enable_all_repl(self.dnsname_dc2) 1115 1116 def test_repl_integrity_cross_partition_links(self): 1117 self._test_repl_integrity_cross_partition_links(get_tgt=False) 1118 1119 def test_repl_integrity_cross_partition_links_with_tgt(self): 1120 
self._test_repl_integrity_cross_partition_links(get_tgt=True) 1121 1122 def test_repl_get_tgt_multivalued_links(self): 1123 """Tests replication with multi-valued link attributes.""" 1124 1125 # create the target/source objects and link them together 1126 la_targets = self.create_object_range(0, 500, prefix="la_tgt") 1127 la_source = "CN=la_src,%s" % self.ou 1128 self.add_object(la_source, objectclass="msExchConfigurationContainer") 1129 1130 for tgt in la_targets: 1131 self.modify_object(la_source, "addressBookRoots2", tgt) 1132 1133 filler = self.create_object_range(0, 100, prefix="filler") 1134 1135 # We should receive the objects/links in the following order: 1136 # [500 targets + 1 source][500 links][100 filler] 1137 expected_objects = la_targets + [la_source] + filler 1138 link_only_chunk = False 1139 1140 # First do the replication without needing GET_TGT 1141 while not self.replication_complete(): 1142 ctr6 = self.repl_get_next() 1143 1144 if ctr6.object_count == 0 and ctr6.linked_attributes_count != 0: 1145 link_only_chunk = True 1146 1147 # we should receive one chunk that contains only links 1148 self.assertTrue(link_only_chunk, 1149 "Expected to receive a chunk containing only links") 1150 1151 # check we received all the expected objects/links 1152 self.assert_expected_data(expected_objects) 1153 self.assert_expected_links([la_source], link_attr="addressBookRoots2", 1154 num_expected=500) 1155 1156 # Do the replication again, forcing the use of GET_TGT this time 1157 self.init_test_state() 1158 1159 for x in range(0, 500): 1160 self.modify_object(la_targets[x], "displayName", "OU-%d" % x) 1161 1162 # The objects/links should get sent in the following order: 1163 # [1 source][500 targets][500 links][100 filler] 1164 1165 while not self.replication_complete(): 1166 ctr6 = self.repl_get_next() 1167 1168 self.assertTrue(self.used_get_tgt, 1169 "Test didn't use the GET_TGT flag as expected") 1170 1171 # check we received all the expected objects/links 1172 
self.assert_expected_data(expected_objects) 1173 self.assert_expected_links([la_source], link_attr="addressBookRoots2", 1174 num_expected=500) 1175 1176 1177class DcConnection: 1178 """Helper class to track a connection to another DC""" 1179 1180 def __init__(self, drs_base, ldb_dc, dnsname_dc): 1181 self.ldb_dc = ldb_dc 1182 (self.drs, self.drs_handle) = drs_base._ds_bind(dnsname_dc) 1183 (self.default_hwm, utdv) = drs_base._get_highest_hwm_utdv(ldb_dc) 1184 self.default_utdv = utdv 1185