1import time
2import warnings
3
4import pytest
5
6import pg8000
7
8""" Python DB API 2.0 driver compliance unit test suite.
9
10    This software is Public Domain and may be used without restrictions.
11
12 "Now we have booze and barflies entering the discussion, plus rumours of
13  DBAs on drugs... and I won't tell you what flashes through my mind each
14  time I read the subject line with 'Anal Compliance' in it.  All around
15  this is turning out to be a thoroughly unwholesome unit test."
16
17    -- Ian Bicking
18"""
19
# RCS keyword metadata; the $Id$ string names dbapi20.py, revision 1.10.
__rcs_id__ = "$Id: dbapi20.py,v 1.10 2003/10/09 03:14:14 zenzen Exp $"
# Slice the bare revision number ("1.10") out of the "$Revision: ... $" keyword.
__version__ = "$Revision: 1.10 $"[11:-2]
__author__ = "Stuart Bishop <zen@shangri-la.dropbear.id.au>"
23
24
25# $Log: dbapi20.py,v $
26# Revision 1.10  2003/10/09 03:14:14  zenzen
27# Add test for DB API 2.0 optional extension, where database exceptions
28# are exposed as attributes on the Connection object.
29#
30# Revision 1.9  2003/08/13 01:16:36  zenzen
31# Minor tweak from Stefan Fleiter
32#
33# Revision 1.8  2003/04/10 00:13:25  zenzen
34# Changes, as per suggestions by M.-A. Lemburg
35# - Add a table prefix, to ensure namespace collisions can always be avoided
36#
37# Revision 1.7  2003/02/26 23:33:37  zenzen
38# Break out DDL into helper functions, as per request by David Rushby
39#
40# Revision 1.6  2003/02/21 03:04:33  zenzen
41# Stuff from Henrik Ekelund:
42#     added test_None
43#     added test_nextset & hooks
44#
45# Revision 1.5  2003/02/17 22:08:43  zenzen
46# Implement suggestions and code from Henrik Eklund - test that
47# cursor.arraysize defaults to 1 & generic cursor.callproc test added
48#
49# Revision 1.4  2003/02/15 00:16:33  zenzen
50# Changes, as per suggestions and bug reports by M.-A. Lemburg,
51# Matthew T. Kromer, Federico Di Gregorio and Daniel Dittmar
52# - Class renamed
53# - Now a subclass of TestCase, to avoid requiring the driver stub
54#   to use multiple inheritance
55# - Reversed the polarity of buggy test in test_description
# - Test exception hierarchy correctly
# - self.populate is now self._populate(), so if a driver stub
#   overrides self.ddl1 this change propagates
# - VARCHAR columns now have a width, which will hopefully make the
#   DDL even more portable (this will be reversed if it causes more problems)
# - cursor.rowcount being checked after various execute and fetchXXX methods
# - Check for fetchall and fetchmany returning empty lists after results
#   are exhausted (already checking for empty lists if select retrieved
#   nothing)
65# - Fix bugs in test_setoutputsize_basic and test_setinputsizes
66#
67
68
69""" Test a database self.driver for DB API 2.0 compatibility.
70    This implementation tests Gadfly, but the TestCase
71    is structured so that other self.drivers can subclass this
    test case to ensure compliance with the DB-API. It is
73    expected that this TestCase may be expanded in the future
74    if ambiguities or edge conditions are discovered.
75
76    The 'Optional Extensions' are not yet being tested.
77
78    self.drivers should subclass this test, overriding setUp, tearDown,
79    self.driver, connect_args and connect_kw_args. Class specification
80    should be as follows:
81
82    import dbapi20
83    class mytest(dbapi20.DatabaseAPI20Test):
84       [...]
85
86    Don't 'import DatabaseAPI20Test from dbapi20', or you will
87    confuse the unit tester - just 'import dbapi20'.
88"""
89
# The DB-API driver module under test. This should be the module where the
# 'connect' method is to be found.
driver = pg8000
table_prefix = "dbapi20test_"  # If you need to specify a prefix for tables

