import pytest

from pg8000.native import DatabaseError, to_statement


# Tests relating to the basic operation of the database driver, driven by the
# pg8000 custom interface.


@pytest.fixture
def db_table(request, con):
    """Create temporary table ``t1`` on ``con`` and return the connection.

    A finalizer drops ``t1`` again once the test has finished.
    """
    con.run(
        "CREATE TEMPORARY TABLE t1 (f1 int primary key, "
        "f2 bigint not null, f3 varchar(50) null) "
    )

    def fin():
        # Best-effort cleanup: some tests in this module drop t1 themselves,
        # in which case this statement fails and the error is ignored on
        # purpose.
        try:
            con.run("drop table t1")
        except DatabaseError:
            pass

    request.addfinalizer(fin)
    return con


def test_database_error(con):
    """A failing statement (insert into table t99) raises DatabaseError."""
    with pytest.raises(DatabaseError):
        con.run("INSERT INTO t99 VALUES (1, 2, 3)")


# Run a query on a table, alter the structure of the table, then run the
# original query again.


def test_alter(db_table):
    """The same SELECT still runs after a column of the table is dropped."""
    db_table.run("select * from t1")
    db_table.run("alter table t1 drop column f3")
    db_table.run("select * from t1")


# Run a query on a table, drop then re-create the table, then run the
# original query again.


def test_create(db_table):
    """The same SELECT still runs after the table is dropped and re-created."""
    db_table.run("select * from t1")
    db_table.run("drop table t1")
    db_table.run("create temporary table t1 (f1 int primary key)")
    db_table.run("select * from t1")


def test_parametrized(db_table):
    """A SELECT with a named ``:f1`` parameter executes without error."""
    res = db_table.run("SELECT f1, f2, f3 FROM t1 WHERE f1 > :f1", f1=3)
    for row in res:
        # The unpacking doubles as a shape check: it raises if a returned
        # row does not have exactly three columns.
        f1, f2, f3 = row


def test_insert_returning(db_table):
    """INSERT ... RETURNING hands back the generated ids and sets row_count."""
    db_table.run("CREATE TEMPORARY TABLE t2 (id serial, data text)")

    # Test INSERT ... RETURNING with one row...
    res = db_table.run("INSERT INTO t2 (data) VALUES (:v) RETURNING id", v="test1")
    row_id = res[0][0]
    res = db_table.run("SELECT data FROM t2 WHERE id = :v", v=row_id)
    assert "test1" == res[0][0]

    assert db_table.row_count == 1

    # Test with multiple rows...
    res = db_table.run(
        "INSERT INTO t2 (data) VALUES (:v1), (:v2), (:v3) RETURNING id",
        v1="test2",
        v2="test3",
        v3="test4",
    )
    assert db_table.row_count == 3
    ids = [x[0] for x in res]
    assert len(ids) == 3


def test_row_count_select(db_table):
    """row_count reflects the number of rows a SELECT returned."""
    expected_count = 57
    for i in range(expected_count):
        db_table.run(
            "INSERT INTO t1 (f1, f2, f3) VALUES (:v1, :v2, :v3)", v1=i, v2=i, v3=None
        )

    db_table.run("SELECT * FROM t1")

    # Check row_count
    assert expected_count == db_table.row_count

    # Should be -1 for a command with no results
    db_table.run("DROP TABLE t1")
    assert -1 == db_table.row_count


def test_row_count_delete(db_table):
    """row_count reflects the number of rows removed by a DELETE."""
    db_table.run(
        "INSERT INTO t1 (f1, f2, f3) VALUES (:v1, :v2, :v3)", v1=1, v2=1, v3=None
    )
    db_table.run("DELETE FROM t1")
    assert db_table.row_count == 1


def test_row_count_update(db_table):
    """row_count reflects the number of rows changed by an UPDATE."""
    # (f1, f2) pairs; only the last two have f2 > 101 and so match the UPDATE.
    rows = ((1, 1), (2, 10), (3, 100), (4, 1000), (5, 10000))
    for f1, f2 in rows:
        db_table.run(
            "INSERT INTO t1 (f1, f2, f3) VALUES (:v1, :v2, :v3)",
            v1=f1,
            v2=f2,
            v3=None,
        )
    db_table.run("UPDATE t1 SET f3 = :v1 WHERE f2 > 101", v1="Hello!")
    assert db_table.row_count == 2


