from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import emits_warning
import pickle
from sqlalchemy import Integer, String, UniqueConstraint, \
    CheckConstraint, ForeignKey, MetaData, Sequence, \
    ForeignKeyConstraint, PrimaryKeyConstraint, ColumnDefault, Index, event, \
    events, Unicode, types as sqltypes, bindparam, \
    Table, Column, Boolean, Enum, func, text, TypeDecorator, \
    BLANK_SCHEMA
from sqlalchemy import schema, exc
from sqlalchemy.engine import default
from sqlalchemy.sql import elements, naming
import sqlalchemy as tsa
from sqlalchemy.testing import fixtures
from sqlalchemy import testing
from sqlalchemy.testing import ComparesTables, AssertsCompiledSQL
from sqlalchemy.testing import eq_, is_, mock, is_true
from contextlib import contextmanager
from sqlalchemy import util


class MetaDataTest(fixtures.TestBase, ComparesTables):
    """Tests for MetaData, Table, Column, ForeignKey and Sequence objects.

    Covers construction, copying, pickling, dependency sorting and repr.
    """

    def test_metadata_contains(self):
        """``in`` on a MetaData accepts both table keys and Table objects."""
        metadata = MetaData()
        t1 = Table('t1', metadata, Column('x', Integer))
        t2 = Table('t2', metadata, Column('x', Integer), schema='foo')
        t3 = Table('t2', MetaData(), Column('x', Integer))
        t4 = Table('t1', MetaData(), Column('x', Integer), schema='foo')

        assert "t1" in metadata
        assert "foo.t2" in metadata
        assert "t2" not in metadata
        assert "foo.t1" not in metadata
        assert t1 in metadata
        assert t2 in metadata
        assert t3 not in metadata
        assert t4 not in metadata

    def test_uninitialized_column_copy(self):
        """Column.copy() on table-less columns preserves core attributes."""
        for col in [
            Column('foo', String(), nullable=False),
            Column('baz', String(), unique=True),
            Column(Integer(), primary_key=True),
            Column('bar', Integer(), Sequence('foo_seq'), primary_key=True,
                   key='bar'),
            Column(Integer(), ForeignKey('bat.blah'), doc="this is a col"),
            Column('bar', Integer(), ForeignKey('bat.blah'), primary_key=True,
                   key='bar'),
            Column('bar', Integer(), info={'foo': 'bar'}),
        ]:
            c2 = col.copy()
            for attr in ('name', 'type', 'nullable',
                         'primary_key', 'key', 'unique', 'info',
                         'doc'):
                eq_(getattr(col, attr), getattr(c2, attr))
            eq_(len(col.foreign_keys), len(c2.foreign_keys))
            if col.default:
                # the only default used above is Sequence('foo_seq')
                eq_(c2.default.name, 'foo_seq')
            for a1, a2 in zip(col.foreign_keys, c2.foreign_keys):
                # foreign keys are copied, not shared
                assert a1 is not a2
                eq_(a2._colspec, 'bat.blah')

    def test_col_subclass_copy(self):
        """copy() on a Column subclass returns the subclass."""
        class MyColumn(schema.Column):

            def __init__(self, *args, **kw):
                self.widget = kw.pop('widget', None)
                super(MyColumn, self).__init__(*args, **kw)

            def copy(self, *arg, **kw):
                c = super(MyColumn, self).copy(*arg, **kw)
                c.widget = self.widget
                return c
        c1 = MyColumn('foo', Integer, widget='x')
        c2 = c1.copy()
        assert isinstance(c2, MyColumn)
        eq_(c2.widget, 'x')

    def test_uninitialized_column_copy_events(self):
        """A table-attach listener on each copy fires once per Table."""
        msgs = []

        def write(c, t):
            msgs.append("attach %s.%s" % (t.name, c.name))
        c1 = Column('foo', String())
        m = MetaData()
        for i in range(3):
            cx = c1.copy()
            # as of 0.7, these events no longer copy.  it's expected
            # that listeners will be re-established from the
            # natural construction of things.
            cx._on_table_attach(write)
            Table('foo%d' % i, m, cx)
        eq_(msgs, ['attach foo0.foo', 'attach foo1.foo', 'attach foo2.foo'])

    def test_schema_collection_add(self):
        """MetaData._schemas collects the schema names of added tables."""
        metadata = MetaData()

        Table('t1', metadata, Column('x', Integer), schema='foo')
        Table('t2', metadata, Column('x', Integer), schema='bar')
        Table('t3', metadata, Column('x', Integer))

        eq_(metadata._schemas, set(['foo', 'bar']))
        eq_(len(metadata.tables), 3)

    def test_schema_collection_remove(self):
        """A schema name is dropped only when its last table is removed."""
        metadata = MetaData()

        t1 = Table('t1', metadata, Column('x', Integer), schema='foo')
        Table('t2', metadata, Column('x', Integer), schema='bar')
        t3 = Table('t3', metadata, Column('x', Integer), schema='bar')

        # 'bar' is still used by t2 after t3 goes away
        metadata.remove(t3)
        eq_(metadata._schemas, set(['foo', 'bar']))
        eq_(len(metadata.tables), 2)

        metadata.remove(t1)
        eq_(metadata._schemas, set(['bar']))
        eq_(len(metadata.tables), 1)

    def test_schema_collection_remove_all(self):
        """MetaData.clear() empties both the tables and the schema set."""
        metadata = MetaData()

        Table('t1', metadata, Column('x', Integer), schema='foo')
        Table('t2', metadata, Column('x', Integer), schema='bar')

        metadata.clear()
        eq_(metadata._schemas, set())
        eq_(len(metadata.tables), 0)

    def test_metadata_tables_immutable(self):
        """metadata.tables.pop() raises TypeError."""
        metadata = MetaData()

        Table('t1', metadata, Column('x', Integer))
        assert 't1' in metadata.tables

        assert_raises(
            TypeError,
            lambda: metadata.tables.pop('t1')
        )

    @testing.provide_metadata
    def test_dupe_tables(self):
        """Redefining an existing table with columns raises an error."""
        metadata = self.metadata
        Table('table1', metadata,
              Column('col1', Integer, primary_key=True),
              Column('col2', String(20)))

        metadata.create_all()
        Table('table1', metadata, autoload=True)

        def go():
            Table('table1', metadata,
                  Column('col1', Integer, primary_key=True),
                  Column('col2', String(20)))
        # NOTE(review): the double space after "instance." is intended to
        # mirror the library's message; confirm against the raising code.
        assert_raises_message(
            tsa.exc.InvalidRequestError,
            "Table 'table1' is already defined for this "
            "MetaData instance.  Specify 'extend_existing=True' "
            "to redefine options and columns on an existing "
            "Table object.",
            go
        )

    def test_fk_copy(self):
        """copy() keeps the keyword options of FK and FK constraint."""
        c1 = Column('foo', Integer)
        c2 = Column('bar', Integer)
        m = MetaData()
        t1 = Table('t', m, c1, c2)

        kw = dict(onupdate="X",
                  ondelete="Y", use_alter=True, name='f1',
                  deferrable="Z", initially="Q", link_to_name=True)

        fk1 = ForeignKey(c1, **kw)
        fk2 = ForeignKeyConstraint((c1,), (c2,), **kw)

        t1.append_constraint(fk2)
        fk1c = fk1.copy()
        fk2c = fk2.copy()

        for k in kw:
            eq_(getattr(fk1c, k), kw[k])
            eq_(getattr(fk2c, k), kw[k])

    def test_check_constraint_copy(self):
        """CheckConstraint.copy() keeps name, text, flags and create rule."""
        def r(x):
            return x
        c = CheckConstraint("foo bar",
                            name='name',
                            initially=True,
                            deferrable=True,
                            _create_rule=r)
        c2 = c.copy()
        eq_(c2.name, 'name')
        eq_(str(c2.sqltext), "foo bar")
        eq_(c2.initially, True)
        eq_(c2.deferrable, True)
        assert c2._create_rule is r

    def test_col_replace_w_constraint(self):
        """Re-appending a column with an FK does not add a constraint."""
        m = MetaData()
        a = Table('a', m, Column('id', Integer, primary_key=True))

        aid = Column('a_id', ForeignKey('a.id'))
        b = Table('b', m, aid)
        b.append_column(aid)

        assert b.c.a_id.references(a.c.id)
        eq_(len(b.constraints), 2)

    def test_fk_construct(self):
        """ForeignKeyConstraint(table=...) attaches itself to the table."""
        c1 = Column('foo', Integer)
        c2 = Column('bar', Integer)
        m = MetaData()
        t1 = Table('t', m, c1, c2)
        fk1 = ForeignKeyConstraint(('foo', ), ('bar', ), table=t1)
        assert fk1 in t1.constraints

    def test_fk_constraint_col_collection_w_table(self):
        """With table=..., the constraint's columns resolve immediately."""
        c1 = Column('foo', Integer)
        c2 = Column('bar', Integer)
        m = MetaData()
        t1 = Table('t', m, c1, c2)
        fk1 = ForeignKeyConstraint(('foo', ), ('bar', ), table=t1)
        eq_(dict(fk1.columns), {"foo": c1})

    def test_fk_constraint_col_collection_no_table(self):
        """Without a table, columns is empty but the keys are known."""
        fk1 = ForeignKeyConstraint(('foo', 'bat'), ('bar', 'hoho'))
        eq_(dict(fk1.columns), {})
        eq_(fk1.column_keys, ['foo', 'bat'])
        eq_(fk1._col_description, 'foo, bat')
        eq_(fk1._elements, {"foo": fk1.elements[0], "bat": fk1.elements[1]})

    def test_fk_constraint_col_collection_no_table_real_cols(self):
        """Same as the string case, but built from Column objects."""
        c1 = Column('foo', Integer)
        c2 = Column('bar', Integer)
        fk1 = ForeignKeyConstraint((c1, ), (c2, ))
        eq_(dict(fk1.columns), {})
        eq_(fk1.column_keys, ['foo'])
        eq_(fk1._col_description, 'foo')
        eq_(fk1._elements, {"foo": fk1.elements[0]})

    def test_fk_constraint_col_collection_added_to_table(self):
        """Passing the constraint to Table() populates its columns."""
        c1 = Column('foo', Integer)
        m = MetaData()
        fk1 = ForeignKeyConstraint(('foo', ), ('bar', ))
        Table('t', m, c1, fk1)
        eq_(dict(fk1.columns), {"foo": c1})
        eq_(fk1._elements, {"foo": fk1.elements[0]})

    def test_fk_constraint_col_collection_via_fk(self):
        """A column-level ForeignKey yields an equivalent constraint."""
        fk = ForeignKey('bar')
        c1 = Column('foo', Integer, fk)
        m = MetaData()
        t1 = Table('t', m, c1)
        fk1 = fk.constraint
        eq_(fk1.column_keys, ['foo'])
        assert fk1 in t1.constraints
        eq_(fk1.column_keys, ['foo'])
        eq_(dict(fk1.columns), {"foo": c1})
        eq_(fk1._elements, {"foo": fk})

    def test_fk_no_such_parent_col_error(self):
        """Appending an FK constraint on a missing local column raises."""
        meta = MetaData()
        a = Table('a', meta, Column('a', Integer))
        Table('b', meta, Column('b', Integer))

        def go():
            a.append_constraint(
                ForeignKeyConstraint(['x'], ['b.b'])
            )
        assert_raises_message(
            exc.ArgumentError,
            "Can't create ForeignKeyConstraint on "
            "table 'a': no column named 'x' is present.",
            go
        )

    def test_fk_given_non_col(self):
        """ForeignKey() rejects a bind parameter."""
        not_a_col = bindparam('x')
        assert_raises_message(
            exc.ArgumentError,
            "String, Column, or Column-bound argument expected, got Bind",
            ForeignKey, not_a_col
        )

    def test_fk_given_non_col_clauseelem(self):
        """ForeignKey() rejects a __clause_element__ giving a bind param."""
        class Foo(object):

            def __clause_element__(self):
                return bindparam('x')
        assert_raises_message(
            exc.ArgumentError,
            "String, Column, or Column-bound argument expected, got Bind",
            ForeignKey, Foo()
        )

    def test_fk_given_col_non_table(self):
        """ForeignKey() rejects a column that belongs to an alias."""
        t = Table('t', MetaData(), Column('x', Integer))
        xa = t.alias().c.x
        assert_raises_message(
            exc.ArgumentError,
            "ForeignKey received Column not bound to a Table, got: .*Alias",
            ForeignKey, xa
        )

    def test_fk_given_col_non_table_clauseelem(self):
        """As above, with the alias column behind __clause_element__."""
        t = Table('t', MetaData(), Column('x', Integer))

        class Foo(object):

            def __clause_element__(self):
                return t.alias().c.x

        assert_raises_message(
            exc.ArgumentError,
            "ForeignKey received Column not bound to a Table, got: .*Alias",
            ForeignKey, Foo()
        )

    def test_fk_no_such_target_col_error_upfront(self):
        """Missing target column raises when ``.column`` is accessed.

        Here the referenced table exists before the constraint is added.
        """
        meta = MetaData()
        a = Table('a', meta, Column('a', Integer))
        Table('b', meta, Column('b', Integer))

        a.append_constraint(ForeignKeyConstraint(['a'], ['b.x']))

        assert_raises_message(
            exc.NoReferencedColumnError,
            "Could not initialize target column for ForeignKey 'b.x' on "
            "table 'a': table 'b' has no column named 'x'",
            getattr, list(a.foreign_keys)[0], "column"
        )

    def test_fk_no_such_target_col_error_delayed(self):
        """Missing target column raises when ``.column`` is accessed.

        Here the referenced table is created after the constraint is added.
        """
        meta = MetaData()
        a = Table('a', meta, Column('a', Integer))
        a.append_constraint(
            ForeignKeyConstraint(['a'], ['b.x']))

        Table('b', meta, Column('b', Integer))

        assert_raises_message(
            exc.NoReferencedColumnError,
            "Could not initialize target column for ForeignKey 'b.x' on "
            "table 'a': table 'b' has no column named 'x'",
            getattr, list(a.foreign_keys)[0], "column"
        )

    def test_fk_mismatched_local_remote_cols(self):
        """Mismatched or duplicate FK constraint column lists raise."""

        assert_raises_message(
            exc.ArgumentError,
            "ForeignKeyConstraint number of constrained columns must "
            "match the number of referenced columns.",
            ForeignKeyConstraint, ['a'], ['b.a', 'b.b']
        )

        assert_raises_message(
            exc.ArgumentError,
            "ForeignKeyConstraint number of constrained columns "
            "must match the number of referenced columns.",
            ForeignKeyConstraint, ['a', 'b'], ['b.a']
        )

        assert_raises_message(
            exc.ArgumentError,
            "ForeignKeyConstraint with duplicate source column "
            "references are not supported.",
            ForeignKeyConstraint, ['a', 'a'], ['b.a', 'b.b']
        )

    def test_pickle_metadata_sequence_restated(self):
        """A Sequence restated on an unpickled MetaData replaces the old."""
        m1 = MetaData()
        Table('a', m1,
              Column('id', Integer, primary_key=True),
              Column('x', Integer, Sequence("x_seq")))

        m2 = pickle.loads(pickle.dumps(m1))

        s2 = Sequence("x_seq")
        t2 = Table('a', m2,
                   Column('id', Integer, primary_key=True),
                   Column('x', Integer, s2),
                   extend_existing=True)

        assert m2._sequences['x_seq'] is t2.c.x.default
        assert m2._sequences['x_seq'] is s2

    def test_sequence_restated_replaced(self):
        """Test restatement of Sequence replaces."""

        m1 = MetaData()
        s1 = Sequence("x_seq")
        t = Table('a', m1,
                  Column('x', Integer, s1)
                  )
        assert m1._sequences['x_seq'] is s1

        s2 = Sequence('x_seq')
        Table('a', m1,
              Column('x', Integer, s2),
              extend_existing=True
              )
        assert t.c.x.default is s2
        assert m1._sequences['x_seq'] is s2

    def test_sequence_attach_to_table(self):
        """A Sequence on a column picks up the Table's MetaData."""
        m1 = MetaData()
        s1 = Sequence("s")
        Table('a', m1, Column('x', Integer, s1))
        assert s1.metadata is m1

    def test_sequence_attach_to_existing_table(self):
        """A Sequence added to an attached column picks up the MetaData."""
        m1 = MetaData()
        s1 = Sequence("s")
        t = Table('a', m1, Column('x', Integer))
        t.c.x._init_items(s1)
        assert s1.metadata is m1

    def test_pickle_metadata_sequence_implicit(self):
        """An unpickled MetaData tracks the Sequence of its tables."""
        m1 = MetaData()
        Table('a', m1,
              Column('id', Integer, primary_key=True),
              Column('x', Integer, Sequence("x_seq")))

        m2 = pickle.loads(pickle.dumps(m1))

        t2 = Table('a', m2, extend_existing=True)

        eq_(m2._sequences, {'x_seq': t2.c.x.default})

    def test_pickle_metadata_schema(self):
        """The schema set survives a pickle round trip."""
        m1 = MetaData()
        Table('a', m1,
              Column('id', Integer, primary_key=True),
              Column('x', Integer, Sequence("x_seq")),
              schema='y')

        m2 = pickle.loads(pickle.dumps(m1))

        Table('a', m2, schema='y',
              extend_existing=True)

        eq_(m2._schemas, m1._schemas)

    def test_metadata_schema_arg(self):
        """Check schema / quote_schema resolution for Table and Sequence.

        Each row is (name, metadata, schema, quote_schema, expected schema,
        expected schema quote flag); ``None`` for schema or quote_schema
        means the keyword is not passed at all.
        """
        m1 = MetaData(schema='sch1')
        m2 = MetaData(schema='sch1', quote_schema=True)
        m3 = MetaData(schema='sch1', quote_schema=False)
        m4 = MetaData()

        for i, (name, metadata, schema, quote_schema,
                exp_schema, exp_quote_schema) in enumerate([
                    ('t1', m1, None, None, 'sch1', None),
                    ('t2', m1, 'sch2', None, 'sch2', None),
                    ('t3', m1, 'sch2', True, 'sch2', True),
                    ('t4', m1, 'sch1', None, 'sch1', None),
                    ('t5', m1, BLANK_SCHEMA, None, None, None),
                    ('t1', m2, None, None, 'sch1', True),
                    ('t2', m2, 'sch2', None, 'sch2', None),
                    ('t3', m2, 'sch2', True, 'sch2', True),
                    ('t4', m2, 'sch1', None, 'sch1', None),
                    ('t1', m3, None, None, 'sch1', False),
                    ('t2', m3, 'sch2', None, 'sch2', None),
                    ('t3', m3, 'sch2', True, 'sch2', True),
                    ('t4', m3, 'sch1', None, 'sch1', None),
                    ('t1', m4, None, None, None, None),
                    ('t2', m4, 'sch2', None, 'sch2', None),
                    ('t3', m4, 'sch2', True, 'sch2', True),
                    ('t4', m4, 'sch1', None, 'sch1', None),
                    ('t5', m4, BLANK_SCHEMA, None, None, None),
                ]):
            kw = {}
            if schema is not None:
                kw['schema'] = schema
            if quote_schema is not None:
                kw['quote_schema'] = quote_schema
            t = Table(name, metadata, **kw)
            eq_(t.schema, exp_schema, "test %d, table schema" % i)
            eq_(t.schema.quote if t.schema is not None else None,
                exp_quote_schema,
                "test %d, table quote_schema" % i)
            seq = Sequence(name, metadata=metadata, **kw)
            eq_(seq.schema, exp_schema, "test %d, seq schema" % i)
            eq_(seq.schema.quote if seq.schema is not None else None,
                exp_quote_schema,
                "test %d, seq quote_schema" % i)

    def test_manual_dependencies(self):
        """add_is_dependent_on() drives the order of sorted_tables."""
        meta = MetaData()
        a = Table('a', meta, Column('foo', Integer))
        b = Table('b', meta, Column('foo', Integer))
        c = Table('c', meta, Column('foo', Integer))
        d = Table('d', meta, Column('foo', Integer))
        e = Table('e', meta, Column('foo', Integer))

        e.add_is_dependent_on(c)
        a.add_is_dependent_on(b)
        b.add_is_dependent_on(d)
        e.add_is_dependent_on(b)
        c.add_is_dependent_on(a)
        eq_(
            meta.sorted_tables,
            [d, b, a, c, e]
        )

    def test_deterministic_order(self):
        """sorted_tables gives a fixed order for partly related tables."""
        meta = MetaData()
        a = Table('a', meta, Column('foo', Integer))
        b = Table('b', meta, Column('foo', Integer))
        c = Table('c', meta, Column('foo', Integer))
        d = Table('d', meta, Column('foo', Integer))
        e = Table('e', meta, Column('foo', Integer))

        e.add_is_dependent_on(c)
        a.add_is_dependent_on(b)
        eq_(
            meta.sorted_tables,
            [b, c, d, a, e]
        )

    def test_nonexistent(self):
        """Autoloading a table that does not exist raises."""
        assert_raises(tsa.exc.NoSuchTableError, Table,
                      'fake_table',
                      MetaData(testing.db), autoload=True)

    def test_assorted_repr(self):
        """repr() of assorted schema constructs matches expected text."""
        t1 = Table("foo", MetaData(), Column("x", Integer))
        i1 = Index("bar", t1.c.x)
        ck = schema.CheckConstraint("x > y", name="someconstraint")

        for const, exp in (
            (Sequence("my_seq"),
             "Sequence('my_seq')"),
            (Sequence("my_seq", start=5),
             "Sequence('my_seq', start=5)"),
            (Column("foo", Integer),
             "Column('foo', Integer(), table=None)"),
            (Table("bar", MetaData(), Column("x", String)),
             "Table('bar', MetaData(bind=None), "
             "Column('x', String(), table=<bar>), schema=None)"),
            (schema.DefaultGenerator(for_update=True),
             "DefaultGenerator(for_update=True)"),
            (schema.Index("bar", "c"), "Index('bar', 'c')"),
            (i1, "Index('bar', Column('x', Integer(), table=<foo>))"),
            (schema.FetchedValue(), "FetchedValue()"),
            (ck,
             "CheckConstraint("
             "%s"
             ", name='someconstraint')" % repr(ck.sqltext)),
            (ColumnDefault(('foo', 'bar')), "ColumnDefault(('foo', 'bar'))")
        ):
            eq_(
                repr(const),
                exp
            )