# DDL that creates (ddl1, ddl2) and drops (xddl1, xddl2) the two scratch
# tables used by the tests; every table name carries table_prefix.
ddl1 = "create table %sbooze (name varchar(20))" % table_prefix
ddl2 = "create table %sbarflys (name varchar(20))" % table_prefix
xddl1 = "drop table %sbooze" % table_prefix
xddl2 = "drop table %sbarflys" % table_prefix

# Name of the stored procedure that converts a string to lowercase.
lowerfunc = "lower"
103
104
105# Some drivers may need to override these helpers, for example adding
106# a 'commit' after the execute.
def executeDDL1(cursor):
    """Run the ``ddl1`` statement (create the booze table) on *cursor*."""
    statement = ddl1
    cursor.execute(statement)
109
110
def executeDDL2(cursor):
    """Run the ``ddl2`` statement (create the barflys table) on *cursor*."""
    statement = ddl2
    cursor.execute(statement)
113
114
@pytest.fixture
def db(request, con):
    """Return ``con`` and register a finalizer that drops the test tables.

    The finalizer issues both drop statements (``xddl1`` and ``xddl2``),
    committing after each successful drop. A drop that fails is assumed to
    mean the table never existed; the failure is rolled back so the other
    drop still gets a usable transaction.
    """

    def fin():
        with con.cursor() as cur:
            for ddl in (xddl1, xddl2):
                try:
                    cur.execute(ddl)
                    con.commit()
                except driver.Error:
                    # Assume table didn't exist. Other tests will check if
                    # execute is busted.
                    #
                    # PostgreSQL aborts the whole transaction on a failed
                    # statement, so without a rollback the next drop would
                    # fail as well and its table would be left behind.
                    try:
                        con.rollback()
                    except driver.Error:
                        # The connection itself is unusable (for example it
                        # was already closed); cleanup stays best-effort.
                        pass

    request.addfinalizer(fin)
    return con
130
131
def test_apilevel():
    """The driver module must define ``apilevel`` and it must equal "2.0"."""
    assert driver.apilevel == "2.0"
138
139
def test_threadsafety():
    """The driver module must define ``threadsafety`` as one of 0, 1, 2 or 3."""
    # A missing attribute is reported as a test failure, not as an error.
    assert hasattr(driver, "threadsafety"), "Driver doesn't define threadsafety"
    assert driver.threadsafety in (0, 1, 2, 3)
148
149
def test_paramstyle():
    """The driver module must define ``paramstyle`` as a known style name."""
    # A missing attribute is reported as a test failure, not as an error.
    assert hasattr(driver, "paramstyle"), "Driver doesn't define paramstyle"
    known_styles = ("qmark", "numeric", "named", "format", "pyformat")
    assert driver.paramstyle in known_styles
158
159
def test_Exceptions():
    """Check that the required exception classes exist with the right bases.

    ``Warning`` and ``Error`` must derive from ``Exception``; every other
    required exception must derive from the driver's ``Error``.
    """
    for root_name in ("Warning", "Error"):
        assert issubclass(getattr(driver, root_name), Exception)

    error_subclasses = (
        "InterfaceError",
        "DatabaseError",
        "OperationalError",
        "IntegrityError",
        "InternalError",
        "ProgrammingError",
        "NotSupportedError",
    )
    for name in error_subclasses:
        assert issubclass(getattr(driver, name), driver.Error)
172
173
def test_ExceptionsAsConnectionAttributes(con):
    """Check the optional extension exposing exceptions on the connection.

    OPTIONAL EXTENSION: DB API 2.0 lets a driver expose its exception
    classes as attributes of the Connection object. Any driver author using
    this suite is expected to implement it, so the test is enabled by
    default. Each connection attribute must be the very same class object
    as the module-level exception of the same name.
    """
    exception_names = (
        "Warning",
        "Error",
        "InterfaceError",
        "DatabaseError",
        "OperationalError",
        "IntegrityError",
        "InternalError",
        "ProgrammingError",
        "NotSupportedError",
    )
    # catch_warnings() restores the previous filter state on exit, even when
    # an assertion fails, and leaves filters installed by others untouched
    # (warnings.resetwarnings() would have discarded them all).
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        for name in exception_names:
            assert getattr(con, name) is getattr(driver, name)
