1from contextlib import contextmanager 2import pickle 3 4import sqlalchemy as tsa 5from sqlalchemy import ARRAY 6from sqlalchemy import bindparam 7from sqlalchemy import BLANK_SCHEMA 8from sqlalchemy import Boolean 9from sqlalchemy import CheckConstraint 10from sqlalchemy import Column 11from sqlalchemy import column 12from sqlalchemy import ColumnDefault 13from sqlalchemy import desc 14from sqlalchemy import Enum 15from sqlalchemy import event 16from sqlalchemy import exc 17from sqlalchemy import ForeignKey 18from sqlalchemy import ForeignKeyConstraint 19from sqlalchemy import func 20from sqlalchemy import Index 21from sqlalchemy import Integer 22from sqlalchemy import MetaData 23from sqlalchemy import PrimaryKeyConstraint 24from sqlalchemy import schema 25from sqlalchemy import Sequence 26from sqlalchemy import String 27from sqlalchemy import Table 28from sqlalchemy import table 29from sqlalchemy import testing 30from sqlalchemy import text 31from sqlalchemy import TypeDecorator 32from sqlalchemy import types as sqltypes 33from sqlalchemy import Unicode 34from sqlalchemy import UniqueConstraint 35from sqlalchemy import util 36from sqlalchemy.engine import default 37from sqlalchemy.schema import AddConstraint 38from sqlalchemy.schema import CreateIndex 39from sqlalchemy.schema import DefaultClause 40from sqlalchemy.schema import DropIndex 41from sqlalchemy.sql import naming 42from sqlalchemy.sql.elements import _NONE_NAME 43from sqlalchemy.sql.elements import literal_column 44from sqlalchemy.testing import assert_raises 45from sqlalchemy.testing import assert_raises_message 46from sqlalchemy.testing import AssertsCompiledSQL 47from sqlalchemy.testing import ComparesTables 48from sqlalchemy.testing import emits_warning 49from sqlalchemy.testing import eq_ 50from sqlalchemy.testing import fixtures 51from sqlalchemy.testing import is_ 52from sqlalchemy.testing import is_false 53from sqlalchemy.testing import is_true 54from sqlalchemy.testing import mock 55 56 57class 
MetaDataTest(fixtures.TestBase, ComparesTables): 58 def test_metadata_contains(self): 59 metadata = MetaData() 60 t1 = Table("t1", metadata, Column("x", Integer)) 61 t2 = Table("t2", metadata, Column("x", Integer), schema="foo") 62 t3 = Table("t2", MetaData(), Column("x", Integer)) 63 t4 = Table("t1", MetaData(), Column("x", Integer), schema="foo") 64 65 assert "t1" in metadata 66 assert "foo.t2" in metadata 67 assert "t2" not in metadata 68 assert "foo.t1" not in metadata 69 assert t1 in metadata 70 assert t2 in metadata 71 assert t3 not in metadata 72 assert t4 not in metadata 73 74 def test_uninitialized_column_copy(self): 75 for col in [ 76 Column("foo", String(), nullable=False), 77 Column("baz", String(), unique=True), 78 Column(Integer(), primary_key=True), 79 Column( 80 "bar", 81 Integer(), 82 Sequence("foo_seq"), 83 primary_key=True, 84 key="bar", 85 ), 86 Column(Integer(), ForeignKey("bat.blah"), doc="this is a col"), 87 Column( 88 "bar", 89 Integer(), 90 ForeignKey("bat.blah"), 91 primary_key=True, 92 key="bar", 93 ), 94 Column("bar", Integer(), info={"foo": "bar"}), 95 ]: 96 c2 = col.copy() 97 for attr in ( 98 "name", 99 "type", 100 "nullable", 101 "primary_key", 102 "key", 103 "unique", 104 "info", 105 "doc", 106 ): 107 eq_(getattr(col, attr), getattr(c2, attr)) 108 eq_(len(col.foreign_keys), len(c2.foreign_keys)) 109 if col.default: 110 eq_(c2.default.name, "foo_seq") 111 for a1, a2 in zip(col.foreign_keys, c2.foreign_keys): 112 assert a1 is not a2 113 eq_(a2._colspec, "bat.blah") 114 115 def test_col_subclass_copy(self): 116 class MyColumn(schema.Column): 117 def __init__(self, *args, **kw): 118 self.widget = kw.pop("widget", None) 119 super(MyColumn, self).__init__(*args, **kw) 120 121 def copy(self, *arg, **kw): 122 c = super(MyColumn, self).copy(*arg, **kw) 123 c.widget = self.widget 124 return c 125 126 c1 = MyColumn("foo", Integer, widget="x") 127 c2 = c1.copy() 128 assert isinstance(c2, MyColumn) 129 eq_(c2.widget, "x") 130 131 def 
test_uninitialized_column_copy_events(self): 132 msgs = [] 133 134 def write(c, t): 135 msgs.append("attach %s.%s" % (t.name, c.name)) 136 137 c1 = Column("foo", String()) 138 m = MetaData() 139 for i in range(3): 140 cx = c1.copy() 141 # as of 0.7, these events no longer copy. its expected 142 # that listeners will be re-established from the 143 # natural construction of things. 144 cx._on_table_attach(write) 145 Table("foo%d" % i, m, cx) 146 eq_(msgs, ["attach foo0.foo", "attach foo1.foo", "attach foo2.foo"]) 147 148 def test_schema_collection_add(self): 149 metadata = MetaData() 150 151 Table("t1", metadata, Column("x", Integer), schema="foo") 152 Table("t2", metadata, Column("x", Integer), schema="bar") 153 Table("t3", metadata, Column("x", Integer)) 154 155 eq_(metadata._schemas, set(["foo", "bar"])) 156 eq_(len(metadata.tables), 3) 157 158 def test_schema_collection_remove(self): 159 metadata = MetaData() 160 161 t1 = Table("t1", metadata, Column("x", Integer), schema="foo") 162 Table("t2", metadata, Column("x", Integer), schema="bar") 163 t3 = Table("t3", metadata, Column("x", Integer), schema="bar") 164 165 metadata.remove(t3) 166 eq_(metadata._schemas, set(["foo", "bar"])) 167 eq_(len(metadata.tables), 2) 168 169 metadata.remove(t1) 170 eq_(metadata._schemas, set(["bar"])) 171 eq_(len(metadata.tables), 1) 172 173 def test_schema_collection_remove_all(self): 174 metadata = MetaData() 175 176 Table("t1", metadata, Column("x", Integer), schema="foo") 177 Table("t2", metadata, Column("x", Integer), schema="bar") 178 179 metadata.clear() 180 eq_(metadata._schemas, set()) 181 eq_(len(metadata.tables), 0) 182 183 def test_metadata_tables_immutable(self): 184 metadata = MetaData() 185 186 Table("t1", metadata, Column("x", Integer)) 187 assert "t1" in metadata.tables 188 189 assert_raises(TypeError, lambda: metadata.tables.pop("t1")) 190 191 @testing.provide_metadata 192 def test_dupe_tables(self): 193 metadata = self.metadata 194 Table( 195 "table1", 196 metadata, 
197 Column("col1", Integer, primary_key=True), 198 Column("col2", String(20)), 199 ) 200 201 metadata.create_all() 202 Table("table1", metadata, autoload=True) 203 204 def go(): 205 Table( 206 "table1", 207 metadata, 208 Column("col1", Integer, primary_key=True), 209 Column("col2", String(20)), 210 ) 211 212 assert_raises_message( 213 tsa.exc.InvalidRequestError, 214 "Table 'table1' is already defined for this " 215 "MetaData instance. Specify 'extend_existing=True' " 216 "to redefine options and columns on an existing " 217 "Table object.", 218 go, 219 ) 220 221 def test_fk_copy(self): 222 c1 = Column("foo", Integer) 223 c2 = Column("bar", Integer) 224 m = MetaData() 225 t1 = Table("t", m, c1, c2) 226 227 kw = dict( 228 onupdate="X", 229 ondelete="Y", 230 use_alter=True, 231 name="f1", 232 deferrable="Z", 233 initially="Q", 234 link_to_name=True, 235 ) 236 237 fk1 = ForeignKey(c1, **kw) 238 fk2 = ForeignKeyConstraint((c1,), (c2,), **kw) 239 240 t1.append_constraint(fk2) 241 fk1c = fk1.copy() 242 fk2c = fk2.copy() 243 244 for k in kw: 245 eq_(getattr(fk1c, k), kw[k]) 246 eq_(getattr(fk2c, k), kw[k]) 247 248 def test_check_constraint_copy(self): 249 def r(x): 250 return x 251 252 c = CheckConstraint( 253 "foo bar", 254 name="name", 255 initially=True, 256 deferrable=True, 257 _create_rule=r, 258 ) 259 c2 = c.copy() 260 eq_(c2.name, "name") 261 eq_(str(c2.sqltext), "foo bar") 262 eq_(c2.initially, True) 263 eq_(c2.deferrable, True) 264 assert c2._create_rule is r 265 266 def test_col_replace_w_constraint(self): 267 m = MetaData() 268 a = Table("a", m, Column("id", Integer, primary_key=True)) 269 270 aid = Column("a_id", ForeignKey("a.id")) 271 b = Table("b", m, aid) 272 b.append_column(aid) 273 274 assert b.c.a_id.references(a.c.id) 275 eq_(len(b.constraints), 2) 276 277 def test_fk_construct(self): 278 c1 = Column("foo", Integer) 279 c2 = Column("bar", Integer) 280 m = MetaData() 281 t1 = Table("t", m, c1, c2) 282 fk1 = ForeignKeyConstraint(("foo",), ("bar",), 
table=t1) 283 assert fk1 in t1.constraints 284 285 def test_fk_constraint_col_collection_w_table(self): 286 c1 = Column("foo", Integer) 287 c2 = Column("bar", Integer) 288 m = MetaData() 289 t1 = Table("t", m, c1, c2) 290 fk1 = ForeignKeyConstraint(("foo",), ("bar",), table=t1) 291 eq_(dict(fk1.columns), {"foo": c1}) 292 293 def test_fk_constraint_col_collection_no_table(self): 294 fk1 = ForeignKeyConstraint(("foo", "bat"), ("bar", "hoho")) 295 eq_(dict(fk1.columns), {}) 296 eq_(fk1.column_keys, ["foo", "bat"]) 297 eq_(fk1._col_description, "foo, bat") 298 eq_(fk1._elements, {"foo": fk1.elements[0], "bat": fk1.elements[1]}) 299 300 def test_fk_constraint_col_collection_no_table_real_cols(self): 301 c1 = Column("foo", Integer) 302 c2 = Column("bar", Integer) 303 fk1 = ForeignKeyConstraint((c1,), (c2,)) 304 eq_(dict(fk1.columns), {}) 305 eq_(fk1.column_keys, ["foo"]) 306 eq_(fk1._col_description, "foo") 307 eq_(fk1._elements, {"foo": fk1.elements[0]}) 308 309 def test_fk_constraint_col_collection_added_to_table(self): 310 c1 = Column("foo", Integer) 311 m = MetaData() 312 fk1 = ForeignKeyConstraint(("foo",), ("bar",)) 313 Table("t", m, c1, fk1) 314 eq_(dict(fk1.columns), {"foo": c1}) 315 eq_(fk1._elements, {"foo": fk1.elements[0]}) 316 317 def test_fk_constraint_col_collection_via_fk(self): 318 fk = ForeignKey("bar") 319 c1 = Column("foo", Integer, fk) 320 m = MetaData() 321 t1 = Table("t", m, c1) 322 fk1 = fk.constraint 323 eq_(fk1.column_keys, ["foo"]) 324 assert fk1 in t1.constraints 325 eq_(fk1.column_keys, ["foo"]) 326 eq_(dict(fk1.columns), {"foo": c1}) 327 eq_(fk1._elements, {"foo": fk}) 328 329 def test_fk_no_such_parent_col_error(self): 330 meta = MetaData() 331 a = Table("a", meta, Column("a", Integer)) 332 Table("b", meta, Column("b", Integer)) 333 334 def go(): 335 a.append_constraint(ForeignKeyConstraint(["x"], ["b.b"])) 336 337 assert_raises_message( 338 exc.ArgumentError, 339 "Can't create ForeignKeyConstraint on " 340 "table 'a': no column named 'x' 
is present.", 341 go, 342 ) 343 344 def test_fk_given_non_col(self): 345 not_a_col = bindparam("x") 346 assert_raises_message( 347 exc.ArgumentError, 348 "String, Column, or Column-bound argument expected, got Bind", 349 ForeignKey, 350 not_a_col, 351 ) 352 353 def test_fk_given_non_col_clauseelem(self): 354 class Foo(object): 355 def __clause_element__(self): 356 return bindparam("x") 357 358 assert_raises_message( 359 exc.ArgumentError, 360 "String, Column, or Column-bound argument expected, got Bind", 361 ForeignKey, 362 Foo(), 363 ) 364 365 def test_fk_given_col_non_table(self): 366 t = Table("t", MetaData(), Column("x", Integer)) 367 xa = t.alias().c.x 368 assert_raises_message( 369 exc.ArgumentError, 370 "ForeignKey received Column not bound to a Table, got: .*Alias", 371 ForeignKey, 372 xa, 373 ) 374 375 def test_fk_given_col_non_table_clauseelem(self): 376 t = Table("t", MetaData(), Column("x", Integer)) 377 378 class Foo(object): 379 def __clause_element__(self): 380 return t.alias().c.x 381 382 assert_raises_message( 383 exc.ArgumentError, 384 "ForeignKey received Column not bound to a Table, got: .*Alias", 385 ForeignKey, 386 Foo(), 387 ) 388 389 def test_fk_no_such_target_col_error_upfront(self): 390 meta = MetaData() 391 a = Table("a", meta, Column("a", Integer)) 392 Table("b", meta, Column("b", Integer)) 393 394 a.append_constraint(ForeignKeyConstraint(["a"], ["b.x"])) 395 396 assert_raises_message( 397 exc.NoReferencedColumnError, 398 "Could not initialize target column for ForeignKey 'b.x' on " 399 "table 'a': table 'b' has no column named 'x'", 400 getattr, 401 list(a.foreign_keys)[0], 402 "column", 403 ) 404 405 def test_fk_no_such_target_col_error_delayed(self): 406 meta = MetaData() 407 a = Table("a", meta, Column("a", Integer)) 408 a.append_constraint(ForeignKeyConstraint(["a"], ["b.x"])) 409 410 Table("b", meta, Column("b", Integer)) 411 412 assert_raises_message( 413 exc.NoReferencedColumnError, 414 "Could not initialize target column for 
ForeignKey 'b.x' on " 415 "table 'a': table 'b' has no column named 'x'", 416 getattr, 417 list(a.foreign_keys)[0], 418 "column", 419 ) 420 421 def test_fk_mismatched_local_remote_cols(self): 422 423 assert_raises_message( 424 exc.ArgumentError, 425 "ForeignKeyConstraint number of constrained columns must " 426 "match the number of referenced columns.", 427 ForeignKeyConstraint, 428 ["a"], 429 ["b.a", "b.b"], 430 ) 431 432 assert_raises_message( 433 exc.ArgumentError, 434 "ForeignKeyConstraint number of constrained columns " 435 "must match the number of referenced columns.", 436 ForeignKeyConstraint, 437 ["a", "b"], 438 ["b.a"], 439 ) 440 441 assert_raises_message( 442 exc.ArgumentError, 443 "ForeignKeyConstraint with duplicate source column " 444 "references are not supported.", 445 ForeignKeyConstraint, 446 ["a", "a"], 447 ["b.a", "b.b"], 448 ) 449 450 def test_pickle_metadata_sequence_restated(self): 451 m1 = MetaData() 452 Table( 453 "a", 454 m1, 455 Column("id", Integer, primary_key=True), 456 Column("x", Integer, Sequence("x_seq")), 457 ) 458 459 m2 = pickle.loads(pickle.dumps(m1)) 460 461 s2 = Sequence("x_seq") 462 t2 = Table( 463 "a", 464 m2, 465 Column("id", Integer, primary_key=True), 466 Column("x", Integer, s2), 467 extend_existing=True, 468 ) 469 470 assert m2._sequences["x_seq"] is t2.c.x.default 471 assert m2._sequences["x_seq"] is s2 472 473 def test_sequence_restated_replaced(self): 474 """Test restatement of Sequence replaces.""" 475 476 m1 = MetaData() 477 s1 = Sequence("x_seq") 478 t = Table("a", m1, Column("x", Integer, s1)) 479 assert m1._sequences["x_seq"] is s1 480 481 s2 = Sequence("x_seq") 482 Table("a", m1, Column("x", Integer, s2), extend_existing=True) 483 assert t.c.x.default is s2 484 assert m1._sequences["x_seq"] is s2 485 486 def test_sequence_attach_to_table(self): 487 m1 = MetaData() 488 s1 = Sequence("s") 489 Table("a", m1, Column("x", Integer, s1)) 490 assert s1.metadata is m1 491 492 def 
test_sequence_attach_to_existing_table(self): 493 m1 = MetaData() 494 s1 = Sequence("s") 495 t = Table("a", m1, Column("x", Integer)) 496 t.c.x._init_items(s1) 497 assert s1.metadata is m1 498 499 def test_pickle_metadata_sequence_implicit(self): 500 m1 = MetaData() 501 Table( 502 "a", 503 m1, 504 Column("id", Integer, primary_key=True), 505 Column("x", Integer, Sequence("x_seq")), 506 ) 507 508 m2 = pickle.loads(pickle.dumps(m1)) 509 510 t2 = Table("a", m2, extend_existing=True) 511 512 eq_(m2._sequences, {"x_seq": t2.c.x.default}) 513 514 def test_pickle_metadata_schema(self): 515 m1 = MetaData() 516 Table( 517 "a", 518 m1, 519 Column("id", Integer, primary_key=True), 520 Column("x", Integer, Sequence("x_seq")), 521 schema="y", 522 ) 523 524 m2 = pickle.loads(pickle.dumps(m1)) 525 526 Table("a", m2, schema="y", extend_existing=True) 527 528 eq_(m2._schemas, m1._schemas) 529 530 def test_metadata_schema_arg(self): 531 m1 = MetaData(schema="sch1") 532 m2 = MetaData(schema="sch1", quote_schema=True) 533 m3 = MetaData(schema="sch1", quote_schema=False) 534 m4 = MetaData() 535 536 for ( 537 i, 538 ( 539 name, 540 metadata, 541 schema_, 542 quote_schema, 543 exp_schema, 544 exp_quote_schema, 545 ), 546 ) in enumerate( 547 [ 548 ("t1", m1, None, None, "sch1", None), 549 ("t2", m1, "sch2", None, "sch2", None), 550 ("t3", m1, "sch2", True, "sch2", True), 551 ("t4", m1, "sch1", None, "sch1", None), 552 ("t5", m1, BLANK_SCHEMA, None, None, None), 553 ("t1", m2, None, None, "sch1", True), 554 ("t2", m2, "sch2", None, "sch2", None), 555 ("t3", m2, "sch2", True, "sch2", True), 556 ("t4", m2, "sch1", None, "sch1", None), 557 ("t1", m3, None, None, "sch1", False), 558 ("t2", m3, "sch2", None, "sch2", None), 559 ("t3", m3, "sch2", True, "sch2", True), 560 ("t4", m3, "sch1", None, "sch1", None), 561 ("t1", m4, None, None, None, None), 562 ("t2", m4, "sch2", None, "sch2", None), 563 ("t3", m4, "sch2", True, "sch2", True), 564 ("t4", m4, "sch1", None, "sch1", None), 565 ("t5", m4, 
BLANK_SCHEMA, None, None, None), 566 ] 567 ): 568 kw = {} 569 if schema_ is not None: 570 kw["schema"] = schema_ 571 if quote_schema is not None: 572 kw["quote_schema"] = quote_schema 573 t = Table(name, metadata, **kw) 574 eq_(t.schema, exp_schema, "test %d, table schema" % i) 575 eq_( 576 t.schema.quote if t.schema is not None else None, 577 exp_quote_schema, 578 "test %d, table quote_schema" % i, 579 ) 580 seq = Sequence(name, metadata=metadata, **kw) 581 eq_(seq.schema, exp_schema, "test %d, seq schema" % i) 582 eq_( 583 seq.schema.quote if seq.schema is not None else None, 584 exp_quote_schema, 585 "test %d, seq quote_schema" % i, 586 ) 587 588 def test_manual_dependencies(self): 589 meta = MetaData() 590 a = Table("a", meta, Column("foo", Integer)) 591 b = Table("b", meta, Column("foo", Integer)) 592 c = Table("c", meta, Column("foo", Integer)) 593 d = Table("d", meta, Column("foo", Integer)) 594 e = Table("e", meta, Column("foo", Integer)) 595 596 e.add_is_dependent_on(c) 597 a.add_is_dependent_on(b) 598 b.add_is_dependent_on(d) 599 e.add_is_dependent_on(b) 600 c.add_is_dependent_on(a) 601 eq_(meta.sorted_tables, [d, b, a, c, e]) 602 603 def test_deterministic_order(self): 604 meta = MetaData() 605 a = Table("a", meta, Column("foo", Integer)) 606 b = Table("b", meta, Column("foo", Integer)) 607 c = Table("c", meta, Column("foo", Integer)) 608 d = Table("d", meta, Column("foo", Integer)) 609 e = Table("e", meta, Column("foo", Integer)) 610 611 e.add_is_dependent_on(c) 612 a.add_is_dependent_on(b) 613 eq_(meta.sorted_tables, [b, c, d, a, e]) 614 615 def test_fks_deterministic_order(self): 616 meta = MetaData() 617 a = Table("a", meta, Column("foo", Integer, ForeignKey("b.foo"))) 618 b = Table("b", meta, Column("foo", Integer)) 619 c = Table("c", meta, Column("foo", Integer)) 620 d = Table("d", meta, Column("foo", Integer)) 621 e = Table("e", meta, Column("foo", Integer, ForeignKey("c.foo"))) 622 623 eq_(meta.sorted_tables, [b, c, d, a, e]) 624 625 def 
test_cycles_fks_warning_one(self): 626 meta = MetaData() 627 a = Table("a", meta, Column("foo", Integer, ForeignKey("b.foo"))) 628 b = Table("b", meta, Column("foo", Integer, ForeignKey("d.foo"))) 629 c = Table("c", meta, Column("foo", Integer, ForeignKey("b.foo"))) 630 d = Table("d", meta, Column("foo", Integer, ForeignKey("c.foo"))) 631 e = Table("e", meta, Column("foo", Integer)) 632 633 with testing.expect_warnings( 634 "Cannot correctly sort tables; there are unresolvable cycles " 635 'between tables "b, c, d", which is usually caused by mutually ' 636 "dependent foreign key constraints. " 637 "Foreign key constraints involving these tables will not be " 638 "considered" 639 ): 640 eq_(meta.sorted_tables, [b, c, d, e, a]) 641 642 def test_cycles_fks_warning_two(self): 643 meta = MetaData() 644 a = Table("a", meta, Column("foo", Integer, ForeignKey("b.foo"))) 645 b = Table("b", meta, Column("foo", Integer, ForeignKey("a.foo"))) 646 c = Table("c", meta, Column("foo", Integer, ForeignKey("e.foo"))) 647 d = Table("d", meta, Column("foo", Integer)) 648 e = Table("e", meta, Column("foo", Integer, ForeignKey("d.foo"))) 649 650 with testing.expect_warnings( 651 "Cannot correctly sort tables; there are unresolvable cycles " 652 'between tables "a, b", which is usually caused by mutually ' 653 "dependent foreign key constraints. 
" 654 "Foreign key constraints involving these tables will not be " 655 "considered" 656 ): 657 eq_(meta.sorted_tables, [a, b, d, e, c]) 658 659 def test_cycles_fks_fks_delivered_separately(self): 660 meta = MetaData() 661 a = Table("a", meta, Column("foo", Integer, ForeignKey("b.foo"))) 662 b = Table("b", meta, Column("foo", Integer, ForeignKey("a.foo"))) 663 c = Table("c", meta, Column("foo", Integer, ForeignKey("e.foo"))) 664 d = Table("d", meta, Column("foo", Integer)) 665 e = Table("e", meta, Column("foo", Integer, ForeignKey("d.foo"))) 666 667 results = schema.sort_tables_and_constraints( 668 sorted(meta.tables.values(), key=lambda t: t.key) 669 ) 670 results[-1] = (None, set(results[-1][-1])) 671 eq_( 672 results, 673 [ 674 (a, set()), 675 (b, set()), 676 (d, {fk.constraint for fk in d.foreign_keys}), 677 (e, {fk.constraint for fk in e.foreign_keys}), 678 (c, {fk.constraint for fk in c.foreign_keys}), 679 ( 680 None, 681 {fk.constraint for fk in a.foreign_keys}.union( 682 fk.constraint for fk in b.foreign_keys 683 ), 684 ), 685 ], 686 ) 687 688 def test_cycles_fks_usealter(self): 689 meta = MetaData() 690 a = Table("a", meta, Column("foo", Integer, ForeignKey("b.foo"))) 691 b = Table( 692 "b", 693 meta, 694 Column("foo", Integer, ForeignKey("d.foo", use_alter=True)), 695 ) 696 c = Table("c", meta, Column("foo", Integer, ForeignKey("b.foo"))) 697 d = Table("d", meta, Column("foo", Integer, ForeignKey("c.foo"))) 698 e = Table("e", meta, Column("foo", Integer)) 699 700 eq_(meta.sorted_tables, [b, e, a, c, d]) 701 702 def test_nonexistent(self): 703 assert_raises( 704 tsa.exc.NoSuchTableError, 705 Table, 706 "fake_table", 707 MetaData(testing.db), 708 autoload=True, 709 ) 710 711 def test_assorted_repr(self): 712 t1 = Table("foo", MetaData(), Column("x", Integer)) 713 i1 = Index("bar", t1.c.x) 714 ck = schema.CheckConstraint("x > y", name="someconstraint") 715 716 for const, exp in ( 717 (Sequence("my_seq"), "Sequence('my_seq')"), 718 (Sequence("my_seq", 
start=5), "Sequence('my_seq', start=5)"), 719 (Column("foo", Integer), "Column('foo', Integer(), table=None)"), 720 ( 721 Column( 722 "foo", 723 Integer, 724 primary_key=True, 725 nullable=False, 726 onupdate=1, 727 default=42, 728 server_default="42", 729 comment="foo", 730 ), 731 "Column('foo', Integer(), table=None, primary_key=True, " 732 "nullable=False, onupdate=%s, default=%s, server_default=%s, " 733 "comment='foo')" 734 % ( 735 ColumnDefault(1), 736 ColumnDefault(42), 737 DefaultClause("42"), 738 ), 739 ), 740 ( 741 Table("bar", MetaData(), Column("x", String)), 742 "Table('bar', MetaData(bind=None), " 743 "Column('x', String(), table=<bar>), schema=None)", 744 ), 745 ( 746 schema.DefaultGenerator(for_update=True), 747 "DefaultGenerator(for_update=True)", 748 ), 749 (schema.Index("bar", "c"), "Index('bar', 'c')"), 750 (i1, "Index('bar', Column('x', Integer(), table=<foo>))"), 751 (schema.FetchedValue(), "FetchedValue()"), 752 ( 753 ck, 754 "CheckConstraint(" 755 "%s" 756 ", name='someconstraint')" % repr(ck.sqltext), 757 ), 758 (ColumnDefault(("foo", "bar")), "ColumnDefault(('foo', 'bar'))"), 759 ): 760 eq_(repr(const), exp) 761 762 763class ToMetaDataTest(fixtures.TestBase, AssertsCompiledSQL, ComparesTables): 764 @testing.requires.check_constraints 765 def test_copy(self): 766 # TODO: modernize this test 767 768 from sqlalchemy.testing.schema import Table 769 770 meta = MetaData() 771 772 table = Table( 773 "mytable", 774 meta, 775 Column("myid", Integer, Sequence("foo_id_seq"), primary_key=True), 776 Column("name", String(40), nullable=True), 777 Column( 778 "foo", 779 String(40), 780 nullable=False, 781 server_default="x", 782 server_onupdate="q", 783 ), 784 Column( 785 "bar", String(40), nullable=False, default="y", onupdate="z" 786 ), 787 Column( 788 "description", String(30), CheckConstraint("description='hi'") 789 ), 790 UniqueConstraint("name"), 791 test_needs_fk=True, 792 ) 793 794 table2 = Table( 795 "othertable", 796 meta, 797 Column("id", 
Integer, Sequence("foo_seq"), primary_key=True), 798 Column("myid", Integer, ForeignKey("mytable.myid")), 799 test_needs_fk=True, 800 ) 801 802 table3 = Table( 803 "has_comments", 804 meta, 805 Column("foo", Integer, comment="some column"), 806 comment="table comment", 807 ) 808 809 def test_to_metadata(): 810 meta2 = MetaData() 811 table_c = table.tometadata(meta2) 812 table2_c = table2.tometadata(meta2) 813 table3_c = table3.tometadata(meta2) 814 return (table_c, table2_c, table3_c) 815 816 def test_pickle(): 817 meta.bind = testing.db 818 meta2 = pickle.loads(pickle.dumps(meta)) 819 assert meta2.bind is None 820 pickle.loads(pickle.dumps(meta2)) 821 return ( 822 meta2.tables["mytable"], 823 meta2.tables["othertable"], 824 meta2.tables["has_comments"], 825 ) 826 827 def test_pickle_via_reflect(): 828 # this is the most common use case, pickling the results of a 829 # database reflection 830 meta2 = MetaData(bind=testing.db) 831 t1 = Table("mytable", meta2, autoload=True) 832 Table("othertable", meta2, autoload=True) 833 Table("has_comments", meta2, autoload=True) 834 meta3 = pickle.loads(pickle.dumps(meta2)) 835 assert meta3.bind is None 836 assert meta3.tables["mytable"] is not t1 837 838 return ( 839 meta3.tables["mytable"], 840 meta3.tables["othertable"], 841 meta3.tables["has_comments"], 842 ) 843 844 meta.create_all(testing.db) 845 try: 846 for test, has_constraints, reflect in ( 847 (test_to_metadata, True, False), 848 (test_pickle, True, False), 849 (test_pickle_via_reflect, False, True), 850 ): 851 table_c, table2_c, table3_c = test() 852 self.assert_tables_equal(table, table_c) 853 self.assert_tables_equal(table2, table2_c) 854 assert table is not table_c 855 assert table.primary_key is not table_c.primary_key 856 assert ( 857 list(table2_c.c.myid.foreign_keys)[0].column 858 is table_c.c.myid 859 ) 860 assert ( 861 list(table2_c.c.myid.foreign_keys)[0].column 862 is not table.c.myid 863 ) 864 assert "x" in str(table_c.c.foo.server_default.arg) 865 if not 
reflect: 866 assert isinstance(table_c.c.myid.default, Sequence) 867 assert str(table_c.c.foo.server_onupdate.arg) == "q" 868 assert str(table_c.c.bar.default.arg) == "y" 869 assert ( 870 getattr( 871 table_c.c.bar.onupdate.arg, 872 "arg", 873 table_c.c.bar.onupdate.arg, 874 ) 875 == "z" 876 ) 877 assert isinstance(table2_c.c.id.default, Sequence) 878 879 # constraints don't get reflected for any dialect right 880 # now 881 882 if has_constraints: 883 for c in table_c.c.description.constraints: 884 if isinstance(c, CheckConstraint): 885 break 886 else: 887 assert False 888 assert str(c.sqltext) == "description='hi'" 889 for c in table_c.constraints: 890 if isinstance(c, UniqueConstraint): 891 break 892 else: 893 assert False 894 assert c.columns.contains_column(table_c.c.name) 895 assert not c.columns.contains_column(table.c.name) 896 897 if testing.requires.comment_reflection.enabled: 898 eq_(table3_c.comment, "table comment") 899 eq_(table3_c.c.foo.comment, "some column") 900 901 finally: 902 meta.drop_all(testing.db) 903 904 def test_col_key_fk_parent(self): 905 # test #2643 906 m1 = MetaData() 907 a = Table("a", m1, Column("x", Integer)) 908 b = Table("b", m1, Column("x", Integer, ForeignKey("a.x"), key="y")) 909 assert b.c.y.references(a.c.x) 910 911 m2 = MetaData() 912 b2 = b.tometadata(m2) 913 a2 = a.tometadata(m2) 914 assert b2.c.y.references(a2.c.x) 915 916 def test_column_collection_constraint_w_ad_hoc_columns(self): 917 """Test ColumnCollectionConstraint that has columns that aren't 918 part of the Table. 
919 920 """ 921 meta = MetaData() 922 923 uq1 = UniqueConstraint(literal_column("some_name")) 924 cc1 = CheckConstraint(literal_column("some_name") > 5) 925 table = Table( 926 "mytable", 927 meta, 928 Column("myid", Integer, primary_key=True), 929 Column("name", String(40), nullable=True), 930 uq1, 931 cc1, 932 ) 933 934 self.assert_compile( 935 schema.AddConstraint(uq1), 936 "ALTER TABLE mytable ADD UNIQUE (some_name)", 937 dialect="default", 938 ) 939 self.assert_compile( 940 schema.AddConstraint(cc1), 941 "ALTER TABLE mytable ADD CHECK (some_name > 5)", 942 dialect="default", 943 ) 944 meta2 = MetaData() 945 table2 = table.tometadata(meta2) 946 uq2 = [ 947 c for c in table2.constraints if isinstance(c, UniqueConstraint) 948 ][0] 949 cc2 = [ 950 c for c in table2.constraints if isinstance(c, CheckConstraint) 951 ][0] 952 self.assert_compile( 953 schema.AddConstraint(uq2), 954 "ALTER TABLE mytable ADD UNIQUE (some_name)", 955 dialect="default", 956 ) 957 self.assert_compile( 958 schema.AddConstraint(cc2), 959 "ALTER TABLE mytable ADD CHECK (some_name > 5)", 960 dialect="default", 961 ) 962 963 def test_change_schema(self): 964 meta = MetaData() 965 966 table = Table( 967 "mytable", 968 meta, 969 Column("myid", Integer, primary_key=True), 970 Column("name", String(40), nullable=True), 971 Column( 972 "description", String(30), CheckConstraint("description='hi'") 973 ), 974 UniqueConstraint("name"), 975 ) 976 977 table2 = Table( 978 "othertable", 979 meta, 980 Column("id", Integer, primary_key=True), 981 Column("myid", Integer, ForeignKey("mytable.myid")), 982 ) 983 984 meta2 = MetaData() 985 table_c = table.tometadata(meta2, schema="someschema") 986 table2_c = table2.tometadata(meta2, schema="someschema") 987 988 eq_( 989 str(table_c.join(table2_c).onclause), 990 str(table_c.c.myid == table2_c.c.myid), 991 ) 992 eq_( 993 str(table_c.join(table2_c).onclause), 994 "someschema.mytable.myid = someschema.othertable.myid", 995 ) 996 997 def 
test_retain_table_schema(self): 998 meta = MetaData() 999 1000 table = Table( 1001 "mytable", 1002 meta, 1003 Column("myid", Integer, primary_key=True), 1004 Column("name", String(40), nullable=True), 1005 Column( 1006 "description", String(30), CheckConstraint("description='hi'") 1007 ), 1008 UniqueConstraint("name"), 1009 schema="myschema", 1010 ) 1011 1012 table2 = Table( 1013 "othertable", 1014 meta, 1015 Column("id", Integer, primary_key=True), 1016 Column("myid", Integer, ForeignKey("myschema.mytable.myid")), 1017 schema="myschema", 1018 ) 1019 1020 meta2 = MetaData() 1021 table_c = table.tometadata(meta2) 1022 table2_c = table2.tometadata(meta2) 1023 1024 eq_( 1025 str(table_c.join(table2_c).onclause), 1026 str(table_c.c.myid == table2_c.c.myid), 1027 ) 1028 eq_( 1029 str(table_c.join(table2_c).onclause), 1030 "myschema.mytable.myid = myschema.othertable.myid", 1031 ) 1032 1033 def test_change_name_retain_metadata(self): 1034 meta = MetaData() 1035 1036 table = Table( 1037 "mytable", 1038 meta, 1039 Column("myid", Integer, primary_key=True), 1040 Column("name", String(40), nullable=True), 1041 Column( 1042 "description", String(30), CheckConstraint("description='hi'") 1043 ), 1044 UniqueConstraint("name"), 1045 schema="myschema", 1046 ) 1047 1048 table2 = table.tometadata(table.metadata, name="newtable") 1049 table3 = table.tometadata( 1050 table.metadata, schema="newschema", name="newtable" 1051 ) 1052 1053 assert table.metadata is table2.metadata 1054 assert table.metadata is table3.metadata 1055 eq_( 1056 (table.name, table2.name, table3.name), 1057 ("mytable", "newtable", "newtable"), 1058 ) 1059 eq_( 1060 (table.key, table2.key, table3.key), 1061 ("myschema.mytable", "myschema.newtable", "newschema.newtable"), 1062 ) 1063 1064 def test_change_name_change_metadata(self): 1065 meta = MetaData() 1066 meta2 = MetaData() 1067 1068 table = Table( 1069 "mytable", 1070 meta, 1071 Column("myid", Integer, primary_key=True), 1072 Column("name", String(40), 
nullable=True), 1073 Column( 1074 "description", String(30), CheckConstraint("description='hi'") 1075 ), 1076 UniqueConstraint("name"), 1077 schema="myschema", 1078 ) 1079 1080 table2 = table.tometadata(meta2, name="newtable") 1081 1082 assert table.metadata is not table2.metadata 1083 eq_((table.name, table2.name), ("mytable", "newtable")) 1084 eq_((table.key, table2.key), ("myschema.mytable", "myschema.newtable")) 1085 1086 def test_change_name_selfref_fk_moves(self): 1087 meta = MetaData() 1088 1089 referenced = Table( 1090 "ref", meta, Column("id", Integer, primary_key=True) 1091 ) 1092 table = Table( 1093 "mytable", 1094 meta, 1095 Column("id", Integer, primary_key=True), 1096 Column("parent_id", ForeignKey("mytable.id")), 1097 Column("ref_id", ForeignKey("ref.id")), 1098 ) 1099 1100 table2 = table.tometadata(table.metadata, name="newtable") 1101 assert table.metadata is table2.metadata 1102 assert table2.c.ref_id.references(referenced.c.id) 1103 assert table2.c.parent_id.references(table2.c.id) 1104 1105 def test_change_name_selfref_fk_moves_w_schema(self): 1106 meta = MetaData() 1107 1108 referenced = Table( 1109 "ref", meta, Column("id", Integer, primary_key=True) 1110 ) 1111 table = Table( 1112 "mytable", 1113 meta, 1114 Column("id", Integer, primary_key=True), 1115 Column("parent_id", ForeignKey("mytable.id")), 1116 Column("ref_id", ForeignKey("ref.id")), 1117 ) 1118 1119 table2 = table.tometadata( 1120 table.metadata, name="newtable", schema="newschema" 1121 ) 1122 ref2 = referenced.tometadata(table.metadata, schema="newschema") 1123 assert table.metadata is table2.metadata 1124 assert table2.c.ref_id.references(ref2.c.id) 1125 assert table2.c.parent_id.references(table2.c.id) 1126 1127 def _assert_fk(self, t2, schema, expected, referred_schema_fn=None): 1128 m2 = MetaData() 1129 existing_schema = t2.schema 1130 if schema: 1131 t2c = t2.tometadata( 1132 m2, schema=schema, referred_schema_fn=referred_schema_fn 1133 ) 1134 eq_(t2c.schema, schema) 1135 
else: 1136 t2c = t2.tometadata(m2, referred_schema_fn=referred_schema_fn) 1137 eq_(t2c.schema, existing_schema) 1138 eq_(list(t2c.c.y.foreign_keys)[0]._get_colspec(), expected) 1139 1140 def test_fk_has_schema_string_retain_schema(self): 1141 m = MetaData() 1142 1143 t2 = Table("t2", m, Column("y", Integer, ForeignKey("q.t1.x"))) 1144 self._assert_fk(t2, None, "q.t1.x") 1145 1146 Table("t1", m, Column("x", Integer), schema="q") 1147 self._assert_fk(t2, None, "q.t1.x") 1148 1149 def test_fk_has_schema_string_new_schema(self): 1150 m = MetaData() 1151 1152 t2 = Table("t2", m, Column("y", Integer, ForeignKey("q.t1.x"))) 1153 self._assert_fk(t2, "z", "q.t1.x") 1154 1155 Table("t1", m, Column("x", Integer), schema="q") 1156 self._assert_fk(t2, "z", "q.t1.x") 1157 1158 def test_fk_has_schema_col_retain_schema(self): 1159 m = MetaData() 1160 1161 t1 = Table("t1", m, Column("x", Integer), schema="q") 1162 t2 = Table("t2", m, Column("y", Integer, ForeignKey(t1.c.x))) 1163 1164 self._assert_fk(t2, "z", "q.t1.x") 1165 1166 def test_fk_has_schema_col_new_schema(self): 1167 m = MetaData() 1168 1169 t1 = Table("t1", m, Column("x", Integer), schema="q") 1170 t2 = Table("t2", m, Column("y", Integer, ForeignKey(t1.c.x))) 1171 1172 self._assert_fk(t2, "z", "q.t1.x") 1173 1174 def test_fk_and_referent_has_same_schema_string_retain_schema(self): 1175 m = MetaData() 1176 1177 t2 = Table( 1178 "t2", m, Column("y", Integer, ForeignKey("q.t1.x")), schema="q" 1179 ) 1180 1181 self._assert_fk(t2, None, "q.t1.x") 1182 1183 Table("t1", m, Column("x", Integer), schema="q") 1184 self._assert_fk(t2, None, "q.t1.x") 1185 1186 def test_fk_and_referent_has_same_schema_string_new_schema(self): 1187 m = MetaData() 1188 1189 t2 = Table( 1190 "t2", m, Column("y", Integer, ForeignKey("q.t1.x")), schema="q" 1191 ) 1192 1193 self._assert_fk(t2, "z", "z.t1.x") 1194 1195 Table("t1", m, Column("x", Integer), schema="q") 1196 self._assert_fk(t2, "z", "z.t1.x") 1197 1198 def 
test_fk_and_referent_has_same_schema_col_retain_schema(self): 1199 m = MetaData() 1200 1201 t1 = Table("t1", m, Column("x", Integer), schema="q") 1202 t2 = Table( 1203 "t2", m, Column("y", Integer, ForeignKey(t1.c.x)), schema="q" 1204 ) 1205 self._assert_fk(t2, None, "q.t1.x") 1206 1207 def test_fk_and_referent_has_same_schema_col_new_schema(self): 1208 m = MetaData() 1209 1210 t1 = Table("t1", m, Column("x", Integer), schema="q") 1211 t2 = Table( 1212 "t2", m, Column("y", Integer, ForeignKey(t1.c.x)), schema="q" 1213 ) 1214 self._assert_fk(t2, "z", "z.t1.x") 1215 1216 def test_fk_and_referent_has_diff_schema_string_retain_schema(self): 1217 m = MetaData() 1218 1219 t2 = Table( 1220 "t2", m, Column("y", Integer, ForeignKey("p.t1.x")), schema="q" 1221 ) 1222 1223 self._assert_fk(t2, None, "p.t1.x") 1224 1225 Table("t1", m, Column("x", Integer), schema="p") 1226 self._assert_fk(t2, None, "p.t1.x") 1227 1228 def test_fk_and_referent_has_diff_schema_string_new_schema(self): 1229 m = MetaData() 1230 1231 t2 = Table( 1232 "t2", m, Column("y", Integer, ForeignKey("p.t1.x")), schema="q" 1233 ) 1234 1235 self._assert_fk(t2, "z", "p.t1.x") 1236 1237 Table("t1", m, Column("x", Integer), schema="p") 1238 self._assert_fk(t2, "z", "p.t1.x") 1239 1240 def test_fk_and_referent_has_diff_schema_col_retain_schema(self): 1241 m = MetaData() 1242 1243 t1 = Table("t1", m, Column("x", Integer), schema="p") 1244 t2 = Table( 1245 "t2", m, Column("y", Integer, ForeignKey(t1.c.x)), schema="q" 1246 ) 1247 self._assert_fk(t2, None, "p.t1.x") 1248 1249 def test_fk_and_referent_has_diff_schema_col_new_schema(self): 1250 m = MetaData() 1251 1252 t1 = Table("t1", m, Column("x", Integer), schema="p") 1253 t2 = Table( 1254 "t2", m, Column("y", Integer, ForeignKey(t1.c.x)), schema="q" 1255 ) 1256 self._assert_fk(t2, "z", "p.t1.x") 1257 1258 def test_fk_custom_system(self): 1259 m = MetaData() 1260 t2 = Table( 1261 "t2", m, Column("y", Integer, ForeignKey("p.t1.x")), schema="q" 1262 ) 1263 1264 def 
ref_fn(table, to_schema, constraint, referred_schema): 1265 assert table is t2 1266 eq_(to_schema, "z") 1267 eq_(referred_schema, "p") 1268 return "h" 1269 1270 self._assert_fk(t2, "z", "h.t1.x", referred_schema_fn=ref_fn) 1271 1272 def test_copy_info(self): 1273 m = MetaData() 1274 fk = ForeignKey("t2.id") 1275 c = Column("c", Integer, fk) 1276 ck = CheckConstraint("c > 5") 1277 t = Table("t", m, c, ck) 1278 1279 m.info["minfo"] = True 1280 fk.info["fkinfo"] = True 1281 c.info["cinfo"] = True 1282 ck.info["ckinfo"] = True 1283 t.info["tinfo"] = True 1284 t.primary_key.info["pkinfo"] = True 1285 fkc = [ 1286 const 1287 for const in t.constraints 1288 if isinstance(const, ForeignKeyConstraint) 1289 ][0] 1290 fkc.info["fkcinfo"] = True 1291 1292 m2 = MetaData() 1293 t2 = t.tometadata(m2) 1294 1295 m.info["minfo"] = False 1296 fk.info["fkinfo"] = False 1297 c.info["cinfo"] = False 1298 ck.info["ckinfo"] = False 1299 t.primary_key.info["pkinfo"] = False 1300 fkc.info["fkcinfo"] = False 1301 1302 eq_(m2.info, {}) 1303 eq_(t2.info, {"tinfo": True}) 1304 eq_(t2.c.c.info, {"cinfo": True}) 1305 eq_(list(t2.c.c.foreign_keys)[0].info, {"fkinfo": True}) 1306 eq_(t2.primary_key.info, {"pkinfo": True}) 1307 1308 fkc2 = [ 1309 const 1310 for const in t2.constraints 1311 if isinstance(const, ForeignKeyConstraint) 1312 ][0] 1313 eq_(fkc2.info, {"fkcinfo": True}) 1314 1315 ck2 = [ 1316 const 1317 for const in t2.constraints 1318 if isinstance(const, CheckConstraint) 1319 ][0] 1320 eq_(ck2.info, {"ckinfo": True}) 1321 1322 def test_dialect_kwargs(self): 1323 meta = MetaData() 1324 1325 table = Table( 1326 "mytable", 1327 meta, 1328 Column("myid", Integer, primary_key=True), 1329 mysql_engine="InnoDB", 1330 ) 1331 1332 meta2 = MetaData() 1333 table_c = table.tometadata(meta2) 1334 1335 eq_(table.kwargs, {"mysql_engine": "InnoDB"}) 1336 1337 eq_(table.kwargs, table_c.kwargs) 1338 1339 def test_indexes(self): 1340 meta = MetaData() 1341 1342 table = Table( 1343 "mytable", 1344 meta, 
1345 Column("id", Integer, primary_key=True), 1346 Column("data1", Integer, index=True), 1347 Column("data2", Integer), 1348 Index("text", text("data1 + 1")), 1349 ) 1350 Index("multi", table.c.data1, table.c.data2) 1351 Index("func", func.abs(table.c.data1)) 1352 Index("multi-func", table.c.data1, func.abs(table.c.data2)) 1353 1354 meta2 = MetaData() 1355 table_c = table.tometadata(meta2) 1356 1357 def _get_key(i): 1358 return ( 1359 [i.name, i.unique] 1360 + sorted(i.kwargs.items()) 1361 + [str(col) for col in i.expressions] 1362 ) 1363 1364 eq_( 1365 sorted([_get_key(i) for i in table.indexes]), 1366 sorted([_get_key(i) for i in table_c.indexes]), 1367 ) 1368 1369 def test_indexes_with_col_redefine(self): 1370 meta = MetaData() 1371 1372 table = Table( 1373 "mytable", 1374 meta, 1375 Column("id", Integer, primary_key=True), 1376 Column("data1", Integer), 1377 Column("data2", Integer), 1378 Index("text", text("data1 + 1")), 1379 ) 1380 Index("multi", table.c.data1, table.c.data2) 1381 Index("func", func.abs(table.c.data1)) 1382 Index("multi-func", table.c.data1, func.abs(table.c.data2)) 1383 1384 table = Table( 1385 "mytable", 1386 meta, 1387 Column("data1", Integer), 1388 Column("data2", Integer), 1389 extend_existing=True, 1390 ) 1391 1392 meta2 = MetaData() 1393 table_c = table.tometadata(meta2) 1394 1395 def _get_key(i): 1396 return ( 1397 [i.name, i.unique] 1398 + sorted(i.kwargs.items()) 1399 + [str(col) for col in i.expressions] 1400 ) 1401 1402 eq_( 1403 sorted([_get_key(i) for i in table.indexes]), 1404 sorted([_get_key(i) for i in table_c.indexes]), 1405 ) 1406 1407 @emits_warning("Table '.+' already exists within the given MetaData") 1408 def test_already_exists(self): 1409 1410 meta1 = MetaData() 1411 table1 = Table( 1412 "mytable", meta1, Column("myid", Integer, primary_key=True) 1413 ) 1414 meta2 = MetaData() 1415 table2 = Table( 1416 "mytable", meta2, Column("yourid", Integer, primary_key=True) 1417 ) 1418 1419 table_c = table1.tometadata(meta2) 
1420 table_d = table2.tometadata(meta2) 1421 1422 # d'oh! 1423 assert table_c is table_d 1424 1425 def test_default_schema_metadata(self): 1426 meta = MetaData(schema="myschema") 1427 1428 table = Table( 1429 "mytable", 1430 meta, 1431 Column("myid", Integer, primary_key=True), 1432 Column("name", String(40), nullable=True), 1433 Column( 1434 "description", String(30), CheckConstraint("description='hi'") 1435 ), 1436 UniqueConstraint("name"), 1437 ) 1438 1439 table2 = Table( 1440 "othertable", 1441 meta, 1442 Column("id", Integer, primary_key=True), 1443 Column("myid", Integer, ForeignKey("myschema.mytable.myid")), 1444 ) 1445 1446 meta2 = MetaData(schema="someschema") 1447 table_c = table.tometadata(meta2, schema=None) 1448 table2_c = table2.tometadata(meta2, schema=None) 1449 1450 eq_( 1451 str(table_c.join(table2_c).onclause), 1452 str(table_c.c.myid == table2_c.c.myid), 1453 ) 1454 eq_( 1455 str(table_c.join(table2_c).onclause), 1456 "someschema.mytable.myid = someschema.othertable.myid", 1457 ) 1458 1459 def test_strip_schema(self): 1460 meta = MetaData() 1461 1462 table = Table( 1463 "mytable", 1464 meta, 1465 Column("myid", Integer, primary_key=True), 1466 Column("name", String(40), nullable=True), 1467 Column( 1468 "description", String(30), CheckConstraint("description='hi'") 1469 ), 1470 UniqueConstraint("name"), 1471 ) 1472 1473 table2 = Table( 1474 "othertable", 1475 meta, 1476 Column("id", Integer, primary_key=True), 1477 Column("myid", Integer, ForeignKey("mytable.myid")), 1478 ) 1479 1480 meta2 = MetaData() 1481 table_c = table.tometadata(meta2, schema=None) 1482 table2_c = table2.tometadata(meta2, schema=None) 1483 1484 eq_( 1485 str(table_c.join(table2_c).onclause), 1486 str(table_c.c.myid == table2_c.c.myid), 1487 ) 1488 eq_( 1489 str(table_c.join(table2_c).onclause), 1490 "mytable.myid = othertable.myid", 1491 ) 1492 1493 def test_unique_true_flag(self): 1494 meta = MetaData() 1495 1496 table = Table("mytable", meta, Column("x", Integer, 
unique=True)) 1497 1498 m2 = MetaData() 1499 1500 t2 = table.tometadata(m2) 1501 1502 eq_( 1503 len( 1504 [ 1505 const 1506 for const in t2.constraints 1507 if isinstance(const, UniqueConstraint) 1508 ] 1509 ), 1510 1, 1511 ) 1512 1513 def test_index_true_flag(self): 1514 meta = MetaData() 1515 1516 table = Table("mytable", meta, Column("x", Integer, index=True)) 1517 1518 m2 = MetaData() 1519 1520 t2 = table.tometadata(m2) 1521 1522 eq_(len(t2.indexes), 1) 1523 1524 1525class InfoTest(fixtures.TestBase): 1526 def test_metadata_info(self): 1527 m1 = MetaData() 1528 eq_(m1.info, {}) 1529 1530 m1 = MetaData(info={"foo": "bar"}) 1531 eq_(m1.info, {"foo": "bar"}) 1532 1533 def test_foreignkey_constraint_info(self): 1534 fkc = ForeignKeyConstraint(["a"], ["b"], name="bar") 1535 eq_(fkc.info, {}) 1536 1537 fkc = ForeignKeyConstraint( 1538 ["a"], ["b"], name="bar", info={"foo": "bar"} 1539 ) 1540 eq_(fkc.info, {"foo": "bar"}) 1541 1542 def test_foreignkey_info(self): 1543 fkc = ForeignKey("a") 1544 eq_(fkc.info, {}) 1545 1546 fkc = ForeignKey("a", info={"foo": "bar"}) 1547 eq_(fkc.info, {"foo": "bar"}) 1548 1549 def test_primarykey_constraint_info(self): 1550 pkc = PrimaryKeyConstraint("a", name="x") 1551 eq_(pkc.info, {}) 1552 1553 pkc = PrimaryKeyConstraint("a", name="x", info={"foo": "bar"}) 1554 eq_(pkc.info, {"foo": "bar"}) 1555 1556 def test_unique_constraint_info(self): 1557 uc = UniqueConstraint("a", name="x") 1558 eq_(uc.info, {}) 1559 1560 uc = UniqueConstraint("a", name="x", info={"foo": "bar"}) 1561 eq_(uc.info, {"foo": "bar"}) 1562 1563 def test_check_constraint_info(self): 1564 cc = CheckConstraint("foo=bar", name="x") 1565 eq_(cc.info, {}) 1566 1567 cc = CheckConstraint("foo=bar", name="x", info={"foo": "bar"}) 1568 eq_(cc.info, {"foo": "bar"}) 1569 1570 def test_index_info(self): 1571 ix = Index("x", "a") 1572 eq_(ix.info, {}) 1573 1574 ix = Index("x", "a", info={"foo": "bar"}) 1575 eq_(ix.info, {"foo": "bar"}) 1576 1577 def test_column_info(self): 1578 c 
= Column("x", Integer) 1579 eq_(c.info, {}) 1580 1581 c = Column("x", Integer, info={"foo": "bar"}) 1582 eq_(c.info, {"foo": "bar"}) 1583 1584 def test_table_info(self): 1585 t = Table("x", MetaData()) 1586 eq_(t.info, {}) 1587 1588 t = Table("x", MetaData(), info={"foo": "bar"}) 1589 eq_(t.info, {"foo": "bar"}) 1590 1591 1592class TableTest(fixtures.TestBase, AssertsCompiledSQL): 1593 @testing.requires.temporary_tables 1594 @testing.skip_if("mssql", "different col format") 1595 def test_prefixes(self): 1596 from sqlalchemy import Table 1597 1598 table1 = Table( 1599 "temporary_table_1", 1600 MetaData(), 1601 Column("col1", Integer), 1602 prefixes=["TEMPORARY"], 1603 ) 1604 1605 self.assert_compile( 1606 schema.CreateTable(table1), 1607 "CREATE TEMPORARY TABLE temporary_table_1 (col1 INTEGER)", 1608 ) 1609 1610 table2 = Table( 1611 "temporary_table_2", 1612 MetaData(), 1613 Column("col1", Integer), 1614 prefixes=["VIRTUAL"], 1615 ) 1616 self.assert_compile( 1617 schema.CreateTable(table2), 1618 "CREATE VIRTUAL TABLE temporary_table_2 (col1 INTEGER)", 1619 ) 1620 1621 def test_table_info(self): 1622 metadata = MetaData() 1623 t1 = Table("foo", metadata, info={"x": "y"}) 1624 t2 = Table("bar", metadata, info={}) 1625 t3 = Table("bat", metadata) 1626 assert t1.info == {"x": "y"} 1627 assert t2.info == {} 1628 assert t3.info == {} 1629 for t in (t1, t2, t3): 1630 t.info["bar"] = "zip" 1631 assert t.info["bar"] == "zip" 1632 1633 def test_invalid_objects(self): 1634 assert_raises_message( 1635 tsa.exc.ArgumentError, 1636 "'SchemaItem' object, such as a 'Column' or a " 1637 "'Constraint' expected, got <.*ColumnClause at .*; q>", 1638 Table, 1639 "asdf", 1640 MetaData(), 1641 tsa.column("q", Integer), 1642 ) 1643 1644 assert_raises_message( 1645 tsa.exc.ArgumentError, 1646 r"'SchemaItem' object, such as a 'Column' or a " 1647 r"'Constraint' expected, got String\(\)", 1648 Table, 1649 "asdf", 1650 MetaData(), 1651 String(), 1652 ) 1653 1654 assert_raises_message( 1655 
tsa.exc.ArgumentError, 1656 "'SchemaItem' object, such as a 'Column' or a " 1657 "'Constraint' expected, got 12", 1658 Table, 1659 "asdf", 1660 MetaData(), 1661 12, 1662 ) 1663 1664 def test_reset_exported_passes(self): 1665 1666 m = MetaData() 1667 1668 t = Table("t", m, Column("foo", Integer)) 1669 eq_(list(t.c), [t.c.foo]) 1670 1671 t._reset_exported() 1672 1673 eq_(list(t.c), [t.c.foo]) 1674 1675 def test_foreign_key_constraints_collection(self): 1676 metadata = MetaData() 1677 t1 = Table("foo", metadata, Column("a", Integer)) 1678 eq_(t1.foreign_key_constraints, set()) 1679 1680 fk1 = ForeignKey("q.id") 1681 fk2 = ForeignKey("j.id") 1682 fk3 = ForeignKeyConstraint(["b", "c"], ["r.x", "r.y"]) 1683 1684 t1.append_column(Column("b", Integer, fk1)) 1685 eq_(t1.foreign_key_constraints, set([fk1.constraint])) 1686 1687 t1.append_column(Column("c", Integer, fk2)) 1688 eq_(t1.foreign_key_constraints, set([fk1.constraint, fk2.constraint])) 1689 1690 t1.append_constraint(fk3) 1691 eq_( 1692 t1.foreign_key_constraints, 1693 set([fk1.constraint, fk2.constraint, fk3]), 1694 ) 1695 1696 def test_c_immutable(self): 1697 m = MetaData() 1698 t1 = Table("t", m, Column("x", Integer), Column("y", Integer)) 1699 assert_raises(TypeError, t1.c.extend, [Column("z", Integer)]) 1700 1701 def assign(): 1702 t1.c["z"] = Column("z", Integer) 1703 1704 assert_raises(TypeError, assign) 1705 1706 def assign2(): 1707 t1.c.z = Column("z", Integer) 1708 1709 assert_raises(TypeError, assign2) 1710 1711 def test_c_mutate_after_unpickle(self): 1712 m = MetaData() 1713 1714 y = Column("y", Integer) 1715 t1 = Table("t", m, Column("x", Integer), y) 1716 1717 t2 = pickle.loads(pickle.dumps(t1)) 1718 z = Column("z", Integer) 1719 g = Column("g", Integer) 1720 t2.append_column(z) 1721 1722 is_(t1.c.contains_column(y), True) 1723 is_(t2.c.contains_column(y), False) 1724 y2 = t2.c.y 1725 is_(t2.c.contains_column(y2), True) 1726 1727 is_(t2.c.contains_column(z), True) 1728 is_(t2.c.contains_column(g), 
False) 1729 1730 def test_autoincrement_replace(self): 1731 m = MetaData() 1732 1733 t = Table("t", m, Column("id", Integer, primary_key=True)) 1734 1735 is_(t._autoincrement_column, t.c.id) 1736 1737 t = Table( 1738 "t", 1739 m, 1740 Column("id", Integer, primary_key=True), 1741 extend_existing=True, 1742 ) 1743 is_(t._autoincrement_column, t.c.id) 1744 1745 def test_pk_args_standalone(self): 1746 m = MetaData() 1747 t = Table( 1748 "t", 1749 m, 1750 Column("x", Integer, primary_key=True), 1751 PrimaryKeyConstraint(mssql_clustered=True), 1752 ) 1753 eq_(list(t.primary_key), [t.c.x]) 1754 eq_(t.primary_key.dialect_kwargs, {"mssql_clustered": True}) 1755 1756 def test_pk_cols_sets_flags(self): 1757 m = MetaData() 1758 t = Table( 1759 "t", 1760 m, 1761 Column("x", Integer), 1762 Column("y", Integer), 1763 Column("z", Integer), 1764 PrimaryKeyConstraint("x", "y"), 1765 ) 1766 eq_(t.c.x.primary_key, True) 1767 eq_(t.c.y.primary_key, True) 1768 eq_(t.c.z.primary_key, False) 1769 1770 def test_pk_col_mismatch_one(self): 1771 m = MetaData() 1772 assert_raises_message( 1773 exc.SAWarning, 1774 "Table 't' specifies columns 'x' as primary_key=True, " 1775 "not matching locally specified columns 'q'", 1776 Table, 1777 "t", 1778 m, 1779 Column("x", Integer, primary_key=True), 1780 Column("q", Integer), 1781 PrimaryKeyConstraint("q"), 1782 ) 1783 1784 def test_pk_col_mismatch_two(self): 1785 m = MetaData() 1786 assert_raises_message( 1787 exc.SAWarning, 1788 "Table 't' specifies columns 'a', 'b', 'c' as primary_key=True, " 1789 "not matching locally specified columns 'b', 'c'", 1790 Table, 1791 "t", 1792 m, 1793 Column("a", Integer, primary_key=True), 1794 Column("b", Integer, primary_key=True), 1795 Column("c", Integer, primary_key=True), 1796 PrimaryKeyConstraint("b", "c"), 1797 ) 1798 1799 @testing.emits_warning("Table 't'") 1800 def test_pk_col_mismatch_three(self): 1801 m = MetaData() 1802 t = Table( 1803 "t", 1804 m, 1805 Column("x", Integer, primary_key=True), 1806 
Column("q", Integer), 1807 PrimaryKeyConstraint("q"), 1808 ) 1809 eq_(list(t.primary_key), [t.c.q]) 1810 1811 @testing.emits_warning("Table 't'") 1812 def test_pk_col_mismatch_four(self): 1813 m = MetaData() 1814 t = Table( 1815 "t", 1816 m, 1817 Column("a", Integer, primary_key=True), 1818 Column("b", Integer, primary_key=True), 1819 Column("c", Integer, primary_key=True), 1820 PrimaryKeyConstraint("b", "c"), 1821 ) 1822 eq_(list(t.primary_key), [t.c.b, t.c.c]) 1823 1824 def test_pk_always_flips_nullable(self): 1825 m = MetaData() 1826 1827 t1 = Table("t1", m, Column("x", Integer), PrimaryKeyConstraint("x")) 1828 1829 t2 = Table("t2", m, Column("x", Integer, primary_key=True)) 1830 1831 eq_(list(t1.primary_key), [t1.c.x]) 1832 1833 eq_(list(t2.primary_key), [t2.c.x]) 1834 1835 assert t1.c.x.primary_key 1836 assert t2.c.x.primary_key 1837 1838 assert not t2.c.x.nullable 1839 assert not t1.c.x.nullable 1840 1841 1842class PKAutoIncrementTest(fixtures.TestBase): 1843 def test_multi_integer_no_autoinc(self): 1844 pk = PrimaryKeyConstraint(Column("a", Integer), Column("b", Integer)) 1845 t = Table("t", MetaData()) 1846 t.append_constraint(pk) 1847 1848 is_(pk._autoincrement_column, None) 1849 1850 def test_multi_integer_multi_autoinc(self): 1851 pk = PrimaryKeyConstraint( 1852 Column("a", Integer, autoincrement=True), 1853 Column("b", Integer, autoincrement=True), 1854 ) 1855 t = Table("t", MetaData()) 1856 t.append_constraint(pk) 1857 1858 assert_raises_message( 1859 exc.ArgumentError, 1860 "Only one Column may be marked", 1861 lambda: pk._autoincrement_column, 1862 ) 1863 1864 def test_single_integer_no_autoinc(self): 1865 pk = PrimaryKeyConstraint(Column("a", Integer)) 1866 t = Table("t", MetaData()) 1867 t.append_constraint(pk) 1868 1869 is_(pk._autoincrement_column, pk.columns["a"]) 1870 1871 def test_single_string_no_autoinc(self): 1872 pk = PrimaryKeyConstraint(Column("a", String)) 1873 t = Table("t", MetaData()) 1874 t.append_constraint(pk) 1875 1876 
is_(pk._autoincrement_column, None) 1877 1878 def test_single_string_illegal_autoinc(self): 1879 t = Table("t", MetaData(), Column("a", String, autoincrement=True)) 1880 pk = PrimaryKeyConstraint(t.c.a) 1881 t.append_constraint(pk) 1882 1883 assert_raises_message( 1884 exc.ArgumentError, 1885 "Column type VARCHAR on column 't.a'", 1886 lambda: pk._autoincrement_column, 1887 ) 1888 1889 def test_single_integer_default(self): 1890 t = Table( 1891 "t", 1892 MetaData(), 1893 Column("a", Integer, autoincrement=True, default=lambda: 1), 1894 ) 1895 pk = PrimaryKeyConstraint(t.c.a) 1896 t.append_constraint(pk) 1897 1898 is_(pk._autoincrement_column, t.c.a) 1899 1900 def test_single_integer_server_default(self): 1901 # new as of 1.1; now that we have three states for autoincrement, 1902 # if the user puts autoincrement=True with a server_default, trust 1903 # them on it 1904 t = Table( 1905 "t", 1906 MetaData(), 1907 Column( 1908 "a", Integer, autoincrement=True, server_default=func.magic() 1909 ), 1910 ) 1911 pk = PrimaryKeyConstraint(t.c.a) 1912 t.append_constraint(pk) 1913 1914 is_(pk._autoincrement_column, t.c.a) 1915 1916 def test_implicit_autoinc_but_fks(self): 1917 m = MetaData() 1918 Table("t1", m, Column("id", Integer, primary_key=True)) 1919 t2 = Table("t2", MetaData(), Column("a", Integer, ForeignKey("t1.id"))) 1920 pk = PrimaryKeyConstraint(t2.c.a) 1921 t2.append_constraint(pk) 1922 is_(pk._autoincrement_column, None) 1923 1924 def test_explicit_autoinc_but_fks(self): 1925 m = MetaData() 1926 Table("t1", m, Column("id", Integer, primary_key=True)) 1927 t2 = Table( 1928 "t2", 1929 MetaData(), 1930 Column("a", Integer, ForeignKey("t1.id"), autoincrement=True), 1931 ) 1932 pk = PrimaryKeyConstraint(t2.c.a) 1933 t2.append_constraint(pk) 1934 is_(pk._autoincrement_column, t2.c.a) 1935 1936 t3 = Table( 1937 "t3", 1938 MetaData(), 1939 Column( 1940 "a", Integer, ForeignKey("t1.id"), autoincrement="ignore_fk" 1941 ), 1942 ) 1943 pk = PrimaryKeyConstraint(t3.c.a) 1944 
t3.append_constraint(pk) 1945 is_(pk._autoincrement_column, t3.c.a) 1946 1947 1948class SchemaTypeTest(fixtures.TestBase): 1949 __backend__ = True 1950 1951 class TrackEvents(object): 1952 column = None 1953 table = None 1954 evt_targets = () 1955 1956 def _set_table(self, column, table): 1957 super(SchemaTypeTest.TrackEvents, self)._set_table(column, table) 1958 self.column = column 1959 self.table = table 1960 1961 def _on_table_create(self, target, bind, **kw): 1962 super(SchemaTypeTest.TrackEvents, self)._on_table_create( 1963 target, bind, **kw 1964 ) 1965 self.evt_targets += (target,) 1966 1967 def _on_metadata_create(self, target, bind, **kw): 1968 super(SchemaTypeTest.TrackEvents, self)._on_metadata_create( 1969 target, bind, **kw 1970 ) 1971 self.evt_targets += (target,) 1972 1973 # TODO: Enum and Boolean put TypeEngine first. Changing that here 1974 # causes collection-mutate-while-iterated errors in the event system 1975 # since the hooks here call upon the adapted type. Need to figure out 1976 # why Enum and Boolean don't have this problem. 
1977 class MyType(TrackEvents, sqltypes.SchemaType, sqltypes.TypeEngine): 1978 pass 1979 1980 class WrapEnum(TrackEvents, Enum): 1981 pass 1982 1983 class WrapBoolean(TrackEvents, Boolean): 1984 pass 1985 1986 class MyTypeWImpl(MyType): 1987 def _gen_dialect_impl(self, dialect): 1988 return self.adapt(SchemaTypeTest.MyTypeImpl) 1989 1990 class MyTypeImpl(MyTypeWImpl): 1991 pass 1992 1993 class MyTypeDecAndSchema(TypeDecorator, sqltypes.SchemaType): 1994 impl = String() 1995 1996 evt_targets = () 1997 1998 def __init__(self): 1999 TypeDecorator.__init__(self) 2000 sqltypes.SchemaType.__init__(self) 2001 2002 def _on_table_create(self, target, bind, **kw): 2003 self.evt_targets += (target,) 2004 2005 def _on_metadata_create(self, target, bind, **kw): 2006 self.evt_targets += (target,) 2007 2008 def test_before_parent_attach_plain(self): 2009 typ = self.MyType() 2010 self._test_before_parent_attach(typ) 2011 2012 def test_before_parent_attach_typedec_enclosing_schematype(self): 2013 # additional test for [ticket:2919] as part of test for 2014 # [ticket:3832] 2015 # this also serves as the test for [ticket:6152] 2016 2017 class MySchemaType(sqltypes.TypeEngine, sqltypes.SchemaType): 2018 pass 2019 2020 target_typ = MySchemaType() 2021 2022 class MyType(TypeDecorator): 2023 impl = target_typ 2024 2025 typ = MyType() 2026 self._test_before_parent_attach(typ, target_typ) 2027 2028 def test_before_parent_attach_array_enclosing_schematype(self): 2029 # test for [ticket:4141] which is the same idea as [ticket:3832] 2030 # for ARRAY 2031 2032 typ = ARRAY(String) 2033 2034 self._test_before_parent_attach(typ) 2035 2036 def test_before_parent_attach_typedec_of_schematype(self): 2037 class MyType(TypeDecorator, sqltypes.SchemaType): 2038 impl = String 2039 2040 typ = MyType() 2041 self._test_before_parent_attach(typ) 2042 2043 def test_before_parent_attach_schematype_of_typedec(self): 2044 class MyType(sqltypes.SchemaType, TypeDecorator): 2045 impl = String 2046 2047 typ = 
MyType() 2048 self._test_before_parent_attach(typ) 2049 2050 def _test_before_parent_attach(self, typ, evt_target=None): 2051 canary = mock.Mock() 2052 2053 if evt_target is None: 2054 evt_target = typ 2055 2056 orig_set_parent = evt_target._set_parent 2057 orig_set_parent_w_dispatch = evt_target._set_parent_with_dispatch 2058 2059 def _set_parent(parent, **kw): 2060 orig_set_parent(parent, **kw) 2061 canary._set_parent(parent) 2062 2063 def _set_parent_w_dispatch(parent): 2064 orig_set_parent_w_dispatch(parent) 2065 canary._set_parent_with_dispatch(parent) 2066 2067 with mock.patch.object(evt_target, "_set_parent", _set_parent): 2068 with mock.patch.object( 2069 evt_target, "_set_parent_with_dispatch", _set_parent_w_dispatch 2070 ): 2071 event.listen(evt_target, "before_parent_attach", canary.go) 2072 2073 c = Column("q", typ) 2074 2075 eq_( 2076 canary.mock_calls, 2077 [ 2078 mock.call.go(evt_target, c), 2079 mock.call._set_parent(c), 2080 mock.call._set_parent_with_dispatch(c), 2081 ], 2082 ) 2083 2084 def test_independent_schema(self): 2085 m = MetaData() 2086 type_ = self.MyType(schema="q") 2087 t1 = Table("x", m, Column("y", type_), schema="z") 2088 eq_(t1.c.y.type.schema, "q") 2089 2090 def test_inherit_schema(self): 2091 m = MetaData() 2092 type_ = self.MyType(schema="q", inherit_schema=True) 2093 t1 = Table("x", m, Column("y", type_), schema="z") 2094 eq_(t1.c.y.type.schema, "z") 2095 2096 def test_independent_schema_enum(self): 2097 m = MetaData() 2098 type_ = sqltypes.Enum("a", schema="q") 2099 t1 = Table("x", m, Column("y", type_), schema="z") 2100 eq_(t1.c.y.type.schema, "q") 2101 2102 def test_inherit_schema_enum(self): 2103 m = MetaData() 2104 type_ = sqltypes.Enum("a", "b", "c", schema="q", inherit_schema=True) 2105 t1 = Table("x", m, Column("y", type_), schema="z") 2106 eq_(t1.c.y.type.schema, "z") 2107 2108 def test_tometadata_copy_type(self): 2109 m1 = MetaData() 2110 2111 type_ = self.MyType() 2112 t1 = Table("x", m1, Column("y", type_)) 2113 
2114 m2 = MetaData() 2115 t2 = t1.tometadata(m2) 2116 2117 # metadata isn't set 2118 is_(t2.c.y.type.metadata, None) 2119 2120 # our test type sets table, though 2121 is_(t2.c.y.type.table, t2) 2122 2123 def test_tometadata_copy_decorated(self): 2124 class MyDecorated(TypeDecorator): 2125 impl = self.MyType 2126 2127 m1 = MetaData() 2128 2129 type_ = MyDecorated(schema="z") 2130 t1 = Table("x", m1, Column("y", type_)) 2131 2132 m2 = MetaData() 2133 t2 = t1.tometadata(m2) 2134 eq_(t2.c.y.type.schema, "z") 2135 2136 def test_tometadata_independent_schema(self): 2137 m1 = MetaData() 2138 2139 type_ = self.MyType() 2140 t1 = Table("x", m1, Column("y", type_)) 2141 2142 m2 = MetaData() 2143 t2 = t1.tometadata(m2, schema="bar") 2144 2145 eq_(t2.c.y.type.schema, None) 2146 2147 def test_tometadata_inherit_schema(self): 2148 m1 = MetaData() 2149 2150 type_ = self.MyType(inherit_schema=True) 2151 t1 = Table("x", m1, Column("y", type_)) 2152 2153 m2 = MetaData() 2154 t2 = t1.tometadata(m2, schema="bar") 2155 2156 eq_(t1.c.y.type.schema, None) 2157 eq_(t2.c.y.type.schema, "bar") 2158 2159 def test_tometadata_independent_events(self): 2160 m1 = MetaData() 2161 2162 type_ = self.MyType() 2163 t1 = Table("x", m1, Column("y", type_)) 2164 2165 m2 = MetaData() 2166 t2 = t1.tometadata(m2) 2167 2168 t1.dispatch.before_create(t1, testing.db) 2169 eq_(t1.c.y.type.evt_targets, (t1,)) 2170 eq_(t2.c.y.type.evt_targets, ()) 2171 2172 t2.dispatch.before_create(t2, testing.db) 2173 t2.dispatch.before_create(t2, testing.db) 2174 eq_(t1.c.y.type.evt_targets, (t1,)) 2175 eq_(t2.c.y.type.evt_targets, (t2, t2)) 2176 2177 def test_enum_column_copy_transfers_events(self): 2178 m = MetaData() 2179 2180 type_ = self.WrapEnum("a", "b", "c", name="foo") 2181 y = Column("y", type_) 2182 y_copy = y.copy() 2183 t1 = Table("x", m, y_copy) 2184 2185 is_true(y_copy.type._create_events) 2186 2187 # for PostgreSQL, this will emit CREATE TYPE 2188 m.dispatch.before_create(t1, testing.db) 2189 try: 2190 
eq_(t1.c.y.type.evt_targets, (t1,)) 2191 finally: 2192 # do the drop so that PostgreSQL emits DROP TYPE 2193 m.dispatch.after_drop(t1, testing.db) 2194 2195 def test_enum_nonnative_column_copy_transfers_events(self): 2196 m = MetaData() 2197 2198 type_ = self.WrapEnum("a", "b", "c", name="foo", native_enum=False) 2199 y = Column("y", type_) 2200 y_copy = y.copy() 2201 t1 = Table("x", m, y_copy) 2202 2203 is_true(y_copy.type._create_events) 2204 2205 m.dispatch.before_create(t1, testing.db) 2206 eq_(t1.c.y.type.evt_targets, (t1,)) 2207 2208 def test_enum_nonnative_column_copy_transfers_constraintpref(self): 2209 m = MetaData() 2210 2211 type_ = self.WrapEnum( 2212 "a", 2213 "b", 2214 "c", 2215 name="foo", 2216 native_enum=False, 2217 create_constraint=False, 2218 ) 2219 y = Column("y", type_) 2220 y_copy = y.copy() 2221 Table("x", m, y_copy) 2222 2223 is_false(y_copy.type.create_constraint) 2224 2225 def test_boolean_column_copy_transfers_events(self): 2226 m = MetaData() 2227 2228 type_ = self.WrapBoolean() 2229 y = Column("y", type_) 2230 y_copy = y.copy() 2231 Table("x", m, y_copy) 2232 2233 is_true(y_copy.type._create_events) 2234 2235 def test_boolean_nonnative_column_copy_transfers_constraintpref(self): 2236 m = MetaData() 2237 2238 type_ = self.WrapBoolean(create_constraint=False) 2239 y = Column("y", type_) 2240 y_copy = y.copy() 2241 Table("x", m, y_copy) 2242 2243 is_false(y_copy.type.create_constraint) 2244 2245 def test_metadata_dispatch_no_new_impl(self): 2246 m1 = MetaData() 2247 typ = self.MyType(metadata=m1) 2248 m1.dispatch.before_create(m1, testing.db) 2249 eq_(typ.evt_targets, (m1,)) 2250 2251 dialect_impl = typ.dialect_impl(testing.db.dialect) 2252 eq_(dialect_impl.evt_targets, ()) 2253 2254 def test_metadata_dispatch_new_impl(self): 2255 m1 = MetaData() 2256 typ = self.MyTypeWImpl(metadata=m1) 2257 m1.dispatch.before_create(m1, testing.db) 2258 eq_(typ.evt_targets, (m1,)) 2259 2260 dialect_impl = typ.dialect_impl(testing.db.dialect) 2261 
eq_(dialect_impl.evt_targets, (m1,)) 2262 2263 def test_table_dispatch_decorator_schematype(self): 2264 m1 = MetaData() 2265 typ = self.MyTypeDecAndSchema() 2266 t1 = Table("t1", m1, Column("x", typ)) 2267 m1.dispatch.before_create(t1, testing.db) 2268 eq_(typ.evt_targets, (t1,)) 2269 2270 def test_table_dispatch_no_new_impl(self): 2271 m1 = MetaData() 2272 typ = self.MyType() 2273 t1 = Table("t1", m1, Column("x", typ)) 2274 m1.dispatch.before_create(t1, testing.db) 2275 eq_(typ.evt_targets, (t1,)) 2276 2277 dialect_impl = typ.dialect_impl(testing.db.dialect) 2278 eq_(dialect_impl.evt_targets, ()) 2279 2280 def test_table_dispatch_new_impl(self): 2281 m1 = MetaData() 2282 typ = self.MyTypeWImpl() 2283 t1 = Table("t1", m1, Column("x", typ)) 2284 m1.dispatch.before_create(t1, testing.db) 2285 eq_(typ.evt_targets, (t1,)) 2286 2287 dialect_impl = typ.dialect_impl(testing.db.dialect) 2288 eq_(dialect_impl.evt_targets, (t1,)) 2289 2290 def test_create_metadata_bound_no_crash(self): 2291 m1 = MetaData() 2292 self.MyType(metadata=m1) 2293 2294 m1.create_all(testing.db) 2295 2296 def test_boolean_constraint_type_doesnt_double(self): 2297 m1 = MetaData() 2298 2299 t1 = Table("x", m1, Column("flag", Boolean())) 2300 eq_( 2301 len([c for c in t1.constraints if isinstance(c, CheckConstraint)]), 2302 1, 2303 ) 2304 m2 = MetaData() 2305 t2 = t1.tometadata(m2) 2306 2307 eq_( 2308 len([c for c in t2.constraints if isinstance(c, CheckConstraint)]), 2309 1, 2310 ) 2311 2312 def test_enum_constraint_type_doesnt_double(self): 2313 m1 = MetaData() 2314 2315 t1 = Table("x", m1, Column("flag", Enum("a", "b", "c"))) 2316 eq_( 2317 len([c for c in t1.constraints if isinstance(c, CheckConstraint)]), 2318 1, 2319 ) 2320 m2 = MetaData() 2321 t2 = t1.tometadata(m2) 2322 2323 eq_( 2324 len([c for c in t2.constraints if isinstance(c, CheckConstraint)]), 2325 1, 2326 ) 2327 2328 2329class SchemaTest(fixtures.TestBase, AssertsCompiledSQL): 2330 def test_default_schema_metadata_fk(self): 2331 m = 
def test_iteration(self):
    """CREATE TABLE for schema-qualified tables renders the expected name.

    Both tables are declared in schema ``someschema`` and ``table2`` has a
    foreign key to ``table1``.  When the dialect's identifier preparer
    reports ``omit_schema`` the rendered DDL must name the bare table;
    otherwise the name must carry the ``someschema.`` prefix.
    """
    metadata = MetaData()
    table1 = Table(
        "table1",
        metadata,
        Column("col1", Integer, primary_key=True),
        schema="someschema",
    )
    table2 = Table(
        "table2",
        metadata,
        Column("col1", Integer, primary_key=True),
        Column("col2", Integer, ForeignKey("someschema.table1.col1")),
        schema="someschema",
    )

    t1 = str(schema.CreateTable(table1).compile(bind=testing.db))
    t2 = str(schema.CreateTable(table2).compile(bind=testing.db))

    # Use containment rather than ``str.index(...) > -1``: index() raises
    # ValueError on a miss and never returns -1, so that comparison can
    # never be false.  ``in`` states the intent and fails as an assertion.
    if testing.db.dialect.preparer(testing.db.dialect).omit_schema:
        assert "CREATE TABLE table1" in t1
        assert "CREATE TABLE table2" in t2
    else:
        assert "CREATE TABLE someschema.table1" in t1
        assert "CREATE TABLE someschema.table2" in t2
def test_extend_existing_dupes_constraints(self):
    """Re-declaring a table with ``extend_existing=True`` duplicates the
    unique constraint.

    The first declaration yields two constraints; declaring the same
    table a second time against the same MetaData yields three.
    """
    meta = self._notexisting_fixture()

    def declare_users():
        # Fresh Column / UniqueConstraint objects on every declaration.
        return Table(
            "users",
            meta,
            Column("id", Integer),
            Column("name", Unicode),
            UniqueConstraint("name"),
            extend_existing=True,
        )

    first = declare_users()
    for colname in ("name", "id"):
        assert colname in first.c
    eq_(len(first.constraints), 2)

    second = declare_users()
    # constraint got duped
    eq_(len(second.constraints), 3)
def test_extend_existing_quote(self):
    """Passing ``quote`` while extending an already-reflected table with
    autoload raises ArgumentError.
    """
    existing_meta = self._useexisting_fixture()
    redefine_kw = dict(quote=True, autoload=True, extend_existing=True)

    assert_raises_message(
        tsa.exc.ArgumentError,
        "Can't redefine 'quote' or 'quote_schema' arguments",
        Table,
        "users",
        existing_meta,
        **redefine_kw
    )
def _single_fixture(self):
    """Build three tables on one MetaData and return them as a tuple.

    ``t1`` has integer columns ``a`` and ``b``; ``t2.a`` is a foreign key
    to ``t1.a``; ``t3`` has a single integer column ``a`` and no foreign
    key.
    """
    metadata = MetaData()

    referenced = Table(
        "t1", metadata, Column("a", Integer), Column("b", Integer)
    )
    referencing = Table(
        "t2", metadata, Column("a", Integer, ForeignKey("t1.a"))
    )
    unrelated = Table("t3", metadata, Column("a", Integer))

    return referenced, referencing, unrelated
def test_index_w_cols_private_table_arg(self):
    """An Index given a column plus ``_table`` is bound to that table.

    The index must report the table it was handed and hold exactly the
    one column it was constructed with.
    """
    m = MetaData()
    t = Table("t", m, Column("x", Integer))
    i = Index("i", t.c.x, _table=t)
    is_(i.table, t)

    # Materialize the collection first so eq_() performs an ordinary
    # list comparison, the same way the other index tests in this class
    # check ``i.columns``.
    eq_(list(i.columns), [t.c.x])
def test_column_associated_w_lowercase_table(self):
    """An Index built on a column owned by a lowercase ``table()`` has no
    table of its own.
    """
    # ``table`` is already imported at module level, so no function-local
    # import is needed here.
    c = Column("x", Integer)
    table("foo", c)
    idx = Index("q", c)
    is_(idx.table, None)  # lower-case-T table doesn't have indexes
def test_referred_table_accessor(self):
    """``referred_table`` on t2's foreign key constraint is t1."""
    referenced, referencing, _unused = self._single_fixture()

    constraints = list(referencing.foreign_key_constraints)
    is_(constraints[0].referred_table, referenced)
def test_invalid_composite_fk_check_strings(self):
    """A composite foreign key whose string targets name two different
    tables makes Table construction raise ArgumentError.
    """
    expected = (
        r"ForeignKeyConstraint on t1\(x, y\) refers to "
        "multiple remote tables: t2 and t3"
    )
    table_args = (
        "t1",
        MetaData(),
        Column("x", Integer),
        Column("y", Integer),
        ForeignKeyConstraint(["x", "y"], ["t2.x", "t3.y"]),
    )

    assert_raises_message(exc.ArgumentError, expected, Table, *table_args)
def test_constraint_copied_to_proxy_ok(self):
    """A select() over a table shares the table's ForeignKeyConstraint
    without registering its own ForeignKey on that constraint.
    """

    def first_fk(col):
        # First ForeignKey object found on the given column.
        return list(col.foreign_keys)[0]

    metadata = MetaData()
    Table("t1", metadata, Column("id", Integer, primary_key=True))
    child = Table(
        "t2",
        metadata,
        Column("id", Integer, ForeignKey("t1.id"), primary_key=True),
    )

    selectable = tsa.select([child])
    table_fk = first_fk(child.c.id)
    select_fk = first_fk(selectable.c.id)

    # the two FKs share the ForeignKeyConstraint
    is_(table_fk.constraint, select_fk.constraint)

    # but the ForeignKeyConstraint isn't
    # aware of the select's FK
    eq_(table_fk.constraint.elements, [table_fk])
def test_type_propagate_chained_string_source_last(self):
    """Untyped FK columns pick up Integer through a chain of string
    references even when the table at the end of the chain is declared
    last.
    """
    meta = MetaData()

    b_table = Table(
        "b", meta, Column("a_key1", None, ForeignKey("a.key1"))
    )
    c_table = Table(
        "c", meta, Column("b_key1", None, ForeignKey("b.a_key1"))
    )

    # The table both chains ultimately point at is declared last.
    Table("a", meta, Column("key1", Integer, primary_key=True))

    for fk_col in (b_table.c.a_key1, c_table.c.b_key1):
        assert isinstance(fk_col.type, Integer)
def test_type_propagate_chained_col_orig_first(self):
    """Untyped FK columns pick up Integer through a chain of Column
    object references when the origin table is declared first.
    """
    meta = MetaData()
    origin = Table("a", meta, Column("key1", Integer, primary_key=True))

    middle_col = Column("a_key1", None, ForeignKey(origin.c.key1))
    middle = Table("b", meta, middle_col)

    leaf_col = Column("b_key1", None, ForeignKey(middle.c.a_key1))
    leaf = Table("c", meta, leaf_col)

    assert isinstance(middle.c.a_key1.type, Integer)
    assert isinstance(leaf.c.b_key1.type, Integer)
def test_column_accessor_string_no_target_column(self):
    """Reading ``ForeignKey.column`` raises NoReferencedColumnError when
    the target table exists but lacks the named column.
    """
    fk = ForeignKey("sometable.somecol")
    metadata = MetaData()
    Table("t", metadata, Column("x", fk))
    # The target table is present, but without a column called "somecol".
    Table("sometable", metadata, Column("notsomecol", Integer))

    expected = (
        "Could not initialize target column for ForeignKey "
        "'sometable.somecol' on table 't': "
        "table 'sometable' has no column named 'somecol'"
    )
    assert_raises_message(
        exc.NoReferencedColumnError, expected, getattr, fk, "column"
    )
def test_auto_append_lowercase_table(self):
    """Constraints and an Index can be built on lowercase ``table()``
    columns.

    No assertion is made about attachment; the test passes as long as
    constructing each object does not raise.
    """
    # ``table`` and ``column`` come from the module-level imports, so no
    # function-local import is needed.
    t = table("t", column("a"))
    t2 = table("t2", column("a"))

    # Construction alone is the check; the objects are not used afterwards.
    UniqueConstraint(t.c.a)
    CheckConstraint(t.c.a > 5)
    ForeignKeyConstraint([t.c.a], [t2.c.a])
    PrimaryKeyConstraint(t.c.a)
    Index("foo", t.c.a)
def test_auto_append_ck_on_col_attach_two(self):
    """A CheckConstraint over three columns attaches to the table only
    once all three columns have been added to it.
    """
    col_a, col_b, col_c = (Column(name, Integer) for name in "abc")
    ck = CheckConstraint(col_a > col_b + col_c)

    tbl = Table("tbl", MetaData(), col_a)
    assert ck not in tbl.constraints

    # Appending b leaves the constraint detached; appending c attaches it.
    for pending, attached in ((col_b, False), (col_c, True)):
        tbl.append_column(pending)
        assert (ck in tbl.constraints) is attached
def test_auto_append_uq_on_col_attach_three(self):
    """Attaching the last column of a pending UniqueConstraint to a
    different table raises ArgumentError.
    """
    meta = MetaData()
    col_a, col_b, col_c = (Column(name, Integer) for name in "abc")
    uq = UniqueConstraint(col_a, col_b, col_c)

    first = Table("tbl", meta, col_a)
    assert uq not in first.constraints

    first.append_column(col_b)
    assert uq not in first.constraints

    other = Table("t2", meta)

    # two different tables, so UniqueConstraint raises
    assert_raises_message(
        exc.ArgumentError,
        r"Column\(s\) 't2\.c' are not part of table 'tbl'\.",
        other.append_column,
        col_c,
    )
def test_index_asserts_cols_standalone(self):
    """Building an Index from columns of two different tables raises
    ArgumentError.
    """
    meta = MetaData()
    first = Table("t1", meta, Column("x", Integer))
    second = Table("t2", meta, Column("y", Integer))

    index_args = ("bar", first.c.x, second.c.y)
    assert_raises_message(
        exc.ArgumentError,
        r"Column\(s\) 't2.y' are not part of table 't1'.",
        Index,
        *index_args
    )
def test_raise_expr_no_column(self):
    """Compiling CREATE INDEX for an Index built only from a SQL
    expression raises CompileError, both with the test database's
    dialect and with no dialect argument at all.

    """
    index = Index("foo", func.lower(5))
    message = "Index 'foo' is not associated with any table."

    # first with an explicit dialect, then with compile() defaults
    for compile_kw in ({"dialect": testing.db.dialect}, {}):
        assert_raises_message(
            exc.CompileError,
            message,
            schema.CreateIndex(index).compile,
            **compile_kw
        )
def test_basic(self):
    """Columns built without a name can be given name and key later,
    then attached to a Table whose column collection lines up with
    them position by position.

    """
    cols = self.columns()

    # positions in columns() that were constructed without a name
    late_names = ((0, "a"), (2, "c"), (5, "f"), (6, "g"), (8, "i"))
    for position, label in late_names:
        pending = cols[position]
        pending.name = label
        pending.key = label

    tbl = Table("table", MetaData(), *cols)

    for position, attached in enumerate(tbl.c):
        assert attached.name == cols[position].name
def test_bogus(self):
    """Column() raises ArgumentError when the name, or the type, is
    supplied both positionally and by keyword."""
    conflicting_signatures = (
        (("foo",), {"name": "bar"}),
        (("foo", Integer), {"type_": Integer()}),
    )
    for positional, keyword in conflicting_signatures:
        assert_raises(exc.ArgumentError, Column, *positional, **keyword)
def test_custom_create(self):
    """A @compiles hook on CreateColumn can replace the DDL emitted for
    selected columns.

    Columns carrying ``"special"`` in their ``info`` dictionary are
    rendered by the custom hook; every other column falls through to the
    compiler's own ``visit_create_column``.

    The hook is registered on ``schema.CreateColumn`` itself, so it is
    removed in a ``finally`` block: if the assertion fails, the hook must
    not stay installed for tests that run afterwards.

    """
    from sqlalchemy.ext.compiler import compiles, deregister

    @compiles(schema.CreateColumn)
    def compile_(element, compiler, **kw):
        # local names deliberately avoid shadowing the module-level
        # ``column``, ``text`` and ``default`` imports
        col = element.element

        # untagged columns get the stock rendering
        if "special" not in col.info:
            return compiler.visit_create_column(element, **kw)

        ddl = "%s SPECIAL DIRECTIVE %s" % (
            col.name,
            compiler.type_compiler.process(col.type),
        )
        default_str = compiler.get_column_default_string(col)
        if default_str is not None:
            ddl += " DEFAULT " + default_str

        if not col.nullable:
            ddl += " NOT NULL"

        if col.constraints:
            ddl += " ".join(
                compiler.process(const) for const in col.constraints
            )
        return ddl

    try:
        t = Table(
            "mytable",
            MetaData(),
            Column("x", Integer, info={"special": True}, primary_key=True),
            Column("y", String(50)),
            Column("z", String(20), info={"special": True}),
        )

        self.assert_compile(
            schema.CreateTable(t),
            "CREATE TABLE mytable (x SPECIAL DIRECTIVE INTEGER "
            "NOT NULL, y VARCHAR(50), "
            "z SPECIAL DIRECTIVE VARCHAR(20), PRIMARY KEY (x))",
        )
    finally:
        # always undo the registration made by @compiles above
        deregister(schema.CreateColumn)
def test_onupdate_default_not_server_default_three(self):
    """Two DefaultClause objects passed positionally are told apart by
    their for_update flag: the False one ends up as server_default, the
    True one as server_onupdate.

    """
    insert_clause = schema.DefaultClause("y", for_update=False)
    update_clause = schema.DefaultClause("z", for_update=True)

    col = self._fixture(insert_clause, update_clause)

    for attribute, expected_arg in (
        ("server_default", "y"),
        ("server_onupdate", "z"),
    ):
        eq_(getattr(col, attribute).arg, expected_arg)
def test_default_generators(self):
    """A Sequence or ColumnDefault handed to ``default=`` or
    ``onupdate=`` is stored on the Column as that very same object."""
    generators = (Sequence("foo_id_seq"), ColumnDefault("f5"))

    for generator in generators:
        for keyword in ("default", "onupdate"):
            col = Column(String, **{keyword: generator})
            assert getattr(col, keyword) is generator
def test_all_events(self):
    """Listeners placed on SchemaItem record a before and an after
    parent-attach entry for the constructs created while building two
    tables; the recorded sequence is compared against a fixed list.

    """
    seen = []

    def on_before(item, parent):
        # the "before" entry identifies the parent by class name
        item_cls = item.__class__.__name__
        parent_cls = parent.__class__.__name__
        seen.append("%s->%s" % (item_cls, parent_cls))

    def on_after(item, parent):
        # the "after" entry uses the parent's string form
        seen.append("%s->%s" % (item.__class__.__name__, parent))

    for event_name, listener in (
        ("before_parent_attach", on_before),
        ("after_parent_attach", on_after),
    ):
        self.event_listen(schema.SchemaItem, event_name, listener)

    metadata = MetaData()
    Table(
        "t1",
        metadata,
        Column("id", Integer, Sequence("foo_id"), primary_key=True),
        Column("bar", String, ForeignKey("t2.id")),
    )
    Table("t2", metadata, Column("id", Integer, primary_key=True))

    # entries recorded while t1 is constructed
    t1_events = [
        "Sequence->Column",
        "Sequence->id",
        "ForeignKey->Column",
        "ForeignKey->bar",
        "Table->MetaData",
        "PrimaryKeyConstraint->Table",
        "PrimaryKeyConstraint->t1",
        "Column->Table",
        "Column->t1",
        "Column->Table",
        "Column->t1",
        "ForeignKeyConstraint->Table",
        "ForeignKeyConstraint->t1",
        "Table->MetaData(bind=None)",
    ]
    # entries recorded while t2 is constructed
    t2_events = [
        "Table->MetaData",
        "PrimaryKeyConstraint->Table",
        "PrimaryKeyConstraint->t2",
        "Column->Table",
        "Column->t2",
        "Table->MetaData(bind=None)",
    ]

    eq_(seen, t1_events + t2_events)
def test_participating(self):
    """A keyword aimed at the participating dialect overrides that
    dialect's default for the argument; the other defaults remain in
    dialect_options, and dialect_kwargs holds only what was passed.

    """
    with self._fixture():
        index = Index("a", "b", "c", participating_y=True)

        expected_options = {
            "participating": {"x": 5, "y": True, "z_one": None}
        }
        eq_(index.dialect_options, expected_options)

        expected_kwargs = {"participating_y": True}
        eq_(index.dialect_kwargs, expected_kwargs)
def test_combined(self):
    """Arguments for a participating and a non-participating dialect
    can be given to the same Index; each shows up under its own dialect
    in dialect_options and under its full name in dialect_kwargs.

    """
    passed = {"participating_x": 7, "nonparticipating_y": True}

    with self._fixture():
        index = Index("a", "b", "c", **passed)

        expected_options = {
            "participating": {"y": False, "x": 7, "z_one": None},
            "nonparticipating": {"y": True, "*": None},
        }
        eq_(index.dialect_options, expected_options)
        eq_(index.dialect_kwargs, passed)
def test_foreign_key_propagate(self):
    """Dialect keyword arguments given to a column-level ForeignKey are
    found on the ForeignKeyConstraint of the table the column joins."""
    with self._fixture():
        metadata = MetaData()
        column_fk = ForeignKey("t2.id", participating_foobar=True)
        tbl = Table("t", metadata, Column("id", Integer, column_fk))

        fk_constraints = [
            cn for cn in tbl.constraints
            if isinstance(cn, ForeignKeyConstraint)
        ]
        table_fk = fk_constraints[0]

        eq_(table_fk.dialect_kwargs, {"participating_foobar": True})
def test_update(self):
    """_validate_dialect_kwargs() merges further dialect arguments into
    an existing Index: applying the same update twice leaves the result
    unchanged, and arguments for a second dialect are added alongside
    those of the first.

    """
    # each step: (kwargs applied, expected dialect_options,
    # expected dialect_kwargs)
    steps = [
        (
            {"participating_x": 25, "participating_z_one": "default"},
            {"participating": {"x": 25, "y": False, "z_one": "default"}},
            {"participating_x": 25, "participating_z_one": "default"},
        ),
        # identical update a second time
        (
            {"participating_x": 25, "participating_z_one": "default"},
            {"participating": {"x": 25, "y": False, "z_one": "default"}},
            {"participating_x": 25, "participating_z_one": "default"},
        ),
        # brings in a second dialect
        (
            {"participating_y": True, "participating2_y": "p2y"},
            {
                "participating": {"x": 25, "y": True, "z_one": "default"},
                "participating2": {"y": "p2y", "pp": "default", "x": 9},
            },
            {
                "participating_x": 25,
                "participating_y": True,
                "participating2_y": "p2y",
                "participating_z_one": "default",
            },
        ),
    ]

    with self._fixture():
        index = Index("a", "b", "c", participating_x=20)
        eq_(index.dialect_kwargs, {"participating_x": 20})

        for applied, expected_options, expected_kwargs in steps:
            index._validate_dialect_kwargs(applied)
            eq_(index.dialect_options, expected_options)
            eq_(index.dialect_kwargs, expected_kwargs)
def test_ad_hoc_via_kwargs_invalid_key(self):
    """Assigning through ``Index.kwargs`` with a key that lacks the
    ``<dialectname>_<argname>`` shape raises ArgumentError."""
    with self._fixture():
        index = Index("a", "b", "c")
        assign = index.kwargs.__setitem__

        assert_raises_message(
            exc.ArgumentError,
            "Keys must be of the form <dialectname>_<argname>",
            assign,
            "foobar",
            5,
        )
def test_colliding_col_label_from_index_flag(self):
    """``index=True`` on column ``id`` of table ``foo`` produces an
    index named ``ix_foo_id`` from the column label, even though the
    table also contains a column literally called ``foo_id``.

    """
    convention = {"ix": "ix_%(column_0_label)s"}
    tbl = self._colliding_name_fixture(convention, {"index": True})

    all_indexes = list(tbl.indexes)
    index = all_indexes[0]

    # name is generated up front.  alembic really prefers this
    eq_(index.name, "ix_foo_id")

    ddl = CreateIndex(index)
    self.assert_compile(ddl, "CREATE INDEX ix_foo_id ON foo (id)")
def test_colliding_col_label_from_index_obj(self):
    """An Index constructed with ``None`` as its name against column
    ``id`` of table ``foo`` joins the table's indexes and takes the
    name ``ix_foo_id`` from the naming convention.

    """
    convention = {"ix": "ix_%(column_0_label)s"}
    tbl = self._colliding_name_fixture(convention, {})

    index = Index(None, tbl.c.id)

    first_index = list(tbl.indexes)[0]
    is_(index, first_index)
    eq_(index.name, "ix_foo_id")

    ddl = CreateIndex(index)
    self.assert_compile(ddl, "CREATE INDEX ix_foo_id ON foo (id)")
4657 assert_raises_message( 4658 exc.CompileError, 4659 "CREATE INDEX requires that the index have a name", 4660 CreateIndex(idx).compile, 4661 ) 4662 4663 assert_raises_message( 4664 exc.CompileError, 4665 "DROP INDEX requires that the index have a name", 4666 DropIndex(idx).compile, 4667 ) 4668 4669 def test_colliding_col_label_from_unique_flag_no_conv(self): 4670 t1 = self._colliding_name_fixture({"ck": "foo"}, {"unique": True}) 4671 4672 const = [c for c in t1.constraints if isinstance(c, UniqueConstraint)] 4673 is_(const[0].name, None) 4674 4675 self.assert_compile( 4676 AddConstraint(const[0]), "ALTER TABLE foo ADD UNIQUE (id)" 4677 ) 4678 4679 @testing.combinations( 4680 ("nopk",), 4681 ("column",), 4682 ("constraint",), 4683 ("explicit_name",), 4684 argnames="pktype", 4685 ) 4686 @testing.combinations( 4687 ("pk_%(table_name)s", "pk_t1"), 4688 ("pk_%(column_0_name)s", "pk_x"), 4689 ("pk_%(column_0_N_name)s", "pk_x_y"), 4690 ("pk_%(column_0_N_label)s", "pk_t1_x_t1_y"), 4691 ("%(column_0_name)s", "x"), 4692 ("%(column_0N_name)s", "xy"), 4693 argnames="conv, expected_name", 4694 ) 4695 def test_pk_conventions(self, conv, expected_name, pktype): 4696 m1 = MetaData(naming_convention={"pk": conv}) 4697 4698 if pktype == "column": 4699 t1 = Table( 4700 "t1", 4701 m1, 4702 Column("x", Integer, primary_key=True), 4703 Column("y", Integer, primary_key=True), 4704 ) 4705 elif pktype == "constraint": 4706 t1 = Table( 4707 "t1", 4708 m1, 4709 Column("x", Integer), 4710 Column("y", Integer), 4711 PrimaryKeyConstraint("x", "y"), 4712 ) 4713 elif pktype == "nopk": 4714 t1 = Table( 4715 "t1", 4716 m1, 4717 Column("x", Integer, nullable=False), 4718 Column("y", Integer, nullable=False), 4719 ) 4720 expected_name = None 4721 elif pktype == "explicit_name": 4722 t1 = Table( 4723 "t1", 4724 m1, 4725 Column("x", Integer, primary_key=True), 4726 Column("y", Integer, primary_key=True), 4727 PrimaryKeyConstraint("x", "y", name="myname"), 4728 ) 4729 expected_name = "myname" 4730 
4731 if expected_name: 4732 eq_(t1.primary_key.name, expected_name) 4733 4734 if pktype == "nopk": 4735 self.assert_compile( 4736 schema.CreateTable(t1), 4737 "CREATE TABLE t1 (x INTEGER NOT NULL, y INTEGER NOT NULL)", 4738 ) 4739 else: 4740 self.assert_compile( 4741 schema.CreateTable(t1), 4742 "CREATE TABLE t1 (x INTEGER NOT NULL, y INTEGER NOT NULL, " 4743 "CONSTRAINT %s PRIMARY KEY (x, y))" % expected_name, 4744 ) 4745 4746 def test_uq_name(self): 4747 u1 = self._fixture( 4748 naming_convention={"uq": "uq_%(table_name)s_%(column_0_name)s"} 4749 ) 4750 uq = UniqueConstraint(u1.c.data) 4751 eq_(uq.name, "uq_user_data") 4752 4753 def test_uq_conv_name(self): 4754 u1 = self._fixture( 4755 naming_convention={"uq": "uq_%(table_name)s_%(column_0_name)s"} 4756 ) 4757 uq = UniqueConstraint(u1.c.data, name=naming.conv("myname")) 4758 self.assert_compile( 4759 schema.AddConstraint(uq), 4760 'ALTER TABLE "user" ADD CONSTRAINT myname UNIQUE (data)', 4761 dialect="default", 4762 ) 4763 4764 def test_uq_defer_name_convention(self): 4765 u1 = self._fixture( 4766 naming_convention={"uq": "uq_%(table_name)s_%(column_0_name)s"} 4767 ) 4768 uq = UniqueConstraint(u1.c.data, name=naming._NONE_NAME) 4769 self.assert_compile( 4770 schema.AddConstraint(uq), 4771 'ALTER TABLE "user" ADD CONSTRAINT uq_user_data UNIQUE (data)', 4772 dialect="default", 4773 ) 4774 4775 def test_uq_key(self): 4776 u1 = self._fixture( 4777 naming_convention={"uq": "uq_%(table_name)s_%(column_0_key)s"} 4778 ) 4779 uq = UniqueConstraint(u1.c.data, u1.c.data2) 4780 eq_(uq.name, "uq_user_data") 4781 4782 def test_uq_label(self): 4783 u1 = self._fixture( 4784 naming_convention={"uq": "uq_%(table_name)s_%(column_0_label)s"} 4785 ) 4786 uq = UniqueConstraint(u1.c.data, u1.c.data2) 4787 eq_(uq.name, "uq_user_user_data") 4788 4789 def test_uq_allcols_underscore_name(self): 4790 u1 = self._fixture( 4791 naming_convention={"uq": "uq_%(table_name)s_%(column_0_N_name)s"} 4792 ) 4793 uq = UniqueConstraint(u1.c.data, 
u1.c.data2, u1.c.data3) 4794 eq_(uq.name, "uq_user_data_Data2_Data3") 4795 4796 def test_uq_allcols_merged_name(self): 4797 u1 = self._fixture( 4798 naming_convention={"uq": "uq_%(table_name)s_%(column_0N_name)s"} 4799 ) 4800 uq = UniqueConstraint(u1.c.data, u1.c.data2, u1.c.data3) 4801 eq_(uq.name, "uq_user_dataData2Data3") 4802 4803 def test_uq_allcols_merged_key(self): 4804 u1 = self._fixture( 4805 naming_convention={"uq": "uq_%(table_name)s_%(column_0N_key)s"} 4806 ) 4807 uq = UniqueConstraint(u1.c.data, u1.c.data2, u1.c.data3) 4808 eq_(uq.name, "uq_user_datadata2data3") 4809 4810 def test_uq_allcols_truncated_name(self): 4811 u1 = self._fixture( 4812 naming_convention={"uq": "uq_%(table_name)s_%(column_0N_name)s"} 4813 ) 4814 uq = UniqueConstraint(u1.c.data, u1.c.data2, u1.c.data3) 4815 4816 dialect = default.DefaultDialect() 4817 self.assert_compile( 4818 schema.AddConstraint(uq), 4819 'ALTER TABLE "user" ADD ' 4820 'CONSTRAINT "uq_user_dataData2Data3" ' 4821 'UNIQUE (data, "Data2", "Data3")', 4822 dialect=dialect, 4823 ) 4824 4825 dialect.max_identifier_length = 15 4826 self.assert_compile( 4827 schema.AddConstraint(uq), 4828 'ALTER TABLE "user" ADD ' 4829 'CONSTRAINT uq_user_2769 UNIQUE (data, "Data2", "Data3")', 4830 dialect=dialect, 4831 ) 4832 4833 def test_fk_allcols_underscore_name(self): 4834 u1 = self._fixture( 4835 naming_convention={ 4836 "fk": "fk_%(table_name)s_%(column_0_N_name)s_" 4837 "%(referred_table_name)s_%(referred_column_0_N_name)s" 4838 } 4839 ) 4840 4841 m1 = u1.metadata 4842 a1 = Table( 4843 "address", 4844 m1, 4845 Column("id", Integer, primary_key=True), 4846 Column("UserData", String(30), key="user_data"), 4847 Column("UserData2", String(30), key="user_data2"), 4848 Column("UserData3", String(30), key="user_data3"), 4849 ) 4850 fk = ForeignKeyConstraint( 4851 ["user_data", "user_data2", "user_data3"], 4852 ["user.data", "user.data2", "user.data3"], 4853 ) 4854 a1.append_constraint(fk) 4855 self.assert_compile( 4856 
schema.AddConstraint(fk), 4857 "ALTER TABLE address ADD CONSTRAINT " 4858 '"fk_address_UserData_UserData2_UserData3_user_data_Data2_Data3" ' 4859 'FOREIGN KEY("UserData", "UserData2", "UserData3") ' 4860 'REFERENCES "user" (data, "Data2", "Data3")', 4861 dialect=default.DefaultDialect(), 4862 ) 4863 4864 def test_fk_allcols_merged_name(self): 4865 u1 = self._fixture( 4866 naming_convention={ 4867 "fk": "fk_%(table_name)s_%(column_0N_name)s_" 4868 "%(referred_table_name)s_%(referred_column_0N_name)s" 4869 } 4870 ) 4871 4872 m1 = u1.metadata 4873 a1 = Table( 4874 "address", 4875 m1, 4876 Column("id", Integer, primary_key=True), 4877 Column("UserData", String(30), key="user_data"), 4878 Column("UserData2", String(30), key="user_data2"), 4879 Column("UserData3", String(30), key="user_data3"), 4880 ) 4881 fk = ForeignKeyConstraint( 4882 ["user_data", "user_data2", "user_data3"], 4883 ["user.data", "user.data2", "user.data3"], 4884 ) 4885 a1.append_constraint(fk) 4886 self.assert_compile( 4887 schema.AddConstraint(fk), 4888 "ALTER TABLE address ADD CONSTRAINT " 4889 '"fk_address_UserDataUserData2UserData3_user_dataData2Data3" ' 4890 'FOREIGN KEY("UserData", "UserData2", "UserData3") ' 4891 'REFERENCES "user" (data, "Data2", "Data3")', 4892 dialect=default.DefaultDialect(), 4893 ) 4894 4895 def test_fk_allcols_truncated_name(self): 4896 u1 = self._fixture( 4897 naming_convention={ 4898 "fk": "fk_%(table_name)s_%(column_0N_name)s_" 4899 "%(referred_table_name)s_%(referred_column_0N_name)s" 4900 } 4901 ) 4902 4903 m1 = u1.metadata 4904 a1 = Table( 4905 "address", 4906 m1, 4907 Column("id", Integer, primary_key=True), 4908 Column("UserData", String(30), key="user_data"), 4909 Column("UserData2", String(30), key="user_data2"), 4910 Column("UserData3", String(30), key="user_data3"), 4911 ) 4912 fk = ForeignKeyConstraint( 4913 ["user_data", "user_data2", "user_data3"], 4914 ["user.data", "user.data2", "user.data3"], 4915 ) 4916 a1.append_constraint(fk) 4917 4918 dialect = 
default.DefaultDialect() 4919 dialect.max_identifier_length = 15 4920 self.assert_compile( 4921 schema.AddConstraint(fk), 4922 "ALTER TABLE address ADD CONSTRAINT " 4923 "fk_addr_f9ff " 4924 'FOREIGN KEY("UserData", "UserData2", "UserData3") ' 4925 'REFERENCES "user" (data, "Data2", "Data3")', 4926 dialect=dialect, 4927 ) 4928 4929 def test_ix_allcols_truncation(self): 4930 u1 = self._fixture( 4931 naming_convention={"ix": "ix_%(table_name)s_%(column_0N_name)s"} 4932 ) 4933 ix = Index(None, u1.c.data, u1.c.data2, u1.c.data3) 4934 dialect = default.DefaultDialect() 4935 dialect.max_identifier_length = 15 4936 self.assert_compile( 4937 schema.CreateIndex(ix), 4938 "CREATE INDEX ix_user_2de9 ON " '"user" (data, "Data2", "Data3")', 4939 dialect=dialect, 4940 ) 4941 4942 def test_ix_name(self): 4943 u1 = self._fixture( 4944 naming_convention={"ix": "ix_%(table_name)s_%(column_0_name)s"} 4945 ) 4946 ix = Index(None, u1.c.data) 4947 eq_(ix.name, "ix_user_data") 4948 4949 def test_ck_name_required(self): 4950 u1 = self._fixture( 4951 naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"} 4952 ) 4953 ck = CheckConstraint(u1.c.data == "x", name="mycheck") 4954 eq_(ck.name, "ck_user_mycheck") 4955 4956 assert_raises_message( 4957 exc.InvalidRequestError, 4958 r"Naming convention including %\(constraint_name\)s token " 4959 "requires that constraint is explicitly named.", 4960 CheckConstraint, 4961 u1.c.data == "x", 4962 ) 4963 4964 def test_ck_name_deferred_required(self): 4965 u1 = self._fixture( 4966 naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"} 4967 ) 4968 ck = CheckConstraint(u1.c.data == "x", name=naming._NONE_NAME) 4969 4970 assert_raises_message( 4971 exc.InvalidRequestError, 4972 r"Naming convention including %\(constraint_name\)s token " 4973 "requires that constraint is explicitly named.", 4974 schema.AddConstraint(ck).compile, 4975 ) 4976 4977 def test_column_attached_ck_name(self): 4978 m = MetaData( 4979 naming_convention={"ck": 
"ck_%(table_name)s_%(constraint_name)s"} 4980 ) 4981 ck = CheckConstraint("x > 5", name="x1") 4982 Table("t", m, Column("x", ck)) 4983 eq_(ck.name, "ck_t_x1") 4984 4985 def test_table_attached_ck_name(self): 4986 m = MetaData( 4987 naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"} 4988 ) 4989 ck = CheckConstraint("x > 5", name="x1") 4990 Table("t", m, Column("x", Integer), ck) 4991 eq_(ck.name, "ck_t_x1") 4992 4993 def test_uq_name_already_conv(self): 4994 m = MetaData( 4995 naming_convention={ 4996 "uq": "uq_%(constraint_name)s_%(column_0_name)s" 4997 } 4998 ) 4999 5000 t = Table("mytable", m) 5001 uq = UniqueConstraint(name=naming.conv("my_special_key")) 5002 5003 t.append_constraint(uq) 5004 eq_(uq.name, "my_special_key") 5005 5006 def test_fk_name_schema(self): 5007 u1 = self._fixture( 5008 naming_convention={ 5009 "fk": "fk_%(table_name)s_%(column_0_name)s_" 5010 "%(referred_table_name)s_%(referred_column_0_name)s" 5011 }, 5012 table_schema="foo", 5013 ) 5014 m1 = u1.metadata 5015 a1 = Table( 5016 "address", 5017 m1, 5018 Column("id", Integer, primary_key=True), 5019 Column("user_id", Integer), 5020 Column("user_version_id", Integer), 5021 ) 5022 fk = ForeignKeyConstraint( 5023 ["user_id", "user_version_id"], ["foo.user.id", "foo.user.version"] 5024 ) 5025 a1.append_constraint(fk) 5026 eq_(fk.name, "fk_address_user_id_user_id") 5027 5028 def test_fk_attrs(self): 5029 u1 = self._fixture( 5030 naming_convention={ 5031 "fk": "fk_%(table_name)s_%(column_0_name)s_" 5032 "%(referred_table_name)s_%(referred_column_0_name)s" 5033 } 5034 ) 5035 m1 = u1.metadata 5036 a1 = Table( 5037 "address", 5038 m1, 5039 Column("id", Integer, primary_key=True), 5040 Column("user_id", Integer), 5041 Column("user_version_id", Integer), 5042 ) 5043 fk = ForeignKeyConstraint( 5044 ["user_id", "user_version_id"], ["user.id", "user.version"] 5045 ) 5046 a1.append_constraint(fk) 5047 eq_(fk.name, "fk_address_user_id_user_id") 5048 5049 def test_custom(self): 5050 def 
key_hash(const, table): 5051 return "HASH_%s" % table.name 5052 5053 u1 = self._fixture( 5054 naming_convention={ 5055 "fk": "fk_%(table_name)s_%(key_hash)s", 5056 "key_hash": key_hash, 5057 } 5058 ) 5059 m1 = u1.metadata 5060 a1 = Table( 5061 "address", 5062 m1, 5063 Column("id", Integer, primary_key=True), 5064 Column("user_id", Integer), 5065 Column("user_version_id", Integer), 5066 ) 5067 fk = ForeignKeyConstraint( 5068 ["user_id", "user_version_id"], ["user.id", "user.version"] 5069 ) 5070 a1.append_constraint(fk) 5071 eq_(fk.name, "fk_address_HASH_address") 5072 5073 def test_schematype_ck_name_boolean(self): 5074 m1 = MetaData( 5075 naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"} 5076 ) 5077 5078 u1 = Table( 5079 "user", 5080 m1, 5081 Column("x", Boolean(name="foo")), 5082 ) 5083 5084 self.assert_compile( 5085 schema.CreateTable(u1), 5086 'CREATE TABLE "user" (' 5087 "x BOOLEAN, " 5088 "CONSTRAINT ck_user_foo CHECK (x IN (0, 1))" 5089 ")", 5090 ) 5091 5092 # test no side effects from first compile 5093 self.assert_compile( 5094 schema.CreateTable(u1), 5095 'CREATE TABLE "user" (' 5096 "x BOOLEAN, " 5097 "CONSTRAINT ck_user_foo CHECK (x IN (0, 1))" 5098 ")", 5099 ) 5100 5101 def test_schematype_ck_name_boolean_not_on_name(self): 5102 m1 = MetaData( 5103 naming_convention={"ck": "ck_%(table_name)s_%(column_0_name)s"} 5104 ) 5105 5106 u1 = Table("user", m1, Column("x", Boolean())) 5107 # constraint is not hit 5108 is_( 5109 [c for c in u1.constraints if isinstance(c, CheckConstraint)][ 5110 0 5111 ].name, 5112 _NONE_NAME, 5113 ) 5114 # but is hit at compile time 5115 self.assert_compile( 5116 schema.CreateTable(u1), 5117 'CREATE TABLE "user" (' 5118 "x BOOLEAN, " 5119 "CONSTRAINT ck_user_x CHECK (x IN (0, 1))" 5120 ")", 5121 ) 5122 5123 def test_schematype_ck_name_enum(self): 5124 m1 = MetaData( 5125 naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"} 5126 ) 5127 5128 u1 = Table( 5129 "user", 5130 m1, 5131 Column("x", Enum("a", 
"b", name="foo")), 5132 ) 5133 5134 self.assert_compile( 5135 schema.CreateTable(u1), 5136 'CREATE TABLE "user" (' 5137 "x VARCHAR(1), " 5138 "CONSTRAINT ck_user_foo CHECK (x IN ('a', 'b'))" 5139 ")", 5140 ) 5141 5142 # test no side effects from first compile 5143 self.assert_compile( 5144 schema.CreateTable(u1), 5145 'CREATE TABLE "user" (' 5146 "x VARCHAR(1), " 5147 "CONSTRAINT ck_user_foo CHECK (x IN ('a', 'b'))" 5148 ")", 5149 ) 5150 5151 def test_schematype_ck_name_propagate_conv(self): 5152 m1 = MetaData( 5153 naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"} 5154 ) 5155 5156 u1 = Table( 5157 "user", m1, Column("x", Enum("a", "b", name=naming.conv("foo"))) 5158 ) 5159 eq_( 5160 [c for c in u1.constraints if isinstance(c, CheckConstraint)][ 5161 0 5162 ].name, 5163 "foo", 5164 ) 5165 # but is hit at compile time 5166 self.assert_compile( 5167 schema.CreateTable(u1), 5168 'CREATE TABLE "user" (' 5169 "x VARCHAR(1), " 5170 "CONSTRAINT foo CHECK (x IN ('a', 'b'))" 5171 ")", 5172 ) 5173 5174 def test_schematype_ck_name_boolean_no_name(self): 5175 m1 = MetaData( 5176 naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"} 5177 ) 5178 5179 u1 = Table("user", m1, Column("x", Boolean())) 5180 # constraint gets special _defer_none_name 5181 is_( 5182 [c for c in u1.constraints if isinstance(c, CheckConstraint)][ 5183 0 5184 ].name, 5185 _NONE_NAME, 5186 ) 5187 # no issue with native boolean 5188 self.assert_compile( 5189 schema.CreateTable(u1), 5190 'CREATE TABLE "user" (' "x BOOLEAN" ")", 5191 dialect="postgresql", 5192 ) 5193 5194 assert_raises_message( 5195 exc.InvalidRequestError, 5196 r"Naming convention including \%\(constraint_name\)s token " 5197 r"requires that constraint is explicitly named.", 5198 schema.CreateTable(u1).compile, 5199 dialect=default.DefaultDialect(), 5200 ) 5201 5202 def test_schematype_no_ck_name_boolean_no_name(self): 5203 m1 = MetaData() # no naming convention 5204 5205 u1 = Table("user", m1, Column("x", 
Boolean())) 5206 # constraint gets special _defer_none_name 5207 is_( 5208 [c for c in u1.constraints if isinstance(c, CheckConstraint)][ 5209 0 5210 ].name, 5211 _NONE_NAME, 5212 ) 5213 5214 self.assert_compile( 5215 schema.CreateTable(u1), 5216 'CREATE TABLE "user" (x BOOLEAN, CHECK (x IN (0, 1)))', 5217 ) 5218 5219 def test_ck_constraint_redundant_event(self): 5220 u1 = self._fixture( 5221 naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"} 5222 ) 5223 5224 ck1 = CheckConstraint(u1.c.version > 3, name="foo") 5225 u1.append_constraint(ck1) 5226 u1.append_constraint(ck1) 5227 u1.append_constraint(ck1) 5228 5229 eq_(ck1.name, "ck_user_foo") 5230 5231 def test_pickle_metadata(self): 5232 m = MetaData(naming_convention={"pk": "%(table_name)s_pk"}) 5233 5234 m2 = pickle.loads(pickle.dumps(m)) 5235 5236 eq_(m2.naming_convention, {"pk": "%(table_name)s_pk"}) 5237 5238 t2a = Table("t2", m, Column("id", Integer, primary_key=True)) 5239 t2b = Table("t2", m2, Column("id", Integer, primary_key=True)) 5240 5241 eq_(t2a.primary_key.name, t2b.primary_key.name) 5242 eq_(t2b.primary_key.name, "t2_pk") 5243 5244 def test_expression_index(self): 5245 m = MetaData(naming_convention={"ix": "ix_%(column_0_label)s"}) 5246 t = Table("t", m, Column("q", Integer), Column("p", Integer)) 5247 ix = Index(None, t.c.q + 5) 5248 t.append_constraint(ix) 5249 5250 # huh. 
pretty cool 5251 self.assert_compile( 5252 CreateIndex(ix), "CREATE INDEX ix_t_q ON t (q + 5)" 5253 ) 5254 5255 5256class CopyDialectOptionsTest(fixtures.TestBase): 5257 @contextmanager 5258 def _fixture(self): 5259 from sqlalchemy.engine.default import DefaultDialect 5260 5261 class CopyDialectOptionsTestDialect(DefaultDialect): 5262 construct_arguments = [ 5263 (Table, {"some_table_arg": None}), 5264 (Column, {"some_column_arg": None}), 5265 (Index, {"some_index_arg": None}), 5266 (PrimaryKeyConstraint, {"some_pk_arg": None}), 5267 (UniqueConstraint, {"some_uq_arg": None}), 5268 ] 5269 5270 def load(dialect_name): 5271 if dialect_name == "copydialectoptionstest": 5272 return CopyDialectOptionsTestDialect 5273 else: 5274 raise exc.NoSuchModuleError("no dialect %r" % dialect_name) 5275 5276 with mock.patch("sqlalchemy.dialects.registry.load", load): 5277 yield 5278 5279 @classmethod 5280 def check_dialect_options_(cls, t): 5281 eq_( 5282 t.dialect_kwargs["copydialectoptionstest_some_table_arg"], 5283 "a1", 5284 ) 5285 eq_( 5286 t.c.foo.dialect_kwargs["copydialectoptionstest_some_column_arg"], 5287 "a2", 5288 ) 5289 eq_( 5290 t.primary_key.dialect_kwargs["copydialectoptionstest_some_pk_arg"], 5291 "a3", 5292 ) 5293 eq_( 5294 list(t.indexes)[0].dialect_kwargs[ 5295 "copydialectoptionstest_some_index_arg" 5296 ], 5297 "a4", 5298 ) 5299 eq_( 5300 list(c for c in t.constraints if isinstance(c, UniqueConstraint))[ 5301 0 5302 ].dialect_kwargs["copydialectoptionstest_some_uq_arg"], 5303 "a5", 5304 ) 5305 5306 def test_dialect_options_are_copied(self): 5307 with self._fixture(): 5308 t1 = Table( 5309 "t", 5310 MetaData(), 5311 Column( 5312 "foo", 5313 Integer, 5314 copydialectoptionstest_some_column_arg="a2", 5315 ), 5316 Column("bar", Integer), 5317 PrimaryKeyConstraint( 5318 "foo", copydialectoptionstest_some_pk_arg="a3" 5319 ), 5320 UniqueConstraint( 5321 "bar", copydialectoptionstest_some_uq_arg="a5" 5322 ), 5323 copydialectoptionstest_some_table_arg="a1", 5324 ) 5325 
Index( 5326 "idx", 5327 t1.c.foo, 5328 copydialectoptionstest_some_index_arg="a4", 5329 ) 5330 5331 self.check_dialect_options_(t1) 5332 5333 m2 = MetaData() 5334 t2 = t1.tometadata(m2) # make a copy 5335 self.check_dialect_options_(t2) 5336