"""
Copyright (c) 2008-2020, Jesus Cea Avion <jcea@jcea.es>
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above
       copyright notice, this list of conditions and the following
       disclaimer in the documentation and/or other materials provided
       with the distribution.

    3. Neither the name of Jesus Cea Avion nor the names of its
       contributors may be used to endorse or promote products derived
       from this software without specific prior written permission.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
    CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
    INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
    MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS
    BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
    EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
    TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
    DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
    ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
    TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
    THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
    SUCH DAMAGE.
    """

"""TestCases for replication (Replication Manager and base replication API).
"""

import os
import time
import unittest
import sys

from .test_all import db, test_support, have_threads, verbose, \
        get_new_environment_path, get_new_database_path


#----------------------------------------------------------------------

def _noop(*args):
    """Callback that ignores its arguments.

    Installed at teardown in place of the real event/transport handlers.
    It closes over nothing, so it keeps no reference to the test object
    and the test object can be garbage collected.
    """


class DBReplication(unittest.TestCase):
    """Shared fixture: one "master" and one "client" environment.

    Both environments are opened with replication enabled in setUp() and
    closed/removed in tearDown().  The private helpers implement the
    steps the concrete test classes have in common.
    """

    def setUp(self):
        self.homeDirMaster = get_new_environment_path()
        self.homeDirClient = get_new_environment_path()

        self.dbenvMaster = db.DBEnv()
        self.dbenvClient = db.DBEnv()

        # Must use "DB_THREAD" because the Replication Manager will
        # be executed in other threads but will use the same environment.
        # http://forums.oracle.com/forums/thread.jspa?threadID=645788&tstart=0
        env_flags = (db.DB_CREATE | db.DB_INIT_TXN | db.DB_INIT_LOG |
                     db.DB_INIT_MPOOL | db.DB_INIT_LOCK | db.DB_INIT_REP |
                     db.DB_RECOVER | db.DB_THREAD)
        self.dbenvMaster.open(self.homeDirMaster, env_flags, 0o666)
        self.dbenvClient.open(self.homeDirClient, env_flags, 0o666)

        # Flags flipped by the event callbacks below and polled by tests.
        self.confirmed_master = self.client_startupdone = False

        def confirmed_master(a, b, c):
            if b == db.DB_EVENT_REP_MASTER:
                self.confirmed_master = True

        def client_startupdone(a, b, c):
            if b == db.DB_EVENT_REP_STARTUPDONE:
                self.client_startupdone = True

        self.dbenvMaster.set_event_notify(confirmed_master)
        self.dbenvClient.set_event_notify(client_startupdone)

        # Uncomment for debugging:
        #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
        #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
        #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
        #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)

        self.dbMaster = self.dbClient = None

    def tearDown(self):
        self._close_databases()

        # Replace the event handlers so the environments stop referencing
        # closures that capture the test object.
        self.dbenvMaster.set_event_notify(_noop)
        self.dbenvClient.set_event_notify(_noop)

        self._close_environments()

    # -- shared helpers ---------------------------------------------------

    def _close_databases(self):
        """Close the client and master database handles, client first."""
        # NOTE(review): these are truthiness tests, exactly as in the
        # original code, not "is not None" checks.  Whether a DB handle's
        # truth value depends on its contents should be confirmed before
        # changing them.
        if self.dbClient:
            self.dbClient.close()
        if self.dbMaster:
            self.dbMaster.close()

    def _close_environments(self):
        """Close both environments and delete their home directories."""
        self.dbenvClient.close()
        self.dbenvMaster.close()
        test_support.rmtree(self.homeDirClient)
        test_support.rmtree(self.homeDirMaster)

    def _wait_until(self, predicate, timeout=10.0, interval=0.02):
        """Poll ``predicate`` until it is true or ``timeout`` seconds pass.

        Returns True if the predicate became true and False on timeout.
        The predicate's own result is reported (rather than comparing the
        clock again afterwards), so a condition met right at the deadline
        is not misreported as a timeout.
        """
        deadline = time.monotonic() + timeout
        while not predicate():
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
        return True

    def _wait_for_startup(self):
        """Assert that both replication event flags get set within 10s."""
        # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
        # is not generated if the master has no new transactions.
        # This is solved in BDB 4.6 (#15542).
        self.assertTrue(self._wait_until(
            lambda: self.confirmed_master and self.client_startupdone))

    def _create_replicated_db(self):
        """Create "test" on the master, then open it read-only on the client.

        Sets ``self.dbMaster`` and ``self.dbClient``.
        """
        self.dbMaster = db.DB(self.dbenvMaster)
        txn = self.dbenvMaster.txn_begin()
        self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0o666, txn=txn)
        txn.commit()

        # Give the file up to 10 seconds to show up in the client's home
        # directory.  The result is deliberately not asserted, matching
        # the original behavior; the open below is what must succeed.
        client_file = os.path.join(self.homeDirClient, "test")
        self._wait_until(lambda: os.path.exists(client_file), interval=0.01)

        self.dbClient = db.DB(self.dbenvClient)
        while True:
            txn = self.dbenvClient.txn_begin()
            try:
                self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
                                   mode=0o666, txn=txn)
            except db.DBRepHandleDeadError:
                # Discard this handle and retry with a fresh one.
                txn.abort()
                self.dbClient.close()
                self.dbClient = db.DB(self.dbenvClient)
                continue

            txn.commit()
            break

    def _client_get(self, key):
        """Read ``key`` from the client database in its own transaction."""
        txn = self.dbenvClient.txn_begin()
        value = self.dbClient.get(key, txn=txn)
        txn.commit()
        return value

    def _poll_client(self, key, done, timeout=10.0, interval=0.02):
        """Re-read ``key`` on the client until ``done(value)`` or timeout.

        The key is always read at least once.  Returns the last value
        read, so the caller's assertion on that value also covers the
        timeout case.
        """
        deadline = time.monotonic() + timeout
        while True:
            value = self._client_get(key)
            if done(value) or time.monotonic() >= deadline:
                return value
            # Sleep only while the awaited change has not arrived yet.
            time.sleep(interval)

    def _check_put_and_delete_replicated(self):
        """Put then delete "ABC" on the master; check the client follows."""
        txn = self.dbenvMaster.txn_begin()
        self.dbMaster.put("ABC", "123", txn=txn)
        txn.commit()
        value = self._poll_client("ABC", lambda v: v is not None)
        self.assertEqual("123", value)

        txn = self.dbenvMaster.txn_begin()
        self.dbMaster.delete("ABC", txn=txn)
        txn.commit()
        value = self._poll_client("ABC", lambda v: v is None)
        self.assertIsNone(value)