193
194
def test_commit(con):
    """Connection.commit() must succeed, even when it has nothing to do."""
    con.commit()
198
199
def test_rollback(con):
    """rollback(), when defined, must work or raise NotSupportedError."""
    if not hasattr(con, "rollback"):
        # Nothing to check when the connection has no rollback at all.
        return

    try:
        con.rollback()
    except driver.NotSupportedError:
        # The documented way for a driver to decline rollback.
        pass
208
209
def test_cursor(con):
    """Connection.cursor() must be callable without raising."""
    con.cursor()
212
213
def test_cursor_isolation(con):
    """A row written through one cursor must be visible to a sibling cursor.

    Both cursors come from the same connection, so the second one is
    expected to see the uncommitted insert made through the first.
    """
    writer = con.cursor()
    reader = con.cursor()

    executeDDL1(writer)
    insert_sql = "insert into %sbooze values ('Victoria Bitter')" % (table_prefix)
    writer.execute(insert_sql)

    select_sql = "select name from %sbooze" % table_prefix
    reader.execute(select_sql)
    rows = reader.fetchall()

    assert len(rows) == 1
    only_row = rows[0]
    assert len(only_row) == 1
    assert only_row[0] == "Victoria Bitter"
226
227
def test_description(con):
    """Check cursor.description after DDL, after a select, and after DDL again."""
    cur = con.cursor()

    # DDL returns no rows, so there is nothing to describe.
    executeDDL1(cur)
    assert cur.description is None, (
        "cursor.description should be none after executing a "
        "statement that can return no rows (such as DDL)"
    )

    # A single-column select must yield exactly one 7-item column entry.
    cur.execute("select name from %sbooze" % table_prefix)
    columns = cur.description
    assert len(columns) == 1, "cursor.description describes too many columns"
    name_column = columns[0]
    assert (
        len(name_column) == 7
    ), "cursor.description[x] tuples must have 7 elements"
    assert (
        name_column[0].lower() == "name"
    ), "cursor.description[x][0] must return column name"
    assert name_column[1] == driver.STRING, (
        "cursor.description[x][1] must return column type. Got %r" % name_column[1]
    )

    # The description must be cleared again by the next no-result statement.
    executeDDL2(cur)
    assert cur.description is None, (
        "cursor.description not being set to None when executing "
        "no-result statements (eg. DDL)"
    )
254
255
def test_rowcount(cursor):
    """Check cursor.rowcount after DDL, an insert, a select, and DDL again."""
    one_row_or_unknown = (-1, 1)

    executeDDL1(cursor)
    assert cursor.rowcount == -1, (
        "cursor.rowcount should be -1 after executing no-result " "statements"
    )

    insert_sql = "insert into %sbooze values ('Victoria Bitter')" % (table_prefix)
    cursor.execute(insert_sql)
    assert cursor.rowcount in one_row_or_unknown, (
        "cursor.rowcount should == number or rows inserted, or "
        "set to -1 after executing an insert statement"
    )

    select_sql = "select name from %sbooze" % table_prefix
    cursor.execute(select_sql)
    assert cursor.rowcount in one_row_or_unknown, (
        "cursor.rowcount should == number of rows returned, or "
        "set to -1 after executing a select statement"
    )

    # A further no-result statement must put rowcount back to -1.
    executeDDL2(cursor)
    assert cursor.rowcount == -1, (
        "cursor.rowcount not being reset to -1 after executing " "no-result statements"
    )
275
276
def test_close(con):
    """Every operation on a closed connection must raise driver.Error."""
    cur = con.cursor()
    con.close()

    after_close = (
        # Executing on a cursor of the closed connection.
        lambda: executeDDL1(cur),
        # Committing on the closed connection.
        con.commit,
        # Closing the connection a second time.
        con.close,
    )
    for operation in after_close:
        with pytest.raises(driver.Error):
            operation()
