1#!/usr/bin/env python
2#
3# Public Domain 2014-2018 MongoDB, Inc.
4# Public Domain 2008-2014 WiredTiger, Inc.
5#
6# This is free and unencumbered software released into the public domain.
7#
8# Anyone is free to copy, modify, publish, use, compile, sell, or
9# distribute this software, either in source code form or as a compiled
10# binary, for any purpose, commercial or non-commercial, and by any
11# means.
12#
13# In jurisdictions that recognize copyright laws, the author or authors
14# of this software dedicate any and all copyright interest in the
15# software to the public domain. We make this dedication for the benefit
16# of the public at large and to the detriment of our heirs and
17# successors. We intend this dedication to be an overt act of
18# relinquishment in perpetuity of all present and future rights to this
19# software under copyright law.
20#
21# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
22# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
23# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
24# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
25# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
26# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
27# OTHER DEALINGS IN THE SOFTWARE.
28#
29# test_txn02.py
30#   Transactions: commits and rollbacks
31#
32
33import fnmatch, os, shutil, time
34from suite_subprocess import suite_subprocess
35from wtscenario import make_scenarios
36import wttest
37
class test_txn02(wttest.WiredTigerTestCase, suite_subprocess):
    """Transactions: commits and rollbacks.

    Runs a generated matrix of insert/remove/update operations, each
    followed by a commit or rollback, and verifies the visible state:
    - within the writing session,
    - from a second session at several isolation levels,
    - and in a backup copy of the home directory after log recovery.
    """
    logmax = "100K"
    tablename = 'test_txn02'
    uri = 'table:' + tablename
    # Log archiving on/off; cycled deterministically in check_log().
    archive_list = ['true', 'false']
    # Whether to close and reopen the connection between operations.
    conn_list = ['reopen', 'stay_open']
    # transaction_sync settings; cycled deterministically in conn_config().
    sync_list = [
        '(method=dsync,enabled)',
        '(method=fsync,enabled)',
        '(method=none,enabled)',
        '(enabled=false)'
    ]

    types = [
        ('row', dict(tabletype='row',
                    create_params = 'key_format=i,value_format=i')),
        ('var', dict(tabletype='var',
                    create_params = 'key_format=r,value_format=i')),
        ('fix', dict(tabletype='fix',
                    create_params = 'key_format=r,value_format=8t')),
    ]
    op1s = [
        ('i4', dict(op1=('insert', 4))),
        ('r1', dict(op1=('remove', 1))),
        ('u10', dict(op1=('update', 10))),
    ]
    op2s = [
        ('i6', dict(op2=('insert', 6))),
        ('r4', dict(op2=('remove', 4))),
        ('u4', dict(op2=('update', 4))),
    ]
    op3s = [
        ('i12', dict(op3=('insert', 12))),
        ('r4', dict(op3=('remove', 4))),
        ('u4', dict(op3=('update', 4))),
    ]
    op4s = [
        ('i14', dict(op4=('insert', 14))),
        ('r12', dict(op4=('remove', 12))),
        ('u12', dict(op4=('update', 12))),
    ]
    txn1s = [('t1c', dict(txn1='commit')), ('t1r', dict(txn1='rollback'))]
    txn2s = [('t2c', dict(txn2='commit')), ('t2r', dict(txn2='rollback'))]
    txn3s = [('t3c', dict(txn3='commit')), ('t3r', dict(txn3='rollback'))]
    txn4s = [('t4c', dict(txn4='commit')), ('t4r', dict(txn4='rollback'))]

    # This test generates thousands of potential scenarios.
    # For default runs, we'll use a small subset of them, for
    # long runs (when --long is set) we'll set a much larger limit.
    scenarios = make_scenarios(types,
        op1s, txn1s, op2s, txn2s, op3s, txn3s, op4s, txn4s,
        prune=20, prunelong=5000)

    # Each check_log() call takes a second, so we don't call it for
    # every scenario, we'll limit it to the value of checklog_calls.
    checklog_calls = 100 if wttest.islongtest() else 2
    # Use floor division: under Python 3, "/" would make checklog_mod a
    # float, and the "scenario_number % checklog_mod == 0" test in
    # test_ops() would almost never be true, silently skipping check_log().
    checklog_mod = (len(scenarios) // checklog_calls + 1)

    # Set to True to print per-operation trace output.
    _debug = False
    def debug(self, msg):
        """Print msg only when _debug tracing is enabled."""
        if not self._debug:
            return
        print(msg)

    def conn_config(self):
        """Return the connection configuration for this scenario."""
        # Cycle through the different transaction_sync values in a
        # deterministic manner.
        txn_sync = self.sync_list[
            self.scenario_number % len(self.sync_list)]
        #
        # We don't want to run zero fill with only the same settings, such
        # as archive or sync, which are an even number of options.
        #
        freq = 3
        zerofill = 'false'
        if self.scenario_number % freq == 0:
            zerofill = 'true'
        return 'log=(archive=false,enabled,file_max=%s),' % self.logmax + \
            'log=(zero_fill=%s),' % zerofill + \
            'transaction_sync="%s",' % txn_sync

    # Check that a cursor (optionally started in a new transaction), sees the
    # expected values.
    def check(self, session, txn_config, expected):
        """Verify the table contents match 'expected'.

        If txn_config is given, the check runs inside a new transaction
        with that configuration on 'session'.  Both iteration and direct
        key lookups are verified.
        """
        if txn_config:
            session.begin_transaction(txn_config)
        c = session.open_cursor(self.uri, None)
        # Fixed-length column stores implicitly return 0 for missing
        # records, so filter zero values out of the iteration.
        actual = dict((k, v) for k, v in c if v != 0)
        # Search for the expected items as well as iterating.
        # Use items() rather than the Python-2-only iteritems() so the
        # test also runs under Python 3.
        for k, v in expected.items():
            self.assertEqual(c[k], v)
        c.close()
        if txn_config:
            session.commit_transaction()
        self.assertEqual(actual, expected)

    # Check the state of the system with respect to the current cursor and
    # different isolation levels.
    def check_all(self, current, committed):
        """Check visibility of 'current' vs 'committed' state everywhere."""
        # Transactions see their own changes.
        # Read-uncommitted transactions see all changes.
        # Snapshot and read-committed transactions should not see changes.
        self.check(self.session, None, current)
        self.check(self.session2, "isolation=snapshot", committed)
        self.check(self.session2, "isolation=read-committed", committed)
        self.check(self.session2, "isolation=read-uncommitted", current)

        # Opening a clone of the database home directory should run
        # recovery and see the committed results.  Flush the log because
        # the backup may not get all the log records if we are running
        # without a sync option.  Use sync=off to force a write to the OS.
        self.session.log_flush('sync=off')
        self.backup(self.backup_dir)
        backup_conn_params = 'log=(enabled,file_max=%s)' % self.logmax
        backup_conn = self.wiredtiger_open(self.backup_dir, backup_conn_params)
        try:
            self.check(backup_conn.open_session(), None, committed)
        finally:
            backup_conn.close()

    def check_log(self, committed):
        """Verify recovery, log archiving, and printlog on a backup."""
        self.backup(self.backup_dir)
        #
        # Open and close the backup connection a few times to force
        # repeated recovery and log archiving even if later recoveries
        # are essentially no-ops. Confirm that the backup contains
        # the committed operations after recovery.
        #
        # Cycle through the different archive values in a
        # deterministic manner.
        self.archive = self.archive_list[
            self.scenario_number % len(self.archive_list)]
        backup_conn_params = \
            'log=(enabled,file_max=%s,archive=%s)' % (self.logmax, self.archive)
        orig_logs = fnmatch.filter(os.listdir(self.backup_dir), "*gerLog*")
        endcount = 2
        count = 0
        while count < endcount:
            backup_conn = self.wiredtiger_open(self.backup_dir,
                                               backup_conn_params)
            # Do the check in the try body, not in finally: doing the
            # real work from a finally block would run even after an
            # earlier failure and mask the original exception.
            try:
                self.check(backup_conn.open_session(), None, committed)
                # Sleep long enough so that the archive thread is guaranteed
                # to run before we close the connection.
                time.sleep(1.0)
            finally:
                backup_conn.close()
            count += 1
        #
        # Check logs after repeated openings. The first log should
        # have been archived if configured. Subsequent openings would not
        # archive because no checkpoint is written due to no modifications.
        #
        cur_logs = fnmatch.filter(os.listdir(self.backup_dir), "*gerLog*")
        for o in orig_logs:
            if self.archive == 'true':
                self.assertEqual(False, o in cur_logs)
            else:
                self.assertEqual(True, o in cur_logs)
        #
        # Run printlog and make sure it exits with zero status.
        # Printlog should not run recovery nor advance the logs.  Make sure
        # it does not.
        #
        self.runWt(['-h', self.backup_dir, 'printlog'], outfilename='printlog.out')
        pr_logs = fnmatch.filter(os.listdir(self.backup_dir), "*gerLog*")
        self.assertEqual(cur_logs, pr_logs)

    def test_ops(self):
        """Run the scenario's four operations, checking state throughout."""
        self.backup_dir = os.path.join(self.home, "WT_BACKUP")
        self.session2 = self.conn.open_session()
        self.debug("Creating %s with config '%s'" % (self.uri, self.create_params))
        self.session.create(self.uri, self.create_params)
        # Set up the table with entries for 1, 2, 10 and 11.
        # We use the overwrite config so insert can update as needed.
        c = self.session.open_cursor(self.uri, None, 'overwrite')
        c[1] = c[2] = c[10] = c[11] = 1
        current = {1:1, 2:1, 10:1, 11:1}
        committed = current.copy()

        reopen = self.conn_list[
            self.scenario_number % len(self.conn_list)]
        ops = (self.op1, self.op2, self.op3, self.op4)
        txns = (self.txn1, self.txn2, self.txn3, self.txn4)
        for i, ot in enumerate(zip(ops, txns)):
            ok, txn = ot
            op, k = ok
            self.debug('{}({})[{}]'.format(ok[0], ok[1], txn))

            # Close and reopen the connection and cursor.
            if reopen == 'reopen':
                self.reopen_conn()
                self.session2 = self.conn.open_session()
                c = self.session.open_cursor(self.uri, None, 'overwrite')

            # Alternate between sync and default transactions.
            self.session.begin_transaction(
                (self.scenario_number % 2) and 'sync' or None)
            # Test multiple operations per transaction by always
            # doing the same operation on key k + 1.
            k1 = k + 1
            self.debug('Operation. Num: %d: %s(%d)[%s]' % (i, ok[0], ok[1], txn))
            if op == 'insert' or op == 'update':
                c[k] = c[k1] = i + 2
                current[k] = current[k1] = i + 2
            elif op == 'remove':
                c.set_key(k)
                c.remove()
                c.set_key(k1)
                c.remove()
                if k in current:
                    del current[k]
                if k1 in current:
                    del current[k1]

            self.debug(str(current))
            # Check the state after each operation.
            self.check_all(current, committed)

            if txn == 'commit':
                committed = current.copy()
                self.session.commit_transaction()
            elif txn == 'rollback':
                current = committed.copy()
                self.session.rollback_transaction()

            # Check the state after each commit/rollback.
            self.check_all(current, committed)

        # check_log() is slow, we don't run it on every scenario.
        if self.scenario_number % test_txn02.checklog_mod == 0:
            self.check_log(committed)
270
# Allow this test to be run standalone from the command line.
if __name__ == '__main__':
    wttest.run()
273