1""" 2Copyright (c) 2008-2020, Jesus Cea Avion <jcea@jcea.es> 3All rights reserved. 4 5Redistribution and use in source and binary forms, with or without 6modification, are permitted provided that the following conditions 7are met: 8 9 1. Redistributions of source code must retain the above copyright 10 notice, this list of conditions and the following disclaimer. 11 12 2. Redistributions in binary form must reproduce the above 13 copyright notice, this list of conditions and the following 14 disclaimer in the documentation and/or other materials provided 15 with the distribution. 16 17 3. Neither the name of Jesus Cea Avion nor the names of its 18 contributors may be used to endorse or promote products derived 19 from this software without specific prior written permission. 20 21 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND 22 CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, 23 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 24 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 25 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS 26 BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 27 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 28 TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 29 DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 30 ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR 31 TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF 32 THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 33 SUCH DAMAGE. 34 """ 35 36"""TestCases for distributed transactions. 37""" 38 39import os 40import unittest 41 42from .test_all import db, test_support, get_new_environment_path, \ 43 get_new_database_path 44 45from .test_all import verbose 46 47#---------------------------------------------------------------------- 48 49class DBTxn_distributed(unittest.TestCase): 50 num_txns=1234 51 nosync=True 52 must_open_db=False 53 def _create_env(self, must_open_db) : 54 self.dbenv = db.DBEnv() 55 self.dbenv.set_tx_max(self.num_txns) 56 self.dbenv.set_lk_max_lockers(self.num_txns*2) 57 self.dbenv.set_lk_max_locks(self.num_txns*2) 58 self.dbenv.set_lk_max_objects(self.num_txns*2) 59 if self.nosync : 60 self.dbenv.set_flags(db.DB_TXN_NOSYNC,True) 61 self.dbenv.open(self.homeDir, db.DB_CREATE | db.DB_THREAD | 62 db.DB_RECOVER | 63 db.DB_INIT_TXN | db.DB_INIT_LOG | db.DB_INIT_MPOOL | 64 db.DB_INIT_LOCK, 0o666) 65 self.db = db.DB(self.dbenv) 66 self.db.set_re_len(db.DB_GID_SIZE) 67 if must_open_db : 68 txn=self.dbenv.txn_begin() 69 self.db.open(self.filename, 70 db.DB_QUEUE, db.DB_CREATE | db.DB_THREAD, 0o666, 71 txn=txn) 72 txn.commit() 73 74 def setUp(self) : 75 self.homeDir = get_new_environment_path() 76 self.filename = "test" 77 return self._create_env(must_open_db=True) 78 79 def _destroy_env(self): 80 if self.nosync : 81 self.dbenv.log_flush() 82 self.db.close() 83 self.dbenv.close() 84 85 def tearDown(self): 86 self._destroy_env() 87 test_support.rmtree(self.homeDir) 88 89 def _recreate_env(self,must_open_db) : 90 self._destroy_env() 91 self._create_env(must_open_db) 92 93 def test01_distributed_transactions(self) : 94 txns=set() 95 adapt = lambda x : x 96 import sys 97 if sys.version_info[0] >= 3 : 98 adapt = lambda x : bytes(x, "ascii") 99 # Create transactions, "prepare" them, and 100 # let them be garbage collected. 101 for i in range(self.num_txns) : 102 txn = self.dbenv.txn_begin() 103 gid = "%%%dd" %db.DB_GID_SIZE 104 gid = adapt(gid %i) 105 self.db.put(i, gid, txn=txn, flags=db.DB_APPEND) 106 txns.add(gid) 107 txn.prepare(gid) 108 del txn 109 110 self._recreate_env(self.must_open_db) 111 112 # Get "to be recovered" transactions but 113 # let them be garbage collected. 114 recovered_txns=self.dbenv.txn_recover() 115 self.assertEqual(self.num_txns,len(recovered_txns)) 116 for gid,txn in recovered_txns : 117 self.assertTrue(gid in txns) 118 del txn 119 del recovered_txns 120 121 self._recreate_env(self.must_open_db) 122 123 # Get "to be recovered" transactions. Commit, abort and 124 # discard them. 125 recovered_txns=self.dbenv.txn_recover() 126 self.assertEqual(self.num_txns,len(recovered_txns)) 127 discard_txns=set() 128 committed_txns=set() 129 state=0 130 for gid,txn in recovered_txns : 131 if state==0 or state==1: 132 committed_txns.add(gid) 133 txn.commit() 134 elif state==2 : 135 txn.abort() 136 elif state==3 : 137 txn.discard() 138 discard_txns.add(gid) 139 state=-1 140 state+=1 141 del txn 142 del recovered_txns 143 144 self._recreate_env(self.must_open_db) 145 146 # Verify the discarded transactions are still 147 # around, and dispose them. 148 recovered_txns=self.dbenv.txn_recover() 149 self.assertEqual(len(discard_txns),len(recovered_txns)) 150 for gid,txn in recovered_txns : 151 txn.abort() 152 del txn 153 del recovered_txns 154 155 self._recreate_env(must_open_db=True) 156 157 # Be sure there are not pending transactions. 158 # Check also database size. 159 recovered_txns=self.dbenv.txn_recover() 160 self.assertTrue(len(recovered_txns)==0) 161 self.assertEqual(len(committed_txns),self.db.stat()["nkeys"]) 162 163class DBTxn_distributedSYNC(DBTxn_distributed): 164 nosync=False 165 166class DBTxn_distributed_must_open_db(DBTxn_distributed): 167 must_open_db=True 168 169class DBTxn_distributedSYNC_must_open_db(DBTxn_distributed): 170 nosync=False 171 must_open_db=True 172 173#---------------------------------------------------------------------- 174 175def test_suite(): 176 suite = unittest.TestSuite() 177 suite.addTest(unittest.makeSuite(DBTxn_distributed)) 178 suite.addTest(unittest.makeSuite(DBTxn_distributedSYNC)) 179 suite.addTest(unittest.makeSuite(DBTxn_distributed_must_open_db)) 180 suite.addTest(unittest.makeSuite(DBTxn_distributedSYNC_must_open_db)) 181 return suite 182 183 184if __name__ == '__main__': 185 unittest.main(defaultTest='test_suite') 186