294
295
def test_execute(con):
    """Run the shared parameterised-insert checks on a fresh cursor."""
    _paraminsert(con.cursor())
299
300
def _paraminsert(cur):
    """Insert one literal and one parameterised row, then read both back.

    Creates the booze table, inserts 'Victoria Bitter' as a literal and
    "Cooper's" through the placeholder syntax matching ``driver.paramstyle``,
    and checks that a select returns exactly those two names.
    """
    executeDDL1(cur)
    cur.execute("insert into %sbooze values ('Victoria Bitter')" % (table_prefix))
    assert cur.rowcount in (-1, 1)

    positional = ("Cooper's",)
    keyed = {"beer": "Cooper's"}
    # One (statement, parameters) pair per DB-API parameter style.
    by_style = {
        "qmark": ("insert into %sbooze values (?)" % table_prefix, positional),
        "numeric": ("insert into %sbooze values (:1)" % table_prefix, positional),
        "named": ("insert into %sbooze values (:beer)" % table_prefix, keyed),
        "format": ("insert into %sbooze values (%%s)" % table_prefix, positional),
        "pyformat": (
            "insert into %sbooze values (%%(beer)s)" % table_prefix,
            keyed,
        ),
    }
    assert driver.paramstyle in by_style, "Invalid paramstyle"
    statement, params = by_style[driver.paramstyle]
    cur.execute(statement, params)

    assert cur.rowcount in (-1, 1)

    cur.execute("select name from %sbooze" % table_prefix)
    fetched = cur.fetchall()
    assert len(fetched) == 2, "cursor.fetchall returned too few rows"

    bad_data = "cursor.fetchall retrieved incorrect data, or data inserted " "incorrectly"
    first, second = sorted(row[0] for row in fetched)
    assert first == "Cooper's", bad_data
    assert second == "Victoria Bitter", bad_data
337
338
def test_executemany(cursor):
    """Insert two rows with executemany and check both come back."""
    executeDDL1(cursor)

    positional_sets = [("Cooper's",), ("Boag's",)]
    keyed_sets = [{"beer": "Cooper's"}, {"beer": "Boag's"}]
    # One (statement, parameter sets) pair per DB-API parameter style.
    by_style = {
        "qmark": ("insert into %sbooze values (?)" % table_prefix, positional_sets),
        "numeric": ("insert into %sbooze values (:1)" % table_prefix, positional_sets),
        "named": ("insert into %sbooze values (:beer)" % table_prefix, keyed_sets),
        "format": ("insert into %sbooze values (%%s)" % table_prefix, positional_sets),
        "pyformat": (
            "insert into %sbooze values (%%(beer)s)" % (table_prefix),
            keyed_sets,
        ),
    }
    assert driver.paramstyle in by_style, "Unknown paramstyle"
    statement, param_sets = by_style[driver.paramstyle]
    cursor.executemany(statement, param_sets)

    assert cursor.rowcount in (-1, 2), (
        "insert using cursor.executemany set cursor.rowcount to "
        "incorrect value %r" % cursor.rowcount
    )

    cursor.execute("select name from %sbooze" % table_prefix)
    fetched = cursor.fetchall()
    assert len(fetched) == 2, "cursor.fetchall retrieved incorrect number of rows"

    first, second = sorted(row[0] for row in fetched)
    assert first == "Boag's", "incorrect data retrieved"
    assert second == "Cooper's", "incorrect data retrieved"
