1"""
2Copyright (c) 2008-2020, Jesus Cea Avion <jcea@jcea.es>
3All rights reserved.
4
5Redistribution and use in source and binary forms, with or without
6modification, are permitted provided that the following conditions
7are met:
8
9    1. Redistributions of source code must retain the above copyright
10    notice, this list of conditions and the following disclaimer.
11
12    2. Redistributions in binary form must reproduce the above
13    copyright notice, this list of conditions and the following
14    disclaimer in the documentation and/or other materials provided
15    with the distribution.
16
17    3. Neither the name of Jesus Cea Avion nor the names of its
18    contributors may be used to endorse or promote products derived
19    from this software without specific prior written permission.
20
21    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
22    CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
23    INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
24    MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
25    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS
26    BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
27    EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
28            TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29            DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
30    ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
31    TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
32    THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33    SUCH DAMAGE.
34    """
35
36"""TestCases for distributed transactions.
37"""
38
39import os
40import unittest
41
42from .test_all import db, test_support, get_new_environment_path, \
43        get_new_database_path
44
45from .test_all import verbose
46
47#----------------------------------------------------------------------
48
class DBTxn_distributed(unittest.TestCase):
    """Check prepared transactions across repeated environment re-opens.

    Transactions are created and prepared with a global id (GID), the
    environment is closed and re-opened with DB_RECOVER several times, and
    the transactions returned by ``txn_recover()`` are then committed,
    aborted or discarded.  Subclasses vary the scenario through the class
    attributes below.
    """

    # Number of transactions prepared by the test; also used to size the
    # environment's transaction and lock tables.
    num_txns = 1234
    # When true, DB_TXN_NOSYNC is set on the environment and the log is
    # flushed explicitly before the environment is closed.
    nosync = True
    # Whether the database file is opened on the intermediate environment
    # re-creations (the first and the last one always open it).
    must_open_db = False

    def _create_env(self, must_open_db):
        """Open the environment in ``self.homeDir`` and create ``self.db``.

        ``must_open_db``: when true, ``self.filename`` is opened (created if
        needed) as a DB_QUEUE database inside its own committed transaction;
        otherwise ``self.db`` stays an unopened handle.
        """
        self.dbenv = db.DBEnv()
        self.dbenv.set_tx_max(self.num_txns)
        # Twice the number of transactions for every lock-related limit.
        lock_limit = self.num_txns * 2
        self.dbenv.set_lk_max_lockers(lock_limit)
        self.dbenv.set_lk_max_locks(lock_limit)
        self.dbenv.set_lk_max_objects(lock_limit)
        if self.nosync:
            self.dbenv.set_flags(db.DB_TXN_NOSYNC, True)
        self.dbenv.open(self.homeDir, db.DB_CREATE | db.DB_THREAD |
                db.DB_RECOVER |
                db.DB_INIT_TXN | db.DB_INIT_LOG | db.DB_INIT_MPOOL |
                db.DB_INIT_LOCK, 0o666)
        self.db = db.DB(self.dbenv)
        # Records are fixed length: exactly one GID each.
        self.db.set_re_len(db.DB_GID_SIZE)
        if must_open_db:
            txn = self.dbenv.txn_begin()
            self.db.open(self.filename,
                    db.DB_QUEUE, db.DB_CREATE | db.DB_THREAD, 0o666,
                    txn=txn)
            txn.commit()

    def setUp(self):
        """Create a fresh environment directory and open the database."""
        self.homeDir = get_new_environment_path()
        self.filename = "test"
        self._create_env(must_open_db=True)

    def _destroy_env(self):
        """Close the database handle and then the environment."""
        if self.nosync:
            # NOTE(review): presumably needed because DB_TXN_NOSYNC leaves
            # log records unflushed at commit/prepare time -- confirm.
            self.dbenv.log_flush()
        self.db.close()
        self.dbenv.close()

    def tearDown(self):
        """Close everything and remove the environment directory."""
        self._destroy_env()
        test_support.rmtree(self.homeDir)

    def _recreate_env(self, must_open_db):
        """Close the current environment and open it again."""
        self._destroy_env()
        self._create_env(must_open_db)

    def test01_distributed_transactions(self):
        # Comments are used instead of a docstring so that the name shown
        # by the test runner is not replaced/extended by a description.
        import sys
        if sys.version_info[0] >= 3:
            def adapt(text):
                return bytes(text, "ascii")
        else:
            def adapt(text):
                return text

        txns = set()
        # Template producing a decimal number padded to DB_GID_SIZE chars.
        gid_template = "%%%dd" % db.DB_GID_SIZE

        # Create transactions, "prepare" them, and
        # let them be garbage collected.
        # "txn" is pre-bound so the "del" below cannot raise NameError
        # when the loop body never runs (num_txns == 0).
        txn = None
        for i in range(self.num_txns):
            txn = self.dbenv.txn_begin()
            gid = adapt(gid_template % i)
            self.db.put(i, gid, txn=txn, flags=db.DB_APPEND)
            txns.add(gid)
            txn.prepare(gid)
        del txn

        self._recreate_env(self.must_open_db)

        # Get "to be recovered" transactions but
        # let them be garbage collected.
        recovered_txns = self.dbenv.txn_recover()
        self.assertEqual(self.num_txns, len(recovered_txns))
        txn = None
        for gid, txn in recovered_txns:
            self.assertIn(gid, txns)
        del txn
        del recovered_txns

        self._recreate_env(self.must_open_db)

        # Get "to be recovered" transactions. Commit, abort and
        # discard them.
        recovered_txns = self.dbenv.txn_recover()
        self.assertEqual(self.num_txns, len(recovered_txns))
        discard_txns = set()
        committed_txns = set()
        txn = None
        # Repeating pattern of four: commit, commit, abort, discard.
        for position, (gid, txn) in enumerate(recovered_txns):
            action = position % 4
            if action in (0, 1):
                committed_txns.add(gid)
                txn.commit()
            elif action == 2:
                txn.abort()
            else:
                txn.discard()
                discard_txns.add(gid)
        del txn
        del recovered_txns

        self._recreate_env(self.must_open_db)

        # Verify the discarded transactions are still
        # around, and dispose them.
        recovered_txns = self.dbenv.txn_recover()
        self.assertEqual(len(discard_txns), len(recovered_txns))
        # Pre-bound for the same reason as above: with fewer than four
        # transactions nothing was discarded and this loop is empty.
        txn = None
        for gid, txn in recovered_txns:
            txn.abort()
        del txn
        del recovered_txns

        self._recreate_env(must_open_db=True)

        # Be sure there are not pending transactions.
        # Check also database size.
        recovered_txns = self.dbenv.txn_recover()
        self.assertEqual(0, len(recovered_txns))
        self.assertEqual(len(committed_txns), self.db.stat()["nkeys"])
