1from datetime import time as Time
2
3import pytest
4
5from pg8000.native import Connection, DatabaseError, InterfaceError
6
7
def test_unix_socket_missing():
    """Connecting through a unix socket path that doesn't exist must fail.

    The driver is expected to surface the failure as an InterfaceError.
    """
    with pytest.raises(InterfaceError):
        Connection(unix_sock="/file-does-not-exist", user="doesn't-matter")
13
14
def test_internet_socket_connection_refused():
    """Connecting to port 0 must raise InterfaceError with a full message.

    The message is checked as a regular expression, hence the escaped
    parentheses.
    """
    expected_message = (
        "Can't create a connection to host localhost and port 0 "
        "\\(timeout is None and source_address is None\\)."
    )

    with pytest.raises(InterfaceError, match=expected_message):
        Connection(port=0, user="doesn't-matter")
24
25
def test_database_missing(db_kwargs):
    """Connecting to the database "missing-db" must raise DatabaseError."""
    db_kwargs.update(database="missing-db")

    with pytest.raises(DatabaseError):
        Connection(**db_kwargs)
30
31
def test_notify(con):
    """A NOTIFY without payload shows up as (backend pid, channel, "")."""
    pid_rows = con.run("select pg_backend_pid()")
    backend_pid = pid_rows[0][0]

    # No notification may be queued before we start listening.
    assert list(con.notifications) == []

    for statement in ("LISTEN test", "NOTIFY test"):
        con.run(statement)

    # Run one more, unrelated statement before inspecting the queue
    # (presumably so the driver gets a chance to read the notification).
    con.run("VALUES (1, 2), (3, 4), (5, 6)")

    received = con.notifications
    assert len(received) == 1
    assert received[0] == (backend_pid, "test", "")
41
42
def test_notify_with_payload(con):
    """A NOTIFY with a payload shows up as (backend pid, channel, payload)."""
    pid_rows = con.run("select pg_backend_pid()")
    backend_pid = pid_rows[0][0]

    # No notification may be queued before we start listening.
    assert list(con.notifications) == []

    for statement in ("LISTEN test", "NOTIFY test, 'Parnham'"):
        con.run(statement)

    # Run one more, unrelated statement before inspecting the queue
    # (presumably so the driver gets a chance to read the notification).
    con.run("VALUES (1, 2), (3, 4), (5, 6)")

    received = con.notifications
    assert len(received) == 1
    assert received[0] == (backend_pid, "test", "Parnham")
52
53
# The following test needs an entry in pg_hba.conf that enforces md5
# authentication for the database pg8000_md5
56
57
def test_md5(db_kwargs):
    """Connect to the database pg8000_md5 and expect only error 3D000.

    The sole acceptable failure is the server reporting that the database
    doesn't exist.
    """
    db_kwargs.update(database="pg8000_md5")

    with pytest.raises(DatabaseError, match="3D000"):
        Connection(**db_kwargs)
64
65
# The following test needs an entry in pg_hba.conf that enforces 'password'
# authentication for the database pg8000_password
68
69
def test_password(db_kwargs):
    """Connect to the database pg8000_password and expect only error 3D000.

    The sole acceptable failure is the server reporting that the database
    doesn't exist.
    """
    db_kwargs.update(database="pg8000_password")

    with pytest.raises(DatabaseError, match="3D000"):
        Connection(**db_kwargs)
76
77
def test_unicode_databaseName(db_kwargs):
    """A non-ASCII database name given as str must only yield error 3D000.

    The sole acceptable failure is the server reporting that the database
    doesn't exist.
    """
    db_kwargs.update(database="pg8000_sn\uFF6Fw")

    with pytest.raises(DatabaseError, match="3D000"):
        Connection(**db_kwargs)
84
85
def test_bytes_databaseName(db_kwargs):
    """A database name given as UTF-8 bytes must only yield error 3D000.

    The sole acceptable failure is the server reporting that the database
    doesn't exist.
    """
    encoded_name = "pg8000_sn\uFF6Fw".encode("utf8")
    db_kwargs.update(database=encoded_name)

    with pytest.raises(DatabaseError, match="3D000"):
        Connection(**db_kwargs)
92
93
def test_bytes_password(con, db_kwargs):
    """A password supplied as UTF-8 bytes must be accepted for login.

    A temporary role with a non-ASCII password is created, then a connection
    to the database pg8000_md5 is attempted with the password passed as
    bytes. The only acceptable failure is error 3D000 (database doesn't
    exist).

    The role is dropped in a ``finally`` clause so that a failing assertion
    doesn't leave it behind and break the ``create user`` of the next run.
    """
    # Create user
    username = "boltzmann"
    password = "cha\uFF6Fs"
    con.run("create user " + username + " with password '" + password + "';")

    try:
        db_kwargs["user"] = username
        db_kwargs["password"] = password.encode("utf8")
        db_kwargs["database"] = "pg8000_md5"
        with pytest.raises(DatabaseError, match="3D000"):
            Connection(**db_kwargs)
    finally:
        # Always clean up the role, even when the check above fails.
        con.run("drop role " + username)
