# coding: utf-8
from sqlalchemy import and_
from sqlalchemy import cast
from sqlalchemy import Column
from sqlalchemy import Computed
from sqlalchemy import Date
from sqlalchemy import delete
from sqlalchemy import Enum
from sqlalchemy import exc
from sqlalchemy import func
from sqlalchemy import Index
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import null
from sqlalchemy import schema
from sqlalchemy import select
from sqlalchemy import Sequence
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import testing
from sqlalchemy import Text
from sqlalchemy import text
from sqlalchemy import types as sqltypes
from sqlalchemy import update
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import aggregate_order_by
from sqlalchemy.dialects.postgresql import ARRAY as PG_ARRAY
from sqlalchemy.dialects.postgresql import array
from sqlalchemy.dialects.postgresql import array_agg as pg_array_agg
from sqlalchemy.dialects.postgresql import ExcludeConstraint
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.dialects.postgresql import TSRANGE
from sqlalchemy.dialects.postgresql.base import PGDialect
from sqlalchemy.dialects.postgresql.psycopg2 import PGDialect_psycopg2
from sqlalchemy.orm import aliased
from sqlalchemy.orm import mapper
from sqlalchemy.orm import Session
from sqlalchemy.sql import column
from sqlalchemy.sql import literal_column
from sqlalchemy.sql import operators
from sqlalchemy.sql import table
from sqlalchemy.sql import util as sql_util
from sqlalchemy.testing import engines
from sqlalchemy.testing import fixtures
from sqlalchemy.testing.assertions import assert_raises
from sqlalchemy.testing.assertions import assert_raises_message
from sqlalchemy.testing.assertions import AssertsCompiledSQL
from sqlalchemy.testing.assertions import expect_warnings
from sqlalchemy.testing.assertions import is_
from sqlalchemy.util import OrderedDict
from sqlalchemy.util import u


class SequenceTest(fixtures.TestBase, AssertsCompiledSQL):
    """Tests for sequence name formatting and sequence-backed inserts."""

    __prefer__ = "postgresql"

    def test_format(self):
        """Sequence names are schema-qualified and quoted only as needed."""
        dialect = postgresql.dialect()
        preparer = dialect.identifier_preparer

        seq = Sequence("my_seq_no_schema")
        assert preparer.format_sequence(seq) == "my_seq_no_schema"

        seq = Sequence("my_seq", schema="some_schema")
        assert preparer.format_sequence(seq) == "some_schema.my_seq"

        # mixed-case identifiers must come back double-quoted
        seq = Sequence("My_Seq", schema="Some_Schema")
        assert preparer.format_sequence(seq) == '"Some_Schema"."My_Seq"'

    @testing.only_on("postgresql", "foo")
    @testing.provide_metadata
    def test_reverse_eng_name(self):
        """Inserts report primary key 1 for long table / column names.

        Each table is created with names truncated to 57 characters and
        receives one INSERT on an engine that has ``implicit_returning``
        disabled.
        """
        metadata = self.metadata
        engine = engines.testing_engine(options=dict(implicit_returning=False))
        for tname, cname in [
            ("tb1" * 30, "abc"),
            ("tb2", "abc" * 30),
            ("tb3" * 30, "abc" * 30),
            ("tb4", "abc"),
        ]:
            t = Table(
                tname[:57],
                metadata,
                Column(cname[:57], Integer, primary_key=True),
            )
            t.create(engine)
            r = engine.execute(t.insert())
            assert r.inserted_primary_key == [1]