370
371
def test_fetchone(cursor):
    """Check fetchone with no result set, an empty result and a single row."""
    select_sql = "select name from %sbooze" % table_prefix

    # Nothing has been executed yet, so fetchone must raise an Error.
    with pytest.raises(driver.Error):
        cursor.fetchone()

    # DDL cannot return rows, so fetchone must raise an Error afterwards.
    executeDDL1(cursor)
    with pytest.raises(driver.Error):
        cursor.fetchone()

    # A select on the still-empty table yields None rather than a row.
    cursor.execute(select_sql)
    row = cursor.fetchone()
    assert row is None, (
        "cursor.fetchone should return None if a query retrieves " "no rows"
    )
    assert cursor.rowcount in (-1, 0)

    # An insert cannot return rows either.
    cursor.execute("insert into %sbooze values ('Victoria Bitter')" % (table_prefix))
    with pytest.raises(driver.Error):
        cursor.fetchone()

    # One row is now available; after it the result set is exhausted.
    cursor.execute(select_sql)
    row = cursor.fetchone()
    assert len(row) == 1, "cursor.fetchone should have retrieved a single row"
    assert row[0] == "Victoria Bitter", "cursor.fetchone retrieved incorrect data"
    leftover = cursor.fetchone()
    assert (
        leftover is None
    ), "cursor.fetchone should return None if no more rows available"
    assert cursor.rowcount in (-1, 1)
404
405
# Beer names inserted by _populate(). The list is kept in ascending sort
# order: the fetch tests sort the names they read back and compare them
# with this list position by position.
samples = [
    "Carlton Cold",
    "Carlton Draft",
    "Mountain Goat",
    "Redback",
    "Victoria Bitter",
    "XXXX",
]
414
415
def _populate():
    """Build the insert statements that load ``samples`` into the booze table.

    Returns a list with one SQL insert string per sample name, used to set
    up the database for the fetch tests.
    """
    template = "insert into %sbooze values ('%s')"
    return [template % (table_prefix, name) for name in samples]
424
425
def test_fetchmany(cursor):
    """Exercise cursor.fetchmany with explicit sizes and with cursor.arraysize.

    Covers: calling fetchmany before any query (must raise driver.Error),
    the default batch size of one row, explicit size arguments, sizes taken
    from cursor.arraysize, empty results once the rows are exhausted, and a
    select on an empty table. The literal row counts below assume the six
    names in ``samples``.
    """
    select_booze = "select name from %sbooze" % table_prefix

    # cursor.fetchmany should raise an Error if called without
    # issuing a query
    with pytest.raises(driver.Error):
        cursor.fetchmany(4)

    executeDDL1(cursor)
    for sql in _populate():
        cursor.execute(sql)

    cursor.execute(select_booze)
    r = cursor.fetchmany()
    assert len(r) == 1, (
        "cursor.fetchmany retrieved incorrect number of rows, "
        "default of arraysize is one."
    )
    # An explicit size argument must take precedence over arraysize.
    cursor.arraysize = 10
    r = cursor.fetchmany(3)  # Should get 3 rows
    assert len(r) == 3, "cursor.fetchmany retrieved incorrect number of rows"
    r = cursor.fetchmany(4)  # Should get 2 more
    assert len(r) == 2, "cursor.fetchmany retrieved incorrect number of rows"
    r = cursor.fetchmany(4)  # Should be an empty sequence
    assert len(r) == 0, (
        "cursor.fetchmany should return an empty sequence after "
        "results are exhausted"
    )
    assert cursor.rowcount in (-1, 6)

    # Same as above, using cursor.arraysize
    cursor.arraysize = 4
    cursor.execute(select_booze)
    r = cursor.fetchmany()  # Should get 4 rows
    assert len(r) == 4, "cursor.arraysize not being honoured by fetchmany"
    r = cursor.fetchmany()  # Should get 2 more
    assert len(r) == 2
    r = cursor.fetchmany()  # Should be an empty sequence
    assert len(r) == 0
    assert cursor.rowcount in (-1, 6)

    cursor.arraysize = 6
    cursor.execute(select_booze)
    rows = cursor.fetchmany()  # Should get all rows
    assert cursor.rowcount in (-1, 6)
    assert len(rows) == 6

    # Make sure we get the right data back out. The select has no ORDER BY,
    # so sort before comparing with the (already sorted) samples list.
    names = sorted(row[0] for row in rows)
    assert names == samples, "incorrect data retrieved by cursor.fetchmany"

    rows = cursor.fetchmany()  # Should return an empty list
    assert len(rows) == 0, (
        "cursor.fetchmany should return an empty sequence if "
        "called after the whole result set has been fetched"
    )
    assert cursor.rowcount in (-1, 6)

    executeDDL2(cursor)
    cursor.execute("select name from %sbarflys" % table_prefix)
    r = cursor.fetchmany()  # Should get empty sequence
    assert len(r) == 0, (
        "cursor.fetchmany should return an empty sequence if " "query retrieved no rows"
    )
    assert cursor.rowcount in (-1, 0)
