1"""
2Copyright (c) 2008-2020, Jesus Cea Avion <jcea@jcea.es>
3All rights reserved.
4
5Redistribution and use in source and binary forms, with or without
6modification, are permitted provided that the following conditions
7are met:
8
9    1. Redistributions of source code must retain the above copyright
10    notice, this list of conditions and the following disclaimer.
11
12    2. Redistributions in binary form must reproduce the above
13    copyright notice, this list of conditions and the following
14    disclaimer in the documentation and/or other materials provided
15    with the distribution.
16
17    3. Neither the name of Jesus Cea Avion nor the names of its
18    contributors may be used to endorse or promote products derived
19    from this software without specific prior written permission.
20
21    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
22    CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
23    INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
24    MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
25    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS
26    BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
27    EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
28            TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29            DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
30    ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
31    TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
32    THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33    SUCH DAMAGE.
34    """
35
36"""TestCases for distributed transactions.
37"""
38
39import os
40import time
41import unittest
42import sys
43
44from test_all import db, test_support, have_threads, verbose, \
45        get_new_environment_path, get_new_database_path
46
47
48#----------------------------------------------------------------------
49
50class DBReplication(unittest.TestCase) :
51    def setUp(self) :
52        self.homeDirMaster = get_new_environment_path()
53        self.homeDirClient = get_new_environment_path()
54
55        self.dbenvMaster = db.DBEnv()
56        self.dbenvClient = db.DBEnv()
57
58        # Must use "DB_THREAD" because the Replication Manager will
59        # be executed in other threads but will use the same environment.
60        # http://forums.oracle.com/forums/thread.jspa?threadID=645788&tstart=0
61        self.dbenvMaster.open(self.homeDirMaster, db.DB_CREATE | db.DB_INIT_TXN
62                | db.DB_INIT_LOG | db.DB_INIT_MPOOL | db.DB_INIT_LOCK |
63                db.DB_INIT_REP | db.DB_RECOVER | db.DB_THREAD, 0666)
64        self.dbenvClient.open(self.homeDirClient, db.DB_CREATE | db.DB_INIT_TXN
65                | db.DB_INIT_LOG | db.DB_INIT_MPOOL | db.DB_INIT_LOCK |
66                db.DB_INIT_REP | db.DB_RECOVER | db.DB_THREAD, 0666)
67
68        self.confirmed_master=self.client_startupdone=False
69        def confirmed_master(a,b,c) :
70            if b==db.DB_EVENT_REP_MASTER :
71                self.confirmed_master=True
72
73        def client_startupdone(a,b,c) :
74            if b==db.DB_EVENT_REP_STARTUPDONE :
75                self.client_startupdone=True
76
77        self.dbenvMaster.set_event_notify(confirmed_master)
78        self.dbenvClient.set_event_notify(client_startupdone)
79
80        #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
81        #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
82        #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
83        #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
84
85        self.dbMaster = self.dbClient = None
86
87
88    def tearDown(self):
89        if self.dbClient :
90            self.dbClient.close()
91        if self.dbMaster :
92            self.dbMaster.close()
93
94        # Here we assign dummy event handlers to allow GC of the test object.
95        # Since the dummy handler doesn't use any outer scope variable, it
96        # doesn't keep any reference to the test object.
97        def dummy(*args) :
98            pass
99        self.dbenvMaster.set_event_notify(dummy)
100        self.dbenvClient.set_event_notify(dummy)
101
102        self.dbenvClient.close()
103        self.dbenvMaster.close()
104        test_support.rmtree(self.homeDirClient)
105        test_support.rmtree(self.homeDirMaster)
106
107class DBReplicationManager(DBReplication) :
108    def test01_basic_replication(self) :
109        if sys.version_info < (3, 9):
110            find_unused_port = test_support.find_unused_port
111        else:
112            from test.support.socket_helper import find_unused_port
113        master_port = find_unused_port()
114        client_port = find_unused_port()
115
116        if db.version() >= (5, 2) :
117            self.site = self.dbenvMaster.repmgr_site("127.0.0.1", master_port)
118            self.site.set_config(db.DB_GROUP_CREATOR, True)
119            self.site.set_config(db.DB_LOCAL_SITE, True)
120            self.site2 = self.dbenvMaster.repmgr_site("127.0.0.1", client_port)
121
122            self.site3 = self.dbenvClient.repmgr_site("127.0.0.1", master_port)
123            self.site3.set_config(db.DB_BOOTSTRAP_HELPER, True)
124            self.site4 = self.dbenvClient.repmgr_site("127.0.0.1", client_port)
125            self.site4.set_config(db.DB_LOCAL_SITE, True)
126
127            d = {
128                    db.DB_BOOTSTRAP_HELPER: [False, False, True, False],
129                    db.DB_GROUP_CREATOR: [True, False, False, False],
130                    db.DB_LEGACY: [False, False, False, False],
131                    db.DB_LOCAL_SITE: [True, False, False, True],
132                    db.DB_REPMGR_PEER: [False, False, False, False ],
133                }
134
135            for i, j in d.items() :
136                for k, v in \
137                        zip([self.site, self.site2, self.site3, self.site4], j) :
138                    if v :
139                        self.assertTrue(k.get_config(i))
140                    else :
141                        self.assertFalse(k.get_config(i))
142
143            self.assertNotEqual(self.site.get_eid(), self.site2.get_eid())
144            self.assertNotEqual(self.site3.get_eid(), self.site4.get_eid())
145
146            for i, j in zip([self.site, self.site2, self.site3, self.site4], \
147                    [master_port, client_port, master_port, client_port]) :
148                addr = i.get_address()
149                self.assertEqual(addr, ("127.0.0.1", j))
150
151            for i in [self.site, self.site2] :
152                self.assertEqual(i.get_address(),
153                        self.dbenvMaster.repmgr_site_by_eid(i.get_eid()).get_address())
154            for i in [self.site3, self.site4] :
155                self.assertEqual(i.get_address(),
156                        self.dbenvClient.repmgr_site_by_eid(i.get_eid()).get_address())
157        else :
158            self.dbenvMaster.repmgr_set_local_site("127.0.0.1", master_port)
159            self.dbenvClient.repmgr_set_local_site("127.0.0.1", client_port)
160            self.dbenvMaster.repmgr_add_remote_site("127.0.0.1", client_port)
161            self.dbenvClient.repmgr_add_remote_site("127.0.0.1", master_port)
162
163            self.dbenvMaster.rep_set_nsites(2)
164            self.dbenvClient.rep_set_nsites(2)
165
166        self.dbenvMaster.rep_set_priority(10)
167        self.dbenvClient.rep_set_priority(0)
168
169        self.dbenvMaster.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100123)
170        self.dbenvClient.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100321)
171        self.assertEqual(self.dbenvMaster.rep_get_timeout(
172            db.DB_REP_CONNECTION_RETRY), 100123)
173        self.assertEqual(self.dbenvClient.rep_get_timeout(
174            db.DB_REP_CONNECTION_RETRY), 100321)
175
176        self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100234)
177        self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100432)
178        self.assertEqual(self.dbenvMaster.rep_get_timeout(
179            db.DB_REP_ELECTION_TIMEOUT), 100234)
180        self.assertEqual(self.dbenvClient.rep_get_timeout(
181            db.DB_REP_ELECTION_TIMEOUT), 100432)
182
183        self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100345)
184        self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100543)
185        self.assertEqual(self.dbenvMaster.rep_get_timeout(
186            db.DB_REP_ELECTION_RETRY), 100345)
187        self.assertEqual(self.dbenvClient.rep_get_timeout(
188            db.DB_REP_ELECTION_RETRY), 100543)
189
190        self.dbenvMaster.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
191        self.dbenvClient.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
192
193        self.dbenvMaster.repmgr_start(1, db.DB_REP_MASTER);
194        self.dbenvClient.repmgr_start(1, db.DB_REP_CLIENT);
195
196        self.assertEqual(self.dbenvMaster.rep_get_nsites(),2)
197        self.assertEqual(self.dbenvClient.rep_get_nsites(),2)
198        self.assertEqual(self.dbenvMaster.rep_get_priority(),10)
199        self.assertEqual(self.dbenvClient.rep_get_priority(),0)
200        self.assertEqual(self.dbenvMaster.repmgr_get_ack_policy(),
201                db.DB_REPMGR_ACKS_ALL)
202        self.assertEqual(self.dbenvClient.repmgr_get_ack_policy(),
203                db.DB_REPMGR_ACKS_ALL)
204
205        # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
206        # is not generated if the master has no new transactions.
207        # This is solved in BDB 4.6 (#15542).
208        import time
209        timeout = time.time()+10
210        while (time.time()<timeout) and not (self.confirmed_master and self.client_startupdone) :
211            time.sleep(0.02)
212        self.assertTrue(time.time()<timeout)
213
214        d = self.dbenvMaster.repmgr_site_list()
215        self.assertEqual(len(d), 1)
216        d = d.values()[0]  # There is only one
217        self.assertEqual(d[0], "127.0.0.1")
218        self.assertEqual(d[1], client_port)
219        self.assertTrue((d[2]==db.DB_REPMGR_CONNECTED) or \
220                (d[2]==db.DB_REPMGR_DISCONNECTED))
221
222        d = self.dbenvClient.repmgr_site_list()
223        self.assertEqual(len(d), 1)
224        d = d.values()[0]  # There is only one
225        self.assertEqual(d[0], "127.0.0.1")
226        self.assertEqual(d[1], master_port)
227        self.assertTrue((d[2]==db.DB_REPMGR_CONNECTED) or \
228                (d[2]==db.DB_REPMGR_DISCONNECTED))
229
230        d = self.dbenvMaster.repmgr_stat(flags=db.DB_STAT_CLEAR);
231        self.assertTrue("msgs_queued" in d)
232
233        self.dbMaster=db.DB(self.dbenvMaster)
234        txn=self.dbenvMaster.txn_begin()
235        self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn)
236        txn.commit()
237
238        import time,os.path
239        timeout=time.time()+10
240        while (time.time()<timeout) and \
241          not (os.path.exists(os.path.join(self.homeDirClient,"test"))) :
242            time.sleep(0.01)
243
244        self.dbClient=db.DB(self.dbenvClient)
245        while True :
246            txn=self.dbenvClient.txn_begin()
247            try :
248                self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
249                        mode=0666, txn=txn)
250            except db.DBRepHandleDeadError :
251                txn.abort()
252                self.dbClient.close()
253                self.dbClient=db.DB(self.dbenvClient)
254                continue
255
256            txn.commit()
257            break
258
259        txn=self.dbenvMaster.txn_begin()
260        self.dbMaster.put("ABC", "123", txn=txn)
261        txn.commit()
262        import time
263        timeout=time.time()+10
264        v=None
265        while (time.time()<timeout) and (v is None) :
266            txn=self.dbenvClient.txn_begin()
267            v=self.dbClient.get("ABC", txn=txn)
268            txn.commit()
269            if v is None :
270                time.sleep(0.02)
271        self.assertTrue(time.time()<timeout)
272        self.assertEqual("123", v)
273
274        txn=self.dbenvMaster.txn_begin()
275        self.dbMaster.delete("ABC", txn=txn)
276        txn.commit()
277        timeout=time.time()+10
278        while (time.time()<timeout) and (v is not None) :
279            txn=self.dbenvClient.txn_begin()
280            v=self.dbClient.get("ABC", txn=txn)
281            txn.commit()
282            if v is None :
283                time.sleep(0.02)
284        self.assertTrue(time.time()<timeout)
285        self.assertEqual(None, v)
286
287class DBBaseReplication(DBReplication) :
288    def setUp(self) :
289        DBReplication.setUp(self)
290        def confirmed_master(a,b,c) :
291            if (b == db.DB_EVENT_REP_MASTER) or (b == db.DB_EVENT_REP_ELECTED) :
292                self.confirmed_master = True
293
294        def client_startupdone(a,b,c) :
295            if b == db.DB_EVENT_REP_STARTUPDONE :
296                self.client_startupdone = True
297
298        self.dbenvMaster.set_event_notify(confirmed_master)
299        self.dbenvClient.set_event_notify(client_startupdone)
300
301        import Queue
302        self.m2c = Queue.Queue()
303        self.c2m = Queue.Queue()
304
305        # There are only two nodes, so we don't need to
306        # do any routing decision
307        def m2c(dbenv, control, rec, lsnp, envid, flags) :
308            self.m2c.put((control, rec))
309
310        def c2m(dbenv, control, rec, lsnp, envid, flags) :
311            self.c2m.put((control, rec))
312
313        self.dbenvMaster.rep_set_transport(13,m2c)
314        self.dbenvMaster.rep_set_priority(10)
315        self.dbenvClient.rep_set_transport(3,c2m)
316        self.dbenvClient.rep_set_priority(0)
317
318        self.assertEqual(self.dbenvMaster.rep_get_priority(),10)
319        self.assertEqual(self.dbenvClient.rep_get_priority(),0)
320
321        #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
322        #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
323        #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
324        #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
325
326        def thread_master() :
327            return self.thread_do(self.dbenvMaster, self.c2m, 3,
328                    self.master_doing_election, True)
329
330        def thread_client() :
331            return self.thread_do(self.dbenvClient, self.m2c, 13,
332                    self.client_doing_election, False)
333
334        from threading import Thread
335        t_m=Thread(target=thread_master)
336        t_c=Thread(target=thread_client)
337        import sys
338        if sys.version_info[0] < 3 :
339            t_m.setDaemon(True)
340            t_c.setDaemon(True)
341        else :
342            t_m.daemon = True
343            t_c.daemon = True
344
345        self.t_m = t_m
346        self.t_c = t_c
347
348        self.dbMaster = self.dbClient = None
349
350        self.master_doing_election=[False]
351        self.client_doing_election=[False]
352
353
354    def tearDown(self):
355        if self.dbClient :
356            self.dbClient.close()
357        if self.dbMaster :
358            self.dbMaster.close()
359        self.m2c.put(None)
360        self.c2m.put(None)
361        self.t_m.join()
362        self.t_c.join()
363
364        # Here we assign dummy event handlers to allow GC of the test object.
365        # Since the dummy handler doesn't use any outer scope variable, it
366        # doesn't keep any reference to the test object.
367        def dummy(*args) :
368            pass
369        self.dbenvMaster.set_event_notify(dummy)
370        self.dbenvClient.set_event_notify(dummy)
371        self.dbenvMaster.rep_set_transport(13,dummy)
372        self.dbenvClient.rep_set_transport(3,dummy)
373
374        self.dbenvClient.close()
375        self.dbenvMaster.close()
376        test_support.rmtree(self.homeDirClient)
377        test_support.rmtree(self.homeDirMaster)
378
379    def basic_rep_threading(self) :
380        self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
381        self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)
382
383        def thread_do(env, q, envid, election_status, must_be_master) :
384            while True :
385                v=q.get()
386                if v is None : return
387                env.rep_process_message(v[0], v[1], envid)
388
389        self.thread_do = thread_do
390
391        self.t_m.start()
392        self.t_c.start()
393
394    def test01_basic_replication(self) :
395        self.basic_rep_threading()
396
397        # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
398        # is not generated if the master has no new transactions.
399        # This is solved in BDB 4.6 (#15542).
400        import time
401        timeout = time.time()+10
402        while (time.time()<timeout) and not (self.confirmed_master and
403                self.client_startupdone) :
404            time.sleep(0.02)
405        self.assertTrue(time.time()<timeout)
406
407        self.dbMaster=db.DB(self.dbenvMaster)
408        txn=self.dbenvMaster.txn_begin()
409        self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn)
410        txn.commit()
411
412        import time,os.path
413        timeout=time.time()+10
414        while (time.time()<timeout) and \
415          not (os.path.exists(os.path.join(self.homeDirClient,"test"))) :
416            time.sleep(0.01)
417
418        self.dbClient=db.DB(self.dbenvClient)
419        while True :
420            txn=self.dbenvClient.txn_begin()
421            try :
422                self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
423                        mode=0666, txn=txn)
424            except db.DBRepHandleDeadError :
425                txn.abort()
426                self.dbClient.close()
427                self.dbClient=db.DB(self.dbenvClient)
428                continue
429
430            txn.commit()
431            break
432
433        d = self.dbenvMaster.rep_stat(flags=db.DB_STAT_CLEAR);
434        self.assertTrue("master_changes" in d)
435
436        txn=self.dbenvMaster.txn_begin()
437        self.dbMaster.put("ABC", "123", txn=txn)
438        txn.commit()
439        import time
440        timeout=time.time()+10
441        v=None
442        while (time.time()<timeout) and (v is None) :
443            txn=self.dbenvClient.txn_begin()
444            v=self.dbClient.get("ABC", txn=txn)
445            txn.commit()
446            if v is None :
447                time.sleep(0.02)
448        self.assertTrue(time.time()<timeout)
449        self.assertEqual("123", v)
450
451        txn=self.dbenvMaster.txn_begin()
452        self.dbMaster.delete("ABC", txn=txn)
453        txn.commit()
454        timeout=time.time()+10
455        while (time.time()<timeout) and (v is not None) :
456            txn=self.dbenvClient.txn_begin()
457            v=self.dbClient.get("ABC", txn=txn)
458            txn.commit()
459            if v is None :
460                time.sleep(0.02)
461        self.assertTrue(time.time()<timeout)
462        self.assertEqual(None, v)
463
464    def test02_test_request(self) :
465        self.basic_rep_threading()
466        (minimum, maximum) = self.dbenvClient.rep_get_request()
467        self.dbenvClient.rep_set_request(minimum-1, maximum+1)
468        self.assertEqual(self.dbenvClient.rep_get_request(),
469                (minimum-1, maximum+1))
470
471    def test03_master_election(self) :
472        # Get ready to hold an election
473        #self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
474        self.dbenvMaster.rep_start(flags=db.DB_REP_CLIENT)
475        self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)
476
477        def thread_do(env, q, envid, election_status, must_be_master) :
478            while True :
479                v=q.get()
480                if v is None : return
481                r = env.rep_process_message(v[0],v[1],envid)
482                if must_be_master and self.confirmed_master :
483                    self.dbenvMaster.rep_start(flags = db.DB_REP_MASTER)
484                    must_be_master = False
485
486                if r[0] == db.DB_REP_HOLDELECTION :
487                    def elect() :
488                        while True :
489                            try :
490                                env.rep_elect(2, 1)
491                                election_status[0] = False
492                                break
493                            except db.DBRepUnavailError :
494                                pass
495                    if not election_status[0] and not self.confirmed_master :
496                        from threading import Thread
497                        election_status[0] = True
498                        t=Thread(target=elect)
499                        import sys
500                        if sys.version_info[0] < 3 :
501                            t.setDaemon(True)
502                        else :
503                            t.daemon = True
504                        t.start()
505
506        self.thread_do = thread_do
507
508        self.t_m.start()
509        self.t_c.start()
510
511        self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
512        self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
513        self.client_doing_election[0] = True
514        while True :
515            try :
516                self.dbenvClient.rep_elect(2, 1)
517                self.client_doing_election[0] = False
518                break
519            except db.DBRepUnavailError :
520                pass
521
522        self.assertTrue(self.confirmed_master)
523
524        # Race condition showed up after upgrading to Solaris 10 Update 10
525        # https://forums.oracle.com/forums/thread.jspa?messageID=9902860
526        # jcea@jcea.es: See private email from Paula Bingham (Oracle),
527        # in 20110929.
528        while not (self.dbenvClient.rep_stat()["startup_complete"]) :
529            pass
530
531    def test04_test_clockskew(self) :
532        fast, slow = 1234, 1230
533        self.dbenvMaster.rep_set_clockskew(fast, slow)
534        self.assertEqual((fast, slow),
535                self.dbenvMaster.rep_get_clockskew())
536        self.basic_rep_threading()
537
538#----------------------------------------------------------------------
539
540def test_suite():
541    suite = unittest.TestSuite()
542    dbenv = db.DBEnv()
543    try :
544        dbenv.repmgr_get_ack_policy()
545        ReplicationManager_available=True
546    except :
547        ReplicationManager_available=False
548    dbenv.close()
549    del dbenv
550    if ReplicationManager_available :
551        suite.addTest(unittest.makeSuite(DBReplicationManager))
552
553    if have_threads :
554        suite.addTest(unittest.makeSuite(DBBaseReplication))
555
556    return suite
557
558
559if __name__ == '__main__':
560    unittest.main(defaultTest='test_suite')
561