1#-*- coding: iso-8859-1 -*- 2# pysqlite2/test/regression.py: pysqlite regression tests 3# 4# Copyright (C) 2006-2007 Gerhard H�ring <gh@ghaering.de> 5# 6# This file is part of pysqlite. 7# 8# This software is provided 'as-is', without any express or implied 9# warranty. In no event will the authors be held liable for any damages 10# arising from the use of this software. 11# 12# Permission is granted to anyone to use this software for any purpose, 13# including commercial applications, and to alter it and redistribute it 14# freely, subject to the following restrictions: 15# 16# 1. The origin of this software must not be misrepresented; you must not 17# claim that you wrote the original software. If you use this software 18# in a product, an acknowledgment in the product documentation would be 19# appreciated but is not required. 20# 2. Altered source versions must be plainly marked as such, and must not be 21# misrepresented as being the original software. 22# 3. This notice may not be removed or altered from any source distribution. 23 24import datetime 25import unittest 26import sqlite3 as sqlite 27import weakref 28from test import support 29 30class RegressionTests(unittest.TestCase): 31 def setUp(self): 32 self.con = sqlite.connect(":memory:") 33 34 def tearDown(self): 35 self.con.close() 36 37 def CheckPragmaUserVersion(self): 38 # This used to crash pysqlite because this pragma command returns NULL for the column name 39 cur = self.con.cursor() 40 cur.execute("pragma user_version") 41 42 def CheckPragmaSchemaVersion(self): 43 # This still crashed pysqlite <= 2.2.1 44 con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES) 45 try: 46 cur = self.con.cursor() 47 cur.execute("pragma schema_version") 48 finally: 49 cur.close() 50 con.close() 51 52 def CheckStatementReset(self): 53 # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are 54 # reset before a rollback, but only those that are still in the 55 # statement cache. The others are not accessible from the connection object. 56 con = sqlite.connect(":memory:", cached_statements=5) 57 cursors = [con.cursor() for x in xrange(5)] 58 cursors[0].execute("create table test(x)") 59 for i in range(10): 60 cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in xrange(10)]) 61 62 for i in range(5): 63 cursors[i].execute(" " * i + "select x from test") 64 65 con.rollback() 66 67 def CheckColumnNameWithSpaces(self): 68 cur = self.con.cursor() 69 cur.execute('select 1 as "foo bar [datetime]"') 70 self.assertEqual(cur.description[0][0], "foo bar") 71 72 cur.execute('select 1 as "foo baz"') 73 self.assertEqual(cur.description[0][0], "foo baz") 74 75 def CheckStatementFinalizationOnCloseDb(self): 76 # pysqlite versions <= 2.3.3 only finalized statements in the statement 77 # cache when closing the database. statements that were still 78 # referenced in cursors weren't closed and could provoke " 79 # "OperationalError: Unable to close due to unfinalised statements". 80 con = sqlite.connect(":memory:") 81 cursors = [] 82 # default statement cache size is 100 83 for i in range(105): 84 cur = con.cursor() 85 cursors.append(cur) 86 cur.execute("select 1 x union select " + str(i)) 87 con.close() 88 89 def CheckOnConflictRollback(self): 90 if sqlite.sqlite_version_info < (3, 2, 2): 91 return 92 con = sqlite.connect(":memory:") 93 con.execute("create table foo(x, unique(x) on conflict rollback)") 94 con.execute("insert into foo(x) values (1)") 95 try: 96 con.execute("insert into foo(x) values (1)") 97 except sqlite.DatabaseError: 98 pass 99 con.execute("insert into foo(x) values (2)") 100 try: 101 con.commit() 102 except sqlite.OperationalError: 103 self.fail("pysqlite knew nothing about the implicit ROLLBACK") 104 105 def CheckWorkaroundForBuggySqliteTransferBindings(self): 106 """ 107 pysqlite would crash with older SQLite versions unless 108 a workaround is implemented. 109 """ 110 self.con.execute("create table foo(bar)") 111 self.con.execute("drop table foo") 112 self.con.execute("create table foo(bar)") 113 114 def CheckEmptyStatement(self): 115 """ 116 pysqlite used to segfault with SQLite versions 3.5.x. These return NULL 117 for "no-operation" statements 118 """ 119 self.con.execute("") 120 121 def CheckUnicodeConnect(self): 122 """ 123 With pysqlite 2.4.0 you needed to use a string or an APSW connection 124 object for opening database connections. 125 126 Formerly, both bytestrings and unicode strings used to work. 127 128 Let's make sure unicode strings work in the future. 129 """ 130 con = sqlite.connect(u":memory:") 131 con.close() 132 133 def CheckTypeMapUsage(self): 134 """ 135 pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling 136 a statement. This test exhibits the problem. 137 """ 138 SELECT = "select * from foo" 139 con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES) 140 con.execute("create table foo(bar timestamp)") 141 con.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),)) 142 con.execute(SELECT) 143 con.execute("drop table foo") 144 con.execute("create table foo(bar integer)") 145 con.execute("insert into foo(bar) values (5)") 146 con.execute(SELECT) 147 148 def CheckRegisterAdapter(self): 149 """ 150 See issue 3312. 151 """ 152 self.assertRaises(TypeError, sqlite.register_adapter, {}, None) 153 154 def CheckSetIsolationLevel(self): 155 """ 156 See issue 3312. 157 """ 158 con = sqlite.connect(":memory:") 159 self.assertRaises(UnicodeEncodeError, setattr, con, 160 "isolation_level", u"\xe9") 161 162 def CheckCursorConstructorCallCheck(self): 163 """ 164 Verifies that cursor methods check whether base class __init__ was 165 called. 166 """ 167 class Cursor(sqlite.Cursor): 168 def __init__(self, con): 169 pass 170 171 con = sqlite.connect(":memory:") 172 cur = Cursor(con) 173 try: 174 cur.execute("select 4+5").fetchall() 175 self.fail("should have raised ProgrammingError") 176 except sqlite.ProgrammingError: 177 pass 178 except: 179 self.fail("should have raised ProgrammingError") 180 with self.assertRaisesRegexp(sqlite.ProgrammingError, 181 r'^Base Cursor\.__init__ not called\.$'): 182 cur.close() 183 184 def CheckConnectionConstructorCallCheck(self): 185 """ 186 Verifies that connection methods check whether base class __init__ was 187 called. 188 """ 189 class Connection(sqlite.Connection): 190 def __init__(self, name): 191 pass 192 193 con = Connection(":memory:") 194 try: 195 cur = con.cursor() 196 self.fail("should have raised ProgrammingError") 197 except sqlite.ProgrammingError: 198 pass 199 except: 200 self.fail("should have raised ProgrammingError") 201 202 def CheckCursorRegistration(self): 203 """ 204 Verifies that subclassed cursor classes are correctly registered with 205 the connection object, too. (fetch-across-rollback problem) 206 """ 207 class Connection(sqlite.Connection): 208 def cursor(self): 209 return Cursor(self) 210 211 class Cursor(sqlite.Cursor): 212 def __init__(self, con): 213 sqlite.Cursor.__init__(self, con) 214 215 con = Connection(":memory:") 216 cur = con.cursor() 217 cur.execute("create table foo(x)") 218 cur.executemany("insert into foo(x) values (?)", [(3,), (4,), (5,)]) 219 cur.execute("select x from foo") 220 con.rollback() 221 try: 222 cur.fetchall() 223 self.fail("should have raised InterfaceError") 224 except sqlite.InterfaceError: 225 pass 226 except: 227 self.fail("should have raised InterfaceError") 228 229 def CheckAutoCommit(self): 230 """ 231 Verifies that creating a connection in autocommit mode works. 232 2.5.3 introduced a regression so that these could no longer 233 be created. 234 """ 235 con = sqlite.connect(":memory:", isolation_level=None) 236 237 def CheckPragmaAutocommit(self): 238 """ 239 Verifies that running a PRAGMA statement that does an autocommit does 240 work. This did not work in 2.5.3/2.5.4. 241 """ 242 cur = self.con.cursor() 243 cur.execute("create table foo(bar)") 244 cur.execute("insert into foo(bar) values (5)") 245 246 cur.execute("pragma page_size") 247 row = cur.fetchone() 248 249 def CheckConnectionCall(self): 250 """ 251 Call a connection with a non-string SQL request: check error handling 252 of the statement constructor. 253 """ 254 self.assertRaises(sqlite.Warning, self.con, 1) 255 256 def CheckRecursiveCursorUse(self): 257 """ 258 http://bugs.python.org/issue10811 259 260 Recursively using a cursor, such as when reusing it from a generator led to segfaults. 261 Now we catch recursive cursor usage and raise a ProgrammingError. 262 """ 263 con = sqlite.connect(":memory:") 264 265 cur = con.cursor() 266 cur.execute("create table a (bar)") 267 cur.execute("create table b (baz)") 268 269 def foo(): 270 cur.execute("insert into a (bar) values (?)", (1,)) 271 yield 1 272 273 with self.assertRaises(sqlite.ProgrammingError): 274 cur.executemany("insert into b (baz) values (?)", 275 ((i,) for i in foo())) 276 277 def CheckConvertTimestampMicrosecondPadding(self): 278 """ 279 http://bugs.python.org/issue14720 280 281 The microsecond parsing of convert_timestamp() should pad with zeros, 282 since the microsecond string "456" actually represents "456000". 283 """ 284 285 con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES) 286 cur = con.cursor() 287 cur.execute("CREATE TABLE t (x TIMESTAMP)") 288 289 # Microseconds should be 456000 290 cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')") 291 292 # Microseconds should be truncated to 123456 293 cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')") 294 295 cur.execute("SELECT * FROM t") 296 values = [x[0] for x in cur.fetchall()] 297 298 self.assertEqual(values, [ 299 datetime.datetime(2012, 4, 4, 15, 6, 0, 456000), 300 datetime.datetime(2012, 4, 4, 15, 6, 0, 123456), 301 ]) 302 303 def CheckInvalidIsolationLevelType(self): 304 # isolation level is a string, not an integer 305 self.assertRaises(TypeError, 306 sqlite.connect, ":memory:", isolation_level=123) 307 308 309 def CheckNullCharacter(self): 310 # Issue #21147 311 con = sqlite.connect(":memory:") 312 self.assertRaises(ValueError, con, "\0select 1") 313 self.assertRaises(ValueError, con, "select 1\0") 314 cur = con.cursor() 315 self.assertRaises(ValueError, cur.execute, " \0select 2") 316 self.assertRaises(ValueError, cur.execute, "select 2\0") 317 318 def CheckCommitCursorReset(self): 319 """ 320 Connection.commit() did reset cursors, which made sqlite3 321 to return rows multiple times when fetched from cursors 322 after commit. See issues 10513 and 23129 for details. 323 """ 324 con = sqlite.connect(":memory:") 325 con.executescript(""" 326 create table t(c); 327 create table t2(c); 328 insert into t values(0); 329 insert into t values(1); 330 insert into t values(2); 331 """) 332 333 self.assertEqual(con.isolation_level, "") 334 335 counter = 0 336 for i, row in enumerate(con.execute("select c from t")): 337 con.execute("insert into t2(c) values (?)", (i,)) 338 con.commit() 339 if counter == 0: 340 self.assertEqual(row[0], 0) 341 elif counter == 1: 342 self.assertEqual(row[0], 1) 343 elif counter == 2: 344 self.assertEqual(row[0], 2) 345 counter += 1 346 self.assertEqual(counter, 3, "should have returned exactly three rows") 347 348 def CheckBpo31770(self): 349 """ 350 The interpreter shouldn't crash in case Cursor.__init__() is called 351 more than once. 352 """ 353 def callback(*args): 354 pass 355 con = sqlite.connect(":memory:") 356 cur = sqlite.Cursor(con) 357 ref = weakref.ref(cur, callback) 358 cur.__init__(con) 359 del cur 360 # The interpreter shouldn't crash when ref is collected. 361 del ref 362 support.gc_collect() 363 364 def CheckDelIsolation_levelSegfault(self): 365 with self.assertRaises(AttributeError): 366 del self.con.isolation_level 367 368 369class UnhashableFunc: 370 def __hash__(self): 371 raise TypeError('unhashable type') 372 373 def __init__(self, return_value=None): 374 self.calls = 0 375 self.return_value = return_value 376 377 def __call__(self, *args, **kwargs): 378 self.calls += 1 379 return self.return_value 380 381 382class UnhashableCallbacksTestCase(unittest.TestCase): 383 """ 384 https://bugs.python.org/issue34052 385 386 Registering unhashable callbacks raises TypeError, callbacks are not 387 registered in SQLite after such registration attempt. 388 """ 389 def setUp(self): 390 self.con = sqlite.connect(':memory:') 391 392 def tearDown(self): 393 self.con.close() 394 395 def test_progress_handler(self): 396 f = UnhashableFunc(return_value=0) 397 with self.assertRaisesRegexp(TypeError, 'unhashable type'): 398 self.con.set_progress_handler(f, 1) 399 self.con.execute('SELECT 1') 400 self.assertFalse(f.calls) 401 402 def test_func(self): 403 func_name = 'func_name' 404 f = UnhashableFunc() 405 with self.assertRaisesRegexp(TypeError, 'unhashable type'): 406 self.con.create_function(func_name, 0, f) 407 msg = 'no such function: %s' % func_name 408 with self.assertRaisesRegexp(sqlite.OperationalError, msg): 409 self.con.execute('SELECT %s()' % func_name) 410 self.assertFalse(f.calls) 411 412 def test_authorizer(self): 413 f = UnhashableFunc(return_value=sqlite.SQLITE_DENY) 414 with self.assertRaisesRegexp(TypeError, 'unhashable type'): 415 self.con.set_authorizer(f) 416 self.con.execute('SELECT 1') 417 self.assertFalse(f.calls) 418 419 def test_aggr(self): 420 class UnhashableType(type): 421 __hash__ = None 422 aggr_name = 'aggr_name' 423 with self.assertRaisesRegexp(TypeError, 'unhashable type'): 424 self.con.create_aggregate(aggr_name, 0, UnhashableType('Aggr', (), {})) 425 msg = 'no such function: %s' % aggr_name 426 with self.assertRaisesRegexp(sqlite.OperationalError, msg): 427 self.con.execute('SELECT %s()' % aggr_name) 428 429 430def suite(): 431 regression_suite = unittest.makeSuite(RegressionTests, "Check") 432 return unittest.TestSuite(( 433 regression_suite, 434 unittest.makeSuite(UnhashableCallbacksTestCase), 435 )) 436 437def test(): 438 runner = unittest.TextTestRunner() 439 runner.run(suite()) 440 441if __name__ == "__main__": 442 test() 443