1import pytest
2
3from pg8000.native import DatabaseError, to_statement
4
5
6# Tests relating to the basic operation of the database driver, driven by the
7# pg8000 custom interface.
8
9
@pytest.fixture
def db_table(request, con):
    """Create temporary table ``t1`` on ``con`` and return the connection.

    A finalizer is registered that drops ``t1`` again. A DatabaseError raised
    by that drop is ignored, so a test that has already dropped the table
    itself does not fail during cleanup.
    """
    create_sql = (
        "CREATE TEMPORARY TABLE t1 (f1 int primary key, "
        "f2 bigint not null, f3 varchar(50) null) "
    )
    con.run(create_sql)

    def _drop_t1():
        # Best-effort cleanup: the table may no longer exist.
        try:
            con.run("drop table t1")
        except DatabaseError:
            pass

    request.addfinalizer(_drop_t1)
    return con
25
26
def test_database_error(con):
    """Inserting into ``t99``, which this module never creates, raises DatabaseError."""
    bad_insert = "INSERT INTO t99 VALUES (1, 2, 3)"
    with pytest.raises(DatabaseError):
        con.run(bad_insert)
30
31
32# Run a query on a table, alter the structure of the table, then run the
33# original query again.
34
35
def test_alter(db_table):
    """Re-run the same SELECT on ``t1`` after one of its columns is dropped."""
    query = "select * from t1"
    db_table.run(query)
    db_table.run("alter table t1 drop column f3")
    db_table.run(query)
40
41
42# Run a query on a table, drop then re-create the table, then run the
43# original query again.
44
45
def test_create(db_table):
    """Re-run the same SELECT on ``t1`` after it is dropped and re-created."""
    query = "select * from t1"
    db_table.run(query)

    # Replace the table with one of a different shape under the same name.
    db_table.run("drop table t1")
    db_table.run("create temporary table t1 (f1 int primary key)")

    db_table.run(query)
51
52
def test_parametrized(db_table):
    """Bind a named parameter in a WHERE clause and check the rows returned.

    The table is seeded with f1 = 1..5 so that the ``f1 > :f1`` filter has
    rows to keep and rows to reject; run against the empty table the query
    would return nothing and the test would verify nothing.
    """
    for key in range(1, 6):
        db_table.run(
            "INSERT INTO t1 (f1, f2, f3) VALUES (:v1, :v2, :v3)",
            v1=key,
            v2=key * 10,
            v3=None,
        )

    res = db_table.run("SELECT f1, f2, f3 FROM t1 WHERE f1 > :f1", f1=3)

    # The query has no ORDER BY, so sort before comparing. f1 is unique, so
    # the comparison never reaches the None stored in f3.
    assert sorted(list(row) for row in res) == [[4, 40, None], [5, 50, None]]
57
58
def test_insert_returning(db_table):
    """Exercise INSERT ... RETURNING with a single row and with several rows."""
    con = db_table
    con.run("CREATE TEMPORARY TABLE t2 (id serial, data text)")

    # Single row: the returned id must locate the row that was just inserted.
    inserted = con.run("INSERT INTO t2 (data) VALUES (:v) RETURNING id", v="test1")
    new_id = inserted[0][0]
    fetched = con.run("SELECT data FROM t2 WHERE id = :v", v=new_id)
    assert fetched[0][0] == "test1"

    assert con.row_count == 1

    # Several rows in one statement: one id comes back per inserted row.
    values = {"v1": "test2", "v2": "test3", "v3": "test4"}
    inserted = con.run(
        "INSERT INTO t2 (data) VALUES (:v1), (:v2), (:v3) RETURNING id", **values
    )
    assert con.row_count == 3
    returned_ids = [row[0] for row in inserted]
    assert len(returned_ids) == 3
80
81
def test_row_count_select(db_table):
    """row_count equals the number of rows selected, and is -1 after DROP TABLE."""
    expected_count = 57
    insert_sql = "INSERT INTO t1 (f1, f2, f3) VALUES (:v1, :v2, :v3)"
    for n in range(expected_count):
        db_table.run(insert_sql, v1=n, v2=n, v3=None)

    db_table.run("SELECT * FROM t1")
    assert db_table.row_count == expected_count

    # A command that produces no results reports -1.
    db_table.run("DROP TABLE t1")
    assert db_table.row_count == -1
97
98
def test_row_count_delete(db_table):
    """row_count after a DELETE is the number of rows removed (one here)."""
    only_row = {"v1": 1, "v2": 1, "v3": None}
    db_table.run("INSERT INTO t1 (f1, f2, f3) VALUES (:v1, :v2, :v3)", **only_row)

    db_table.run("DELETE FROM t1")
    assert db_table.row_count == 1
105
106
def test_row_count_update(db_table):
    """row_count after an UPDATE is the number of rows that matched.

    Five rows are inserted with f2 = 1, 10, 100, 1000 and 10000; only the last
    two satisfy ``f2 > 101``, so the UPDATE must report 2.
    """
    insert_sql = "INSERT INTO t1 (f1, f2, f3) VALUES (:v1, :v2, :v3)"
    seed_rows = ((1, 1), (2, 10), (3, 100), (4, 1000), (5, 10000))
    for key, amount in seed_rows:
        db_table.run(insert_sql, v1=key, v2=amount, v3=None)

    db_table.run("UPDATE t1 SET f3 = :v1 WHERE f2 > 101", v1="Hello!")
    assert db_table.row_count == 2
