1#-*- coding: iso-8859-1 -*-
2# pysqlite2/test/regression.py: pysqlite regression tests
3#
4# Copyright (C) 2006-2010 Gerhard H�ring <gh@ghaering.de>
5#
6# This file is part of pysqlite.
7#
8# This software is provided 'as-is', without any express or implied
9# warranty.  In no event will the authors be held liable for any damages
10# arising from the use of this software.
11#
12# Permission is granted to anyone to use this software for any purpose,
13# including commercial applications, and to alter it and redistribute it
14# freely, subject to the following restrictions:
15#
16# 1. The origin of this software must not be misrepresented; you must not
17#    claim that you wrote the original software. If you use this software
18#    in a product, an acknowledgment in the product documentation would be
19#    appreciated but is not required.
20# 2. Altered source versions must be plainly marked as such, and must not be
21#    misrepresented as being the original software.
22# 3. This notice may not be removed or altered from any source distribution.
23
24import datetime
25import unittest
26import sqlite3 as sqlite
27import weakref
28import functools
29from test import support
30
31class RegressionTests(unittest.TestCase):
32    def setUp(self):
33        self.con = sqlite.connect(":memory:")
34
35    def tearDown(self):
36        self.con.close()
37
38    def CheckPragmaUserVersion(self):
39        # This used to crash pysqlite because this pragma command returns NULL for the column name
40        cur = self.con.cursor()
41        cur.execute("pragma user_version")
42
43    def CheckPragmaSchemaVersion(self):
44        # This still crashed pysqlite <= 2.2.1
45        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
46        try:
47            cur = self.con.cursor()
48            cur.execute("pragma schema_version")
49        finally:
50            cur.close()
51            con.close()
52
53    def CheckStatementReset(self):
54        # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are
55        # reset before a rollback, but only those that are still in the
56        # statement cache. The others are not accessible from the connection object.
57        con = sqlite.connect(":memory:", cached_statements=5)
58        cursors = [con.cursor() for x in range(5)]
59        cursors[0].execute("create table test(x)")
60        for i in range(10):
61            cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in range(10)])
62
63        for i in range(5):
64            cursors[i].execute(" " * i + "select x from test")
65
66        con.rollback()
67
68    def CheckColumnNameWithSpaces(self):
69        cur = self.con.cursor()
70        cur.execute('select 1 as "foo bar [datetime]"')
71        self.assertEqual(cur.description[0][0], "foo bar [datetime]")
72
73        cur.execute('select 1 as "foo baz"')
74        self.assertEqual(cur.description[0][0], "foo baz")
75
76    def CheckStatementFinalizationOnCloseDb(self):
77        # pysqlite versions <= 2.3.3 only finalized statements in the statement
78        # cache when closing the database. statements that were still
79        # referenced in cursors weren't closed and could provoke "
80        # "OperationalError: Unable to close due to unfinalised statements".
81        con = sqlite.connect(":memory:")
82        cursors = []
83        # default statement cache size is 100
84        for i in range(105):
85            cur = con.cursor()
86            cursors.append(cur)
87            cur.execute("select 1 x union select " + str(i))
88        con.close()
89
90    @unittest.skipIf(sqlite.sqlite_version_info < (3, 2, 2), 'needs sqlite 3.2.2 or newer')
91    def CheckOnConflictRollback(self):
92        con = sqlite.connect(":memory:")
93        con.execute("create table foo(x, unique(x) on conflict rollback)")
94        con.execute("insert into foo(x) values (1)")
95        try:
96            con.execute("insert into foo(x) values (1)")
97        except sqlite.DatabaseError:
98            pass
99        con.execute("insert into foo(x) values (2)")
100        try:
101            con.commit()
102        except sqlite.OperationalError:
103            self.fail("pysqlite knew nothing about the implicit ROLLBACK")
104
105    def CheckWorkaroundForBuggySqliteTransferBindings(self):
106        """
107        pysqlite would crash with older SQLite versions unless
108        a workaround is implemented.
109        """
110        self.con.execute("create table foo(bar)")
111        self.con.execute("drop table foo")
112        self.con.execute("create table foo(bar)")
113
114    def CheckEmptyStatement(self):
115        """
116        pysqlite used to segfault with SQLite versions 3.5.x. These return NULL
117        for "no-operation" statements
118        """
119        self.con.execute("")
120
121    def CheckTypeMapUsage(self):
122        """
123        pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
124        a statement. This test exhibits the problem.
125        """
126        SELECT = "select * from foo"
127        con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES)
128        con.execute("create table foo(bar timestamp)")
129        con.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),))
130        con.execute(SELECT)
131        con.execute("drop table foo")
132        con.execute("create table foo(bar integer)")
133        con.execute("insert into foo(bar) values (5)")
134        con.execute(SELECT)
135
136    def CheckBindMutatingList(self):
137        # Issue41662: Crash when mutate a list of parameters during iteration.
138        class X:
139            def __conform__(self, protocol):
140                parameters.clear()
141                return "..."
142        parameters = [X(), 0]
143        con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES)
144        con.execute("create table foo(bar X, baz integer)")
145        # Should not crash
146        with self.assertRaises(IndexError):
147            con.execute("insert into foo(bar, baz) values (?, ?)", parameters)
148
149    def CheckErrorMsgDecodeError(self):
150        # When porting the module to Python 3.0, the error message about
151        # decoding errors disappeared. This verifies they're back again.
152        with self.assertRaises(sqlite.OperationalError) as cm:
153            self.con.execute("select 'xxx' || ? || 'yyy' colname",
154                             (bytes(bytearray([250])),)).fetchone()
155        msg = "Could not decode to UTF-8 column 'colname' with text 'xxx"
156        self.assertIn(msg, str(cm.exception))
157
158    def CheckRegisterAdapter(self):
159        """
160        See issue 3312.
161        """
162        self.assertRaises(TypeError, sqlite.register_adapter, {}, None)
163
164    def CheckSetIsolationLevel(self):
165        # See issue 27881.
166        class CustomStr(str):
167            def upper(self):
168                return None
169            def __del__(self):
170                con.isolation_level = ""
171
172        con = sqlite.connect(":memory:")
173        con.isolation_level = None
174        for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE":
175            with self.subTest(level=level):
176                con.isolation_level = level
177                con.isolation_level = level.lower()
178                con.isolation_level = level.capitalize()
179                con.isolation_level = CustomStr(level)
180
181        # setting isolation_level failure should not alter previous state
182        con.isolation_level = None
183        con.isolation_level = "DEFERRED"
184        pairs = [
185            (1, TypeError), (b'', TypeError), ("abc", ValueError),
186            ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError),
187        ]
188        for value, exc in pairs:
189            with self.subTest(level=value):
190                with self.assertRaises(exc):
191                    con.isolation_level = value
192                self.assertEqual(con.isolation_level, "DEFERRED")
193
194    def CheckCursorConstructorCallCheck(self):
195        """
196        Verifies that cursor methods check whether base class __init__ was
197        called.
198        """
199        class Cursor(sqlite.Cursor):
200            def __init__(self, con):
201                pass
202
203        con = sqlite.connect(":memory:")
204        cur = Cursor(con)
205        with self.assertRaises(sqlite.ProgrammingError):
206            cur.execute("select 4+5").fetchall()
207        with self.assertRaisesRegex(sqlite.ProgrammingError,
208                                    r'^Base Cursor\.__init__ not called\.$'):
209            cur.close()
210
211    def CheckStrSubclass(self):
212        """
213        The Python 3.0 port of the module didn't cope with values of subclasses of str.
214        """
215        class MyStr(str): pass
216        self.con.execute("select ?", (MyStr("abc"),))
217
218    def CheckConnectionConstructorCallCheck(self):
219        """
220        Verifies that connection methods check whether base class __init__ was
221        called.
222        """
223        class Connection(sqlite.Connection):
224            def __init__(self, name):
225                pass
226
227        con = Connection(":memory:")
228        with self.assertRaises(sqlite.ProgrammingError):
229            cur = con.cursor()
230
231    def CheckCursorRegistration(self):
232        """
233        Verifies that subclassed cursor classes are correctly registered with
234        the connection object, too.  (fetch-across-rollback problem)
235        """
236        class Connection(sqlite.Connection):
237            def cursor(self):
238                return Cursor(self)
239
240        class Cursor(sqlite.Cursor):
241            def __init__(self, con):
242                sqlite.Cursor.__init__(self, con)
243
244        con = Connection(":memory:")
245        cur = con.cursor()
246        cur.execute("create table foo(x)")
247        cur.executemany("insert into foo(x) values (?)", [(3,), (4,), (5,)])
248        cur.execute("select x from foo")
249        con.rollback()
250        with self.assertRaises(sqlite.InterfaceError):
251            cur.fetchall()
252
253    def CheckAutoCommit(self):
254        """
255        Verifies that creating a connection in autocommit mode works.
256        2.5.3 introduced a regression so that these could no longer
257        be created.
258        """
259        con = sqlite.connect(":memory:", isolation_level=None)
260
261    def CheckPragmaAutocommit(self):
262        """
263        Verifies that running a PRAGMA statement that does an autocommit does
264        work. This did not work in 2.5.3/2.5.4.
265        """
266        cur = self.con.cursor()
267        cur.execute("create table foo(bar)")
268        cur.execute("insert into foo(bar) values (5)")
269
270        cur.execute("pragma page_size")
271        row = cur.fetchone()
272
273    def CheckConnectionCall(self):
274        """
275        Call a connection with a non-string SQL request: check error handling
276        of the statement constructor.
277        """
278        self.assertRaises(sqlite.Warning, self.con, 1)
279
280    def CheckCollation(self):
281        def collation_cb(a, b):
282            return 1
283        self.assertRaises(sqlite.ProgrammingError, self.con.create_collation,
284            # Lone surrogate cannot be encoded to the default encoding (utf8)
285            "\uDC80", collation_cb)
286
287    def CheckRecursiveCursorUse(self):
288        """
289        http://bugs.python.org/issue10811
290
291        Recursively using a cursor, such as when reusing it from a generator led to segfaults.
292        Now we catch recursive cursor usage and raise a ProgrammingError.
293        """
294        con = sqlite.connect(":memory:")
295
296        cur = con.cursor()
297        cur.execute("create table a (bar)")
298        cur.execute("create table b (baz)")
299
300        def foo():
301            cur.execute("insert into a (bar) values (?)", (1,))
302            yield 1
303
304        with self.assertRaises(sqlite.ProgrammingError):
305            cur.executemany("insert into b (baz) values (?)",
306                            ((i,) for i in foo()))
307
308    def CheckConvertTimestampMicrosecondPadding(self):
309        """
310        http://bugs.python.org/issue14720
311
312        The microsecond parsing of convert_timestamp() should pad with zeros,
313        since the microsecond string "456" actually represents "456000".
314        """
315
316        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
317        cur = con.cursor()
318        cur.execute("CREATE TABLE t (x TIMESTAMP)")
319
320        # Microseconds should be 456000
321        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")
322
323        # Microseconds should be truncated to 123456
324        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")
325
326        cur.execute("SELECT * FROM t")
327        values = [x[0] for x in cur.fetchall()]
328
329        self.assertEqual(values, [
330            datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
331            datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
332        ])
333
334    def CheckInvalidIsolationLevelType(self):
335        # isolation level is a string, not an integer
336        self.assertRaises(TypeError,
337                          sqlite.connect, ":memory:", isolation_level=123)
338
339
340    def CheckNullCharacter(self):
341        # Issue #21147
342        con = sqlite.connect(":memory:")
343        self.assertRaises(ValueError, con, "\0select 1")
344        self.assertRaises(ValueError, con, "select 1\0")
345        cur = con.cursor()
346        self.assertRaises(ValueError, cur.execute, " \0select 2")
347        self.assertRaises(ValueError, cur.execute, "select 2\0")
348
349    def CheckCommitCursorReset(self):
350        """
351        Connection.commit() did reset cursors, which made sqlite3
352        to return rows multiple times when fetched from cursors
353        after commit. See issues 10513 and 23129 for details.
354        """
355        con = sqlite.connect(":memory:")
356        con.executescript("""
357        create table t(c);
358        create table t2(c);
359        insert into t values(0);
360        insert into t values(1);
361        insert into t values(2);
362        """)
363
364        self.assertEqual(con.isolation_level, "")
365
366        counter = 0
367        for i, row in enumerate(con.execute("select c from t")):
368            with self.subTest(i=i, row=row):
369                con.execute("insert into t2(c) values (?)", (i,))
370                con.commit()
371                if counter == 0:
372                    self.assertEqual(row[0], 0)
373                elif counter == 1:
374                    self.assertEqual(row[0], 1)
375                elif counter == 2:
376                    self.assertEqual(row[0], 2)
377                counter += 1
378        self.assertEqual(counter, 3, "should have returned exactly three rows")
379
380    def CheckBpo31770(self):
381        """
382        The interpreter shouldn't crash in case Cursor.__init__() is called
383        more than once.
384        """
385        def callback(*args):
386            pass
387        con = sqlite.connect(":memory:")
388        cur = sqlite.Cursor(con)
389        ref = weakref.ref(cur, callback)
390        cur.__init__(con)
391        del cur
392        # The interpreter shouldn't crash when ref is collected.
393        del ref
394        support.gc_collect()
395
396    def CheckDelIsolation_levelSegfault(self):
397        with self.assertRaises(AttributeError):
398            del self.con.isolation_level
399
400    def CheckBpo37347(self):
401        class Printer:
402            def log(self, *args):
403                return sqlite.SQLITE_OK
404
405        for method in [self.con.set_trace_callback,
406                       functools.partial(self.con.set_progress_handler, n=1),
407                       self.con.set_authorizer]:
408            printer_instance = Printer()
409            method(printer_instance.log)
410            method(printer_instance.log)
411            self.con.execute("select 1")  # trigger seg fault
412            method(None)
413
414
415
416def suite():
417    regression_suite = unittest.makeSuite(RegressionTests, "Check")
418    return unittest.TestSuite((
419        regression_suite,
420    ))
421
422def test():
423    runner = unittest.TextTestRunner()
424    runner.run(suite())
425
426if __name__ == "__main__":
427    test()
428