107
108
def test_broken_pipe_read(con, db_kwargs):
    """Using a connection whose backend was terminated reports a read error.

    A second connection is opened, its backend is terminated through ``con``,
    and the next query on it must raise InterfaceError matching
    "network error on read".
    """
    victim = Connection(**db_kwargs)
    victim_pid = victim.run("select pg_backend_pid()")[0][0]

    con.run("select pg_terminate_backend(:v)", v=victim_pid)

    with pytest.raises(InterfaceError, match="network error on read"):
        victim.run("select 1")
117
118
def test_broken_pipe_unpack(con):
    """Terminating the connection's own backend raises a network error."""
    own_pid = con.run("select pg_backend_pid()")[0][0]

    # The statement terminates the very backend that is executing it.
    with pytest.raises(InterfaceError, match="network error"):
        con.run("select pg_terminate_backend(:v)", v=own_pid)
125
126
def test_broken_pipe_flush(con, db_kwargs):
    """Closing a connection whose backend was terminated may fail on flush.

    A second connection is opened and its backend is terminated through
    ``con``. A query on it is attempted and any error is deliberately
    ignored. Closing the connection afterwards either succeeds or raises
    InterfaceError with the exact message "network error on flush".
    """
    db1 = Connection(**db_kwargs)
    res = db1.run("select pg_backend_pid()")
    pid1 = res[0][0]

    con.run("select pg_terminate_backend(:v)", v=pid1)
    try:
        db1.run("select 1")
    except Exception:
        # Best effort: the outcome of this query is irrelevant here. Only
        # ordinary exceptions are ignored so that KeyboardInterrupt and
        # SystemExit still abort the test run.
        pass

    # Sometimes raises and sometimes doesn't
    try:
        db1.close()
    except InterfaceError as e:
        assert str(e) == "network error on flush"
143
144
def test_application_name(db_kwargs):
    """The application_name connection parameter is visible to the server.

    The value is read back from pg_stat_activity for the current backend.
    """
    expected_name = "my test application name"
    db_kwargs.update(application_name=expected_name)

    query = (
        "select application_name from pg_stat_activity "
        " where pid = pg_backend_pid()"
    )

    with Connection(**db_kwargs) as db:
        rows = db.run(query)
        assert rows[0][0] == expected_name
156
157
def test_application_name_integer(db_kwargs):
    """An integer application_name is rejected with InterfaceError."""
    db_kwargs.update(application_name=1)
    message = "The parameter application_name can't be of type <class 'int'>."

    with pytest.raises(InterfaceError, match=message):
        Connection(**db_kwargs)
165
166
def test_application_name_bytearray(db_kwargs):
    """A bytearray application_name is accepted when connecting.

    The test passes if establishing the connection doesn't raise. The
    connection is opened as a context manager so that it is closed again
    instead of being left open for the rest of the test session.
    """
    db_kwargs["application_name"] = bytearray(b"Philby")
    with Connection(**db_kwargs):
        pass
170
171
class PG8000TestException(Exception):
    """Exception type raised on purpose by the ``raise_exception`` helper."""
174
175
def raise_exception(val):
    """Raise PG8000TestException unconditionally; ``val`` is ignored."""
    failure = PG8000TestException("oh noes!")
    raise failure
178
179
def test_py_value_fail(con, mocker):
    """An exception from a py_types converter propagates unchanged.

    Ensure that if a converter registered in ``con.py_types`` throws an
    exception, the original exception is raised (PG8000TestException), and
    the connection is still usable after the error.
    """
    mocker.patch.object(con, "py_types")
    con.py_types = {Time: raise_exception}

    with pytest.raises(PG8000TestException):
        con.run("SELECT CAST(:v AS TIME)", v=Time(10, 30))

    # Ensure that the connection is still usable for a new query. This check
    # must live outside the ``pytest.raises`` block: statements placed after
    # the raising call inside that block are never executed.
    res = con.run("VALUES ('hw3'::text)")
    assert res[0][0] == "hw3"
193
194
def test_no_data_error_recovery(con):
    """Dropping a missing table fails with 42P01 three times in a row.

    After each failure a ROLLBACK is issued before the next attempt.
    """
    for _ in range(3):
        with pytest.raises(DatabaseError) as excinfo:
            con.run("DROP TABLE t1")

        error_fields = excinfo.value.args[0]
        assert error_fields["C"] == "42P01"

        con.run("ROLLBACK")
201
202
def test_closed_connection(con):
    """Running a query on a closed connection raises InterfaceError."""
    con.close()

    query = "VALUES ('hw1'::text)"
    with pytest.raises(InterfaceError, match="connection is closed"):
        con.run(query)
207