492
493
def test_fetchall(cursor):
    """Check fetchall with no result set, a full table and an empty table."""
    expected_count = len(samples)

    # No statement has been executed yet, so there is nothing to fetch.
    with pytest.raises(driver.Error):
        cursor.fetchall()

    executeDDL1(cursor)
    for statement in _populate():
        cursor.execute(statement)

    # The last statement was an insert, which cannot return rows.
    with pytest.raises(driver.Error):
        cursor.fetchall()

    cursor.execute("select name from %sbooze" % table_prefix)
    fetched = cursor.fetchall()
    assert cursor.rowcount in (-1, expected_count)
    assert len(fetched) == expected_count, "cursor.fetchall did not retrieve all rows"
    names = sorted(row[0] for row in fetched)
    for position, expected in enumerate(samples):
        assert names[position] == expected, "cursor.fetchall retrieved incorrect rows"

    # A second fetchall on the exhausted result set returns nothing.
    fetched = cursor.fetchall()
    assert len(fetched) == 0, (
        "cursor.fetchall should return an empty list if called "
        "after the whole result set has been fetched"
    )
    assert cursor.rowcount in (-1, expected_count)

    # A select on a freshly created, empty table also returns nothing.
    executeDDL2(cursor)
    cursor.execute("select name from %sbarflys" % table_prefix)
    fetched = cursor.fetchall()
    assert cursor.rowcount in (-1, 0)
    assert len(fetched) == 0, (
        "cursor.fetchall should return an empty list if "
        "a select query returns no rows"
    )
533
534
def test_mixedfetch(cursor):
    """Mix fetchone, fetchmany and fetchall over a single result set.

    Between them the calls must return every sample row exactly once.
    """
    executeDDL1(cursor)
    for statement in _populate():
        cursor.execute(statement)

    cursor.execute("select name from %sbooze" % table_prefix)
    first = cursor.fetchone()
    second_third = cursor.fetchmany(2)
    fourth = cursor.fetchone()
    remainder = cursor.fetchall()

    assert cursor.rowcount in (-1, 6)
    assert len(second_third) == 2, "fetchmany returned incorrect number of rows"
    assert len(remainder) == 2, "fetchall returned incorrect number of rows"

    names = sorted(
        [
            first[0],
            second_third[0][0],
            second_third[1][0],
            fourth[0],
            remainder[0][0],
            remainder[1][0],
        ]
    )
    for position, expected in enumerate(samples):
        assert names[position] == expected, "incorrect data retrieved or inserted"
556
557
def help_nextset_setUp(cur):
    """Hook for test_nextset: create the ``deleteme`` procedure.

    An implementation should create a procedure called deleteme that
    returns two result sets: first the number of rows in booze, then
    "name from booze". This default always raises NotImplementedError.
    """
    message = "Helper not implemented"
    raise NotImplementedError(message)
564
565
def help_nextset_tearDown(cur):
    """Hook for test_nextset: clean up after the test if that is needed.

    This default always raises NotImplementedError.
    """
    message = "Helper not implemented"
    raise NotImplementedError(message)