125
126
def test_int_oid(con):
    """Compare the ``oid`` column of pg_type against a plain int parameter.

    Regression test for https://bugs.launchpad.net/pg8000/+bug/230796
    """
    oid_value = 100
    con.run("SELECT typname FROM pg_type WHERE oid = :v", v=oid_value)
130
131
def test_unicode_query(con):
    """Run a CREATE TABLE whose table and column names are Cyrillic text."""
    ddl = (
        "CREATE TEMPORARY TABLE \u043c\u0435\u0441\u0442\u043e "
        "(\u0438\u043c\u044f VARCHAR(50), "
        "\u0430\u0434\u0440\u0435\u0441 VARCHAR(250))"
    )
    con.run(ddl)
138
139
def test_transactions(db_table):
    """A row inserted inside a rolled-back transaction is gone afterwards."""
    doomed_row = {"v1": 1, "v2": 1, "v3": "Zombie"}

    db_table.run("start transaction")
    db_table.run("INSERT INTO t1 (f1, f2, f3) VALUES (:v1, :v2, :v3)", **doomed_row)
    db_table.run("rollback")

    # After the rollback the SELECT must report no rows.
    db_table.run("select * from t1")
    assert db_table.row_count == 0
149
150
def test_in(con):
    """Pass a Python list as the array parameter of ``= any(:v)``.

    OIDs 16 and 23 are the built-in ``bool`` and ``int4`` types. The query has
    no ORDER BY, so both names are compared order-independently instead of
    relying on ``bool`` happening to come back first.
    """
    ret = con.run("SELECT typname FROM pg_type WHERE oid = any(:v)", v=[16, 23])
    assert sorted(row[0] for row in ret) == ["bool", "int4"]
154
155
def test_empty_query(con):
    """An empty query string should raise a DatabaseError."""
    empty_sql = ""
    with pytest.raises(DatabaseError):
        con.run(empty_sql)
160
161
def test_rollback_no_transaction(con):
    """A ROLLBACK with no open transaction leaves exactly one notice, code 25P01."""
    # Start from an empty notice queue so only this test's notice is counted.
    con.notices.clear()

    # A raw rollback outside a transaction must produce a notice.
    con.run("rollback")
    assert len(con.notices) == 1

    # 25P01 is the code for no_active_sql_transaction. The notice also has a
    # message and severity name, but those might be localized or depend on the
    # server version, so only the code is checked.
    notice = con.notices.pop()
    assert notice.get(b"C") == b"25P01"
175
176
def test_close_prepared_statement(con):
    """Closing a prepared statement removes it from pg_prepared_statements."""
    count_sql = "select count(*) from pg_prepared_statements"

    ps = con.prepare("select 1")
    ps.run()
    open_count = con.run(count_sql)[0][0]
    assert open_count == 1  # Should have one prepared statement

    ps.close()

    open_count = con.run(count_sql)[0][0]
    assert open_count == 0  # Should have no prepared statements
187
188
def test_no_data(con):
    """run() returns None for START TRANSACTION."""
    result = con.run("START TRANSACTION")
    assert result is None
191
192
def test_multiple_statements(con):
    """Two ';'-separated SELECTs in one run() call return one row each, in order."""
    rows = con.run("SELECT 5; SELECT 'Erich Fromm';")
    assert rows == [[5], ["Erich Fromm"]]
196
197
def test_unexecuted_connection_row_count(con):
    """row_count is None when this test has not run anything on ``con``.

    NOTE(review): presumably the ``con`` fixture supplies a connection that has
    not executed a statement yet -- confirm against the fixture definition.
    """
    row_count = con.row_count
    assert row_count is None
200
201
def test_unexecuted_connection_columns(con):
    """columns is None when this test has not run anything on ``con``.

    NOTE(review): presumably the ``con`` fixture supplies a connection that has
    not executed a statement yet -- confirm against the fixture definition.
    """
    columns = con.columns
    assert columns is None
204
205
def test_sql_prepared_statement(con):
    """Prepare a statement with SQL PREPARE, then run it with SQL EXECUTE."""
    statements = (
        "PREPARE gen_series AS SELECT generate_series(1, 10);",
        "EXECUTE gen_series",
    )
    for sql in statements:
        con.run(sql)
209
210
def test_to_statement():
    """to_statement rewrites ``:name`` placeholders as numbered ``$n`` ones.

    The ``::decimal`` cast must be left untouched, and the repeated ``:f_2``
    must map to ``$1`` both times while ``:f1`` becomes ``$2``.
    """
    query = "SELECT sum(x)::decimal(5, 2) :f_2, :f1 FROM t WHERE a=:f_2"
    converted, _ = to_statement(query)
    assert converted == "SELECT sum(x)::decimal(5, 2) $1, $2 FROM t WHERE a=$1"
217
218
def test_not_parsed_if_no_params(mocker, con):
    """Running a statement without parameters must not call to_statement."""
    patched = mocker.patch("pg8000.native.to_statement")
    con.run("ROLLBACK")
    patched.assert_not_called()
223