class ToMetaDataTest(fixtures.TestBase, ComparesTables):
    """Tests for Table.tometadata() and pickling of MetaData."""

    @testing.requires.check_constraints
    def test_copy(self):
        """Compare tables copied by tometadata, pickle and reflection.

        Each nested helper returns copies of ``mytable`` and
        ``othertable``; the loop then runs the same assertions on each
        pair, skipping those that do not apply to reflected tables.
        """
        from sqlalchemy.testing.schema import Table
        meta = MetaData()

        table = Table(
            'mytable',
            meta,
            Column(
                'myid',
                Integer,
                Sequence('foo_id_seq'),
                primary_key=True),
            Column(
                'name',
                String(40),
                nullable=True),
            Column(
                'foo',
                String(40),
                nullable=False,
                server_default='x',
                server_onupdate='q'),
            Column(
                'bar',
                String(40),
                nullable=False,
                default='y',
                onupdate='z'),
            Column(
                'description',
                String(30),
                CheckConstraint("description='hi'")),
            UniqueConstraint('name'),
            test_needs_fk=True)

        table2 = Table(
            'othertable',
            meta,
            Column(
                'id',
                Integer,
                Sequence('foo_seq'),
                primary_key=True),
            Column(
                'myid',
                Integer,
                ForeignKey('mytable.myid'),
            ),
            test_needs_fk=True)

        def test_to_metadata():
            meta2 = MetaData()
            table_c = table.tometadata(meta2)
            table2_c = table2.tometadata(meta2)
            return (table_c, table2_c)

        def test_pickle():
            meta.bind = testing.db
            meta2 = pickle.loads(pickle.dumps(meta))
            assert meta2.bind is None
            pickle.loads(pickle.dumps(meta2))
            return (meta2.tables['mytable'], meta2.tables['othertable'])

        def test_pickle_via_reflect():
            # this is the most common use case, pickling the results of a
            # database reflection
            meta2 = MetaData(bind=testing.db)
            t1 = Table('mytable', meta2, autoload=True)
            Table('othertable', meta2, autoload=True)
            meta3 = pickle.loads(pickle.dumps(meta2))
            assert meta3.bind is None
            assert meta3.tables['mytable'] is not t1

            return (meta3.tables['mytable'], meta3.tables['othertable'])

        meta.create_all(testing.db)
        try:
            for test, has_constraints, reflect in \
                    (test_to_metadata, True, False), \
                    (test_pickle, True, False), \
                    (test_pickle_via_reflect, False, True):
                table_c, table2_c = test()
                self.assert_tables_equal(table, table_c)
                self.assert_tables_equal(table2, table2_c)
                assert table is not table_c
                assert table.primary_key is not table_c.primary_key
                assert list(table2_c.c.myid.foreign_keys)[0].column \
                    is table_c.c.myid
                assert list(table2_c.c.myid.foreign_keys)[0].column \
                    is not table.c.myid
                assert 'x' in str(table_c.c.foo.server_default.arg)
                if not reflect:
                    assert isinstance(table_c.c.myid.default, Sequence)
                    assert str(table_c.c.foo.server_onupdate.arg) == 'q'
                    assert str(table_c.c.bar.default.arg) == 'y'
                    assert getattr(table_c.c.bar.onupdate.arg, 'arg',
                                   table_c.c.bar.onupdate.arg) == 'z'
                    assert isinstance(table2_c.c.id.default, Sequence)

                # constraints don't get reflected for any dialect right
                # now

                if has_constraints:
                    for c in table_c.c.description.constraints:
                        if isinstance(c, CheckConstraint):
                            break
                    else:
                        assert False, \
                            "no CheckConstraint on copied description column"
                    assert str(c.sqltext) == "description='hi'"
                    for c in table_c.constraints:
                        if isinstance(c, UniqueConstraint):
                            break
                    else:
                        assert False, \
                            "no UniqueConstraint on copied table"
                    assert c.columns.contains_column(table_c.c.name)
                    assert not c.columns.contains_column(table.c.name)

        finally:
            meta.drop_all(testing.db)

    def test_col_key_fk_parent(self):
        """An FK on a column with a custom key survives tometadata()."""
        # test #2643
        m1 = MetaData()
        a = Table('a', m1, Column('x', Integer))
        b = Table('b', m1, Column('x', Integer, ForeignKey('a.x'), key='y'))
        assert b.c.y.references(a.c.x)

        m2 = MetaData()
        b2 = b.tometadata(m2)
        a2 = a.tometadata(m2)
        assert b2.c.y.references(a2.c.x)

    def test_change_schema(self):
        """tometadata(schema=...) moves both tables and their FK."""
        meta = MetaData()

        table = Table('mytable', meta,
                      Column('myid', Integer, primary_key=True),
                      Column('name', String(40), nullable=True),
                      Column('description', String(30),
                             CheckConstraint("description='hi'")),
                      UniqueConstraint('name'),
                      )

        table2 = Table('othertable', meta,
                       Column('id', Integer, primary_key=True),
                       Column('myid', Integer, ForeignKey('mytable.myid')),
                       )

        meta2 = MetaData()
        table_c = table.tometadata(meta2, schema='someschema')
        table2_c = table2.tometadata(meta2, schema='someschema')

        eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid
                                                      == table2_c.c.myid))
        eq_(str(table_c.join(table2_c).onclause),
            'someschema.mytable.myid = someschema.othertable.myid')

    def test_retain_table_schema(self):
        """Without a schema argument the tables keep their own schema."""
        meta = MetaData()

        table = Table('mytable', meta,
                      Column('myid', Integer, primary_key=True),
                      Column('name', String(40), nullable=True),
                      Column('description', String(30),
                             CheckConstraint("description='hi'")),
                      UniqueConstraint('name'),
                      schema='myschema',
                      )

        table2 = Table(
            'othertable',
            meta,
            Column(
                'id',
                Integer,
                primary_key=True),
            Column(
                'myid',
                Integer,
                ForeignKey('myschema.mytable.myid')),
            schema='myschema',
        )

        meta2 = MetaData()
        table_c = table.tometadata(meta2)
        table2_c = table2.tometadata(meta2)

        eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid
                                                      == table2_c.c.myid))
        eq_(str(table_c.join(table2_c).onclause),
            'myschema.mytable.myid = myschema.othertable.myid')

    def test_change_name_retain_metadata(self):
        """tometadata(name=...) can copy a table within its own MetaData."""
        meta = MetaData()

        table = Table('mytable', meta,
                      Column('myid', Integer, primary_key=True),
                      Column('name', String(40), nullable=True),
                      Column('description', String(30),
                             CheckConstraint("description='hi'")),
                      UniqueConstraint('name'),
                      schema='myschema',
                      )

        table2 = table.tometadata(table.metadata, name='newtable')
        table3 = table.tometadata(table.metadata, schema='newschema',
                                  name='newtable')

        assert table.metadata is table2.metadata
        assert table.metadata is table3.metadata
        eq_((table.name, table2.name, table3.name),
            ('mytable', 'newtable', 'newtable'))
        eq_((table.key, table2.key, table3.key),
            ('myschema.mytable', 'myschema.newtable', 'newschema.newtable'))

    def test_change_name_change_metadata(self):
        """tometadata(name=...) renames while copying to a new MetaData."""
        meta = MetaData()
        meta2 = MetaData()

        table = Table('mytable', meta,
                      Column('myid', Integer, primary_key=True),
                      Column('name', String(40), nullable=True),
                      Column('description', String(30),
                             CheckConstraint("description='hi'")),
                      UniqueConstraint('name'),
                      schema='myschema',
                      )

        table2 = table.tometadata(meta2, name='newtable')

        assert table.metadata is not table2.metadata
        eq_((table.name, table2.name),
            ('mytable', 'newtable'))
        eq_((table.key, table2.key),
            ('myschema.mytable', 'myschema.newtable'))

    def test_change_name_selfref_fk_moves(self):
        """A self-referential FK follows the table to its new name."""
        meta = MetaData()

        referenced = Table('ref', meta,
                           Column('id', Integer, primary_key=True),
                           )
        table = Table('mytable', meta,
                      Column('id', Integer, primary_key=True),
                      Column('parent_id', ForeignKey('mytable.id')),
                      Column('ref_id', ForeignKey('ref.id'))
                      )

        table2 = table.tometadata(table.metadata, name='newtable')
        assert table.metadata is table2.metadata
        assert table2.c.ref_id.references(referenced.c.id)
        assert table2.c.parent_id.references(table2.c.id)

    def test_change_name_selfref_fk_moves_w_schema(self):
        """As above, with the copy also placed in a new schema."""
        meta = MetaData()

        referenced = Table('ref', meta,
                           Column('id', Integer, primary_key=True),
                           )
        table = Table('mytable', meta,
                      Column('id', Integer, primary_key=True),
                      Column('parent_id', ForeignKey('mytable.id')),
                      Column('ref_id', ForeignKey('ref.id'))
                      )

        table2 = table.tometadata(
            table.metadata, name='newtable', schema='newschema')
        ref2 = referenced.tometadata(table.metadata, schema='newschema')
        assert table.metadata is table2.metadata
        assert table2.c.ref_id.references(ref2.c.id)
        assert table2.c.parent_id.references(table2.c.id)

    def _assert_fk(self, t2, schema, expected, referred_schema_fn=None):
        """Copy ``t2`` to a new MetaData and check its FK column spec.

        :param t2: table with a column ``y`` carrying one foreign key.
        :param schema: schema passed to tometadata(); a false value means
         the argument is omitted and the table's own schema is expected.
        :param expected: expected result of ``_get_colspec()`` on the
         copied foreign key.
        :param referred_schema_fn: passed through to tometadata().
        """
        m2 = MetaData()
        existing_schema = t2.schema
        if schema:
            t2c = t2.tometadata(m2, schema=schema,
                                referred_schema_fn=referred_schema_fn)
            eq_(t2c.schema, schema)
        else:
            t2c = t2.tometadata(m2, referred_schema_fn=referred_schema_fn)
            eq_(t2c.schema, existing_schema)
        eq_(list(t2c.c.y.foreign_keys)[0]._get_colspec(), expected)

    def test_fk_has_schema_string_retain_schema(self):
        """String FK with a schema, table without: spec is unchanged."""
        m = MetaData()

        t2 = Table('t2', m, Column('y', Integer, ForeignKey('q.t1.x')))
        self._assert_fk(t2, None, "q.t1.x")

        # same result once the referenced table actually exists
        Table('t1', m, Column('x', Integer), schema='q')
        self._assert_fk(t2, None, "q.t1.x")

    def test_fk_has_schema_string_new_schema(self):
        """String FK with a schema is unchanged by a new table schema."""
        m = MetaData()

        t2 = Table('t2', m, Column('y', Integer, ForeignKey('q.t1.x')))
        self._assert_fk(t2, "z", "q.t1.x")

        Table('t1', m, Column('x', Integer), schema='q')
        self._assert_fk(t2, "z", "q.t1.x")

    def test_fk_has_schema_col_retain_schema(self):
        """Column FK with a schema, table without: spec is unchanged."""
        m = MetaData()

        t1 = Table('t1', m, Column('x', Integer), schema='q')
        t2 = Table('t2', m, Column('y', Integer, ForeignKey(t1.c.x)))

        # pass no target schema so that the "retain" path is exercised;
        # the "z" case is covered by test_fk_has_schema_col_new_schema
        self._assert_fk(t2, None, "q.t1.x")

    def test_fk_has_schema_col_new_schema(self):
        """Column FK with a schema is unchanged by a new table schema."""
        m = MetaData()

        t1 = Table('t1', m, Column('x', Integer), schema='q')
        t2 = Table('t2', m, Column('y', Integer, ForeignKey(t1.c.x)))

        self._assert_fk(t2, "z", "q.t1.x")

    def test_fk_and_referent_has_same_schema_string_retain_schema(self):
        """Same schema on both sides, string FK, schema retained."""
        m = MetaData()

        t2 = Table('t2', m, Column('y', Integer,
                                   ForeignKey('q.t1.x')), schema="q")

        self._assert_fk(t2, None, "q.t1.x")

        Table('t1', m, Column('x', Integer), schema='q')
        self._assert_fk(t2, None, "q.t1.x")

    def test_fk_and_referent_has_same_schema_string_new_schema(self):
        """Same schema on both sides, string FK: FK follows new schema."""
        m = MetaData()

        t2 = Table('t2', m, Column('y', Integer,
                                   ForeignKey('q.t1.x')), schema="q")

        self._assert_fk(t2, "z", "z.t1.x")

        Table('t1', m, Column('x', Integer), schema='q')
        self._assert_fk(t2, "z", "z.t1.x")

    def test_fk_and_referent_has_same_schema_col_retain_schema(self):
        """Same schema on both sides, Column FK, schema retained."""
        m = MetaData()

        t1 = Table('t1', m, Column('x', Integer), schema='q')
        t2 = Table('t2', m, Column('y', Integer,
                                   ForeignKey(t1.c.x)), schema='q')
        self._assert_fk(t2, None, "q.t1.x")

    def test_fk_and_referent_has_same_schema_col_new_schema(self):
        """Same schema on both sides, Column FK: FK follows new schema."""
        m = MetaData()

        t1 = Table('t1', m, Column('x', Integer), schema='q')
        t2 = Table('t2', m, Column('y', Integer,
                                   ForeignKey(t1.c.x)), schema='q')
        self._assert_fk(t2, 'z', "z.t1.x")

    def test_fk_and_referent_has_diff_schema_string_retain_schema(self):
        """Different schemas, string FK, schema retained."""
        m = MetaData()

        t2 = Table('t2', m, Column('y', Integer,
                                   ForeignKey('p.t1.x')), schema="q")

        self._assert_fk(t2, None, "p.t1.x")

        Table('t1', m, Column('x', Integer), schema='p')
        self._assert_fk(t2, None, "p.t1.x")

    def test_fk_and_referent_has_diff_schema_string_new_schema(self):
        """Different schemas, string FK: FK keeps its own schema."""
        m = MetaData()

        t2 = Table('t2', m, Column('y', Integer,
                                   ForeignKey('p.t1.x')), schema="q")

        self._assert_fk(t2, "z", "p.t1.x")

        Table('t1', m, Column('x', Integer), schema='p')
        self._assert_fk(t2, "z", "p.t1.x")

    def test_fk_and_referent_has_diff_schema_col_retain_schema(self):
        """Different schemas, Column FK, schema retained."""
        m = MetaData()

        t1 = Table('t1', m, Column('x', Integer), schema='p')
        t2 = Table('t2', m, Column('y', Integer,
                                   ForeignKey(t1.c.x)), schema='q')
        self._assert_fk(t2, None, "p.t1.x")

    def test_fk_and_referent_has_diff_schema_col_new_schema(self):
        """Different schemas, Column FK: FK keeps its own schema."""
        m = MetaData()

        t1 = Table('t1', m, Column('x', Integer), schema='p')
        t2 = Table('t2', m, Column('y', Integer,
                                   ForeignKey(t1.c.x)), schema='q')
        self._assert_fk(t2, 'z', "p.t1.x")

    def test_fk_custom_system(self):
        """referred_schema_fn decides the schema of the copied FK."""
        m = MetaData()
        t2 = Table('t2', m, Column('y', Integer,
                                   ForeignKey('p.t1.x')), schema='q')

        def ref_fn(table, to_schema, constraint, referred_schema):
            assert table is t2
            eq_(to_schema, "z")
            eq_(referred_schema, "p")
            return "h"
        self._assert_fk(t2, 'z', "h.t1.x", referred_schema_fn=ref_fn)

    def test_copy_info(self):
        """tometadata() copies ``info`` dicts rather than sharing them."""
        m = MetaData()
        fk = ForeignKey('t2.id')
        c = Column('c', Integer, fk)
        ck = CheckConstraint('c > 5')
        t = Table('t', m, c, ck)

        m.info['minfo'] = True
        fk.info['fkinfo'] = True
        c.info['cinfo'] = True
        ck.info['ckinfo'] = True
        t.info['tinfo'] = True
        t.primary_key.info['pkinfo'] = True
        fkc = [const for const in t.constraints if
               isinstance(const, ForeignKeyConstraint)][0]
        fkc.info['fkcinfo'] = True

        m2 = MetaData()
        t2 = t.tometadata(m2)

        # mutate the originals after the copy; the copies must not change
        m.info['minfo'] = False
        fk.info['fkinfo'] = False
        c.info['cinfo'] = False
        ck.info['ckinfo'] = False
        t.primary_key.info['pkinfo'] = False
        fkc.info['fkcinfo'] = False

        eq_(m2.info, {})
        eq_(t2.info, {"tinfo": True})
        eq_(t2.c.c.info, {"cinfo": True})
        eq_(list(t2.c.c.foreign_keys)[0].info, {"fkinfo": True})
        eq_(t2.primary_key.info, {"pkinfo": True})

        fkc2 = [const for const in t2.constraints
                if isinstance(const, ForeignKeyConstraint)][0]
        eq_(fkc2.info, {"fkcinfo": True})

        ck2 = [const for const in
               t2.constraints if isinstance(const, CheckConstraint)][0]
        eq_(ck2.info, {"ckinfo": True})

    def test_dialect_kwargs(self):
        """Dialect-specific table keyword arguments are copied."""
        meta = MetaData()

        table = Table('mytable', meta,
                      Column('myid', Integer, primary_key=True),
                      mysql_engine='InnoDB',
                      )

        meta2 = MetaData()
        table_c = table.tometadata(meta2)

        eq_(table.kwargs, {"mysql_engine": "InnoDB"})

        eq_(table.kwargs, table_c.kwargs)

    def test_indexes(self):
        """Column-level and explicit indexes are both copied."""
        meta = MetaData()

        table = Table('mytable', meta,
                      Column('id', Integer, primary_key=True),
                      Column('data1', Integer, index=True),
                      Column('data2', Integer),
                      )
        Index('multi', table.c.data1, table.c.data2)

        meta2 = MetaData()
        table_c = table.tometadata(meta2)

        def _get_key(i):
            # comparable description of an index: name, unique flag,
            # dialect kwargs and column keys
            return [i.name, i.unique] + \
                sorted(i.kwargs.items()) + \
                list(i.columns.keys())

        eq_(
            sorted([_get_key(i) for i in table.indexes]),
            sorted([_get_key(i) for i in table_c.indexes])
        )

    @emits_warning("Table '.+' already exists within the given MetaData")
    def test_already_exists(self):
        """Copying onto an existing table name gives one Table object."""

        meta1 = MetaData()
        table1 = Table('mytable', meta1,
                       Column('myid', Integer, primary_key=True),
                       )
        meta2 = MetaData()
        table2 = Table('mytable', meta2,
                       Column('yourid', Integer, primary_key=True),
                       )

        table_c = table1.tometadata(meta2)
        table_d = table2.tometadata(meta2)

        # d'oh!
        assert table_c is table_d

    def test_default_schema_metadata(self):
        """With schema=None the target MetaData's schema is applied."""
        meta = MetaData(schema='myschema')

        table = Table(
            'mytable',
            meta,
            Column(
                'myid',
                Integer,
                primary_key=True),
            Column(
                'name',
                String(40),
                nullable=True),
            Column(
                'description',
                String(30),
                CheckConstraint("description='hi'")),
            UniqueConstraint('name'),
        )

        table2 = Table(
            'othertable', meta, Column(
                'id', Integer, primary_key=True), Column(
                'myid', Integer, ForeignKey('myschema.mytable.myid')), )

        meta2 = MetaData(schema='someschema')
        table_c = table.tometadata(meta2, schema=None)
        table2_c = table2.tometadata(meta2, schema=None)

        eq_(str(table_c.join(table2_c).onclause),
            str(table_c.c.myid == table2_c.c.myid))
        eq_(str(table_c.join(table2_c).onclause),
            "someschema.mytable.myid = someschema.othertable.myid")

    def test_strip_schema(self):
        """schema=None into a schema-less MetaData gives plain names."""
        meta = MetaData()

        table = Table('mytable', meta,
                      Column('myid', Integer, primary_key=True),
                      Column('name', String(40), nullable=True),
                      Column('description', String(30),
                             CheckConstraint("description='hi'")),
                      UniqueConstraint('name'),
                      )

        table2 = Table('othertable', meta,
                       Column('id', Integer, primary_key=True),
                       Column('myid', Integer, ForeignKey('mytable.myid')),
                       )

        meta2 = MetaData()
        table_c = table.tometadata(meta2, schema=None)
        table2_c = table2.tometadata(meta2, schema=None)

        eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid
                                                      == table2_c.c.myid))
        eq_(str(table_c.join(table2_c).onclause),
            'mytable.myid = othertable.myid')

    def test_unique_true_flag(self):
        """Column(unique=True) yields exactly one copied constraint."""
        meta = MetaData()

        table = Table('mytable', meta, Column('x', Integer, unique=True))

        m2 = MetaData()

        t2 = table.tometadata(m2)

        eq_(
            len([
                const for const
                in t2.constraints
                if isinstance(const, UniqueConstraint)]),
            1
        )

    def test_index_true_flag(self):
        """Column(index=True) yields exactly one copied index."""
        meta = MetaData()

        table = Table('mytable', meta, Column('x', Integer, index=True))

        m2 = MetaData()

        t2 = table.tometadata(m2)

        eq_(len(t2.indexes), 1)


