1from contextlib import contextmanager 2import pickle 3 4import sqlalchemy as tsa 5from sqlalchemy import ARRAY 6from sqlalchemy import bindparam 7from sqlalchemy import BLANK_SCHEMA 8from sqlalchemy import Boolean 9from sqlalchemy import CheckConstraint 10from sqlalchemy import Column 11from sqlalchemy import column 12from sqlalchemy import ColumnDefault 13from sqlalchemy import desc 14from sqlalchemy import Enum 15from sqlalchemy import event 16from sqlalchemy import exc 17from sqlalchemy import ForeignKey 18from sqlalchemy import ForeignKeyConstraint 19from sqlalchemy import func 20from sqlalchemy import Index 21from sqlalchemy import Integer 22from sqlalchemy import MetaData 23from sqlalchemy import PrimaryKeyConstraint 24from sqlalchemy import schema 25from sqlalchemy import Sequence 26from sqlalchemy import String 27from sqlalchemy import Table 28from sqlalchemy import table 29from sqlalchemy import testing 30from sqlalchemy import text 31from sqlalchemy import TypeDecorator 32from sqlalchemy import types as sqltypes 33from sqlalchemy import Unicode 34from sqlalchemy import UniqueConstraint 35from sqlalchemy.engine import default 36from sqlalchemy.schema import AddConstraint 37from sqlalchemy.schema import CreateIndex 38from sqlalchemy.schema import DefaultClause 39from sqlalchemy.schema import DropIndex 40from sqlalchemy.sql import naming 41from sqlalchemy.sql import operators 42from sqlalchemy.sql.elements import _NONE_NAME 43from sqlalchemy.sql.elements import literal_column 44from sqlalchemy.testing import assert_raises 45from sqlalchemy.testing import assert_raises_message 46from sqlalchemy.testing import AssertsCompiledSQL 47from sqlalchemy.testing import ComparesTables 48from sqlalchemy.testing import emits_warning 49from sqlalchemy.testing import eq_ 50from sqlalchemy.testing import expect_raises_message 51from sqlalchemy.testing import fixtures 52from sqlalchemy.testing import is_ 53from sqlalchemy.testing import is_false 54from sqlalchemy.testing 
import is_true 55from sqlalchemy.testing import mock 56 57 58class MetaDataTest(fixtures.TestBase, ComparesTables): 59 def test_metadata_contains(self): 60 metadata = MetaData() 61 t1 = Table("t1", metadata, Column("x", Integer)) 62 t2 = Table("t2", metadata, Column("x", Integer), schema="foo") 63 t3 = Table("t2", MetaData(), Column("x", Integer)) 64 t4 = Table("t1", MetaData(), Column("x", Integer), schema="foo") 65 66 assert "t1" in metadata 67 assert "foo.t2" in metadata 68 assert "t2" not in metadata 69 assert "foo.t1" not in metadata 70 assert t1 in metadata 71 assert t2 in metadata 72 assert t3 not in metadata 73 assert t4 not in metadata 74 75 def test_uninitialized_column_copy(self): 76 for col in [ 77 Column("foo", String(), nullable=False), 78 Column("baz", String(), unique=True), 79 Column(Integer(), primary_key=True), 80 Column( 81 "bar", 82 Integer(), 83 Sequence("foo_seq"), 84 primary_key=True, 85 key="bar", 86 ), 87 Column(Integer(), ForeignKey("bat.blah"), doc="this is a col"), 88 Column( 89 "bar", 90 Integer(), 91 ForeignKey("bat.blah"), 92 primary_key=True, 93 key="bar", 94 ), 95 Column("bar", Integer(), info={"foo": "bar"}), 96 ]: 97 c2 = col._copy() 98 for attr in ( 99 "name", 100 "type", 101 "nullable", 102 "primary_key", 103 "key", 104 "unique", 105 "info", 106 "doc", 107 ): 108 eq_(getattr(col, attr), getattr(c2, attr)) 109 eq_(len(col.foreign_keys), len(c2.foreign_keys)) 110 if col.default: 111 eq_(c2.default.name, "foo_seq") 112 for a1, a2 in zip(col.foreign_keys, c2.foreign_keys): 113 assert a1 is not a2 114 eq_(a2._colspec, "bat.blah") 115 116 def test_col_subclass_copy(self): 117 class MyColumn(schema.Column): 118 def __init__(self, *args, **kw): 119 self.widget = kw.pop("widget", None) 120 super(MyColumn, self).__init__(*args, **kw) 121 122 def _copy(self, *arg, **kw): 123 c = super(MyColumn, self)._copy(*arg, **kw) 124 c.widget = self.widget 125 return c 126 127 c1 = MyColumn("foo", Integer, widget="x") 128 c2 = c1._copy() 129 assert 
isinstance(c2, MyColumn) 130 eq_(c2.widget, "x") 131 132 def test_uninitialized_column_copy_events(self): 133 msgs = [] 134 135 def write(c, t): 136 msgs.append("attach %s.%s" % (t.name, c.name)) 137 138 c1 = Column("foo", String()) 139 m = MetaData() 140 for i in range(3): 141 cx = c1._copy() 142 # as of 0.7, these events no longer copy. its expected 143 # that listeners will be re-established from the 144 # natural construction of things. 145 cx._on_table_attach(write) 146 Table("foo%d" % i, m, cx) 147 eq_(msgs, ["attach foo0.foo", "attach foo1.foo", "attach foo2.foo"]) 148 149 def test_schema_collection_add(self): 150 metadata = MetaData() 151 152 Table("t1", metadata, Column("x", Integer), schema="foo") 153 Table("t2", metadata, Column("x", Integer), schema="bar") 154 Table("t3", metadata, Column("x", Integer)) 155 156 eq_(metadata._schemas, set(["foo", "bar"])) 157 eq_(len(metadata.tables), 3) 158 159 def test_schema_collection_remove(self): 160 metadata = MetaData() 161 162 t1 = Table("t1", metadata, Column("x", Integer), schema="foo") 163 Table("t2", metadata, Column("x", Integer), schema="bar") 164 t3 = Table("t3", metadata, Column("x", Integer), schema="bar") 165 166 metadata.remove(t3) 167 eq_(metadata._schemas, set(["foo", "bar"])) 168 eq_(len(metadata.tables), 2) 169 170 metadata.remove(t1) 171 eq_(metadata._schemas, set(["bar"])) 172 eq_(len(metadata.tables), 1) 173 174 def test_schema_collection_remove_all(self): 175 metadata = MetaData() 176 177 Table("t1", metadata, Column("x", Integer), schema="foo") 178 Table("t2", metadata, Column("x", Integer), schema="bar") 179 180 metadata.clear() 181 eq_(metadata._schemas, set()) 182 eq_(len(metadata.tables), 0) 183 184 def test_metadata_tables_immutable(self): 185 # this use case was added due to #1917. 
186 metadata = MetaData() 187 188 Table("t1", metadata, Column("x", Integer)) 189 assert "t1" in metadata.tables 190 191 assert_raises(TypeError, lambda: metadata.tables.pop("t1")) 192 193 @testing.provide_metadata 194 def test_dupe_tables(self): 195 metadata = self.metadata 196 Table( 197 "table1", 198 metadata, 199 Column("col1", Integer, primary_key=True), 200 Column("col2", String(20)), 201 ) 202 203 metadata.create_all(testing.db) 204 Table("table1", metadata, autoload_with=testing.db) 205 206 def go(): 207 Table( 208 "table1", 209 metadata, 210 Column("col1", Integer, primary_key=True), 211 Column("col2", String(20)), 212 ) 213 214 assert_raises_message( 215 tsa.exc.InvalidRequestError, 216 "Table 'table1' is already defined for this " 217 "MetaData instance. Specify 'extend_existing=True' " 218 "to redefine options and columns on an existing " 219 "Table object.", 220 go, 221 ) 222 223 def test_fk_copy(self): 224 c1 = Column("foo", Integer) 225 c2 = Column("bar", Integer) 226 m = MetaData() 227 t1 = Table("t", m, c1, c2) 228 229 kw = dict( 230 onupdate="X", 231 ondelete="Y", 232 use_alter=True, 233 name="f1", 234 deferrable="Z", 235 initially="Q", 236 link_to_name=True, 237 ) 238 239 fk1 = ForeignKey(c1, **kw) 240 fk2 = ForeignKeyConstraint((c1,), (c2,), **kw) 241 242 t1.append_constraint(fk2) 243 fk1c = fk1._copy() 244 fk2c = fk2._copy() 245 246 for k in kw: 247 eq_(getattr(fk1c, k), kw[k]) 248 eq_(getattr(fk2c, k), kw[k]) 249 250 def test_check_constraint_copy(self): 251 def r(x): 252 return x 253 254 c = CheckConstraint( 255 "foo bar", 256 name="name", 257 initially=True, 258 deferrable=True, 259 _create_rule=r, 260 ) 261 c2 = c._copy() 262 eq_(c2.name, "name") 263 eq_(str(c2.sqltext), "foo bar") 264 eq_(c2.initially, True) 265 eq_(c2.deferrable, True) 266 assert c2._create_rule is r 267 268 def test_col_replace_w_constraint(self): 269 m = MetaData() 270 a = Table("a", m, Column("id", Integer, primary_key=True)) 271 272 aid = Column("a_id", 
ForeignKey("a.id")) 273 b = Table("b", m, aid) 274 b.append_column(aid) 275 276 assert b.c.a_id.references(a.c.id) 277 eq_(len(b.constraints), 2) 278 279 def test_fk_construct(self): 280 c1 = Column("foo", Integer) 281 c2 = Column("bar", Integer) 282 m = MetaData() 283 t1 = Table("t", m, c1, c2) 284 fk1 = ForeignKeyConstraint(("foo",), ("bar",), table=t1) 285 assert fk1 in t1.constraints 286 287 def test_fk_constraint_col_collection_w_table(self): 288 c1 = Column("foo", Integer) 289 c2 = Column("bar", Integer) 290 m = MetaData() 291 t1 = Table("t", m, c1, c2) 292 fk1 = ForeignKeyConstraint(("foo",), ("bar",), table=t1) 293 eq_(dict(fk1.columns), {"foo": c1}) 294 295 def test_fk_constraint_col_collection_no_table(self): 296 fk1 = ForeignKeyConstraint(("foo", "bat"), ("bar", "hoho")) 297 eq_(dict(fk1.columns), {}) 298 eq_(fk1.column_keys, ["foo", "bat"]) 299 eq_(fk1._col_description, "foo, bat") 300 eq_(fk1._elements, {"foo": fk1.elements[0], "bat": fk1.elements[1]}) 301 302 def test_fk_constraint_col_collection_no_table_real_cols(self): 303 c1 = Column("foo", Integer) 304 c2 = Column("bar", Integer) 305 fk1 = ForeignKeyConstraint((c1,), (c2,)) 306 eq_(dict(fk1.columns), {}) 307 eq_(fk1.column_keys, ["foo"]) 308 eq_(fk1._col_description, "foo") 309 eq_(fk1._elements, {"foo": fk1.elements[0]}) 310 311 def test_fk_constraint_col_collection_added_to_table(self): 312 c1 = Column("foo", Integer) 313 m = MetaData() 314 fk1 = ForeignKeyConstraint(("foo",), ("bar",)) 315 Table("t", m, c1, fk1) 316 eq_(dict(fk1.columns), {"foo": c1}) 317 eq_(fk1._elements, {"foo": fk1.elements[0]}) 318 319 def test_fk_constraint_col_collection_via_fk(self): 320 fk = ForeignKey("bar") 321 c1 = Column("foo", Integer, fk) 322 m = MetaData() 323 t1 = Table("t", m, c1) 324 fk1 = fk.constraint 325 eq_(fk1.column_keys, ["foo"]) 326 assert fk1 in t1.constraints 327 eq_(fk1.column_keys, ["foo"]) 328 eq_(dict(fk1.columns), {"foo": c1}) 329 eq_(fk1._elements, {"foo": fk}) 330 331 def 
test_fk_no_such_parent_col_error(self): 332 meta = MetaData() 333 a = Table("a", meta, Column("a", Integer)) 334 Table("b", meta, Column("b", Integer)) 335 336 def go(): 337 a.append_constraint(ForeignKeyConstraint(["x"], ["b.b"])) 338 339 assert_raises_message( 340 exc.ArgumentError, 341 "Can't create ForeignKeyConstraint on " 342 "table 'a': no column named 'x' is present.", 343 go, 344 ) 345 346 def test_fk_given_non_col(self): 347 not_a_col = bindparam("x") 348 assert_raises_message( 349 exc.ArgumentError, 350 "String column name or Column object for DDL foreign " 351 "key constraint expected, got .*Bind", 352 ForeignKey, 353 not_a_col, 354 ) 355 356 def test_fk_given_non_col_clauseelem(self): 357 class Foo(object): 358 def __clause_element__(self): 359 return bindparam("x") 360 361 assert_raises_message( 362 exc.ArgumentError, 363 "String column name or Column object for DDL foreign " 364 "key constraint expected, got .*Foo", 365 ForeignKey, 366 Foo(), 367 ) 368 369 def test_fk_given_col_non_table(self): 370 t = Table("t", MetaData(), Column("x", Integer)) 371 xa = t.alias().c.x 372 assert_raises_message( 373 exc.ArgumentError, 374 "ForeignKey received Column not bound to a Table, got: .*Alias", 375 ForeignKey, 376 xa, 377 ) 378 379 def test_fk_given_col_non_table_clauseelem(self): 380 t = Table("t", MetaData(), Column("x", Integer)) 381 382 class Foo(object): 383 def __clause_element__(self): 384 return t.alias().c.x 385 386 assert_raises_message( 387 exc.ArgumentError, 388 "ForeignKey received Column not bound to a Table, got: .*Alias", 389 ForeignKey, 390 Foo(), 391 ) 392 393 def test_fk_no_such_target_col_error_upfront(self): 394 meta = MetaData() 395 a = Table("a", meta, Column("a", Integer)) 396 Table("b", meta, Column("b", Integer)) 397 398 a.append_constraint(ForeignKeyConstraint(["a"], ["b.x"])) 399 400 assert_raises_message( 401 exc.NoReferencedColumnError, 402 "Could not initialize target column for ForeignKey 'b.x' on " 403 "table 'a': table 'b' 
has no column named 'x'", 404 getattr, 405 list(a.foreign_keys)[0], 406 "column", 407 ) 408 409 def test_fk_no_such_target_col_error_delayed(self): 410 meta = MetaData() 411 a = Table("a", meta, Column("a", Integer)) 412 a.append_constraint(ForeignKeyConstraint(["a"], ["b.x"])) 413 414 Table("b", meta, Column("b", Integer)) 415 416 assert_raises_message( 417 exc.NoReferencedColumnError, 418 "Could not initialize target column for ForeignKey 'b.x' on " 419 "table 'a': table 'b' has no column named 'x'", 420 getattr, 421 list(a.foreign_keys)[0], 422 "column", 423 ) 424 425 def test_fk_mismatched_local_remote_cols(self): 426 427 assert_raises_message( 428 exc.ArgumentError, 429 "ForeignKeyConstraint number of constrained columns must " 430 "match the number of referenced columns.", 431 ForeignKeyConstraint, 432 ["a"], 433 ["b.a", "b.b"], 434 ) 435 436 assert_raises_message( 437 exc.ArgumentError, 438 "ForeignKeyConstraint number of constrained columns " 439 "must match the number of referenced columns.", 440 ForeignKeyConstraint, 441 ["a", "b"], 442 ["b.a"], 443 ) 444 445 assert_raises_message( 446 exc.ArgumentError, 447 "ForeignKeyConstraint with duplicate source column " 448 "references are not supported.", 449 ForeignKeyConstraint, 450 ["a", "a"], 451 ["b.a", "b.b"], 452 ) 453 454 def test_pickle_metadata_sequence_restated(self): 455 m1 = MetaData() 456 Table( 457 "a", 458 m1, 459 Column("id", Integer, primary_key=True), 460 Column("x", Integer, Sequence("x_seq")), 461 ) 462 463 m2 = pickle.loads(pickle.dumps(m1)) 464 465 s2 = Sequence("x_seq") 466 t2 = Table( 467 "a", 468 m2, 469 Column("id", Integer, primary_key=True), 470 Column("x", Integer, s2), 471 extend_existing=True, 472 ) 473 474 assert m2._sequences["x_seq"] is t2.c.x.default 475 assert m2._sequences["x_seq"] is s2 476 477 def test_sequence_restated_replaced(self): 478 """Test restatement of Sequence replaces.""" 479 480 m1 = MetaData() 481 s1 = Sequence("x_seq") 482 t = Table("a", m1, Column("x", 
Integer, s1)) 483 assert m1._sequences["x_seq"] is s1 484 485 s2 = Sequence("x_seq") 486 Table("a", m1, Column("x", Integer, s2), extend_existing=True) 487 assert t.c.x.default is s2 488 assert m1._sequences["x_seq"] is s2 489 490 def test_sequence_attach_to_table(self): 491 m1 = MetaData() 492 s1 = Sequence("s") 493 Table("a", m1, Column("x", Integer, s1)) 494 assert s1.metadata is m1 495 496 def test_sequence_attach_to_existing_table(self): 497 m1 = MetaData() 498 s1 = Sequence("s") 499 t = Table("a", m1, Column("x", Integer)) 500 t.c.x._init_items(s1) 501 assert s1.metadata is m1 502 503 def test_pickle_metadata_sequence_implicit(self): 504 m1 = MetaData() 505 Table( 506 "a", 507 m1, 508 Column("id", Integer, primary_key=True), 509 Column("x", Integer, Sequence("x_seq")), 510 ) 511 512 m2 = pickle.loads(pickle.dumps(m1)) 513 514 t2 = Table("a", m2, extend_existing=True) 515 516 eq_(m2._sequences, {"x_seq": t2.c.x.default}) 517 518 def test_pickle_metadata_schema(self): 519 m1 = MetaData() 520 Table( 521 "a", 522 m1, 523 Column("id", Integer, primary_key=True), 524 Column("x", Integer, Sequence("x_seq")), 525 schema="y", 526 ) 527 528 m2 = pickle.loads(pickle.dumps(m1)) 529 530 Table("a", m2, schema="y", extend_existing=True) 531 532 eq_(m2._schemas, m1._schemas) 533 534 def test_metadata_schema_arg(self): 535 m1 = MetaData(schema="sch1") 536 m2 = MetaData(schema="sch1", quote_schema=True) 537 m3 = MetaData(schema="sch1", quote_schema=False) 538 m4 = MetaData() 539 540 for ( 541 i, 542 ( 543 name, 544 metadata, 545 schema_, 546 quote_schema, 547 exp_schema, 548 exp_quote_schema, 549 ), 550 ) in enumerate( 551 [ 552 ("t1", m1, None, None, "sch1", None), 553 ("t2", m1, "sch2", None, "sch2", None), 554 ("t3", m1, "sch2", True, "sch2", True), 555 ("t4", m1, "sch1", None, "sch1", None), 556 ("t5", m1, BLANK_SCHEMA, None, None, None), 557 ("t1", m2, None, None, "sch1", True), 558 ("t2", m2, "sch2", None, "sch2", None), 559 ("t3", m2, "sch2", True, "sch2", True), 560 
("t4", m2, "sch1", None, "sch1", None), 561 ("t1", m3, None, None, "sch1", False), 562 ("t2", m3, "sch2", None, "sch2", None), 563 ("t3", m3, "sch2", True, "sch2", True), 564 ("t4", m3, "sch1", None, "sch1", None), 565 ("t1", m4, None, None, None, None), 566 ("t2", m4, "sch2", None, "sch2", None), 567 ("t3", m4, "sch2", True, "sch2", True), 568 ("t4", m4, "sch1", None, "sch1", None), 569 ("t5", m4, BLANK_SCHEMA, None, None, None), 570 ] 571 ): 572 kw = {} 573 if schema_ is not None: 574 kw["schema"] = schema_ 575 if quote_schema is not None: 576 kw["quote_schema"] = quote_schema 577 t = Table(name, metadata, **kw) 578 eq_(t.schema, exp_schema, "test %d, table schema" % i) 579 eq_( 580 t.schema.quote if t.schema is not None else None, 581 exp_quote_schema, 582 "test %d, table quote_schema" % i, 583 ) 584 seq = Sequence(name, metadata=metadata, **kw) 585 eq_(seq.schema, exp_schema, "test %d, seq schema" % i) 586 eq_( 587 seq.schema.quote if seq.schema is not None else None, 588 exp_quote_schema, 589 "test %d, seq quote_schema" % i, 590 ) 591 592 def test_manual_dependencies(self): 593 meta = MetaData() 594 a = Table("a", meta, Column("foo", Integer)) 595 b = Table("b", meta, Column("foo", Integer)) 596 c = Table("c", meta, Column("foo", Integer)) 597 d = Table("d", meta, Column("foo", Integer)) 598 e = Table("e", meta, Column("foo", Integer)) 599 600 e.add_is_dependent_on(c) 601 a.add_is_dependent_on(b) 602 b.add_is_dependent_on(d) 603 e.add_is_dependent_on(b) 604 c.add_is_dependent_on(a) 605 eq_(meta.sorted_tables, [d, b, a, c, e]) 606 607 def test_deterministic_order(self): 608 meta = MetaData() 609 a = Table("a", meta, Column("foo", Integer)) 610 b = Table("b", meta, Column("foo", Integer)) 611 c = Table("c", meta, Column("foo", Integer)) 612 d = Table("d", meta, Column("foo", Integer)) 613 e = Table("e", meta, Column("foo", Integer)) 614 615 e.add_is_dependent_on(c) 616 a.add_is_dependent_on(b) 617 eq_(meta.sorted_tables, [b, c, d, a, e]) 618 619 def 
test_fks_deterministic_order(self): 620 meta = MetaData() 621 a = Table("a", meta, Column("foo", Integer, ForeignKey("b.foo"))) 622 b = Table("b", meta, Column("foo", Integer)) 623 c = Table("c", meta, Column("foo", Integer)) 624 d = Table("d", meta, Column("foo", Integer)) 625 e = Table("e", meta, Column("foo", Integer, ForeignKey("c.foo"))) 626 627 eq_(meta.sorted_tables, [b, c, d, a, e]) 628 629 def test_cycles_fks_warning_one(self): 630 meta = MetaData() 631 a = Table("a", meta, Column("foo", Integer, ForeignKey("b.foo"))) 632 b = Table("b", meta, Column("foo", Integer, ForeignKey("d.foo"))) 633 c = Table("c", meta, Column("foo", Integer, ForeignKey("b.foo"))) 634 d = Table("d", meta, Column("foo", Integer, ForeignKey("c.foo"))) 635 e = Table("e", meta, Column("foo", Integer)) 636 637 with testing.expect_warnings( 638 "Cannot correctly sort tables; there are unresolvable cycles " 639 'between tables "b, c, d", which is usually caused by mutually ' 640 "dependent foreign key constraints. " 641 "Foreign key constraints involving these tables will not be " 642 "considered" 643 ): 644 eq_(meta.sorted_tables, [b, c, d, e, a]) 645 646 def test_cycles_fks_warning_two(self): 647 meta = MetaData() 648 a = Table("a", meta, Column("foo", Integer, ForeignKey("b.foo"))) 649 b = Table("b", meta, Column("foo", Integer, ForeignKey("a.foo"))) 650 c = Table("c", meta, Column("foo", Integer, ForeignKey("e.foo"))) 651 d = Table("d", meta, Column("foo", Integer)) 652 e = Table("e", meta, Column("foo", Integer, ForeignKey("d.foo"))) 653 654 with testing.expect_warnings( 655 "Cannot correctly sort tables; there are unresolvable cycles " 656 'between tables "a, b", which is usually caused by mutually ' 657 "dependent foreign key constraints. 
" 658 "Foreign key constraints involving these tables will not be " 659 "considered" 660 ): 661 eq_(meta.sorted_tables, [a, b, d, e, c]) 662 663 def test_cycles_fks_fks_delivered_separately(self): 664 meta = MetaData() 665 a = Table("a", meta, Column("foo", Integer, ForeignKey("b.foo"))) 666 b = Table("b", meta, Column("foo", Integer, ForeignKey("a.foo"))) 667 c = Table("c", meta, Column("foo", Integer, ForeignKey("e.foo"))) 668 d = Table("d", meta, Column("foo", Integer)) 669 e = Table("e", meta, Column("foo", Integer, ForeignKey("d.foo"))) 670 671 results = schema.sort_tables_and_constraints( 672 sorted(meta.tables.values(), key=lambda t: t.key) 673 ) 674 results[-1] = (None, set(results[-1][-1])) 675 eq_( 676 results, 677 [ 678 (a, set()), 679 (b, set()), 680 (d, {fk.constraint for fk in d.foreign_keys}), 681 (e, {fk.constraint for fk in e.foreign_keys}), 682 (c, {fk.constraint for fk in c.foreign_keys}), 683 ( 684 None, 685 {fk.constraint for fk in a.foreign_keys}.union( 686 fk.constraint for fk in b.foreign_keys 687 ), 688 ), 689 ], 690 ) 691 692 def test_cycles_fks_usealter(self): 693 meta = MetaData() 694 a = Table("a", meta, Column("foo", Integer, ForeignKey("b.foo"))) 695 b = Table( 696 "b", 697 meta, 698 Column("foo", Integer, ForeignKey("d.foo", use_alter=True)), 699 ) 700 c = Table("c", meta, Column("foo", Integer, ForeignKey("b.foo"))) 701 d = Table("d", meta, Column("foo", Integer, ForeignKey("c.foo"))) 702 e = Table("e", meta, Column("foo", Integer)) 703 704 eq_(meta.sorted_tables, [b, e, a, c, d]) 705 706 def test_nonexistent(self): 707 assert_raises( 708 tsa.exc.NoSuchTableError, 709 Table, 710 "fake_table", 711 MetaData(), 712 autoload_with=testing.db, 713 ) 714 715 def test_assorted_repr(self): 716 t1 = Table("foo", MetaData(), Column("x", Integer)) 717 i1 = Index("bar", t1.c.x) 718 ck = schema.CheckConstraint("x > y", name="someconstraint") 719 720 for const, exp in ( 721 (Sequence("my_seq"), "Sequence('my_seq')"), 722 (Sequence("my_seq", 
start=5), "Sequence('my_seq', start=5)"), 723 (Column("foo", Integer), "Column('foo', Integer(), table=None)"), 724 ( 725 Column( 726 "foo", 727 Integer, 728 primary_key=True, 729 nullable=False, 730 onupdate=1, 731 default=42, 732 server_default="42", 733 comment="foo", 734 ), 735 "Column('foo', Integer(), table=None, primary_key=True, " 736 "nullable=False, onupdate=%s, default=%s, server_default=%s, " 737 "comment='foo')" 738 % ( 739 ColumnDefault(1), 740 ColumnDefault(42), 741 DefaultClause("42"), 742 ), 743 ), 744 ( 745 Table("bar", MetaData(), Column("x", String)), 746 "Table('bar', MetaData(), " 747 "Column('x', String(), table=<bar>), schema=None)", 748 ), 749 ( 750 schema.DefaultGenerator(for_update=True), 751 "DefaultGenerator(for_update=True)", 752 ), 753 (schema.Index("bar", "c"), "Index('bar', 'c')"), 754 (i1, "Index('bar', Column('x', Integer(), table=<foo>))"), 755 (schema.FetchedValue(), "FetchedValue()"), 756 ( 757 ck, 758 "CheckConstraint(" 759 "%s" 760 ", name='someconstraint')" % repr(ck.sqltext), 761 ), 762 (ColumnDefault(("foo", "bar")), "ColumnDefault(('foo', 'bar'))"), 763 ): 764 eq_(repr(const), exp) 765 766 767class ToMetaDataTest(fixtures.TestBase, AssertsCompiledSQL, ComparesTables): 768 @testing.requires.check_constraints 769 def test_copy(self): 770 # TODO: modernize this test for 2.0 771 772 from sqlalchemy.testing.schema import Table 773 774 meta = MetaData() 775 776 table = Table( 777 "mytable", 778 meta, 779 Column("myid", Integer, Sequence("foo_id_seq"), primary_key=True), 780 Column("name", String(40), nullable=True), 781 Column( 782 "foo", 783 String(40), 784 nullable=False, 785 server_default="x", 786 server_onupdate="q", 787 ), 788 Column( 789 "bar", String(40), nullable=False, default="y", onupdate="z" 790 ), 791 Column( 792 "description", String(30), CheckConstraint("description='hi'") 793 ), 794 UniqueConstraint("name"), 795 test_needs_fk=True, 796 ) 797 798 table2 = Table( 799 "othertable", 800 meta, 801 Column("id", 
Integer, Sequence("foo_seq"), primary_key=True), 802 Column("myid", Integer, ForeignKey("mytable.myid")), 803 test_needs_fk=True, 804 ) 805 806 table3 = Table( 807 "has_comments", 808 meta, 809 Column("foo", Integer, comment="some column"), 810 comment="table comment", 811 ) 812 813 def test_to_metadata(): 814 meta2 = MetaData() 815 table_c = table.to_metadata(meta2) 816 table2_c = table2.to_metadata(meta2) 817 table3_c = table3.to_metadata(meta2) 818 return (table_c, table2_c, table3_c) 819 820 def test_pickle(): 821 meta.bind = testing.db 822 meta2 = pickle.loads(pickle.dumps(meta)) 823 assert meta2.bind is None 824 pickle.loads(pickle.dumps(meta2)) 825 return ( 826 meta2.tables["mytable"], 827 meta2.tables["othertable"], 828 meta2.tables["has_comments"], 829 ) 830 831 def test_pickle_via_reflect(): 832 # this is the most common use case, pickling the results of a 833 # database reflection 834 meta2 = MetaData() 835 t1 = Table("mytable", meta2, autoload_with=testing.db) 836 Table("othertable", meta2, autoload_with=testing.db) 837 Table("has_comments", meta2, autoload_with=testing.db) 838 meta3 = pickle.loads(pickle.dumps(meta2)) 839 assert meta3.bind is None 840 assert meta3.tables["mytable"] is not t1 841 842 return ( 843 meta3.tables["mytable"], 844 meta3.tables["othertable"], 845 meta3.tables["has_comments"], 846 ) 847 848 meta.create_all(testing.db) 849 try: 850 for test, has_constraints, reflect in ( 851 (test_to_metadata, True, False), 852 (test_pickle, True, False), 853 (test_pickle_via_reflect, False, True), 854 ): 855 table_c, table2_c, table3_c = test() 856 self.assert_tables_equal(table, table_c) 857 self.assert_tables_equal(table2, table2_c) 858 assert table is not table_c 859 assert table.primary_key is not table_c.primary_key 860 assert ( 861 list(table2_c.c.myid.foreign_keys)[0].column 862 is table_c.c.myid 863 ) 864 assert ( 865 list(table2_c.c.myid.foreign_keys)[0].column 866 is not table.c.myid 867 ) 868 assert "x" in 
str(table_c.c.foo.server_default.arg) 869 if not reflect: 870 assert isinstance(table_c.c.myid.default, Sequence) 871 assert str(table_c.c.foo.server_onupdate.arg) == "q" 872 assert str(table_c.c.bar.default.arg) == "y" 873 assert ( 874 getattr( 875 table_c.c.bar.onupdate.arg, 876 "arg", 877 table_c.c.bar.onupdate.arg, 878 ) 879 == "z" 880 ) 881 assert isinstance(table2_c.c.id.default, Sequence) 882 883 # constraints don't get reflected for any dialect right 884 # now 885 886 if has_constraints: 887 for c in table_c.c.description.constraints: 888 if isinstance(c, CheckConstraint): 889 break 890 else: 891 assert False 892 assert str(c.sqltext) == "description='hi'" 893 for c in table_c.constraints: 894 if isinstance(c, UniqueConstraint): 895 break 896 else: 897 assert False 898 assert c.columns.contains_column(table_c.c.name) 899 assert not c.columns.contains_column(table.c.name) 900 901 if testing.requires.comment_reflection.enabled: 902 eq_(table3_c.comment, "table comment") 903 eq_(table3_c.c.foo.comment, "some column") 904 905 finally: 906 meta.drop_all(testing.db) 907 908 def test_col_key_fk_parent(self): 909 # test #2643 910 m1 = MetaData() 911 a = Table("a", m1, Column("x", Integer)) 912 b = Table("b", m1, Column("x", Integer, ForeignKey("a.x"), key="y")) 913 assert b.c.y.references(a.c.x) 914 915 m2 = MetaData() 916 b2 = b.to_metadata(m2) 917 a2 = a.to_metadata(m2) 918 assert b2.c.y.references(a2.c.x) 919 920 def test_column_collection_constraint_w_ad_hoc_columns(self): 921 """Test ColumnCollectionConstraint that has columns that aren't 922 part of the Table. 
923 924 """ 925 meta = MetaData() 926 927 uq1 = UniqueConstraint(literal_column("some_name")) 928 cc1 = CheckConstraint(literal_column("some_name") > 5) 929 table = Table( 930 "mytable", 931 meta, 932 Column("myid", Integer, primary_key=True), 933 Column("name", String(40), nullable=True), 934 uq1, 935 cc1, 936 ) 937 938 self.assert_compile( 939 schema.AddConstraint(uq1), 940 "ALTER TABLE mytable ADD UNIQUE (some_name)", 941 dialect="default", 942 ) 943 self.assert_compile( 944 schema.AddConstraint(cc1), 945 "ALTER TABLE mytable ADD CHECK (some_name > 5)", 946 dialect="default", 947 ) 948 meta2 = MetaData() 949 table2 = table.to_metadata(meta2) 950 uq2 = [ 951 c for c in table2.constraints if isinstance(c, UniqueConstraint) 952 ][0] 953 cc2 = [ 954 c for c in table2.constraints if isinstance(c, CheckConstraint) 955 ][0] 956 self.assert_compile( 957 schema.AddConstraint(uq2), 958 "ALTER TABLE mytable ADD UNIQUE (some_name)", 959 dialect="default", 960 ) 961 self.assert_compile( 962 schema.AddConstraint(cc2), 963 "ALTER TABLE mytable ADD CHECK (some_name > 5)", 964 dialect="default", 965 ) 966 967 def test_change_schema(self): 968 meta = MetaData() 969 970 table = Table( 971 "mytable", 972 meta, 973 Column("myid", Integer, primary_key=True), 974 Column("name", String(40), nullable=True), 975 Column( 976 "description", String(30), CheckConstraint("description='hi'") 977 ), 978 UniqueConstraint("name"), 979 ) 980 981 table2 = Table( 982 "othertable", 983 meta, 984 Column("id", Integer, primary_key=True), 985 Column("myid", Integer, ForeignKey("mytable.myid")), 986 ) 987 988 meta2 = MetaData() 989 table_c = table.to_metadata(meta2, schema="someschema") 990 table2_c = table2.to_metadata(meta2, schema="someschema") 991 992 eq_( 993 str(table_c.join(table2_c).onclause), 994 str(table_c.c.myid == table2_c.c.myid), 995 ) 996 eq_( 997 str(table_c.join(table2_c).onclause), 998 "someschema.mytable.myid = someschema.othertable.myid", 999 ) 1000 1001 def 
test_retain_table_schema(self): 1002 meta = MetaData() 1003 1004 table = Table( 1005 "mytable", 1006 meta, 1007 Column("myid", Integer, primary_key=True), 1008 Column("name", String(40), nullable=True), 1009 Column( 1010 "description", String(30), CheckConstraint("description='hi'") 1011 ), 1012 UniqueConstraint("name"), 1013 schema="myschema", 1014 ) 1015 1016 table2 = Table( 1017 "othertable", 1018 meta, 1019 Column("id", Integer, primary_key=True), 1020 Column("myid", Integer, ForeignKey("myschema.mytable.myid")), 1021 schema="myschema", 1022 ) 1023 1024 meta2 = MetaData() 1025 table_c = table.to_metadata(meta2) 1026 table2_c = table2.to_metadata(meta2) 1027 1028 eq_( 1029 str(table_c.join(table2_c).onclause), 1030 str(table_c.c.myid == table2_c.c.myid), 1031 ) 1032 eq_( 1033 str(table_c.join(table2_c).onclause), 1034 "myschema.mytable.myid = myschema.othertable.myid", 1035 ) 1036 1037 def test_change_name_retain_metadata(self): 1038 meta = MetaData() 1039 1040 table = Table( 1041 "mytable", 1042 meta, 1043 Column("myid", Integer, primary_key=True), 1044 Column("name", String(40), nullable=True), 1045 Column( 1046 "description", String(30), CheckConstraint("description='hi'") 1047 ), 1048 UniqueConstraint("name"), 1049 schema="myschema", 1050 ) 1051 1052 table2 = table.to_metadata(table.metadata, name="newtable") 1053 table3 = table.to_metadata( 1054 table.metadata, schema="newschema", name="newtable" 1055 ) 1056 1057 assert table.metadata is table2.metadata 1058 assert table.metadata is table3.metadata 1059 eq_( 1060 (table.name, table2.name, table3.name), 1061 ("mytable", "newtable", "newtable"), 1062 ) 1063 eq_( 1064 (table.key, table2.key, table3.key), 1065 ("myschema.mytable", "myschema.newtable", "newschema.newtable"), 1066 ) 1067 1068 def test_change_name_change_metadata(self): 1069 meta = MetaData() 1070 meta2 = MetaData() 1071 1072 table = Table( 1073 "mytable", 1074 meta, 1075 Column("myid", Integer, primary_key=True), 1076 Column("name", String(40), 
nullable=True), 1077 Column( 1078 "description", String(30), CheckConstraint("description='hi'") 1079 ), 1080 UniqueConstraint("name"), 1081 schema="myschema", 1082 ) 1083 1084 table2 = table.to_metadata(meta2, name="newtable") 1085 1086 assert table.metadata is not table2.metadata 1087 eq_((table.name, table2.name), ("mytable", "newtable")) 1088 eq_((table.key, table2.key), ("myschema.mytable", "myschema.newtable")) 1089 1090 def test_change_name_selfref_fk_moves(self): 1091 meta = MetaData() 1092 1093 referenced = Table( 1094 "ref", meta, Column("id", Integer, primary_key=True) 1095 ) 1096 table = Table( 1097 "mytable", 1098 meta, 1099 Column("id", Integer, primary_key=True), 1100 Column("parent_id", ForeignKey("mytable.id")), 1101 Column("ref_id", ForeignKey("ref.id")), 1102 ) 1103 1104 table2 = table.to_metadata(table.metadata, name="newtable") 1105 assert table.metadata is table2.metadata 1106 assert table2.c.ref_id.references(referenced.c.id) 1107 assert table2.c.parent_id.references(table2.c.id) 1108 1109 def test_change_name_selfref_fk_moves_w_schema(self): 1110 meta = MetaData() 1111 1112 referenced = Table( 1113 "ref", meta, Column("id", Integer, primary_key=True) 1114 ) 1115 table = Table( 1116 "mytable", 1117 meta, 1118 Column("id", Integer, primary_key=True), 1119 Column("parent_id", ForeignKey("mytable.id")), 1120 Column("ref_id", ForeignKey("ref.id")), 1121 ) 1122 1123 table2 = table.to_metadata( 1124 table.metadata, name="newtable", schema="newschema" 1125 ) 1126 ref2 = referenced.to_metadata(table.metadata, schema="newschema") 1127 assert table.metadata is table2.metadata 1128 assert table2.c.ref_id.references(ref2.c.id) 1129 assert table2.c.parent_id.references(table2.c.id) 1130 1131 def _assert_fk(self, t2, schema, expected, referred_schema_fn=None): 1132 m2 = MetaData() 1133 existing_schema = t2.schema 1134 if schema: 1135 t2c = t2.to_metadata( 1136 m2, schema=schema, referred_schema_fn=referred_schema_fn 1137 ) 1138 eq_(t2c.schema, schema) 1139 
def test_fk_has_schema_col_retain_schema(self):
    """Column-object FK into schema ``q`` survives a schema-retaining copy.

    ``t2`` has no schema of its own and is copied without a ``schema``
    argument, so the foreign key colspec is expected to stay
    ``q.t1.x``.
    """
    m = MetaData()

    t1 = Table("t1", m, Column("x", Integer), schema="q")
    t2 = Table("t2", m, Column("y", Integer, ForeignKey(t1.c.x)))

    # Pass schema=None so that the schema-retaining branch of
    # _assert_fk is exercised; passing "z" here made this test an exact
    # duplicate of test_fk_has_schema_col_new_schema.
    self._assert_fk(t2, None, "q.t1.x")
def test_fk_and_referent_has_diff_schema_string_retain_schema(self):
    """String FK into schema ``p`` from a table in ``q``; schema retained.

    The colspec must remain ``p.t1.x`` both before and after the
    referred table ``p.t1`` is defined in the source MetaData.
    """
    metadata = MetaData()
    fk_column = Column("y", Integer, ForeignKey("p.t1.x"))
    referring = Table("t2", metadata, fk_column, schema="q")

    # referent does not exist yet
    self._assert_fk(referring, None, "p.t1.x")

    # define the referent, then check again
    Table("t1", metadata, Column("x", Integer), schema="p")
    self._assert_fk(referring, None, "p.t1.x")
def test_copy_info(self):
    """``to_metadata()`` copies ``.info`` dicts instead of sharing them.

    Each ``info`` entry on the source objects is set to ``True`` before
    the copy and flipped to ``False`` afterwards; the copied objects
    must still report ``True``.  The target ``MetaData.info`` is
    expected to stay empty.
    """
    m = MetaData()
    fk = ForeignKey("t2.id")
    c = Column("c", Integer, fk)
    ck = CheckConstraint("c > 5")
    t = Table("t", m, c, ck)

    m.info["minfo"] = True
    fk.info["fkinfo"] = True
    c.info["cinfo"] = True
    ck.info["ckinfo"] = True
    t.info["tinfo"] = True
    t.primary_key.info["pkinfo"] = True
    # the ForeignKeyConstraint generated for the column-level ForeignKey
    fkc = [
        const
        for const in t.constraints
        if isinstance(const, ForeignKeyConstraint)
    ][0]
    fkc.info["fkcinfo"] = True

    m2 = MetaData()
    t2 = t.to_metadata(m2)

    # Mutate every source dict after the copy; a copy that shared a
    # dict with its source would now show False.
    m.info["minfo"] = False
    fk.info["fkinfo"] = False
    c.info["cinfo"] = False
    ck.info["ckinfo"] = False
    # Table.info was previously left untouched here, so the t2.info
    # assertion below could not detect a shared dictionary.
    t.info["tinfo"] = False
    t.primary_key.info["pkinfo"] = False
    fkc.info["fkcinfo"] = False

    eq_(m2.info, {})
    eq_(t2.info, {"tinfo": True})
    eq_(t2.c.c.info, {"cinfo": True})
    eq_(list(t2.c.c.foreign_keys)[0].info, {"fkinfo": True})
    eq_(t2.primary_key.info, {"pkinfo": True})

    fkc2 = [
        const
        for const in t2.constraints
        if isinstance(const, ForeignKeyConstraint)
    ][0]
    eq_(fkc2.info, {"fkcinfo": True})

    ck2 = [
        const
        for const in t2.constraints
        if isinstance(const, CheckConstraint)
    ][0]
    eq_(ck2.info, {"ckinfo": True})
def test_indexes_with_col_redefine(self):
    """Indexes are copied by ``to_metadata()`` after a column redefine.

    The table is first built with four indexes, then redefined with
    ``extend_existing=True``; the copy must carry the same indexes as
    the redefined table.
    """
    meta = MetaData()

    original = Table(
        "mytable",
        meta,
        Column("id", Integer, primary_key=True),
        Column("data1", Integer),
        Column("data2", Integer),
        Index("text", text("data1 + 1")),
    )
    Index("multi", original.c.data1, original.c.data2)
    Index("func", func.abs(original.c.data1))
    Index("multi-func", original.c.data1, func.abs(original.c.data2))

    # redefine two of the columns on the already-registered table
    redefined = Table(
        "mytable",
        meta,
        Column("data1", Integer),
        Column("data2", Integer),
        extend_existing=True,
    )

    copied = redefined.to_metadata(MetaData())

    def signature(index):
        # name, uniqueness, dialect kwargs and stringified expressions
        sig = [index.name, index.unique]
        sig.extend(sorted(index.kwargs.items()))
        sig.extend(str(expr) for expr in index.expressions)
        return sig

    eq_(
        sorted(signature(ix) for ix in redefined.indexes),
        sorted(signature(ix) for ix in copied.indexes),
    )
def test_strip_schema(self):
    """Copy two FK-related, schema-less tables with ``schema=None``.

    The join condition between the copies must match a hand-built
    comparison of the copied columns and must render with plain,
    unqualified table names.
    """
    meta = MetaData()

    parent = Table(
        "mytable",
        meta,
        Column("myid", Integer, primary_key=True),
        Column("name", String(40), nullable=True),
        Column(
            "description", String(30), CheckConstraint("description='hi'")
        ),
        UniqueConstraint("name"),
    )
    child = Table(
        "othertable",
        meta,
        Column("id", Integer, primary_key=True),
        Column("myid", Integer, ForeignKey("mytable.myid")),
    )

    target_meta = MetaData()
    parent_copy = parent.to_metadata(target_meta, schema=None)
    child_copy = child.to_metadata(target_meta, schema=None)

    expected_clause = parent_copy.c.myid == child_copy.c.myid
    eq_(
        str(parent_copy.join(child_copy).onclause),
        str(expected_clause),
    )
    eq_(
        str(parent_copy.join(child_copy).onclause),
        "mytable.myid = othertable.myid",
    )
def test_foreignkey_info(self):
    """``ForeignKey.info`` is empty by default and accepts ``info=``."""
    # no info given: empty dict
    eq_(ForeignKey("a").info, {})

    # info given at construction is returned as-is
    fk = ForeignKey("a", info={"foo": "bar"})
    eq_(fk.info, {"foo": "bar"})
@testing.requires.temporary_tables
@testing.skip_if("mssql", "different col format")
def test_prefixes(self):
    """``prefixes`` are rendered between CREATE and TABLE in the DDL."""
    # ``Table`` is already imported at module level, so the former
    # function-scope ``from sqlalchemy import Table`` was redundant.
    table1 = Table(
        "temporary_table_1",
        MetaData(),
        Column("col1", Integer),
        prefixes=["TEMPORARY"],
    )

    self.assert_compile(
        schema.CreateTable(table1),
        "CREATE TEMPORARY TABLE temporary_table_1 (col1 INTEGER)",
    )

    table2 = Table(
        "temporary_table_2",
        MetaData(),
        Column("col1", Integer),
        prefixes=["VIRTUAL"],
    )
    self.assert_compile(
        schema.CreateTable(table2),
        "CREATE VIRTUAL TABLE temporary_table_2 (col1 INTEGER)",
    )
def test_foreign_key_constraints_collection(self):
    """``Table.foreign_key_constraints`` grows as FKs are appended.

    Covers column-level ``ForeignKey`` objects added via
    ``append_column()`` and a composite ``ForeignKeyConstraint`` added
    via ``append_constraint()``.
    """
    t1 = Table("foo", MetaData(), Column("a", Integer))
    eq_(t1.foreign_key_constraints, set())

    fk_q = ForeignKey("q.id")
    fk_j = ForeignKey("j.id")
    composite = ForeignKeyConstraint(["b", "c"], ["r.x", "r.y"])

    t1.append_column(Column("b", Integer, fk_q))
    eq_(t1.foreign_key_constraints, {fk_q.constraint})

    t1.append_column(Column("c", Integer, fk_j))
    eq_(t1.foreign_key_constraints, {fk_q.constraint, fk_j.constraint})

    t1.append_constraint(composite)
    eq_(
        t1.foreign_key_constraints,
        {fk_q.constraint, fk_j.constraint, composite},
    )
def test_append_column_replace_existing(self):
    """``append_column(..., replace_existing=True)`` swaps the column.

    After the call, ``t.c.col`` must be the String-typed replacement.
    """
    tbl = Table("t", MetaData(), Column("col", Integer))
    replacement = Column("col", String)

    tbl.append_column(replacement, replace_existing=True)

    is_true(isinstance(tbl.c.col.type, String))
def test_pk_cols_sets_flags(self):
    """A table-level ``PrimaryKeyConstraint`` sets ``Column.primary_key``.

    Columns named in the constraint report ``True``; the remaining
    column reports ``False``.
    """
    t = Table(
        "t",
        MetaData(),
        Column("x", Integer),
        Column("y", Integer),
        Column("z", Integer),
        PrimaryKeyConstraint("x", "y"),
    )

    expected_flags = (("x", True), ("y", True), ("z", False))
    for column_name, is_primary in expected_flags:
        eq_(t.c[column_name].primary_key, is_primary)
@testing.combinations(
    ("comment", ("A", "B", "A")),
    ("implicit_returning", (True, False, True)),
    ("info", ({"A": 1}, {"A": 2}, {"A": 1})),
)
def test_extend_attributes(self, attrib, attrib_values):
    """
    ensure `extend_existing` is compatible with simple attributes

    The table ``a`` is defined once and then redefined with
    ``extend_existing=True`` for each further value; after every pass
    both the returned table and ``metadata.tables["a"]`` must expose the
    value that was just given for ``attrib``.
    """
    metadata = MetaData()
    for counter, _attrib_value in enumerate(attrib_values):
        _kwargs = {
            # first pass creates the table, later passes extend it
            "extend_existing": counter > 0,
            attrib: _attrib_value,
        }
        table_a = Table(
            "a",
            metadata,
            Column("foo", String, primary_key=True),
            **_kwargs
        )
        eq_(getattr(table_a, attrib), _attrib_value)
        eq_(getattr(metadata.tables["a"], attrib), _attrib_value)
def test_explicit_autoinc_but_fks(self):
    """Explicit ``autoincrement`` on a FK column is honored.

    Checked for ``autoincrement=True`` and ``autoincrement="ignore_fk"``:
    in both cases the primary key's ``_autoincrement_column`` must be
    the foreign-key column itself.
    """
    Table("t1", MetaData(), Column("id", Integer, primary_key=True))

    for table_name, autoinc in (("t2", True), ("t3", "ignore_fk")):
        tbl = Table(
            table_name,
            MetaData(),
            Column("a", Integer, ForeignKey("t1.id"), autoincrement=autoinc),
        )
        pk = PrimaryKeyConstraint(tbl.c.a)
        tbl.append_constraint(pk)
        is_(pk._autoincrement_column, tbl.c.a)
class MyTypeDecAndSchema(TypeDecorator, sqltypes.SchemaType):
    """TypeDecorator that is also a SchemaType and records create events.

    Each ``_on_table_create`` / ``_on_metadata_create`` call appends its
    ``target`` to ``evt_targets``.
    """

    impl = String()
    cache_ok = True

    # class-level default; replaced per instance on the first event
    evt_targets = ()

    def __init__(self):
        # initialize both bases explicitly
        TypeDecorator.__init__(self)
        sqltypes.SchemaType.__init__(self)

    def _on_table_create(self, target, bind, **kw):
        self.evt_targets = self.evt_targets + (target,)

    def _on_metadata_create(self, target, bind, **kw):
        self.evt_targets = self.evt_targets + (target,)
impl = target_typ 2149 cache_ok = True 2150 2151 typ = MyType() 2152 self._test_before_parent_attach(typ, target_typ) 2153 2154 def test_before_parent_attach_array_enclosing_schematype(self): 2155 # test for [ticket:4141] which is the same idea as [ticket:3832] 2156 # for ARRAY 2157 2158 typ = ARRAY(String) 2159 2160 self._test_before_parent_attach(typ) 2161 2162 def test_before_parent_attach_typedec_of_schematype(self): 2163 class MyType(TypeDecorator, sqltypes.SchemaType): 2164 impl = String 2165 cache_ok = True 2166 2167 typ = MyType() 2168 self._test_before_parent_attach(typ) 2169 2170 def test_before_parent_attach_schematype_of_typedec(self): 2171 class MyType(sqltypes.SchemaType, TypeDecorator): 2172 impl = String 2173 cache_ok = True 2174 2175 typ = MyType() 2176 self._test_before_parent_attach(typ) 2177 2178 def test_before_parent_attach_variant_array_schematype(self): 2179 2180 target = Enum("one", "two", "three") 2181 typ = ARRAY(target).with_variant(String(), "other") 2182 self._test_before_parent_attach(typ, evt_target=target) 2183 2184 def _test_before_parent_attach(self, typ, evt_target=None): 2185 canary = mock.Mock() 2186 2187 if evt_target is None: 2188 evt_target = typ 2189 2190 orig_set_parent = evt_target._set_parent 2191 orig_set_parent_w_dispatch = evt_target._set_parent_with_dispatch 2192 2193 def _set_parent(parent, **kw): 2194 orig_set_parent(parent, **kw) 2195 canary._set_parent(parent) 2196 2197 def _set_parent_w_dispatch(parent): 2198 orig_set_parent_w_dispatch(parent) 2199 canary._set_parent_with_dispatch(parent) 2200 2201 with mock.patch.object(evt_target, "_set_parent", _set_parent): 2202 with mock.patch.object( 2203 evt_target, "_set_parent_with_dispatch", _set_parent_w_dispatch 2204 ): 2205 event.listen(evt_target, "before_parent_attach", canary.go) 2206 2207 c = Column("q", typ) 2208 2209 eq_( 2210 canary.mock_calls, 2211 [ 2212 mock.call.go(evt_target, c), 2213 mock.call._set_parent(c), 2214 mock.call._set_parent_with_dispatch(c), 
],
        )

    def test_independent_schema(self):
        """An explicit type schema is kept when the table's schema differs."""
        m = MetaData()
        type_ = self.MyType(schema="q")
        t1 = Table("x", m, Column("y", type_), schema="z")
        eq_(t1.c.y.type.schema, "q")

    def test_inherit_schema_from_metadata(self):
        """test #6373: a type given ``metadata=`` reports that MetaData's
        schema, not the schema of the table it is placed on."""
        m = MetaData(schema="q")
        type_ = self.MyType(metadata=m)
        t1 = Table("x", m, Column("y", type_), schema="z")
        eq_(t1.c.y.type.schema, "q")

    def test_inherit_schema_from_table_override_metadata(self):
        """test #6373: with ``inherit_schema=True`` the table's schema is
        reported even though the type was also given a MetaData."""
        m = MetaData(schema="q")
        type_ = self.MyType(metadata=m, inherit_schema=True)
        t1 = Table("x", m, Column("y", type_), schema="z")
        eq_(t1.c.y.type.schema, "z")

    def test_inherit_schema_from_metadata_override_explicit(self):
        """test #6373: an explicit ``schema=`` on the type is reported in
        preference to the MetaData's schema."""
        m = MetaData(schema="q")
        type_ = self.MyType(schema="e", metadata=m)
        t1 = Table("x", m, Column("y", type_), schema="z")
        eq_(t1.c.y.type.schema, "e")

    def test_inherit_schema(self):
        """``inherit_schema=True`` replaces the type's own schema with the
        table's."""
        m = MetaData()
        type_ = self.MyType(schema="q", inherit_schema=True)
        t1 = Table("x", m, Column("y", type_), schema="z")
        eq_(t1.c.y.type.schema, "z")

    def test_independent_schema_enum(self):
        """Same as test_independent_schema, using the stock Enum type."""
        m = MetaData()
        type_ = sqltypes.Enum("a", schema="q")
        t1 = Table("x", m, Column("y", type_), schema="z")
        eq_(t1.c.y.type.schema, "q")

    def test_inherit_schema_enum(self):
        """Same as test_inherit_schema, using the stock Enum type."""
        m = MetaData()
        type_ = sqltypes.Enum("a", "b", "c", schema="q", inherit_schema=True)
        t1 = Table("x", m, Column("y", type_), schema="z")
        eq_(t1.c.y.type.schema, "z")

    def test_to_metadata_copy_type(self):
        """After ``to_metadata()`` the copied type has no metadata and its
        ``table`` attribute is the copied table."""
        m1 = MetaData()

        type_ = self.MyType()
        t1 = Table("x", m1, Column("y", type_))

        m2 = MetaData()
        t2 = t1.to_metadata(m2)

        # metadata isn't set
        is_(t2.c.y.type.metadata, None)

        # our test type sets table, though
        is_(t2.c.y.type.table, t2)

    def
test_to_metadata_copy_decorated(self):
        """A schema given to a TypeDecorator over MyType is still reported
        on the type of the table produced by ``to_metadata()``."""

        class MyDecorated(TypeDecorator):
            impl = self.MyType
            cache_ok = True

        m1 = MetaData()

        type_ = MyDecorated(schema="z")
        t1 = Table("x", m1, Column("y", type_))

        m2 = MetaData()
        t2 = t1.to_metadata(m2)
        eq_(t2.c.y.type.schema, "z")

    def test_to_metadata_independent_schema(self):
        """``to_metadata(schema=...)`` leaves the type's schema at None when
        the type does not inherit the table schema."""
        m1 = MetaData()

        type_ = self.MyType()
        t1 = Table("x", m1, Column("y", type_))

        m2 = MetaData()
        t2 = t1.to_metadata(m2, schema="bar")

        eq_(t2.c.y.type.schema, None)

    def test_to_metadata_inherit_schema(self):
        """With ``inherit_schema=True`` the copied type reports the new
        schema while the original type stays at None."""
        m1 = MetaData()

        type_ = self.MyType(inherit_schema=True)
        t1 = Table("x", m1, Column("y", type_))

        m2 = MetaData()
        t2 = t1.to_metadata(m2, schema="bar")

        eq_(t1.c.y.type.schema, None)
        eq_(t2.c.y.type.schema, "bar")

    def test_to_metadata_independent_events(self):
        """Hook targets recorded on the original type and on the copied type
        do not leak into each other."""
        m1 = MetaData()

        type_ = self.MyType()
        t1 = Table("x", m1, Column("y", type_))

        m2 = MetaData()
        t2 = t1.to_metadata(m2)

        t1.dispatch.before_create(t1, testing.db)
        eq_(t1.c.y.type.evt_targets, (t1,))
        eq_(t2.c.y.type.evt_targets, ())

        # dispatched twice on purpose: the copy accumulates two entries and
        # the original is untouched
        t2.dispatch.before_create(t2, testing.db)
        t2.dispatch.before_create(t2, testing.db)
        eq_(t1.c.y.type.evt_targets, (t1,))
        eq_(t2.c.y.type.evt_targets, (t2, t2))

    def test_enum_column_copy_transfers_events(self):
        """The type of a copied Column holding a WrapEnum still has create
        events and records the table on ``before_create``."""
        m = MetaData()

        type_ = self.WrapEnum("a", "b", "c", name="foo")
        y = Column("y", type_)
        y_copy = y._copy()
        t1 = Table("x", m, y_copy)

        is_true(y_copy.type._create_events)

        # for PostgreSQL, this will emit CREATE TYPE
        m.dispatch.before_create(t1, testing.db)
        try:
            eq_(t1.c.y.type.evt_targets, (t1,))
        finally:
            # do the drop so that PostgreSQL emits DROP TYPE
            m.dispatch.after_drop(t1, testing.db)

    def
test_enum_nonnative_column_copy_transfers_events(self):
        """Same as the native enum copy test, with ``native_enum=False``."""
        m = MetaData()

        type_ = self.WrapEnum("a", "b", "c", name="foo", native_enum=False)
        y = Column("y", type_)
        y_copy = y._copy()
        t1 = Table("x", m, y_copy)

        is_true(y_copy.type._create_events)

        m.dispatch.before_create(t1, testing.db)
        eq_(t1.c.y.type.evt_targets, (t1,))

    def test_enum_nonnative_column_copy_transfers_constraintpref(self):
        """``create_constraint=False`` on a WrapEnum survives a Column
        copy."""
        m = MetaData()

        type_ = self.WrapEnum(
            "a",
            "b",
            "c",
            name="foo",
            native_enum=False,
            create_constraint=False,
        )
        y = Column("y", type_)
        y_copy = y._copy()
        Table("x", m, y_copy)

        is_false(y_copy.type.create_constraint)

    def test_boolean_column_copy_transfers_events(self):
        """The type of a copied Column holding a WrapBoolean still has
        create events."""
        m = MetaData()

        type_ = self.WrapBoolean()
        y = Column("y", type_)
        y_copy = y._copy()
        Table("x", m, y_copy)

        is_true(y_copy.type._create_events)

    def test_boolean_nonnative_column_copy_transfers_constraintpref(self):
        """``create_constraint=False`` on a WrapBoolean survives a Column
        copy."""
        m = MetaData()

        type_ = self.WrapBoolean(create_constraint=False)
        y = Column("y", type_)
        y_copy = y._copy()
        Table("x", m, y_copy)

        is_false(y_copy.type.create_constraint)

    def test_metadata_dispatch_no_new_impl(self):
        """A MetaData-level ``before_create`` is recorded on the type; the
        object returned by ``dialect_impl()`` records nothing."""
        m1 = MetaData()
        typ = self.MyType(metadata=m1)
        m1.dispatch.before_create(m1, testing.db)
        eq_(typ.evt_targets, (m1,))

        dialect_impl = typ.dialect_impl(testing.db.dialect)
        eq_(dialect_impl.evt_targets, ())

    def test_metadata_dispatch_new_impl(self):
        """With MyTypeWImpl, the object returned by ``dialect_impl()``
        reports the MetaData target as well."""
        m1 = MetaData()
        typ = self.MyTypeWImpl(metadata=m1)
        m1.dispatch.before_create(m1, testing.db)
        eq_(typ.evt_targets, (m1,))

        dialect_impl = typ.dialect_impl(testing.db.dialect)
        eq_(dialect_impl.evt_targets, (m1,))

    def test_table_dispatch_decorator_schematype(self):
        """A table-level ``before_create`` dispatched through the MetaData
        is recorded on a MyTypeDecAndSchema instance."""
        m1 = MetaData()
        typ = self.MyTypeDecAndSchema()
        t1
= Table("t1", m1, Column("x", typ))
        m1.dispatch.before_create(t1, testing.db)
        eq_(typ.evt_targets, (t1,))

    def test_table_dispatch_no_new_impl(self):
        """A table-level ``before_create`` is recorded on the type; the
        object returned by ``dialect_impl()`` records nothing."""
        m1 = MetaData()
        typ = self.MyType()
        t1 = Table("t1", m1, Column("x", typ))
        m1.dispatch.before_create(t1, testing.db)
        eq_(typ.evt_targets, (t1,))

        dialect_impl = typ.dialect_impl(testing.db.dialect)
        eq_(dialect_impl.evt_targets, ())

    def test_table_dispatch_new_impl(self):
        """With MyTypeWImpl, the object returned by ``dialect_impl()``
        reports the table target as well."""
        m1 = MetaData()
        typ = self.MyTypeWImpl()
        t1 = Table("t1", m1, Column("x", typ))
        m1.dispatch.before_create(t1, testing.db)
        eq_(typ.evt_targets, (t1,))

        dialect_impl = typ.dialect_impl(testing.db.dialect)
        eq_(dialect_impl.evt_targets, (t1,))

    def test_create_metadata_bound_no_crash(self):
        """``create_all()`` completes for a MetaData that has only a
        metadata-bound type and no tables."""
        m1 = MetaData()
        self.MyType(metadata=m1)

        m1.create_all(testing.db)

    def test_boolean_constraint_type_doesnt_double(self):
        """A Boolean with ``create_constraint=True`` yields exactly one
        CheckConstraint, both originally and after ``to_metadata()``."""
        m1 = MetaData()

        t1 = Table("x", m1, Column("flag", Boolean(create_constraint=True)))
        eq_(
            len([c for c in t1.constraints if isinstance(c, CheckConstraint)]),
            1,
        )
        m2 = MetaData()
        t2 = t1.to_metadata(m2)

        # still one on the copy, i.e. the constraint was not duplicated
        eq_(
            len([c for c in t2.constraints if isinstance(c, CheckConstraint)]),
            1,
        )

    def test_enum_constraint_type_doesnt_double(self):
        """An Enum with ``create_constraint=True`` yields exactly one
        CheckConstraint, both originally and after ``to_metadata()``."""
        m1 = MetaData()

        t1 = Table(
            "x",
            m1,
            Column("flag", Enum("a", "b", "c", create_constraint=True)),
        )
        eq_(
            len([c for c in t1.constraints if isinstance(c, CheckConstraint)]),
            1,
        )
        m2 = MetaData()
        t2 = t1.to_metadata(m2)

        # still one on the copy, i.e. the constraint was not duplicated
        eq_(
            len([c for c in t2.constraints if isinstance(c, CheckConstraint)]),
            1,
        )


class SchemaTest(fixtures.TestBase, AssertsCompiledSQL):
    """Foreign key resolution against schema-qualified tables, plus
    compilation of CREATE/DROP SCHEMA and schema-qualified CREATE TABLE."""

    def test_default_schema_metadata_fk(self):
        """An unqualified FK target resolves when both tables take their
        schema from the MetaData default."""
        m = MetaData(schema="foo")
        t1 = Table("t1", m, Column("x", Integer))
        t2 = Table("t2", m,
Column("x", Integer, ForeignKey("t1.x")))
        assert t2.c.x.references(t1.c.x)

    def test_ad_hoc_schema_equiv_fk(self):
        """With the schema given per table rather than on the MetaData, the
        unqualified target ``t1.x`` is not found."""
        m = MetaData()
        t1 = Table("t1", m, Column("x", Integer), schema="foo")
        t2 = Table(
            "t2", m, Column("x", Integer, ForeignKey("t1.x")), schema="foo"
        )
        # the lookup happens lazily, inside references()
        assert_raises(
            exc.NoReferencedTableError, lambda: t2.c.x.references(t1.c.x)
        )

    def test_default_schema_metadata_fk_alt_remote(self):
        """A table in schema ``bar`` can reference, unqualified, a table
        that lives in the MetaData's default schema."""
        m = MetaData(schema="foo")
        t1 = Table("t1", m, Column("x", Integer))
        t2 = Table(
            "t2", m, Column("x", Integer, ForeignKey("t1.x")), schema="bar"
        )
        assert t2.c.x.references(t1.c.x)

    def test_default_schema_metadata_fk_alt_local_raises(self):
        """An unqualified target is not found when the referenced table was
        placed in schema ``bar`` instead of the MetaData default."""
        m = MetaData(schema="foo")
        t1 = Table("t1", m, Column("x", Integer), schema="bar")
        t2 = Table("t2", m, Column("x", Integer, ForeignKey("t1.x")))
        assert_raises(
            exc.NoReferencedTableError, lambda: t2.c.x.references(t1.c.x)
        )

    def test_default_schema_metadata_fk_alt_local(self):
        """The same layout resolves once the target is spelled
        ``bar.t1.x``."""
        m = MetaData(schema="foo")
        t1 = Table("t1", m, Column("x", Integer), schema="bar")
        t2 = Table("t2", m, Column("x", Integer, ForeignKey("bar.t1.x")))
        assert t2.c.x.references(t1.c.x)

    def test_create_drop_schema(self):
        """CreateSchema / DropSchema compile to the expected statements,
        including the CASCADE form."""

        self.assert_compile(
            schema.CreateSchema("sa_schema"), "CREATE SCHEMA sa_schema"
        )
        self.assert_compile(
            schema.DropSchema("sa_schema"), "DROP SCHEMA sa_schema"
        )
        self.assert_compile(
            schema.DropSchema("sa_schema", cascade=True),
            "DROP SCHEMA sa_schema CASCADE",
        )

    def test_iteration(self):
        """CREATE TABLE output carries the schema prefix unless the
        dialect's preparer omits schemas."""
        metadata = MetaData()
        table1 = Table(
            "table1",
            metadata,
            Column("col1", Integer, primary_key=True),
            schema="someschema",
        )
        table2 = Table(
            "table2",
            metadata,
            Column("col1", Integer, primary_key=True),
            Column("col2", Integer, ForeignKey("someschema.table1.col1")),
schema="someschema",
        )

        t1 = str(schema.CreateTable(table1).compile(bind=testing.db))
        t2 = str(schema.CreateTable(table2).compile(bind=testing.db))
        # NOTE: str.index() raises ValueError when the substring is absent,
        # so "> -1" can never evaluate to False; a missing statement shows
        # up as a ValueError rather than an AssertionError.
        if testing.db.dialect.preparer(testing.db.dialect).omit_schema:
            assert t1.index("CREATE TABLE table1") > -1
            assert t2.index("CREATE TABLE table2") > -1
        else:
            assert t1.index("CREATE TABLE someschema.table1") > -1
            assert t2.index("CREATE TABLE someschema.table2") > -1


class UseExistingTest(fixtures.TablesTest):
    """Behavior of ``keep_existing`` / ``extend_existing`` when a ``users``
    Table is declared on a MetaData that may already contain one."""

    @classmethod
    def define_tables(cls, metadata):
        # the "users" table the tests below reflect via
        # autoload_with=testing.db (presumably created in the database by
        # the TablesTest fixture -- not visible here)
        Table(
            "users",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("name", String(30)),
        )

    @testing.fixture
    def existing_meta(self):
        """Return a MetaData in which ``users`` has already been
        reflected."""
        meta2 = MetaData()
        Table("users", meta2, autoload_with=testing.db)
        return meta2

    @testing.fixture
    def empty_meta(self):
        """Return a fresh MetaData holding no tables."""
        return MetaData()

    def test_exception_no_flags(self, existing_meta):
        """Redefining an existing table with neither flag raises
        InvalidRequestError."""

        def go():
            Table(
                "users",
                existing_meta,
                Column("name", Unicode),
                autoload_with=testing.db,
            )

        assert_raises_message(
            exc.InvalidRequestError,
            "Table 'users' is already defined for this " "MetaData instance.",
            go,
        )

    def test_keep_plus_existing_raises(self, existing_meta):
        """Passing both ``keep_existing`` and ``extend_existing`` raises
        ArgumentError."""
        assert_raises(
            exc.ArgumentError,
            Table,
            "users",
            existing_meta,
            keep_existing=True,
            extend_existing=True,
        )

    def test_keep_existing_no_dupe_constraints(self, empty_meta):
        """Declaring the same table twice with ``keep_existing=True`` leaves
        the constraint count unchanged."""
        users = Table(
            "users",
            empty_meta,
            Column("id", Integer),
            Column("name", Unicode),
            UniqueConstraint("name"),
            keep_existing=True,
        )
        assert "name" in users.c
        assert "id" in users.c
        eq_(len(users.constraints), 2)

        u2 = Table(
            "users",
            empty_meta,
            Column("id", Integer),
            Column("name", Unicode),
            UniqueConstraint("name"),
            keep_existing=True,
)
        eq_(len(u2.constraints), 2)

    def test_extend_existing_dupes_constraints(self, empty_meta):
        """Declaring the same table twice with ``extend_existing=True`` adds
        the UniqueConstraint a second time."""
        users = Table(
            "users",
            empty_meta,
            Column("id", Integer),
            Column("name", Unicode),
            UniqueConstraint("name"),
            extend_existing=True,
        )
        assert "name" in users.c
        assert "id" in users.c
        eq_(len(users.constraints), 2)

        u2 = Table(
            "users",
            empty_meta,
            Column("id", Integer),
            Column("name", Unicode),
            UniqueConstraint("name"),
            extend_existing=True,
        )
        # constraint got duped
        eq_(len(u2.constraints), 3)

    def test_autoload_replace_column(self, empty_meta):
        """An explicitly declared column keeps its type when the rest of the
        table is reflected."""
        users = Table(
            "users",
            empty_meta,
            Column("name", Unicode),
            autoload_with=testing.db,
        )
        assert isinstance(users.c.name.type, Unicode)

    def test_keep_existing_coltype(self, existing_meta):
        """keep_existing: the already-present column type is not replaced
        by the newly declared Unicode column."""
        users = Table(
            "users",
            existing_meta,
            Column("name", Unicode),
            autoload_with=testing.db,
            keep_existing=True,
        )
        assert not isinstance(users.c.name.type, Unicode)

    def test_keep_existing_quote(self, existing_meta):
        """keep_existing: ``quote=True`` is not applied to the
        already-present table."""
        users = Table(
            "users",
            existing_meta,
            quote=True,
            autoload_with=testing.db,
            keep_existing=True,
        )
        assert not users.name.quote

    def test_keep_existing_add_column(self, existing_meta):
        """keep_existing: a newly declared column is not added to the
        already-present table."""
        users = Table(
            "users",
            existing_meta,
            Column("foo", Integer),
            autoload_with=testing.db,
            keep_existing=True,
        )
        assert "foo" not in users.c

    def test_keep_existing_coltype_no_orig(self, empty_meta):
        """keep_existing on an empty MetaData: the declared column type is
        used."""
        users = Table(
            "users",
            empty_meta,
            Column("name", Unicode),
            autoload_with=testing.db,
            keep_existing=True,
        )
        assert isinstance(users.c.name.type, Unicode)

    @testing.skip_if(
        lambda: testing.db.dialect.requires_name_normalize,
        "test depends on lowercase as case insensitive",
    )
    def test_keep_existing_quote_no_orig(self, empty_meta):
        """keep_existing on an empty MetaData: ``quote=True`` takes
        effect."""
        users = Table(
            "users",
            empty_meta,
            quote=True,
            autoload_with=testing.db,
            keep_existing=True,
        )
        assert users.name.quote

    def test_keep_existing_add_column_no_orig(self, empty_meta):
        """keep_existing on an empty MetaData: the declared column is
        present."""
        users = Table(
            "users",
            empty_meta,
            Column("foo", Integer),
            autoload_with=testing.db,
            keep_existing=True,
        )
        assert "foo" in users.c

    def test_keep_existing_coltype_no_reflection(self, existing_meta):
        """keep_existing without autoload: the existing column type is
        kept."""
        users = Table(
            "users", existing_meta, Column("name", Unicode), keep_existing=True
        )
        assert not isinstance(users.c.name.type, Unicode)

    def test_keep_existing_quote_no_reflection(self, existing_meta):
        """keep_existing without autoload: ``quote=True`` is not applied."""
        users = Table("users", existing_meta, quote=True, keep_existing=True)
        assert not users.name.quote

    def test_keep_existing_add_column_no_reflection(self, existing_meta):
        """keep_existing without autoload: the new column is not added."""
        users = Table(
            "users", existing_meta, Column("foo", Integer), keep_existing=True
        )
        assert "foo" not in users.c

    def test_extend_existing_coltype(self, existing_meta):
        """extend_existing: the declared Unicode column replaces the
        existing column's type."""
        users = Table(
            "users",
            existing_meta,
            Column("name", Unicode),
            autoload_with=testing.db,
            extend_existing=True,
        )
        assert isinstance(users.c.name.type, Unicode)

    def test_extend_existing_quote(self, existing_meta):
        """extend_existing: changing ``quote`` on an existing table raises
        ArgumentError."""
        assert_raises_message(
            tsa.exc.ArgumentError,
            "Can't redefine 'quote' or 'quote_schema' arguments",
            Table,
            "users",
            existing_meta,
            quote=True,
            autoload_with=testing.db,
            extend_existing=True,
        )

    def test_extend_existing_add_column(self, existing_meta):
        """extend_existing: a newly declared column is added to the
        existing table."""
        users = Table(
            "users",
            existing_meta,
            Column("foo", Integer),
            autoload_with=testing.db,
            extend_existing=True,
        )
        assert "foo" in users.c

    def test_extend_existing_coltype_no_orig(self, empty_meta):
        """extend_existing on an empty MetaData: the declared column type is
        used."""
        users = Table(
"users",
            empty_meta,
            Column("name", Unicode),
            autoload_with=testing.db,
            extend_existing=True,
        )
        assert isinstance(users.c.name.type, Unicode)

    @testing.skip_if(
        lambda: testing.db.dialect.requires_name_normalize,
        "test depends on lowercase as case insensitive",
    )
    def test_extend_existing_quote_no_orig(self, empty_meta):
        """extend_existing on an empty MetaData: ``quote=True`` takes
        effect."""
        users = Table(
            "users",
            empty_meta,
            quote=True,
            autoload_with=testing.db,
            extend_existing=True,
        )
        assert users.name.quote

    def test_extend_existing_add_column_no_orig(self, empty_meta):
        """extend_existing on an empty MetaData: the declared column is
        present."""
        users = Table(
            "users",
            empty_meta,
            Column("foo", Integer),
            autoload_with=testing.db,
            extend_existing=True,
        )
        assert "foo" in users.c

    def test_extend_existing_coltype_no_reflection(self, existing_meta):
        """extend_existing without autoload: the declared type replaces the
        existing one."""
        users = Table(
            "users",
            existing_meta,
            Column("name", Unicode),
            extend_existing=True,
        )
        assert isinstance(users.c.name.type, Unicode)

    def test_extend_existing_quote_no_reflection(self, existing_meta):
        """extend_existing without autoload: changing ``quote`` raises
        ArgumentError."""
        assert_raises_message(
            tsa.exc.ArgumentError,
            "Can't redefine 'quote' or 'quote_schema' arguments",
            Table,
            "users",
            existing_meta,
            quote=True,
            extend_existing=True,
        )

    def test_extend_existing_add_column_no_reflection(self, existing_meta):
        """extend_existing without autoload: the new column is added."""
        users = Table(
            "users",
            existing_meta,
            Column("foo", Integer),
            extend_existing=True,
        )
        assert "foo" in users.c


class ConstraintTest(fixtures.TestBase):
    """Tests of Index, ForeignKey and constraint objects and how they
    associate with tables and columns."""

    def _single_fixture(self):
        """Return ``(t1, t2, t3)``: t1 has columns a and b, t2.a is a
        foreign key to t1.a, and t3 has an unrelated column a."""
        m = MetaData()

        t1 = Table("t1", m, Column("a", Integer), Column("b", Integer))

        t2 = Table("t2", m, Column("a", Integer, ForeignKey("t1.a")))

        t3 = Table("t3", m, Column("a", Integer))
        return t1, t2, t3

    def _assert_index_col_x(self, t, i, columns=True):
        """Assert that ``i`` is the only index on ``t`` and is bound to it.

        :param t: table expected to own the index.
        :param i: the Index under test.
        :param columns: when True, ``i.columns`` must be exactly
         ``[t.c.x]``; when False it must be empty.
        """
        eq_(t.indexes, set([i]))
        if columns:
            eq_(list(i.columns), [t.c.x])
        else:
            eq_(list(i.columns), [])
        assert i.table is t

    def test_separate_decl_columns(self):
        """An Index built from a table's column attaches itself to that
        table."""
        m = MetaData()
        t = Table("t", m, Column("x", Integer))
        i = Index("i", t.c.x)
        self._assert_index_col_x(t, i)

    def test_separate_decl_columns_functional(self):
        """Same, with the column wrapped in a SQL function expression."""
        m = MetaData()
        t = Table("t", m, Column("x", Integer))
        i = Index("i", func.foo(t.c.x))
        self._assert_index_col_x(t, i)

    def test_index_no_cols_private_table_arg(self):
        """An Index with no expressions binds to the table given via the
        private ``_table`` argument."""
        m = MetaData()
        t = Table("t", m, Column("x", Integer))
        i = Index("i", _table=t)
        is_(i.table, t)
        eq_(list(i.columns), [])

    def test_index_w_cols_private_table_arg(self):
        """``_table`` may be combined with explicit columns."""
        m = MetaData()
        t = Table("t", m, Column("x", Integer))
        i = Index("i", t.c.x, _table=t)
        is_(i.table, t)

        eq_(list(i.columns), [t.c.x])

    def test_inline_decl_columns(self):
        """An Index on a not-yet-attached Column is bound when both are
        passed to the Table constructor."""
        m = MetaData()
        c = Column("x", Integer)
        i = Index("i", c)
        t = Table("t", m, c, i)
        self._assert_index_col_x(t, i)

    def test_inline_decl_columns_functional(self):
        """Same, with the column wrapped in a SQL function expression."""
        m = MetaData()
        c = Column("x", Integer)
        i = Index("i", func.foo(c))
        t = Table("t", m, c, i)
        self._assert_index_col_x(t, i)

    def test_inline_decl_string(self):
        """An Index naming its column by string is resolved when passed to
        the Table constructor."""
        m = MetaData()
        i = Index("i", "x")
        t = Table("t", m, Column("x", Integer), i)
        self._assert_index_col_x(t, i)

    def test_inline_decl_textonly(self):
        """A text()-only Index passed inline binds to the table and has no
        columns."""
        m = MetaData()
        i = Index("i", text("foobar(x)"))
        t = Table("t", m, Column("x", Integer), i)
        self._assert_index_col_x(t, i, columns=False)

    def test_separate_decl_textonly(self):
        """A text()-only Index added via ``append_constraint()`` binds to
        the table and has no columns."""
        m = MetaData()
        i = Index("i", text("foobar(x)"))
        t = Table("t", m, Column("x", Integer))
        t.append_constraint(i)
        self._assert_index_col_x(t, i, columns=False)

    def test_unnamed_column_exception(self):
        """Appending an Index over an unnamed Column raises
        ArgumentError."""
        # this can occur in some declarative situations
        c = Column(Integer)
        idx = Index("q", c)
        m = MetaData()
        t = Table("t", m, Column("q"))
        assert_raises_message(
            exc.ArgumentError,
            "Can't add unnamed column to column collection",
            t.append_constraint,
            idx,
        )

    def test_non_attached_col_plus_string_expr(self):
        """An Index over an unattached Column plus a ``desc("foo")``
        expression binds once the Column is appended to a table."""
        # another one that declarative can lead towards
        metadata = MetaData()

        t1 = Table("a", metadata, Column("id", Integer))

        c2 = Column("x", Integer)

        # if we do it here, no problem
        # t1.append_column(c2)

        idx = Index("foo", c2, desc("foo"))

        t1.append_column(c2)

        self._assert_index_col_x(t1, idx, columns=True)

    def test_column_associated_w_lowercase_table(self):
        """An Index on a Column owned by a lightweight ``table()`` has no
        table of its own."""
        from sqlalchemy import table

        c = Column("x", Integer)
        table("foo", c)
        idx = Index("q", c)
        is_(idx.table, None)  # lower-case-T table doesn't have indexes

    def test_clauseelement_extraction_one(self):
        """An object exposing ``__clause_element__`` is accepted by Index,
        which finds column x and its table inside the expression."""
        t = Table("t", MetaData(), Column("x", Integer), Column("y", Integer))

        class MyThing(object):
            def __clause_element__(self):
                return t.c.x + 5

        idx = Index("foo", MyThing())
        self._assert_index_col_x(t, idx)

    def test_clauseelement_extraction_two(self):
        """Same, mixed with a plain column; the index lands on the
        table."""
        t = Table("t", MetaData(), Column("x", Integer), Column("y", Integer))

        class MyThing(object):
            def __clause_element__(self):
                return t.c.x + 5

        idx = Index("bar", MyThing(), t.c.y)

        eq_(set(t.indexes), set([idx]))

    def test_clauseelement_extraction_three(self):
        """The expression returned by ``__clause_element__`` is what ends up
        in ``Index.expressions``."""
        t = Table("t", MetaData(), Column("x", Integer), Column("y", Integer))

        expr1 = t.c.x + 5

        class MyThing(object):
            def __clause_element__(self):
                return expr1

        idx = Index("bar", MyThing(), t.c.y)

        is_true(idx.expressions[0].compare(expr1))
        is_(idx.expressions[1], t.c.y)

    def test_table_references(self):
        """``ForeignKey.references()`` is true for the target table only."""
        t1, t2,
t3 = self._single_fixture()
        assert list(t2.c.a.foreign_keys)[0].references(t1)
        assert not list(t2.c.a.foreign_keys)[0].references(t3)

    def test_column_references(self):
        """``Column.references()`` is true only for the exact target
        column."""
        t1, t2, t3 = self._single_fixture()
        assert t2.c.a.references(t1.c.a)
        assert not t2.c.a.references(t3.c.a)
        assert not t2.c.a.references(t1.c.b)

    def test_column_references_derived(self):
        """A foreign key column also references the matching column of a
        subquery derived from the target table."""
        t1, t2, t3 = self._single_fixture()
        s1 = tsa.select(tsa.select(t1).alias()).subquery()
        assert t2.c.a.references(s1.c.a)
        assert not t2.c.a.references(s1.c.b)

    def test_copy_doesnt_reference(self):
        """An unattached copy of a foreign key column references
        nothing."""
        t1, t2, t3 = self._single_fixture()
        a2 = t2.c.a._copy()
        assert not a2.references(t1.c.a)
        assert not a2.references(t1.c.b)

    def test_derived_column_references(self):
        """A column of a subquery derived from the referencing table still
        references the original target column."""
        t1, t2, t3 = self._single_fixture()
        s1 = tsa.select(tsa.select(t2).alias()).subquery()
        assert s1.c.a.references(t1.c.a)
        assert not s1.c.a.references(t1.c.b)

    def test_referred_table_accessor(self):
        """``ForeignKeyConstraint.referred_table`` is the target Table."""
        t1, t2, t3 = self._single_fixture()
        fkc = list(t2.foreign_key_constraints)[0]
        is_(fkc.referred_table, t1)

    def test_referred_table_accessor_not_available(self):
        """``referred_table`` raises InvalidRequestError when the target
        table does not exist."""
        t1 = Table("t", MetaData(), Column("x", ForeignKey("q.id")))
        fkc = list(t1.foreign_key_constraints)[0]
        assert_raises_message(
            exc.InvalidRequestError,
            "Foreign key associated with column 't.x' could not find "
            "table 'q' with which to generate a foreign key to target "
            "column 'id'",
            getattr,
            fkc,
            "referred_table",
        )

    def test_related_column_not_present_atfirst_ok(self):
        """A foreign key to a column that is appended to the target table
        later resolves, and the untyped column picks up the type."""
        m = MetaData()
        base_table = Table("base", m, Column("id", Integer, primary_key=True))
        fk = ForeignKey("base.q")
        derived_table = Table(
            "derived", m, Column("id", None, fk, primary_key=True)
        )

        base_table.append_column(Column("q", Integer))
        assert fk.column is base_table.c.q
        assert isinstance(derived_table.c.id.type, Integer)

    def test_related_column_not_present_atfirst_ok_onname(self):
        """Same with ``link_to_name=True``: the target is matched by column
        name ``q`` although its key is ``zz``."""
        m = MetaData()
        base_table = Table("base", m, Column("id", Integer, primary_key=True))
        fk = ForeignKey("base.q", link_to_name=True)
        derived_table = Table(
            "derived", m, Column("id", None, fk, primary_key=True)
        )

        base_table.append_column(Column("q", Integer, key="zz"))
        assert fk.column is base_table.c.zz
        assert isinstance(derived_table.c.id.type, Integer)

    def test_related_column_not_present_atfirst_ok_linktoname_conflict(self):
        """With ``link_to_name=True`` the column *named* ``q`` is chosen
        even though another column uses ``q`` as its key."""
        m = MetaData()
        base_table = Table("base", m, Column("id", Integer, primary_key=True))
        fk = ForeignKey("base.q", link_to_name=True)
        derived_table = Table(
            "derived", m, Column("id", None, fk, primary_key=True)
        )

        # name "zz" / key "q" is appended first; name "q" / key "zz" second
        base_table.append_column(Column("zz", Integer, key="q"))
        base_table.append_column(Column("q", Integer, key="zz"))
        assert fk.column is base_table.c.zz
        assert isinstance(derived_table.c.id.type, Integer)

    def test_invalid_composite_fk_check_strings(self):
        """A composite FK whose string targets name two different tables
        raises ArgumentError."""
        m = MetaData()

        assert_raises_message(
            exc.ArgumentError,
            r"ForeignKeyConstraint on t1\(x, y\) refers to "
            "multiple remote tables: t2 and t3",
            Table,
            "t1",
            m,
            Column("x", Integer),
            Column("y", Integer),
            ForeignKeyConstraint(["x", "y"], ["t2.x", "t3.y"]),
        )

    def test_invalid_composite_fk_check_columns(self):
        """Same check with the targets given as attached Column objects."""
        m = MetaData()

        t2 = Table("t2", m, Column("x", Integer))
        t3 = Table("t3", m, Column("y", Integer))

        assert_raises_message(
            exc.ArgumentError,
            r"ForeignKeyConstraint on t1\(x, y\) refers to "
            "multiple remote tables: t2 and t3",
            Table,
            "t1",
            m,
            Column("x", Integer),
            Column("y", Integer),
            ForeignKeyConstraint(["x", "y"], [t2.c.x, t3.c.y]),
        )

    def
test_invalid_composite_fk_check_columns_notattached(self):
        """Documents current behavior: no error when the target columns are
        attached to two different tables only after the constraint was
        built."""
        m = MetaData()
        x = Column("x", Integer)
        y = Column("y", Integer)

        # no error is raised for this one right now.
        # which is a minor bug.
        Table(
            "t1",
            m,
            Column("x", Integer),
            Column("y", Integer),
            ForeignKeyConstraint(["x", "y"], [x, y]),
        )

        Table("t2", m, x)
        Table("t3", m, y)

    def test_constraint_copied_to_proxy_ok(self):
        """A subquery's proxy column gets its own ForeignKey that shares the
        table's ForeignKeyConstraint without being listed in it."""
        m = MetaData()
        Table("t1", m, Column("id", Integer, primary_key=True))
        t2 = Table(
            "t2",
            m,
            Column("id", Integer, ForeignKey("t1.id"), primary_key=True),
        )

        s = tsa.select(t2).subquery()
        t2fk = list(t2.c.id.foreign_keys)[0]
        sfk = list(s.c.id.foreign_keys)[0]

        # the two FKs share the ForeignKeyConstraint
        is_(t2fk.constraint, sfk.constraint)

        # but the ForeignKeyConstraint isn't
        # aware of the select's FK
        eq_(t2fk.constraint.elements, [t2fk])

    def test_type_propagate_composite_fk_string(self):
        """Columns declared with type None take the types of the columns
        targeted by a composite FK given as strings."""
        metadata = MetaData()
        Table(
            "a",
            metadata,
            Column("key1", Integer, primary_key=True),
            Column("key2", String(40), primary_key=True),
        )

        b = Table(
            "b",
            metadata,
            Column("a_key1", None),
            Column("a_key2", None),
            Column("id", Integer, primary_key=True),
            ForeignKeyConstraint(["a_key1", "a_key2"], ["a.key1", "a.key2"]),
        )

        assert isinstance(b.c.a_key1.type, Integer)
        assert isinstance(b.c.a_key2.type, String)

    def test_type_propagate_composite_fk_col(self):
        """Same, with the composite FK targets given as Column objects."""
        metadata = MetaData()
        a = Table(
            "a",
            metadata,
            Column("key1", Integer, primary_key=True),
            Column("key2", String(40), primary_key=True),
        )

        b = Table(
            "b",
            metadata,
            Column("a_key1", None),
            Column("a_key2", None),
            Column("id", Integer, primary_key=True),
            ForeignKeyConstraint(["a_key1", "a_key2"],
[a.c.key1, a.c.key2]),
        )

        assert isinstance(b.c.a_key1.type, Integer)
        assert isinstance(b.c.a_key2.type, String)

    def test_type_propagate_standalone_fk_string(self):
        """A column typed None takes the type of its string FK target."""
        metadata = MetaData()
        Table("a", metadata, Column("key1", Integer, primary_key=True))

        b = Table("b", metadata, Column("a_key1", None, ForeignKey("a.key1")))

        assert isinstance(b.c.a_key1.type, Integer)

    def test_type_propagate_standalone_fk_col(self):
        """A column typed None takes the type of its Column FK target."""
        metadata = MetaData()
        a = Table("a", metadata, Column("key1", Integer, primary_key=True))

        b = Table("b", metadata, Column("a_key1", None, ForeignKey(a.c.key1)))

        assert isinstance(b.c.a_key1.type, Integer)

    def test_type_propagate_chained_string_source_first(self):
        """The type propagates through a chain a <- b <- c when the typed
        table is declared first."""
        metadata = MetaData()
        Table("a", metadata, Column("key1", Integer, primary_key=True))

        b = Table("b", metadata, Column("a_key1", None, ForeignKey("a.key1")))

        c = Table(
            "c", metadata, Column("b_key1", None, ForeignKey("b.a_key1"))
        )

        assert isinstance(b.c.a_key1.type, Integer)
        assert isinstance(c.c.b_key1.type, Integer)

    def test_type_propagate_chained_string_source_last(self):
        """The type propagates through the same chain when the typed table
        is declared last."""
        metadata = MetaData()

        b = Table("b", metadata, Column("a_key1", None, ForeignKey("a.key1")))

        c = Table(
            "c", metadata, Column("b_key1", None, ForeignKey("b.a_key1"))
        )

        Table("a", metadata, Column("key1", Integer, primary_key=True))

        assert isinstance(b.c.a_key1.type, Integer)
        assert isinstance(c.c.b_key1.type, Integer)

    def test_type_propagate_chained_string_source_last_onname(self):
        """Chained propagation with ``link_to_name=True`` where every column
        has a key that differs from its name."""
        metadata = MetaData()

        b = Table(
            "b",
            metadata,
            Column(
                "a_key1",
                None,
                ForeignKey("a.key1", link_to_name=True),
                key="ak1",
            ),
        )

        c = Table(
            "c",
            metadata,
            Column(
                "b_key1",
                None,
                ForeignKey("b.a_key1",
link_to_name=True), 3257 key="bk1", 3258 ), 3259 ) 3260 3261 Table( 3262 "a", metadata, Column("key1", Integer, primary_key=True, key="ak1") 3263 ) 3264 3265 assert isinstance(b.c.ak1.type, Integer) 3266 assert isinstance(c.c.bk1.type, Integer) 3267 3268 def test_type_propagate_chained_string_source_last_onname_conflict(self): 3269 metadata = MetaData() 3270 3271 b = Table( 3272 "b", 3273 metadata, 3274 # b.c.key1 -> a.c.key1 -> String 3275 Column( 3276 "ak1", 3277 None, 3278 ForeignKey("a.key1", link_to_name=False), 3279 key="key1", 3280 ), 3281 # b.c.ak1 -> a.c.ak1 -> Integer 3282 Column( 3283 "a_key1", 3284 None, 3285 ForeignKey("a.key1", link_to_name=True), 3286 key="ak1", 3287 ), 3288 ) 3289 3290 c = Table( 3291 "c", 3292 metadata, 3293 # c.c.b_key1 -> b.c.ak1 -> Integer 3294 Column("b_key1", None, ForeignKey("b.ak1", link_to_name=False)), 3295 # c.c.b_ak1 -> b.c.ak1 3296 Column("b_ak1", None, ForeignKey("b.ak1", link_to_name=True)), 3297 ) 3298 3299 Table( 3300 "a", 3301 metadata, 3302 # a.c.key1 3303 Column("ak1", String, key="key1"), 3304 # a.c.ak1 3305 Column("key1", Integer, primary_key=True, key="ak1"), 3306 ) 3307 3308 assert isinstance(b.c.key1.type, String) 3309 assert isinstance(b.c.ak1.type, Integer) 3310 3311 assert isinstance(c.c.b_ak1.type, String) 3312 assert isinstance(c.c.b_key1.type, Integer) 3313 3314 def test_type_propagate_chained_col_orig_first(self): 3315 metadata = MetaData() 3316 a = Table("a", metadata, Column("key1", Integer, primary_key=True)) 3317 3318 b = Table("b", metadata, Column("a_key1", None, ForeignKey(a.c.key1))) 3319 3320 c = Table( 3321 "c", metadata, Column("b_key1", None, ForeignKey(b.c.a_key1)) 3322 ) 3323 3324 assert isinstance(b.c.a_key1.type, Integer) 3325 assert isinstance(c.c.b_key1.type, Integer) 3326 3327 def test_column_accessor_col(self): 3328 c1 = Column("x", Integer) 3329 fk = ForeignKey(c1) 3330 is_(fk.column, c1) 3331 3332 def test_column_accessor_clause_element(self): 3333 c1 = Column("x", Integer) 3334 
def test_column_accessor_string_no_target_column(self):
    """``fk.column`` raises if the target table has no such column."""
    meta = MetaData()
    fk = ForeignKey("sometable.somecol")
    Table("t", meta, Column("x", fk))
    # the referenced table exists, but its only column has another name
    Table("sometable", meta, Column("notsomecol", Integer))

    with expect_raises_message(
        exc.NoReferencedColumnError,
        "Could not initialize target column for ForeignKey "
        "'sometable.somecol' on table 't': "
        "table 'sometable' has no column named 'somecol'",
    ):
        # plain attribute access is what triggers the lookup
        fk.column
def test_auto_append_lowercase_table(self):
    """Build constraints and an index against lowercase ``table()`` objects.

    The test passes when constructing each construct against columns of
    ``table()`` / ``column()`` objects does not raise; nothing is asserted
    about where the constructs end up.
    """
    # ``table`` and ``column`` are the module-level imports; no local
    # re-import is needed.
    t = table("t", column("a"))
    t2 = table("t2", column("a"))

    # construction alone is the check
    UniqueConstraint(t.c.a)
    CheckConstraint(t.c.a > 5)
    ForeignKeyConstraint([t.c.a], [t2.c.a])
    PrimaryKeyConstraint(t.c.a)
    Index("foo", t.c.a)
def test_auto_append_ck_on_col_attach_two(self):
    """A CheckConstraint attaches once all of its columns join the table."""
    col_a = Column("a", Integer)
    col_b = Column("b", Integer)
    col_c = Column("c", Integer)
    check = CheckConstraint(col_a > col_b + col_c)

    tbl = Table("tbl", MetaData(), col_a)
    assert check not in tbl.constraints

    # a second column is still not enough
    tbl.append_column(col_b)
    assert check not in tbl.constraints

    # after the last referenced column is added, the constraint is present
    tbl.append_column(col_c)
    assert check in tbl.constraints
def test_auto_append_uq_on_col_attach_three(self):
    """Adding a constraint column to a different table raises."""
    meta = MetaData()
    col_a = Column("a", Integer)
    col_b = Column("b", Integer)
    col_c = Column("c", Integer)
    unique = UniqueConstraint(col_a, col_b, col_c)

    tbl = Table("tbl", meta, col_a)
    assert unique not in tbl.constraints

    tbl.append_column(col_b)
    assert unique not in tbl.constraints

    other = Table("t2", meta)

    # two different tables, so UniqueConstraint raises
    with expect_raises_message(
        exc.ArgumentError,
        r"Column\(s\) 't2\.c' are not part of table 'tbl'\.",
    ):
        other.append_column(col_c)
def test_auto_append_uq_on_col_attach_five(self):
    """Test that a UniqueConstraint naming both Column objects and string
    names *does* attach right away when the table already has all of the
    named columns.

    """
    col_a = Column("a", Integer)
    col_b = Column("b", Integer)
    col_c = Column("c", Integer)
    tbl = Table("tbl", MetaData(), col_a, col_c, col_b)

    unique = UniqueConstraint(col_a, "b", "c")
    assert unique in tbl.constraints

    # appending the same constraint explicitly must not duplicate it
    tbl.append_constraint(unique)
    assert unique in tbl.constraints

    uniques = [
        const
        for const in tbl.constraints
        if isinstance(const, UniqueConstraint)
    ]
    eq_(uniques, [unique])
def test_raise_expr_no_column(self):
    """CREATE INDEX cannot compile for an expression index with no table."""
    idx = Index("foo", func.lower(5))

    # first with the configured test dialect, then with no arguments
    for compile_kw in ({"dialect": testing.db.dialect}, {}):
        assert_raises_message(
            exc.CompileError,
            "Index 'foo' is not associated with any table.",
            schema.CreateIndex(idx).compile,
            **compile_kw
        )
def test_pickle_uq_annotated_col(self, no_pickle_annotated):
    """A UniqueConstraint built from the annotated column can be pickled."""
    tbl, annotated_q = no_pickle_annotated
    tbl.append_constraint(UniqueConstraint(annotated_q))

    # round trip of the whole MetaData
    restored = pickle.loads(pickle.dumps(tbl.metadata))

    uniques = []
    for const in restored.tables["t"].constraints:
        if isinstance(const, UniqueConstraint):
            uniques.append(const)

    # the restored constraint still points at an equivalent "q" column
    is_true(uniques[0].columns[0].compare(tbl.c.q))
def test_basic(self):
    """Columns named after construction keep their position in the table."""
    cols = self.columns()

    # give a name and key to the columns that were built without one
    late_names = ((0, "a"), (2, "c"), (5, "f"), (6, "g"), (8, "i"))
    for position, label in late_names:
        unnamed = cols[position]
        unnamed.name = label
        unnamed.key = label

    tbl = Table("table", MetaData(), *cols)

    for position, table_col in enumerate(tbl.c):
        assert table_col.name == cols[position].name
def test_custom_subclass_proxy(self):
    """Test proxy generation of a Column subclass; the result compiles.

    A ``Column`` subclass with its own ``_constructor()`` and a custom
    compilation hook is placed in a table.  The test checks that the
    subclass method is reachable on the table column and on the column
    of a subquery built from a SELECT of that table, and that the custom
    rendering appears in the compiled SQL.
    """

    from sqlalchemy.schema import Column
    from sqlalchemy.ext.compiler import compiles
    from sqlalchemy.sql import select

    class MyColumn(Column):
        # rebuilds a MyColumn from (name, type_), passing the name as a
        # keyword because __init__ below takes only the type positionally
        def _constructor(self, name, type_, **kw):
            kw["name"] = name
            return MyColumn(type_, **kw)

        def __init__(self, type_, **kw):
            Column.__init__(self, type_, **kw)

        # marker method used to prove that an object is a MyColumn
        def my_goofy_thing(self):
            return "hi"

    # custom rendering: the normal column text with "-" appended
    @compiles(MyColumn)
    def goofy(element, compiler, **kw):
        s = compiler.visit_column(element, **kw)
        return s + "-"

    # names are assigned after construction since __init__ takes no name
    id_ = MyColumn(Integer, primary_key=True)
    id_.name = "id"
    name = MyColumn(String)
    name.name = "name"
    t1 = Table("foo", MetaData(), id_, name)

    # goofy thing
    eq_(t1.c.name.my_goofy_thing(), "hi")

    # create proxy
    s = select(t1.select().alias())

    # proxy has goofy thing
    eq_(s.subquery().c.name.my_goofy_thing(), "hi")

    # compile works; every column reference carries the "-" suffix
    self.assert_compile(
        select(t1.select().alias()),
        "SELECT anon_1.id-, anon_1.name- FROM "
        "(SELECT foo.id- AS id, foo.name- AS name "
        "FROM foo) AS anon_1",
    )
def test_custom_create(self):
    """Render CREATE TABLE through a custom ``CreateColumn`` compiler.

    Columns whose ``info`` contains ``"special"`` are rendered with a
    ``SPECIAL DIRECTIVE`` marker; all other columns use the stock
    ``visit_create_column()`` rendering.

    The hook is registered on ``schema.CreateColumn`` itself, so it is
    removed in a ``finally`` block: a failing assertion must not leave
    the custom compiler in place.
    """
    from sqlalchemy.ext.compiler import compiles, deregister

    @compiles(schema.CreateColumn)
    def compile_(element, compiler, **kw):
        col = element.element

        # columns without the marker keep the default rendering
        if "special" not in col.info:
            return compiler.visit_create_column(element, **kw)

        ddl = "%s SPECIAL DIRECTIVE %s" % (
            col.name,
            compiler.type_compiler.process(col.type),
        )
        default_str = compiler.get_column_default_string(col)
        if default_str is not None:
            ddl += " DEFAULT " + default_str

        if not col.nullable:
            ddl += " NOT NULL"

        if col.constraints:
            ddl += " ".join(
                compiler.process(const) for const in col.constraints
            )
        return ddl

    try:
        t = Table(
            "mytable",
            MetaData(),
            Column("x", Integer, info={"special": True}, primary_key=True),
            Column("y", String(50)),
            Column("z", String(20), info={"special": True}),
        )

        self.assert_compile(
            schema.CreateTable(t),
            "CREATE TABLE mytable (x SPECIAL DIRECTIVE INTEGER "
            "NOT NULL, y VARCHAR(50), "
            "z SPECIAL DIRECTIVE VARCHAR(20), PRIMARY KEY (x))",
        )
    finally:
        # always drop the hook, even when the assertion above fails
        deregister(schema.CreateColumn)
def test_onupdate_default_not_server_default_one(self):
    """``server_default`` and ``server_onupdate`` keywords stay separate."""
    col = self._fixture(
        server_default=schema.DefaultClause("y"),
        server_onupdate=schema.DefaultClause("z"),
    )

    # each keyword keeps the clause it was given
    eq_(col.server_default.arg, "y")
    eq_(col.server_onupdate.arg, "z")
def test_column_default_keyword_as_clause(self):
    """A plain value passed as ``default=`` is exposed on ``.default``."""
    col = self._fixture(default="y")

    generated = col.default
    # the value is kept as ``arg`` and the default is bound to the column
    assert generated.arg == "y"
    assert generated.column is col
def test_all_events(self):
    """Record each parent-attach event fired while two tables are built."""
    seen = []

    def on_before(item, parent):
        # before attachment: log the class name of the parent
        entry = "%s->%s" % (type(item).__name__, type(parent).__name__)
        seen.append(entry)

    def on_after(item, parent):
        # after attachment: log the string form of the parent
        entry = "%s->%s" % (type(item).__name__, parent)
        seen.append(entry)

    for event_name, listener in (
        ("before_parent_attach", on_before),
        ("after_parent_attach", on_after),
    ):
        self.event_listen(schema.SchemaItem, event_name, listener)

    meta = MetaData()
    Table(
        "t1",
        meta,
        Column("id", Integer, Sequence("foo_id"), primary_key=True),
        Column("bar", String, ForeignKey("t2.id")),
    )
    Table("t2", meta, Column("id", Integer, primary_key=True))

    expected = [
        "Sequence->Column",
        "Sequence->id",
        "ForeignKey->Column",
        "ForeignKey->bar",
        "Table->MetaData",
        "PrimaryKeyConstraint->Table",
        "PrimaryKeyConstraint->t1",
        "Column->Table",
        "Column->t1",
        "Column->Table",
        "Column->t1",
        "ForeignKeyConstraint->Table",
        "ForeignKeyConstraint->t1",
        "Table->MetaData()",
        "Table->MetaData",
        "PrimaryKeyConstraint->Table",
        "PrimaryKeyConstraint->t2",
        "Column->Table",
        "Column->t2",
        "Table->MetaData()",
    ]
    eq_(seen, expected)
@contextmanager
def _fixture(self):
    """Patch dialect loading so three stand-in dialects resolve by name.

    While the context is active, ``sqlalchemy.dialects.registry.load`` is
    replaced by a function that knows only ``"participating"``,
    ``"participating2"`` and ``"nonparticipating"``; any other name raises
    ``NoSuchModuleError``.
    """
    from sqlalchemy.engine.default import DefaultDialect

    class ParticipatingDialect(DefaultDialect):
        construct_arguments = [
            (schema.Index, {"x": 5, "y": False, "z_one": None}),
            (schema.ForeignKeyConstraint, {"foobar": False}),
        ]

    class ParticipatingDialect2(DefaultDialect):
        construct_arguments = [
            (schema.Index, {"x": 9, "y": True, "pp": "default"}),
            (schema.Table, {"*": None}),
        ]

    class NonParticipatingDialect(DefaultDialect):
        construct_arguments = None

    known_dialects = {
        "participating": ParticipatingDialect,
        "participating2": ParticipatingDialect2,
        "nonparticipating": NonParticipatingDialect,
    }

    def load(dialect_name):
        # unknown names fail the same way a missing dialect would
        if dialect_name not in known_dialects:
            raise exc.NoSuchModuleError("no dialect %r" % dialect_name)
        return known_dialects[dialect_name]

    with mock.patch("sqlalchemy.dialects.registry.load", load):
        yield
def test_participating_bad_kw(self):
    """An argument the participating dialect does not declare is rejected."""
    with self._fixture():
        with expect_raises_message(
            exc.ArgumentError,
            "Argument 'participating_q_p_x' is not accepted by dialect "
            "'participating' on behalf of "
            "<class 'sqlalchemy.sql.schema.Index'>",
        ):
            Index("a", "b", "c", participating_q_p_x=8)
def test_multiple_participating(self):
    """Options for two participating dialects are tracked side by side."""
    passed = {
        "participating_x": 7,
        "participating2_x": 15,
        "participating2_y": "lazy",
    }
    with self._fixture():
        idx = Index("a", "b", "c", **passed)

        # values not passed explicitly come from each dialect's defaults
        expected_options = {
            "participating": {"x": 7, "y": False, "z_one": None},
            "participating2": {"x": 15, "y": "lazy", "pp": "default"},
        }
        eq_(idx.dialect_options, expected_options)

        # only the arguments actually passed show up as kwargs
        eq_(idx.dialect_kwargs, passed)
def test_update(self):
    """Re-validating dialect kwargs updates options in place.

    Starts from an ``Index`` with a single ``participating_x`` value,
    then feeds further keyword dictionaries through
    ``_validate_dialect_kwargs`` and checks ``dialect_options`` and
    ``dialect_kwargs`` after each step.
    """
    with self._fixture():
        index = Index("a", "b", "c", participating_x=20)
        eq_(index.dialect_kwargs, {"participating_x": 20})

        overrides = {
            "participating_x": 25,
            "participating_z_one": "default",
        }
        options_after_overrides = {
            "participating": {"x": 25, "y": False, "z_one": "default"}
        }

        # the same update is applied twice; the second pass must leave
        # the state exactly as the first pass did
        for _ in range(2):
            index._validate_dialect_kwargs(dict(overrides))
            eq_(index.dialect_options, options_after_overrides)
            eq_(index.dialect_kwargs, overrides)

        # an update touching a second dialect adds that dialect's
        # option collection alongside the existing one
        index._validate_dialect_kwargs(
            {"participating_y": True, "participating2_y": "p2y"}
        )
        options_after_second_dialect = {
            "participating": {"x": 25, "y": True, "z_one": "default"},
            "participating2": {"y": "p2y", "pp": "default", "x": 9},
        }
        kwargs_after_second_dialect = {
            "participating_x": 25,
            "participating_y": True,
            "participating2_y": "p2y",
            "participating_z_one": "default",
        }
        eq_(index.dialect_options, options_after_second_dialect)
        eq_(index.dialect_kwargs, kwargs_after_second_dialect)
def test_key_error_dialect_options(self):
    """Reading an unset option name raises ``KeyError``.

    The lookup is attempted on the option collections of both the
    ``participating`` and the ``nonparticipating`` dialect of a plain
    ``Index``.
    """
    with self._fixture():
        index = Index("a", "b", "c")

        for dialect_name in ("participating", "nonparticipating"):
            options = index.dialect_options[dialect_name]
            assert_raises(KeyError, options.__getitem__, "asdfaso890")
def _fixture(self, naming_convention, table_schema=None):
    """Return a ``user`` table bound to a convention-aware ``MetaData``.

    :param naming_convention: mapping handed to ``MetaData`` unchanged.
    :param table_schema: optional schema name for the table.
    :return: the new ``Table``; its metadata is reachable through the
     table's ``metadata`` attribute.
    """
    metadata = MetaData(naming_convention=naming_convention)

    columns = [
        Column("id", Integer, primary_key=True),
        Column("version", Integer, primary_key=True),
        Column("data", String(30)),
        # the next two columns have a name that differs from their key
        Column("Data2", String(30), key="data2"),
        Column("Data3", String(30), key="data3"),
    ]

    return Table("user", metadata, *columns, schema=table_schema)
def test_colliding_col_label_from_unique_flag(self):
    """``unique=True`` on a column yields a convention-named constraint.

    The fixture table ``foo`` has both an ``id`` and a ``foo_id``
    column; the constraint generated for ``id`` is expected to be named
    ``uq_foo_id`` and to compile against ``id``.
    """
    table = self._colliding_name_fixture(
        {"uq": "uq_%(column_0_label)s"}, {"unique": True}
    )

    unique_constraints = [
        const
        for const in table.constraints
        if isinstance(const, UniqueConstraint)
    ]
    constraint = unique_constraints[0]

    # name is generated up front.  alembic really prefers this
    eq_(constraint.name, "uq_foo_id")

    expected_ddl = "ALTER TABLE foo ADD CONSTRAINT uq_foo_id UNIQUE (id)"
    self.assert_compile(AddConstraint(constraint), expected_ddl)
@testing.combinations(
    ("nopk",),
    ("column",),
    ("constraint",),
    ("explicit_name",),
    argnames="pktype",
)
@testing.combinations(
    ("pk_%(table_name)s", "pk_t1"),
    ("pk_%(column_0_name)s", "pk_x"),
    ("pk_%(column_0_N_name)s", "pk_x_y"),
    ("pk_%(column_0_N_label)s", "pk_t1_x_t1_y"),
    ("%(column_0_name)s", "x"),
    ("%(column_0N_name)s", "xy"),
    argnames="conv, expected_name",
)
def test_pk_conventions(self, conv, expected_name, pktype):
    """Primary key naming conventions across ways of declaring the key.

    :param conv: the ``pk`` naming convention template under test.
    :param expected_name: constraint name the template should produce;
     overridden below for the variants where the template cannot apply.
    :param pktype: how the primary key is declared on table ``t1``.
    """
    metadata = MetaData(naming_convention={"pk": conv})

    if pktype == "nopk":
        table_items = [
            Column("x", Integer, nullable=False),
            Column("y", Integer, nullable=False),
        ]
        # no primary key, therefore no name to compare
        expected_name = None
    elif pktype == "column":
        table_items = [
            Column("x", Integer, primary_key=True),
            Column("y", Integer, primary_key=True),
        ]
    elif pktype == "constraint":
        table_items = [
            Column("x", Integer),
            Column("y", Integer),
            PrimaryKeyConstraint("x", "y"),
        ]
    elif pktype == "explicit_name":
        table_items = [
            Column("x", Integer, primary_key=True),
            Column("y", Integer, primary_key=True),
            PrimaryKeyConstraint("x", "y", name="myname"),
        ]
        # an explicit name is expected in place of the convention
        expected_name = "myname"

    table = Table("t1", metadata, *table_items)

    if expected_name:
        eq_(table.primary_key.name, expected_name)

    if pktype != "nopk":
        expected_ddl = (
            "CREATE TABLE t1 (x INTEGER NOT NULL, y INTEGER NOT NULL, "
            "CONSTRAINT %s PRIMARY KEY (x, y))" % expected_name
        )
    else:
        expected_ddl = (
            "CREATE TABLE t1 (x INTEGER NOT NULL, y INTEGER NOT NULL)"
        )

    self.assert_compile(schema.CreateTable(table), expected_ddl)
def test_fk_allcols_underscore_name(self):
    """``column_0_N_name`` tokens join all column names with ``_``.

    A three-column foreign key from ``address`` to the fixture's
    ``user`` table is expected to compile with a constraint name made of
    the underscore-joined local and referred column names.
    """
    convention = {
        "fk": "fk_%(table_name)s_%(column_0_N_name)s_"
        "%(referred_table_name)s_%(referred_column_0_N_name)s"
    }
    user = self._fixture(naming_convention=convention)

    address = Table(
        "address",
        user.metadata,
        Column("id", Integer, primary_key=True),
        Column("UserData", String(30), key="user_data"),
        Column("UserData2", String(30), key="user_data2"),
        Column("UserData3", String(30), key="user_data3"),
    )

    local_keys = ["user_data", "user_data2", "user_data3"]
    referred = ["user.data", "user.data2", "user.data3"]
    constraint = ForeignKeyConstraint(local_keys, referred)
    address.append_constraint(constraint)

    expected_ddl = (
        "ALTER TABLE address ADD CONSTRAINT "
        '"fk_address_UserData_UserData2_UserData3_user_data_Data2_Data3" '
        'FOREIGN KEY("UserData", "UserData2", "UserData3") '
        'REFERENCES "user" (data, "Data2", "Data3")'
    )
    self.assert_compile(
        schema.AddConstraint(constraint),
        expected_ddl,
        dialect=default.DefaultDialect(),
    )
def test_ck_name_required(self):
    """A ``constraint_name`` token requires an explicitly named check.

    With a name given, the convention is expected to produce
    ``ck_user_mycheck``; constructing the same constraint without a name
    is expected to raise ``InvalidRequestError``.
    """
    convention = {"ck": "ck_%(table_name)s_%(constraint_name)s"}
    user = self._fixture(naming_convention=convention)

    named = CheckConstraint(user.c.data == "x", name="mycheck")
    eq_(named.name, "ck_user_mycheck")

    unnamed_expression = user.c.data == "x"
    assert_raises_message(
        exc.InvalidRequestError,
        r"Naming convention including %\(constraint_name\)s token "
        "requires that constraint is explicitly named.",
        CheckConstraint,
        unnamed_expression,
    )
def test_fk_name_schema(self):
    """Foreign key naming when the referred table lives in a schema.

    The fixture's ``user`` table is created in schema ``foo``; the
    constraint name is expected to be ``fk_address_user_id_user_id``,
    i.e. without the schema name in it.
    """
    convention = {
        "fk": "fk_%(table_name)s_%(column_0_name)s_"
        "%(referred_table_name)s_%(referred_column_0_name)s"
    }
    user = self._fixture(naming_convention=convention, table_schema="foo")

    address = Table(
        "address",
        user.metadata,
        Column("id", Integer, primary_key=True),
        Column("user_id", Integer),
        Column("user_version_id", Integer),
    )

    local_columns = ["user_id", "user_version_id"]
    referred_columns = ["foo.user.id", "foo.user.version"]
    constraint = ForeignKeyConstraint(local_columns, referred_columns)
    address.append_constraint(constraint)

    eq_(constraint.name, "fk_address_user_id_user_id")
def test_custom(self):
    """A callable in the naming convention supplies a custom token.

    The ``key_hash`` entry maps to a function of the constraint and its
    table; its return value is expected to be substituted for
    ``%(key_hash)s`` in the ``fk`` template.
    """

    def table_hash(const, table):
        return "HASH_%s" % table.name

    convention = {
        "fk": "fk_%(table_name)s_%(key_hash)s",
        "key_hash": table_hash,
    }
    user = self._fixture(naming_convention=convention)

    address = Table(
        "address",
        user.metadata,
        Column("id", Integer, primary_key=True),
        Column("user_id", Integer),
        Column("user_version_id", Integer),
    )

    local_columns = ["user_id", "user_version_id"]
    referred_columns = ["user.id", "user.version"]
    constraint = ForeignKeyConstraint(local_columns, referred_columns)
    address.append_constraint(constraint)

    eq_(constraint.name, "fk_address_HASH_address")
def test_schematype_ck_name_enum(self):
    """The CHECK generated for a named ``Enum`` follows the convention.

    The table is compiled twice with the same expectation, so that a
    side effect of the first compile would show up in the second.
    """
    metadata = MetaData(
        naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
    )
    enum_type = Enum("a", "b", name="foo", create_constraint=True)
    user = Table("user", metadata, Column("x", enum_type))

    expected_ddl = (
        'CREATE TABLE "user" ('
        "x VARCHAR(1), "
        "CONSTRAINT ck_user_foo CHECK (x IN ('a', 'b'))"
        ")"
    )

    # second pass: test no side effects from first compile
    for _ in range(2):
        self.assert_compile(schema.CreateTable(user), expected_ddl)
def test_pickle_metadata(self):
    """A naming convention survives a pickle round trip of ``MetaData``.

    Equivalent tables are then created on the original and on the
    restored metadata, and both primary keys are expected to be named
    ``t2_pk``.
    """
    original = MetaData(naming_convention={"pk": "%(table_name)s_pk"})

    # the payload is produced just above from an object built here
    payload = pickle.dumps(original)
    restored = pickle.loads(payload)

    eq_(restored.naming_convention, {"pk": "%(table_name)s_pk"})

    from_original = Table(
        "t2", original, Column("id", Integer, primary_key=True)
    )
    from_restored = Table(
        "t2", restored, Column("id", Integer, primary_key=True)
    )

    eq_(from_original.primary_key.name, from_restored.primary_key.name)
    eq_(from_restored.primary_key.name, "t2_pk")
class CopyDialectOptionsTest(fixtures.TestBase):
    """Checks that dialect-specific options survive ``Table.to_metadata``."""

    @contextmanager
    def _fixture(self):
        """Patch the dialect registry loader with a single test dialect.

        While the context is active, ``sqlalchemy.dialects.registry.load``
        is replaced by a function that returns a ``DefaultDialect``
        subclass for the name ``"copydialectoptionstest"`` and raises
        ``NoSuchModuleError`` for every other name.
        """
        from sqlalchemy.engine.default import DefaultDialect

        class CopyDialectOptionsTestDialect(DefaultDialect):
            # one accepted argument per schema construct, default None
            construct_arguments = [
                (Table, {"some_table_arg": None}),
                (Column, {"some_column_arg": None}),
                (Index, {"some_index_arg": None}),
                (PrimaryKeyConstraint, {"some_pk_arg": None}),
                (UniqueConstraint, {"some_uq_arg": None}),
            ]

        def fake_load(dialect_name):
            if dialect_name != "copydialectoptionstest":
                raise exc.NoSuchModuleError("no dialect %r" % dialect_name)
            return CopyDialectOptionsTestDialect

        with mock.patch("sqlalchemy.dialects.registry.load", fake_load):
            yield

    @classmethod
    def check_dialect_options_(cls, t):
        """Assert that each construct of table ``t`` carries its option.

        Checks the table itself, its ``foo`` column, its primary key, its
        first index and its first unique constraint, in that order.
        """
        table_kwargs = t.dialect_kwargs
        eq_(table_kwargs["copydialectoptionstest_some_table_arg"], "a1")

        column_kwargs = t.c.foo.dialect_kwargs
        eq_(column_kwargs["copydialectoptionstest_some_column_arg"], "a2")

        pk_kwargs = t.primary_key.dialect_kwargs
        eq_(pk_kwargs["copydialectoptionstest_some_pk_arg"], "a3")

        first_index = list(t.indexes)[0]
        eq_(
            first_index.dialect_kwargs[
                "copydialectoptionstest_some_index_arg"
            ],
            "a4",
        )

        unique_constraints = [
            const
            for const in t.constraints
            if isinstance(const, UniqueConstraint)
        ]
        eq_(
            unique_constraints[0].dialect_kwargs[
                "copydialectoptionstest_some_uq_arg"
            ],
            "a5",
        )

    def test_dialect_options_are_copied(self):
        """Options set on a table and its constructs appear on the copy."""
        with self._fixture():
            source_metadata = MetaData()
            foo = Column(
                "foo",
                Integer,
                copydialectoptionstest_some_column_arg="a2",
            )
            bar = Column("bar", Integer)
            primary_key = PrimaryKeyConstraint(
                "foo", copydialectoptionstest_some_pk_arg="a3"
            )
            unique = UniqueConstraint(
                "bar", copydialectoptionstest_some_uq_arg="a5"
            )
            source = Table(
                "t",
                source_metadata,
                foo,
                bar,
                primary_key,
                unique,
                copydialectoptionstest_some_table_arg="a1",
            )
            # the index attaches itself to the table through its column
            Index(
                "idx",
                source.c.foo,
                copydialectoptionstest_some_index_arg="a4",
            )

            self.check_dialect_options_(source)

            target_metadata = MetaData()
            copied = source.to_metadata(target_metadata)  # make a copy
            self.check_dialect_options_(copied)