class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
    """String-compilation tests for PostgreSQL-specific SQL and DDL."""

    __dialect__ = postgresql.dialect()

    def test_update_returning(self):
        """UPDATE renders RETURNING for columns, a table and a function."""
        dialect = postgresql.dialect()
        table1 = table(
            "mytable",
            column("myid", Integer),
            column("name", String(128)),
            column("description", String(128)),
        )

        # the statement is named ``stmt`` rather than ``u`` so that the
        # module-level ``u`` helper imported from sqlalchemy.util is not
        # shadowed
        stmt = update(table1, values=dict(name="foo")).returning(
            table1.c.myid, table1.c.name
        )
        self.assert_compile(
            stmt,
            "UPDATE mytable SET name=%(name)s "
            "RETURNING mytable.myid, mytable.name",
            dialect=dialect,
        )

        stmt = update(table1, values=dict(name="foo")).returning(table1)
        self.assert_compile(
            stmt,
            "UPDATE mytable SET name=%(name)s "
            "RETURNING mytable.myid, mytable.name, "
            "mytable.description",
            dialect=dialect,
        )

        stmt = update(table1, values=dict(name="foo")).returning(
            func.length(table1.c.name)
        )
        self.assert_compile(
            stmt,
            "UPDATE mytable SET name=%(name)s "
            "RETURNING length(mytable.name) AS length_1",
            dialect=dialect,
        )

    def test_insert_returning(self):
        """INSERT renders RETURNING for columns, a table and a function."""
        dialect = postgresql.dialect()
        table1 = table(
            "mytable",
            column("myid", Integer),
            column("name", String(128)),
            column("description", String(128)),
        )

        stmt = insert(table1, values=dict(name="foo")).returning(
            table1.c.myid, table1.c.name
        )
        self.assert_compile(
            stmt,
            "INSERT INTO mytable (name) VALUES "
            "(%(name)s) RETURNING mytable.myid, "
            "mytable.name",
            dialect=dialect,
        )

        stmt = insert(table1, values=dict(name="foo")).returning(table1)
        self.assert_compile(
            stmt,
            "INSERT INTO mytable (name) VALUES "
            "(%(name)s) RETURNING mytable.myid, "
            "mytable.name, mytable.description",
            dialect=dialect,
        )

        stmt = insert(table1, values=dict(name="foo")).returning(
            func.length(table1.c.name)
        )
        self.assert_compile(
            stmt,
            "INSERT INTO mytable (name) VALUES "
            "(%(name)s) RETURNING length(mytable.name) "
            "AS length_1",
            dialect=dialect,
        )

    def test_create_drop_enum(self):
        """CREATE TYPE ... AS ENUM escapes quotes and quotes the name."""
        # test escaping and unicode within CREATE TYPE for ENUM
        typ = postgresql.ENUM(
            "val1", "val2", "val's 3", u("méil"), name="myname"
        )
        self.assert_compile(
            postgresql.CreateEnumType(typ),
            u(
                "CREATE TYPE myname AS "
                "ENUM ('val1', 'val2', 'val''s 3', 'méil')"
            ),
        )

        typ = postgresql.ENUM("val1", "val2", "val's 3", name="PleaseQuoteMe")
        self.assert_compile(
            postgresql.CreateEnumType(typ),
            'CREATE TYPE "PleaseQuoteMe" AS ENUM '
            "('val1', 'val2', 'val''s 3')",
        )

    def test_generic_enum(self):
        """The generic Enum type compiles to PG CREATE/DROP TYPE DDL."""
        e1 = Enum("x", "y", "z", name="somename")
        e2 = Enum("x", "y", "z", name="somename", schema="someschema")
        self.assert_compile(
            postgresql.CreateEnumType(e1),
            "CREATE TYPE somename AS ENUM ('x', 'y', 'z')",
        )
        self.assert_compile(
            postgresql.CreateEnumType(e2),
            "CREATE TYPE someschema.somename AS ENUM ('x', 'y', 'z')",
        )
        self.assert_compile(postgresql.DropEnumType(e1), "DROP TYPE somename")
        self.assert_compile(
            postgresql.DropEnumType(e2), "DROP TYPE someschema.somename"
        )

        t1 = Table("sometable", MetaData(), Column("somecolumn", e1))
        self.assert_compile(
            schema.CreateTable(t1),
            "CREATE TABLE sometable (somecolumn somename)",
        )

        # with native_enum=False the column is a VARCHAR plus a CHECK
        t1 = Table(
            "sometable",
            MetaData(),
            Column("somecolumn", Enum("x", "y", "z", native_enum=False)),
        )
        self.assert_compile(
            schema.CreateTable(t1),
            "CREATE TABLE sometable (somecolumn "
            "VARCHAR(1), CHECK (somecolumn IN ('x', "
            "'y', 'z')))",
        )

    def test_create_type_schema_translate(self):
        """``schema_translate_map`` is applied to CREATE TYPE names."""
        e1 = Enum("x", "y", "z", name="somename")
        e2 = Enum("x", "y", "z", name="somename", schema="someschema")
        schema_translate_map = {None: "foo", "someschema": "bar"}

        self.assert_compile(
            postgresql.CreateEnumType(e1),
            "CREATE TYPE foo.somename AS ENUM ('x', 'y', 'z')",
            schema_translate_map=schema_translate_map,
        )

        self.assert_compile(
            postgresql.CreateEnumType(e2),
            "CREATE TYPE bar.somename AS ENUM ('x', 'y', 'z')",
            schema_translate_map=schema_translate_map,
        )

    def test_create_table_with_schema_type_schema_translate(self):
        """``schema_translate_map`` reaches enum types inside CREATE TABLE."""
        e1 = Enum("x", "y", "z", name="somename")
        e2 = Enum("x", "y", "z", name="somename", schema="someschema")
        schema_translate_map = {None: "foo", "someschema": "bar"}

        # named ``tbl`` so the module-level ``table`` import is not shadowed
        tbl = Table(
            "some_table", MetaData(), Column("q", e1), Column("p", e2)
        )

        self.assert_compile(
            schema.CreateTable(tbl),
            "CREATE TABLE foo.some_table (q foo.somename, p bar.somename)",
            schema_translate_map=schema_translate_map,
        )

    def test_create_table_with_tablespace(self):
        """``postgresql_tablespace`` renders a TABLESPACE clause."""
        m = MetaData()
        tbl = Table(
            "atable",
            m,
            Column("id", Integer),
            postgresql_tablespace="sometablespace",
        )
        self.assert_compile(
            schema.CreateTable(tbl),
            "CREATE TABLE atable (id INTEGER) TABLESPACE sometablespace",
        )

    def test_create_table_with_tablespace_quoted(self):
        """A tablespace named like a reserved word is quoted."""
        # testing quoting of tablespace name
        m = MetaData()
        tbl = Table(
            "anothertable",
            m,
            Column("id", Integer),
            postgresql_tablespace="table",
        )
        self.assert_compile(
            schema.CreateTable(tbl),
            'CREATE TABLE anothertable (id INTEGER) TABLESPACE "table"',
        )

    def test_create_table_inherits(self):
        """A string ``postgresql_inherits`` renders one INHERITS parent."""
        m = MetaData()
        tbl = Table(
            "atable", m, Column("id", Integer), postgresql_inherits="i1"
        )
        self.assert_compile(
            schema.CreateTable(tbl),
            "CREATE TABLE atable (id INTEGER) INHERITS ( i1 )",
        )

    def test_create_table_inherits_tuple(self):
        """A tuple ``postgresql_inherits`` renders several parents."""
        m = MetaData()
        tbl = Table(
            "atable",
            m,
            Column("id", Integer),
            postgresql_inherits=("i1", "i2"),
        )
        self.assert_compile(
            schema.CreateTable(tbl),
            "CREATE TABLE atable (id INTEGER) INHERITS ( i1, i2 )",
        )

    def test_create_table_inherits_quoting(self):
        """INHERITS parent names are quoted when required."""
        m = MetaData()
        tbl = Table(
            "atable",
            m,
            Column("id", Integer),
            postgresql_inherits=("Quote Me", "quote Me Too"),
        )
        self.assert_compile(
            schema.CreateTable(tbl),
            "CREATE TABLE atable (id INTEGER) INHERITS "
            '( "Quote Me", "quote Me Too" )',
        )

    def test_create_table_partition_by_list(self):
        """``postgresql_partition_by`` renders PARTITION BY LIST."""
        m = MetaData()
        tbl = Table(
            "atable",
            m,
            Column("id", Integer),
            Column("part_column", Integer),
            postgresql_partition_by="LIST (part_column)",
        )
        self.assert_compile(
            schema.CreateTable(tbl),
            "CREATE TABLE atable (id INTEGER, part_column INTEGER) "
            "PARTITION BY LIST (part_column)",
        )

    def test_create_table_partition_by_range(self):
        """``postgresql_partition_by`` renders PARTITION BY RANGE."""
        m = MetaData()
        tbl = Table(
            "atable",
            m,
            Column("id", Integer),
            Column("part_column", Integer),
            postgresql_partition_by="RANGE (part_column)",
        )
        self.assert_compile(
            schema.CreateTable(tbl),
            "CREATE TABLE atable (id INTEGER, part_column INTEGER) "
            "PARTITION BY RANGE (part_column)",
        )

    def test_create_table_with_oids(self):
        """``postgresql_with_oids`` renders WITH OIDS / WITHOUT OIDS."""
        m = MetaData()
        tbl = Table(
            "atable", m, Column("id", Integer), postgresql_with_oids=True
        )
        self.assert_compile(
            schema.CreateTable(tbl),
            "CREATE TABLE atable (id INTEGER) WITH OIDS",
        )

        tbl2 = Table(
            "anothertable",
            m,
            Column("id", Integer),
            postgresql_with_oids=False,
        )
        self.assert_compile(
            schema.CreateTable(tbl2),
            "CREATE TABLE anothertable (id INTEGER) WITHOUT OIDS",
        )

    def test_create_table_with_oncommit_option(self):
        """``postgresql_on_commit`` renders an upper-cased ON COMMIT."""
        m = MetaData()
        tbl = Table(
            "atable", m, Column("id", Integer), postgresql_on_commit="drop"
        )
        self.assert_compile(
            schema.CreateTable(tbl),
            "CREATE TABLE atable (id INTEGER) ON COMMIT DROP",
        )

    def test_create_table_with_multiple_options(self):
        """OIDS, ON COMMIT and TABLESPACE render together, in that order."""
        m = MetaData()
        tbl = Table(
            "atable",
            m,
            Column("id", Integer),
            postgresql_tablespace="sometablespace",
            postgresql_with_oids=False,
            postgresql_on_commit="preserve_rows",
        )
        self.assert_compile(
            schema.CreateTable(tbl),
            "CREATE TABLE atable (id INTEGER) WITHOUT OIDS "
            "ON COMMIT PRESERVE ROWS TABLESPACE sometablespace",
        )

    def test_create_partial_index(self):
        """``postgresql_where`` renders an inline-literal WHERE clause."""
        m = MetaData()
        tbl = Table("testtbl", m, Column("data", Integer))

        # "test_idx1" is built exactly once; building the identical Index
        # a second time would only register a redundant same-named index
        # on ``tbl`` without changing what is compiled below
        idx = Index(
            "test_idx1",
            tbl.c.data,
            postgresql_where=and_(tbl.c.data > 5, tbl.c.data < 10),
        )

        # test quoting and all that

        idx2 = Index(
            "test_idx2",
            tbl.c.data,
            postgresql_where=and_(tbl.c.data > "a", tbl.c.data < "b's"),
        )
        self.assert_compile(
            schema.CreateIndex(idx),
            "CREATE INDEX test_idx1 ON testtbl (data) "
            "WHERE data > 5 AND data < 10",
            dialect=postgresql.dialect(),
        )
        self.assert_compile(
            schema.CreateIndex(idx2),
            "CREATE INDEX test_idx2 ON testtbl (data) "
            "WHERE data > 'a' AND data < 'b''s'",
            dialect=postgresql.dialect(),
        )

    def test_create_index_with_ops(self):
        """``postgresql_ops`` attaches operator classes by column key."""
        m = MetaData()
        tbl = Table(
            "testtbl",
            m,
            Column("data", String),
            Column("data2", Integer, key="d2"),
        )

        idx = Index(
            "test_idx1",
            tbl.c.data,
            postgresql_ops={"data": "text_pattern_ops"},
        )

        # "d2" is the column key; the rendered name is "data2"
        idx2 = Index(
            "test_idx2",
            tbl.c.data,
            tbl.c.d2,
            postgresql_ops={"data": "text_pattern_ops", "d2": "int4_ops"},
        )

        self.assert_compile(
            schema.CreateIndex(idx),
            "CREATE INDEX test_idx1 ON testtbl (data text_pattern_ops)",
            dialect=postgresql.dialect(),
        )
        self.assert_compile(
            schema.CreateIndex(idx2),
            "CREATE INDEX test_idx2 ON testtbl "
            "(data text_pattern_ops, data2 int4_ops)",
            dialect=postgresql.dialect(),
        )

    def test_create_index_with_labeled_ops(self):
        """``postgresql_ops`` can target labeled index expressions."""
        m = MetaData()
        tbl = Table(
            "testtbl",
            m,
            Column("data", String),
            Column("data2", Integer, key="d2"),
        )

        idx = Index(
            "test_idx1",
            func.lower(tbl.c.data).label("data_lower"),
            postgresql_ops={"data_lower": "text_pattern_ops"},
        )

        idx2 = Index(
            "test_idx2",
            (func.xyz(tbl.c.data) + tbl.c.d2).label("bar"),
            tbl.c.d2.label("foo"),
            postgresql_ops={"bar": "text_pattern_ops", "foo": "int4_ops"},
        )

        self.assert_compile(
            schema.CreateIndex(idx),
            "CREATE INDEX test_idx1 ON testtbl "
            "(lower(data) text_pattern_ops)",
            dialect=postgresql.dialect(),
        )
        self.assert_compile(
            schema.CreateIndex(idx2),
            "CREATE INDEX test_idx2 ON testtbl "
            "((xyz(data) + data2) text_pattern_ops, "
            "data2 int4_ops)",
            dialect=postgresql.dialect(),
        )

    def test_create_index_with_text_or_composite(self):
        """Indexes mixing ``text()``, columns and expressions compile."""
        m = MetaData()
        tbl = Table("testtbl", m, Column("d1", String), Column("d2", Integer))

        # a pure text() index names no column, so it is attached to the
        # table explicitly
        idx = Index("test_idx1", text("x"))
        tbl.append_constraint(idx)

        idx2 = Index("test_idx2", text("y"), tbl.c.d2)

        idx3 = Index(
            "test_idx2",
            tbl.c.d1,
            text("y"),
            tbl.c.d2,
            postgresql_ops={"d1": "x1", "d2": "x2"},
        )

        idx4 = Index(
            "test_idx2",
            tbl.c.d1,
            tbl.c.d2 > 5,
            text("q"),
            postgresql_ops={"d1": "x1", "d2": "x2"},
        )

        idx5 = Index(
            "test_idx2",
            tbl.c.d1,
            (tbl.c.d2 > 5).label("g"),
            text("q"),
            postgresql_ops={"d1": "x1", "g": "x2"},
        )

        self.assert_compile(
            schema.CreateIndex(idx), "CREATE INDEX test_idx1 ON testtbl (x)"
        )
        self.assert_compile(
            schema.CreateIndex(idx2),
            "CREATE INDEX test_idx2 ON testtbl (y, d2)",
        )
        self.assert_compile(
            schema.CreateIndex(idx3),
            "CREATE INDEX test_idx2 ON testtbl (d1 x1, y, d2 x2)",
        )

        # note that at the moment we do not expect the 'd2' op to
        # pick up on the "d2 > 5" expression
        self.assert_compile(
            schema.CreateIndex(idx4),
            "CREATE INDEX test_idx2 ON testtbl (d1 x1, (d2 > 5), q)",
        )

        # however it does work if we label!
        self.assert_compile(
            schema.CreateIndex(idx5),
            "CREATE INDEX test_idx2 ON testtbl (d1 x1, (d2 > 5) x2, q)",
        )

    def test_create_index_with_using(self):
        """``postgresql_using`` renders a USING <method> clause."""
        m = MetaData()
        tbl = Table("testtbl", m, Column("data", String))

        idx1 = Index("test_idx1", tbl.c.data)
        idx2 = Index("test_idx2", tbl.c.data, postgresql_using="btree")
        idx3 = Index("test_idx3", tbl.c.data, postgresql_using="hash")

        self.assert_compile(
            schema.CreateIndex(idx1),
            "CREATE INDEX test_idx1 ON testtbl (data)",
            dialect=postgresql.dialect(),
        )
        self.assert_compile(
            schema.CreateIndex(idx2),
            "CREATE INDEX test_idx2 ON testtbl USING btree (data)",
            dialect=postgresql.dialect(),
        )
        self.assert_compile(
            schema.CreateIndex(idx3),
            "CREATE INDEX test_idx3 ON testtbl USING hash (data)",
            dialect=postgresql.dialect(),
        )

    def test_create_index_with_with(self):
        """``postgresql_with`` renders WITH (<storage parameter> = ...)."""
        m = MetaData()
        tbl = Table("testtbl", m, Column("data", String))

        idx1 = Index("test_idx1", tbl.c.data)
        idx2 = Index(
            "test_idx2", tbl.c.data, postgresql_with={"fillfactor": 50}
        )
        idx3 = Index(
            "test_idx3",
            tbl.c.data,
            postgresql_using="gist",
            postgresql_with={"buffering": "off"},
        )

        self.assert_compile(
            schema.CreateIndex(idx1),
            "CREATE INDEX test_idx1 ON testtbl (data)",
        )
        self.assert_compile(
            schema.CreateIndex(idx2),
            "CREATE INDEX test_idx2 ON testtbl "
            "(data) "
            "WITH (fillfactor = 50)",
        )
        self.assert_compile(
            schema.CreateIndex(idx3),
            "CREATE INDEX test_idx3 ON testtbl "
            "USING gist (data) "
            "WITH (buffering = off)",
        )

    def test_create_index_with_using_unusual_conditions(self):
        """USING is lower-cased; a multi-word method is a CompileError."""
        m = MetaData()
        tbl = Table("testtbl", m, Column("data", String))

        self.assert_compile(
            schema.CreateIndex(
                Index("test_idx1", tbl.c.data, postgresql_using="GIST")
            ),
            "CREATE INDEX test_idx1 ON testtbl USING gist (data)",
        )

        self.assert_compile(
            schema.CreateIndex(
                Index(
                    "test_idx1",
                    tbl.c.data,
                    postgresql_using="some_custom_method",
                )
            ),
            "CREATE INDEX test_idx1 ON testtbl "
            "USING some_custom_method (data)",
        )

        assert_raises_message(
            exc.CompileError,
            "Unexpected SQL phrase: 'gin invalid sql'",
            schema.CreateIndex(
                Index(
                    "test_idx2", tbl.c.data, postgresql_using="gin invalid sql"
                )
            ).compile,
            dialect=postgresql.dialect(),
        )

    def test_create_index_with_tablespace(self):
        """``postgresql_tablespace`` on an Index renders TABLESPACE."""
        m = MetaData()
        tbl = Table("testtbl", m, Column("data", String))

        idx1 = Index("test_idx1", tbl.c.data)
        idx2 = Index(
            "test_idx2", tbl.c.data, postgresql_tablespace="sometablespace"
        )
        idx3 = Index(
            "test_idx3",
            tbl.c.data,
            postgresql_tablespace="another table space",
        )

        self.assert_compile(
            schema.CreateIndex(idx1),
            "CREATE INDEX test_idx1 ON testtbl (data)",
            dialect=postgresql.dialect(),
        )
        self.assert_compile(
            schema.CreateIndex(idx2),
            "CREATE INDEX test_idx2 ON testtbl "
            "(data) "
            "TABLESPACE sometablespace",
            dialect=postgresql.dialect(),
        )
        # a name containing spaces has to be quoted
        self.assert_compile(
            schema.CreateIndex(idx3),
            "CREATE INDEX test_idx3 ON testtbl "
            "(data) "
            'TABLESPACE "another table space"',
            dialect=postgresql.dialect(),
        )

    def test_create_index_with_multiple_options(self):
        """USING, WITH, TABLESPACE and WHERE render in that order."""
        m = MetaData()
        tbl = Table("testtbl", m, Column("data", String))

        idx1 = Index(
            "test_idx1",
            tbl.c.data,
            postgresql_using="btree",
            postgresql_tablespace="atablespace",
            postgresql_with={"fillfactor": 60},
            postgresql_where=and_(tbl.c.data > 5, tbl.c.data < 10),
        )

        self.assert_compile(
            schema.CreateIndex(idx1),
            "CREATE INDEX test_idx1 ON testtbl "
            "USING btree (data) "
            "WITH (fillfactor = 60) "
            "TABLESPACE atablespace "
            "WHERE data > 5 AND data < 10",
            dialect=postgresql.dialect(),
        )

    def test_create_index_expr_gets_parens(self):
        """An expression index element is wrapped in parentheses."""
        m = MetaData()
        tbl = Table("testtbl", m, Column("x", Integer), Column("y", Integer))

        idx1 = Index("test_idx1", 5 / (tbl.c.x + tbl.c.y))
        self.assert_compile(
            schema.CreateIndex(idx1),
            "CREATE INDEX test_idx1 ON testtbl ((5 / (x + y)))",
        )

    def test_create_index_literals(self):
        """Literals inside an index expression are rendered inline."""
        m = MetaData()
        tbl = Table("testtbl", m, Column("data", Integer))

        idx1 = Index("test_idx1", tbl.c.data + 5)
        self.assert_compile(
            schema.CreateIndex(idx1),
            "CREATE INDEX test_idx1 ON testtbl ((data + 5))",
        )

    def test_create_index_concurrently(self):
        """CONCURRENTLY is rendered only when the dialect supports it."""
        m = MetaData()
        tbl = Table("testtbl", m, Column("data", Integer))

        idx1 = Index("test_idx1", tbl.c.data, postgresql_concurrently=True)
        self.assert_compile(
            schema.CreateIndex(idx1),
            "CREATE INDEX CONCURRENTLY test_idx1 ON testtbl (data)",
        )

        # with the capability flag off, the keyword is omitted
        dialect_8_1 = postgresql.dialect()
        dialect_8_1._supports_create_index_concurrently = False
        self.assert_compile(
            schema.CreateIndex(idx1),
            "CREATE INDEX test_idx1 ON testtbl (data)",
            dialect=dialect_8_1,
        )

    def test_drop_index_concurrently(self):
        """DROP INDEX CONCURRENTLY depends on the dialect capability."""
        m = MetaData()
        tbl = Table("testtbl", m, Column("data", Integer))

        idx1 = Index("test_idx1", tbl.c.data, postgresql_concurrently=True)
        self.assert_compile(
            schema.DropIndex(idx1), "DROP INDEX CONCURRENTLY test_idx1"
        )

        # with the capability flag off, the keyword is omitted
        dialect_9_1 = postgresql.dialect()
        dialect_9_1._supports_drop_index_concurrently = False
        self.assert_compile(
            schema.DropIndex(idx1), "DROP INDEX test_idx1", dialect=dialect_9_1
        )

    def test_exclude_constraint_min(self):
        """A minimal ExcludeConstraint defaults to USING gist."""
        m = MetaData()
        tbl = Table("testtbl", m, Column("room", Integer, primary_key=True))
        cons = ExcludeConstraint(("room", "="))
        tbl.append_constraint(cons)
        self.assert_compile(
            schema.AddConstraint(cons),
            "ALTER TABLE testtbl ADD EXCLUDE USING gist (room WITH =)",
            dialect=postgresql.dialect(),
        )

    @testing.combinations(
        (True, "deferred"),
        (False, "immediate"),
        argnames="deferrable_value, initially_value",
    )
    def test_copy_exclude_constraint_adhoc_columns(
        self, deferrable_value, initially_value
    ):
        """An ExcludeConstraint on ad-hoc columns survives tometadata().

        The constraint and its copy on a second MetaData must compile to
        the same DDL, including the DEFERRABLE / INITIALLY options.
        """
        meta = MetaData()
        # named ``mytable`` so the module-level ``table`` import is not
        # shadowed
        mytable = Table(
            "mytable",
            meta,
            Column("myid", Integer, Sequence("foo_id_seq"), primary_key=True),
            Column("valid_from_date", Date(), nullable=True),
            Column("valid_thru_date", Date(), nullable=True),
        )
        cons = ExcludeConstraint(
            (
                literal_column(
                    "daterange(valid_from_date, valid_thru_date, '[]')"
                ),
                "&&",
            ),
            where=column("valid_from_date") <= column("valid_thru_date"),
            name="ex_mytable_valid_date_range",
            deferrable=deferrable_value,
            initially=initially_value,
        )

        mytable.append_constraint(cons)
        expected = (
            "ALTER TABLE mytable ADD CONSTRAINT ex_mytable_valid_date_range "
            "EXCLUDE USING gist "
            "(daterange(valid_from_date, valid_thru_date, '[]') WITH &&) "
            "WHERE (valid_from_date <= valid_thru_date) "
            "%s %s"
            % (
                "NOT DEFERRABLE" if not deferrable_value else "DEFERRABLE",
                "INITIALLY %s" % initially_value,
            )
        )
        self.assert_compile(
            schema.AddConstraint(cons),
            expected,
            dialect=postgresql.dialect(),
        )

        meta2 = MetaData()
        table2 = mytable.tometadata(meta2)
        cons2 = [
            c for c in table2.constraints if isinstance(c, ExcludeConstraint)
        ][0]
        self.assert_compile(
            schema.AddConstraint(cons2),
            expected,
            dialect=postgresql.dialect(),
        )

    def test_exclude_constraint_full(self):
        """All ExcludeConstraint options render in one ADD CONSTRAINT."""
        m = MetaData()
        room = Column("room", Integer, primary_key=True)
        tbl = Table("testtbl", m, room, Column("during", TSRANGE))
        # NOTE(review): ``room`` is rebound to a fresh Column that is not
        # attached to ``tbl`` before being handed to the constraint; this
        # looks deliberate - confirm before "simplifying" it away
        room = Column("room", Integer, primary_key=True)
        cons = ExcludeConstraint(
            (room, "="),
            ("during", "&&"),
            name="my_name",
            using="gist",
            where="room > 100",
            deferrable=True,
            initially="immediate",
            ops={"room": "my_opclass"},
        )
        tbl.append_constraint(cons)
        self.assert_compile(
            schema.AddConstraint(cons),
            "ALTER TABLE testtbl ADD CONSTRAINT my_name "
            "EXCLUDE USING gist "
            "(room my_opclass WITH =, during WITH "
            "&&) WHERE "
            "(room > 100) DEFERRABLE INITIALLY immediate",
            dialect=postgresql.dialect(),
        )

    def test_exclude_constraint_copy(self):
        """A copy of a table-bound ExcludeConstraint compiles the same."""
        m = MetaData()
        cons = ExcludeConstraint(("room", "="))
        tbl = Table(
            "testtbl", m, Column("room", Integer, primary_key=True), cons
        )
        # apparently you can't copy a ColumnCollectionConstraint until
        # after it has been bound to a table...
        cons_copy = cons.copy()
        tbl.append_constraint(cons_copy)
        self.assert_compile(
            schema.AddConstraint(cons_copy),
            "ALTER TABLE testtbl ADD EXCLUDE USING gist (room WITH =)",
        )

    def test_exclude_constraint_copy_where_using(self):
        """``where`` and ``using`` are carried over by tometadata()."""
        m = MetaData()
        tbl = Table("testtbl", m, Column("room", Integer, primary_key=True))
        cons = ExcludeConstraint(
            (tbl.c.room, "="), where=tbl.c.room > 5, using="foobar"
        )
        tbl.append_constraint(cons)
        self.assert_compile(
            schema.AddConstraint(cons),
            "ALTER TABLE testtbl ADD EXCLUDE USING foobar "
            "(room WITH =) WHERE (testtbl.room > 5)",
        )

        m2 = MetaData()
        tbl2 = tbl.tometadata(m2)
        self.assert_compile(
            schema.CreateTable(tbl2),
            "CREATE TABLE testtbl (room SERIAL NOT NULL, "
            "PRIMARY KEY (room), "
            "EXCLUDE USING foobar "
            "(room WITH =) WHERE (testtbl.room > 5))",
        )

    def test_exclude_constraint_text(self):
        """A ``text()`` element is rendered verbatim in the constraint."""
        m = MetaData()
        cons = ExcludeConstraint((text("room::TEXT"), "="))
        Table("testtbl", m, Column("room", String), cons)
        self.assert_compile(
            schema.AddConstraint(cons),
            "ALTER TABLE testtbl ADD EXCLUDE USING gist "
            "(room::TEXT WITH =)",
        )