class InfoTest(fixtures.TestBase):
    """``info`` defaults to an empty dict and accepts a constructor value."""

    def test_metadata_info(self):
        """MetaData.info"""
        m1 = MetaData()
        eq_(m1.info, {})

        m1 = MetaData(info={"foo": "bar"})
        eq_(m1.info, {"foo": "bar"})

    def test_foreignkey_constraint_info(self):
        """ForeignKeyConstraint.info"""
        fkc = ForeignKeyConstraint(['a'], ['b'], name='bar')
        eq_(fkc.info, {})

        fkc = ForeignKeyConstraint(
            ['a'], ['b'], name='bar', info={"foo": "bar"})
        eq_(fkc.info, {"foo": "bar"})

    def test_foreignkey_info(self):
        """ForeignKey.info"""
        fkc = ForeignKey('a')
        eq_(fkc.info, {})

        fkc = ForeignKey('a', info={"foo": "bar"})
        eq_(fkc.info, {"foo": "bar"})

    def test_primarykey_constraint_info(self):
        """PrimaryKeyConstraint.info"""
        pkc = PrimaryKeyConstraint('a', name='x')
        eq_(pkc.info, {})

        pkc = PrimaryKeyConstraint('a', name='x', info={'foo': 'bar'})
        eq_(pkc.info, {'foo': 'bar'})

    def test_unique_constraint_info(self):
        """UniqueConstraint.info"""
        uc = UniqueConstraint('a', name='x')
        eq_(uc.info, {})

        uc = UniqueConstraint('a', name='x', info={'foo': 'bar'})
        eq_(uc.info, {'foo': 'bar'})

    def test_check_constraint_info(self):
        """CheckConstraint.info"""
        cc = CheckConstraint('foo=bar', name='x')
        eq_(cc.info, {})

        cc = CheckConstraint('foo=bar', name='x', info={'foo': 'bar'})
        eq_(cc.info, {'foo': 'bar'})

    def test_index_info(self):
        """Index.info"""
        ix = Index('x', 'a')
        eq_(ix.info, {})

        ix = Index('x', 'a', info={'foo': 'bar'})
        eq_(ix.info, {'foo': 'bar'})

    def test_column_info(self):
        """Column.info"""
        c = Column('x', Integer)
        eq_(c.info, {})

        c = Column('x', Integer, info={'foo': 'bar'})
        eq_(c.info, {'foo': 'bar'})