class DBReplicationManager(DBReplication):
    """Replication tests driven through the Replication Manager API."""

    def _configure_sites(self, master_port, client_port):
        """Configure both environments through repmgr_site() objects.

        Used when db.version() >= (5, 2).  Also checks that the config
        flags, eids and addresses read back as they were set.
        """
        self.site = self.dbenvMaster.repmgr_site("127.0.0.1", master_port)
        self.site.set_config(db.DB_GROUP_CREATOR, True)
        self.site.set_config(db.DB_LOCAL_SITE, True)
        self.site2 = self.dbenvMaster.repmgr_site("127.0.0.1", client_port)

        self.site3 = self.dbenvClient.repmgr_site("127.0.0.1", master_port)
        self.site3.set_config(db.DB_BOOTSTRAP_HELPER, True)
        self.site4 = self.dbenvClient.repmgr_site("127.0.0.1", client_port)
        self.site4.set_config(db.DB_LOCAL_SITE, True)

        sites = [self.site, self.site2, self.site3, self.site4]

        # Expected value of each flag for site, site2, site3, site4.
        expected_config = {
            db.DB_BOOTSTRAP_HELPER: [False, False, True, False],
            db.DB_GROUP_CREATOR: [True, False, False, False],
            db.DB_LEGACY: [False, False, False, False],
            db.DB_LOCAL_SITE: [True, False, False, True],
            db.DB_REPMGR_PEER: [False, False, False, False],
        }
        for flag, expected in expected_config.items():
            for site, wanted in zip(sites, expected):
                self.assertEqual(bool(site.get_config(flag)), wanted)

        self.assertNotEqual(self.site.get_eid(), self.site2.get_eid())
        self.assertNotEqual(self.site3.get_eid(), self.site4.get_eid())

        ports = [master_port, client_port, master_port, client_port]
        for site, port in zip(sites, ports):
            self.assertEqual(site.get_address(), ("127.0.0.1", port))

        # Looking a site up by eid must yield the same address.
        for env, env_sites in ((self.dbenvMaster, sites[:2]),
                               (self.dbenvClient, sites[2:])):
            for site in env_sites:
                self.assertEqual(
                    site.get_address(),
                    env.repmgr_site_by_eid(site.get_eid()).get_address())

    def _assert_single_remote_site(self, env, port):
        """Check repmgr_site_list() reports one site at 127.0.0.1:port."""
        site_list = env.repmgr_site_list()
        self.assertEqual(len(site_list), 1)
        entry = list(site_list.values())[0]  # There is only one
        self.assertEqual(entry[0], "127.0.0.1")
        self.assertEqual(entry[1], port)
        self.assertIn(entry[2], (db.DB_REPMGR_CONNECTED,
                                 db.DB_REPMGR_DISCONNECTED))

    def test01_basic_replication(self):
        """Start master and client; check a put and a delete replicate."""
        if sys.version_info < (3, 9):
            find_unused_port = test_support.find_unused_port
        else:
            from test.support.socket_helper import find_unused_port
        master_port = find_unused_port()
        client_port = find_unused_port()

        if db.version() >= (5, 2):
            self._configure_sites(master_port, client_port)
        else:
            self.dbenvMaster.repmgr_set_local_site("127.0.0.1", master_port)
            self.dbenvClient.repmgr_set_local_site("127.0.0.1", client_port)
            self.dbenvMaster.repmgr_add_remote_site("127.0.0.1", client_port)
            self.dbenvClient.repmgr_add_remote_site("127.0.0.1", master_port)

            self.dbenvMaster.rep_set_nsites(2)
            self.dbenvClient.rep_set_nsites(2)

        self.dbenvMaster.rep_set_priority(10)
        self.dbenvClient.rep_set_priority(0)

        # Each timeout must read back exactly as set, per environment.
        for which, master_value, client_value in (
                (db.DB_REP_CONNECTION_RETRY, 100123, 100321),
                (db.DB_REP_ELECTION_TIMEOUT, 100234, 100432),
                (db.DB_REP_ELECTION_RETRY, 100345, 100543)):
            self.dbenvMaster.rep_set_timeout(which, master_value)
            self.dbenvClient.rep_set_timeout(which, client_value)
            self.assertEqual(self.dbenvMaster.rep_get_timeout(which),
                             master_value)
            self.assertEqual(self.dbenvClient.rep_get_timeout(which),
                             client_value)

        self.dbenvMaster.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
        self.dbenvClient.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)

        self.dbenvMaster.repmgr_start(1, db.DB_REP_MASTER)
        self.dbenvClient.repmgr_start(1, db.DB_REP_CLIENT)

        self.assertEqual(self.dbenvMaster.rep_get_nsites(), 2)
        self.assertEqual(self.dbenvClient.rep_get_nsites(), 2)
        self.assertEqual(self.dbenvMaster.rep_get_priority(), 10)
        self.assertEqual(self.dbenvClient.rep_get_priority(), 0)
        self.assertEqual(self.dbenvMaster.repmgr_get_ack_policy(),
                         db.DB_REPMGR_ACKS_ALL)
        self.assertEqual(self.dbenvClient.repmgr_get_ack_policy(),
                         db.DB_REPMGR_ACKS_ALL)

        self._wait_for_startup()

        self._assert_single_remote_site(self.dbenvMaster, client_port)
        self._assert_single_remote_site(self.dbenvClient, master_port)

        stats = self.dbenvMaster.repmgr_stat(flags=db.DB_STAT_CLEAR)
        self.assertIn("msgs_queued", stats)

        self._create_replicated_db()
        self._check_put_and_delete_replicated()


