1#-*- coding: iso-8859-1 -*-
2# pysqlite2/test/regression.py: pysqlite regression tests
3#
4# Copyright (C) 2006-2007 Gerhard H�ring <gh@ghaering.de>
5#
6# This file is part of pysqlite.
7#
8# This software is provided 'as-is', without any express or implied
9# warranty.  In no event will the authors be held liable for any damages
10# arising from the use of this software.
11#
12# Permission is granted to anyone to use this software for any purpose,
13# including commercial applications, and to alter it and redistribute it
14# freely, subject to the following restrictions:
15#
16# 1. The origin of this software must not be misrepresented; you must not
17#    claim that you wrote the original software. If you use this software
18#    in a product, an acknowledgment in the product documentation would be
19#    appreciated but is not required.
20# 2. Altered source versions must be plainly marked as such, and must not be
21#    misrepresented as being the original software.
22# 3. This notice may not be removed or altered from any source distribution.
23
24import datetime
25import unittest
26import sqlite3 as sqlite
27import weakref
28from test import support
29
30class RegressionTests(unittest.TestCase):
31    def setUp(self):
32        self.con = sqlite.connect(":memory:")
33
34    def tearDown(self):
35        self.con.close()
36
37    def CheckPragmaUserVersion(self):
38        # This used to crash pysqlite because this pragma command returns NULL for the column name
39        cur = self.con.cursor()
40        cur.execute("pragma user_version")
41
42    def CheckPragmaSchemaVersion(self):
43        # This still crashed pysqlite <= 2.2.1
44        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
45        try:
46            cur = self.con.cursor()
47            cur.execute("pragma schema_version")
48        finally:
49            cur.close()
50            con.close()
51
52    def CheckStatementReset(self):
53        # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are
54        # reset before a rollback, but only those that are still in the
55        # statement cache. The others are not accessible from the connection object.
56        con = sqlite.connect(":memory:", cached_statements=5)
57        cursors = [con.cursor() for x in xrange(5)]
58        cursors[0].execute("create table test(x)")
59        for i in range(10):
60            cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in xrange(10)])
61
62        for i in range(5):
63            cursors[i].execute(" " * i + "select x from test")
64
65        con.rollback()
66
67    def CheckColumnNameWithSpaces(self):
68        cur = self.con.cursor()
69        cur.execute('select 1 as "foo bar [datetime]"')
70        self.assertEqual(cur.description[0][0], "foo bar")
71
72        cur.execute('select 1 as "foo baz"')
73        self.assertEqual(cur.description[0][0], "foo baz")
74
75    def CheckStatementFinalizationOnCloseDb(self):
76        # pysqlite versions <= 2.3.3 only finalized statements in the statement
77        # cache when closing the database. statements that were still
78        # referenced in cursors weren't closed and could provoke "
79        # "OperationalError: Unable to close due to unfinalised statements".
80        con = sqlite.connect(":memory:")
81        cursors = []
82        # default statement cache size is 100
83        for i in range(105):
84            cur = con.cursor()
85            cursors.append(cur)
86            cur.execute("select 1 x union select " + str(i))
87        con.close()
88
89    def CheckOnConflictRollback(self):
90        if sqlite.sqlite_version_info < (3, 2, 2):
91            return
92        con = sqlite.connect(":memory:")
93        con.execute("create table foo(x, unique(x) on conflict rollback)")
94        con.execute("insert into foo(x) values (1)")
95        try:
96            con.execute("insert into foo(x) values (1)")
97        except sqlite.DatabaseError:
98            pass
99        con.execute("insert into foo(x) values (2)")
100        try:
101            con.commit()
102        except sqlite.OperationalError:
103            self.fail("pysqlite knew nothing about the implicit ROLLBACK")
104
105    def CheckWorkaroundForBuggySqliteTransferBindings(self):
106        """
107        pysqlite would crash with older SQLite versions unless
108        a workaround is implemented.
109        """
110        self.con.execute("create table foo(bar)")
111        self.con.execute("drop table foo")
112        self.con.execute("create table foo(bar)")
113
114    def CheckEmptyStatement(self):
115        """
116        pysqlite used to segfault with SQLite versions 3.5.x. These return NULL
117        for "no-operation" statements
118        """
119        self.con.execute("")
120
121    def CheckUnicodeConnect(self):
122        """
123        With pysqlite 2.4.0 you needed to use a string or an APSW connection
124        object for opening database connections.
125
126        Formerly, both bytestrings and unicode strings used to work.
127
128        Let's make sure unicode strings work in the future.
129        """
130        con = sqlite.connect(u":memory:")
131        con.close()
132
133    def CheckTypeMapUsage(self):
134        """
135        pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
136        a statement. This test exhibits the problem.
137        """
138        SELECT = "select * from foo"
139        con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES)
140        con.execute("create table foo(bar timestamp)")
141        con.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),))
142        con.execute(SELECT)
143        con.execute("drop table foo")
144        con.execute("create table foo(bar integer)")
145        con.execute("insert into foo(bar) values (5)")
146        con.execute(SELECT)
147
148    def CheckRegisterAdapter(self):
149        """
150        See issue 3312.
151        """
152        self.assertRaises(TypeError, sqlite.register_adapter, {}, None)
153
154    def CheckSetIsolationLevel(self):
155        """
156        See issue 3312.
157        """
158        con = sqlite.connect(":memory:")
159        self.assertRaises(UnicodeEncodeError, setattr, con,
160                          "isolation_level", u"\xe9")
161
162    def CheckCursorConstructorCallCheck(self):
163        """
164        Verifies that cursor methods check whether base class __init__ was
165        called.
166        """
167        class Cursor(sqlite.Cursor):
168            def __init__(self, con):
169                pass
170
171        con = sqlite.connect(":memory:")
172        cur = Cursor(con)
173        try:
174            cur.execute("select 4+5").fetchall()
175            self.fail("should have raised ProgrammingError")
176        except sqlite.ProgrammingError:
177            pass
178        except:
179            self.fail("should have raised ProgrammingError")
180        with self.assertRaisesRegexp(sqlite.ProgrammingError,
181                                     r'^Base Cursor\.__init__ not called\.$'):
182            cur.close()
183
184    def CheckConnectionConstructorCallCheck(self):
185        """
186        Verifies that connection methods check whether base class __init__ was
187        called.
188        """
189        class Connection(sqlite.Connection):
190            def __init__(self, name):
191                pass
192
193        con = Connection(":memory:")
194        try:
195            cur = con.cursor()
196            self.fail("should have raised ProgrammingError")
197        except sqlite.ProgrammingError:
198            pass
199        except:
200            self.fail("should have raised ProgrammingError")
201
202    def CheckCursorRegistration(self):
203        """
204        Verifies that subclassed cursor classes are correctly registered with
205        the connection object, too.  (fetch-across-rollback problem)
206        """
207        class Connection(sqlite.Connection):
208            def cursor(self):
209                return Cursor(self)
210
211        class Cursor(sqlite.Cursor):
212            def __init__(self, con):
213                sqlite.Cursor.__init__(self, con)
214
215        con = Connection(":memory:")
216        cur = con.cursor()
217        cur.execute("create table foo(x)")
218        cur.executemany("insert into foo(x) values (?)", [(3,), (4,), (5,)])
219        cur.execute("select x from foo")
220        con.rollback()
221        try:
222            cur.fetchall()
223            self.fail("should have raised InterfaceError")
224        except sqlite.InterfaceError:
225            pass
226        except:
227            self.fail("should have raised InterfaceError")
228
229    def CheckAutoCommit(self):
230        """
231        Verifies that creating a connection in autocommit mode works.
232        2.5.3 introduced a regression so that these could no longer
233        be created.
234        """
235        con = sqlite.connect(":memory:", isolation_level=None)
236
237    def CheckPragmaAutocommit(self):
238        """
239        Verifies that running a PRAGMA statement that does an autocommit does
240        work. This did not work in 2.5.3/2.5.4.
241        """
242        cur = self.con.cursor()
243        cur.execute("create table foo(bar)")
244        cur.execute("insert into foo(bar) values (5)")
245
246        cur.execute("pragma page_size")
247        row = cur.fetchone()
248
249    def CheckConnectionCall(self):
250        """
251        Call a connection with a non-string SQL request: check error handling
252        of the statement constructor.
253        """
254        self.assertRaises(sqlite.Warning, self.con, 1)
255
256    def CheckRecursiveCursorUse(self):
257        """
258        http://bugs.python.org/issue10811
259
260        Recursively using a cursor, such as when reusing it from a generator led to segfaults.
261        Now we catch recursive cursor usage and raise a ProgrammingError.
262        """
263        con = sqlite.connect(":memory:")
264
265        cur = con.cursor()
266        cur.execute("create table a (bar)")
267        cur.execute("create table b (baz)")
268
269        def foo():
270            cur.execute("insert into a (bar) values (?)", (1,))
271            yield 1
272
273        with self.assertRaises(sqlite.ProgrammingError):
274            cur.executemany("insert into b (baz) values (?)",
275                            ((i,) for i in foo()))
276
277    def CheckConvertTimestampMicrosecondPadding(self):
278        """
279        http://bugs.python.org/issue14720
280
281        The microsecond parsing of convert_timestamp() should pad with zeros,
282        since the microsecond string "456" actually represents "456000".
283        """
284
285        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
286        cur = con.cursor()
287        cur.execute("CREATE TABLE t (x TIMESTAMP)")
288
289        # Microseconds should be 456000
290        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")
291
292        # Microseconds should be truncated to 123456
293        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")
294
295        cur.execute("SELECT * FROM t")
296        values = [x[0] for x in cur.fetchall()]
297
298        self.assertEqual(values, [
299            datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
300            datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
301        ])
302
303    def CheckInvalidIsolationLevelType(self):
304        # isolation level is a string, not an integer
305        self.assertRaises(TypeError,
306                          sqlite.connect, ":memory:", isolation_level=123)
307
308
309    def CheckNullCharacter(self):
310        # Issue #21147
311        con = sqlite.connect(":memory:")
312        self.assertRaises(ValueError, con, "\0select 1")
313        self.assertRaises(ValueError, con, "select 1\0")
314        cur = con.cursor()
315        self.assertRaises(ValueError, cur.execute, " \0select 2")
316        self.assertRaises(ValueError, cur.execute, "select 2\0")
317
318    def CheckCommitCursorReset(self):
319        """
320        Connection.commit() did reset cursors, which made sqlite3
321        to return rows multiple times when fetched from cursors
322        after commit. See issues 10513 and 23129 for details.
323        """
324        con = sqlite.connect(":memory:")
325        con.executescript("""
326        create table t(c);
327        create table t2(c);
328        insert into t values(0);
329        insert into t values(1);
330        insert into t values(2);
331        """)
332
333        self.assertEqual(con.isolation_level, "")
334
335        counter = 0
336        for i, row in enumerate(con.execute("select c from t")):
337            con.execute("insert into t2(c) values (?)", (i,))
338            con.commit()
339            if counter == 0:
340                self.assertEqual(row[0], 0)
341            elif counter == 1:
342                self.assertEqual(row[0], 1)
343            elif counter == 2:
344                self.assertEqual(row[0], 2)
345            counter += 1
346        self.assertEqual(counter, 3, "should have returned exactly three rows")
347
348    def CheckBpo31770(self):
349        """
350        The interpreter shouldn't crash in case Cursor.__init__() is called
351        more than once.
352        """
353        def callback(*args):
354            pass
355        con = sqlite.connect(":memory:")
356        cur = sqlite.Cursor(con)
357        ref = weakref.ref(cur, callback)
358        cur.__init__(con)
359        del cur
360        # The interpreter shouldn't crash when ref is collected.
361        del ref
362        support.gc_collect()
363
364    def CheckDelIsolation_levelSegfault(self):
365        with self.assertRaises(AttributeError):
366            del self.con.isolation_level
367
368
369class UnhashableFunc:
370    def __hash__(self):
371        raise TypeError('unhashable type')
372
373    def __init__(self, return_value=None):
374        self.calls = 0
375        self.return_value = return_value
376
377    def __call__(self, *args, **kwargs):
378        self.calls += 1
379        return self.return_value
380
381
382class UnhashableCallbacksTestCase(unittest.TestCase):
383    """
384    https://bugs.python.org/issue34052
385
386    Registering unhashable callbacks raises TypeError, callbacks are not
387    registered in SQLite after such registration attempt.
388    """
389    def setUp(self):
390        self.con = sqlite.connect(':memory:')
391
392    def tearDown(self):
393        self.con.close()
394
395    def test_progress_handler(self):
396        f = UnhashableFunc(return_value=0)
397        with self.assertRaisesRegexp(TypeError, 'unhashable type'):
398            self.con.set_progress_handler(f, 1)
399        self.con.execute('SELECT 1')
400        self.assertFalse(f.calls)
401
402    def test_func(self):
403        func_name = 'func_name'
404        f = UnhashableFunc()
405        with self.assertRaisesRegexp(TypeError, 'unhashable type'):
406            self.con.create_function(func_name, 0, f)
407        msg = 'no such function: %s' % func_name
408        with self.assertRaisesRegexp(sqlite.OperationalError, msg):
409            self.con.execute('SELECT %s()' % func_name)
410        self.assertFalse(f.calls)
411
412    def test_authorizer(self):
413        f = UnhashableFunc(return_value=sqlite.SQLITE_DENY)
414        with self.assertRaisesRegexp(TypeError, 'unhashable type'):
415            self.con.set_authorizer(f)
416        self.con.execute('SELECT 1')
417        self.assertFalse(f.calls)
418
419    def test_aggr(self):
420        class UnhashableType(type):
421            __hash__ = None
422        aggr_name = 'aggr_name'
423        with self.assertRaisesRegexp(TypeError, 'unhashable type'):
424            self.con.create_aggregate(aggr_name, 0, UnhashableType('Aggr', (), {}))
425        msg = 'no such function: %s' % aggr_name
426        with self.assertRaisesRegexp(sqlite.OperationalError, msg):
427            self.con.execute('SELECT %s()' % aggr_name)
428
429
430def suite():
431    regression_suite = unittest.makeSuite(RegressionTests, "Check")
432    return unittest.TestSuite((
433        regression_suite,
434        unittest.makeSuite(UnhashableCallbacksTestCase),
435    ))
436
437def test():
438    runner = unittest.TextTestRunner()
439    runner.run(suite())
440
441if __name__ == "__main__":
442    test()
443