1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#
4# Tests various schema replication scenarios
5#
6# Copyright (C) Catalyst.Net Ltd. 2017
7#
8# This program is free software; you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation; either version 3 of the License, or
11# (at your option) any later version.
12#
13# This program is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the GNU General Public License
19# along with this program.  If not, see <http://www.gnu.org/licenses/>.
20#
21
22#
23# Usage:
24#  export DC1=dc1_dns_name
25#  export DC2=dc2_dns_name
26#  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
27#  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN \
28#       getncchanges -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
29#
30
31from __future__ import print_function
32import drs_base
33import samba.tests
34import ldb
35from ldb import SCOPE_BASE
36import random
37
38from samba.dcerpc import drsuapi
39
40
41class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
42    def setUp(self):
43        super(DrsReplicaSyncIntegrityTestCase, self).setUp()
44
45        self.init_test_state()
46
47        # Note that DC2 is the DC with the testenv-specific quirks (e.g. it's
48        # the vampire_dc), so we point this test directly at that DC
49        self.set_test_ldb_dc(self.ldb_dc2)
50
51        self.ou = str(samba.tests.create_test_ou(self.test_ldb_dc,
52                                                 "getncchanges"))
53        self.base_dn = self.test_ldb_dc.get_default_basedn()
54
55        self.default_conn = DcConnection(self, self.ldb_dc2, self.dnsname_dc2)
56        self.set_dc_connection(self.default_conn)
57
58    def tearDown(self):
59        super(DrsReplicaSyncIntegrityTestCase, self).tearDown()
60        # tidyup groups and users
61        try:
62            self.ldb_dc2.delete(self.ou, ["tree_delete:1"])
63        except ldb.LdbError as e:
64            (enum, string) = e.args
65            if enum == ldb.ERR_NO_SUCH_OBJECT:
66                pass
67
68    def init_test_state(self):
69        self.rxd_dn_list = []
70        self.rxd_links = []
71        self.rxd_guids = []
72        self.last_ctr = None
73
74        # 100 is the minimum max_objects that Microsoft seems to honour
75        # (the max honoured is 400ish), so we use that in these tests
76        self.max_objects = 100
77
78        # store whether we used GET_TGT/GET_ANC flags in the requests
79        self.used_get_tgt = False
80        self.used_get_anc = False
81
82    def add_object(self, dn, objectclass="organizationalunit"):
83        """Adds an OU object"""
84        self.test_ldb_dc.add({"dn": dn, "objectclass": objectclass})
85        res = self.test_ldb_dc.search(base=dn, scope=SCOPE_BASE)
86        self.assertEquals(len(res), 1)
87
88    def modify_object(self, dn, attr, value):
89        """Modifies an object's USN by adding an attribute value to it"""
90        m = ldb.Message()
91        m.dn = ldb.Dn(self.test_ldb_dc, dn)
92        m[attr] = ldb.MessageElement(value, ldb.FLAG_MOD_ADD, attr)
93        self.test_ldb_dc.modify(m)
94
95    def delete_attribute(self, dn, attr, value):
96        """Deletes an attribute from an object"""
97        m = ldb.Message()
98        m.dn = ldb.Dn(self.test_ldb_dc, dn)
99        m[attr] = ldb.MessageElement(value, ldb.FLAG_MOD_DELETE, attr)
100        self.test_ldb_dc.modify(m)
101
102    def start_new_repl_cycle(self):
103        """Resets enough state info to start a new replication cycle"""
104        # reset rxd_links, but leave rxd_guids and rxd_dn_list alone so we know
105        # whether a parent/target is unknown and needs GET_ANC/GET_TGT to
106        # resolve
107        self.rxd_links = []
108
109        self.used_get_tgt = False
110        self.used_get_anc = False
111        # mostly preserve self.last_ctr, so that we use the last HWM
112        if self.last_ctr is not None:
113            self.last_ctr.more_data = True
114
115    def create_object_range(self, start, end, prefix="",
116                            children=None, parent_list=None):
117        """
118        Creates a block of objects. Object names are numbered sequentially,
119        using the optional prefix supplied. If the children parameter is
120        supplied it will create a parent-child hierarchy and return the
121        top-level parents separately.
122        """
123        dn_list = []
124
125        # Use dummy/empty lists if we're not creating a parent/child hierarchy
126        if children is None:
127            children = []
128
129        if parent_list is None:
130            parent_list = []
131
132        # Create the parents first, then the children.
133        # This makes it easier to see in debug when GET_ANC takes effect
134        # because the parent/children become interleaved (by default,
135        # this approach means the objects are organized into blocks of
136        # parents and blocks of children together)
137        for x in range(start, end):
138            ou = "OU=test_ou_%s%d,%s" % (prefix, x, self.ou)
139            self.add_object(ou)
140            dn_list.append(ou)
141
142            # keep track of the top-level parents (if needed)
143            parent_list.append(ou)
144
145        # create the block of children (if needed)
146        for x in range(start, end):
147            for child in children:
148                ou = "OU=test_ou_child%s%d,%s" % (child, x, parent_list[x])
149                self.add_object(ou)
150                dn_list.append(ou)
151
152        return dn_list
153
154    def assert_expected_data(self, expected_list):
155        """
156        Asserts that we received all the DNs that we expected and
157        none are missing.
158        """
159        received_list = self.rxd_dn_list
160
161        # Note that with GET_ANC Windows can end up sending the same parent
162        # object multiple times, so this might be noteworthy but doesn't
163        # warrant failing the test
164        num_received = len(received_list)
165        num_expected = len(expected_list)
166        if num_received != num_expected:
167            print("Note: received %d objects but expected %d" % (num_received,
168                                                                 num_expected))
169
170        # Check that we received every object that we were expecting
171        for dn in expected_list:
172            self.assertTrue(dn in received_list,
173                            "DN '%s' missing from replication." % dn)
174
175    def test_repl_integrity(self):
176        """
177        Modify the objects being replicated while the replication is still
178        in progress and check that no object loss occurs.
179        """
180
181        # The server behaviour differs between samba and Windows. Samba returns
182        # the objects in the original order (up to the pre-modify HWM). Windows
183        # incorporates the modified objects and returns them in the new order
184        # (i.e. modified objects last), up to the post-modify HWM. The
185        # Microsoft docs state the Windows behaviour is optional.
186
187        # Create a range of objects to replicate.
188        expected_dn_list = self.create_object_range(0, 400)
189        (orig_hwm, unused) = self._get_highest_hwm_utdv(self.test_ldb_dc)
190
191        # We ask for the first page of 100 objects.
192        # For this test, we don't care what order we receive the objects in,
193        # so long as by the end we've received everything
194        self.repl_get_next()
195
196        # Modify some of the second page of objects. This should bump the
197        # highwatermark
198        for x in range(100, 200):
199            self.modify_object(expected_dn_list[x], "displayName", "OU%d" % x)
200
201        (post_modify_hwm, _) = self._get_highest_hwm_utdv(self.test_ldb_dc)
202        self.assertTrue(post_modify_hwm.highest_usn > orig_hwm.highest_usn)
203
204        # Get the remaining blocks of data
205        while not self.replication_complete():
206            self.repl_get_next()
207
208        # Check we still receive all the objects we're expecting
209        self.assert_expected_data(expected_dn_list)
210
211    def is_parent_known(self, dn, known_dn_list):
212        """
213        Returns True if the parent of the dn specified is in known_dn_list
214        """
215
216        # we can sometimes get system objects like the RID Manager returned.
217        # Ignore anything that is not under the test OU we created
218        if self.ou not in dn:
219            return True
220
221        # Remove the child portion from the name to get the parent's DN
222        name_substrings = dn.split(",")
223        del name_substrings[0]
224
225        parent_dn = ",".join(name_substrings)
226
227        # check either this object is a parent (it's parent is the top-level
228        # test object), or its parent has been seen previously
229        return parent_dn == self.ou or parent_dn in known_dn_list
230
231    def _repl_send_request(self, get_anc=False, get_tgt=False):
232        """
233        Sends a GetNCChanges request for the next block of replication data.
234        """
235
236        # we're just trying to mimic regular client behaviour here, so just
237        # use the highwatermark in the last response we received
238        if self.last_ctr:
239            highwatermark = self.last_ctr.new_highwatermark
240            uptodateness_vector = self.last_ctr.uptodateness_vector
241        else:
242            # this is the first replication chunk
243            highwatermark = None
244            uptodateness_vector = None
245
246        # Ask for the next block of replication data
247        replica_flags = drsuapi.DRSUAPI_DRS_WRIT_REP
248        more_flags = 0
249
250        if get_anc:
251            replica_flags |= drsuapi.DRSUAPI_DRS_GET_ANC
252            self.used_get_anc = True
253
254        if get_tgt:
255            more_flags = drsuapi.DRSUAPI_DRS_GET_TGT
256            self.used_get_tgt = True
257
258        # return the response from the DC
259        return self._get_replication(replica_flags,
260                                     max_objects=self.max_objects,
261                                     highwatermark=highwatermark,
262                                     uptodateness_vector=uptodateness_vector,
263
264                                     more_flags=more_flags)
265
266    def repl_get_next(self, get_anc=False, get_tgt=False, assert_links=False):
267        """
268        Requests the next block of replication data. This tries to simulate
269        client behaviour - if we receive a replicated object that we don't know
270        the parent of, then re-request the block with the GET_ANC flag set.
271        If we don't know the target object for a linked attribute, then
272        re-request with GET_TGT.
273        """
274
275        # send a request to the DC and get the response
276        ctr6 = self._repl_send_request(get_anc=get_anc, get_tgt=get_tgt)
277
278        # extract the object DNs and their GUIDs from the response
279        rxd_dn_list = self._get_ctr6_dn_list(ctr6)
280        rxd_guid_list = self._get_ctr6_object_guids(ctr6)
281
282        # we'll add new objects as we discover them, so take a copy of the
283        # ones we already know about, so we can modify these lists safely
284        known_objects = self.rxd_dn_list[:]
285        known_guids = self.rxd_guids[:]
286
287        # check that we know the parent for every object received
288        for i in range(0, len(rxd_dn_list)):
289
290            dn = rxd_dn_list[i]
291            guid = rxd_guid_list[i]
292
293            if self.is_parent_known(dn, known_objects):
294
295                # the new DN is now known so add it to the list.
296                # It may be the parent of another child in this block
297                known_objects.append(dn)
298                known_guids.append(guid)
299            else:
300                # If we've already set the GET_ANC flag then it should mean
301                # we receive the parents before the child
302                self.assertFalse(get_anc, "Unknown parent for object %s" % dn)
303
304                print("Unknown parent for %s - try GET_ANC" % dn)
305
306                # try the same thing again with the GET_ANC flag set this time
307                return self.repl_get_next(get_anc=True, get_tgt=get_tgt,
308                                          assert_links=assert_links)
309
310        # check we know about references to any objects in the linked attrs
311        received_links = self._get_ctr6_links(ctr6)
312
313        # This is so that older versions of Samba fail - we want the links to
314        # be sent roughly with the objects, rather than getting all links at
315        # the end
316        if assert_links:
317            self.assertTrue(len(received_links) > 0,
318                            "Links were expected in the GetNCChanges response")
319
320        for link in received_links:
321
322            # skip any links that aren't part of the test
323            if self.ou not in link.targetDN:
324                continue
325
326            # check the source object is known (Windows can actually send links
327            # where we don't know the source object yet). Samba shouldn't ever
328            # hit this case because it gets the links based on the source
329            if link.identifier not in known_guids:
330
331                # If we've already set the GET_ANC flag then it should mean
332                # this case doesn't happen
333                self.assertFalse(get_anc, "Unknown source object for GUID %s"
334                                 % link.identifier)
335
336                print("Unknown source GUID %s - try GET_ANC" % link.identifier)
337
338                # try the same thing again with the GET_ANC flag set this time
339                return self.repl_get_next(get_anc=True, get_tgt=get_tgt,
340                                          assert_links=assert_links)
341
342            # check we know the target object
343            if link.targetGUID not in known_guids:
344
345                # If we've already set the GET_TGT flag then we should have
346                # already received any objects we need to know about
347                self.assertFalse(get_tgt, "Unknown linked target for object %s"
348                                 % link.targetDN)
349
350                print("Unknown target for %s - try GET_TGT" % link.targetDN)
351
352                # try the same thing again with the GET_TGT flag set this time
353                return self.repl_get_next(get_anc=get_anc, get_tgt=True,
354                                          assert_links=assert_links)
355
356        # store the last successful result so we know what HWM to request next
357        self.last_ctr = ctr6
358
359        # store the objects, GUIDs, and links we received
360        self.rxd_dn_list += self._get_ctr6_dn_list(ctr6)
361        self.rxd_links += self._get_ctr6_links(ctr6)
362        self.rxd_guids += self._get_ctr6_object_guids(ctr6)
363
364        return ctr6
365
366    def replication_complete(self):
367        """Returns True if the current/last replication cycle is complete"""
368
369        if self.last_ctr is None or self.last_ctr.more_data:
370            return False
371        else:
372            return True
373
374    def test_repl_integrity_get_anc(self):
375        """
376        Modify the parent objects being replicated while the replication is
377        still in progress (using GET_ANC) and check that no object loss occurs.
378        """
379
380        # Note that GET_ANC behaviour varies between Windows and Samba.
381        # On Samba GET_ANC results in the replication restarting from the very
382        # beginning. After that, Samba remembers GET_ANC and also sends the
383        # parents in subsequent requests (regardless of whether GET_ANC is
384        # specified in the later request).
385        # Windows only sends the parents if GET_ANC was specified in the last
386        # request. It will also resend a parent, even if it's already sent the
387        # parent in a previous response (whereas Samba doesn't).
388
389        # Create a small block of 50 parents, each with 2 children (A and B)
390        # This is so that we receive some children in the first block, so we
391        # can resend with GET_ANC before we learn too many parents
392        parent_dn_list = []
393        expected_dn_list = self.create_object_range(0, 50, prefix="parent",
394                                                    children=("A", "B"),
395                                                    parent_list=parent_dn_list)
396
397        # create the remaining parents and children
398        expected_dn_list += self.create_object_range(50, 150, prefix="parent",
399                                                     children=("A", "B"),
400                                                     parent_list=parent_dn_list)
401
402        # We've now got objects in the following order:
403        # [50 parents][100 children][100 parents][200 children]
404
405        # Modify the first parent so that it's now ordered last by USN
406        # This means we set the GET_ANC flag pretty much straight away
407        # because we receive the first child before the first parent
408        self.modify_object(parent_dn_list[0], "displayName", "OU0")
409
410        # modify a later block of parents so they also get reordered
411        for x in range(50, 100):
412            self.modify_object(parent_dn_list[x], "displayName", "OU%d" % x)
413
414        # Get the first block of objects - this should resend the request with
415        # GET_ANC set because we won't know about the first child's parent.
416        # On samba GET_ANC essentially starts the sync from scratch again, so
417        # we get this over with early before we learn too many parents
418        self.repl_get_next()
419
420        # modify the last chunk of parents. They should now have a USN higher
421        # than the highwater-mark for the replication cycle
422        for x in range(100, 150):
423            self.modify_object(parent_dn_list[x], "displayName", "OU%d" % x)
424
425        # Get the remaining blocks of data - this will resend the request with
426        # GET_ANC if it encounters an object it doesn't have the parent for.
427        while not self.replication_complete():
428            self.repl_get_next()
429
430        # The way the test objects have been created should force
431        # self.repl_get_next() to use the GET_ANC flag. If this doesn't
432        # actually happen, then the test isn't doing its job properly
433        self.assertTrue(self.used_get_anc,
434                        "Test didn't use the GET_ANC flag as expected")
435
436        # Check we get all the objects we're expecting
437        self.assert_expected_data(expected_dn_list)
438
439    def assert_expected_links(self, objects_with_links, link_attr="managedBy",
440                              num_expected=None):
441        """
442        Asserts that a GetNCChanges response contains any expected links
443        for the objects it contains.
444        """
445        received_links = self.rxd_links
446
447        if num_expected is None:
448            num_expected = len(objects_with_links)
449
450        self.assertTrue(len(received_links) == num_expected,
451                        "Received %d links but expected %d"
452                        % (len(received_links), num_expected))
453
454        for dn in objects_with_links:
455            self.assert_object_has_link(dn, link_attr, received_links)
456
457    def assert_object_has_link(self, dn, link_attr, received_links):
458        """
459        Queries the object in the DB and asserts there is a link in the
460        GetNCChanges response that matches.
461        """
462
463        # Look up the link attribute in the DB
464        # The extended_dn option will dump the GUID info for the link
465        # attribute (as a hex blob)
466        res = self.test_ldb_dc.search(ldb.Dn(self.test_ldb_dc, dn),
467                                      attrs=[link_attr],
468                                      controls=['extended_dn:1:0'],
469                                      scope=ldb.SCOPE_BASE)
470
471        # We didn't find the expected link attribute in the DB for the object.
472        # Something has gone wrong somewhere...
473        self.assertTrue(link_attr in res[0],
474                        "%s in DB doesn't have attribute %s" % (dn, link_attr))
475
476        # find the received link in the list and assert that the target and
477        # source GUIDs match what's in the DB
478        for val in [str(val) for val in res[0][link_attr]]:
479            # Work out the expected source and target GUIDs for the DB link
480            target_dn = ldb.Dn(self.test_ldb_dc, val)
481            targetGUID_blob = target_dn.get_extended_component("GUID")
482            sourceGUID_blob = res[0].dn.get_extended_component("GUID")
483
484            found = False
485
486            for link in received_links:
487                if link.selfGUID_blob == sourceGUID_blob and \
488                   link.targetGUID_blob == targetGUID_blob:
489
490                    found = True
491
492                    if self._debug:
493                        print("Link %s --> %s" % (dn[:25], link.targetDN[:25]))
494                    break
495
496            self.assertTrue(found,
497                            "Did not receive expected link for DN %s" % dn)
498
499    def test_repl_get_tgt(self):
500        """
501        Creates a scenario where we should receive the linked attribute before
502        we know about the target object, and therefore need to use GET_TGT.
503        Note: Samba currently avoids this problem by sending all its links last
504        """
505
506        # create the test objects
507        reportees = self.create_object_range(0, 100, prefix="reportee")
508        managers = self.create_object_range(0, 100, prefix="manager")
509        all_objects = managers + reportees
510        expected_links = reportees
511
512        # add a link attribute to each reportee object that points to the
513        # corresponding manager object as the target
514        for i in range(0, 100):
515            self.modify_object(reportees[i], "managedBy", managers[i])
516
517        # touch the managers (the link-target objects) again to make sure the
518        # reportees (link source objects) get returned first by the replication
519        for i in range(0, 100):
520            self.modify_object(managers[i], "displayName", "OU%d" % i)
521
522        links_expected = True
523
524        # Get all the replication data - this code should resend the requests
525        # with GET_TGT
526        while not self.replication_complete():
527
528            # get the next block of replication data (this sets GET_TGT
529            # if needed)
530            self.repl_get_next(assert_links=links_expected)
531            links_expected = len(self.rxd_links) < len(expected_links)
532
533        # The way the test objects have been created should force
534        # self.repl_get_next() to use the GET_TGT flag. If this doesn't
535        # actually happen, then the test isn't doing its job properly
536        self.assertTrue(self.used_get_tgt,
537                        "Test didn't use the GET_TGT flag as expected")
538
539        # Check we get all the objects we're expecting
540        self.assert_expected_data(all_objects)
541
542        # Check we received links for all the reportees
543        self.assert_expected_links(expected_links)
544
545    def test_repl_get_tgt_chain(self):
546        """
547        Tests the behaviour of GET_TGT with a more complicated scenario.
548        Here we create a chain of objects linked together, so if we follow
549        the link target, then we'd traverse ~200 objects each time.
550        """
551
552        # create the test objects
553        objectsA = self.create_object_range(0, 100, prefix="AAA")
554        objectsB = self.create_object_range(0, 100, prefix="BBB")
555        objectsC = self.create_object_range(0, 100, prefix="CCC")
556
557        # create a complex set of object links:
558        #   A0-->B0-->C1-->B2-->C3-->B4-->and so on...
559        # Basically each object-A should link to a circular chain of 200 B/C
560        # objects. We create the links in separate chunks here, as it makes it
561        # clearer what happens with the USN (links on Windows have their own
562        # USN, so this approach means the A->B/B->C links aren't interleaved)
563        for i in range(0, 100):
564            self.modify_object(objectsA[i], "managedBy", objectsB[i])
565
566        for i in range(0, 100):
567            self.modify_object(objectsB[i], "managedBy",
568                               objectsC[(i + 1) % 100])
569
570        for i in range(0, 100):
571            self.modify_object(objectsC[i], "managedBy",
572                               objectsB[(i + 1) % 100])
573
574        all_objects = objectsA + objectsB + objectsC
575        expected_links = all_objects
576
577        # the default order the objects now get returned in should be:
578        # [A0-A99][B0-B99][C0-C99]
579
580        links_expected = True
581
582        # Get all the replication data - this code should resend the requests
583        # with GET_TGT
584        while not self.replication_complete():
585
586            # get the next block of replication data (this sets GET_TGT
587            # if needed)
588            self.repl_get_next(assert_links=links_expected)
589            links_expected = len(self.rxd_links) < len(expected_links)
590
591        # The way the test objects have been created should force
592        # self.repl_get_next() to use the GET_TGT flag. If this doesn't
593        # actually happen, then the test isn't doing its job properly
594        self.assertTrue(self.used_get_tgt,
595                        "Test didn't use the GET_TGT flag as expected")
596
597        # Check we get all the objects we're expecting
598        self.assert_expected_data(all_objects)
599
600        # Check we received links for all the reportees
601        self.assert_expected_links(expected_links)
602
603    def test_repl_integrity_link_attr(self):
604        """
605        Tests adding links to new objects while a replication is in progress.
606        """
607
608        # create some source objects for the linked attributes, sandwiched
609        # between 2 blocks of filler objects
610        filler = self.create_object_range(0, 100, prefix="filler")
611        reportees = self.create_object_range(0, 100, prefix="reportee")
612        filler += self.create_object_range(100, 200, prefix="filler")
613
614        # Start the replication and get the first block of filler objects
615        # (We're being mean here and setting the GET_TGT flag right from the
616        # start. On earlier Samba versions, if the client encountered an
617        # unknown target object and retried with GET_TGT, it would restart the
618        # replication cycle from scratch, which avoids the problem).
619        self.repl_get_next(get_tgt=True)
620
621        # create the target objects and add the links. These objects should be
622        # outside the scope of the Samba replication cycle, but the links
623        # should still get sent with the source object
624        managers = self.create_object_range(0, 100, prefix="manager")
625
626        for i in range(0, 100):
627            self.modify_object(reportees[i], "managedBy", managers[i])
628
629        expected_objects = managers + reportees + filler
630        expected_links = reportees
631
632        # complete the replication
633        while not self.replication_complete():
634            self.repl_get_next(get_tgt=True)
635
636        # If we didn't receive the most recently created objects in the last
637        # replication cycle, then kick off another replication to get them
638        if len(self.rxd_dn_list) < len(expected_objects):
639            self.repl_get_next()
640
641            while not self.replication_complete():
642                self.repl_get_next()
643
644        # Check we get all the objects we're expecting
645        self.assert_expected_data(expected_objects)
646
647        # Check we received links for all the parents
648        self.assert_expected_links(expected_links)
649
650    def test_repl_get_anc_link_attr(self):
651        """
652        A basic GET_ANC test where the parents have linked attributes
653        """
654
655        # Create a block of 100 parents and 100 children
656        parent_dn_list = []
657        expected_dn_list = self.create_object_range(0, 100, prefix="parent",
658                                                    children=("A"),
659                                                    parent_list=parent_dn_list)
660
661        # Add links from the parents to the children
662        for x in range(0, 100):
663            self.modify_object(parent_dn_list[x], "managedBy",
664                               expected_dn_list[x + 100])
665
666        # add some filler objects at the end. This allows us to easily see
667        # which chunk the links get sent in
668        expected_dn_list += self.create_object_range(0, 100, prefix="filler")
669
670        # We've now got objects in the following order:
671        # [100 x children][100 x parents][100 x filler]
672
673        # Get the replication data - because the block of children come first,
674        # this should retry the request with GET_ANC
675        while not self.replication_complete():
676            self.repl_get_next()
677
678        self.assertTrue(self.used_get_anc,
679                        "Test didn't use the GET_ANC flag as expected")
680
681        # Check we get all the objects we're expecting
682        self.assert_expected_data(expected_dn_list)
683
684        # Check we received links for all the parents
685        self.assert_expected_links(parent_dn_list)
686
687    def test_repl_get_tgt_and_anc(self):
688        """
689        Check we can resolve an unknown ancestor when fetching the link target,
690        i.e. tests using GET_TGT and GET_ANC in combination
691        """
692
693        # Create some parent/child objects (the child will be the link target)
694        parents = []
695        all_objects = self.create_object_range(0, 100, prefix="parent",
696                                               children=["la_tgt"],
697                                               parent_list=parents)
698
699        children = [item for item in all_objects if item not in parents]
700
701        # create the link source objects and link them to the child/target
702        la_sources = self.create_object_range(0, 100, prefix="la_src")
703        all_objects += la_sources
704
705        for i in range(0, 100):
706            self.modify_object(la_sources[i], "managedBy", children[i])
707
708        expected_links = la_sources
709
710        # modify the children/targets so they come after the link source
711        for x in range(0, 100):
712            self.modify_object(children[x], "displayName", "OU%d" % x)
713
714        # modify the parents, so they now come last in the replication
715        for x in range(0, 100):
716            self.modify_object(parents[x], "displayName", "OU%d" % x)
717
718        # We've now got objects in the following order:
719        # [100 la_source][100 la_target][100 parents (of la_target)]
720
721        links_expected = True
722
723        # Get all the replication data - this code should resend the requests
724        # with GET_TGT and GET_ANC
725        while not self.replication_complete():
726
727            # get the next block of replication data (this sets
728            # GET_TGT/GET_ANC)
729            self.repl_get_next(assert_links=links_expected)
730            links_expected = len(self.rxd_links) < len(expected_links)
731
732        # The way the test objects have been created should force
733        # self.repl_get_next() to use the GET_TGT/GET_ANC flags. If this
734        # doesn't actually happen, then the test isn't doing its job properly
735        self.assertTrue(self.used_get_tgt,
736                        "Test didn't use the GET_TGT flag as expected")
737        self.assertTrue(self.used_get_anc,
738                        "Test didn't use the GET_ANC flag as expected")
739
740        # Check we get all the objects we're expecting
741        self.assert_expected_data(all_objects)
742
743        # Check we received links for all the link sources
744        self.assert_expected_links(expected_links)
745
746        # Second part of test. Add some extra objects and kick off another
747        # replication. The test code will use the HWM from the last replication
748        # so we'll only receive the objects we modify below
749        self.start_new_repl_cycle()
750
751        # add an extra level of grandchildren that hang off a child
752        # that got created last time
753        new_parent = "OU=test_new_parent,%s" % children[0]
754        self.add_object(new_parent)
755        new_children = []
756
757        for x in range(0, 50):
758            dn = "OU=test_new_la_tgt%d,%s" % (x, new_parent)
759            self.add_object(dn)
760            new_children.append(dn)
761
762        # replace half of the links to point to the new children
763        for x in range(0, 50):
764            self.delete_attribute(la_sources[x], "managedBy", children[x])
765            self.modify_object(la_sources[x], "managedBy", new_children[x])
766
767        # add some filler objects to fill up the 1st chunk
768        filler = self.create_object_range(0, 100, prefix="filler")
769
770        # modify the new children/targets so they come after the link source
771        for x in range(0, 50):
772            self.modify_object(new_children[x], "displayName", "OU-%d" % x)
773
774        # modify the parent, so it now comes last in the replication
775        self.modify_object(new_parent, "displayName", "OU%d" % x)
776
777        # We should now get the modified objects in the following order:
778        # [50 links (x 2)][100 filler][50 new children][new parent]
779        # Note that the link sources aren't actually sent (their new linked
780        # attributes are sent, but apart from that, nothing has changed)
781        all_objects = filler + new_children + [new_parent]
782        expected_links = la_sources[:50]
783
784        links_expected = True
785
786        while not self.replication_complete():
787            self.repl_get_next(assert_links=links_expected)
788            links_expected = len(self.rxd_links) < len(expected_links)
789
790        self.assertTrue(self.used_get_tgt,
791                        "Test didn't use the GET_TGT flag as expected")
792        self.assertTrue(self.used_get_anc,
793                        "Test didn't use the GET_ANC flag as expected")
794
795        # Check we get all the objects we're expecting
796        self.assert_expected_data(all_objects)
797
798        # Check we received links (50 deleted links and 50 new)
799        self.assert_expected_links(expected_links, num_expected=100)
800
801    def _repl_integrity_obj_deletion(self, delete_link_source=True):
802        """
803        Tests deleting link objects while a replication is in progress.
804        """
805
806        # create some objects and link them together, with some filler
807        # object in between the link sources
808        la_sources = self.create_object_range(0, 100, prefix="la_source")
809        la_targets = self.create_object_range(0, 100, prefix="la_targets")
810
811        for i in range(0, 50):
812            self.modify_object(la_sources[i], "managedBy", la_targets[i])
813
814        filler = self.create_object_range(0, 100, prefix="filler")
815
816        for i in range(50, 100):
817            self.modify_object(la_sources[i], "managedBy", la_targets[i])
818
819        # touch the targets so that the sources get replicated first
820        for i in range(0, 100):
821            self.modify_object(la_targets[i], "displayName", "OU%d" % i)
822
823        # objects should now be in the following USN order:
824        # [50 la_source][100 filler][50 la_source][100 la_target]
825
826        # Get the first block containing 50 link sources
827        self.repl_get_next()
828
829        # delete either the link targets or link source objects
830        if delete_link_source:
831            objects_to_delete = la_sources
832            # in GET_TGT testenvs we only receive the first 50 source objects
833            expected_objects = la_sources[:50] + la_targets + filler
834        else:
835            objects_to_delete = la_targets
836            expected_objects = la_sources + filler
837
838        for obj in objects_to_delete:
839            self.ldb_dc2.delete(obj)
840
841        # complete the replication
842        while not self.replication_complete():
843            self.repl_get_next()
844
845        # Check we get all the objects we're expecting
846        self.assert_expected_data(expected_objects)
847
848        # we can't use assert_expected_links() here because it tries to check
849        # against the deleted objects on the DC. (Although we receive some
850        # links from the first block processed, the Samba client should end up
851        # deleting these, as the source/target object involved is deleted)
852        self.assertTrue(len(self.rxd_links) == 50,
853                        "Expected 50 links, not %d" % len(self.rxd_links))
854
855    def test_repl_integrity_src_obj_deletion(self):
856        self._repl_integrity_obj_deletion(delete_link_source=True)
857
858    def test_repl_integrity_tgt_obj_deletion(self):
859        self._repl_integrity_obj_deletion(delete_link_source=False)
860
861    def restore_deleted_object(self, guid, new_dn):
862        """Re-animates a deleted object"""
863
864        guid_str = self._GUID_string(guid)
865        res = self.test_ldb_dc.search(base="<GUID=%s>" % guid_str,
866                                      attrs=["isDeleted"],
867                                      controls=['show_deleted:1'],
868                                      scope=ldb.SCOPE_BASE)
869        if len(res) != 1:
870            return
871
872        msg = ldb.Message()
873        msg.dn = res[0].dn
874        msg["isDeleted"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE,
875                                              "isDeleted")
876        msg["distinguishedName"] = ldb.MessageElement([new_dn],
877                                                      ldb.FLAG_MOD_REPLACE,
878                                                      "distinguishedName")
879        self.test_ldb_dc.modify(msg, ["show_deleted:1"])
880
881    def sync_DCs(self, nc_dn=None):
882        # make sure DC1 has all the changes we've made to DC2
883        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
884                                nc_dn=nc_dn)
885
886    def get_object_guid(self, dn):
887        res = self.test_ldb_dc.search(base=dn, attrs=["objectGUID"],
888                                      scope=ldb.SCOPE_BASE)
889        return res[0]['objectGUID'][0]
890
891    def set_dc_connection(self, conn):
892        """
893        Switches over the connection state info that the underlying drs_base
894        class uses so that we replicate with a different DC.
895        """
896        self.default_hwm = conn.default_hwm
897        self.default_utdv = conn.default_utdv
898        self.drs = conn.drs
899        self.drs_handle = conn.drs_handle
900        self.set_test_ldb_dc(conn.ldb_dc)
901
902    def assert_DCs_replication_is_consistent(self, peer_conn, all_objects,
903                                             expected_links):
904        """
905        Replicates against both the primary and secondary DCs in the testenv
906        and checks that both return the expected results.
907        """
908        print("Checking replication against primary test DC...")
909
910        # get the replication data from the test DC first
911        while not self.replication_complete():
912            self.repl_get_next()
913
914        # Check we get all the objects and links we're expecting
915        self.assert_expected_data(all_objects)
916        self.assert_expected_links(expected_links)
917
918        # switch over the DC state info so we now talk to the peer DC
919        self.set_dc_connection(peer_conn)
920        self.init_test_state()
921
922        print("Checking replication against secondary test DC...")
923
924        # check that we get the same information from the 2nd DC
925        while not self.replication_complete():
926            self.repl_get_next()
927
928        self.assert_expected_data(all_objects)
929        self.assert_expected_links(expected_links)
930
931        # switch back to using the default connection
932        self.set_dc_connection(self.default_conn)
933
934    def test_repl_integrity_obj_reanimation(self):
935        """
936        Checks receiving links for a re-animated object doesn't lose links.
937        We test this against the peer DC to make sure it doesn't drop links.
938        """
939
940        # This test is a little different in that we're particularly interested
941        # in exercising the replmd client code on the second DC.
942        # First, make sure the peer DC has the base OU, then connect to it (so
943        # we store its initial HWM)
944        self.sync_DCs()
945        peer_conn = DcConnection(self, self.ldb_dc1, self.dnsname_dc1)
946
947        # create the link source/target objects
948        la_sources = self.create_object_range(0, 100, prefix="la_src")
949        la_targets = self.create_object_range(0, 100, prefix="la_tgt")
950
951        # store the target object's GUIDs (we need to know these to
952        # reanimate them)
953        target_guids = []
954
955        for dn in la_targets:
956            target_guids.append(self.get_object_guid(dn))
957
958        # delete the link target
959        for x in range(0, 100):
960            self.ldb_dc2.delete(la_targets[x])
961
962        # sync the DCs, then disable replication. We want the peer DC to get
963        # all the following changes in a single replication cycle
964        self.sync_DCs()
965        self._disable_all_repl(self.dnsname_dc2)
966
967        # restore the target objects for the linked attributes again
968        for x in range(0, 100):
969            self.restore_deleted_object(target_guids[x], la_targets[x])
970
971        # add the links
972        for x in range(0, 100):
973            self.modify_object(la_sources[x], "managedBy", la_targets[x])
974
975        # create some additional filler objects
976        filler = self.create_object_range(0, 100, prefix="filler")
977
978        # modify the targets so they now come last
979        for x in range(0, 100):
980            self.modify_object(la_targets[x], "displayName", "OU-%d" % x)
981
982        # the objects should now be sent in the following order:
983        # [la sources + links][filler][la targets]
984        all_objects = la_sources + la_targets + filler
985        expected_links = la_sources
986
987        # Enable replication again make sure the 2 DCs are back in sync
988        self._enable_all_repl(self.dnsname_dc2)
989        self.sync_DCs()
990
991        # Get the replication data from each DC in turn.
992        # Check that both give us all the objects and links we're expecting,
993        # i.e. no links were lost
994        self.assert_DCs_replication_is_consistent(peer_conn, all_objects,
995                                                  expected_links)
996
997    def _test_repl_integrity_cross_partition_links(self, get_tgt=False):
998        """
999        Checks that a cross-partition link to an unknown target object does
1000        not result in missing links.
1001        """
1002
1003        # check the peer DC is up-to-date, then connect (storing its HWM)
1004        self.sync_DCs()
1005        peer_conn = DcConnection(self, self.ldb_dc1, self.dnsname_dc1)
1006
1007        # stop replication so the peer gets the following objects in one go
1008        self._disable_all_repl(self.dnsname_dc2)
1009
1010        # optionally force the client-side to use GET_TGT locally, by adding a
1011        # one-way link to a missing/deleted target object
1012        if get_tgt:
1013            missing_target = "OU=missing_tgt,%s" % self.ou
1014            self.add_object(missing_target)
1015            get_tgt_source = "CN=get_tgt_src,%s" % self.ou
1016            self.add_object(get_tgt_source,
1017                            objectclass="msExchConfigurationContainer")
1018            self.modify_object(get_tgt_source, "addressBookRoots2",
1019                               missing_target)
1020            self.test_ldb_dc.delete(missing_target)
1021
1022        # create a link source object in the main NC
1023        la_source = "OU=cross_nc_src,%s" % self.ou
1024        self.add_object(la_source)
1025
1026        # create the link target (a server object) in the config NC
1027        sites_dn = "CN=Sites,%s" % self.config_dn
1028        servers_dn = "CN=Servers,CN=Default-First-Site-Name,%s" % sites_dn
1029        rand = random.randint(1, 10000000)
1030        la_target = "CN=getncchanges-%d,%s" % (rand, servers_dn)
1031        self.add_object(la_target, objectclass="server")
1032
1033        # add a cross-partition link between the two
1034        self.modify_object(la_source, "managedBy", la_target)
1035
1036        # First, sync to the peer the NC containing the link source object
1037        self.sync_DCs()
1038
1039        # Now, before the peer has received the partition containing the target
1040        # object, try replicating from the peer. It will only know about half
1041        # of the link at this point, but it should be a valid scenario
1042        self.set_dc_connection(peer_conn)
1043
1044        while not self.replication_complete():
1045            # pretend we've received other link targets out of order and that's
1046            # forced us to use GET_TGT. This checks the peer doesn't fail
1047            # trying to fetch a cross-partition target object that doesn't
1048            # exist
1049            self.repl_get_next(get_tgt=True)
1050
1051        self.set_dc_connection(self.default_conn)
1052
1053        # delete the GET_TGT test object. We're not interested in asserting its
1054        # links - it was just there to make the client use GET_TGT (and it
1055        # creates an inconsistency because one DC correctly ignores the link,
1056        # because it points to a deleted object)
1057        if get_tgt:
1058            self.test_ldb_dc.delete(get_tgt_source)
1059
1060        self.init_test_state()
1061
1062        # Now sync across the partition containing the link target object
1063        self.sync_DCs(nc_dn=self.config_dn)
1064        self._enable_all_repl(self.dnsname_dc2)
1065
1066        # Get the replication data from each DC in turn.
1067        # Check that both return the cross-partition link (note we're not
1068        # checking the config domain NC here for simplicity)
1069        self.assert_DCs_replication_is_consistent(peer_conn,
1070                                                  all_objects=[la_source],
1071                                                  expected_links=[la_source])
1072
1073        # the cross-partition linked attribute has a missing backlink. Check
1074        # that we can still delete it successfully
1075        self.delete_attribute(la_source, "managedBy", la_target)
1076        self.sync_DCs()
1077
1078        res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc1, la_source),
1079                                      attrs=["managedBy"],
1080                                      controls=['extended_dn:1:0'],
1081                                      scope=ldb.SCOPE_BASE)
1082        self.assertFalse("managedBy" in res[0],
1083                         "%s in DB still has managedBy attribute" % la_source)
1084        res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc2, la_source),
1085                                      attrs=["managedBy"],
1086                                      controls=['extended_dn:1:0'],
1087                                      scope=ldb.SCOPE_BASE)
1088        self.assertFalse("managedBy" in res[0],
1089                         "%s in DB still has managedBy attribute" % la_source)
1090
1091        # Check receiving a cross-partition link to a deleted target.
1092        # Delete the target and make sure the deletion is sync'd between DCs
1093        target_guid = self.get_object_guid(la_target)
1094        self.test_ldb_dc.delete(la_target)
1095        self.sync_DCs(nc_dn=self.config_dn)
1096        self._disable_all_repl(self.dnsname_dc2)
1097
1098        # re-animate the target
1099        self.restore_deleted_object(target_guid, la_target)
1100        self.modify_object(la_source, "managedBy", la_target)
1101
1102        # now sync the link - because the target is in another partition, the
1103        # peer DC receives a link for a deleted target, which it should accept
1104        self.sync_DCs()
1105        res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc1, la_source),
1106                                      attrs=["managedBy"],
1107                                      controls=['extended_dn:1:0'],
1108                                      scope=ldb.SCOPE_BASE)
1109        self.assertTrue("managedBy" in res[0],
1110                        "%s in DB missing managedBy attribute" % la_source)
1111
1112        # cleanup the server object we created in the Configuration partition
1113        self.test_ldb_dc.delete(la_target)
1114        self._enable_all_repl(self.dnsname_dc2)
1115
1116    def test_repl_integrity_cross_partition_links(self):
1117        self._test_repl_integrity_cross_partition_links(get_tgt=False)
1118
1119    def test_repl_integrity_cross_partition_links_with_tgt(self):
1120        self._test_repl_integrity_cross_partition_links(get_tgt=True)
1121
1122    def test_repl_get_tgt_multivalued_links(self):
1123        """Tests replication with multi-valued link attributes."""
1124
1125        # create the target/source objects and link them together
1126        la_targets = self.create_object_range(0, 500, prefix="la_tgt")
1127        la_source = "CN=la_src,%s" % self.ou
1128        self.add_object(la_source, objectclass="msExchConfigurationContainer")
1129
1130        for tgt in la_targets:
1131            self.modify_object(la_source, "addressBookRoots2", tgt)
1132
1133        filler = self.create_object_range(0, 100, prefix="filler")
1134
1135        # We should receive the objects/links in the following order:
1136        # [500 targets + 1 source][500 links][100 filler]
1137        expected_objects = la_targets + [la_source] + filler
1138        link_only_chunk = False
1139
1140        # First do the replication without needing GET_TGT
1141        while not self.replication_complete():
1142            ctr6 = self.repl_get_next()
1143
1144            if ctr6.object_count == 0 and ctr6.linked_attributes_count != 0:
1145                link_only_chunk = True
1146
1147        # we should receive one chunk that contains only links
1148        self.assertTrue(link_only_chunk,
1149                        "Expected to receive a chunk containing only links")
1150
1151        # check we received all the expected objects/links
1152        self.assert_expected_data(expected_objects)
1153        self.assert_expected_links([la_source], link_attr="addressBookRoots2",
1154                                   num_expected=500)
1155
1156        # Do the replication again, forcing the use of GET_TGT this time
1157        self.init_test_state()
1158
1159        for x in range(0, 500):
1160            self.modify_object(la_targets[x], "displayName", "OU-%d" % x)
1161
1162        # The objects/links should get sent in the following order:
1163        # [1 source][500 targets][500 links][100 filler]
1164
1165        while not self.replication_complete():
1166            ctr6 = self.repl_get_next()
1167
1168        self.assertTrue(self.used_get_tgt,
1169                        "Test didn't use the GET_TGT flag as expected")
1170
1171        # check we received all the expected objects/links
1172        self.assert_expected_data(expected_objects)
1173        self.assert_expected_links([la_source], link_attr="addressBookRoots2",
1174                                   num_expected=500)
1175
1176
1177class DcConnection:
1178    """Helper class to track a connection to another DC"""
1179
1180    def __init__(self, drs_base, ldb_dc, dnsname_dc):
1181        self.ldb_dc = ldb_dc
1182        (self.drs, self.drs_handle) = drs_base._ds_bind(dnsname_dc)
1183        (self.default_hwm, utdv) = drs_base._get_highest_hwm_utdv(ldb_dc)
1184        self.default_utdv = utdv
1185