#!/usr/bin/env python
#
# Public Domain 2014-2018 MongoDB, Inc.
# Public Domain 2008-2014 WiredTiger, Inc.
#
# This is free and unencumbered software released into the public domain.
#
# Anyone is free to copy, modify, publish, use, compile, sell, or
# distribute this software, either in source code form or as a compiled
# binary, for any purpose, commercial or non-commercial, and by any
# means.
#
# In jurisdictions that recognize copyright laws, the author or authors
# of this software dedicate any and all copyright interest in the
# software to the public domain. We make this dedication for the benefit
# of the public at large and to the detriment of our heirs and
# successors. We intend this dedication to be an overt act of
# relinquishment in perpetuity of all present and future rights to this
# software under copyright law.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# test_txn02.py
#   Transactions: commits and rollbacks
#

import fnmatch, os, shutil, time
from suite_subprocess import suite_subprocess
from wtscenario import make_scenarios
import wttest

# Runs four operations (insert/remove/update), each in its own transaction
# that is either committed or rolled back, and after every step verifies
# what is visible to the current session, to a second session at several
# isolation levels, and to a connection opened on a backup copy of the
# database.
class test_txn02(wttest.WiredTigerTestCase, suite_subprocess):
    logmax = "100K"
    tablename = 'test_txn02'
    uri = 'table:' + tablename
    # The following lists are indexed by scenario_number modulo their length
    # so that each scenario deterministically picks one entry.
    archive_list = ['true', 'false']
    conn_list = ['reopen', 'stay_open']
    sync_list = [
        '(method=dsync,enabled)',
        '(method=fsync,enabled)',
        '(method=none,enabled)',
        '(enabled=false)'
    ]

    types = [
        ('row', dict(tabletype='row',
                    create_params = 'key_format=i,value_format=i')),
        ('var', dict(tabletype='var',
                    create_params = 'key_format=r,value_format=i')),
        ('fix', dict(tabletype='fix',
                    create_params = 'key_format=r,value_format=8t')),
    ]
    # Each opN is an (operation, key) pair; test_ops applies the operation
    # to both key and key + 1.
    op1s = [
        ('i4', dict(op1=('insert', 4))),
        ('r1', dict(op1=('remove', 1))),
        ('u10', dict(op1=('update', 10))),
    ]
    op2s = [
        ('i6', dict(op2=('insert', 6))),
        ('r4', dict(op2=('remove', 4))),
        ('u4', dict(op2=('update', 4))),
    ]
    op3s = [
        ('i12', dict(op3=('insert', 12))),
        ('r4', dict(op3=('remove', 4))),
        ('u4', dict(op3=('update', 4))),
    ]
    op4s = [
        ('i14', dict(op4=('insert', 14))),
        ('r12', dict(op4=('remove', 12))),
        ('u12', dict(op4=('update', 12))),
    ]
    # Each txnN says how the transaction wrapping opN is resolved.
    txn1s = [('t1c', dict(txn1='commit')), ('t1r', dict(txn1='rollback'))]
    txn2s = [('t2c', dict(txn2='commit')), ('t2r', dict(txn2='rollback'))]
    txn3s = [('t3c', dict(txn3='commit')), ('t3r', dict(txn3='rollback'))]
    txn4s = [('t4c', dict(txn4='commit')), ('t4r', dict(txn4='rollback'))]

    # This test generates thousands of potential scenarios.
    # For default runs, we'll use a small subset of them, for
    # long runs (when --long is set) we'll set a much larger limit.
    scenarios = make_scenarios(types,
        op1s, txn1s, op2s, txn2s, op3s, txn3s, op4s, txn4s,
        prune=20, prunelong=5000)

    # Each check_log() call takes a second, so we don't call it for
    # every scenario, we'll limit it to the value of checklog_calls.
    checklog_calls = 100 if wttest.islongtest() else 2
    # Floor division keeps the modulus an integer on Python 2 and Python 3.
    checklog_mod = (len(scenarios) // checklog_calls + 1)

    # Set _debug to True to have debug() print its messages.
    _debug = False
    def debug(self, msg):
        if self._debug:
            print(msg)

    # Build the connection configuration string for this scenario: logging
    # enabled with archiving off, plus per-scenario zero_fill and
    # transaction_sync settings.
    def conn_config(self):
        # Cycle through the different transaction_sync values in a
        # deterministic manner.
        txn_sync = self.sync_list[
            self.scenario_number % len(self.sync_list)]
        #
        # We don't want to run zero fill with only the same settings, such
        # as archive or sync, which are an even number of options.
        #
        freq = 3
        zerofill = 'true' if self.scenario_number % freq == 0 else 'false'
        return 'log=(archive=false,enabled,file_max=%s),' % self.logmax + \
            'log=(zero_fill=%s),' % zerofill + \
            'transaction_sync="%s",' % txn_sync

    # Check that a cursor (optionally started in a new transaction), sees the
    # expected values.
    #
    # session:    the session used to open the cursor.
    # txn_config: if not empty, the check runs inside a transaction begun
    #             with this configuration and committed afterwards.
    # expected:   dictionary of the key/value pairs that must be visible.
    def check(self, session, txn_config, expected):
        if txn_config:
            session.begin_transaction(txn_config)
        c = session.open_cursor(self.uri, None)
        # Zero values are skipped when iterating; presumably removed records
        # in the fixed-length ('8t') table read back as 0 - TODO confirm.
        actual = {k: v for k, v in c if v != 0}
        # Search for the expected items as well as iterating
        for k, v in expected.items():
            self.assertEqual(c[k], v)
        c.close()
        if txn_config:
            session.commit_transaction()
        self.assertEqual(actual, expected)

    # Check the state of the system with respect to the current cursor and
    # different isolation levels.
    #
    # current:   the values as modified by the transaction in progress.
    # committed: the values as of the last committed transaction.
    def check_all(self, current, committed):
        # Transactions see their own changes.
        # Read-uncommitted transactions see all changes.
        # Snapshot and read-committed transactions should not see changes.
        self.check(self.session, None, current)
        self.check(self.session2, "isolation=snapshot", committed)
        self.check(self.session2, "isolation=read-committed", committed)
        self.check(self.session2, "isolation=read-uncommitted", current)

        # Opening a clone of the database home directory should run
        # recovery and see the committed results.  Flush the log because
        # the backup may not get all the log records if we are running
        # without a sync option.  Use sync=off to force a write to the OS.
        self.session.log_flush('sync=off')
        self.backup(self.backup_dir)
        backup_conn_params = 'log=(enabled,file_max=%s)' % self.logmax
        backup_conn = self.wiredtiger_open(self.backup_dir, backup_conn_params)
        try:
            self.check(backup_conn.open_session(), None, committed)
        finally:
            backup_conn.close()

    # Back up the database, repeatedly open the backup to run recovery, and
    # verify both the committed data and the resulting set of log files.
    def check_log(self, committed):
        self.backup(self.backup_dir)
        #
        # Open and close the backup connection a few times to force
        # repeated recovery and log archiving even if later recoveries
        # are essentially no-ops. Confirm that the backup contains
        # the committed operations after recovery.
        #
        # Cycle through the different archive values in a
        # deterministic manner.
        self.archive = self.archive_list[
            self.scenario_number % len(self.archive_list)]
        backup_conn_params = \
            'log=(enabled,file_max=%s,archive=%s)' % (self.logmax, self.archive)
        orig_logs = fnmatch.filter(os.listdir(self.backup_dir), "*gerLog*")
        for _ in range(2):
            backup_conn = self.wiredtiger_open(self.backup_dir,
                                               backup_conn_params)
            # Always close the backup connection, even when the check fails,
            # so a failed assertion does not leak the open connection.
            try:
                self.check(backup_conn.open_session(), None, committed)
                # Sleep long enough so that the archive thread is guaranteed
                # to run before we close the connection.
                time.sleep(1.0)
            finally:
                backup_conn.close()
        #
        # Check logs after repeated openings. The first log should
        # have been archived if configured. Subsequent openings would not
        # archive because no checkpoint is written due to no modifications.
        #
        cur_logs = fnmatch.filter(os.listdir(self.backup_dir), "*gerLog*")
        for o in orig_logs:
            if self.archive == 'true':
                self.assertNotIn(o, cur_logs)
            else:
                self.assertIn(o, cur_logs)
        #
        # Run printlog and make sure it exits with zero status.
        # Printlog should not run recovery nor advance the logs.  Make sure
        # it does not.
        #
        self.runWt(['-h', self.backup_dir, 'printlog'], outfilename='printlog.out')
        pr_logs = fnmatch.filter(os.listdir(self.backup_dir), "*gerLog*")
        # os.listdir() returns names in arbitrary order, so compare the
        # sorted lists: only the set of log files matters here.
        self.assertEqual(sorted(cur_logs), sorted(pr_logs))

    # Run the scenario's four operations, each in its own transaction, and
    # verify visibility before and after every commit or rollback.
    def test_ops(self):
        self.backup_dir = os.path.join(self.home, "WT_BACKUP")
        self.session2 = self.conn.open_session()
        self.debug("Creating %s with config '%s'" % (self.uri, self.create_params))
        self.session.create(self.uri, self.create_params)
        # Set up the table with entries for 1, 2, 10 and 11.
        # We use the overwrite config so insert can update as needed.
        c = self.session.open_cursor(self.uri, None, 'overwrite')
        c[1] = c[2] = c[10] = c[11] = 1
        current = {1:1, 2:1, 10:1, 11:1}
        committed = current.copy()

        reopen = self.conn_list[
            self.scenario_number % len(self.conn_list)]
        ops = (self.op1, self.op2, self.op3, self.op4)
        txns = (self.txn1, self.txn2, self.txn3, self.txn4)
        for i, ((op, k), txn) in enumerate(zip(ops, txns)):
            self.debug('{}({})[{}]'.format(op, k, txn))

            # Close and reopen the connection and cursor.
            if reopen == 'reopen':
                self.reopen_conn()
                self.session2 = self.conn.open_session()
                c = self.session.open_cursor(self.uri, None, 'overwrite')

            # Odd-numbered scenarios begin the transaction with 'sync',
            # the others with no configuration.
            self.session.begin_transaction(
                'sync' if self.scenario_number % 2 else None)
            # Test multiple operations per transaction by always
            # doing the same operation on key k + 1.
            k1 = k + 1
            self.debug('Operation.  Num: %d: %s(%d)[%s]' % (i, op, k, txn))
            if op == 'insert' or op == 'update':
                c[k] = c[k1] = i + 2
                current[k] = current[k1] = i + 2
            elif op == 'remove':
                for key in (k, k1):
                    c.set_key(key)
                    c.remove()
                # The keys may not be present in the model.
                current.pop(k, None)
                current.pop(k1, None)

            self.debug(str(current))
            # Check the state after each operation.
            self.check_all(current, committed)

            if txn == 'commit':
                committed = current.copy()
                self.session.commit_transaction()
            elif txn == 'rollback':
                current = committed.copy()
                self.session.rollback_transaction()

            # Check the state after each commit/rollback.
            self.check_all(current, committed)

        # check_log() is slow, we don't run it on every scenario.
        if self.scenario_number % test_txn02.checklog_mod == 0:
            self.check_log(committed)

if __name__ == '__main__':
    wttest.run()