569
570
def test_nextset(cursor):
    """Check cursor.nextset() across the two result sets of ``deleteme``.

    The test returns immediately when the cursor has no ``nextset``
    attribute. Otherwise it fills the booze table, has help_nextset_setUp
    create the procedure, and expects a row count first, the names second,
    and None from nextset() once no result sets remain.
    help_nextset_tearDown always runs afterwards.
    """
    if not hasattr(cursor, "nextset"):
        return

    try:
        executeDDL1(cursor)
        for sql in _populate():
            cursor.execute(sql)

        help_nextset_setUp(cursor)

        cursor.callproc("deleteme")
        numberofrows = cursor.fetchone()
        assert numberofrows[0] == len(samples)
        assert cursor.nextset()
        names = cursor.fetchall()
        assert len(names) == len(samples)
        s = cursor.nextset()
        assert s is None, "No more return sets, should return None"
    finally:
        help_nextset_tearDown(cursor)
593
594
def test_arraysize(cursor):
    """The cursor must define ``arraysize``.

    Only existence is checked here; how fetchmany uses the value is
    covered by test_fetchmany.
    """
    defined = hasattr(cursor, "arraysize")
    assert defined, "cursor.arraysize must be defined"
598
599
def test_setinputsizes(cursor):
    """cursor.setinputsizes() must accept a size without raising."""
    size = 25
    cursor.setinputsizes(size)
602
603
def test_setoutputsize_basic(cursor):
    """setoutputsize() must not raise, and the cursor must stay usable."""
    # Called once with a size only, then with a size and a column index.
    for arguments in ((1000,), (2000, 0)):
        cursor.setoutputsize(*arguments)

    # Make sure the cursor still works afterwards.
    _paraminsert(cursor)
609
610
def test_None(cursor):
    """A NULL column value must come back from the driver as None."""
    executeDDL1(cursor)
    cursor.execute("insert into %sbooze values (NULL)" % table_prefix)
    cursor.execute("select name from %sbooze" % table_prefix)

    rows = cursor.fetchall()
    assert len(rows) == 1
    only_row = rows[0]
    assert len(only_row) == 1
    assert only_row[0] is None, "NULL value not returned as None"
619
620
def test_Date():
    """driver.Date and driver.DateFromTicks must accept a valid date."""
    driver.Date(2002, 12, 25)
    ticks = time.mktime((2002, 12, 25, 0, 0, 0, 0, 0, 0))
    driver.DateFromTicks(ticks)
    # The two results are not compared: the API doesn't specify that they
    # must be equal, although it seems implied.
626
627
def test_Time():
    """driver.Time and driver.TimeFromTicks must accept a valid time."""
    driver.Time(13, 45, 30)
    ticks = time.mktime((2001, 1, 1, 13, 45, 30, 0, 0, 0))
    driver.TimeFromTicks(ticks)
    # The two results are not compared: the API doesn't specify that they
    # must be equal, although it seems implied.
633
634
def test_Timestamp():
    """driver.Timestamp and driver.TimestampFromTicks must accept valid input."""
    driver.Timestamp(2002, 12, 25, 13, 45, 30)
    ticks = time.mktime((2002, 12, 25, 13, 45, 30, 0, 0, 0))
    driver.TimestampFromTicks(ticks)
    # The two results are not compared: the API doesn't specify that they
    # must be equal, although it seems implied.
640
641
def test_Binary():
    """driver.Binary must accept both non-empty and empty byte strings."""
    for payload in (b"Something", b""):
        driver.Binary(payload)
645
646
def test_STRING():
    """The driver module must define the STRING type object."""
    defined = hasattr(driver, "STRING")
    assert defined, "module.STRING must be defined"
649
650
def test_BINARY():
    """The driver module must define the BINARY type object."""
    defined = hasattr(driver, "BINARY")
    assert defined, "module.BINARY must be defined."
653
654
def test_NUMBER():
    """The driver module must define the NUMBER type object."""
    defined = hasattr(driver, "NUMBER")
    assert defined, "module.NUMBER must be defined."
657
658
def test_DATETIME():
    """The driver module must define the DATETIME type object."""
    defined = hasattr(driver, "DATETIME")
    assert defined, "module.DATETIME must be defined."
661
662
def test_ROWID():
    """The driver module must define the ROWID type object."""
    defined = hasattr(driver, "ROWID")
    assert defined, "module.ROWID must be defined."
665