def test_table_info(self): 1215 t = Table('x', MetaData()) 1216 eq_(t.info, {}) 1217 1218 t = Table('x', MetaData(), info={'foo': 'bar'}) 1219 eq_(t.info, {'foo': 'bar'}) 1220 1221 1222class TableTest(fixtures.TestBase, AssertsCompiledSQL): 1223 1224 @testing.requires.temporary_tables 1225 @testing.skip_if('mssql', 'different col format') 1226 def test_prefixes(self): 1227 from sqlalchemy import Table 1228 table1 = Table("temporary_table_1", MetaData(), 1229 Column("col1", Integer), 1230 prefixes=["TEMPORARY"]) 1231 1232 self.assert_compile( 1233 schema.CreateTable(table1), 1234 "CREATE TEMPORARY TABLE temporary_table_1 (col1 INTEGER)" 1235 ) 1236 1237 table2 = Table("temporary_table_2", MetaData(), 1238 Column("col1", Integer), 1239 prefixes=["VIRTUAL"]) 1240 self.assert_compile( 1241 schema.CreateTable(table2), 1242 "CREATE VIRTUAL TABLE temporary_table_2 (col1 INTEGER)" 1243 ) 1244 1245 def test_table_info(self): 1246 metadata = MetaData() 1247 t1 = Table('foo', metadata, info={'x': 'y'}) 1248 t2 = Table('bar', metadata, info={}) 1249 t3 = Table('bat', metadata) 1250 assert t1.info == {'x': 'y'} 1251 assert t2.info == {} 1252 assert t3.info == {} 1253 for t in (t1, t2, t3): 1254 t.info['bar'] = 'zip' 1255 assert t.info['bar'] == 'zip' 1256 1257 def test_reset_exported_passes(self): 1258 1259 m = MetaData() 1260 1261 t = Table('t', m, Column('foo', Integer)) 1262 eq_( 1263 list(t.c), [t.c.foo] 1264 ) 1265 1266 t._reset_exported() 1267 1268 eq_( 1269 list(t.c), [t.c.foo] 1270 ) 1271 1272 def test_foreign_key_constraints_collection(self): 1273 metadata = MetaData() 1274 t1 = Table('foo', metadata, Column('a', Integer)) 1275 eq_(t1.foreign_key_constraints, set()) 1276 1277 fk1 = ForeignKey('q.id') 1278 fk2 = ForeignKey('j.id') 1279 fk3 = ForeignKeyConstraint(['b', 'c'], ['r.x', 'r.y']) 1280 1281 t1.append_column(Column('b', Integer, fk1)) 1282 eq_( 1283 t1.foreign_key_constraints, 1284 set([fk1.constraint])) 1285 1286 t1.append_column(Column('c', Integer, fk2)) 1287 
eq_( 1288 t1.foreign_key_constraints, 1289 set([fk1.constraint, fk2.constraint])) 1290 1291 t1.append_constraint(fk3) 1292 eq_( 1293 t1.foreign_key_constraints, 1294 set([fk1.constraint, fk2.constraint, fk3])) 1295 1296 def test_c_immutable(self): 1297 m = MetaData() 1298 t1 = Table('t', m, Column('x', Integer), Column('y', Integer)) 1299 assert_raises( 1300 TypeError, 1301 t1.c.extend, [Column('z', Integer)] 1302 ) 1303 1304 def assign(): 1305 t1.c['z'] = Column('z', Integer) 1306 assert_raises( 1307 TypeError, 1308 assign 1309 ) 1310 1311 def assign2(): 1312 t1.c.z = Column('z', Integer) 1313 assert_raises( 1314 TypeError, 1315 assign2 1316 ) 1317 1318 def test_c_mutate_after_unpickle(self): 1319 m = MetaData() 1320 1321 y = Column('y', Integer) 1322 t1 = Table('t', m, Column('x', Integer), y) 1323 1324 t2 = pickle.loads(pickle.dumps(t1)) 1325 z = Column('z', Integer) 1326 g = Column('g', Integer) 1327 t2.append_column(z) 1328 1329 is_(t1.c.contains_column(y), True) 1330 is_(t2.c.contains_column(y), False) 1331 y2 = t2.c.y 1332 is_(t2.c.contains_column(y2), True) 1333 1334 is_(t2.c.contains_column(z), True) 1335 is_(t2.c.contains_column(g), False) 1336 1337 def test_autoincrement_replace(self): 1338 m = MetaData() 1339 1340 t = Table('t', m, 1341 Column('id', Integer, primary_key=True) 1342 ) 1343 1344 is_(t._autoincrement_column, t.c.id) 1345 1346 t = Table('t', m, 1347 Column('id', Integer, primary_key=True), 1348 extend_existing=True 1349 ) 1350 is_(t._autoincrement_column, t.c.id) 1351 1352 def test_pk_args_standalone(self): 1353 m = MetaData() 1354 t = Table('t', m, 1355 Column('x', Integer, primary_key=True), 1356 PrimaryKeyConstraint(mssql_clustered=True) 1357 ) 1358 eq_( 1359 list(t.primary_key), [t.c.x] 1360 ) 1361 eq_( 1362 t.primary_key.dialect_kwargs, {"mssql_clustered": True} 1363 ) 1364 1365 def test_pk_cols_sets_flags(self): 1366 m = MetaData() 1367 t = Table('t', m, 1368 Column('x', Integer), 1369 Column('y', Integer), 1370 Column('z', Integer), 
1371 PrimaryKeyConstraint('x', 'y') 1372 ) 1373 eq_(t.c.x.primary_key, True) 1374 eq_(t.c.y.primary_key, True) 1375 eq_(t.c.z.primary_key, False) 1376 1377 def test_pk_col_mismatch_one(self): 1378 m = MetaData() 1379 assert_raises_message( 1380 exc.SAWarning, 1381 "Table 't' specifies columns 'x' as primary_key=True, " 1382 "not matching locally specified columns 'q'", 1383 Table, 't', m, 1384 Column('x', Integer, primary_key=True), 1385 Column('q', Integer), 1386 PrimaryKeyConstraint('q') 1387 ) 1388 1389 def test_pk_col_mismatch_two(self): 1390 m = MetaData() 1391 assert_raises_message( 1392 exc.SAWarning, 1393 "Table 't' specifies columns 'a', 'b', 'c' as primary_key=True, " 1394 "not matching locally specified columns 'b', 'c'", 1395 Table, 't', m, 1396 Column('a', Integer, primary_key=True), 1397 Column('b', Integer, primary_key=True), 1398 Column('c', Integer, primary_key=True), 1399 PrimaryKeyConstraint('b', 'c') 1400 ) 1401 1402 @testing.emits_warning("Table 't'") 1403 def test_pk_col_mismatch_three(self): 1404 m = MetaData() 1405 t = Table('t', m, 1406 Column('x', Integer, primary_key=True), 1407 Column('q', Integer), 1408 PrimaryKeyConstraint('q') 1409 ) 1410 eq_(list(t.primary_key), [t.c.q]) 1411 1412 @testing.emits_warning("Table 't'") 1413 def test_pk_col_mismatch_four(self): 1414 m = MetaData() 1415 t = Table('t', m, 1416 Column('a', Integer, primary_key=True), 1417 Column('b', Integer, primary_key=True), 1418 Column('c', Integer, primary_key=True), 1419 PrimaryKeyConstraint('b', 'c') 1420 ) 1421 eq_(list(t.primary_key), [t.c.b, t.c.c]) 1422 1423 def test_pk_always_flips_nullable(self): 1424 m = MetaData() 1425 1426 t1 = Table('t1', m, Column('x', Integer), PrimaryKeyConstraint('x')) 1427 1428 t2 = Table('t2', m, Column('x', Integer, primary_key=True)) 1429 1430 eq_(list(t1.primary_key), [t1.c.x]) 1431 1432 eq_(list(t2.primary_key), [t2.c.x]) 1433 1434 assert t1.c.x.primary_key 1435 assert t2.c.x.primary_key 1436 1437 assert not t2.c.x.nullable 1438 
assert not t1.c.x.nullable 1439 1440 1441class PKAutoIncrementTest(fixtures.TestBase): 1442 def test_multi_integer_no_autoinc(self): 1443 pk = PrimaryKeyConstraint( 1444 Column('a', Integer), 1445 Column('b', Integer) 1446 ) 1447 t = Table('t', MetaData()) 1448 t.append_constraint(pk) 1449 1450 is_(pk._autoincrement_column, None) 1451 1452 def test_multi_integer_multi_autoinc(self): 1453 pk = PrimaryKeyConstraint( 1454 Column('a', Integer, autoincrement=True), 1455 Column('b', Integer, autoincrement=True) 1456 ) 1457 t = Table('t', MetaData()) 1458 t.append_constraint(pk) 1459 1460 assert_raises_message( 1461 exc.ArgumentError, 1462 "Only one Column may be marked", 1463 lambda: pk._autoincrement_column 1464 ) 1465 1466 def test_single_integer_no_autoinc(self): 1467 pk = PrimaryKeyConstraint( 1468 Column('a', Integer), 1469 ) 1470 t = Table('t', MetaData()) 1471 t.append_constraint(pk) 1472 1473 is_(pk._autoincrement_column, pk.columns['a']) 1474 1475 def test_single_string_no_autoinc(self): 1476 pk = PrimaryKeyConstraint( 1477 Column('a', String), 1478 ) 1479 t = Table('t', MetaData()) 1480 t.append_constraint(pk) 1481 1482 is_(pk._autoincrement_column, None) 1483 1484 def test_single_string_illegal_autoinc(self): 1485 t = Table('t', MetaData(), Column('a', String, autoincrement=True)) 1486 pk = PrimaryKeyConstraint( 1487 t.c.a 1488 ) 1489 t.append_constraint(pk) 1490 1491 assert_raises_message( 1492 exc.ArgumentError, 1493 "Column type VARCHAR on column 't.a'", 1494 lambda: pk._autoincrement_column 1495 ) 1496 1497 def test_single_integer_default(self): 1498 t = Table( 1499 't', MetaData(), 1500 Column('a', Integer, autoincrement=True, default=lambda: 1)) 1501 pk = PrimaryKeyConstraint( 1502 t.c.a 1503 ) 1504 t.append_constraint(pk) 1505 1506 is_(pk._autoincrement_column, t.c.a) 1507 1508 def test_single_integer_server_default(self): 1509 # new as of 1.1; now that we have three states for autoincrement, 1510 # if the user puts autoincrement=True with a 
server_default, trust 1511 # them on it 1512 t = Table( 1513 't', MetaData(), 1514 Column('a', Integer, 1515 autoincrement=True, server_default=func.magic())) 1516 pk = PrimaryKeyConstraint( 1517 t.c.a 1518 ) 1519 t.append_constraint(pk) 1520 1521 is_(pk._autoincrement_column, t.c.a) 1522 1523 def test_implicit_autoinc_but_fks(self): 1524 m = MetaData() 1525 Table('t1', m, Column('id', Integer, primary_key=True)) 1526 t2 = Table( 1527 't2', MetaData(), 1528 Column('a', Integer, ForeignKey('t1.id'))) 1529 pk = PrimaryKeyConstraint( 1530 t2.c.a 1531 ) 1532 t2.append_constraint(pk) 1533 is_(pk._autoincrement_column, None) 1534 1535 def test_explicit_autoinc_but_fks(self): 1536 m = MetaData() 1537 Table('t1', m, Column('id', Integer, primary_key=True)) 1538 t2 = Table( 1539 't2', MetaData(), 1540 Column('a', Integer, ForeignKey('t1.id'), autoincrement=True)) 1541 pk = PrimaryKeyConstraint( 1542 t2.c.a 1543 ) 1544 t2.append_constraint(pk) 1545 is_(pk._autoincrement_column, t2.c.a) 1546 1547 t3 = Table( 1548 't3', MetaData(), 1549 Column('a', Integer, 1550 ForeignKey('t1.id'), autoincrement='ignore_fk')) 1551 pk = PrimaryKeyConstraint( 1552 t3.c.a 1553 ) 1554 t3.append_constraint(pk) 1555 is_(pk._autoincrement_column, t3.c.a) 1556 1557 1558class SchemaTypeTest(fixtures.TestBase): 1559 __backend__ = True 1560 1561 class TrackEvents(object): 1562 column = None 1563 table = None 1564 evt_targets = () 1565 1566 def _set_table(self, column, table): 1567 super(SchemaTypeTest.TrackEvents, self)._set_table(column, table) 1568 self.column = column 1569 self.table = table 1570 1571 def _on_table_create(self, target, bind, **kw): 1572 super(SchemaTypeTest.TrackEvents, self)._on_table_create( 1573 target, bind, **kw) 1574 self.evt_targets += (target,) 1575 1576 def _on_metadata_create(self, target, bind, **kw): 1577 super(SchemaTypeTest.TrackEvents, self)._on_metadata_create( 1578 target, bind, **kw) 1579 self.evt_targets += (target,) 1580 1581 # TODO: Enum and Boolean put 
TypeEngine first. Changing that here 1582 # causes collection-mutate-while-iterated errors in the event system 1583 # since the hooks here call upon the adapted type. Need to figure out 1584 # why Enum and Boolean don't have this problem. 1585 class MyType(TrackEvents, sqltypes.SchemaType, sqltypes.TypeEngine): 1586 pass 1587 1588 class WrapEnum(TrackEvents, Enum): 1589 pass 1590 1591 class WrapBoolean(TrackEvents, Boolean): 1592 pass 1593 1594 class MyTypeWImpl(MyType): 1595 1596 def _gen_dialect_impl(self, dialect): 1597 return self.adapt(SchemaTypeTest.MyTypeImpl) 1598 1599 class MyTypeImpl(MyTypeWImpl): 1600 pass 1601 1602 class MyTypeDecAndSchema(TypeDecorator, sqltypes.SchemaType): 1603 impl = String() 1604 1605 evt_targets = () 1606 1607 def __init__(self): 1608 TypeDecorator.__init__(self) 1609 sqltypes.SchemaType.__init__(self) 1610 1611 def _on_table_create(self, target, bind, **kw): 1612 self.evt_targets += (target,) 1613 1614 def _on_metadata_create(self, target, bind, **kw): 1615 self.evt_targets += (target,) 1616 1617 def test_before_parent_attach_plain(self): 1618 typ = self.MyType() 1619 self._test_before_parent_attach(typ) 1620 1621 def test_before_parent_attach_typedec_enclosing_schematype(self): 1622 # additional test for [ticket:2919] as part of test for 1623 # [ticket:3832] 1624 1625 class MySchemaType(sqltypes.TypeEngine, sqltypes.SchemaType): 1626 pass 1627 1628 target_typ = MySchemaType() 1629 1630 class MyType(TypeDecorator): 1631 impl = target_typ 1632 1633 typ = MyType() 1634 self._test_before_parent_attach(typ, target_typ, double=True) 1635 1636 def test_before_parent_attach_typedec_of_schematype(self): 1637 class MyType(TypeDecorator, sqltypes.SchemaType): 1638 impl = String 1639 1640 typ = MyType() 1641 self._test_before_parent_attach(typ) 1642 1643 def test_before_parent_attach_schematype_of_typedec(self): 1644 class MyType(sqltypes.SchemaType, TypeDecorator): 1645 impl = String 1646 1647 typ = MyType() 1648 
self._test_before_parent_attach(typ) 1649 1650 def _test_before_parent_attach(self, typ, evt_target=None, double=False): 1651 canary = mock.Mock() 1652 1653 if evt_target is None: 1654 evt_target = typ 1655 1656 orig_set_parent = evt_target._set_parent 1657 orig_set_parent_w_dispatch = evt_target._set_parent_with_dispatch 1658 1659 def _set_parent(parent): 1660 orig_set_parent(parent) 1661 canary._set_parent(parent) 1662 1663 def _set_parent_w_dispatch(parent): 1664 orig_set_parent_w_dispatch(parent) 1665 canary._set_parent_with_dispatch(parent) 1666 1667 with mock.patch.object(evt_target, '_set_parent', _set_parent): 1668 with mock.patch.object( 1669 evt_target, '_set_parent_with_dispatch', 1670 _set_parent_w_dispatch): 1671 event.listen(evt_target, "before_parent_attach", canary.go) 1672 1673 c = Column('q', typ) 1674 1675 if double: 1676 # no clean way yet to fix this, inner schema type is called 1677 # twice, but this is a very unusual use case. 1678 eq_( 1679 canary.mock_calls, 1680 [ 1681 mock.call._set_parent(c), 1682 mock.call.go(evt_target, c), 1683 mock.call._set_parent(c), 1684 mock.call._set_parent_with_dispatch(c) 1685 ] 1686 ) 1687 else: 1688 eq_( 1689 canary.mock_calls, 1690 [ 1691 mock.call.go(evt_target, c), 1692 mock.call._set_parent(c), 1693 mock.call._set_parent_with_dispatch(c) 1694 ] 1695 ) 1696 1697 def test_independent_schema(self): 1698 m = MetaData() 1699 type_ = self.MyType(schema="q") 1700 t1 = Table('x', m, Column("y", type_), schema="z") 1701 eq_(t1.c.y.type.schema, "q") 1702 1703 def test_inherit_schema(self): 1704 m = MetaData() 1705 type_ = self.MyType(schema="q", inherit_schema=True) 1706 t1 = Table('x', m, Column("y", type_), schema="z") 1707 eq_(t1.c.y.type.schema, "z") 1708 1709 def test_independent_schema_enum(self): 1710 m = MetaData() 1711 type_ = sqltypes.Enum("a", schema="q") 1712 t1 = Table('x', m, Column("y", type_), schema="z") 1713 eq_(t1.c.y.type.schema, "q") 1714 1715 def test_inherit_schema_enum(self): 1716 m = 
MetaData() 1717 type_ = sqltypes.Enum("a", "b", "c", schema="q", inherit_schema=True) 1718 t1 = Table('x', m, Column("y", type_), schema="z") 1719 eq_(t1.c.y.type.schema, "z") 1720 1721 def test_tometadata_copy_type(self): 1722 m1 = MetaData() 1723 1724 type_ = self.MyType() 1725 t1 = Table('x', m1, Column("y", type_)) 1726 1727 m2 = MetaData() 1728 t2 = t1.tometadata(m2) 1729 1730 # metadata isn't set 1731 is_(t2.c.y.type.metadata, None) 1732 1733 # our test type sets table, though 1734 is_(t2.c.y.type.table, t2) 1735 1736 def test_tometadata_copy_decorated(self): 1737 1738 class MyDecorated(TypeDecorator): 1739 impl = self.MyType 1740 1741 m1 = MetaData() 1742 1743 type_ = MyDecorated(schema="z") 1744 t1 = Table('x', m1, Column("y", type_)) 1745 1746 m2 = MetaData() 1747 t2 = t1.tometadata(m2) 1748 eq_(t2.c.y.type.schema, "z") 1749 1750 def test_tometadata_independent_schema(self): 1751 m1 = MetaData() 1752 1753 type_ = self.MyType() 1754 t1 = Table('x', m1, Column("y", type_)) 1755 1756 m2 = MetaData() 1757 t2 = t1.tometadata(m2, schema="bar") 1758 1759 eq_(t2.c.y.type.schema, None) 1760 1761 def test_tometadata_inherit_schema(self): 1762 m1 = MetaData() 1763 1764 type_ = self.MyType(inherit_schema=True) 1765 t1 = Table('x', m1, Column("y", type_)) 1766 1767 m2 = MetaData() 1768 t2 = t1.tometadata(m2, schema="bar") 1769 1770 eq_(t1.c.y.type.schema, None) 1771 eq_(t2.c.y.type.schema, "bar") 1772 1773 def test_tometadata_independent_events(self): 1774 m1 = MetaData() 1775 1776 type_ = self.MyType() 1777 t1 = Table('x', m1, Column("y", type_)) 1778 1779 m2 = MetaData() 1780 t2 = t1.tometadata(m2) 1781 1782 t1.dispatch.before_create(t1, testing.db) 1783 eq_(t1.c.y.type.evt_targets, (t1,)) 1784 eq_(t2.c.y.type.evt_targets, ()) 1785 1786 t2.dispatch.before_create(t2, testing.db) 1787 t2.dispatch.before_create(t2, testing.db) 1788 eq_(t1.c.y.type.evt_targets, (t1,)) 1789 eq_(t2.c.y.type.evt_targets, (t2, t2)) 1790 1791 def test_enum_column_copy_transfers_events(self): 
1792 m = MetaData() 1793 1794 type_ = self.WrapEnum('a', 'b', 'c', name='foo') 1795 y = Column('y', type_) 1796 y_copy = y.copy() 1797 t1 = Table('x', m, y_copy) 1798 1799 is_true(y_copy.type._create_events) 1800 1801 m.dispatch.before_create(t1, testing.db) 1802 eq_(t1.c.y.type.evt_targets, (t1, )) 1803 1804 def test_boolean_column_copy_transfers_events(self): 1805 m = MetaData() 1806 1807 type_ = self.WrapBoolean() 1808 y = Column('y', type_) 1809 y_copy = y.copy() 1810 t1 = Table('x', m, y_copy) 1811 1812 is_true(y_copy.type._create_events) 1813 1814 def test_metadata_dispatch_no_new_impl(self): 1815 m1 = MetaData() 1816 typ = self.MyType(metadata=m1) 1817 m1.dispatch.before_create(m1, testing.db) 1818 eq_(typ.evt_targets, (m1, )) 1819 1820 dialect_impl = typ.dialect_impl(testing.db.dialect) 1821 eq_(dialect_impl.evt_targets, ()) 1822 1823 def test_metadata_dispatch_new_impl(self): 1824 m1 = MetaData() 1825 typ = self.MyTypeWImpl(metadata=m1) 1826 m1.dispatch.before_create(m1, testing.db) 1827 eq_(typ.evt_targets, (m1, )) 1828 1829 dialect_impl = typ.dialect_impl(testing.db.dialect) 1830 eq_(dialect_impl.evt_targets, (m1, )) 1831 1832 def test_table_dispatch_decorator_schematype(self): 1833 m1 = MetaData() 1834 typ = self.MyTypeDecAndSchema() 1835 t1 = Table('t1', m1, Column('x', typ)) 1836 m1.dispatch.before_create(t1, testing.db) 1837 eq_(typ.evt_targets, (t1, )) 1838 1839 def test_table_dispatch_no_new_impl(self): 1840 m1 = MetaData() 1841 typ = self.MyType() 1842 t1 = Table('t1', m1, Column('x', typ)) 1843 m1.dispatch.before_create(t1, testing.db) 1844 eq_(typ.evt_targets, (t1, )) 1845 1846 dialect_impl = typ.dialect_impl(testing.db.dialect) 1847 eq_(dialect_impl.evt_targets, ()) 1848 1849 def test_table_dispatch_new_impl(self): 1850 m1 = MetaData() 1851 typ = self.MyTypeWImpl() 1852 t1 = Table('t1', m1, Column('x', typ)) 1853 m1.dispatch.before_create(t1, testing.db) 1854 eq_(typ.evt_targets, (t1, )) 1855 1856 dialect_impl = 
typ.dialect_impl(testing.db.dialect) 1857 eq_(dialect_impl.evt_targets, (t1, )) 1858 1859 def test_create_metadata_bound_no_crash(self): 1860 m1 = MetaData() 1861 self.MyType(metadata=m1) 1862 1863 m1.create_all(testing.db) 1864 1865 def test_boolean_constraint_type_doesnt_double(self): 1866 m1 = MetaData() 1867 1868 t1 = Table('x', m1, Column("flag", Boolean())) 1869 eq_( 1870 len([ 1871 c for c in t1.constraints 1872 if isinstance(c, CheckConstraint)]), 1873 1 1874 ) 1875 m2 = MetaData() 1876 t2 = t1.tometadata(m2) 1877 1878 eq_( 1879 len([ 1880 c for c in t2.constraints 1881 if isinstance(c, CheckConstraint)]), 1882 1 1883 ) 1884 1885 def test_enum_constraint_type_doesnt_double(self): 1886 m1 = MetaData() 1887 1888 t1 = Table('x', m1, Column("flag", Enum('a', 'b', 'c'))) 1889 eq_( 1890 len([ 1891 c for c in t1.constraints 1892 if isinstance(c, CheckConstraint)]), 1893 1 1894 ) 1895 m2 = MetaData() 1896 t2 = t1.tometadata(m2) 1897 1898 eq_( 1899 len([ 1900 c for c in t2.constraints 1901 if isinstance(c, CheckConstraint)]), 1902 1 1903 ) 1904 1905 1906class SchemaTest(fixtures.TestBase, AssertsCompiledSQL): 1907 1908 def test_default_schema_metadata_fk(self): 1909 m = MetaData(schema="foo") 1910 t1 = Table('t1', m, Column('x', Integer)) 1911 t2 = Table('t2', m, Column('x', Integer, ForeignKey('t1.x'))) 1912 assert t2.c.x.references(t1.c.x) 1913 1914 def test_ad_hoc_schema_equiv_fk(self): 1915 m = MetaData() 1916 t1 = Table('t1', m, Column('x', Integer), schema="foo") 1917 t2 = Table( 1918 't2', 1919 m, 1920 Column( 1921 'x', 1922 Integer, 1923 ForeignKey('t1.x')), 1924 schema="foo") 1925 assert_raises( 1926 exc.NoReferencedTableError, 1927 lambda: t2.c.x.references(t1.c.x) 1928 ) 1929 1930 def test_default_schema_metadata_fk_alt_remote(self): 1931 m = MetaData(schema="foo") 1932 t1 = Table('t1', m, Column('x', Integer)) 1933 t2 = Table('t2', m, Column('x', Integer, ForeignKey('t1.x')), 1934 schema="bar") 1935 assert t2.c.x.references(t1.c.x) 1936 1937 def 
test_default_schema_metadata_fk_alt_local_raises(self): 1938 m = MetaData(schema="foo") 1939 t1 = Table('t1', m, Column('x', Integer), schema="bar") 1940 t2 = Table('t2', m, Column('x', Integer, ForeignKey('t1.x'))) 1941 assert_raises( 1942 exc.NoReferencedTableError, 1943 lambda: t2.c.x.references(t1.c.x) 1944 ) 1945 1946 def test_default_schema_metadata_fk_alt_local(self): 1947 m = MetaData(schema="foo") 1948 t1 = Table('t1', m, Column('x', Integer), schema="bar") 1949 t2 = Table('t2', m, Column('x', Integer, ForeignKey('bar.t1.x'))) 1950 assert t2.c.x.references(t1.c.x) 1951 1952 def test_create_drop_schema(self): 1953 1954 self.assert_compile( 1955 schema.CreateSchema("sa_schema"), 1956 "CREATE SCHEMA sa_schema" 1957 ) 1958 self.assert_compile( 1959 schema.DropSchema("sa_schema"), 1960 "DROP SCHEMA sa_schema" 1961 ) 1962 self.assert_compile( 1963 schema.DropSchema("sa_schema", cascade=True), 1964 "DROP SCHEMA sa_schema CASCADE" 1965 ) 1966 1967 def test_iteration(self): 1968 metadata = MetaData() 1969 table1 = Table( 1970 'table1', 1971 metadata, 1972 Column( 1973 'col1', 1974 Integer, 1975 primary_key=True), 1976 schema='someschema') 1977 table2 = Table( 1978 'table2', 1979 metadata, 1980 Column( 1981 'col1', 1982 Integer, 1983 primary_key=True), 1984 Column( 1985 'col2', 1986 Integer, 1987 ForeignKey('someschema.table1.col1')), 1988 schema='someschema') 1989 1990 t1 = str(schema.CreateTable(table1).compile(bind=testing.db)) 1991 t2 = str(schema.CreateTable(table2).compile(bind=testing.db)) 1992 if testing.db.dialect.preparer(testing.db.dialect).omit_schema: 1993 assert t1.index("CREATE TABLE table1") > -1 1994 assert t2.index("CREATE TABLE table2") > -1 1995 else: 1996 assert t1.index("CREATE TABLE someschema.table1") > -1 1997 assert t2.index("CREATE TABLE someschema.table2") > -1 1998 1999 2000class UseExistingTest(fixtures.TablesTest): 2001 2002 @classmethod 2003 def define_tables(cls, metadata): 2004 Table('users', metadata, 2005 Column('id', Integer, 
primary_key=True), 2006 Column('name', String(30))) 2007 2008 def _useexisting_fixture(self): 2009 meta2 = MetaData(testing.db) 2010 Table('users', meta2, autoload=True) 2011 return meta2 2012 2013 def _notexisting_fixture(self): 2014 return MetaData(testing.db) 2015 2016 def test_exception_no_flags(self): 2017 meta2 = self._useexisting_fixture() 2018 2019 def go(): 2020 Table('users', meta2, Column('name', 2021 Unicode), autoload=True) 2022 assert_raises_message( 2023 exc.InvalidRequestError, 2024 "Table 'users' is already defined for this " 2025 "MetaData instance.", 2026 go 2027 ) 2028 2029 def test_keep_plus_existing_raises(self): 2030 meta2 = self._useexisting_fixture() 2031 assert_raises( 2032 exc.ArgumentError, 2033 Table, 'users', meta2, keep_existing=True, 2034 extend_existing=True 2035 ) 2036 2037 @testing.uses_deprecated() 2038 def test_existing_plus_useexisting_raises(self): 2039 meta2 = self._useexisting_fixture() 2040 assert_raises( 2041 exc.ArgumentError, 2042 Table, 'users', meta2, useexisting=True, 2043 extend_existing=True 2044 ) 2045 2046 def test_keep_existing_no_dupe_constraints(self): 2047 meta2 = self._notexisting_fixture() 2048 users = Table('users', meta2, 2049 Column('id', Integer), 2050 Column('name', Unicode), 2051 UniqueConstraint('name'), 2052 keep_existing=True 2053 ) 2054 assert 'name' in users.c 2055 assert 'id' in users.c 2056 eq_(len(users.constraints), 2) 2057 2058 u2 = Table('users', meta2, 2059 Column('id', Integer), 2060 Column('name', Unicode), 2061 UniqueConstraint('name'), 2062 keep_existing=True 2063 ) 2064 eq_(len(u2.constraints), 2) 2065 2066 def test_extend_existing_dupes_constraints(self): 2067 meta2 = self._notexisting_fixture() 2068 users = Table('users', meta2, 2069 Column('id', Integer), 2070 Column('name', Unicode), 2071 UniqueConstraint('name'), 2072 extend_existing=True 2073 ) 2074 assert 'name' in users.c 2075 assert 'id' in users.c 2076 eq_(len(users.constraints), 2) 2077 2078 u2 = Table('users', meta2, 2079 
Column('id', Integer), 2080 Column('name', Unicode), 2081 UniqueConstraint('name'), 2082 extend_existing=True 2083 ) 2084 # constraint got duped 2085 eq_(len(u2.constraints), 3) 2086 2087 def test_keep_existing_coltype(self): 2088 meta2 = self._useexisting_fixture() 2089 users = Table('users', meta2, Column('name', Unicode), 2090 autoload=True, keep_existing=True) 2091 assert not isinstance(users.c.name.type, Unicode) 2092 2093 def test_keep_existing_quote(self): 2094 meta2 = self._useexisting_fixture() 2095 users = Table('users', meta2, quote=True, autoload=True, 2096 keep_existing=True) 2097 assert not users.name.quote 2098 2099 def test_keep_existing_add_column(self): 2100 meta2 = self._useexisting_fixture() 2101 users = Table('users', meta2, 2102 Column('foo', Integer), 2103 autoload=True, 2104 keep_existing=True) 2105 assert "foo" not in users.c 2106 2107 def test_keep_existing_coltype_no_orig(self): 2108 meta2 = self._notexisting_fixture() 2109 users = Table('users', meta2, Column('name', Unicode), 2110 autoload=True, keep_existing=True) 2111 assert isinstance(users.c.name.type, Unicode) 2112 2113 @testing.skip_if( 2114 lambda: testing.db.dialect.requires_name_normalize, 2115 "test depends on lowercase as case insensitive") 2116 def test_keep_existing_quote_no_orig(self): 2117 meta2 = self._notexisting_fixture() 2118 users = Table('users', meta2, quote=True, 2119 autoload=True, 2120 keep_existing=True) 2121 assert users.name.quote 2122 2123 def test_keep_existing_add_column_no_orig(self): 2124 meta2 = self._notexisting_fixture() 2125 users = Table('users', meta2, 2126 Column('foo', Integer), 2127 autoload=True, 2128 keep_existing=True) 2129 assert "foo" in users.c 2130 2131 def test_keep_existing_coltype_no_reflection(self): 2132 meta2 = self._useexisting_fixture() 2133 users = Table('users', meta2, Column('name', Unicode), 2134 keep_existing=True) 2135 assert not isinstance(users.c.name.type, Unicode) 2136 2137 def 
test_keep_existing_quote_no_reflection(self): 2138 meta2 = self._useexisting_fixture() 2139 users = Table('users', meta2, quote=True, 2140 keep_existing=True) 2141 assert not users.name.quote 2142 2143 def test_keep_existing_add_column_no_reflection(self): 2144 meta2 = self._useexisting_fixture() 2145 users = Table('users', meta2, 2146 Column('foo', Integer), 2147 keep_existing=True) 2148 assert "foo" not in users.c 2149 2150 def test_extend_existing_coltype(self): 2151 meta2 = self._useexisting_fixture() 2152 users = Table('users', meta2, Column('name', Unicode), 2153 autoload=True, extend_existing=True) 2154 assert isinstance(users.c.name.type, Unicode) 2155 2156 def test_extend_existing_quote(self): 2157 meta2 = self._useexisting_fixture() 2158 assert_raises_message( 2159 tsa.exc.ArgumentError, 2160 "Can't redefine 'quote' or 'quote_schema' arguments", 2161 Table, 'users', meta2, quote=True, autoload=True, 2162 extend_existing=True 2163 ) 2164 2165 def test_extend_existing_add_column(self): 2166 meta2 = self._useexisting_fixture() 2167 users = Table('users', meta2, 2168 Column('foo', Integer), 2169 autoload=True, 2170 extend_existing=True) 2171 assert "foo" in users.c 2172 2173 def test_extend_existing_coltype_no_orig(self): 2174 meta2 = self._notexisting_fixture() 2175 users = Table('users', meta2, Column('name', Unicode), 2176 autoload=True, extend_existing=True) 2177 assert isinstance(users.c.name.type, Unicode) 2178 2179 @testing.skip_if( 2180 lambda: testing.db.dialect.requires_name_normalize, 2181 "test depends on lowercase as case insensitive") 2182 def test_extend_existing_quote_no_orig(self): 2183 meta2 = self._notexisting_fixture() 2184 users = Table('users', meta2, quote=True, 2185 autoload=True, 2186 extend_existing=True) 2187 assert users.name.quote 2188 2189 def test_extend_existing_add_column_no_orig(self): 2190 meta2 = self._notexisting_fixture() 2191 users = Table('users', meta2, 2192 Column('foo', Integer), 2193 autoload=True, 2194 
extend_existing=True) 2195 assert "foo" in users.c 2196 2197 def test_extend_existing_coltype_no_reflection(self): 2198 meta2 = self._useexisting_fixture() 2199 users = Table('users', meta2, Column('name', Unicode), 2200 extend_existing=True) 2201 assert isinstance(users.c.name.type, Unicode) 2202 2203 def test_extend_existing_quote_no_reflection(self): 2204 meta2 = self._useexisting_fixture() 2205 assert_raises_message( 2206 tsa.exc.ArgumentError, 2207 "Can't redefine 'quote' or 'quote_schema' arguments", 2208 Table, 'users', meta2, quote=True, 2209 extend_existing=True 2210 ) 2211 2212 def test_extend_existing_add_column_no_reflection(self): 2213 meta2 = self._useexisting_fixture() 2214 users = Table('users', meta2, 2215 Column('foo', Integer), 2216 extend_existing=True) 2217 assert "foo" in users.c 2218 2219 2220class ConstraintTest(fixtures.TestBase): 2221 2222 def _single_fixture(self): 2223 m = MetaData() 2224 2225 t1 = Table('t1', m, 2226 Column('a', Integer), 2227 Column('b', Integer) 2228 ) 2229 2230 t2 = Table('t2', m, 2231 Column('a', Integer, ForeignKey('t1.a')) 2232 ) 2233 2234 t3 = Table('t3', m, 2235 Column('a', Integer) 2236 ) 2237 return t1, t2, t3 2238 2239 def _assert_index_col_x(self, t, i, columns=True): 2240 eq_(t.indexes, set([i])) 2241 if columns: 2242 eq_(list(i.columns), [t.c.x]) 2243 else: 2244 eq_(list(i.columns), []) 2245 assert i.table is t 2246 2247 def test_separate_decl_columns(self): 2248 m = MetaData() 2249 t = Table('t', m, Column('x', Integer)) 2250 i = Index('i', t.c.x) 2251 self._assert_index_col_x(t, i) 2252 2253 def test_separate_decl_columns_functional(self): 2254 m = MetaData() 2255 t = Table('t', m, Column('x', Integer)) 2256 i = Index('i', func.foo(t.c.x)) 2257 self._assert_index_col_x(t, i) 2258 2259 def test_inline_decl_columns(self): 2260 m = MetaData() 2261 c = Column('x', Integer) 2262 i = Index('i', c) 2263 t = Table('t', m, c, i) 2264 self._assert_index_col_x(t, i) 2265 2266 def 
test_inline_decl_columns_functional(self): 2267 m = MetaData() 2268 c = Column('x', Integer) 2269 i = Index('i', func.foo(c)) 2270 t = Table('t', m, c, i) 2271 self._assert_index_col_x(t, i) 2272 2273 def test_inline_decl_string(self): 2274 m = MetaData() 2275 i = Index('i', "x") 2276 t = Table('t', m, Column('x', Integer), i) 2277 self._assert_index_col_x(t, i) 2278 2279 def test_inline_decl_textonly(self): 2280 m = MetaData() 2281 i = Index('i', text("foobar(x)")) 2282 t = Table('t', m, Column('x', Integer), i) 2283 self._assert_index_col_x(t, i, columns=False) 2284 2285 def test_separate_decl_textonly(self): 2286 m = MetaData() 2287 i = Index('i', text("foobar(x)")) 2288 t = Table('t', m, Column('x', Integer)) 2289 t.append_constraint(i) 2290 self._assert_index_col_x(t, i, columns=False) 2291 2292 def test_unnamed_column_exception(self): 2293 # this can occur in some declarative situations 2294 c = Column(Integer) 2295 idx = Index('q', c) 2296 m = MetaData() 2297 t = Table('t', m, Column('q')) 2298 assert_raises_message( 2299 exc.ArgumentError, 2300 "Can't add unnamed column to column collection", 2301 t.append_constraint, idx 2302 ) 2303 2304 def test_column_associated_w_lowercase_table(self): 2305 from sqlalchemy import table 2306 c = Column('x', Integer) 2307 table('foo', c) 2308 idx = Index('q', c) 2309 is_(idx.table, None) # lower-case-T table doesn't have indexes 2310 2311 def test_clauseelement_extraction_one(self): 2312 t = Table('t', MetaData(), Column('x', Integer), Column('y', Integer)) 2313 2314 class MyThing(object): 2315 def __clause_element__(self): 2316 return t.c.x + 5 2317 2318 idx = Index('foo', MyThing()) 2319 self._assert_index_col_x(t, idx) 2320 2321 def test_clauseelement_extraction_two(self): 2322 t = Table('t', MetaData(), Column('x', Integer), Column('y', Integer)) 2323 2324 class MyThing(object): 2325 def __clause_element__(self): 2326 return t.c.x + 5 2327 2328 idx = Index('bar', MyThing(), t.c.y) 2329 2330 eq_(set(t.indexes), 
set([idx])) 2331 2332 def test_clauseelement_extraction_three(self): 2333 t = Table('t', MetaData(), Column('x', Integer), Column('y', Integer)) 2334 2335 expr1 = t.c.x + 5 2336 2337 class MyThing(object): 2338 def __clause_element__(self): 2339 return expr1 2340 2341 idx = Index('bar', MyThing(), t.c.y) 2342 2343 is_(idx.expressions[0], expr1) 2344 is_(idx.expressions[1], t.c.y) 2345 2346 def test_table_references(self): 2347 t1, t2, t3 = self._single_fixture() 2348 assert list(t2.c.a.foreign_keys)[0].references(t1) 2349 assert not list(t2.c.a.foreign_keys)[0].references(t3) 2350 2351 def test_column_references(self): 2352 t1, t2, t3 = self._single_fixture() 2353 assert t2.c.a.references(t1.c.a) 2354 assert not t2.c.a.references(t3.c.a) 2355 assert not t2.c.a.references(t1.c.b) 2356 2357 def test_column_references_derived(self): 2358 t1, t2, t3 = self._single_fixture() 2359 s1 = tsa.select([tsa.select([t1]).alias()]) 2360 assert t2.c.a.references(s1.c.a) 2361 assert not t2.c.a.references(s1.c.b) 2362 2363 def test_copy_doesnt_reference(self): 2364 t1, t2, t3 = self._single_fixture() 2365 a2 = t2.c.a.copy() 2366 assert not a2.references(t1.c.a) 2367 assert not a2.references(t1.c.b) 2368 2369 def test_derived_column_references(self): 2370 t1, t2, t3 = self._single_fixture() 2371 s1 = tsa.select([tsa.select([t2]).alias()]) 2372 assert s1.c.a.references(t1.c.a) 2373 assert not s1.c.a.references(t1.c.b) 2374 2375 def test_referred_table_accessor(self): 2376 t1, t2, t3 = self._single_fixture() 2377 fkc = list(t2.foreign_key_constraints)[0] 2378 is_(fkc.referred_table, t1) 2379 2380 def test_referred_table_accessor_not_available(self): 2381 t1 = Table('t', MetaData(), Column('x', ForeignKey('q.id'))) 2382 fkc = list(t1.foreign_key_constraints)[0] 2383 assert_raises_message( 2384 exc.InvalidRequestError, 2385 "Foreign key associated with column 't.x' could not find " 2386 "table 'q' with which to generate a foreign key to target " 2387 "column 'id'", 2388 getattr, fkc, 
"referred_table" 2389 ) 2390 2391 def test_related_column_not_present_atfirst_ok(self): 2392 m = MetaData() 2393 base_table = Table("base", m, 2394 Column("id", Integer, primary_key=True) 2395 ) 2396 fk = ForeignKey('base.q') 2397 derived_table = Table("derived", m, 2398 Column("id", None, fk, 2399 primary_key=True), 2400 ) 2401 2402 base_table.append_column(Column('q', Integer)) 2403 assert fk.column is base_table.c.q 2404 assert isinstance(derived_table.c.id.type, Integer) 2405 2406 def test_related_column_not_present_atfirst_ok_onname(self): 2407 m = MetaData() 2408 base_table = Table("base", m, 2409 Column("id", Integer, primary_key=True) 2410 ) 2411 fk = ForeignKey('base.q', link_to_name=True) 2412 derived_table = Table("derived", m, 2413 Column("id", None, fk, 2414 primary_key=True), 2415 ) 2416 2417 base_table.append_column(Column('q', Integer, key='zz')) 2418 assert fk.column is base_table.c.zz 2419 assert isinstance(derived_table.c.id.type, Integer) 2420 2421 def test_related_column_not_present_atfirst_ok_linktoname_conflict(self): 2422 m = MetaData() 2423 base_table = Table("base", m, 2424 Column("id", Integer, primary_key=True) 2425 ) 2426 fk = ForeignKey('base.q', link_to_name=True) 2427 derived_table = Table("derived", m, 2428 Column("id", None, fk, 2429 primary_key=True), 2430 ) 2431 2432 base_table.append_column(Column('zz', Integer, key='q')) 2433 base_table.append_column(Column('q', Integer, key='zz')) 2434 assert fk.column is base_table.c.zz 2435 assert isinstance(derived_table.c.id.type, Integer) 2436 2437 def test_invalid_composite_fk_check_strings(self): 2438 m = MetaData() 2439 2440 assert_raises_message( 2441 exc.ArgumentError, 2442 r"ForeignKeyConstraint on t1\(x, y\) refers to " 2443 "multiple remote tables: t2 and t3", 2444 Table, 2445 't1', m, Column('x', Integer), Column('y', Integer), 2446 ForeignKeyConstraint(['x', 'y'], ['t2.x', 't3.y']) 2447 ) 2448 2449 def test_invalid_composite_fk_check_columns(self): 2450 m = MetaData() 2451 2452 
t2 = Table('t2', m, Column('x', Integer)) 2453 t3 = Table('t3', m, Column('y', Integer)) 2454 2455 assert_raises_message( 2456 exc.ArgumentError, 2457 r"ForeignKeyConstraint on t1\(x, y\) refers to " 2458 "multiple remote tables: t2 and t3", 2459 Table, 2460 't1', m, Column('x', Integer), Column('y', Integer), 2461 ForeignKeyConstraint(['x', 'y'], [t2.c.x, t3.c.y]) 2462 ) 2463 2464 def test_invalid_composite_fk_check_columns_notattached(self): 2465 m = MetaData() 2466 x = Column('x', Integer) 2467 y = Column('y', Integer) 2468 2469 # no error is raised for this one right now. 2470 # which is a minor bug. 2471 Table('t1', m, Column('x', Integer), Column('y', Integer), 2472 ForeignKeyConstraint(['x', 'y'], [x, y]) 2473 ) 2474 2475 Table('t2', m, x) 2476 Table('t3', m, y) 2477 2478 def test_constraint_copied_to_proxy_ok(self): 2479 m = MetaData() 2480 Table('t1', m, Column('id', Integer, primary_key=True)) 2481 t2 = Table('t2', m, Column('id', Integer, ForeignKey('t1.id'), 2482 primary_key=True)) 2483 2484 s = tsa.select([t2]) 2485 t2fk = list(t2.c.id.foreign_keys)[0] 2486 sfk = list(s.c.id.foreign_keys)[0] 2487 2488 # the two FKs share the ForeignKeyConstraint 2489 is_( 2490 t2fk.constraint, 2491 sfk.constraint 2492 ) 2493 2494 # but the ForeignKeyConstraint isn't 2495 # aware of the select's FK 2496 eq_( 2497 t2fk.constraint.elements, 2498 [t2fk] 2499 ) 2500 2501 def test_type_propagate_composite_fk_string(self): 2502 metadata = MetaData() 2503 Table( 2504 'a', metadata, 2505 Column('key1', Integer, primary_key=True), 2506 Column('key2', String(40), primary_key=True)) 2507 2508 b = Table('b', metadata, 2509 Column('a_key1', None), 2510 Column('a_key2', None), 2511 Column('id', Integer, primary_key=True), 2512 ForeignKeyConstraint(['a_key1', 'a_key2'], 2513 ['a.key1', 'a.key2']) 2514 ) 2515 2516 assert isinstance(b.c.a_key1.type, Integer) 2517 assert isinstance(b.c.a_key2.type, String) 2518 2519 def test_type_propagate_composite_fk_col(self): 2520 metadata = 
MetaData() 2521 a = Table('a', metadata, 2522 Column('key1', Integer, primary_key=True), 2523 Column('key2', String(40), primary_key=True)) 2524 2525 b = Table('b', metadata, 2526 Column('a_key1', None), 2527 Column('a_key2', None), 2528 Column('id', Integer, primary_key=True), 2529 ForeignKeyConstraint(['a_key1', 'a_key2'], 2530 [a.c.key1, a.c.key2]) 2531 ) 2532 2533 assert isinstance(b.c.a_key1.type, Integer) 2534 assert isinstance(b.c.a_key2.type, String) 2535 2536 def test_type_propagate_standalone_fk_string(self): 2537 metadata = MetaData() 2538 Table( 2539 'a', metadata, 2540 Column('key1', Integer, primary_key=True)) 2541 2542 b = Table('b', metadata, 2543 Column('a_key1', None, ForeignKey("a.key1")), 2544 ) 2545 2546 assert isinstance(b.c.a_key1.type, Integer) 2547 2548 def test_type_propagate_standalone_fk_col(self): 2549 metadata = MetaData() 2550 a = Table('a', metadata, 2551 Column('key1', Integer, primary_key=True)) 2552 2553 b = Table('b', metadata, 2554 Column('a_key1', None, ForeignKey(a.c.key1)), 2555 ) 2556 2557 assert isinstance(b.c.a_key1.type, Integer) 2558 2559 def test_type_propagate_chained_string_source_first(self): 2560 metadata = MetaData() 2561 Table( 2562 'a', metadata, 2563 Column('key1', Integer, primary_key=True) 2564 ) 2565 2566 b = Table('b', metadata, 2567 Column('a_key1', None, ForeignKey("a.key1")), 2568 ) 2569 2570 c = Table('c', metadata, 2571 Column('b_key1', None, ForeignKey("b.a_key1")), 2572 ) 2573 2574 assert isinstance(b.c.a_key1.type, Integer) 2575 assert isinstance(c.c.b_key1.type, Integer) 2576 2577 def test_type_propagate_chained_string_source_last(self): 2578 metadata = MetaData() 2579 2580 b = Table('b', metadata, 2581 Column('a_key1', None, ForeignKey("a.key1")), 2582 ) 2583 2584 c = Table('c', metadata, 2585 Column('b_key1', None, ForeignKey("b.a_key1")), 2586 ) 2587 2588 Table( 2589 'a', metadata, 2590 Column('key1', Integer, primary_key=True)) 2591 2592 assert isinstance(b.c.a_key1.type, Integer) 2593 assert 
isinstance(c.c.b_key1.type, Integer) 2594 2595 def test_type_propagate_chained_string_source_last_onname(self): 2596 metadata = MetaData() 2597 2598 b = Table('b', metadata, 2599 Column( 2600 'a_key1', None, 2601 ForeignKey("a.key1", link_to_name=True), key="ak1"), 2602 ) 2603 2604 c = Table('c', metadata, 2605 Column( 2606 'b_key1', None, 2607 ForeignKey("b.a_key1", link_to_name=True), key="bk1"), 2608 ) 2609 2610 Table( 2611 'a', metadata, 2612 Column('key1', Integer, primary_key=True, key='ak1')) 2613 2614 assert isinstance(b.c.ak1.type, Integer) 2615 assert isinstance(c.c.bk1.type, Integer) 2616 2617 def test_type_propagate_chained_string_source_last_onname_conflict(self): 2618 metadata = MetaData() 2619 2620 b = Table('b', metadata, 2621 # b.c.key1 -> a.c.key1 -> String 2622 Column( 2623 'ak1', None, 2624 ForeignKey("a.key1", link_to_name=False), key="key1"), 2625 # b.c.ak1 -> a.c.ak1 -> Integer 2626 Column( 2627 'a_key1', None, 2628 ForeignKey("a.key1", link_to_name=True), key="ak1"), 2629 ) 2630 2631 c = Table('c', metadata, 2632 # c.c.b_key1 -> b.c.ak1 -> Integer 2633 Column( 2634 'b_key1', None, 2635 ForeignKey("b.ak1", link_to_name=False)), 2636 # c.c.b_ak1 -> b.c.ak1 2637 Column( 2638 'b_ak1', None, 2639 ForeignKey("b.ak1", link_to_name=True)), 2640 ) 2641 2642 Table( 2643 'a', metadata, 2644 # a.c.key1 2645 Column('ak1', String, key="key1"), 2646 # a.c.ak1 2647 Column('key1', Integer, primary_key=True, key='ak1'), 2648 ) 2649 2650 assert isinstance(b.c.key1.type, String) 2651 assert isinstance(b.c.ak1.type, Integer) 2652 2653 assert isinstance(c.c.b_ak1.type, String) 2654 assert isinstance(c.c.b_key1.type, Integer) 2655 2656 def test_type_propagate_chained_col_orig_first(self): 2657 metadata = MetaData() 2658 a = Table('a', metadata, 2659 Column('key1', Integer, primary_key=True)) 2660 2661 b = Table('b', metadata, 2662 Column('a_key1', None, ForeignKey(a.c.key1)), 2663 ) 2664 2665 c = Table('c', metadata, 2666 Column('b_key1', None, 
ForeignKey(b.c.a_key1)), 2667 ) 2668 2669 assert isinstance(b.c.a_key1.type, Integer) 2670 assert isinstance(c.c.b_key1.type, Integer) 2671 2672 def test_column_accessor_col(self): 2673 c1 = Column('x', Integer) 2674 fk = ForeignKey(c1) 2675 is_(fk.column, c1) 2676 2677 def test_column_accessor_clause_element(self): 2678 c1 = Column('x', Integer) 2679 2680 class CThing(object): 2681 2682 def __init__(self, c): 2683 self.c = c 2684 2685 def __clause_element__(self): 2686 return self.c 2687 2688 fk = ForeignKey(CThing(c1)) 2689 is_(fk.column, c1) 2690 2691 def test_column_accessor_string_no_parent(self): 2692 fk = ForeignKey("sometable.somecol") 2693 assert_raises_message( 2694 exc.InvalidRequestError, 2695 "this ForeignKey object does not yet have a parent " 2696 "Column associated with it.", 2697 getattr, fk, "column" 2698 ) 2699 2700 def test_column_accessor_string_no_parent_table(self): 2701 fk = ForeignKey("sometable.somecol") 2702 Column('x', fk) 2703 assert_raises_message( 2704 exc.InvalidRequestError, 2705 "this ForeignKey's parent column is not yet " 2706 "associated with a Table.", 2707 getattr, fk, "column" 2708 ) 2709 2710 def test_column_accessor_string_no_target_table(self): 2711 fk = ForeignKey("sometable.somecol") 2712 c1 = Column('x', fk) 2713 Table('t', MetaData(), c1) 2714 assert_raises_message( 2715 exc.NoReferencedTableError, 2716 "Foreign key associated with column 't.x' could not find " 2717 "table 'sometable' with which to generate a " 2718 "foreign key to target column 'somecol'", 2719 getattr, fk, "column" 2720 ) 2721 2722 def test_column_accessor_string_no_target_column(self): 2723 fk = ForeignKey("sometable.somecol") 2724 c1 = Column('x', fk) 2725 m = MetaData() 2726 Table('t', m, c1) 2727 Table("sometable", m, Column('notsomecol', Integer)) 2728 assert_raises_message( 2729 exc.NoReferencedColumnError, 2730 "Could not initialize target column for ForeignKey " 2731 "'sometable.somecol' on table 't': " 2732 "table 'sometable' has no column 
named 'somecol'", 2733 getattr, fk, "column" 2734 ) 2735 2736 def test_remove_table_fk_bookkeeping(self): 2737 metadata = MetaData() 2738 fk = ForeignKey('t1.x') 2739 t2 = Table('t2', metadata, Column('y', Integer, fk)) 2740 t3 = Table('t3', metadata, Column('y', Integer, ForeignKey('t1.x'))) 2741 2742 assert t2.key in metadata.tables 2743 assert ("t1", "x") in metadata._fk_memos 2744 2745 metadata.remove(t2) 2746 2747 # key is removed 2748 assert t2.key not in metadata.tables 2749 2750 # the memo for the FK is still there 2751 assert ("t1", "x") in metadata._fk_memos 2752 2753 # fk is not in the collection 2754 assert fk not in metadata._fk_memos[("t1", "x")] 2755 2756 # make the referenced table 2757 t1 = Table('t1', metadata, Column('x', Integer)) 2758 2759 # t2 tells us exactly what's wrong 2760 assert_raises_message( 2761 exc.InvalidRequestError, 2762 "Table t2 is no longer associated with its parent MetaData", 2763 getattr, fk, "column" 2764 ) 2765 2766 # t3 is unaffected 2767 assert t3.c.y.references(t1.c.x) 2768 2769 # remove twice OK 2770 metadata.remove(t2) 2771 2772 def test_double_fk_usage_raises(self): 2773 f = ForeignKey('b.id') 2774 2775 Column('x', Integer, f) 2776 assert_raises(exc.InvalidRequestError, Column, "y", Integer, f) 2777 2778 def test_auto_append_constraint(self): 2779 m = MetaData() 2780 2781 t = Table('tbl', m, 2782 Column('a', Integer), 2783 Column('b', Integer) 2784 ) 2785 2786 t2 = Table('t2', m, 2787 Column('a', Integer), 2788 Column('b', Integer) 2789 ) 2790 2791 for c in ( 2792 UniqueConstraint(t.c.a), 2793 CheckConstraint(t.c.a > 5), 2794 ForeignKeyConstraint([t.c.a], [t2.c.a]), 2795 PrimaryKeyConstraint(t.c.a) 2796 ): 2797 assert c in t.constraints 2798 t.append_constraint(c) 2799 assert c in t.constraints 2800 2801 c = Index('foo', t.c.a) 2802 assert c in t.indexes 2803 2804 def test_auto_append_lowercase_table(self): 2805 from sqlalchemy import table, column 2806 2807 t = table('t', column('a')) 2808 t2 = table('t2', 
column('a')) 2809 for c in ( 2810 UniqueConstraint(t.c.a), 2811 CheckConstraint(t.c.a > 5), 2812 ForeignKeyConstraint([t.c.a], [t2.c.a]), 2813 PrimaryKeyConstraint(t.c.a), 2814 Index('foo', t.c.a) 2815 ): 2816 assert True 2817 2818 def test_tometadata_ok(self): 2819 m = MetaData() 2820 2821 t = Table('tbl', m, 2822 Column('a', Integer), 2823 Column('b', Integer) 2824 ) 2825 2826 t2 = Table('t2', m, 2827 Column('a', Integer), 2828 Column('b', Integer) 2829 ) 2830 2831 UniqueConstraint(t.c.a) 2832 CheckConstraint(t.c.a > 5) 2833 ForeignKeyConstraint([t.c.a], [t2.c.a]) 2834 PrimaryKeyConstraint(t.c.a) 2835 2836 m2 = MetaData() 2837 2838 t3 = t.tometadata(m2) 2839 2840 eq_(len(t3.constraints), 4) 2841 2842 for c in t3.constraints: 2843 assert c.table is t3 2844 2845 def test_check_constraint_copy(self): 2846 m = MetaData() 2847 t = Table('tbl', m, 2848 Column('a', Integer), 2849 Column('b', Integer) 2850 ) 2851 ck = CheckConstraint(t.c.a > 5) 2852 ck2 = ck.copy() 2853 assert ck in t.constraints 2854 assert ck2 not in t.constraints 2855 2856 def test_ambig_check_constraint_auto_append(self): 2857 m = MetaData() 2858 2859 t = Table('tbl', m, 2860 Column('a', Integer), 2861 Column('b', Integer) 2862 ) 2863 2864 t2 = Table('t2', m, 2865 Column('a', Integer), 2866 Column('b', Integer) 2867 ) 2868 c = CheckConstraint(t.c.a > t2.c.b) 2869 assert c not in t.constraints 2870 assert c not in t2.constraints 2871 2872 def test_auto_append_ck_on_col_attach_one(self): 2873 m = MetaData() 2874 2875 a = Column('a', Integer) 2876 b = Column('b', Integer) 2877 ck = CheckConstraint(a > b) 2878 2879 t = Table('tbl', m, a, b) 2880 assert ck in t.constraints 2881 2882 def test_auto_append_ck_on_col_attach_two(self): 2883 m = MetaData() 2884 2885 a = Column('a', Integer) 2886 b = Column('b', Integer) 2887 c = Column('c', Integer) 2888 ck = CheckConstraint(a > b + c) 2889 2890 t = Table('tbl', m, a) 2891 assert ck not in t.constraints 2892 2893 t.append_column(b) 2894 assert ck not in 
t.constraints 2895 2896 t.append_column(c) 2897 assert ck in t.constraints 2898 2899 def test_auto_append_ck_on_col_attach_three(self): 2900 m = MetaData() 2901 2902 a = Column('a', Integer) 2903 b = Column('b', Integer) 2904 c = Column('c', Integer) 2905 ck = CheckConstraint(a > b + c) 2906 2907 t = Table('tbl', m, a) 2908 assert ck not in t.constraints 2909 2910 t.append_column(b) 2911 assert ck not in t.constraints 2912 2913 t2 = Table('t2', m) 2914 t2.append_column(c) 2915 2916 # two different tables, so CheckConstraint does nothing. 2917 assert ck not in t.constraints 2918 2919 def test_auto_append_uq_on_col_attach_one(self): 2920 m = MetaData() 2921 2922 a = Column('a', Integer) 2923 b = Column('b', Integer) 2924 uq = UniqueConstraint(a, b) 2925 2926 t = Table('tbl', m, a, b) 2927 assert uq in t.constraints 2928 2929 def test_auto_append_uq_on_col_attach_two(self): 2930 m = MetaData() 2931 2932 a = Column('a', Integer) 2933 b = Column('b', Integer) 2934 c = Column('c', Integer) 2935 uq = UniqueConstraint(a, b, c) 2936 2937 t = Table('tbl', m, a) 2938 assert uq not in t.constraints 2939 2940 t.append_column(b) 2941 assert uq not in t.constraints 2942 2943 t.append_column(c) 2944 assert uq in t.constraints 2945 2946 def test_auto_append_uq_on_col_attach_three(self): 2947 m = MetaData() 2948 2949 a = Column('a', Integer) 2950 b = Column('b', Integer) 2951 c = Column('c', Integer) 2952 uq = UniqueConstraint(a, b, c) 2953 2954 t = Table('tbl', m, a) 2955 assert uq not in t.constraints 2956 2957 t.append_column(b) 2958 assert uq not in t.constraints 2959 2960 t2 = Table('t2', m) 2961 2962 # two different tables, so UniqueConstraint raises 2963 assert_raises_message( 2964 exc.ArgumentError, 2965 r"Column\(s\) 't2\.c' are not part of table 'tbl'\.", 2966 t2.append_column, c 2967 ) 2968 2969 def test_auto_append_uq_on_col_attach_four(self): 2970 """Test that a uniqueconstraint that names Column and string names 2971 won't autoattach using deferred column attachment. 
2972 2973 """ 2974 m = MetaData() 2975 2976 a = Column('a', Integer) 2977 b = Column('b', Integer) 2978 c = Column('c', Integer) 2979 uq = UniqueConstraint(a, 'b', 'c') 2980 2981 t = Table('tbl', m, a) 2982 assert uq not in t.constraints 2983 2984 t.append_column(b) 2985 assert uq not in t.constraints 2986 2987 t.append_column(c) 2988 2989 # we don't track events for previously unknown columns 2990 # named 'c' to be attached 2991 assert uq not in t.constraints 2992 2993 t.append_constraint(uq) 2994 2995 assert uq in t.constraints 2996 2997 eq_( 2998 [cn for cn in t.constraints if isinstance(cn, UniqueConstraint)], 2999 [uq] 3000 ) 3001 3002 def test_auto_append_uq_on_col_attach_five(self): 3003 """Test that a uniqueconstraint that names Column and string names 3004 *will* autoattach if the table has all those names up front. 3005 3006 """ 3007 m = MetaData() 3008 3009 a = Column('a', Integer) 3010 b = Column('b', Integer) 3011 c = Column('c', Integer) 3012 3013 t = Table('tbl', m, a, c, b) 3014 3015 uq = UniqueConstraint(a, 'b', 'c') 3016 3017 assert uq in t.constraints 3018 3019 t.append_constraint(uq) 3020 3021 assert uq in t.constraints 3022 3023 eq_( 3024 [cn for cn in t.constraints if isinstance(cn, UniqueConstraint)], 3025 [uq] 3026 ) 3027 3028 def test_index_asserts_cols_standalone(self): 3029 metadata = MetaData() 3030 3031 t1 = Table('t1', metadata, 3032 Column('x', Integer) 3033 ) 3034 t2 = Table('t2', metadata, 3035 Column('y', Integer) 3036 ) 3037 assert_raises_message( 3038 exc.ArgumentError, 3039 r"Column\(s\) 't2.y' are not part of table 't1'.", 3040 Index, 3041 "bar", t1.c.x, t2.c.y 3042 ) 3043 3044 def test_index_asserts_cols_inline(self): 3045 metadata = MetaData() 3046 3047 t1 = Table('t1', metadata, 3048 Column('x', Integer) 3049 ) 3050 assert_raises_message( 3051 exc.ArgumentError, 3052 "Index 'bar' is against table 't1', and " 3053 "cannot be associated with table 't2'.", 3054 Table, 't2', metadata, 3055 Column('y', Integer), 3056 Index('bar', 
t1.c.x) 3057 ) 3058 3059 def test_raise_index_nonexistent_name(self): 3060 m = MetaData() 3061 # the KeyError isn't ideal here, a nicer message 3062 # perhaps 3063 assert_raises( 3064 KeyError, 3065 Table, 't', m, Column('x', Integer), Index("foo", "q") 3066 ) 3067 3068 def test_raise_not_a_column(self): 3069 assert_raises( 3070 exc.ArgumentError, 3071 Index, "foo", 5 3072 ) 3073 3074 def test_raise_expr_no_column(self): 3075 idx = Index('foo', func.lower(5)) 3076 3077 assert_raises_message( 3078 exc.CompileError, 3079 "Index 'foo' is not associated with any table.", 3080 schema.CreateIndex(idx).compile, dialect=testing.db.dialect 3081 ) 3082 assert_raises_message( 3083 exc.CompileError, 3084 "Index 'foo' is not associated with any table.", 3085 schema.CreateIndex(idx).compile 3086 ) 3087 3088 def test_no_warning_w_no_columns(self): 3089 idx = Index(name="foo") 3090 3091 assert_raises_message( 3092 exc.CompileError, 3093 "Index 'foo' is not associated with any table.", 3094 schema.CreateIndex(idx).compile, dialect=testing.db.dialect 3095 ) 3096 assert_raises_message( 3097 exc.CompileError, 3098 "Index 'foo' is not associated with any table.", 3099 schema.CreateIndex(idx).compile 3100 ) 3101 3102 def test_raise_clauseelement_not_a_column(self): 3103 m = MetaData() 3104 t2 = Table('t2', m, Column('x', Integer)) 3105 3106 class SomeClass(object): 3107 3108 def __clause_element__(self): 3109 return t2 3110 assert_raises_message( 3111 exc.ArgumentError, 3112 r"Element Table\('t2', .* is not a string name or column element", 3113 Index, "foo", SomeClass() 3114 ) 3115 3116 3117class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase): 3118 3119 """Test Column() construction.""" 3120 3121 __dialect__ = 'default' 3122 3123 def columns(self): 3124 return [Column(Integer), 3125 Column('b', Integer), 3126 Column(Integer), 3127 Column('d', Integer), 3128 Column(Integer, name='e'), 3129 Column(type_=Integer), 3130 Column(Integer()), 3131 Column('h', Integer()), 3132 
Column(type_=Integer())] 3133 3134 def test_basic(self): 3135 c = self.columns() 3136 3137 for i, v in ((0, 'a'), (2, 'c'), (5, 'f'), (6, 'g'), (8, 'i')): 3138 c[i].name = v 3139 c[i].key = v 3140 del i, v 3141 3142 tbl = Table('table', MetaData(), *c) 3143 3144 for i, col in enumerate(tbl.c): 3145 assert col.name == c[i].name 3146 3147 def test_name_none(self): 3148 3149 c = Column(Integer) 3150 assert_raises_message( 3151 exc.ArgumentError, 3152 "Column must be constructed with a non-blank name or assign a " 3153 "non-blank .name ", 3154 Table, 't', MetaData(), c) 3155 3156 def test_name_blank(self): 3157 3158 c = Column('', Integer) 3159 assert_raises_message( 3160 exc.ArgumentError, 3161 "Column must be constructed with a non-blank name or assign a " 3162 "non-blank .name ", 3163 Table, 't', MetaData(), c) 3164 3165 def test_dupe_column(self): 3166 c = Column('x', Integer) 3167 Table('t', MetaData(), c) 3168 3169 assert_raises_message( 3170 exc.ArgumentError, 3171 "Column object 'x' already assigned to Table 't'", 3172 Table, 'q', MetaData(), c) 3173 3174 def test_incomplete_key(self): 3175 c = Column(Integer) 3176 assert c.name is None 3177 assert c.key is None 3178 3179 c.name = 'named' 3180 Table('t', MetaData(), c) 3181 3182 assert c.name == 'named' 3183 assert c.name == c.key 3184 3185 def test_unique_index_flags_default_to_none(self): 3186 c = Column(Integer) 3187 eq_(c.unique, None) 3188 eq_(c.index, None) 3189 3190 c = Column('c', Integer, index=True) 3191 eq_(c.unique, None) 3192 eq_(c.index, True) 3193 3194 t = Table('t', MetaData(), c) 3195 eq_(list(t.indexes)[0].unique, False) 3196 3197 c = Column(Integer, unique=True) 3198 eq_(c.unique, True) 3199 eq_(c.index, None) 3200 3201 c = Column('c', Integer, index=True, unique=True) 3202 eq_(c.unique, True) 3203 eq_(c.index, True) 3204 3205 t = Table('t', MetaData(), c) 3206 eq_(list(t.indexes)[0].unique, True) 3207 3208 def test_bogus(self): 3209 assert_raises(exc.ArgumentError, Column, 'foo', name='bar') 
3210 assert_raises(exc.ArgumentError, Column, 'foo', Integer, 3211 type_=Integer()) 3212 3213 def test_custom_subclass_proxy(self): 3214 """test proxy generation of a Column subclass, can be compiled.""" 3215 3216 from sqlalchemy.schema import Column 3217 from sqlalchemy.ext.compiler import compiles 3218 from sqlalchemy.sql import select 3219 3220 class MyColumn(Column): 3221 3222 def _constructor(self, name, type, **kw): 3223 kw['name'] = name 3224 return MyColumn(type, **kw) 3225 3226 def __init__(self, type, **kw): 3227 Column.__init__(self, type, **kw) 3228 3229 def my_goofy_thing(self): 3230 return "hi" 3231 3232 @compiles(MyColumn) 3233 def goofy(element, compiler, **kw): 3234 s = compiler.visit_column(element, **kw) 3235 return s + "-" 3236 3237 id = MyColumn(Integer, primary_key=True) 3238 id.name = 'id' 3239 name = MyColumn(String) 3240 name.name = 'name' 3241 t1 = Table('foo', MetaData(), 3242 id, 3243 name 3244 ) 3245 3246 # goofy thing 3247 eq_(t1.c.name.my_goofy_thing(), "hi") 3248 3249 # create proxy 3250 s = select([t1.select().alias()]) 3251 3252 # proxy has goofy thing 3253 eq_(s.c.name.my_goofy_thing(), "hi") 3254 3255 # compile works 3256 self.assert_compile( 3257 select([t1.select().alias()]), 3258 "SELECT anon_1.id-, anon_1.name- FROM " 3259 "(SELECT foo.id- AS id, foo.name- AS name " 3260 "FROM foo) AS anon_1", 3261 ) 3262 3263 def test_custom_subclass_proxy_typeerror(self): 3264 from sqlalchemy.schema import Column 3265 from sqlalchemy.sql import select 3266 3267 class MyColumn(Column): 3268 3269 def __init__(self, type, **kw): 3270 Column.__init__(self, type, **kw) 3271 3272 id = MyColumn(Integer, primary_key=True) 3273 id.name = 'id' 3274 name = MyColumn(String) 3275 name.name = 'name' 3276 t1 = Table('foo', MetaData(), 3277 id, 3278 name 3279 ) 3280 assert_raises_message( 3281 TypeError, 3282 "Could not create a copy of this <class " 3283 "'test.sql.test_metadata..*MyColumn'> " 3284 "object. 
Ensure the class includes a _constructor()", 3285 getattr, select([t1.select().alias()]), 'c' 3286 ) 3287 3288 def test_custom_create(self): 3289 from sqlalchemy.ext.compiler import compiles, deregister 3290 3291 @compiles(schema.CreateColumn) 3292 def compile(element, compiler, **kw): 3293 column = element.element 3294 3295 if "special" not in column.info: 3296 return compiler.visit_create_column(element, **kw) 3297 3298 text = "%s SPECIAL DIRECTIVE %s" % ( 3299 column.name, 3300 compiler.type_compiler.process(column.type) 3301 ) 3302 default = compiler.get_column_default_string(column) 3303 if default is not None: 3304 text += " DEFAULT " + default 3305 3306 if not column.nullable: 3307 text += " NOT NULL" 3308 3309 if column.constraints: 3310 text += " ".join( 3311 compiler.process(const) 3312 for const in column.constraints) 3313 return text 3314 3315 t = Table( 3316 'mytable', MetaData(), 3317 Column('x', Integer, info={ 3318 "special": True}, primary_key=True), 3319 Column('y', String(50)), 3320 Column('z', String(20), info={ 3321 "special": True})) 3322 3323 self.assert_compile( 3324 schema.CreateTable(t), 3325 "CREATE TABLE mytable (x SPECIAL DIRECTIVE INTEGER " 3326 "NOT NULL, y VARCHAR(50), " 3327 "z SPECIAL DIRECTIVE VARCHAR(20), PRIMARY KEY (x))" 3328 ) 3329 3330 deregister(schema.CreateColumn) 3331 3332 3333class ColumnDefaultsTest(fixtures.TestBase): 3334 3335 """test assignment of default fixures to columns""" 3336 3337 def _fixture(self, *arg, **kw): 3338 return Column('x', Integer, *arg, **kw) 3339 3340 def test_server_default_positional(self): 3341 target = schema.DefaultClause('y') 3342 c = self._fixture(target) 3343 assert c.server_default is target 3344 assert target.column is c 3345 3346 def test_onupdate_default_not_server_default_one(self): 3347 target1 = schema.DefaultClause('y') 3348 target2 = schema.DefaultClause('z') 3349 3350 c = self._fixture(server_default=target1, server_onupdate=target2) 3351 eq_(c.server_default.arg, 'y') 3352 
eq_(c.server_onupdate.arg, 'z') 3353 3354 def test_onupdate_default_not_server_default_two(self): 3355 target1 = schema.DefaultClause('y', for_update=True) 3356 target2 = schema.DefaultClause('z', for_update=True) 3357 3358 c = self._fixture(server_default=target1, server_onupdate=target2) 3359 eq_(c.server_default.arg, 'y') 3360 eq_(c.server_onupdate.arg, 'z') 3361 3362 def test_onupdate_default_not_server_default_three(self): 3363 target1 = schema.DefaultClause('y', for_update=False) 3364 target2 = schema.DefaultClause('z', for_update=True) 3365 3366 c = self._fixture(target1, target2) 3367 eq_(c.server_default.arg, 'y') 3368 eq_(c.server_onupdate.arg, 'z') 3369 3370 def test_onupdate_default_not_server_default_four(self): 3371 target1 = schema.DefaultClause('y', for_update=False) 3372 3373 c = self._fixture(server_onupdate=target1) 3374 is_(c.server_default, None) 3375 eq_(c.server_onupdate.arg, 'y') 3376 3377 def test_server_default_keyword_as_schemaitem(self): 3378 target = schema.DefaultClause('y') 3379 c = self._fixture(server_default=target) 3380 assert c.server_default is target 3381 assert target.column is c 3382 3383 def test_server_default_keyword_as_clause(self): 3384 target = 'y' 3385 c = self._fixture(server_default=target) 3386 assert c.server_default.arg == target 3387 assert c.server_default.column is c 3388 3389 def test_server_default_onupdate_positional(self): 3390 target = schema.DefaultClause('y', for_update=True) 3391 c = self._fixture(target) 3392 assert c.server_onupdate is target 3393 assert target.column is c 3394 3395 def test_server_default_onupdate_keyword_as_schemaitem(self): 3396 target = schema.DefaultClause('y', for_update=True) 3397 c = self._fixture(server_onupdate=target) 3398 assert c.server_onupdate is target 3399 assert target.column is c 3400 3401 def test_server_default_onupdate_keyword_as_clause(self): 3402 target = 'y' 3403 c = self._fixture(server_onupdate=target) 3404 assert c.server_onupdate.arg == target 3405 assert 
c.server_onupdate.column is c 3406 3407 def test_column_default_positional(self): 3408 target = schema.ColumnDefault('y') 3409 c = self._fixture(target) 3410 assert c.default is target 3411 assert target.column is c 3412 3413 def test_column_default_keyword_as_schemaitem(self): 3414 target = schema.ColumnDefault('y') 3415 c = self._fixture(default=target) 3416 assert c.default is target 3417 assert target.column is c 3418 3419 def test_column_default_keyword_as_clause(self): 3420 target = 'y' 3421 c = self._fixture(default=target) 3422 assert c.default.arg == target 3423 assert c.default.column is c 3424 3425 def test_column_default_onupdate_positional(self): 3426 target = schema.ColumnDefault('y', for_update=True) 3427 c = self._fixture(target) 3428 assert c.onupdate is target 3429 assert target.column is c 3430 3431 def test_column_default_onupdate_keyword_as_schemaitem(self): 3432 target = schema.ColumnDefault('y', for_update=True) 3433 c = self._fixture(onupdate=target) 3434 assert c.onupdate is target 3435 assert target.column is c 3436 3437 def test_column_default_onupdate_keyword_as_clause(self): 3438 target = 'y' 3439 c = self._fixture(onupdate=target) 3440 assert c.onupdate.arg == target 3441 assert c.onupdate.column is c 3442 3443 3444class ColumnOptionsTest(fixtures.TestBase): 3445 3446 def test_default_generators(self): 3447 g1, g2 = Sequence('foo_id_seq'), ColumnDefault('f5') 3448 assert Column(String, default=g1).default is g1 3449 assert Column(String, onupdate=g1).onupdate is g1 3450 assert Column(String, default=g2).default is g2 3451 assert Column(String, onupdate=g2).onupdate is g2 3452 3453 def _null_type_error(self, col): 3454 t = Table('t', MetaData(), col) 3455 assert_raises_message( 3456 exc.CompileError, 3457 r"\(in table 't', column 'foo'\): Can't generate DDL for NullType", 3458 schema.CreateTable(t).compile 3459 ) 3460 3461 def _no_name_error(self, col): 3462 assert_raises_message( 3463 exc.ArgumentError, 3464 "Column must be constructed 
with a non-blank name or " 3465 "assign a non-blank .name", 3466 Table, 't', MetaData(), col 3467 ) 3468 3469 def _no_error(self, col): 3470 m = MetaData() 3471 b = Table('bar', m, Column('id', Integer)) 3472 t = Table('t', m, col) 3473 schema.CreateTable(t).compile() 3474 3475 def test_argument_signatures(self): 3476 self._no_name_error(Column()) 3477 self._null_type_error(Column("foo")) 3478 self._no_name_error(Column(default="foo")) 3479 3480 self._no_name_error(Column(Sequence("a"))) 3481 self._null_type_error(Column("foo", default="foo")) 3482 3483 self._null_type_error(Column("foo", Sequence("a"))) 3484 3485 self._no_name_error(Column(ForeignKey('bar.id'))) 3486 3487 self._no_error(Column("foo", ForeignKey('bar.id'))) 3488 3489 self._no_name_error(Column(ForeignKey('bar.id'), default="foo")) 3490 3491 self._no_name_error(Column(ForeignKey('bar.id'), Sequence("a"))) 3492 self._no_error(Column("foo", ForeignKey('bar.id'), default="foo")) 3493 self._no_error(Column("foo", ForeignKey('bar.id'), Sequence("a"))) 3494 3495 def test_column_info(self): 3496 3497 c1 = Column('foo', String, info={'x': 'y'}) 3498 c2 = Column('bar', String, info={}) 3499 c3 = Column('bat', String) 3500 assert c1.info == {'x': 'y'} 3501 assert c2.info == {} 3502 assert c3.info == {} 3503 3504 for c in (c1, c2, c3): 3505 c.info['bar'] = 'zip' 3506 assert c.info['bar'] == 'zip' 3507 3508 3509class CatchAllEventsTest(fixtures.RemovesEvents, fixtures.TestBase): 3510 3511 def test_all_events(self): 3512 canary = [] 3513 3514 def before_attach(obj, parent): 3515 canary.append("%s->%s" % (obj.__class__.__name__, 3516 parent.__class__.__name__)) 3517 3518 def after_attach(obj, parent): 3519 canary.append("%s->%s" % (obj.__class__.__name__, parent)) 3520 3521 self.event_listen( 3522 schema.SchemaItem, 3523 "before_parent_attach", 3524 before_attach) 3525 self.event_listen( 3526 schema.SchemaItem, 3527 "after_parent_attach", 3528 after_attach) 3529 3530 m = MetaData() 3531 Table('t1', m, 3532 
def test_events_per_constraint(self):
    """Parent-attach events registered on individual constraint classes
    fire once per matching construct, in table-construction order."""
    log = []

    def register(construct):
        # ``construct`` arrives as an argument, so each listener pair
        # reports the class it was registered for.
        def on_before(obj, parent):
            parent_cls = parent.__class__.__name__
            log.append("%s->%s" % (construct.__name__, parent_cls))

        def on_after(obj, parent):
            assert hasattr(obj, 'name')  # so we can change it
            log.append("%s->%s" % (construct.__name__, parent))

        self.event_listen(construct, "before_parent_attach", on_before)
        self.event_listen(construct, "after_parent_attach", on_after)

    listened = (
        schema.ForeignKeyConstraint,
        schema.PrimaryKeyConstraint,
        schema.UniqueConstraint,
        schema.CheckConstraint,
        schema.Index,
    )
    for construct in listened:
        register(construct)

    metadata = MetaData()
    Table(
        't1', metadata,
        Column('id', Integer, Sequence('foo_id'), primary_key=True),
        Column('bar', String, ForeignKey('t2.id'), index=True),
        Column('bat', Integer, unique=True),
    )
    Table(
        't2', metadata,
        Column('id', Integer, primary_key=True),
        Column('bar', Integer),
        Column('bat', Integer),
        CheckConstraint("bar>5"),
        UniqueConstraint('bar', 'bat'),
        Index(None, 'bar', 'bat')
    )

    # One "before" entry (parent class name) followed by one "after"
    # entry (stringified parent) per attached construct.
    expected = [
        'PrimaryKeyConstraint->Table', 'PrimaryKeyConstraint->t1',
        'Index->Table', 'Index->t1',
        'ForeignKeyConstraint->Table', 'ForeignKeyConstraint->t1',
        'UniqueConstraint->Table', 'UniqueConstraint->t1',
        'PrimaryKeyConstraint->Table', 'PrimaryKeyConstraint->t2',
        'CheckConstraint->Table', 'CheckConstraint->t2',
        'UniqueConstraint->Table', 'UniqueConstraint->t2',
        'Index->Table', 'Index->t2',
    ]
    eq_(log, expected)
def test_participating_bad_kw(self):
    """A keyword the participating dialect does not declare for Index
    is rejected with ArgumentError."""
    expected_message = (
        "Argument 'participating_q_p_x' is not accepted by dialect "
        "'participating' on behalf of "
        "<class 'sqlalchemy.sql.schema.Index'>"
    )
    with self._fixture():
        assert_raises_message(
            exc.ArgumentError,
            expected_message,
            Index, 'a', 'b', 'c', participating_q_p_x=8
        )
def test_runs_safekwarg(self):
    """Keys exposed by ``dialect_kwargs`` are passed through
    ``sqlalchemy.util.safe_kwarg``."""

    def goofy(arg):
        # Stand-in for safe_kwarg that visibly marks every key it sees.
        return "goofy_%s" % arg

    # Patch first, then enter the dialect fixture -- same nesting order
    # as two stacked ``with`` statements.
    with mock.patch("sqlalchemy.util.safe_kwarg", goofy), self._fixture():
        idx = Index('a', 'b')
        idx.kwargs[util.u('participating_x')] = 7

        seen_keys = list(idx.dialect_kwargs)
        eq_(seen_keys, ['goofy_participating_x'])
def test_foreign_key_propagate(self):
    """Dialect kwargs given to a column-level ForeignKey show up on the
    ForeignKeyConstraint created for the table."""
    with self._fixture():
        metadata = MetaData()
        fk = ForeignKey('t2.id', participating_foobar=True)
        table = Table('t', metadata, Column('id', Integer, fk))

        fk_constraints = [
            const for const in table.constraints
            if isinstance(const, ForeignKeyConstraint)
        ]
        first_fkc = fk_constraints[0]
        eq_(first_fkc.dialect_kwargs, {'participating_foobar': True})
def test_update(self):
    """``_validate_dialect_kwargs()`` merges new values into the existing
    dialect options and kwargs of an Index."""
    with self._fixture():
        idx = Index('a', 'b', 'c', participating_x=20)
        eq_(idx.dialect_kwargs, {"participating_x": 20})

        # The same update is applied twice in a row; the state after the
        # second pass must equal the state after the first.
        for _ in range(2):
            idx._validate_dialect_kwargs({
                "participating_x": 25,
                "participating_z_one": "default",
            })
            eq_(
                idx.dialect_options,
                {"participating": {"x": 25, "y": False, "z_one": "default"}}
            )
            eq_(
                idx.dialect_kwargs,
                {"participating_x": 25, 'participating_z_one': "default"}
            )

        # Now touch a second dialect and override a default of the first.
        idx._validate_dialect_kwargs({
            "participating_y": True,
            'participating2_y': "p2y",
        })
        options_after = {
            "participating": {"x": 25, "y": True, "z_one": "default"},
            "participating2": {"y": "p2y", "pp": "default", "x": 9},
        }
        kwargs_after = {
            "participating_x": 25,
            "participating_y": True,
            'participating2_y': "p2y",
            "participating_z_one": "default",
        }
        eq_(idx.dialect_options, options_after)
        eq_(idx.dialect_kwargs, kwargs_after)
def test_key_error_dialect_options(self):
    """Looking up an unset argument in a per-dialect options mapping
    raises KeyError, for both fixture dialects."""
    with self._fixture():
        idx = Index('a', 'b', 'c')
        for dialect_name in ('participating', 'nonparticipating'):
            dialect_opts = idx.dialect_options[dialect_name]
            assert_raises(KeyError, dialect_opts.__getitem__, 'asdfaso890')
def test_add_new_arguments_participating(self):
    """An argument added with ``Index.argument_for()`` is accepted as a
    keyword and supplies its default when not passed."""
    with self._fixture():
        Index.argument_for("participating", "xyzqpr", False)

        # Explicit value wins.
        explicit = Index('a', 'b', 'c', participating_xyzqpr=True)
        eq_(explicit.kwargs['participating_xyzqpr'], True)

        # Omitted: the registered default is reported.
        defaulted = Index('a', 'b', 'c')
        participating_opts = defaulted.dialect_options['participating']
        eq_(participating_opts['xyzqpr'], False)
def test_ck_name_required(self):
    """With a ``%(constraint_name)s`` convention, a named CheckConstraint
    gets the conventional name and an unnamed one is refused."""
    convention = {"ck": "ck_%(table_name)s_%(constraint_name)s"}
    user = self._fixture(naming_convention=convention)

    named = CheckConstraint(user.c.data == 'x', name='mycheck')
    eq_(named.name, "ck_user_mycheck")

    missing_name_pattern = (
        r"Naming convention including %\(constraint_name\)s token "
        "requires that constraint is explicitly named."
    )
    assert_raises_message(
        exc.InvalidRequestError,
        missing_name_pattern,
        CheckConstraint, user.c.data == 'x'
    )
def test_custom(self):
    """A callable stored in the naming convention supplies the value of
    its own ``%(key_hash)s`` token."""

    def key_hash(const, table):
        return "HASH_%s" % table.name

    convention = {
        "fk": "fk_%(table_name)s_%(key_hash)s",
        "key_hash": key_hash,
    }
    user = self._fixture(naming_convention=convention)

    address = Table(
        'address', user.metadata,
        Column('id', Integer, primary_key=True),
        Column('user_id', Integer),
        Column('user_version_id', Integer)
    )
    local_cols = ['user_id', 'user_version_id']
    remote_cols = ['user.id', 'user.version']
    fk = ForeignKeyConstraint(local_cols, remote_cols)
    address.append_constraint(fk)

    eq_(fk.name, "fk_address_HASH_address")
def test_schematype_ck_name_enum(self):
    """The CHECK constraint of a named Enum keeps the plain name on the
    Table; the convention-based name appears in the compiled DDL."""
    convention = {"ck": "ck_%(table_name)s_%(constraint_name)s"}
    metadata = MetaData(naming_convention=convention)

    user = Table(
        'user', metadata,
        Column('x', Enum('a', 'b', name='foo'))
    )

    check_constraints = [
        const for const in user.constraints
        if isinstance(const, CheckConstraint)
    ]
    eq_(check_constraints[0].name, "foo")

    # but is hit at compile time
    expected_ddl = (
        'CREATE TABLE "user" ('
        "x VARCHAR(1), "
        "CONSTRAINT ck_user_foo CHECK (x IN ('a', 'b'))"
        ")"
    )
    self.assert_compile(schema.CreateTable(user), expected_ddl)
def test_pickle_metadata(self):
    """A MetaData's naming convention survives a pickle round trip and
    still names primary keys on the unpickled copy."""
    convention = {"pk": "%(table_name)s_pk"}
    original = MetaData(naming_convention=convention)

    restored = pickle.loads(pickle.dumps(original))
    eq_(restored.naming_convention, {"pk": "%(table_name)s_pk"})

    # Build the same table against both MetaData objects.
    from_original = Table(
        't2', original, Column('id', Integer, primary_key=True))
    from_restored = Table(
        't2', restored, Column('id', Integer, primary_key=True))

    eq_(from_original.primary_key.name, from_restored.primary_key.name)
    eq_(from_restored.primary_key.name, "t2_pk")