1from datetime import time as Time 2 3import pytest 4 5from pg8000.native import Connection, DatabaseError, InterfaceError 6 7 8def test_unix_socket_missing(): 9 conn_params = {"unix_sock": "/file-does-not-exist", "user": "doesn't-matter"} 10 11 with pytest.raises(InterfaceError): 12 Connection(**conn_params) 13 14 15def test_internet_socket_connection_refused(): 16 conn_params = {"port": 0, "user": "doesn't-matter"} 17 18 with pytest.raises( 19 InterfaceError, 20 match="Can't create a connection to host localhost and port 0 " 21 "\\(timeout is None and source_address is None\\).", 22 ): 23 Connection(**conn_params) 24 25 26def test_database_missing(db_kwargs): 27 db_kwargs["database"] = "missing-db" 28 with pytest.raises(DatabaseError): 29 Connection(**db_kwargs) 30 31 32def test_notify(con): 33 backend_pid = con.run("select pg_backend_pid()")[0][0] 34 assert list(con.notifications) == [] 35 con.run("LISTEN test") 36 con.run("NOTIFY test") 37 38 con.run("VALUES (1, 2), (3, 4), (5, 6)") 39 assert len(con.notifications) == 1 40 assert con.notifications[0] == (backend_pid, "test", "") 41 42 43def test_notify_with_payload(con): 44 backend_pid = con.run("select pg_backend_pid()")[0][0] 45 assert list(con.notifications) == [] 46 con.run("LISTEN test") 47 con.run("NOTIFY test, 'Parnham'") 48 49 con.run("VALUES (1, 2), (3, 4), (5, 6)") 50 assert len(con.notifications) == 1 51 assert con.notifications[0] == (backend_pid, "test", "Parnham") 52 53 54# This requires a line in pg_hba.conf that requires md5 for the database 55# pg8000_md5 56 57 58def test_md5(db_kwargs): 59 db_kwargs["database"] = "pg8000_md5" 60 61 # Should only raise an exception saying db doesn't exist 62 with pytest.raises(DatabaseError, match="3D000"): 63 Connection(**db_kwargs) 64 65 66# This requires a line in pg_hba.conf that requires 'password' for the 67# database pg8000_password 68 69 70def test_password(db_kwargs): 71 db_kwargs["database"] = "pg8000_password" 72 73 # Should only raise an exception saying db doesn't exist 74 with pytest.raises(DatabaseError, match="3D000"): 75 Connection(**db_kwargs) 76 77 78def test_unicode_databaseName(db_kwargs): 79 db_kwargs["database"] = "pg8000_sn\uFF6Fw" 80 81 # Should only raise an exception saying db doesn't exist 82 with pytest.raises(DatabaseError, match="3D000"): 83 Connection(**db_kwargs) 84 85 86def test_bytes_databaseName(db_kwargs): 87 """Should only raise an exception saying db doesn't exist""" 88 89 db_kwargs["database"] = bytes("pg8000_sn\uFF6Fw", "utf8") 90 with pytest.raises(DatabaseError, match="3D000"): 91 Connection(**db_kwargs) 92 93 94def test_bytes_password(con, db_kwargs): 95 # Create user 96 username = "boltzmann" 97 password = "cha\uFF6Fs" 98 con.run("create user " + username + " with password '" + password + "';") 99 100 db_kwargs["user"] = username 101 db_kwargs["password"] = password.encode("utf8") 102 db_kwargs["database"] = "pg8000_md5" 103 with pytest.raises(DatabaseError, match="3D000"): 104 Connection(**db_kwargs) 105 106 con.run("drop role " + username) 107 108 109def test_broken_pipe_read(con, db_kwargs): 110 db1 = Connection(**db_kwargs) 111 res = db1.run("select pg_backend_pid()") 112 pid1 = res[0][0] 113 114 con.run("select pg_terminate_backend(:v)", v=pid1) 115 with pytest.raises(InterfaceError, match="network error on read"): 116 db1.run("select 1") 117 118 119def test_broken_pipe_unpack(con): 120 res = con.run("select pg_backend_pid()") 121 pid1 = res[0][0] 122 123 with pytest.raises(InterfaceError, match="network error"): 124 con.run("select pg_terminate_backend(:v)", v=pid1) 125 126 127def test_broken_pipe_flush(con, db_kwargs): 128 db1 = Connection(**db_kwargs) 129 res = db1.run("select pg_backend_pid()") 130 pid1 = res[0][0] 131 132 con.run("select pg_terminate_backend(:v)", v=pid1) 133 try: 134 db1.run("select 1") 135 except BaseException: 136 pass 137 138 # Sometimes raises and sometime doesn't 139 try: 140 db1.close() 141 except InterfaceError as e: 142 assert str(e) == "network error on flush" 143 144 145def test_application_name(db_kwargs): 146 app_name = "my test application name" 147 db_kwargs["application_name"] = app_name 148 with Connection(**db_kwargs) as db: 149 res = db.run( 150 "select application_name from pg_stat_activity " 151 " where pid = pg_backend_pid()" 152 ) 153 154 application_name = res[0][0] 155 assert application_name == app_name 156 157 158def test_application_name_integer(db_kwargs): 159 db_kwargs["application_name"] = 1 160 with pytest.raises( 161 InterfaceError, 162 match="The parameter application_name can't be of type <class 'int'>.", 163 ): 164 Connection(**db_kwargs) 165 166 167def test_application_name_bytearray(db_kwargs): 168 db_kwargs["application_name"] = bytearray(b"Philby") 169 Connection(**db_kwargs) 170 171 172class PG8000TestException(Exception): 173 pass 174 175 176def raise_exception(val): 177 raise PG8000TestException("oh noes!") 178 179 180def test_py_value_fail(con, mocker): 181 # Ensure that if types.py_value throws an exception, the original 182 # exception is raised (PG8000TestException), and the connection is 183 # still usable after the error. 184 mocker.patch.object(con, "py_types") 185 con.py_types = {Time: raise_exception} 186 187 with pytest.raises(PG8000TestException): 188 con.run("SELECT CAST(:v AS TIME)", v=Time(10, 30)) 189 190 # ensure that the connection is still usable for a new query 191 res = con.run("VALUES ('hw3'::text)") 192 assert res[0][0] == "hw3" 193 194 195def test_no_data_error_recovery(con): 196 for i in range(1, 4): 197 with pytest.raises(DatabaseError) as e: 198 con.run("DROP TABLE t1") 199 assert e.value.args[0]["C"] == "42P01" 200 con.run("ROLLBACK") 201 202 203def test_closed_connection(con): 204 con.close() 205 with pytest.raises(InterfaceError, match="connection is closed"): 206 con.run("VALUES ('hw1'::text)") 207