162
class DBTxn_distributedSYNC(DBTxn_distributed):
    """Base scenario with ``nosync`` switched off."""

    nosync = False
165
class DBTxn_distributed_must_open_db(DBTxn_distributed):
    """Base scenario with ``must_open_db`` switched on."""

    must_open_db = True
168
class DBTxn_distributedSYNC_must_open_db(DBTxn_distributed):
    """Base scenario with ``nosync`` off and ``must_open_db`` on."""

    must_open_db = True
    nosync = False
172
173#----------------------------------------------------------------------
174
def test_suite():
    """Return a ``unittest.TestSuite`` with every test case of this module.

    One sub-suite is added per test-case class, in the same order as the
    classes are defined.
    """
    # unittest.makeSuite() was deprecated in Python 3.11 and removed in
    # Python 3.13; TestLoader.loadTestsFromTestCase() is the supported
    # equivalent and is available on every Python version.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite()
    for case in (
        DBTxn_distributed,
        DBTxn_distributedSYNC,
        DBTxn_distributed_must_open_db,
        DBTxn_distributedSYNC_must_open_db,
    ):
        suite.addTest(loader.loadTestsFromTestCase(case))
    return suite
182
183
if __name__ == "__main__":
    # Run the suite built by test_suite() when executed as a script.
    unittest.main(defaultTest="test_suite")
186