class DBBaseReplication(DBReplication):
    """Replication tests driven through the base replication API.

    Messages are carried between the two environments by a pair of
    in-process queues, each drained by its own thread.
    """

    # Ids each environment passes to rep_set_transport().  The peer's id
    # is what gets handed to rep_process_message() for incoming messages.
    _MASTER_ENVID = 13
    _CLIENT_ENVID = 3

    def setUp(self):
        DBReplication.setUp(self)

        def confirmed_master(a, b, c):
            if (b == db.DB_EVENT_REP_MASTER) or (b == db.DB_EVENT_REP_ELECTED):
                self.confirmed_master = True

        def client_startupdone(a, b, c):
            if b == db.DB_EVENT_REP_STARTUPDONE:
                self.client_startupdone = True

        self.dbenvMaster.set_event_notify(confirmed_master)
        self.dbenvClient.set_event_notify(client_startupdone)

        # Function-local import, as in the original code.
        import queue
        self.m2c = queue.Queue()
        self.c2m = queue.Queue()

        # There are only two nodes, so we don't need to
        # do any routing decision
        def m2c(dbenv, control, rec, lsnp, envid, flags):
            self.m2c.put((control, rec))

        def c2m(dbenv, control, rec, lsnp, envid, flags):
            self.c2m.put((control, rec))

        self.dbenvMaster.rep_set_transport(self._MASTER_ENVID, m2c)
        self.dbenvMaster.rep_set_priority(10)
        self.dbenvClient.rep_set_transport(self._CLIENT_ENVID, c2m)
        self.dbenvClient.rep_set_priority(0)

        self.assertEqual(self.dbenvMaster.rep_get_priority(), 10)
        self.assertEqual(self.dbenvClient.rep_get_priority(), 0)

        # Uncomment for debugging:
        #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
        #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
        #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
        #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)

        # "self.thread_do" is assigned by each test before the threads
        # are started; it is looked up only when the thread runs.
        def thread_master():
            return self.thread_do(self.dbenvMaster, self.c2m,
                                  self._CLIENT_ENVID,
                                  self.master_doing_election, True)

        def thread_client():
            return self.thread_do(self.dbenvClient, self.m2c,
                                  self._MASTER_ENVID,
                                  self.client_doing_election, False)

        from threading import Thread
        self.t_m = Thread(target=thread_master)
        self.t_c = Thread(target=thread_client)
        self.t_m.daemon = True
        self.t_c.daemon = True

        self.dbMaster = self.dbClient = None

        # One-element lists so the worker threads can mutate the flag.
        self.master_doing_election = [False]
        self.client_doing_election = [False]

    def tearDown(self):
        self._close_databases()

        # A None item tells each message-pump thread to return.
        self.m2c.put(None)
        self.c2m.put(None)
        for worker in (self.t_m, self.t_c):
            # join() raises RuntimeError on a thread that was never
            # started, which happens when a test fails before start().
            if worker.ident is not None:
                worker.join()

        # Replace the handlers so the environments stop referencing
        # closures that capture the test object.
        self.dbenvMaster.set_event_notify(_noop)
        self.dbenvClient.set_event_notify(_noop)
        self.dbenvMaster.rep_set_transport(self._MASTER_ENVID, _noop)
        self.dbenvClient.rep_set_transport(self._CLIENT_ENVID, _noop)

        self._close_environments()

    def basic_rep_threading(self):
        """Start master and client roles and the two message-pump threads."""
        self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
        self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)

        def thread_do(env, q, envid, election_status, must_be_master):
            # Feed every queued (control, rec) pair to "env" until the
            # None sentinel arrives.
            while True:
                v = q.get()
                if v is None:
                    return
                env.rep_process_message(v[0], v[1], envid)

        self.thread_do = thread_do

        self.t_m.start()
        self.t_c.start()

    def test01_basic_replication(self):
        """Check a put and a delete on the master reach the client."""
        self.basic_rep_threading()
        self._wait_for_startup()

        self._create_replicated_db()

        stats = self.dbenvMaster.rep_stat(flags=db.DB_STAT_CLEAR)
        self.assertIn("master_changes", stats)

        self._check_put_and_delete_replicated()

    def test02_test_request(self):
        """rep_set_request() values must read back via rep_get_request()."""
        self.basic_rep_threading()
        (minimum, maximum) = self.dbenvClient.rep_get_request()
        self.dbenvClient.rep_set_request(minimum-1, maximum+1)
        self.assertEqual(self.dbenvClient.rep_get_request(),
                         (minimum-1, maximum+1))

    def test03_master_election(self):
        """Start both sites as clients and let an election pick a master."""
        # Get ready to hold an election
        #self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
        self.dbenvMaster.rep_start(flags=db.DB_REP_CLIENT)
        self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)

        def thread_do(env, q, envid, election_status, must_be_master):
            def elect():
                # Retry until the election call stops reporting
                # DBRepUnavailError, then clear the "in progress" flag.
                while True:
                    try:
                        env.rep_elect(2, 1)
                    except db.DBRepUnavailError:
                        continue
                    election_status[0] = False
                    break

            while True:
                v = q.get()
                if v is None:
                    return
                r = env.rep_process_message(v[0], v[1], envid)
                if must_be_master and self.confirmed_master:
                    self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
                    must_be_master = False

                if r[0] == db.DB_REP_HOLDELECTION:
                    # Run at most one election per environment at a time,
                    # and none once a master has been confirmed.
                    if not election_status[0] and not self.confirmed_master:
                        from threading import Thread
                        election_status[0] = True
                        t = Thread(target=elect)
                        t.daemon = True
                        t.start()

        self.thread_do = thread_do

        self.t_m.start()
        self.t_c.start()

        self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
        self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
        self.client_doing_election[0] = True
        while True:
            try:
                self.dbenvClient.rep_elect(2, 1)
            except db.DBRepUnavailError:
                continue
            self.client_doing_election[0] = False
            break

        self.assertTrue(self.confirmed_master)

        # Race condition showed up after upgrading to Solaris 10 Update 10
        # https://forums.oracle.com/forums/thread.jspa?messageID=9902860
        # jcea@jcea.es: See private email from Paula Bingham (Oracle),
        # in 20110929.
        while not (self.dbenvClient.rep_stat()["startup_complete"]):
            # Still an unbounded wait, as before, but sleep between polls
            # instead of spinning at full speed.
            time.sleep(0.01)

    def test04_test_clockskew(self):
        """rep_set_clockskew() values must read back via rep_get_clockskew()."""
        fast, slow = 1234, 1230
        self.dbenvMaster.rep_set_clockskew(fast, slow)
        self.assertEqual((fast, slow),
                         self.dbenvMaster.rep_get_clockskew())
        self.basic_rep_threading()

#----------------------------------------------------------------------

def test_suite():
    """Build the suite, adding only the test classes that can run here.

    DBReplicationManager is added when repmgr_get_ack_policy() can be
    called on a fresh DBEnv; DBBaseReplication is added when
    ``have_threads`` is true.
    """
    suite = unittest.TestSuite()
    load_tests_from = unittest.defaultTestLoader.loadTestsFromTestCase

    dbenv = db.DBEnv()
    try:
        dbenv.repmgr_get_ack_policy()
    except Exception:
        # Any failure of the probe call is read as "not available";
        # KeyboardInterrupt and SystemExit are no longer swallowed.
        ReplicationManager_available = False
    else:
        ReplicationManager_available = True
    finally:
        dbenv.close()

    if ReplicationManager_available:
        suite.addTest(load_tests_from(DBReplicationManager))

    if have_threads:
        suite.addTest(load_tests_from(DBBaseReplication))

    return suite


if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')