1import time 2import warnings 3 4import pytest 5 6import pg8000 7 8""" Python DB API 2.0 driver compliance unit test suite. 9 10 This software is Public Domain and may be used without restrictions. 11 12 "Now we have booze and barflies entering the discussion, plus rumours of 13 DBAs on drugs... and I won't tell you what flashes through my mind each 14 time I read the subject line with 'Anal Compliance' in it. All around 15 this is turning out to be a thoroughly unwholesome unit test." 16 17 -- Ian Bicking 18""" 19 20__rcs_id__ = "$Id: dbapi20.py,v 1.10 2003/10/09 03:14:14 zenzen Exp $" 21__version__ = "$Revision: 1.10 $"[11:-2] 22__author__ = "Stuart Bishop <zen@shangri-la.dropbear.id.au>" 23 24 25# $Log: dbapi20.py,v $ 26# Revision 1.10 2003/10/09 03:14:14 zenzen 27# Add test for DB API 2.0 optional extension, where database exceptions 28# are exposed as attributes on the Connection object. 29# 30# Revision 1.9 2003/08/13 01:16:36 zenzen 31# Minor tweak from Stefan Fleiter 32# 33# Revision 1.8 2003/04/10 00:13:25 zenzen 34# Changes, as per suggestions by M.-A. Lemburg 35# - Add a table prefix, to ensure namespace collisions can always be avoided 36# 37# Revision 1.7 2003/02/26 23:33:37 zenzen 38# Break out DDL into helper functions, as per request by David Rushby 39# 40# Revision 1.6 2003/02/21 03:04:33 zenzen 41# Stuff from Henrik Ekelund: 42# added test_None 43# added test_nextset & hooks 44# 45# Revision 1.5 2003/02/17 22:08:43 zenzen 46# Implement suggestions and code from Henrik Eklund - test that 47# cursor.arraysize defaults to 1 & generic cursor.callproc test added 48# 49# Revision 1.4 2003/02/15 00:16:33 zenzen 50# Changes, as per suggestions and bug reports by M.-A. Lemburg, 51# Matthew T. Kromer, Federico Di Gregorio and Daniel Dittmar 52# - Class renamed 53# - Now a subclass of TestCase, to avoid requiring the driver stub 54# to use multiple inheritance 55# - Reversed the polarity of buggy test in test_description 56# - Test exception heirarchy correctly 57# - self.populate is now self._populate(), so if a driver stub 58# overrides self.ddl1 this change propogates 59# - VARCHAR columns now have a width, which will hopefully make the 60# DDL even more portible (this will be reversed if it causes more problems) 61# - cursor.rowcount being checked after various execute and fetchXXX methods 62# - Check for fetchall and fetchmany returning empty lists after results 63# are exhausted (already checking for empty lists if select retrieved 64# nothing 65# - Fix bugs in test_setoutputsize_basic and test_setinputsizes 66# 67 68 69""" Test a database self.driver for DB API 2.0 compatibility. 70 This implementation tests Gadfly, but the TestCase 71 is structured so that other self.drivers can subclass this 72 test case to ensure compiliance with the DB-API. It is 73 expected that this TestCase may be expanded in the future 74 if ambiguities or edge conditions are discovered. 75 76 The 'Optional Extensions' are not yet being tested. 77 78 self.drivers should subclass this test, overriding setUp, tearDown, 79 self.driver, connect_args and connect_kw_args. Class specification 80 should be as follows: 81 82 import dbapi20 83 class mytest(dbapi20.DatabaseAPI20Test): 84 [...] 85 86 Don't 'import DatabaseAPI20Test from dbapi20', or you will 87 confuse the unit tester - just 'import dbapi20'. 88""" 89 90# The self.driver module. This should be the module where the 'connect' 91# method is to be found 92driver = pg8000 93table_prefix = "dbapi20test_" # If you need to specify a prefix for tables 94 95ddl1 = "create table %sbooze (name varchar(20))" % table_prefix 96ddl2 = "create table %sbarflys (name varchar(20))" % table_prefix 97xddl1 = "drop table %sbooze" % table_prefix 98xddl2 = "drop table %sbarflys" % table_prefix 99 100# Name of stored procedure to convert 101# string->lowercase 102lowerfunc = "lower" 103 104 105# Some drivers may need to override these helpers, for example adding 106# a 'commit' after the execute. 107def executeDDL1(cursor): 108 cursor.execute(ddl1) 109 110 111def executeDDL2(cursor): 112 cursor.execute(ddl2) 113 114 115@pytest.fixture 116def db(request, con): 117 def fin(): 118 with con.cursor() as cur: 119 for ddl in (xddl1, xddl2): 120 try: 121 cur.execute(ddl) 122 con.commit() 123 except driver.Error: 124 # Assume table didn't exist. Other tests will check if 125 # execute is busted. 126 pass 127 128 request.addfinalizer(fin) 129 return con 130 131 132def test_apilevel(): 133 # Must exist 134 apilevel = driver.apilevel 135 136 # Must equal 2.0 137 assert apilevel == "2.0" 138 139 140def test_threadsafety(): 141 try: 142 # Must exist 143 threadsafety = driver.threadsafety 144 # Must be a valid value 145 assert threadsafety in (0, 1, 2, 3) 146 except AttributeError: 147 assert False, "Driver doesn't define threadsafety" 148 149 150def test_paramstyle(): 151 try: 152 # Must exist 153 paramstyle = driver.paramstyle 154 # Must be a valid value 155 assert paramstyle in ("qmark", "numeric", "named", "format", "pyformat") 156 except AttributeError: 157 assert False, "Driver doesn't define paramstyle" 158 159 160def test_Exceptions(): 161 # Make sure required exceptions exist, and are in the 162 # defined heirarchy. 163 assert issubclass(driver.Warning, Exception) 164 assert issubclass(driver.Error, Exception) 165 assert issubclass(driver.InterfaceError, driver.Error) 166 assert issubclass(driver.DatabaseError, driver.Error) 167 assert issubclass(driver.OperationalError, driver.Error) 168 assert issubclass(driver.IntegrityError, driver.Error) 169 assert issubclass(driver.InternalError, driver.Error) 170 assert issubclass(driver.ProgrammingError, driver.Error) 171 assert issubclass(driver.NotSupportedError, driver.Error) 172 173 174def test_ExceptionsAsConnectionAttributes(con): 175 # OPTIONAL EXTENSION 176 # Test for the optional DB API 2.0 extension, where the exceptions 177 # are exposed as attributes on the Connection object 178 # I figure this optional extension will be implemented by any 179 # driver author who is using this test suite, so it is enabled 180 # by default. 181 warnings.simplefilter("ignore") 182 drv = driver 183 assert con.Warning is drv.Warning 184 assert con.Error is drv.Error 185 assert con.InterfaceError is drv.InterfaceError 186 assert con.DatabaseError is drv.DatabaseError 187 assert con.OperationalError is drv.OperationalError 188 assert con.IntegrityError is drv.IntegrityError 189 assert con.InternalError is drv.InternalError 190 assert con.ProgrammingError is drv.ProgrammingError 191 assert con.NotSupportedError is drv.NotSupportedError 192 warnings.resetwarnings() 193 194 195def test_commit(con): 196 # Commit must work, even if it doesn't do anything 197 con.commit() 198 199 200def test_rollback(con): 201 # If rollback is defined, it should either work or throw 202 # the documented exception 203 if hasattr(con, "rollback"): 204 try: 205 con.rollback() 206 except driver.NotSupportedError: 207 pass 208 209 210def test_cursor(con): 211 con.cursor() 212 213 214def test_cursor_isolation(con): 215 # Make sure cursors created from the same connection have 216 # the documented transaction isolation level 217 cur1 = con.cursor() 218 cur2 = con.cursor() 219 executeDDL1(cur1) 220 cur1.execute("insert into %sbooze values ('Victoria Bitter')" % (table_prefix)) 221 cur2.execute("select name from %sbooze" % table_prefix) 222 booze = cur2.fetchall() 223 assert len(booze) == 1 224 assert len(booze[0]) == 1 225 assert booze[0][0] == "Victoria Bitter" 226 227 228def test_description(con): 229 cur = con.cursor() 230 executeDDL1(cur) 231 assert cur.description is None, ( 232 "cursor.description should be none after executing a " 233 "statement that can return no rows (such as DDL)" 234 ) 235 cur.execute("select name from %sbooze" % table_prefix) 236 assert len(cur.description) == 1, "cursor.description describes too many columns" 237 assert ( 238 len(cur.description[0]) == 7 239 ), "cursor.description[x] tuples must have 7 elements" 240 assert ( 241 cur.description[0][0].lower() == "name" 242 ), "cursor.description[x][0] must return column name" 243 assert cur.description[0][1] == driver.STRING, ( 244 "cursor.description[x][1] must return column type. Got %r" 245 % cur.description[0][1] 246 ) 247 248 # Make sure self.description gets reset 249 executeDDL2(cur) 250 assert cur.description is None, ( 251 "cursor.description not being set to None when executing " 252 "no-result statements (eg. DDL)" 253 ) 254 255 256def test_rowcount(cursor): 257 executeDDL1(cursor) 258 assert cursor.rowcount == -1, ( 259 "cursor.rowcount should be -1 after executing no-result " "statements" 260 ) 261 cursor.execute("insert into %sbooze values ('Victoria Bitter')" % (table_prefix)) 262 assert cursor.rowcount in (-1, 1), ( 263 "cursor.rowcount should == number or rows inserted, or " 264 "set to -1 after executing an insert statement" 265 ) 266 cursor.execute("select name from %sbooze" % table_prefix) 267 assert cursor.rowcount in (-1, 1), ( 268 "cursor.rowcount should == number of rows returned, or " 269 "set to -1 after executing a select statement" 270 ) 271 executeDDL2(cursor) 272 assert cursor.rowcount == -1, ( 273 "cursor.rowcount not being reset to -1 after executing " "no-result statements" 274 ) 275 276 277def test_close(con): 278 cur = con.cursor() 279 con.close() 280 281 # cursor.execute should raise an Error if called after connection 282 # closed 283 with pytest.raises(driver.Error): 284 executeDDL1(cur) 285 286 # connection.commit should raise an Error if called after connection' 287 # closed.' 288 with pytest.raises(driver.Error): 289 con.commit() 290 291 # connection.close should raise an Error if called more than once 292 with pytest.raises(driver.Error): 293 con.close() 294 295 296def test_execute(con): 297 cur = con.cursor() 298 _paraminsert(cur) 299 300 301def _paraminsert(cur): 302 executeDDL1(cur) 303 cur.execute("insert into %sbooze values ('Victoria Bitter')" % (table_prefix)) 304 assert cur.rowcount in (-1, 1) 305 306 if driver.paramstyle == "qmark": 307 cur.execute("insert into %sbooze values (?)" % table_prefix, ("Cooper's",)) 308 elif driver.paramstyle == "numeric": 309 cur.execute("insert into %sbooze values (:1)" % table_prefix, ("Cooper's",)) 310 elif driver.paramstyle == "named": 311 cur.execute( 312 "insert into %sbooze values (:beer)" % table_prefix, {"beer": "Cooper's"} 313 ) 314 elif driver.paramstyle == "format": 315 cur.execute("insert into %sbooze values (%%s)" % table_prefix, ("Cooper's",)) 316 elif driver.paramstyle == "pyformat": 317 cur.execute( 318 "insert into %sbooze values (%%(beer)s)" % table_prefix, 319 {"beer": "Cooper's"}, 320 ) 321 else: 322 assert False, "Invalid paramstyle" 323 324 assert cur.rowcount in (-1, 1) 325 326 cur.execute("select name from %sbooze" % table_prefix) 327 res = cur.fetchall() 328 assert len(res) == 2, "cursor.fetchall returned too few rows" 329 beers = [res[0][0], res[1][0]] 330 beers.sort() 331 assert beers[0] == "Cooper's", ( 332 "cursor.fetchall retrieved incorrect data, or data inserted " "incorrectly" 333 ) 334 assert beers[1] == "Victoria Bitter", ( 335 "cursor.fetchall retrieved incorrect data, or data inserted " "incorrectly" 336 ) 337 338 339def test_executemany(cursor): 340 executeDDL1(cursor) 341 largs = [("Cooper's",), ("Boag's",)] 342 margs = [{"beer": "Cooper's"}, {"beer": "Boag's"}] 343 if driver.paramstyle == "qmark": 344 cursor.executemany("insert into %sbooze values (?)" % table_prefix, largs) 345 elif driver.paramstyle == "numeric": 346 cursor.executemany("insert into %sbooze values (:1)" % table_prefix, largs) 347 elif driver.paramstyle == "named": 348 cursor.executemany("insert into %sbooze values (:beer)" % table_prefix, margs) 349 elif driver.paramstyle == "format": 350 cursor.executemany("insert into %sbooze values (%%s)" % table_prefix, largs) 351 elif driver.paramstyle == "pyformat": 352 cursor.executemany( 353 "insert into %sbooze values (%%(beer)s)" % (table_prefix), margs 354 ) 355 else: 356 assert False, "Unknown paramstyle" 357 358 assert cursor.rowcount in (-1, 2), ( 359 "insert using cursor.executemany set cursor.rowcount to " 360 "incorrect value %r" % cursor.rowcount 361 ) 362 363 cursor.execute("select name from %sbooze" % table_prefix) 364 res = cursor.fetchall() 365 assert len(res) == 2, "cursor.fetchall retrieved incorrect number of rows" 366 beers = [res[0][0], res[1][0]] 367 beers.sort() 368 assert beers[0] == "Boag's", "incorrect data retrieved" 369 assert beers[1] == "Cooper's", "incorrect data retrieved" 370 371 372def test_fetchone(cursor): 373 # cursor.fetchone should raise an Error if called before 374 # executing a select-type query 375 with pytest.raises(driver.Error): 376 cursor.fetchone() 377 378 # cursor.fetchone should raise an Error if called after 379 # executing a query that cannnot return rows 380 executeDDL1(cursor) 381 with pytest.raises(driver.Error): 382 cursor.fetchone() 383 384 cursor.execute("select name from %sbooze" % table_prefix) 385 assert cursor.fetchone() is None, ( 386 "cursor.fetchone should return None if a query retrieves " "no rows" 387 ) 388 assert cursor.rowcount in (-1, 0) 389 390 # cursor.fetchone should raise an Error if called after 391 # executing a query that cannnot return rows 392 cursor.execute("insert into %sbooze values ('Victoria Bitter')" % (table_prefix)) 393 with pytest.raises(driver.Error): 394 cursor.fetchone() 395 396 cursor.execute("select name from %sbooze" % table_prefix) 397 r = cursor.fetchone() 398 assert len(r) == 1, "cursor.fetchone should have retrieved a single row" 399 assert r[0] == "Victoria Bitter", "cursor.fetchone retrieved incorrect data" 400 assert ( 401 cursor.fetchone() is None 402 ), "cursor.fetchone should return None if no more rows available" 403 assert cursor.rowcount in (-1, 1) 404 405 406samples = [ 407 "Carlton Cold", 408 "Carlton Draft", 409 "Mountain Goat", 410 "Redback", 411 "Victoria Bitter", 412 "XXXX", 413] 414 415 416def _populate(): 417 """Return a list of sql commands to setup the DB for the fetch 418 tests. 419 """ 420 populate = [ 421 "insert into %sbooze values ('%s')" % (table_prefix, s) for s in samples 422 ] 423 return populate 424 425 426def test_fetchmany(cursor): 427 # cursor.fetchmany should raise an Error if called without 428 # issuing a query 429 with pytest.raises(driver.Error): 430 cursor.fetchmany(4) 431 432 executeDDL1(cursor) 433 for sql in _populate(): 434 cursor.execute(sql) 435 436 cursor.execute("select name from %sbooze" % table_prefix) 437 r = cursor.fetchmany() 438 assert len(r) == 1, ( 439 "cursor.fetchmany retrieved incorrect number of rows, " 440 "default of arraysize is one." 441 ) 442 cursor.arraysize = 10 443 r = cursor.fetchmany(3) # Should get 3 rows 444 assert len(r) == 3, "cursor.fetchmany retrieved incorrect number of rows" 445 r = cursor.fetchmany(4) # Should get 2 more 446 assert len(r) == 2, "cursor.fetchmany retrieved incorrect number of rows" 447 r = cursor.fetchmany(4) # Should be an empty sequence 448 assert len(r) == 0, ( 449 "cursor.fetchmany should return an empty sequence after " 450 "results are exhausted" 451 ) 452 assert cursor.rowcount in (-1, 6) 453 454 # Same as above, using cursor.arraysize 455 cursor.arraysize = 4 456 cursor.execute("select name from %sbooze" % table_prefix) 457 r = cursor.fetchmany() # Should get 4 rows 458 assert len(r) == 4, "cursor.arraysize not being honoured by fetchmany" 459 r = cursor.fetchmany() # Should get 2 more 460 assert len(r) == 2 461 r = cursor.fetchmany() # Should be an empty sequence 462 assert len(r) == 0 463 assert cursor.rowcount in (-1, 6) 464 465 cursor.arraysize = 6 466 cursor.execute("select name from %sbooze" % table_prefix) 467 rows = cursor.fetchmany() # Should get all rows 468 assert cursor.rowcount in (-1, 6) 469 assert len(rows) == 6 470 assert len(rows) == 6 471 rows = [row[0] for row in rows] 472 rows.sort() 473 474 # Make sure we get the right data back out 475 for i in range(0, 6): 476 assert rows[i] == samples[i], "incorrect data retrieved by cursor.fetchmany" 477 478 rows = cursor.fetchmany() # Should return an empty list 479 assert len(rows) == 0, ( 480 "cursor.fetchmany should return an empty sequence if " 481 "called after the whole result set has been fetched" 482 ) 483 assert cursor.rowcount in (-1, 6) 484 485 executeDDL2(cursor) 486 cursor.execute("select name from %sbarflys" % table_prefix) 487 r = cursor.fetchmany() # Should get empty sequence 488 assert len(r) == 0, ( 489 "cursor.fetchmany should return an empty sequence if " "query retrieved no rows" 490 ) 491 assert cursor.rowcount in (-1, 0) 492 493 494def test_fetchall(cursor): 495 # cursor.fetchall should raise an Error if called 496 # without executing a query that may return rows (such 497 # as a select) 498 with pytest.raises(driver.Error): 499 cursor.fetchall() 500 501 executeDDL1(cursor) 502 for sql in _populate(): 503 cursor.execute(sql) 504 505 # cursor.fetchall should raise an Error if called 506 # after executing a a statement that cannot return rows 507 with pytest.raises(driver.Error): 508 cursor.fetchall() 509 510 cursor.execute("select name from %sbooze" % table_prefix) 511 rows = cursor.fetchall() 512 assert cursor.rowcount in (-1, len(samples)) 513 assert len(rows) == len(samples), "cursor.fetchall did not retrieve all rows" 514 rows = [r[0] for r in rows] 515 rows.sort() 516 for i in range(0, len(samples)): 517 assert rows[i] == samples[i], "cursor.fetchall retrieved incorrect rows" 518 rows = cursor.fetchall() 519 assert len(rows) == 0, ( 520 "cursor.fetchall should return an empty list if called " 521 "after the whole result set has been fetched" 522 ) 523 assert cursor.rowcount in (-1, len(samples)) 524 525 executeDDL2(cursor) 526 cursor.execute("select name from %sbarflys" % table_prefix) 527 rows = cursor.fetchall() 528 assert cursor.rowcount in (-1, 0) 529 assert len(rows) == 0, ( 530 "cursor.fetchall should return an empty list if " 531 "a select query returns no rows" 532 ) 533 534 535def test_mixedfetch(cursor): 536 executeDDL1(cursor) 537 for sql in _populate(): 538 cursor.execute(sql) 539 540 cursor.execute("select name from %sbooze" % table_prefix) 541 rows1 = cursor.fetchone() 542 rows23 = cursor.fetchmany(2) 543 rows4 = cursor.fetchone() 544 rows56 = cursor.fetchall() 545 assert cursor.rowcount in (-1, 6) 546 assert len(rows23) == 2, "fetchmany returned incorrect number of rows" 547 assert len(rows56) == 2, "fetchall returned incorrect number of rows" 548 549 rows = [rows1[0]] 550 rows.extend([rows23[0][0], rows23[1][0]]) 551 rows.append(rows4[0]) 552 rows.extend([rows56[0][0], rows56[1][0]]) 553 rows.sort() 554 for i in range(0, len(samples)): 555 assert rows[i] == samples[i], "incorrect data retrieved or inserted" 556 557 558def help_nextset_setUp(cur): 559 """Should create a procedure called deleteme 560 that returns two result sets, first the 561 number of rows in booze then "name from booze" 562 """ 563 raise NotImplementedError("Helper not implemented") 564 565 566def help_nextset_tearDown(cur): 567 "If cleaning up is needed after nextSetTest" 568 raise NotImplementedError("Helper not implemented") 569 570 571def test_nextset(cursor): 572 if not hasattr(cursor, "nextset"): 573 return 574 575 try: 576 executeDDL1(cursor) 577 sql = _populate() 578 for sql in _populate(): 579 cursor.execute(sql) 580 581 help_nextset_setUp(cursor) 582 583 cursor.callproc("deleteme") 584 numberofrows = cursor.fetchone() 585 assert numberofrows[0] == len(samples) 586 assert cursor.nextset() 587 names = cursor.fetchall() 588 assert len(names) == len(samples) 589 s = cursor.nextset() 590 assert s is None, "No more return sets, should return None" 591 finally: 592 help_nextset_tearDown(cursor) 593 594 595def test_arraysize(cursor): 596 # Not much here - rest of the tests for this are in test_fetchmany 597 assert hasattr(cursor, "arraysize"), "cursor.arraysize must be defined" 598 599 600def test_setinputsizes(cursor): 601 cursor.setinputsizes(25) 602 603 604def test_setoutputsize_basic(cursor): 605 # Basic test is to make sure setoutputsize doesn't blow up 606 cursor.setoutputsize(1000) 607 cursor.setoutputsize(2000, 0) 608 _paraminsert(cursor) # Make sure the cursor still works 609 610 611def test_None(cursor): 612 executeDDL1(cursor) 613 cursor.execute("insert into %sbooze values (NULL)" % table_prefix) 614 cursor.execute("select name from %sbooze" % table_prefix) 615 r = cursor.fetchall() 616 assert len(r) == 1 617 assert len(r[0]) == 1 618 assert r[0][0] is None, "NULL value not returned as None" 619 620 621def test_Date(): 622 driver.Date(2002, 12, 25) 623 driver.DateFromTicks(time.mktime((2002, 12, 25, 0, 0, 0, 0, 0, 0))) 624 # Can we assume this? API doesn't specify, but it seems implied 625 # self.assertEqual(str(d1),str(d2)) 626 627 628def test_Time(): 629 driver.Time(13, 45, 30) 630 driver.TimeFromTicks(time.mktime((2001, 1, 1, 13, 45, 30, 0, 0, 0))) 631 # Can we assume this? API doesn't specify, but it seems implied 632 # self.assertEqual(str(t1),str(t2)) 633 634 635def test_Timestamp(): 636 driver.Timestamp(2002, 12, 25, 13, 45, 30) 637 driver.TimestampFromTicks(time.mktime((2002, 12, 25, 13, 45, 30, 0, 0, 0))) 638 # Can we assume this? API doesn't specify, but it seems implied 639 # self.assertEqual(str(t1),str(t2)) 640 641 642def test_Binary(): 643 driver.Binary(b"Something") 644 driver.Binary(b"") 645 646 647def test_STRING(): 648 assert hasattr(driver, "STRING"), "module.STRING must be defined" 649 650 651def test_BINARY(): 652 assert hasattr(driver, "BINARY"), "module.BINARY must be defined." 653 654 655def test_NUMBER(): 656 assert hasattr(driver, "NUMBER"), "module.NUMBER must be defined." 657 658 659def test_DATETIME(): 660 assert hasattr(driver, "DATETIME"), "module.DATETIME must be defined." 661 662 663def test_ROWID(): 664 assert hasattr(driver, "ROWID"), "module.ROWID must be defined." 665