1# postgresql/pg8000.py 2# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors <see AUTHORS 3# file> 4# 5# This module is part of SQLAlchemy and is released under 6# the MIT License: https://www.opensource.org/licenses/mit-license.php 7r""" 8.. dialect:: postgresql+pg8000 9 :name: pg8000 10 :dbapi: pg8000 11 :connectstring: postgresql+pg8000://user:password@host:port/dbname[?key=value&key=value...] 12 :url: https://pypi.org/project/pg8000/ 13 14.. versionchanged:: 1.4 The pg8000 dialect has been updated for version 15 1.16.6 and higher, and is again part of SQLAlchemy's continuous integration 16 with full feature support. 17 18.. _pg8000_unicode: 19 20Unicode 21------- 22 23pg8000 will encode / decode string values between it and the server using the 24PostgreSQL ``client_encoding`` parameter; by default this is the value in 25the ``postgresql.conf`` file, which often defaults to ``SQL_ASCII``. 26Typically, this can be changed to ``utf-8``, as a more useful default:: 27 28 #client_encoding = sql_ascii # actually, defaults to database 29 # encoding 30 client_encoding = utf8 31 32The ``client_encoding`` can be overridden for a session by executing the SQL: 33 34SET CLIENT_ENCODING TO 'utf8'; 35 36SQLAlchemy will execute this SQL on all new connections based on the value 37passed to :func:`_sa.create_engine` using the ``client_encoding`` parameter:: 38 39 engine = create_engine( 40 "postgresql+pg8000://user:pass@host/dbname", client_encoding='utf8') 41 42.. _pg8000_ssl: 43 44SSL Connections 45--------------- 46 47pg8000 accepts a Python ``SSLContext`` object which may be specified using the 48:paramref:`_sa.create_engine.connect_args` dictionary:: 49 50 import ssl 51 ssl_context = ssl.create_default_context() 52 engine = sa.create_engine( 53 "postgresql+pg8000://scott:tiger@192.168.0.199/test", 54 connect_args={"ssl_context": ssl_context}, 55 ) 56 57If the server uses an automatically-generated certificate that is self-signed 58or does not match the host name (as seen from the client), it may also be 59necessary to disable hostname checking:: 60 61 import ssl 62 ssl_context = ssl.create_default_context() 63 ssl_context.check_hostname = False 64 ssl_context.verify_mode = ssl.CERT_NONE 65 engine = sa.create_engine( 66 "postgresql+pg8000://scott:tiger@192.168.0.199/test", 67 connect_args={"ssl_context": ssl_context}, 68 ) 69 70.. _pg8000_isolation_level: 71 72pg8000 Transaction Isolation Level 73------------------------------------- 74 75The pg8000 dialect offers the same isolation level settings as that 76of the :ref:`psycopg2 <psycopg2_isolation_level>` dialect: 77 78* ``READ COMMITTED`` 79* ``READ UNCOMMITTED`` 80* ``REPEATABLE READ`` 81* ``SERIALIZABLE`` 82* ``AUTOCOMMIT`` 83 84.. seealso:: 85 86 :ref:`postgresql_isolation_level` 87 88 :ref:`psycopg2_isolation_level` 89 90 91""" # noqa 92import decimal 93import re 94from uuid import UUID as _python_UUID 95 96from .array import ARRAY as PGARRAY 97from .base import _ColonCast 98from .base import _DECIMAL_TYPES 99from .base import _FLOAT_TYPES 100from .base import _INT_TYPES 101from .base import ENUM 102from .base import INTERVAL 103from .base import PGCompiler 104from .base import PGDialect 105from .base import PGExecutionContext 106from .base import PGIdentifierPreparer 107from .base import UUID 108from .json import JSON 109from .json import JSONB 110from .json import JSONPathType 111from ... import exc 112from ... import processors 113from ... import types as sqltypes 114from ... import util 115from ...sql.elements import quoted_name 116 117 118class _PGNumeric(sqltypes.Numeric): 119 def result_processor(self, dialect, coltype): 120 if self.asdecimal: 121 if coltype in _FLOAT_TYPES: 122 return processors.to_decimal_processor_factory( 123 decimal.Decimal, self._effective_decimal_return_scale 124 ) 125 elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES: 126 # pg8000 returns Decimal natively for 1700 127 return None 128 else: 129 raise exc.InvalidRequestError( 130 "Unknown PG numeric type: %d" % coltype 131 ) 132 else: 133 if coltype in _FLOAT_TYPES: 134 # pg8000 returns float natively for 701 135 return None 136 elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES: 137 return processors.to_float 138 else: 139 raise exc.InvalidRequestError( 140 "Unknown PG numeric type: %d" % coltype 141 ) 142 143 144class _PGNumericNoBind(_PGNumeric): 145 def bind_processor(self, dialect): 146 return None 147 148 149class _PGJSON(JSON): 150 def result_processor(self, dialect, coltype): 151 return None 152 153 def get_dbapi_type(self, dbapi): 154 return dbapi.JSON 155 156 157class _PGJSONB(JSONB): 158 def result_processor(self, dialect, coltype): 159 return None 160 161 def get_dbapi_type(self, dbapi): 162 return dbapi.JSONB 163 164 165class _PGJSONIndexType(sqltypes.JSON.JSONIndexType): 166 def get_dbapi_type(self, dbapi): 167 raise NotImplementedError("should not be here") 168 169 170class _PGJSONIntIndexType(sqltypes.JSON.JSONIntIndexType): 171 def get_dbapi_type(self, dbapi): 172 return dbapi.INTEGER 173 174 175class _PGJSONStrIndexType(sqltypes.JSON.JSONStrIndexType): 176 def get_dbapi_type(self, dbapi): 177 return dbapi.STRING 178 179 180class _PGJSONPathType(JSONPathType): 181 def get_dbapi_type(self, dbapi): 182 return 1009 183 184 185class _PGUUID(UUID): 186 def bind_processor(self, dialect): 187 if not self.as_uuid: 188 189 def process(value): 190 if value is not None: 191 value = _python_UUID(value) 192 return value 193 194 return process 195 196 def result_processor(self, dialect, coltype): 197 if not self.as_uuid: 198 199 def process(value): 200 if value is not None: 201 value = str(value) 202 return value 203 204 return process 205 206 207class _PGEnum(ENUM): 208 def get_dbapi_type(self, dbapi): 209 return dbapi.UNKNOWN 210 211 212class _PGInterval(INTERVAL): 213 def get_dbapi_type(self, dbapi): 214 return dbapi.INTERVAL 215 216 @classmethod 217 def adapt_emulated_to_native(cls, interval, **kw): 218 return _PGInterval(precision=interval.second_precision) 219 220 221class _PGTimeStamp(sqltypes.DateTime): 222 def get_dbapi_type(self, dbapi): 223 if self.timezone: 224 # TIMESTAMPTZOID 225 return 1184 226 else: 227 # TIMESTAMPOID 228 return 1114 229 230 231class _PGTime(sqltypes.Time): 232 def get_dbapi_type(self, dbapi): 233 return dbapi.TIME 234 235 236class _PGInteger(sqltypes.Integer): 237 def get_dbapi_type(self, dbapi): 238 return dbapi.INTEGER 239 240 241class _PGSmallInteger(sqltypes.SmallInteger): 242 def get_dbapi_type(self, dbapi): 243 return dbapi.INTEGER 244 245 246class _PGNullType(sqltypes.NullType): 247 def get_dbapi_type(self, dbapi): 248 return dbapi.NULLTYPE 249 250 251class _PGBigInteger(sqltypes.BigInteger): 252 def get_dbapi_type(self, dbapi): 253 return dbapi.BIGINTEGER 254 255 256class _PGBoolean(sqltypes.Boolean): 257 def get_dbapi_type(self, dbapi): 258 return dbapi.BOOLEAN 259 260 261class _PGARRAY(PGARRAY): 262 def bind_expression(self, bindvalue): 263 return _ColonCast(bindvalue, self) 264 265 266_server_side_id = util.counter() 267 268 269class PGExecutionContext_pg8000(PGExecutionContext): 270 def create_server_side_cursor(self): 271 ident = "c_%s_%s" % (hex(id(self))[2:], hex(_server_side_id())[2:]) 272 return ServerSideCursor(self._dbapi_connection.cursor(), ident) 273 274 def pre_exec(self): 275 if not self.compiled: 276 return 277 278 279class ServerSideCursor: 280 server_side = True 281 282 def __init__(self, cursor, ident): 283 self.ident = ident 284 self.cursor = cursor 285 286 @property 287 def connection(self): 288 return self.cursor.connection 289 290 @property 291 def rowcount(self): 292 return self.cursor.rowcount 293 294 @property 295 def description(self): 296 return self.cursor.description 297 298 def execute(self, operation, args=(), stream=None): 299 op = "DECLARE " + self.ident + " NO SCROLL CURSOR FOR " + operation 300 self.cursor.execute(op, args, stream=stream) 301 return self 302 303 def executemany(self, operation, param_sets): 304 self.cursor.executemany(operation, param_sets) 305 return self 306 307 def fetchone(self): 308 self.cursor.execute("FETCH FORWARD 1 FROM " + self.ident) 309 return self.cursor.fetchone() 310 311 def fetchmany(self, num=None): 312 if num is None: 313 return self.fetchall() 314 else: 315 self.cursor.execute( 316 "FETCH FORWARD " + str(int(num)) + " FROM " + self.ident 317 ) 318 return self.cursor.fetchall() 319 320 def fetchall(self): 321 self.cursor.execute("FETCH FORWARD ALL FROM " + self.ident) 322 return self.cursor.fetchall() 323 324 def close(self): 325 self.cursor.execute("CLOSE " + self.ident) 326 self.cursor.close() 327 328 def setinputsizes(self, *sizes): 329 self.cursor.setinputsizes(*sizes) 330 331 def setoutputsize(self, size, column=None): 332 pass 333 334 335class PGCompiler_pg8000(PGCompiler): 336 def visit_mod_binary(self, binary, operator, **kw): 337 return ( 338 self.process(binary.left, **kw) 339 + " %% " 340 + self.process(binary.right, **kw) 341 ) 342 343 344class PGIdentifierPreparer_pg8000(PGIdentifierPreparer): 345 def __init__(self, *args, **kwargs): 346 PGIdentifierPreparer.__init__(self, *args, **kwargs) 347 self._double_percents = False 348 349 350class PGDialect_pg8000(PGDialect): 351 driver = "pg8000" 352 supports_statement_cache = True 353 354 supports_unicode_statements = True 355 356 supports_unicode_binds = True 357 358 default_paramstyle = "format" 359 supports_sane_multi_rowcount = True 360 execution_ctx_cls = PGExecutionContext_pg8000 361 statement_compiler = PGCompiler_pg8000 362 preparer = PGIdentifierPreparer_pg8000 363 supports_server_side_cursors = True 364 365 use_setinputsizes = True 366 367 # reversed as of pg8000 1.16.6. 1.16.5 and lower 368 # are no longer compatible 369 description_encoding = None 370 # description_encoding = "use_encoding" 371 372 colspecs = util.update_copy( 373 PGDialect.colspecs, 374 { 375 sqltypes.Numeric: _PGNumericNoBind, 376 sqltypes.Float: _PGNumeric, 377 sqltypes.JSON: _PGJSON, 378 sqltypes.Boolean: _PGBoolean, 379 sqltypes.NullType: _PGNullType, 380 JSONB: _PGJSONB, 381 sqltypes.JSON.JSONPathType: _PGJSONPathType, 382 sqltypes.JSON.JSONIndexType: _PGJSONIndexType, 383 sqltypes.JSON.JSONIntIndexType: _PGJSONIntIndexType, 384 sqltypes.JSON.JSONStrIndexType: _PGJSONStrIndexType, 385 UUID: _PGUUID, 386 sqltypes.Interval: _PGInterval, 387 INTERVAL: _PGInterval, 388 sqltypes.DateTime: _PGTimeStamp, 389 sqltypes.Time: _PGTime, 390 sqltypes.Integer: _PGInteger, 391 sqltypes.SmallInteger: _PGSmallInteger, 392 sqltypes.BigInteger: _PGBigInteger, 393 sqltypes.Enum: _PGEnum, 394 sqltypes.ARRAY: _PGARRAY, 395 }, 396 ) 397 398 def __init__(self, client_encoding=None, **kwargs): 399 PGDialect.__init__(self, **kwargs) 400 self.client_encoding = client_encoding 401 402 if self._dbapi_version < (1, 16, 6): 403 raise NotImplementedError("pg8000 1.16.6 or greater is required") 404 405 @util.memoized_property 406 def _dbapi_version(self): 407 if self.dbapi and hasattr(self.dbapi, "__version__"): 408 return tuple( 409 [ 410 int(x) 411 for x in re.findall( 412 r"(\d+)(?:[-\.]?|$)", self.dbapi.__version__ 413 ) 414 ] 415 ) 416 else: 417 return (99, 99, 99) 418 419 @classmethod 420 def dbapi(cls): 421 return __import__("pg8000") 422 423 def create_connect_args(self, url): 424 opts = url.translate_connect_args(username="user") 425 if "port" in opts: 426 opts["port"] = int(opts["port"]) 427 opts.update(url.query) 428 return ([], opts) 429 430 def is_disconnect(self, e, connection, cursor): 431 if isinstance(e, self.dbapi.InterfaceError) and "network error" in str( 432 e 433 ): 434 # new as of pg8000 1.19.0 for broken connections 435 return True 436 437 # connection was closed normally 438 return "connection is closed" in str(e) 439 440 def set_isolation_level(self, connection, level): 441 level = level.replace("_", " ") 442 443 # adjust for ConnectionFairy possibly being present 444 if hasattr(connection, "dbapi_connection"): 445 connection = connection.dbapi_connection 446 447 if level == "AUTOCOMMIT": 448 connection.autocommit = True 449 elif level in self._isolation_lookup: 450 connection.autocommit = False 451 cursor = connection.cursor() 452 cursor.execute( 453 "SET SESSION CHARACTERISTICS AS TRANSACTION " 454 "ISOLATION LEVEL %s" % level 455 ) 456 cursor.execute("COMMIT") 457 cursor.close() 458 else: 459 raise exc.ArgumentError( 460 "Invalid value '%s' for isolation_level. " 461 "Valid isolation levels for %s are %s or AUTOCOMMIT" 462 % (level, self.name, ", ".join(self._isolation_lookup)) 463 ) 464 465 def set_readonly(self, connection, value): 466 cursor = connection.cursor() 467 try: 468 cursor.execute( 469 "SET SESSION CHARACTERISTICS AS TRANSACTION %s" 470 % ("READ ONLY" if value else "READ WRITE") 471 ) 472 cursor.execute("COMMIT") 473 finally: 474 cursor.close() 475 476 def get_readonly(self, connection): 477 cursor = connection.cursor() 478 try: 479 cursor.execute("show transaction_read_only") 480 val = cursor.fetchone()[0] 481 finally: 482 cursor.close() 483 484 return val == "on" 485 486 def set_deferrable(self, connection, value): 487 cursor = connection.cursor() 488 try: 489 cursor.execute( 490 "SET SESSION CHARACTERISTICS AS TRANSACTION %s" 491 % ("DEFERRABLE" if value else "NOT DEFERRABLE") 492 ) 493 cursor.execute("COMMIT") 494 finally: 495 cursor.close() 496 497 def get_deferrable(self, connection): 498 cursor = connection.cursor() 499 try: 500 cursor.execute("show transaction_deferrable") 501 val = cursor.fetchone()[0] 502 finally: 503 cursor.close() 504 505 return val == "on" 506 507 def set_client_encoding(self, connection, client_encoding): 508 # adjust for ConnectionFairy possibly being present 509 if hasattr(connection, "dbapi_connection"): 510 connection = connection.dbapi_connection 511 512 cursor = connection.cursor() 513 cursor.execute("SET CLIENT_ENCODING TO '" + client_encoding + "'") 514 cursor.execute("COMMIT") 515 cursor.close() 516 517 def do_set_input_sizes(self, cursor, list_of_tuples, context): 518 if self.positional: 519 cursor.setinputsizes( 520 *[dbtype for key, dbtype, sqltype in list_of_tuples] 521 ) 522 else: 523 cursor.setinputsizes( 524 **{ 525 key: dbtype 526 for key, dbtype, sqltype in list_of_tuples 527 if dbtype 528 } 529 ) 530 531 def do_begin_twophase(self, connection, xid): 532 connection.connection.tpc_begin((0, xid, "")) 533 534 def do_prepare_twophase(self, connection, xid): 535 connection.connection.tpc_prepare() 536 537 def do_rollback_twophase( 538 self, connection, xid, is_prepared=True, recover=False 539 ): 540 connection.connection.tpc_rollback((0, xid, "")) 541 542 def do_commit_twophase( 543 self, connection, xid, is_prepared=True, recover=False 544 ): 545 connection.connection.tpc_commit((0, xid, "")) 546 547 def do_recover_twophase(self, connection): 548 return [row[1] for row in connection.connection.tpc_recover()] 549 550 def on_connect(self): 551 fns = [] 552 553 def on_connect(conn): 554 conn.py_types[quoted_name] = conn.py_types[util.text_type] 555 556 fns.append(on_connect) 557 558 if self.client_encoding is not None: 559 560 def on_connect(conn): 561 self.set_client_encoding(conn, self.client_encoding) 562 563 fns.append(on_connect) 564 565 if self.isolation_level is not None: 566 567 def on_connect(conn): 568 self.set_isolation_level(conn, self.isolation_level) 569 570 fns.append(on_connect) 571 572 if self._json_deserializer: 573 574 def on_connect(conn): 575 # json 576 conn.register_in_adapter(114, self._json_deserializer) 577 578 # jsonb 579 conn.register_in_adapter(3802, self._json_deserializer) 580 581 fns.append(on_connect) 582 583 if len(fns) > 0: 584 585 def on_connect(conn): 586 for fn in fns: 587 fn(conn) 588 589 return on_connect 590 else: 591 return None 592 593 594dialect = PGDialect_pg8000 595