def test_int_oid(con):
    """An int parameter can be compared against an oid column."""
    # https://bugs.launchpad.net/pg8000/+bug/230796
    con.run("SELECT typname FROM pg_type WHERE oid = :v", v=100)


def test_unicode_query(con):
    """A statement whose identifiers are non-ASCII (Cyrillic) text executes."""
    con.run(
        "CREATE TEMPORARY TABLE \u043c\u0435\u0441\u0442\u043e "
        "(\u0438\u043c\u044f VARCHAR(50), "
        "\u0430\u0434\u0440\u0435\u0441 VARCHAR(250))"
    )


def test_transactions(db_table):
    """A row inserted inside a transaction is gone after a rollback."""
    db_table.run("start transaction")
    db_table.run(
        "INSERT INTO t1 (f1, f2, f3) VALUES (:v1, :v2, :v3)", v1=1, v2=1, v3="Zombie"
    )
    db_table.run("rollback")
    db_table.run("select * from t1")

    assert db_table.row_count == 0


def test_in(con):
    """A Python list can be passed as the parameter of ``= any(:v)``."""
    ret = con.run("SELECT typname FROM pg_type WHERE oid = any(:v)", v=[16, 23])
    assert ret[0][0] == "bool"


# An empty query should raise a DatabaseError
def test_empty_query(con):
    """Running an empty string raises DatabaseError."""
    with pytest.raises(DatabaseError):
        con.run("")


def test_rollback_no_transaction(con):
    """A rollback outside a transaction leaves exactly one 25P01 notice."""
    # Remove any existing notices
    con.notices.clear()

    # First, verify that a raw rollback does produce a notice
    con.run("rollback")

    assert 1 == len(con.notices)

    # 25P01 is the code for no_active_sql_transaction. It has
    # a message and severity name, but those might be
    # localized/depend on the server version.
    assert con.notices.pop().get(b"C") == b"25P01"


def test_close_prepared_statement(con):
    """Closing a prepared statement removes it from pg_prepared_statements."""
    ps = con.prepare("select 1")
    ps.run()
    res = con.run("select count(*) from pg_prepared_statements")
    assert res[0][0] == 1  # Should have one prepared statement

    ps.close()

    res = con.run("select count(*) from pg_prepared_statements")
    assert res[0][0] == 0  # Should have no prepared statements


def test_no_data(con):
    """run() returns None for START TRANSACTION."""
    assert con.run("START TRANSACTION") is None


def test_multiple_statements(con):
    """Several statements in one run() call return one row per statement here."""
    statements = "SELECT 5; SELECT 'Erich Fromm';"
    assert con.run(statements) == [[5], ["Erich Fromm"]]


def test_unexecuted_connection_row_count(con):
    """row_count is None before anything has been run on the connection."""
    assert con.row_count is None


def test_unexecuted_connection_columns(con):
    """columns is None before anything has been run on the connection."""
    assert con.columns is None


def test_sql_prepared_statement(con):
    """SQL-level PREPARE and EXECUTE statements can be run."""
    con.run("PREPARE gen_series AS SELECT generate_series(1, 10);")
    con.run("EXECUTE gen_series")


def test_to_statement():
    """Named placeholders become ``$n``; a repeated name reuses its number.

    The ``::`` cast in the query must be left untouched.
    """
    new_query, _ = to_statement(
        "SELECT sum(x)::decimal(5, 2) :f_2, :f1 FROM t WHERE a=:f_2"
    )
    expected = "SELECT sum(x)::decimal(5, 2) $1, $2 FROM t WHERE a=$1"
    assert new_query == expected


def test_not_parsed_if_no_params(mocker, con):
    """to_statement is not called when run() is given no parameters."""
    mock_to_statement = mocker.patch("pg8000.native.to_statement")
    con.run("ROLLBACK")
    mock_to_statement.assert_not_called()