903 904 def test_exclude_constraint_colname_needs_quoting(self): 905 m = MetaData() 906 cons = ExcludeConstraint(("Some Column Name", "=")) 907 Table("testtbl", m, Column("Some Column Name", String), cons) 908 self.assert_compile( 909 schema.AddConstraint(cons), 910 "ALTER TABLE testtbl ADD EXCLUDE USING gist " 911 '("Some Column Name" WITH =)', 912 ) 913 914 def test_exclude_constraint_with_using_unusual_conditions(self): 915 m = MetaData() 916 cons = ExcludeConstraint(("q", "="), using="not a keyword") 917 Table("testtbl", m, Column("q", String), cons) 918 assert_raises_message( 919 exc.CompileError, 920 "Unexpected SQL phrase: 'not a keyword'", 921 schema.AddConstraint(cons).compile, 922 dialect=postgresql.dialect(), 923 ) 924 925 def test_exclude_constraint_cast(self): 926 m = MetaData() 927 tbl = Table("testtbl", m, Column("room", String)) 928 cons = ExcludeConstraint((cast(tbl.c.room, Text), "=")) 929 tbl.append_constraint(cons) 930 self.assert_compile( 931 schema.AddConstraint(cons), 932 "ALTER TABLE testtbl ADD EXCLUDE USING gist " 933 "(CAST(room AS TEXT) WITH =)", 934 ) 935 936 def test_exclude_constraint_cast_quote(self): 937 m = MetaData() 938 tbl = Table("testtbl", m, Column("Room", String)) 939 cons = ExcludeConstraint((cast(tbl.c.Room, Text), "=")) 940 tbl.append_constraint(cons) 941 self.assert_compile( 942 schema.AddConstraint(cons), 943 "ALTER TABLE testtbl ADD EXCLUDE USING gist " 944 '(CAST("Room" AS TEXT) WITH =)', 945 ) 946 947 def test_exclude_constraint_when(self): 948 m = MetaData() 949 tbl = Table("testtbl", m, Column("room", String)) 950 cons = ExcludeConstraint(("room", "="), where=tbl.c.room.in_(["12"])) 951 tbl.append_constraint(cons) 952 self.assert_compile( 953 schema.AddConstraint(cons), 954 "ALTER TABLE testtbl ADD EXCLUDE USING gist " 955 "(room WITH =) WHERE (testtbl.room IN ('12'))", 956 dialect=postgresql.dialect(), 957 ) 958 959 def test_exclude_constraint_ops_many(self): 960 m = MetaData() 961 tbl = Table( 962 "testtbl", m, 
Column("room", String), Column("during", TSRANGE) 963 ) 964 cons = ExcludeConstraint( 965 ("room", "="), 966 ("during", "&&"), 967 ops={"room": "first_opsclass", "during": "second_opclass"}, 968 ) 969 tbl.append_constraint(cons) 970 self.assert_compile( 971 schema.AddConstraint(cons), 972 "ALTER TABLE testtbl ADD EXCLUDE USING gist " 973 "(room first_opsclass WITH =, during second_opclass WITH &&)", 974 dialect=postgresql.dialect(), 975 ) 976 977 def test_substring(self): 978 self.assert_compile( 979 func.substring("abc", 1, 2), 980 "SUBSTRING(%(substring_1)s FROM %(substring_2)s " 981 "FOR %(substring_3)s)", 982 ) 983 self.assert_compile( 984 func.substring("abc", 1), 985 "SUBSTRING(%(substring_1)s FROM %(substring_2)s)", 986 ) 987 988 def test_for_update(self): 989 table1 = table( 990 "mytable", column("myid"), column("name"), column("description") 991 ) 992 993 self.assert_compile( 994 table1.select(table1.c.myid == 7).with_for_update(), 995 "SELECT mytable.myid, mytable.name, mytable.description " 996 "FROM mytable WHERE mytable.myid = %(myid_1)s FOR UPDATE", 997 ) 998 999 self.assert_compile( 1000 table1.select(table1.c.myid == 7).with_for_update(nowait=True), 1001 "SELECT mytable.myid, mytable.name, mytable.description " 1002 "FROM mytable WHERE mytable.myid = %(myid_1)s FOR UPDATE NOWAIT", 1003 ) 1004 1005 self.assert_compile( 1006 table1.select(table1.c.myid == 7).with_for_update( 1007 skip_locked=True 1008 ), 1009 "SELECT mytable.myid, mytable.name, mytable.description " 1010 "FROM mytable WHERE mytable.myid = %(myid_1)s " 1011 "FOR UPDATE SKIP LOCKED", 1012 ) 1013 1014 self.assert_compile( 1015 table1.select(table1.c.myid == 7).with_for_update(read=True), 1016 "SELECT mytable.myid, mytable.name, mytable.description " 1017 "FROM mytable WHERE mytable.myid = %(myid_1)s FOR SHARE", 1018 ) 1019 1020 self.assert_compile( 1021 table1.select(table1.c.myid == 7).with_for_update( 1022 read=True, nowait=True 1023 ), 1024 "SELECT mytable.myid, mytable.name, 
mytable.description " 1025 "FROM mytable WHERE mytable.myid = %(myid_1)s FOR SHARE NOWAIT", 1026 ) 1027 1028 self.assert_compile( 1029 table1.select(table1.c.myid == 7).with_for_update( 1030 key_share=True, nowait=True 1031 ), 1032 "SELECT mytable.myid, mytable.name, mytable.description " 1033 "FROM mytable WHERE mytable.myid = %(myid_1)s " 1034 "FOR NO KEY UPDATE NOWAIT", 1035 ) 1036 1037 self.assert_compile( 1038 table1.select(table1.c.myid == 7).with_for_update( 1039 key_share=True, read=True, nowait=True 1040 ), 1041 "SELECT mytable.myid, mytable.name, mytable.description " 1042 "FROM mytable WHERE mytable.myid = %(myid_1)s " 1043 "FOR KEY SHARE NOWAIT", 1044 ) 1045 1046 self.assert_compile( 1047 table1.select(table1.c.myid == 7).with_for_update( 1048 read=True, skip_locked=True 1049 ), 1050 "SELECT mytable.myid, mytable.name, mytable.description " 1051 "FROM mytable WHERE mytable.myid = %(myid_1)s " 1052 "FOR SHARE SKIP LOCKED", 1053 ) 1054 1055 self.assert_compile( 1056 table1.select(table1.c.myid == 7).with_for_update( 1057 of=table1.c.myid 1058 ), 1059 "SELECT mytable.myid, mytable.name, mytable.description " 1060 "FROM mytable WHERE mytable.myid = %(myid_1)s " 1061 "FOR UPDATE OF mytable", 1062 ) 1063 1064 self.assert_compile( 1065 table1.select(table1.c.myid == 7).with_for_update( 1066 read=True, nowait=True, of=table1 1067 ), 1068 "SELECT mytable.myid, mytable.name, mytable.description " 1069 "FROM mytable WHERE mytable.myid = %(myid_1)s " 1070 "FOR SHARE OF mytable NOWAIT", 1071 ) 1072 1073 self.assert_compile( 1074 table1.select(table1.c.myid == 7).with_for_update( 1075 key_share=True, read=True, nowait=True, of=table1 1076 ), 1077 "SELECT mytable.myid, mytable.name, mytable.description " 1078 "FROM mytable WHERE mytable.myid = %(myid_1)s " 1079 "FOR KEY SHARE OF mytable NOWAIT", 1080 ) 1081 1082 self.assert_compile( 1083 table1.select(table1.c.myid == 7).with_for_update( 1084 read=True, nowait=True, of=table1.c.myid 1085 ), 1086 "SELECT mytable.myid, 
mytable.name, mytable.description " 1087 "FROM mytable WHERE mytable.myid = %(myid_1)s " 1088 "FOR SHARE OF mytable NOWAIT", 1089 ) 1090 1091 self.assert_compile( 1092 table1.select(table1.c.myid == 7).with_for_update( 1093 read=True, nowait=True, of=[table1.c.myid, table1.c.name] 1094 ), 1095 "SELECT mytable.myid, mytable.name, mytable.description " 1096 "FROM mytable WHERE mytable.myid = %(myid_1)s " 1097 "FOR SHARE OF mytable NOWAIT", 1098 ) 1099 1100 self.assert_compile( 1101 table1.select(table1.c.myid == 7).with_for_update( 1102 read=True, 1103 skip_locked=True, 1104 of=[table1.c.myid, table1.c.name], 1105 key_share=True, 1106 ), 1107 "SELECT mytable.myid, mytable.name, mytable.description " 1108 "FROM mytable WHERE mytable.myid = %(myid_1)s " 1109 "FOR KEY SHARE OF mytable SKIP LOCKED", 1110 ) 1111 1112 self.assert_compile( 1113 table1.select(table1.c.myid == 7).with_for_update( 1114 skip_locked=True, of=[table1.c.myid, table1.c.name] 1115 ), 1116 "SELECT mytable.myid, mytable.name, mytable.description " 1117 "FROM mytable WHERE mytable.myid = %(myid_1)s " 1118 "FOR UPDATE OF mytable SKIP LOCKED", 1119 ) 1120 1121 self.assert_compile( 1122 table1.select(table1.c.myid == 7).with_for_update( 1123 read=True, skip_locked=True, of=[table1.c.myid, table1.c.name] 1124 ), 1125 "SELECT mytable.myid, mytable.name, mytable.description " 1126 "FROM mytable WHERE mytable.myid = %(myid_1)s " 1127 "FOR SHARE OF mytable SKIP LOCKED", 1128 ) 1129 1130 self.assert_compile( 1131 table1.select(table1.c.myid == 7).with_for_update( 1132 key_share=True, nowait=True, of=[table1.c.myid, table1.c.name] 1133 ), 1134 "SELECT mytable.myid, mytable.name, mytable.description " 1135 "FROM mytable WHERE mytable.myid = %(myid_1)s " 1136 "FOR NO KEY UPDATE OF mytable NOWAIT", 1137 ) 1138 1139 self.assert_compile( 1140 table1.select(table1.c.myid == 7).with_for_update( 1141 key_share=True, 1142 skip_locked=True, 1143 of=[table1.c.myid, table1.c.name], 1144 ), 1145 "SELECT mytable.myid, 
mytable.name, mytable.description " 1146 "FROM mytable WHERE mytable.myid = %(myid_1)s " 1147 "FOR NO KEY UPDATE OF mytable SKIP LOCKED", 1148 ) 1149 1150 self.assert_compile( 1151 table1.select(table1.c.myid == 7).with_for_update( 1152 key_share=True, of=[table1.c.myid, table1.c.name] 1153 ), 1154 "SELECT mytable.myid, mytable.name, mytable.description " 1155 "FROM mytable WHERE mytable.myid = %(myid_1)s " 1156 "FOR NO KEY UPDATE OF mytable", 1157 ) 1158 1159 self.assert_compile( 1160 table1.select(table1.c.myid == 7).with_for_update(key_share=True), 1161 "SELECT mytable.myid, mytable.name, mytable.description " 1162 "FROM mytable WHERE mytable.myid = %(myid_1)s " 1163 "FOR NO KEY UPDATE", 1164 ) 1165 1166 self.assert_compile( 1167 table1.select(table1.c.myid == 7).with_for_update( 1168 read=True, key_share=True 1169 ), 1170 "SELECT mytable.myid, mytable.name, mytable.description " 1171 "FROM mytable WHERE mytable.myid = %(myid_1)s " 1172 "FOR KEY SHARE", 1173 ) 1174 1175 self.assert_compile( 1176 table1.select(table1.c.myid == 7).with_for_update( 1177 read=True, key_share=True, of=table1 1178 ), 1179 "SELECT mytable.myid, mytable.name, mytable.description " 1180 "FROM mytable WHERE mytable.myid = %(myid_1)s " 1181 "FOR KEY SHARE OF mytable", 1182 ) 1183 1184 self.assert_compile( 1185 table1.select(table1.c.myid == 7).with_for_update( 1186 read=True, of=table1 1187 ), 1188 "SELECT mytable.myid, mytable.name, mytable.description " 1189 "FROM mytable WHERE mytable.myid = %(myid_1)s " 1190 "FOR SHARE OF mytable", 1191 ) 1192 1193 self.assert_compile( 1194 table1.select(table1.c.myid == 7).with_for_update( 1195 read=True, key_share=True, skip_locked=True 1196 ), 1197 "SELECT mytable.myid, mytable.name, mytable.description " 1198 "FROM mytable WHERE mytable.myid = %(myid_1)s " 1199 "FOR KEY SHARE SKIP LOCKED", 1200 ) 1201 1202 self.assert_compile( 1203 table1.select(table1.c.myid == 7).with_for_update( 1204 key_share=True, skip_locked=True 1205 ), 1206 "SELECT 
mytable.myid, mytable.name, mytable.description " 1207 "FROM mytable WHERE mytable.myid = %(myid_1)s " 1208 "FOR NO KEY UPDATE SKIP LOCKED", 1209 ) 1210 1211 ta = table1.alias() 1212 self.assert_compile( 1213 ta.select(ta.c.myid == 7).with_for_update( 1214 of=[ta.c.myid, ta.c.name] 1215 ), 1216 "SELECT mytable_1.myid, mytable_1.name, mytable_1.description " 1217 "FROM mytable AS mytable_1 " 1218 "WHERE mytable_1.myid = %(myid_1)s FOR UPDATE OF mytable_1", 1219 ) 1220 1221 table2 = table("table2", column("mytable_id")) 1222 join = table2.join(table1, table2.c.mytable_id == table1.c.myid) 1223 self.assert_compile( 1224 join.select(table2.c.mytable_id == 7).with_for_update(of=[join]), 1225 "SELECT table2.mytable_id, " 1226 "mytable.myid, mytable.name, mytable.description " 1227 "FROM table2 " 1228 "JOIN mytable ON table2.mytable_id = mytable.myid " 1229 "WHERE table2.mytable_id = %(mytable_id_1)s " 1230 "FOR UPDATE OF mytable, table2", 1231 ) 1232 1233 join = table2.join(ta, table2.c.mytable_id == ta.c.myid) 1234 self.assert_compile( 1235 join.select(table2.c.mytable_id == 7).with_for_update(of=[join]), 1236 "SELECT table2.mytable_id, " 1237 "mytable_1.myid, mytable_1.name, mytable_1.description " 1238 "FROM table2 " 1239 "JOIN mytable AS mytable_1 " 1240 "ON table2.mytable_id = mytable_1.myid " 1241 "WHERE table2.mytable_id = %(mytable_id_1)s " 1242 "FOR UPDATE OF mytable_1, table2", 1243 ) 1244 1245 # ensure of=text() for of works 1246 self.assert_compile( 1247 table1.select(table1.c.myid == 7).with_for_update( 1248 of=text("table1") 1249 ), 1250 "SELECT mytable.myid, mytable.name, mytable.description " 1251 "FROM mytable WHERE mytable.myid = %(myid_1)s " 1252 "FOR UPDATE OF table1", 1253 ) 1254 1255 # ensure literal_column of works 1256 self.assert_compile( 1257 table1.select(table1.c.myid == 7).with_for_update( 1258 of=literal_column("table1") 1259 ), 1260 "SELECT mytable.myid, mytable.name, mytable.description " 1261 "FROM mytable WHERE mytable.myid = %(myid_1)s 
" 1262 "FOR UPDATE OF table1", 1263 ) 1264 1265 def test_for_update_with_schema(self): 1266 m = MetaData() 1267 table1 = Table( 1268 "mytable", m, Column("myid"), Column("name"), schema="testschema" 1269 ) 1270 1271 self.assert_compile( 1272 table1.select(table1.c.myid == 7).with_for_update(of=table1), 1273 "SELECT testschema.mytable.myid, testschema.mytable.name " 1274 "FROM testschema.mytable " 1275 "WHERE testschema.mytable.myid = %(myid_1)s " 1276 "FOR UPDATE OF mytable", 1277 ) 1278 1279 def test_reserved_words(self): 1280 table = Table( 1281 "pg_table", 1282 MetaData(), 1283 Column("col1", Integer), 1284 Column("variadic", Integer), 1285 ) 1286 x = select([table.c.col1, table.c.variadic]) 1287 1288 self.assert_compile( 1289 x, """SELECT pg_table.col1, pg_table."variadic" FROM pg_table""" 1290 ) 1291 1292 def test_array(self): 1293 c = Column("x", postgresql.ARRAY(Integer)) 1294 1295 self.assert_compile( 1296 cast(c, postgresql.ARRAY(Integer)), "CAST(x AS INTEGER[])" 1297 ) 1298 self.assert_compile(c[5], "x[%(x_1)s]", checkparams={"x_1": 5}) 1299 1300 self.assert_compile( 1301 c[5:7], "x[%(x_1)s:%(x_2)s]", checkparams={"x_2": 7, "x_1": 5} 1302 ) 1303 self.assert_compile( 1304 c[5:7][2:3], 1305 "x[%(x_1)s:%(x_2)s][%(param_1)s:%(param_2)s]", 1306 checkparams={"x_2": 7, "x_1": 5, "param_1": 2, "param_2": 3}, 1307 ) 1308 self.assert_compile( 1309 c[5:7][3], 1310 "x[%(x_1)s:%(x_2)s][%(param_1)s]", 1311 checkparams={"x_2": 7, "x_1": 5, "param_1": 3}, 1312 ) 1313 1314 self.assert_compile( 1315 c.contains([1]), 1316 "x @> %(x_1)s::INTEGER[]", 1317 checkparams={"x_1": [1]}, 1318 dialect=PGDialect_psycopg2(), 1319 ) 1320 self.assert_compile( 1321 c.contained_by([2]), 1322 "x <@ %(x_1)s::INTEGER[]", 1323 checkparams={"x_1": [2]}, 1324 dialect=PGDialect_psycopg2(), 1325 ) 1326 self.assert_compile( 1327 c.contained_by([2]), 1328 "x <@ %(x_1)s", 1329 checkparams={"x_1": [2]}, 1330 dialect=PGDialect(), 1331 ) 1332 self.assert_compile( 1333 c.overlap([3]), 1334 "x && 
%(x_1)s::INTEGER[]", 1335 checkparams={"x_1": [3]}, 1336 dialect=PGDialect_psycopg2(), 1337 ) 1338 self.assert_compile( 1339 postgresql.Any(4, c), 1340 "%(param_1)s = ANY (x)", 1341 checkparams={"param_1": 4}, 1342 ) 1343 1344 self.assert_compile( 1345 c.any(5), 1346 "%(param_1)s = ANY (x)", 1347 checkparams={"param_1": 5}, 1348 ) 1349 1350 self.assert_compile( 1351 ~c.any(5), 1352 "NOT (%(param_1)s = ANY (x))", 1353 checkparams={"param_1": 5}, 1354 ) 1355 1356 self.assert_compile( 1357 c.all(5), 1358 "%(param_1)s = ALL (x)", 1359 checkparams={"param_1": 5}, 1360 ) 1361 1362 self.assert_compile( 1363 ~c.all(5), 1364 "NOT (%(param_1)s = ALL (x))", 1365 checkparams={"param_1": 5}, 1366 ) 1367 1368 self.assert_compile( 1369 c.any(5, operator=operators.ne), 1370 "%(param_1)s != ANY (x)", 1371 checkparams={"param_1": 5}, 1372 ) 1373 self.assert_compile( 1374 postgresql.All(6, c, operator=operators.gt), 1375 "%(param_1)s > ALL (x)", 1376 checkparams={"param_1": 6}, 1377 ) 1378 self.assert_compile( 1379 c.all(7, operator=operators.lt), 1380 "%(param_1)s < ALL (x)", 1381 checkparams={"param_1": 7}, 1382 ) 1383 1384 @testing.combinations((True,), (False,)) 1385 def test_array_zero_indexes(self, zero_indexes): 1386 c = Column("x", postgresql.ARRAY(Integer, zero_indexes=zero_indexes)) 1387 1388 add_one = 1 if zero_indexes else 0 1389 1390 self.assert_compile( 1391 cast(c, postgresql.ARRAY(Integer, zero_indexes=zero_indexes)), 1392 "CAST(x AS INTEGER[])", 1393 ) 1394 self.assert_compile( 1395 c[5], "x[%(x_1)s]", checkparams={"x_1": 5 + add_one} 1396 ) 1397 1398 self.assert_compile( 1399 c[5:7], 1400 "x[%(x_1)s:%(x_2)s]", 1401 checkparams={"x_2": 7 + add_one, "x_1": 5 + add_one}, 1402 ) 1403 self.assert_compile( 1404 c[5:7][2:3], 1405 "x[%(x_1)s:%(x_2)s][%(param_1)s:%(param_2)s]", 1406 checkparams={ 1407 "x_2": 7 + add_one, 1408 "x_1": 5 + add_one, 1409 "param_1": 2 + add_one, 1410 "param_2": 3 + add_one, 1411 }, 1412 ) 1413 self.assert_compile( 1414 c[5:7][3], 1415 
"x[%(x_1)s:%(x_2)s][%(param_1)s]", 1416 checkparams={ 1417 "x_2": 7 + add_one, 1418 "x_1": 5 + add_one, 1419 "param_1": 3 + add_one, 1420 }, 1421 ) 1422 1423 def test_array_literal_type(self): 1424 isinstance(postgresql.array([1, 2]).type, postgresql.ARRAY) 1425 is_(postgresql.array([1, 2]).type.item_type._type_affinity, Integer) 1426 1427 is_( 1428 postgresql.array( 1429 [1, 2], type_=String 1430 ).type.item_type._type_affinity, 1431 String, 1432 ) 1433 1434 def test_array_literal(self): 1435 self.assert_compile( 1436 func.array_dims( 1437 postgresql.array([1, 2]) + postgresql.array([3, 4, 5]) 1438 ), 1439 "array_dims(ARRAY[%(param_1)s, %(param_2)s] || " 1440 "ARRAY[%(param_3)s, %(param_4)s, %(param_5)s])", 1441 checkparams={ 1442 "param_5": 5, 1443 "param_4": 4, 1444 "param_1": 1, 1445 "param_3": 3, 1446 "param_2": 2, 1447 }, 1448 ) 1449 1450 def test_array_literal_compare(self): 1451 self.assert_compile( 1452 postgresql.array([1, 2]) == [3, 4, 5], 1453 "ARRAY[%(param_1)s, %(param_2)s] = " 1454 "ARRAY[%(param_3)s, %(param_4)s, %(param_5)s]", 1455 checkparams={ 1456 "param_5": 5, 1457 "param_4": 4, 1458 "param_1": 1, 1459 "param_3": 3, 1460 "param_2": 2, 1461 }, 1462 ) 1463 1464 def test_array_literal_contains(self): 1465 self.assert_compile( 1466 postgresql.array([1, 2]).contains([3, 4, 5]), 1467 "ARRAY[%(param_1)s, %(param_2)s] @> ARRAY[%(param_3)s, " 1468 "%(param_4)s, %(param_5)s]", 1469 checkparams={ 1470 "param_1": 1, 1471 "param_2": 2, 1472 "param_3": 3, 1473 "param_4": 4, 1474 "param_5": 5, 1475 }, 1476 ) 1477 1478 self.assert_compile( 1479 postgresql.array(["a", "b"]).contains([""]), 1480 "ARRAY[%(param_1)s, %(param_2)s] @> ARRAY[%(param_3)s]", 1481 checkparams={"param_1": "a", "param_2": "b", "param_3": ""}, 1482 ) 1483 1484 self.assert_compile( 1485 postgresql.array(["a", "b"]).contains([]), 1486 "ARRAY[%(param_1)s, %(param_2)s] @> ARRAY[]", 1487 checkparams={"param_1": "a", "param_2": "b"}, 1488 ) 1489 1490 self.assert_compile( 1491 
postgresql.array(["a", "b"]).contains([0]), 1492 "ARRAY[%(param_1)s, %(param_2)s] @> ARRAY[%(param_3)s]", 1493 checkparams={"param_1": "a", "param_2": "b", "param_3": 0}, 1494 ) 1495 1496 def test_array_literal_contained_by(self): 1497 self.assert_compile( 1498 postgresql.array(["a", "b"]).contained_by(["a", "b", "c"]), 1499 "ARRAY[%(param_1)s, %(param_2)s] <@ ARRAY[%(param_3)s, " 1500 "%(param_4)s, %(param_5)s]", 1501 checkparams={ 1502 "param_1": "a", 1503 "param_2": "b", 1504 "param_3": "a", 1505 "param_4": "b", 1506 "param_5": "c", 1507 }, 1508 ) 1509 1510 self.assert_compile( 1511 postgresql.array([1, 2]).contained_by([3, 4, 5]), 1512 "ARRAY[%(param_1)s, %(param_2)s] <@ ARRAY[%(param_3)s, " 1513 "%(param_4)s, %(param_5)s]", 1514 checkparams={ 1515 "param_1": 1, 1516 "param_2": 2, 1517 "param_3": 3, 1518 "param_4": 4, 1519 "param_5": 5, 1520 }, 1521 ) 1522 1523 self.assert_compile( 1524 postgresql.array(["a", "b"]).contained_by([""]), 1525 "ARRAY[%(param_1)s, %(param_2)s] <@ ARRAY[%(param_3)s]", 1526 checkparams={"param_1": "a", "param_2": "b", "param_3": ""}, 1527 ) 1528 1529 self.assert_compile( 1530 postgresql.array(["a", "b"]).contained_by([]), 1531 "ARRAY[%(param_1)s, %(param_2)s] <@ ARRAY[]", 1532 checkparams={"param_1": "a", "param_2": "b"}, 1533 ) 1534 1535 self.assert_compile( 1536 postgresql.array(["a", "b"]).contained_by([0]), 1537 "ARRAY[%(param_1)s, %(param_2)s] <@ ARRAY[%(param_3)s]", 1538 checkparams={"param_1": "a", "param_2": "b", "param_3": 0}, 1539 ) 1540 1541 def test_array_literal_insert(self): 1542 m = MetaData() 1543 t = Table("t", m, Column("data", postgresql.ARRAY(Integer))) 1544 self.assert_compile( 1545 t.insert().values(data=array([1, 2, 3])), 1546 "INSERT INTO t (data) VALUES (ARRAY[%(param_1)s, " 1547 "%(param_2)s, %(param_3)s])", 1548 ) 1549 1550 def test_update_array(self): 1551 m = MetaData() 1552 t = Table("t", m, Column("data", postgresql.ARRAY(Integer))) 1553 self.assert_compile( 1554 t.update().values({t.c.data: [1, 3, 4]}), 
1555 "UPDATE t SET data=%(data)s::INTEGER[]", 1556 checkparams={"data": [1, 3, 4]}, 1557 ) 1558 1559 def test_update_array_element(self): 1560 m = MetaData() 1561 t = Table("t", m, Column("data", postgresql.ARRAY(Integer))) 1562 self.assert_compile( 1563 t.update().values({t.c.data[5]: 1}), 1564 "UPDATE t SET data[%(data_1)s]=%(param_1)s", 1565 checkparams={"data_1": 5, "param_1": 1}, 1566 ) 1567 1568 def test_update_array_slice(self): 1569 m = MetaData() 1570 t = Table("t", m, Column("data", postgresql.ARRAY(Integer))) 1571 1572 # psycopg2-specific, has a cast 1573 self.assert_compile( 1574 t.update().values({t.c.data[2:5]: [2, 3, 4]}), 1575 "UPDATE t SET data[%(data_1)s:%(data_2)s]=" 1576 "%(param_1)s::INTEGER[]", 1577 checkparams={"param_1": [2, 3, 4], "data_2": 5, "data_1": 2}, 1578 dialect=PGDialect_psycopg2(), 1579 ) 1580 1581 # default dialect does not, as DBAPIs may be doing this for us 1582 self.assert_compile( 1583 t.update().values({t.c.data[2:5]: [2, 3, 4]}), 1584 "UPDATE t SET data[%s:%s]=" "%s", 1585 checkparams={"param_1": [2, 3, 4], "data_2": 5, "data_1": 2}, 1586 dialect=PGDialect(paramstyle="format"), 1587 ) 1588 1589 def test_from_only(self): 1590 m = MetaData() 1591 tbl1 = Table("testtbl1", m, Column("id", Integer)) 1592 tbl2 = Table("testtbl2", m, Column("id", Integer)) 1593 1594 stmt = tbl1.select().with_hint(tbl1, "ONLY", "postgresql") 1595 expected = "SELECT testtbl1.id FROM ONLY testtbl1" 1596 self.assert_compile(stmt, expected) 1597 1598 talias1 = tbl1.alias("foo") 1599 stmt = talias1.select().with_hint(talias1, "ONLY", "postgresql") 1600 expected = "SELECT foo.id FROM ONLY testtbl1 AS foo" 1601 self.assert_compile(stmt, expected) 1602 1603 stmt = select([tbl1, tbl2]).with_hint(tbl1, "ONLY", "postgresql") 1604 expected = ( 1605 "SELECT testtbl1.id, testtbl2.id FROM ONLY testtbl1, " "testtbl2" 1606 ) 1607 self.assert_compile(stmt, expected) 1608 1609 stmt = select([tbl1, tbl2]).with_hint(tbl2, "ONLY", "postgresql") 1610 expected = ( 1611 
"SELECT testtbl1.id, testtbl2.id FROM testtbl1, ONLY " "testtbl2" 1612 ) 1613 self.assert_compile(stmt, expected) 1614 1615 stmt = select([tbl1, tbl2]) 1616 stmt = stmt.with_hint(tbl1, "ONLY", "postgresql") 1617 stmt = stmt.with_hint(tbl2, "ONLY", "postgresql") 1618 expected = ( 1619 "SELECT testtbl1.id, testtbl2.id FROM ONLY testtbl1, " 1620 "ONLY testtbl2" 1621 ) 1622 self.assert_compile(stmt, expected) 1623 1624 stmt = update(tbl1, values=dict(id=1)) 1625 stmt = stmt.with_hint("ONLY", dialect_name="postgresql") 1626 expected = "UPDATE ONLY testtbl1 SET id=%(id)s" 1627 self.assert_compile(stmt, expected) 1628 1629 stmt = delete(tbl1).with_hint( 1630 "ONLY", selectable=tbl1, dialect_name="postgresql" 1631 ) 1632 expected = "DELETE FROM ONLY testtbl1" 1633 self.assert_compile(stmt, expected) 1634 1635 tbl3 = Table("testtbl3", m, Column("id", Integer), schema="testschema") 1636 stmt = tbl3.select().with_hint(tbl3, "ONLY", "postgresql") 1637 expected = ( 1638 "SELECT testschema.testtbl3.id FROM " "ONLY testschema.testtbl3" 1639 ) 1640 self.assert_compile(stmt, expected) 1641 1642 assert_raises( 1643 exc.CompileError, 1644 tbl3.select().with_hint(tbl3, "FAKE", "postgresql").compile, 1645 dialect=postgresql.dialect(), 1646 ) 1647 1648 def test_aggregate_order_by_one(self): 1649 m = MetaData() 1650 table = Table("table1", m, Column("a", Integer), Column("b", Integer)) 1651 expr = func.array_agg(aggregate_order_by(table.c.a, table.c.b.desc())) 1652 stmt = select([expr]) 1653 1654 # note this tests that the object exports FROM objects 1655 # correctly 1656 self.assert_compile( 1657 stmt, 1658 "SELECT array_agg(table1.a ORDER BY table1.b DESC) " 1659 "AS array_agg_1 FROM table1", 1660 ) 1661 1662 def test_aggregate_order_by_two(self): 1663 m = MetaData() 1664 table = Table("table1", m, Column("a", Integer), Column("b", Integer)) 1665 expr = func.string_agg( 1666 table.c.a, aggregate_order_by(literal_column("','"), table.c.a) 1667 ) 1668 stmt = select([expr]) 1669 1670 
self.assert_compile( 1671 stmt, 1672 "SELECT string_agg(table1.a, ',' ORDER BY table1.a) " 1673 "AS string_agg_1 FROM table1", 1674 ) 1675 1676 def test_aggregate_order_by_multi_col(self): 1677 m = MetaData() 1678 table = Table("table1", m, Column("a", Integer), Column("b", Integer)) 1679 expr = func.string_agg( 1680 table.c.a, 1681 aggregate_order_by( 1682 literal_column("','"), table.c.a, table.c.b.desc() 1683 ), 1684 ) 1685 stmt = select([expr]) 1686 1687 self.assert_compile( 1688 stmt, 1689 "SELECT string_agg(table1.a, " 1690 "',' ORDER BY table1.a, table1.b DESC) " 1691 "AS string_agg_1 FROM table1", 1692 ) 1693 1694 def test_aggregate_orcer_by_no_arg(self): 1695 assert_raises_message( 1696 TypeError, 1697 "at least one ORDER BY element is required", 1698 aggregate_order_by, 1699 literal_column("','"), 1700 ) 1701 1702 def test_pg_array_agg_implicit_pg_array(self): 1703 1704 expr = pg_array_agg(column("data", Integer)) 1705 assert isinstance(expr.type, PG_ARRAY) 1706 is_(expr.type.item_type._type_affinity, Integer) 1707 1708 def test_pg_array_agg_uses_base_array(self): 1709 1710 expr = pg_array_agg(column("data", sqltypes.ARRAY(Integer))) 1711 assert isinstance(expr.type, sqltypes.ARRAY) 1712 assert not isinstance(expr.type, PG_ARRAY) 1713 is_(expr.type.item_type._type_affinity, Integer) 1714 1715 def test_pg_array_agg_uses_pg_array(self): 1716 1717 expr = pg_array_agg(column("data", PG_ARRAY(Integer))) 1718 assert isinstance(expr.type, PG_ARRAY) 1719 is_(expr.type.item_type._type_affinity, Integer) 1720 1721 def test_pg_array_agg_explicit_base_array(self): 1722 1723 expr = pg_array_agg( 1724 column("data", sqltypes.ARRAY(Integer)), 1725 type_=sqltypes.ARRAY(Integer), 1726 ) 1727 assert isinstance(expr.type, sqltypes.ARRAY) 1728 assert not isinstance(expr.type, PG_ARRAY) 1729 is_(expr.type.item_type._type_affinity, Integer) 1730 1731 def test_pg_array_agg_explicit_pg_array(self): 1732 1733 expr = pg_array_agg( 1734 column("data", sqltypes.ARRAY(Integer)), 
type_=PG_ARRAY(Integer) 1735 ) 1736 assert isinstance(expr.type, PG_ARRAY) 1737 is_(expr.type.item_type._type_affinity, Integer) 1738 1739 def test_aggregate_order_by_adapt(self): 1740 m = MetaData() 1741 table = Table("table1", m, Column("a", Integer), Column("b", Integer)) 1742 expr = func.array_agg(aggregate_order_by(table.c.a, table.c.b.desc())) 1743 stmt = select([expr]) 1744 1745 a1 = table.alias("foo") 1746 stmt2 = sql_util.ClauseAdapter(a1).traverse(stmt) 1747 self.assert_compile( 1748 stmt2, 1749 "SELECT array_agg(foo.a ORDER BY foo.b DESC) AS array_agg_1 " 1750 "FROM table1 AS foo", 1751 ) 1752 1753 def test_array_agg_w_filter_subscript(self): 1754 series = func.generate_series(1, 100).alias("series") 1755 series_col = column("series") 1756 query = select( 1757 [func.array_agg(series_col).filter(series_col % 2 == 0)[3]] 1758 ).select_from(series) 1759 self.assert_compile( 1760 query, 1761 "SELECT (array_agg(series) FILTER " 1762 "(WHERE series %% %(series_1)s = %(param_1)s))[%(param_2)s] " 1763 "AS anon_1 FROM " 1764 "generate_series(%(generate_series_1)s, %(generate_series_2)s) " 1765 "AS series", 1766 ) 1767 1768 def test_delete_extra_froms(self): 1769 t1 = table("t1", column("c1")) 1770 t2 = table("t2", column("c1")) 1771 q = delete(t1).where(t1.c.c1 == t2.c.c1) 1772 self.assert_compile(q, "DELETE FROM t1 USING t2 WHERE t1.c1 = t2.c1") 1773 1774 def test_delete_extra_froms_alias(self): 1775 a1 = table("t1", column("c1")).alias("a1") 1776 t2 = table("t2", column("c1")) 1777 q = delete(a1).where(a1.c.c1 == t2.c.c1) 1778 self.assert_compile( 1779 q, "DELETE FROM t1 AS a1 USING t2 WHERE a1.c1 = t2.c1" 1780 ) 1781 1782 @testing.combinations( 1783 ("no_persisted", " STORED", "ignore"), 1784 ("persisted_none", " STORED", None), 1785 ("persisted_true", " STORED", True), 1786 id_="iaa", 1787 ) 1788 def test_column_computed(self, text, persisted): 1789 m = MetaData() 1790 kwargs = {"persisted": persisted} if persisted != "ignore" else {} 1791 t = Table( 1792 
"t", 1793 m, 1794 Column("x", Integer), 1795 Column("y", Integer, Computed("x + 2", **kwargs)), 1796 ) 1797 self.assert_compile( 1798 schema.CreateTable(t), 1799 "CREATE TABLE t (x INTEGER, y INTEGER GENERATED " 1800 "ALWAYS AS (x + 2)%s)" % text, 1801 ) 1802 1803 def test_column_computed_persisted_false(self): 1804 m = MetaData() 1805 t = Table( 1806 "t", 1807 m, 1808 Column("x", Integer), 1809 Column("y", Integer, Computed("x + 2", persisted=False)), 1810 ) 1811 assert_raises_message( 1812 exc.CompileError, 1813 "PostrgreSQL computed columns do not support 'virtual'", 1814 schema.CreateTable(t).compile, 1815 dialect=postgresql.dialect(), 1816 ) 1817 1818 1819class InsertOnConflictTest(fixtures.TestBase, AssertsCompiledSQL): 1820 __dialect__ = postgresql.dialect() 1821 1822 def setup(self): 1823 self.table1 = table1 = table( 1824 "mytable", 1825 column("myid", Integer), 1826 column("name", String(128)), 1827 column("description", String(128)), 1828 ) 1829 md = MetaData() 1830 self.table_with_metadata = Table( 1831 "mytable", 1832 md, 1833 Column("myid", Integer, primary_key=True), 1834 Column("name", String(128)), 1835 Column("description", String(128)), 1836 ) 1837 self.unique_constr = schema.UniqueConstraint( 1838 table1.c.name, name="uq_name" 1839 ) 1840 self.excl_constr = ExcludeConstraint( 1841 (table1.c.name, "="), 1842 (table1.c.description, "&&"), 1843 name="excl_thing", 1844 ) 1845 self.excl_constr_anon = ExcludeConstraint( 1846 (self.table_with_metadata.c.name, "="), 1847 (self.table_with_metadata.c.description, "&&"), 1848 where=self.table_with_metadata.c.description != "foo", 1849 ) 1850 self.goofy_index = Index( 1851 "goofy_index", table1.c.name, postgresql_where=table1.c.name > "m" 1852 ) 1853 1854 def test_do_nothing_no_target(self): 1855 1856 i = insert( 1857 self.table1, values=dict(name="foo") 1858 ).on_conflict_do_nothing() 1859 self.assert_compile( 1860 i, 1861 "INSERT INTO mytable (name) VALUES " 1862 "(%(name)s) ON CONFLICT DO NOTHING", 1863 
) 1864 1865 def test_do_nothing_index_elements_target(self): 1866 1867 i = insert( 1868 self.table1, values=dict(name="foo") 1869 ).on_conflict_do_nothing(index_elements=["myid"]) 1870 self.assert_compile( 1871 i, 1872 "INSERT INTO mytable (name) VALUES " 1873 "(%(name)s) ON CONFLICT (myid) DO NOTHING", 1874 ) 1875 1876 def test_do_update_set_clause_none(self): 1877 i = insert(self.table_with_metadata).values(myid=1, name="foo") 1878 i = i.on_conflict_do_update( 1879 index_elements=["myid"], 1880 set_=OrderedDict([("name", "I'm a name"), ("description", None)]), 1881 ) 1882 self.assert_compile( 1883 i, 1884 "INSERT INTO mytable (myid, name) VALUES " 1885 "(%(myid)s, %(name)s) ON CONFLICT (myid) " 1886 "DO UPDATE SET name = %(param_1)s, " 1887 "description = %(param_2)s", 1888 { 1889 "myid": 1, 1890 "name": "foo", 1891 "param_1": "I'm a name", 1892 "param_2": None, 1893 }, 1894 ) 1895 1896 def test_do_update_set_clause_literal(self): 1897 i = insert(self.table_with_metadata).values(myid=1, name="foo") 1898 i = i.on_conflict_do_update( 1899 index_elements=["myid"], 1900 set_=OrderedDict( 1901 [("name", "I'm a name"), ("description", null())] 1902 ), 1903 ) 1904 self.assert_compile( 1905 i, 1906 "INSERT INTO mytable (myid, name) VALUES " 1907 "(%(myid)s, %(name)s) ON CONFLICT (myid) " 1908 "DO UPDATE SET name = %(param_1)s, " 1909 "description = NULL", 1910 {"myid": 1, "name": "foo", "param_1": "I'm a name"}, 1911 ) 1912 1913 def test_do_update_str_index_elements_target_one(self): 1914 i = insert(self.table_with_metadata).values(myid=1, name="foo") 1915 i = i.on_conflict_do_update( 1916 index_elements=["myid"], 1917 set_=OrderedDict( 1918 [ 1919 ("name", i.excluded.name), 1920 ("description", i.excluded.description), 1921 ] 1922 ), 1923 ) 1924 self.assert_compile( 1925 i, 1926 "INSERT INTO mytable (myid, name) VALUES " 1927 "(%(myid)s, %(name)s) ON CONFLICT (myid) " 1928 "DO UPDATE SET name = excluded.name, " 1929 "description = excluded.description", 1930 ) 1931 1932 
def test_do_update_str_index_elements_target_two(self): 1933 i = insert(self.table1, values=dict(name="foo")) 1934 i = i.on_conflict_do_update( 1935 index_elements=["myid"], set_=dict(name=i.excluded.name) 1936 ) 1937 self.assert_compile( 1938 i, 1939 "INSERT INTO mytable (name) VALUES " 1940 "(%(name)s) ON CONFLICT (myid) " 1941 "DO UPDATE SET name = excluded.name", 1942 ) 1943 1944 def test_do_update_col_index_elements_target(self): 1945 i = insert(self.table1, values=dict(name="foo")) 1946 i = i.on_conflict_do_update( 1947 index_elements=[self.table1.c.myid], 1948 set_=dict(name=i.excluded.name), 1949 ) 1950 self.assert_compile( 1951 i, 1952 "INSERT INTO mytable (name) VALUES " 1953 "(%(name)s) ON CONFLICT (myid) " 1954 "DO UPDATE SET name = excluded.name", 1955 ) 1956 1957 def test_do_update_unnamed_pk_constraint_target(self): 1958 i = insert(self.table_with_metadata, values=dict(myid=1, name="foo")) 1959 i = i.on_conflict_do_update( 1960 constraint=self.table_with_metadata.primary_key, 1961 set_=dict(name=i.excluded.name), 1962 ) 1963 self.assert_compile( 1964 i, 1965 "INSERT INTO mytable (myid, name) VALUES " 1966 "(%(myid)s, %(name)s) ON CONFLICT (myid) " 1967 "DO UPDATE SET name = excluded.name", 1968 ) 1969 1970 def test_do_update_pk_constraint_index_elements_target(self): 1971 i = insert(self.table_with_metadata, values=dict(myid=1, name="foo")) 1972 i = i.on_conflict_do_update( 1973 index_elements=self.table_with_metadata.primary_key, 1974 set_=dict(name=i.excluded.name), 1975 ) 1976 self.assert_compile( 1977 i, 1978 "INSERT INTO mytable (myid, name) VALUES " 1979 "(%(myid)s, %(name)s) ON CONFLICT (myid) " 1980 "DO UPDATE SET name = excluded.name", 1981 ) 1982 1983 def test_do_update_named_unique_constraint_target(self): 1984 i = insert(self.table1, values=dict(name="foo")) 1985 i = i.on_conflict_do_update( 1986 constraint=self.unique_constr, set_=dict(myid=i.excluded.myid) 1987 ) 1988 self.assert_compile( 1989 i, 1990 "INSERT INTO mytable (name) VALUES 
" 1991 "(%(name)s) ON CONFLICT ON CONSTRAINT uq_name " 1992 "DO UPDATE SET myid = excluded.myid", 1993 ) 1994 1995 def test_do_update_string_constraint_target(self): 1996 i = insert(self.table1, values=dict(name="foo")) 1997 i = i.on_conflict_do_update( 1998 constraint=self.unique_constr.name, set_=dict(myid=i.excluded.myid) 1999 ) 2000 self.assert_compile( 2001 i, 2002 "INSERT INTO mytable (name) VALUES " 2003 "(%(name)s) ON CONFLICT ON CONSTRAINT uq_name " 2004 "DO UPDATE SET myid = excluded.myid", 2005 ) 2006 2007 def test_do_update_index_elements_where_target(self): 2008 i = insert(self.table1, values=dict(name="foo")) 2009 i = i.on_conflict_do_update( 2010 index_elements=self.goofy_index.expressions, 2011 index_where=self.goofy_index.dialect_options["postgresql"][ 2012 "where" 2013 ], 2014 set_=dict(name=i.excluded.name), 2015 ) 2016 self.assert_compile( 2017 i, 2018 "INSERT INTO mytable (name) VALUES " 2019 "(%(name)s) ON CONFLICT (name) " 2020 "WHERE name > %(name_1)s " 2021 "DO UPDATE SET name = excluded.name", 2022 ) 2023 2024 def test_do_update_index_elements_where_target_multivalues(self): 2025 i = insert( 2026 self.table1, 2027 values=[dict(name="foo"), dict(name="bar"), dict(name="bat")], 2028 ) 2029 i = i.on_conflict_do_update( 2030 index_elements=self.goofy_index.expressions, 2031 index_where=self.goofy_index.dialect_options["postgresql"][ 2032 "where" 2033 ], 2034 set_=dict(name=i.excluded.name), 2035 ) 2036 self.assert_compile( 2037 i, 2038 "INSERT INTO mytable (name) " 2039 "VALUES (%(name_m0)s), (%(name_m1)s), (%(name_m2)s) " 2040 "ON CONFLICT (name) " 2041 "WHERE name > %(name_1)s " 2042 "DO UPDATE SET name = excluded.name", 2043 checkparams={ 2044 "name_1": "m", 2045 "name_m0": "foo", 2046 "name_m1": "bar", 2047 "name_m2": "bat", 2048 }, 2049 ) 2050 2051 def test_do_update_unnamed_index_target(self): 2052 i = insert(self.table1, values=dict(name="foo")) 2053 2054 unnamed_goofy = Index( 2055 None, self.table1.c.name, 
postgresql_where=self.table1.c.name > "m" 2056 ) 2057 2058 i = i.on_conflict_do_update( 2059 constraint=unnamed_goofy, set_=dict(name=i.excluded.name) 2060 ) 2061 self.assert_compile( 2062 i, 2063 "INSERT INTO mytable (name) VALUES " 2064 "(%(name)s) ON CONFLICT (name) " 2065 "WHERE name > %(name_1)s " 2066 "DO UPDATE SET name = excluded.name", 2067 ) 2068 2069 def test_do_update_unnamed_exclude_constraint_target(self): 2070 i = insert(self.table1, values=dict(name="foo")) 2071 i = i.on_conflict_do_update( 2072 constraint=self.excl_constr_anon, set_=dict(name=i.excluded.name) 2073 ) 2074 self.assert_compile( 2075 i, 2076 "INSERT INTO mytable (name) VALUES " 2077 "(%(name)s) ON CONFLICT (name, description) " 2078 "WHERE description != %(description_1)s " 2079 "DO UPDATE SET name = excluded.name", 2080 ) 2081 2082 def test_do_update_add_whereclause(self): 2083 i = insert(self.table1, values=dict(name="foo")) 2084 i = i.on_conflict_do_update( 2085 constraint=self.excl_constr_anon, 2086 set_=dict(name=i.excluded.name), 2087 where=( 2088 (self.table1.c.name != "brah") 2089 & (self.table1.c.description != "brah") 2090 ), 2091 ) 2092 self.assert_compile( 2093 i, 2094 "INSERT INTO mytable (name) VALUES " 2095 "(%(name)s) ON CONFLICT (name, description) " 2096 "WHERE description != %(description_1)s " 2097 "DO UPDATE SET name = excluded.name " 2098 "WHERE mytable.name != %(name_1)s " 2099 "AND mytable.description != %(description_2)s", 2100 ) 2101 2102 def test_do_update_add_whereclause_references_excluded(self): 2103 i = insert(self.table1, values=dict(name="foo")) 2104 i = i.on_conflict_do_update( 2105 constraint=self.excl_constr_anon, 2106 set_=dict(name=i.excluded.name), 2107 where=((self.table1.c.name != i.excluded.name)), 2108 ) 2109 self.assert_compile( 2110 i, 2111 "INSERT INTO mytable (name) VALUES " 2112 "(%(name)s) ON CONFLICT (name, description) " 2113 "WHERE description != %(description_1)s " 2114 "DO UPDATE SET name = excluded.name " 2115 "WHERE mytable.name 
!= excluded.name", 2116 ) 2117 2118 def test_do_update_additional_colnames(self): 2119 i = insert(self.table1, values=dict(name="bar")) 2120 i = i.on_conflict_do_update( 2121 constraint=self.excl_constr_anon, 2122 set_=dict(name="somename", unknown="unknown"), 2123 ) 2124 with expect_warnings( 2125 "Additional column names not matching any " 2126 "column keys in table 'mytable': 'unknown'" 2127 ): 2128 self.assert_compile( 2129 i, 2130 "INSERT INTO mytable (name) VALUES " 2131 "(%(name)s) ON CONFLICT (name, description) " 2132 "WHERE description != %(description_1)s " 2133 "DO UPDATE SET name = %(param_1)s, " 2134 "unknown = %(param_2)s", 2135 checkparams={ 2136 "name": "bar", 2137 "description_1": "foo", 2138 "param_1": "somename", 2139 "param_2": "unknown", 2140 }, 2141 ) 2142 2143 def test_on_conflict_as_cte(self): 2144 i = insert(self.table1, values=dict(name="foo")) 2145 i = ( 2146 i.on_conflict_do_update( 2147 constraint=self.excl_constr_anon, 2148 set_=dict(name=i.excluded.name), 2149 where=((self.table1.c.name != i.excluded.name)), 2150 ) 2151 .returning(literal_column("1")) 2152 .cte("i_upsert") 2153 ) 2154 2155 stmt = select([i]) 2156 2157 self.assert_compile( 2158 stmt, 2159 "WITH i_upsert AS " 2160 "(INSERT INTO mytable (name) VALUES (%(name)s) " 2161 "ON CONFLICT (name, description) " 2162 "WHERE description != %(description_1)s " 2163 "DO UPDATE SET name = excluded.name " 2164 "WHERE mytable.name != excluded.name RETURNING 1) " 2165 "SELECT i_upsert.1 " 2166 "FROM i_upsert", 2167 ) 2168 2169 def test_quote_raw_string_col(self): 2170 t = table("t", column("FancyName"), column("other name")) 2171 2172 stmt = ( 2173 insert(t) 2174 .values(FancyName="something new") 2175 .on_conflict_do_update( 2176 index_elements=["FancyName", "other name"], 2177 set_=OrderedDict( 2178 [ 2179 ("FancyName", "something updated"), 2180 ("other name", "something else"), 2181 ] 2182 ), 2183 ) 2184 ) 2185 2186 self.assert_compile( 2187 stmt, 2188 'INSERT INTO t ("FancyName") 
VALUES (%(FancyName)s) ' 2189 'ON CONFLICT ("FancyName", "other name") ' 2190 'DO UPDATE SET "FancyName" = %(param_1)s, ' 2191 '"other name" = %(param_2)s', 2192 { 2193 "param_1": "something updated", 2194 "param_2": "something else", 2195 "FancyName": "something new", 2196 }, 2197 ) 2198 2199 2200class DistinctOnTest(fixtures.TestBase, AssertsCompiledSQL): 2201 2202 """Test 'DISTINCT' with SQL expression language and orm.Query with 2203 an emphasis on PG's 'DISTINCT ON' syntax. 2204 2205 """ 2206 2207 __dialect__ = postgresql.dialect() 2208 2209 def setup(self): 2210 self.table = Table( 2211 "t", 2212 MetaData(), 2213 Column("id", Integer, primary_key=True), 2214 Column("a", String), 2215 Column("b", String), 2216 ) 2217 2218 def test_plain_generative(self): 2219 self.assert_compile( 2220 select([self.table]).distinct(), 2221 "SELECT DISTINCT t.id, t.a, t.b FROM t", 2222 ) 2223 2224 def test_on_columns_generative(self): 2225 self.assert_compile( 2226 select([self.table]).distinct(self.table.c.a), 2227 "SELECT DISTINCT ON (t.a) t.id, t.a, t.b FROM t", 2228 ) 2229 2230 def test_on_columns_generative_multi_call(self): 2231 self.assert_compile( 2232 select([self.table]) 2233 .distinct(self.table.c.a) 2234 .distinct(self.table.c.b), 2235 "SELECT DISTINCT ON (t.a, t.b) t.id, t.a, t.b FROM t", 2236 ) 2237 2238 def test_plain_inline(self): 2239 self.assert_compile( 2240 select([self.table], distinct=True), 2241 "SELECT DISTINCT t.id, t.a, t.b FROM t", 2242 ) 2243 2244 def test_on_columns_inline_list(self): 2245 self.assert_compile( 2246 select( 2247 [self.table], distinct=[self.table.c.a, self.table.c.b] 2248 ).order_by(self.table.c.a, self.table.c.b), 2249 "SELECT DISTINCT ON (t.a, t.b) t.id, " 2250 "t.a, t.b FROM t ORDER BY t.a, t.b", 2251 ) 2252 2253 def test_on_columns_inline_scalar(self): 2254 self.assert_compile( 2255 select([self.table], distinct=self.table.c.a), 2256 "SELECT DISTINCT ON (t.a) t.id, t.a, t.b FROM t", 2257 ) 2258 2259 def test_literal_binds(self): 
2260 self.assert_compile( 2261 select([self.table]).distinct(self.table.c.a == 10), 2262 "SELECT DISTINCT ON (t.a = 10) t.id, t.a, t.b FROM t", 2263 literal_binds=True, 2264 ) 2265 2266 def test_query_plain(self): 2267 sess = Session() 2268 self.assert_compile( 2269 sess.query(self.table).distinct(), 2270 "SELECT DISTINCT t.id AS t_id, t.a AS t_a, " "t.b AS t_b FROM t", 2271 ) 2272 2273 def test_query_on_columns(self): 2274 sess = Session() 2275 self.assert_compile( 2276 sess.query(self.table).distinct(self.table.c.a), 2277 "SELECT DISTINCT ON (t.a) t.id AS t_id, t.a AS t_a, " 2278 "t.b AS t_b FROM t", 2279 ) 2280 2281 def test_query_on_columns_multi_call(self): 2282 sess = Session() 2283 self.assert_compile( 2284 sess.query(self.table) 2285 .distinct(self.table.c.a) 2286 .distinct(self.table.c.b), 2287 "SELECT DISTINCT ON (t.a, t.b) t.id AS t_id, t.a AS t_a, " 2288 "t.b AS t_b FROM t", 2289 ) 2290 2291 def test_query_on_columns_subquery(self): 2292 sess = Session() 2293 2294 class Foo(object): 2295 pass 2296 2297 mapper(Foo, self.table) 2298 sess = Session() 2299 self.assert_compile( 2300 sess.query(Foo).from_self().distinct(Foo.a, Foo.b), 2301 "SELECT DISTINCT ON (anon_1.t_a, anon_1.t_b) anon_1.t_id " 2302 "AS anon_1_t_id, anon_1.t_a AS anon_1_t_a, anon_1.t_b " 2303 "AS anon_1_t_b FROM (SELECT t.id AS t_id, t.a AS t_a, " 2304 "t.b AS t_b FROM t) AS anon_1", 2305 ) 2306 2307 def test_query_distinct_on_aliased(self): 2308 class Foo(object): 2309 pass 2310 2311 mapper(Foo, self.table) 2312 a1 = aliased(Foo) 2313 sess = Session() 2314 self.assert_compile( 2315 sess.query(a1).distinct(a1.a), 2316 "SELECT DISTINCT ON (t_1.a) t_1.id AS t_1_id, " 2317 "t_1.a AS t_1_a, t_1.b AS t_1_b FROM t AS t_1", 2318 ) 2319 2320 def test_distinct_on_subquery_anon(self): 2321 2322 sq = select([self.table]).alias() 2323 q = ( 2324 select([self.table.c.id, sq.c.id]) 2325 .distinct(sq.c.id) 2326 .where(self.table.c.id == sq.c.id) 2327 ) 2328 2329 self.assert_compile( 2330 q, 2331 "SELECT 
DISTINCT ON (anon_1.id) t.id, anon_1.id " 2332 "FROM t, (SELECT t.id AS id, t.a AS a, t.b " 2333 "AS b FROM t) AS anon_1 WHERE t.id = anon_1.id", 2334 ) 2335 2336 def test_distinct_on_subquery_named(self): 2337 sq = select([self.table]).alias("sq") 2338 q = ( 2339 select([self.table.c.id, sq.c.id]) 2340 .distinct(sq.c.id) 2341 .where(self.table.c.id == sq.c.id) 2342 ) 2343 self.assert_compile( 2344 q, 2345 "SELECT DISTINCT ON (sq.id) t.id, sq.id " 2346 "FROM t, (SELECT t.id AS id, t.a AS a, " 2347 "t.b AS b FROM t) AS sq WHERE t.id = sq.id", 2348 ) 2349 2350 2351class FullTextSearchTest(fixtures.TestBase, AssertsCompiledSQL): 2352 2353 """Tests for full text searching""" 2354 2355 __dialect__ = postgresql.dialect() 2356 2357 def setup(self): 2358 self.table = Table( 2359 "t", 2360 MetaData(), 2361 Column("id", Integer, primary_key=True), 2362 Column("title", String), 2363 Column("body", String), 2364 ) 2365 self.table_alt = table( 2366 "mytable", 2367 column("id", Integer), 2368 column("title", String(128)), 2369 column("body", String(128)), 2370 ) 2371 2372 def _raise_query(self, q): 2373 """ 2374 useful for debugging. just do... 
2375 self._raise_query(q) 2376 """ 2377 c = q.compile(dialect=postgresql.dialect()) 2378 raise ValueError(c) 2379 2380 def test_match_basic(self): 2381 s = select([self.table_alt.c.id]).where( 2382 self.table_alt.c.title.match("somestring") 2383 ) 2384 self.assert_compile( 2385 s, 2386 "SELECT mytable.id " 2387 "FROM mytable " 2388 "WHERE mytable.title @@ to_tsquery(%(title_1)s)", 2389 ) 2390 2391 def test_match_regconfig(self): 2392 s = select([self.table_alt.c.id]).where( 2393 self.table_alt.c.title.match( 2394 "somestring", postgresql_regconfig="english" 2395 ) 2396 ) 2397 self.assert_compile( 2398 s, 2399 "SELECT mytable.id " 2400 "FROM mytable " 2401 """WHERE mytable.title @@ to_tsquery('english', %(title_1)s)""", 2402 ) 2403 2404 def test_match_tsvector(self): 2405 s = select([self.table_alt.c.id]).where( 2406 func.to_tsvector(self.table_alt.c.title).match("somestring") 2407 ) 2408 self.assert_compile( 2409 s, 2410 "SELECT mytable.id " 2411 "FROM mytable " 2412 "WHERE to_tsvector(mytable.title) " 2413 "@@ to_tsquery(%(to_tsvector_1)s)", 2414 ) 2415 2416 def test_match_tsvectorconfig(self): 2417 s = select([self.table_alt.c.id]).where( 2418 func.to_tsvector("english", self.table_alt.c.title).match( 2419 "somestring" 2420 ) 2421 ) 2422 self.assert_compile( 2423 s, 2424 "SELECT mytable.id " 2425 "FROM mytable " 2426 "WHERE to_tsvector(%(to_tsvector_1)s, mytable.title) @@ " 2427 "to_tsquery(%(to_tsvector_2)s)", 2428 ) 2429 2430 def test_match_tsvectorconfig_regconfig(self): 2431 s = select([self.table_alt.c.id]).where( 2432 func.to_tsvector("english", self.table_alt.c.title).match( 2433 "somestring", postgresql_regconfig="english" 2434 ) 2435 ) 2436 self.assert_compile( 2437 s, 2438 "SELECT mytable.id " 2439 "FROM mytable " 2440 "WHERE to_tsvector(%(to_tsvector_1)s, mytable.title) @@ " 2441 """to_tsquery('english', %(to_tsvector_2)s)""", 2442 ) 2443