from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import emits_warning

import pickle
from sqlalchemy import Integer, String, UniqueConstraint, \
    CheckConstraint, ForeignKey, MetaData, Sequence, \
    ForeignKeyConstraint, PrimaryKeyConstraint, ColumnDefault, Index, event,\
    events, Unicode, types as sqltypes, bindparam, \
    Table, Column, Boolean, Enum, func, text, BLANK_SCHEMA
from sqlalchemy import schema, exc
from sqlalchemy.sql import elements, naming
import sqlalchemy as tsa
from sqlalchemy.testing import fixtures
from sqlalchemy import testing
from sqlalchemy.testing import ComparesTables, AssertsCompiledSQL
from sqlalchemy.testing import eq_, is_, mock
from contextlib import contextmanager
from sqlalchemy import util


class MetaDataTest(fixtures.TestBase, ComparesTables):
    """Tests for MetaData collections, copying, foreign keys and pickling."""

    def test_metadata_contains(self):
        """``in`` on a MetaData accepts both table keys and Table objects."""
        metadata = MetaData()
        t1 = Table('t1', metadata, Column('x', Integer))
        t2 = Table('t2', metadata, Column('x', Integer), schema='foo')
        t3 = Table('t2', MetaData(), Column('x', Integer))
        t4 = Table('t1', MetaData(), Column('x', Integer), schema='foo')

        assert "t1" in metadata
        assert "foo.t2" in metadata
        assert "t2" not in metadata
        assert "foo.t1" not in metadata
        assert t1 in metadata
        assert t2 in metadata
        assert t3 not in metadata
        assert t4 not in metadata

    def test_uninitialized_column_copy(self):
        """Column.copy() carries over attributes for table-less columns."""
        for col in [
            Column('foo', String(), nullable=False),
            Column('baz', String(), unique=True),
            Column(Integer(), primary_key=True),
            Column('bar', Integer(), Sequence('foo_seq'), primary_key=True,
                   key='bar'),
            Column(Integer(), ForeignKey('bat.blah'), doc="this is a col"),
            Column('bar', Integer(), ForeignKey('bat.blah'), primary_key=True,
                   key='bar'),
            Column('bar', Integer(), info={'foo': 'bar'}),
        ]:
            c2 = col.copy()
            for attr in ('name', 'type', 'nullable',
                         'primary_key', 'key', 'unique', 'info',
                         'doc'):
                eq_(getattr(col, attr), getattr(c2, attr))
            eq_(len(col.foreign_keys), len(c2.foreign_keys))
            # the only default used in the fixtures above is the Sequence
            if col.default:
                eq_(c2.default.name, 'foo_seq')
            for a1, a2 in zip(col.foreign_keys, c2.foreign_keys):
                assert a1 is not a2
                eq_(a2._colspec, 'bat.blah')

    def test_col_subclass_copy(self):
        """copy() on a Column subclass returns the subclass."""
        class MyColumn(schema.Column):

            def __init__(self, *args, **kw):
                self.widget = kw.pop('widget', None)
                super(MyColumn, self).__init__(*args, **kw)

            def copy(self, *arg, **kw):
                c = super(MyColumn, self).copy(*arg, **kw)
                c.widget = self.widget
                return c
        c1 = MyColumn('foo', Integer, widget='x')
        c2 = c1.copy()
        assert isinstance(c2, MyColumn)
        eq_(c2.widget, 'x')

    def test_uninitialized_column_copy_events(self):
        """Attach listeners set on each copy fire once per Table."""
        msgs = []

        def write(c, t):
            msgs.append("attach %s.%s" % (t.name, c.name))
        c1 = Column('foo', String())
        m = MetaData()
        for i in range(3):
            cx = c1.copy()
            # as of 0.7, these events no longer copy.  it's expected
            # that listeners will be re-established from the
            # natural construction of things.
            cx._on_table_attach(write)
            Table('foo%d' % i, m, cx)
        eq_(msgs, ['attach foo0.foo', 'attach foo1.foo', 'attach foo2.foo'])

    def test_schema_collection_add(self):
        """Schema names of added tables are collected on the MetaData."""
        metadata = MetaData()

        Table('t1', metadata, Column('x', Integer), schema='foo')
        Table('t2', metadata, Column('x', Integer), schema='bar')
        Table('t3', metadata, Column('x', Integer))

        eq_(metadata._schemas, set(['foo', 'bar']))
        eq_(len(metadata.tables), 3)

    def test_schema_collection_remove(self):
        """A schema name is dropped only when its last table is removed."""
        metadata = MetaData()

        t1 = Table('t1', metadata, Column('x', Integer), schema='foo')
        Table('t2', metadata, Column('x', Integer), schema='bar')
        t3 = Table('t3', metadata, Column('x', Integer), schema='bar')

        # 'bar' is still used by t2, so it stays
        metadata.remove(t3)
        eq_(metadata._schemas, set(['foo', 'bar']))
        eq_(len(metadata.tables), 2)

        metadata.remove(t1)
        eq_(metadata._schemas, set(['bar']))
        eq_(len(metadata.tables), 1)

    def test_schema_collection_remove_all(self):
        """clear() empties both the table and the schema collections."""
        metadata = MetaData()

        Table('t1', metadata, Column('x', Integer), schema='foo')
        Table('t2', metadata, Column('x', Integer), schema='bar')

        metadata.clear()
        eq_(metadata._schemas, set())
        eq_(len(metadata.tables), 0)

    def test_metadata_tables_immutable(self):
        """Mutating ``metadata.tables`` directly raises TypeError."""
        metadata = MetaData()

        Table('t1', metadata, Column('x', Integer))
        assert 't1' in metadata.tables

        assert_raises(
            TypeError,
            lambda: metadata.tables.pop('t1')
        )

    @testing.provide_metadata
    def test_dupe_tables(self):
        """Redefining an existing table with columns raises."""
        metadata = self.metadata
        Table('table1', metadata,
              Column('col1', Integer, primary_key=True),
              Column('col2', String(20)))

        metadata.create_all()
        Table('table1', metadata, autoload=True)

        def go():
            Table('table1', metadata,
                  Column('col1', Integer, primary_key=True),
                  Column('col2', String(20)))
        # The message is applied as a regular expression; " +" tolerates
        # either one or two spaces after the period, so the check does not
        # depend on the exact spacing of the library's message.
        assert_raises_message(
            tsa.exc.InvalidRequestError,
            "Table 'table1' is already defined for this "
            "MetaData instance. +Specify 'extend_existing=True' "
            "to redefine options and columns on an existing "
            "Table object.",
            go
        )

    def test_fk_copy(self):
        """copy() preserves the keyword options of FK and FK constraint."""
        c1 = Column('foo', Integer)
        c2 = Column('bar', Integer)
        m = MetaData()
        t1 = Table('t', m, c1, c2)

        kw = dict(onupdate="X",
                  ondelete="Y", use_alter=True, name='f1',
                  deferrable="Z", initially="Q", link_to_name=True)

        fk1 = ForeignKey(c1, **kw)
        fk2 = ForeignKeyConstraint((c1,), (c2,), **kw)

        t1.append_constraint(fk2)
        fk1c = fk1.copy()
        fk2c = fk2.copy()

        for k in kw:
            eq_(getattr(fk1c, k), kw[k])
            eq_(getattr(fk2c, k), kw[k])

    def test_check_constraint_copy(self):
        """copy() preserves name, text, flags and the create rule."""
        def r(x):
            return x

        c = CheckConstraint("foo bar",
                            name='name',
                            initially=True,
                            deferrable=True,
                            _create_rule=r)
        c2 = c.copy()
        eq_(c2.name, 'name')
        eq_(str(c2.sqltext), "foo bar")
        eq_(c2.initially, True)
        eq_(c2.deferrable, True)
        assert c2._create_rule is r

    def test_col_replace_w_constraint(self):
        """Re-appending a column does not duplicate its FK constraint."""
        m = MetaData()
        a = Table('a', m, Column('id', Integer, primary_key=True))

        aid = Column('a_id', ForeignKey('a.id'))
        b = Table('b', m, aid)
        b.append_column(aid)

        assert b.c.a_id.references(a.c.id)
        eq_(len(b.constraints), 2)

    def test_fk_construct(self):
        """Passing ``table=`` attaches the constraint to that table."""
        c1 = Column('foo', Integer)
        c2 = Column('bar', Integer)
        m = MetaData()
        t1 = Table('t', m, c1, c2)
        fk1 = ForeignKeyConstraint(('foo', ), ('bar', ), table=t1)
        assert fk1 in t1.constraints

    def test_fk_constraint_col_collection_w_table(self):
        """With ``table=``, string column names resolve to Columns."""
        c1 = Column('foo', Integer)
        c2 = Column('bar', Integer)
        m = MetaData()
        t1 = Table('t', m, c1, c2)
        fk1 = ForeignKeyConstraint(('foo', ), ('bar', ), table=t1)
        eq_(dict(fk1.columns), {"foo": c1})

    def test_fk_constraint_col_collection_no_table(self):
        """Without a table, ``columns`` is empty but keys are known."""
        fk1 = ForeignKeyConstraint(('foo', 'bat'), ('bar', 'hoho'))
        eq_(dict(fk1.columns), {})
        eq_(fk1.column_keys, ['foo', 'bat'])
        eq_(fk1._col_description, 'foo, bat')
        eq_(fk1._elements, {"foo": fk1.elements[0], "bat": fk1.elements[1]})

    def test_fk_constraint_col_collection_no_table_real_cols(self):
        """Same as the no-table case, but given Column objects."""
        c1 = Column('foo', Integer)
        c2 = Column('bar', Integer)
        fk1 = ForeignKeyConstraint((c1, ), (c2, ))
        eq_(dict(fk1.columns), {})
        eq_(fk1.column_keys, ['foo'])
        eq_(fk1._col_description, 'foo')
        eq_(fk1._elements, {"foo": fk1.elements[0]})

    def test_fk_constraint_col_collection_added_to_table(self):
        """``columns`` is populated once the constraint joins a Table."""
        c1 = Column('foo', Integer)
        m = MetaData()
        fk1 = ForeignKeyConstraint(('foo', ), ('bar', ))
        Table('t', m, c1, fk1)
        eq_(dict(fk1.columns), {"foo": c1})
        eq_(fk1._elements, {"foo": fk1.elements[0]})

    def test_fk_constraint_col_collection_via_fk(self):
        """A column-level ForeignKey yields an equivalent constraint."""
        fk = ForeignKey('bar')
        c1 = Column('foo', Integer, fk)
        m = MetaData()
        t1 = Table('t', m, c1)
        fk1 = fk.constraint
        eq_(fk1.column_keys, ['foo'])
        assert fk1 in t1.constraints
        eq_(fk1.column_keys, ['foo'])
        eq_(dict(fk1.columns), {"foo": c1})
        eq_(fk1._elements, {"foo": fk})

    def test_fk_no_such_parent_col_error(self):
        """An FK constraint naming a missing parent column raises."""
        meta = MetaData()
        a = Table('a', meta, Column('a', Integer))
        Table('b', meta, Column('b', Integer))

        def go():
            a.append_constraint(
                ForeignKeyConstraint(['x'], ['b.b'])
            )
        assert_raises_message(
            exc.ArgumentError,
            "Can't create ForeignKeyConstraint on "
            "table 'a': no column named 'x' is present.",
            go
        )

    def test_fk_given_non_col(self):
        """ForeignKey rejects a non-column clause element."""
        not_a_col = bindparam('x')
        assert_raises_message(
            exc.ArgumentError,
            "String, Column, or Column-bound argument expected, got Bind",
            ForeignKey, not_a_col
        )

    def test_fk_given_non_col_clauseelem(self):
        """ForeignKey rejects a __clause_element__ yielding a non-column."""
        class Foo(object):

            def __clause_element__(self):
                return bindparam('x')
        assert_raises_message(
            exc.ArgumentError,
            "String, Column, or Column-bound argument expected, got Bind",
            ForeignKey, Foo()
        )

    def test_fk_given_col_non_table(self):
        """ForeignKey rejects a column that belongs to an alias."""
        t = Table('t', MetaData(), Column('x', Integer))
        xa = t.alias().c.x
        assert_raises_message(
            exc.ArgumentError,
            "ForeignKey received Column not bound to a Table, got: .*Alias",
            ForeignKey, xa
        )

    def test_fk_given_col_non_table_clauseelem(self):
        """As above, with the alias column behind __clause_element__."""
        t = Table('t', MetaData(), Column('x', Integer))

        class Foo(object):

            def __clause_element__(self):
                return t.alias().c.x

        assert_raises_message(
            exc.ArgumentError,
            "ForeignKey received Column not bound to a Table, got: .*Alias",
            ForeignKey, Foo()
        )

    def test_fk_no_such_target_col_error_upfront(self):
        """Missing target column raises; target table defined first."""
        meta = MetaData()
        a = Table('a', meta, Column('a', Integer))
        Table('b', meta, Column('b', Integer))

        a.append_constraint(ForeignKeyConstraint(['a'], ['b.x']))

        assert_raises_message(
            exc.NoReferencedColumnError,
            "Could not initialize target column for ForeignKey 'b.x' on "
            "table 'a': table 'b' has no column named 'x'",
            getattr, list(a.foreign_keys)[0], "column"
        )

    def test_fk_no_such_target_col_error_delayed(self):
        """Missing target column raises; target table defined afterwards."""
        meta = MetaData()
        a = Table('a', meta, Column('a', Integer))
        a.append_constraint(
            ForeignKeyConstraint(['a'], ['b.x']))

        Table('b', meta, Column('b', Integer))

        assert_raises_message(
            exc.NoReferencedColumnError,
            "Could not initialize target column for ForeignKey 'b.x' on "
            "table 'a': table 'b' has no column named 'x'",
            getattr, list(a.foreign_keys)[0], "column"
        )

    def test_pickle_metadata_sequence_restated(self):
        """A Sequence restated after unpickling replaces the old one."""
        m1 = MetaData()
        Table('a', m1,
              Column('id', Integer, primary_key=True),
              Column('x', Integer, Sequence("x_seq")))

        m2 = pickle.loads(pickle.dumps(m1))

        s2 = Sequence("x_seq")
        t2 = Table('a', m2,
                   Column('id', Integer, primary_key=True),
                   Column('x', Integer, s2),
                   extend_existing=True)

        assert m2._sequences['x_seq'] is t2.c.x.default
        assert m2._sequences['x_seq'] is s2

    def test_sequence_restated_replaced(self):
        """Test restatement of Sequence replaces."""

        m1 = MetaData()
        s1 = Sequence("x_seq")
        t = Table('a', m1,
                  Column('x', Integer, s1)
                  )
        assert m1._sequences['x_seq'] is s1

        s2 = Sequence('x_seq')
        Table('a', m1,
              Column('x', Integer, s2),
              extend_existing=True
              )
        assert t.c.x.default is s2
        assert m1._sequences['x_seq'] is s2

    def test_sequence_attach_to_table(self):
        """A column Sequence picks up the MetaData of its Table."""
        m1 = MetaData()
        s1 = Sequence("s")
        Table('a', m1, Column('x', Integer, s1))
        assert s1.metadata is m1

    def test_sequence_attach_to_existing_table(self):
        """A Sequence added to an attached column picks up the MetaData."""
        m1 = MetaData()
        s1 = Sequence("s")
        t = Table('a', m1, Column('x', Integer))
        t.c.x._init_items(s1)
        assert s1.metadata is m1

    def test_pickle_metadata_sequence_implicit(self):
        """The sequence collection survives a pickle round trip."""
        m1 = MetaData()
        Table('a', m1,
              Column('id', Integer, primary_key=True),
              Column('x', Integer, Sequence("x_seq")))

        m2 = pickle.loads(pickle.dumps(m1))

        t2 = Table('a', m2, extend_existing=True)

        eq_(m2._sequences, {'x_seq': t2.c.x.default})

    def test_pickle_metadata_schema(self):
        """The schema collection survives a pickle round trip."""
        m1 = MetaData()
        Table('a', m1,
              Column('id', Integer, primary_key=True),
              Column('x', Integer, Sequence("x_seq")),
              schema='y')

        m2 = pickle.loads(pickle.dumps(m1))

        Table('a', m2, schema='y',
              extend_existing=True)

        eq_(m2._schemas, m1._schemas)

    def test_metadata_schema_arg(self):
        """Table and Sequence schema defaults follow the MetaData."""
        m1 = MetaData(schema='sch1')
        m2 = MetaData(schema='sch1', quote_schema=True)
        m3 = MetaData(schema='sch1', quote_schema=False)
        m4 = MetaData()

        # (name, metadata, schema, quote_schema,
        #  expected schema, expected quote flag)
        for i, (name, metadata, schema, quote_schema,
                exp_schema, exp_quote_schema) in enumerate([
                    ('t1', m1, None, None, 'sch1', None),
                    ('t2', m1, 'sch2', None, 'sch2', None),
                    ('t3', m1, 'sch2', True, 'sch2', True),
                    ('t4', m1, 'sch1', None, 'sch1', None),
                    ('t5', m1, BLANK_SCHEMA, None, None, None),
                    ('t1', m2, None, None, 'sch1', True),
                    ('t2', m2, 'sch2', None, 'sch2', None),
                    ('t3', m2, 'sch2', True, 'sch2', True),
                    ('t4', m2, 'sch1', None, 'sch1', None),
                    ('t1', m3, None, None, 'sch1', False),
                    ('t2', m3, 'sch2', None, 'sch2', None),
                    ('t3', m3, 'sch2', True, 'sch2', True),
                    ('t4', m3, 'sch1', None, 'sch1', None),
                    ('t1', m4, None, None, None, None),
                    ('t2', m4, 'sch2', None, 'sch2', None),
                    ('t3', m4, 'sch2', True, 'sch2', True),
                    ('t4', m4, 'sch1', None, 'sch1', None),
                    ('t5', m4, BLANK_SCHEMA, None, None, None),
                ]):
            # only pass the arguments the case actually specifies
            kw = {}
            if schema is not None:
                kw['schema'] = schema
            if quote_schema is not None:
                kw['quote_schema'] = quote_schema
            t = Table(name, metadata, **kw)
            eq_(t.schema, exp_schema, "test %d, table schema" % i)
            eq_(t.schema.quote if t.schema is not None else None,
                exp_quote_schema,
                "test %d, table quote_schema" % i)
            seq = Sequence(name, metadata=metadata, **kw)
            eq_(seq.schema, exp_schema, "test %d, seq schema" % i)
            eq_(seq.schema.quote if seq.schema is not None else None,
                exp_quote_schema,
                "test %d, seq quote_schema" % i)

    def test_manual_dependencies(self):
        """add_is_dependent_on() drives the order of sorted_tables."""
        meta = MetaData()
        a = Table('a', meta, Column('foo', Integer))
        b = Table('b', meta, Column('foo', Integer))
        c = Table('c', meta, Column('foo', Integer))
        d = Table('d', meta, Column('foo', Integer))
        e = Table('e', meta, Column('foo', Integer))

        e.add_is_dependent_on(c)
        a.add_is_dependent_on(b)
        b.add_is_dependent_on(d)
        e.add_is_dependent_on(b)
        c.add_is_dependent_on(a)
        eq_(
            meta.sorted_tables,
            [d, b, a, c, e]
        )

    def test_deterministic_order(self):
        """sorted_tables yields a fixed order for partial dependencies."""
        meta = MetaData()
        a = Table('a', meta, Column('foo', Integer))
        b = Table('b', meta, Column('foo', Integer))
        c = Table('c', meta, Column('foo', Integer))
        d = Table('d', meta, Column('foo', Integer))
        e = Table('e', meta, Column('foo', Integer))

        e.add_is_dependent_on(c)
        a.add_is_dependent_on(b)
        eq_(
            meta.sorted_tables,
            [b, c, d, a, e]
        )

    def test_nonexistent(self):
        """Autoloading a table that does not exist raises."""
        assert_raises(tsa.exc.NoSuchTableError, Table,
                      'fake_table',
                      MetaData(testing.db), autoload=True)

    def test_assorted_repr(self):
        """repr() of assorted schema constructs matches expectations."""
        t1 = Table("foo", MetaData(), Column("x", Integer))
        i1 = Index("bar", t1.c.x)
        ck = schema.CheckConstraint("x > y", name="someconstraint")

        for const, exp in (
            (Sequence("my_seq"),
                "Sequence('my_seq')"),
            (Sequence("my_seq", start=5),
                "Sequence('my_seq', start=5)"),
            (Column("foo", Integer),
                "Column('foo', Integer(), table=None)"),
            (Table("bar", MetaData(), Column("x", String)),
                "Table('bar', MetaData(bind=None), "
                "Column('x', String(), table=<bar>), schema=None)"),
            (schema.DefaultGenerator(for_update=True),
                "DefaultGenerator(for_update=True)"),
            (schema.Index("bar", "c"), "Index('bar', 'c')"),
            (i1, "Index('bar', Column('x', Integer(), table=<foo>))"),
            (schema.FetchedValue(), "FetchedValue()"),
            (ck,
                "CheckConstraint("
                "%s"
                ", name='someconstraint')" % repr(ck.sqltext)),
        ):
            eq_(
                repr(const),
                exp
            )


class ToMetaDataTest(fixtures.TestBase, ComparesTables):
    """Tests for copying tables between MetaData via Table.tometadata()."""

    @testing.requires.check_constraints
    def test_copy(self):
        """Copies made by tometadata, pickle and reflection match."""
        from sqlalchemy.testing.schema import Table
        meta = MetaData()

        table = Table(
            'mytable',
            meta,
            Column(
                'myid',
                Integer,
                Sequence('foo_id_seq'),
                primary_key=True),
            Column(
                'name',
                String(40),
                nullable=True),
            Column(
                'foo',
                String(40),
                nullable=False,
                server_default='x',
                server_onupdate='q'),
            Column(
                'bar',
                String(40),
                nullable=False,
                default='y',
                onupdate='z'),
            Column(
                'description',
                String(30),
                CheckConstraint("description='hi'")),
            UniqueConstraint('name'),
            test_needs_fk=True)

        table2 = Table(
            'othertable',
            meta,
            Column(
                'id',
                Integer,
                Sequence('foo_seq'),
                primary_key=True),
            Column(
                'myid',
                Integer,
                ForeignKey('mytable.myid'),
            ),
            test_needs_fk=True)

        def test_to_metadata():
            meta2 = MetaData()
            table_c = table.tometadata(meta2)
            table2_c = table2.tometadata(meta2)
            return (table_c, table2_c)

        def test_pickle():
            meta.bind = testing.db
            meta2 = pickle.loads(pickle.dumps(meta))
            assert meta2.bind is None
            pickle.loads(pickle.dumps(meta2))
            return (meta2.tables['mytable'], meta2.tables['othertable'])

        def test_pickle_via_reflect():
            # this is the most common use case, pickling the results of a
            # database reflection
            meta2 = MetaData(bind=testing.db)
            t1 = Table('mytable', meta2, autoload=True)
            Table('othertable', meta2, autoload=True)
            meta3 = pickle.loads(pickle.dumps(meta2))
            assert meta3.bind is None
            assert meta3.tables['mytable'] is not t1

            return (meta3.tables['mytable'], meta3.tables['othertable'])

        meta.create_all(testing.db)
        try:
            for test, has_constraints, reflect in \
                    (test_to_metadata, True, False), \
                    (test_pickle, True, False), \
                    (test_pickle_via_reflect, False, True):
                table_c, table2_c = test()
                self.assert_tables_equal(table, table_c)
                self.assert_tables_equal(table2, table2_c)
                assert table is not table_c
                assert table.primary_key is not table_c.primary_key
                assert list(table2_c.c.myid.foreign_keys)[0].column \
                    is table_c.c.myid
                assert list(table2_c.c.myid.foreign_keys)[0].column \
                    is not table.c.myid
                assert 'x' in str(table_c.c.foo.server_default.arg)
                if not reflect:
                    assert isinstance(table_c.c.myid.default, Sequence)
                    assert str(table_c.c.foo.server_onupdate.arg) == 'q'
                    assert str(table_c.c.bar.default.arg) == 'y'
                    assert getattr(table_c.c.bar.onupdate.arg, 'arg',
                                   table_c.c.bar.onupdate.arg) == 'z'
                    assert isinstance(table2_c.c.id.default, Sequence)

                # constraints don't get reflected for any dialect right
                # now

                if has_constraints:
                    for c in table_c.c.description.constraints:
                        if isinstance(c, CheckConstraint):
                            break
                    else:
                        assert False
                    assert str(c.sqltext) == "description='hi'"
                    for c in table_c.constraints:
                        if isinstance(c, UniqueConstraint):
                            break
                    else:
                        assert False
                    assert c.columns.contains_column(table_c.c.name)
                    assert not c.columns.contains_column(table.c.name)

        finally:
            meta.drop_all(testing.db)

    def test_col_key_fk_parent(self):
        """An FK on a column with a custom key survives the copy."""
        # test #2643
        m1 = MetaData()
        a = Table('a', m1, Column('x', Integer))
        b = Table('b', m1, Column('x', Integer, ForeignKey('a.x'), key='y'))
        assert b.c.y.references(a.c.x)

        m2 = MetaData()
        b2 = b.tometadata(m2)
        a2 = a.tometadata(m2)
        assert b2.c.y.references(a2.c.x)

    def test_change_schema(self):
        """``schema=`` moves both tables and their FK to the new schema."""
        meta = MetaData()

        table = Table('mytable', meta,
                      Column('myid', Integer, primary_key=True),
                      Column('name', String(40), nullable=True),
                      Column('description', String(30),
                             CheckConstraint("description='hi'")),
                      UniqueConstraint('name'),
                      )

        table2 = Table('othertable', meta,
                       Column('id', Integer, primary_key=True),
                       Column('myid', Integer, ForeignKey('mytable.myid')),
                       )

        meta2 = MetaData()
        table_c = table.tometadata(meta2, schema='someschema')
        table2_c = table2.tometadata(meta2, schema='someschema')

        eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid
                                                      == table2_c.c.myid))
        eq_(str(table_c.join(table2_c).onclause),
            'someschema.mytable.myid = someschema.othertable.myid')

    def test_retain_table_schema(self):
        """Without ``schema=``, the copy keeps the table's own schema."""
        meta = MetaData()

        table = Table('mytable', meta,
                      Column('myid', Integer, primary_key=True),
                      Column('name', String(40), nullable=True),
                      Column('description', String(30),
                             CheckConstraint("description='hi'")),
                      UniqueConstraint('name'),
                      schema='myschema',
                      )

        table2 = Table(
            'othertable',
            meta,
            Column(
                'id',
                Integer,
                primary_key=True),
            Column(
                'myid',
                Integer,
                ForeignKey('myschema.mytable.myid')),
            schema='myschema',
        )

        meta2 = MetaData()
        table_c = table.tometadata(meta2)
        table2_c = table2.tometadata(meta2)

        eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid
                                                      == table2_c.c.myid))
        eq_(str(table_c.join(table2_c).onclause),
            'myschema.mytable.myid = myschema.othertable.myid')

    def test_change_name_retain_metadata(self):
        """``name=`` allows copying into the table's own MetaData."""
        meta = MetaData()

        table = Table('mytable', meta,
                      Column('myid', Integer, primary_key=True),
                      Column('name', String(40), nullable=True),
                      Column('description', String(30),
                             CheckConstraint("description='hi'")),
                      UniqueConstraint('name'),
                      schema='myschema',
                      )

        table2 = table.tometadata(table.metadata, name='newtable')
        table3 = table.tometadata(table.metadata, schema='newschema',
                                  name='newtable')

        assert table.metadata is table2.metadata
        assert table.metadata is table3.metadata
        eq_((table.name, table2.name, table3.name),
            ('mytable', 'newtable', 'newtable'))
        eq_((table.key, table2.key, table3.key),
            ('myschema.mytable', 'myschema.newtable', 'newschema.newtable'))

    def test_change_name_change_metadata(self):
        """``name=`` renames the copy placed in a different MetaData."""
        meta = MetaData()
        meta2 = MetaData()

        table = Table('mytable', meta,
                      Column('myid', Integer, primary_key=True),
                      Column('name', String(40), nullable=True),
                      Column('description', String(30),
                             CheckConstraint("description='hi'")),
                      UniqueConstraint('name'),
                      schema='myschema',
                      )

        table2 = table.tometadata(meta2, name='newtable')

        assert table.metadata is not table2.metadata
        eq_((table.name, table2.name),
            ('mytable', 'newtable'))
        eq_((table.key, table2.key),
            ('myschema.mytable', 'myschema.newtable'))

    def test_change_name_selfref_fk_moves(self):
        """A self-referential FK follows the renamed copy."""
        meta = MetaData()

        referenced = Table('ref', meta,
                           Column('id', Integer, primary_key=True),
                           )
        table = Table('mytable', meta,
                      Column('id', Integer, primary_key=True),
                      Column('parent_id', ForeignKey('mytable.id')),
                      Column('ref_id', ForeignKey('ref.id'))
                      )

        table2 = table.tometadata(table.metadata, name='newtable')
        assert table.metadata is table2.metadata
        assert table2.c.ref_id.references(referenced.c.id)
        assert table2.c.parent_id.references(table2.c.id)

    def test_change_name_selfref_fk_moves_w_schema(self):
        """As above, with the copy also moved to a new schema."""
        meta = MetaData()

        referenced = Table('ref', meta,
                           Column('id', Integer, primary_key=True),
                           )
        table = Table('mytable', meta,
                      Column('id', Integer, primary_key=True),
                      Column('parent_id', ForeignKey('mytable.id')),
                      Column('ref_id', ForeignKey('ref.id'))
                      )

        table2 = table.tometadata(
            table.metadata, name='newtable', schema='newschema')
        ref2 = referenced.tometadata(table.metadata, schema='newschema')
        assert table.metadata is table2.metadata
        assert table2.c.ref_id.references(ref2.c.id)
        assert table2.c.parent_id.references(table2.c.id)

    def _assert_fk(self, t2, schema, expected, referred_schema_fn=None):
        """Copy ``t2`` to a new MetaData and check its FK column spec.

        :param t2: table with a column ``y`` carrying one foreign key.
        :param schema: schema passed to ``tometadata()``; when falsy the
         argument is omitted and the copy must keep ``t2``'s schema.
        :param expected: expected ``_get_colspec()`` of the copied FK.
        :param referred_schema_fn: passed through to ``tometadata()``.
        """
        m2 = MetaData()
        existing_schema = t2.schema
        if schema:
            t2c = t2.tometadata(m2, schema=schema,
                                referred_schema_fn=referred_schema_fn)
            eq_(t2c.schema, schema)
        else:
            t2c = t2.tometadata(m2, referred_schema_fn=referred_schema_fn)
            eq_(t2c.schema, existing_schema)
        eq_(list(t2c.c.y.foreign_keys)[0]._get_colspec(), expected)

    def test_fk_has_schema_string_retain_schema(self):
        m = MetaData()

        t2 = Table('t2', m, Column('y', Integer, ForeignKey('q.t1.x')))
        self._assert_fk(t2, None, "q.t1.x")

        Table('t1', m, Column('x', Integer), schema='q')
        self._assert_fk(t2, None, "q.t1.x")

    def test_fk_has_schema_string_new_schema(self):
        m = MetaData()

        t2 = Table('t2', m, Column('y', Integer, ForeignKey('q.t1.x')))
        self._assert_fk(t2, "z", "q.t1.x")

        Table('t1', m, Column('x', Integer), schema='q')
        self._assert_fk(t2, "z", "q.t1.x")

    def test_fk_has_schema_col_retain_schema(self):
        m = MetaData()

        t1 = Table('t1', m, Column('x', Integer), schema='q')
        t2 = Table('t2', m, Column('y', Integer, ForeignKey(t1.c.x)))

        # schema=None exercises the "retain" path; passing "z" here would
        # only repeat test_fk_has_schema_col_new_schema.
        self._assert_fk(t2, None, "q.t1.x")

    def test_fk_has_schema_col_new_schema(self):
        m = MetaData()

        t1 = Table('t1', m, Column('x', Integer), schema='q')
        t2 = Table('t2', m, Column('y', Integer, ForeignKey(t1.c.x)))

        self._assert_fk(t2, "z", "q.t1.x")

    def test_fk_and_referent_has_same_schema_string_retain_schema(self):
        m = MetaData()

        t2 = Table('t2', m, Column('y', Integer,
                                   ForeignKey('q.t1.x')), schema="q")

        self._assert_fk(t2, None, "q.t1.x")

        Table('t1', m, Column('x', Integer), schema='q')
        self._assert_fk(t2, None, "q.t1.x")

    def test_fk_and_referent_has_same_schema_string_new_schema(self):
        m = MetaData()

        t2 = Table('t2', m, Column('y', Integer,
                                   ForeignKey('q.t1.x')), schema="q")

        self._assert_fk(t2, "z", "z.t1.x")

        Table('t1', m, Column('x', Integer), schema='q')
        self._assert_fk(t2, "z", "z.t1.x")

    def test_fk_and_referent_has_same_schema_col_retain_schema(self):
        m = MetaData()

        t1 = Table('t1', m, Column('x', Integer), schema='q')
        t2 = Table('t2', m, Column('y', Integer,
                                   ForeignKey(t1.c.x)), schema='q')
        self._assert_fk(t2, None, "q.t1.x")

    def test_fk_and_referent_has_same_schema_col_new_schema(self):
        m = MetaData()

        t1 = Table('t1', m, Column('x', Integer), schema='q')
        t2 = Table('t2', m, Column('y', Integer,
                                   ForeignKey(t1.c.x)), schema='q')
        self._assert_fk(t2, 'z', "z.t1.x")

    def test_fk_and_referent_has_diff_schema_string_retain_schema(self):
        m = MetaData()

        t2 = Table('t2', m, Column('y', Integer,
                                   ForeignKey('p.t1.x')), schema="q")

        self._assert_fk(t2, None, "p.t1.x")

        Table('t1', m, Column('x', Integer), schema='p')
        self._assert_fk(t2, None, "p.t1.x")

    def test_fk_and_referent_has_diff_schema_string_new_schema(self):
        m = MetaData()

        t2 = Table('t2', m, Column('y', Integer,
                                   ForeignKey('p.t1.x')), schema="q")

        self._assert_fk(t2, "z", "p.t1.x")

        Table('t1', m, Column('x', Integer), schema='p')
        self._assert_fk(t2, "z", "p.t1.x")

    def test_fk_and_referent_has_diff_schema_col_retain_schema(self):
        m = MetaData()

        t1 = Table('t1', m, Column('x', Integer), schema='p')
        t2 = Table('t2', m, Column('y', Integer,
                                   ForeignKey(t1.c.x)), schema='q')
        self._assert_fk(t2, None, "p.t1.x")

    def test_fk_and_referent_has_diff_schema_col_new_schema(self):
        m = MetaData()

        t1 = Table('t1', m, Column('x', Integer), schema='p')
        t2 = Table('t2', m, Column('y', Integer,
                                   ForeignKey(t1.c.x)), schema='q')
        self._assert_fk(t2, 'z', "p.t1.x")

    def test_fk_custom_system(self):
        """``referred_schema_fn`` decides the schema of the copied FK."""
        m = MetaData()
        t2 = Table('t2', m, Column('y', Integer,
                                   ForeignKey('p.t1.x')), schema='q')

        def ref_fn(table, to_schema, constraint, referred_schema):
            assert table is t2
            eq_(to_schema, "z")
            eq_(referred_schema, "p")
            return "h"
        self._assert_fk(t2, 'z', "h.t1.x", referred_schema_fn=ref_fn)

    def test_copy_info(self):
        """``info`` dictionaries are copied, not shared, by tometadata."""
        m = MetaData()
        fk = ForeignKey('t2.id')
        c = Column('c', Integer, fk)
        ck = CheckConstraint('c > 5')
        t = Table('t', m, c, ck)

        m.info['minfo'] = True
        fk.info['fkinfo'] = True
        c.info['cinfo'] = True
        ck.info['ckinfo'] = True
        t.info['tinfo'] = True
        t.primary_key.info['pkinfo'] = True
        fkc = [const for const in t.constraints if
               isinstance(const, ForeignKeyConstraint)][0]
        fkc.info['fkcinfo'] = True

        m2 = MetaData()
        t2 = t.tometadata(m2)

        # mutate the originals after the copy; the copies must not change
        m.info['minfo'] = False
        fk.info['fkinfo'] = False
        c.info['cinfo'] = False
        ck.info['ckinfo'] = False
        t.primary_key.info['pkinfo'] = False
        fkc.info['fkcinfo'] = False

        eq_(m2.info, {})
        eq_(t2.info, {"tinfo": True})
        eq_(t2.c.c.info, {"cinfo": True})
        eq_(list(t2.c.c.foreign_keys)[0].info, {"fkinfo": True})
        eq_(t2.primary_key.info, {"pkinfo": True})

        fkc2 = [const for const in t2.constraints
                if isinstance(const, ForeignKeyConstraint)][0]
        eq_(fkc2.info, {"fkcinfo": True})

        ck2 = [const for const in
               t2.constraints if isinstance(const, CheckConstraint)][0]
        eq_(ck2.info, {"ckinfo": True})

    def test_dialect_kwargs(self):
        """Dialect-specific keyword arguments are carried to the copy."""
        meta = MetaData()

        table = Table('mytable', meta,
                      Column('myid', Integer, primary_key=True),
                      mysql_engine='InnoDB',
                      )

        meta2 = MetaData()
        table_c = table.tometadata(meta2)

        eq_(table.kwargs, {"mysql_engine": "InnoDB"})

        eq_(table.kwargs, table_c.kwargs)

    def test_indexes(self):
        """Indexes of the copy match those of the original table."""
        meta = MetaData()

        table = Table('mytable', meta,
                      Column('id', Integer, primary_key=True),
                      Column('data1', Integer, index=True),
                      Column('data2', Integer),
                      )
        Index('multi', table.c.data1, table.c.data2)

        meta2 = MetaData()
        table_c = table.tometadata(meta2)

        def _get_key(i):
            return [i.name, i.unique] + \
                sorted(i.kwargs.items()) + \
                list(i.columns.keys())

        eq_(
            sorted([_get_key(i) for i in table.indexes]),
            sorted([_get_key(i) for i in table_c.indexes])
        )

    @emits_warning("Table '.+' already exists within the given MetaData")
    def test_already_exists(self):
        """Copying onto an existing name returns the existing table."""

        meta1 = MetaData()
        table1 = Table('mytable', meta1,
                       Column('myid', Integer, primary_key=True),
                       )
        meta2 = MetaData()
        table2 = Table('mytable', meta2,
                       Column('yourid', Integer, primary_key=True),
                       )

        table_c = table1.tometadata(meta2)
        table_d = table2.tometadata(meta2)

        # d'oh!
        assert table_c is table_d

    def test_default_schema_metadata(self):
        """``schema=None`` falls back to the target MetaData's schema."""
        meta = MetaData(schema='myschema')

        table = Table(
            'mytable',
            meta,
            Column(
                'myid',
                Integer,
                primary_key=True),
            Column(
                'name',
                String(40),
                nullable=True),
            Column(
                'description',
                String(30),
                CheckConstraint("description='hi'")),
            UniqueConstraint('name'),
        )

        table2 = Table(
            'othertable', meta,
            Column('id', Integer, primary_key=True),
            Column('myid', Integer, ForeignKey('myschema.mytable.myid')),
        )

        meta2 = MetaData(schema='someschema')
        table_c = table.tometadata(meta2, schema=None)
        table2_c = table2.tometadata(meta2, schema=None)

        eq_(str(table_c.join(table2_c).onclause),
            str(table_c.c.myid == table2_c.c.myid))
        eq_(str(table_c.join(table2_c).onclause),
            "someschema.mytable.myid = someschema.othertable.myid")

    def test_strip_schema(self):
        """``schema=None`` with schema-less MetaData yields no schema."""
        meta = MetaData()

        table = Table('mytable', meta,
                      Column('myid', Integer, primary_key=True),
                      Column('name', String(40), nullable=True),
                      Column('description', String(30),
                             CheckConstraint("description='hi'")),
                      UniqueConstraint('name'),
                      )

        table2 = Table('othertable', meta,
                       Column('id', Integer, primary_key=True),
                       Column('myid', Integer, ForeignKey('mytable.myid')),
                       )

        meta2 = MetaData()
        table_c = table.tometadata(meta2, schema=None)
        table2_c = table2.tometadata(meta2, schema=None)

        eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid
                                                      == table2_c.c.myid))
        eq_(str(table_c.join(table2_c).onclause),
            'mytable.myid = othertable.myid')

    def test_unique_true_flag(self):
        """``unique=True`` yields exactly one UniqueConstraint on copy."""
        meta = MetaData()

        table = Table('mytable', meta, Column('x', Integer, unique=True))

        m2 = MetaData()

        t2 = table.tometadata(m2)

        eq_(
            len([
                const for const
                in t2.constraints
                if isinstance(const, UniqueConstraint)]),
            1
        )

    def test_index_true_flag(self):
        """``index=True`` yields exactly one Index on the copy."""
        meta = MetaData()

        table = Table('mytable', meta, Column('x', Integer, index=True))

        m2 = MetaData()

        t2 = table.tometadata(m2)

        eq_(len(t2.indexes), 1)


class InfoTest(fixtures.TestBase):
    """``info`` defaults to an empty dict and accepts a constructor value."""

    def test_metadata_info(self):
        m1 = MetaData()
        eq_(m1.info, {})

        m1 = MetaData(info={"foo": "bar"})
        eq_(m1.info, {"foo": "bar"})

    def test_foreignkey_constraint_info(self):
        fkc = ForeignKeyConstraint(['a'], ['b'], name='bar')
        eq_(fkc.info, {})

        fkc = ForeignKeyConstraint(
            ['a'], ['b'], name='bar', info={"foo": "bar"})
        eq_(fkc.info, {"foo": "bar"})

    def test_foreignkey_info(self):
        fkc = ForeignKey('a')
        eq_(fkc.info, {})

        fkc = ForeignKey('a', info={"foo": "bar"})
        eq_(fkc.info, {"foo": "bar"})

    def test_primarykey_constraint_info(self):
        pkc = PrimaryKeyConstraint('a', name='x')
        eq_(pkc.info, {})

        pkc = PrimaryKeyConstraint('a', name='x', info={'foo': 'bar'})
        eq_(pkc.info, {'foo': 'bar'})

    def test_unique_constraint_info(self):
        uc = UniqueConstraint('a', name='x')
        eq_(uc.info, {})

        uc = UniqueConstraint('a', name='x', info={'foo': 'bar'})
        eq_(uc.info, {'foo': 'bar'})

    def test_check_constraint_info(self):
        cc = CheckConstraint('foo=bar', name='x')
        eq_(cc.info, {})

        cc = CheckConstraint('foo=bar', name='x', info={'foo': 'bar'})
        eq_(cc.info, {'foo': 'bar'})

    def test_index_info(self):
        ix = Index('x', 'a')
        eq_(ix.info, {})

        ix = Index('x', 'a', info={'foo': 'bar'})
        eq_(ix.info, {'foo': 'bar'})

    def test_column_info(self):
        c = Column('x', Integer)
        eq_(c.info, {})

        c = Column('x', Integer, info={'foo': 'bar'})
        eq_(c.info, {'foo': 'bar'})

    def test_table_info(self):
        t = Table('x', MetaData())
        eq_(t.info, {})

        t = Table('x', MetaData(), info={'foo': 'bar'})
        eq_(t.info, {'foo': 'bar'})


class TableTest(fixtures.TestBase, AssertsCompiledSQL):

    @testing.requires.temporary_tables
    @testing.skip_if('mssql', 'different col format')
    def test_prefixes(self):
        """``prefixes`` are rendered between CREATE and TABLE."""
        from sqlalchemy import Table
        table1 = Table("temporary_table_1", MetaData(),
                       Column("col1", Integer),
                       prefixes=["TEMPORARY"])

        self.assert_compile(
            schema.CreateTable(table1),
            "CREATE TEMPORARY TABLE temporary_table_1 (col1 INTEGER)"
        )

        table2 = Table("temporary_table_2", MetaData(),
                       Column("col1", Integer),
                       prefixes=["VIRTUAL"])
        self.assert_compile(
            schema.CreateTable(table2),
            "CREATE VIRTUAL TABLE temporary_table_2 (col1 INTEGER)"
        )

    def test_table_info(self):
        """``Table.info`` is a writable dict whether or not it was given."""
        metadata = MetaData()
        t1 = Table('foo', metadata, info={'x': 'y'})
        t2 = Table('bar', metadata, info={})
        t3 = Table('bat', metadata)
        assert t1.info == {'x': 'y'}
        assert t2.info == {}
        assert t3.info == {}
        for t in (t1, t2, t3):
            t.info['bar'] = 'zip'
            assert t.info['bar'] == 'zip'

    def test_foreign_key_constraints_collection(self):
        metadata = MetaData()
        t1 = Table('foo', metadata, Column('a', Integer))
        eq_(t1.foreign_key_constraints, set())

        fk1 = ForeignKey('q.id')
        fk2 = ForeignKey('j.id')
        fk3 = ForeignKeyConstraint(['b', 'c'], ['r.x', 'r.y'])

        t1.append_column(Column('b', Integer, fk1))
        eq_(
            t1.foreign_key_constraints,
            set([fk1.constraint]))
t1.append_column(Column('c', Integer, fk2)) 1247 eq_( 1248 t1.foreign_key_constraints, 1249 set([fk1.constraint, fk2.constraint])) 1250 1251 t1.append_constraint(fk3) 1252 eq_( 1253 t1.foreign_key_constraints, 1254 set([fk1.constraint, fk2.constraint, fk3])) 1255 1256 def test_c_immutable(self): 1257 m = MetaData() 1258 t1 = Table('t', m, Column('x', Integer), Column('y', Integer)) 1259 assert_raises( 1260 TypeError, 1261 t1.c.extend, [Column('z', Integer)] 1262 ) 1263 1264 def assign(): 1265 t1.c['z'] = Column('z', Integer) 1266 assert_raises( 1267 TypeError, 1268 assign 1269 ) 1270 1271 def assign2(): 1272 t1.c.z = Column('z', Integer) 1273 assert_raises( 1274 TypeError, 1275 assign2 1276 ) 1277 1278 def test_c_mutate_after_unpickle(self): 1279 m = MetaData() 1280 1281 y = Column('y', Integer) 1282 t1 = Table('t', m, Column('x', Integer), y) 1283 1284 t2 = pickle.loads(pickle.dumps(t1)) 1285 z = Column('z', Integer) 1286 g = Column('g', Integer) 1287 t2.append_column(z) 1288 1289 is_(t1.c.contains_column(y), True) 1290 is_(t2.c.contains_column(y), False) 1291 y2 = t2.c.y 1292 is_(t2.c.contains_column(y2), True) 1293 1294 is_(t2.c.contains_column(z), True) 1295 is_(t2.c.contains_column(g), False) 1296 1297 def test_autoincrement_replace(self): 1298 m = MetaData() 1299 1300 t = Table('t', m, 1301 Column('id', Integer, primary_key=True) 1302 ) 1303 1304 is_(t._autoincrement_column, t.c.id) 1305 1306 t = Table('t', m, 1307 Column('id', Integer, primary_key=True), 1308 extend_existing=True 1309 ) 1310 is_(t._autoincrement_column, t.c.id) 1311 1312 def test_pk_args_standalone(self): 1313 m = MetaData() 1314 t = Table('t', m, 1315 Column('x', Integer, primary_key=True), 1316 PrimaryKeyConstraint(mssql_clustered=True) 1317 ) 1318 eq_( 1319 list(t.primary_key), [t.c.x] 1320 ) 1321 eq_( 1322 t.primary_key.dialect_kwargs, {"mssql_clustered": True} 1323 ) 1324 1325 def test_pk_cols_sets_flags(self): 1326 m = MetaData() 1327 t = Table('t', m, 1328 Column('x', Integer), 1329 
Column('y', Integer), 1330 Column('z', Integer), 1331 PrimaryKeyConstraint('x', 'y') 1332 ) 1333 eq_(t.c.x.primary_key, True) 1334 eq_(t.c.y.primary_key, True) 1335 eq_(t.c.z.primary_key, False) 1336 1337 def test_pk_col_mismatch_one(self): 1338 m = MetaData() 1339 assert_raises_message( 1340 exc.SAWarning, 1341 "Table 't' specifies columns 'x' as primary_key=True, " 1342 "not matching locally specified columns 'q'", 1343 Table, 't', m, 1344 Column('x', Integer, primary_key=True), 1345 Column('q', Integer), 1346 PrimaryKeyConstraint('q') 1347 ) 1348 1349 def test_pk_col_mismatch_two(self): 1350 m = MetaData() 1351 assert_raises_message( 1352 exc.SAWarning, 1353 "Table 't' specifies columns 'a', 'b', 'c' as primary_key=True, " 1354 "not matching locally specified columns 'b', 'c'", 1355 Table, 't', m, 1356 Column('a', Integer, primary_key=True), 1357 Column('b', Integer, primary_key=True), 1358 Column('c', Integer, primary_key=True), 1359 PrimaryKeyConstraint('b', 'c') 1360 ) 1361 1362 @testing.emits_warning("Table 't'") 1363 def test_pk_col_mismatch_three(self): 1364 m = MetaData() 1365 t = Table('t', m, 1366 Column('x', Integer, primary_key=True), 1367 Column('q', Integer), 1368 PrimaryKeyConstraint('q') 1369 ) 1370 eq_(list(t.primary_key), [t.c.q]) 1371 1372 @testing.emits_warning("Table 't'") 1373 def test_pk_col_mismatch_four(self): 1374 m = MetaData() 1375 t = Table('t', m, 1376 Column('a', Integer, primary_key=True), 1377 Column('b', Integer, primary_key=True), 1378 Column('c', Integer, primary_key=True), 1379 PrimaryKeyConstraint('b', 'c') 1380 ) 1381 eq_(list(t.primary_key), [t.c.b, t.c.c]) 1382 1383 def test_pk_always_flips_nullable(self): 1384 m = MetaData() 1385 1386 t1 = Table('t1', m, Column('x', Integer), PrimaryKeyConstraint('x')) 1387 1388 t2 = Table('t2', m, Column('x', Integer, primary_key=True)) 1389 1390 eq_(list(t1.primary_key), [t1.c.x]) 1391 1392 eq_(list(t2.primary_key), [t2.c.x]) 1393 1394 assert t1.c.x.primary_key 1395 assert 
t2.c.x.primary_key 1396 1397 assert not t2.c.x.nullable 1398 assert not t1.c.x.nullable 1399 1400 1401class SchemaTypeTest(fixtures.TestBase): 1402 1403 class MyType(sqltypes.SchemaType, sqltypes.TypeEngine): 1404 column = None 1405 table = None 1406 evt_targets = () 1407 1408 def _set_table(self, column, table): 1409 super(SchemaTypeTest.MyType, self)._set_table(column, table) 1410 self.column = column 1411 self.table = table 1412 1413 def _on_table_create(self, target, bind, **kw): 1414 super(SchemaTypeTest.MyType, self)._on_table_create( 1415 target, bind, **kw) 1416 self.evt_targets += (target,) 1417 1418 def _on_metadata_create(self, target, bind, **kw): 1419 super(SchemaTypeTest.MyType, self)._on_metadata_create( 1420 target, bind, **kw) 1421 self.evt_targets += (target,) 1422 1423 class MyTypeWImpl(MyType): 1424 1425 def _gen_dialect_impl(self, dialect): 1426 return self.adapt(SchemaTypeTest.MyTypeImpl) 1427 1428 class MyTypeImpl(MyTypeWImpl): 1429 pass 1430 1431 def test_independent_schema(self): 1432 m = MetaData() 1433 type_ = self.MyType(schema="q") 1434 t1 = Table('x', m, Column("y", type_), schema="z") 1435 eq_(t1.c.y.type.schema, "q") 1436 1437 def test_inherit_schema(self): 1438 m = MetaData() 1439 type_ = self.MyType(schema="q", inherit_schema=True) 1440 t1 = Table('x', m, Column("y", type_), schema="z") 1441 eq_(t1.c.y.type.schema, "z") 1442 1443 def test_independent_schema_enum(self): 1444 m = MetaData() 1445 type_ = sqltypes.Enum("a", schema="q") 1446 t1 = Table('x', m, Column("y", type_), schema="z") 1447 eq_(t1.c.y.type.schema, "q") 1448 1449 def test_inherit_schema_enum(self): 1450 m = MetaData() 1451 type_ = sqltypes.Enum("a", "b", "c", schema="q", inherit_schema=True) 1452 t1 = Table('x', m, Column("y", type_), schema="z") 1453 eq_(t1.c.y.type.schema, "z") 1454 1455 def test_tometadata_copy_type(self): 1456 m1 = MetaData() 1457 1458 type_ = self.MyType() 1459 t1 = Table('x', m1, Column("y", type_)) 1460 1461 m2 = MetaData() 1462 t2 = 
t1.tometadata(m2) 1463 1464 # metadata isn't set 1465 is_(t2.c.y.type.metadata, None) 1466 1467 # our test type sets table, though 1468 is_(t2.c.y.type.table, t2) 1469 1470 def test_tometadata_independent_schema(self): 1471 m1 = MetaData() 1472 1473 type_ = self.MyType() 1474 t1 = Table('x', m1, Column("y", type_)) 1475 1476 m2 = MetaData() 1477 t2 = t1.tometadata(m2, schema="bar") 1478 1479 eq_(t2.c.y.type.schema, None) 1480 1481 def test_tometadata_inherit_schema(self): 1482 m1 = MetaData() 1483 1484 type_ = self.MyType(inherit_schema=True) 1485 t1 = Table('x', m1, Column("y", type_)) 1486 1487 m2 = MetaData() 1488 t2 = t1.tometadata(m2, schema="bar") 1489 1490 eq_(t1.c.y.type.schema, None) 1491 eq_(t2.c.y.type.schema, "bar") 1492 1493 def test_tometadata_independent_events(self): 1494 m1 = MetaData() 1495 1496 type_ = self.MyType() 1497 t1 = Table('x', m1, Column("y", type_)) 1498 1499 m2 = MetaData() 1500 t2 = t1.tometadata(m2) 1501 1502 t1.dispatch.before_create(t1, testing.db) 1503 eq_(t1.c.y.type.evt_targets, (t1,)) 1504 eq_(t2.c.y.type.evt_targets, ()) 1505 1506 t2.dispatch.before_create(t2, testing.db) 1507 t2.dispatch.before_create(t2, testing.db) 1508 eq_(t1.c.y.type.evt_targets, (t1,)) 1509 eq_(t2.c.y.type.evt_targets, (t2, t2)) 1510 1511 def test_metadata_dispatch_no_new_impl(self): 1512 m1 = MetaData() 1513 typ = self.MyType(metadata=m1) 1514 m1.dispatch.before_create(m1, testing.db) 1515 eq_(typ.evt_targets, (m1, )) 1516 1517 dialect_impl = typ.dialect_impl(testing.db.dialect) 1518 eq_(dialect_impl.evt_targets, ()) 1519 1520 def test_metadata_dispatch_new_impl(self): 1521 m1 = MetaData() 1522 typ = self.MyTypeWImpl(metadata=m1) 1523 m1.dispatch.before_create(m1, testing.db) 1524 eq_(typ.evt_targets, (m1, )) 1525 1526 dialect_impl = typ.dialect_impl(testing.db.dialect) 1527 eq_(dialect_impl.evt_targets, (m1, )) 1528 1529 def test_table_dispatch_no_new_impl(self): 1530 m1 = MetaData() 1531 typ = self.MyType() 1532 t1 = Table('t1', m1, Column('x', typ)) 
1533 m1.dispatch.before_create(t1, testing.db) 1534 eq_(typ.evt_targets, (t1, )) 1535 1536 dialect_impl = typ.dialect_impl(testing.db.dialect) 1537 eq_(dialect_impl.evt_targets, ()) 1538 1539 def test_table_dispatch_new_impl(self): 1540 m1 = MetaData() 1541 typ = self.MyTypeWImpl() 1542 t1 = Table('t1', m1, Column('x', typ)) 1543 m1.dispatch.before_create(t1, testing.db) 1544 eq_(typ.evt_targets, (t1, )) 1545 1546 dialect_impl = typ.dialect_impl(testing.db.dialect) 1547 eq_(dialect_impl.evt_targets, (t1, )) 1548 1549 def test_create_metadata_bound_no_crash(self): 1550 m1 = MetaData() 1551 self.MyType(metadata=m1) 1552 1553 m1.create_all(testing.db) 1554 1555 def test_boolean_constraint_type_doesnt_double(self): 1556 m1 = MetaData() 1557 1558 t1 = Table('x', m1, Column("flag", Boolean())) 1559 eq_( 1560 len([ 1561 c for c in t1.constraints 1562 if isinstance(c, CheckConstraint)]), 1563 1 1564 ) 1565 m2 = MetaData() 1566 t2 = t1.tometadata(m2) 1567 1568 eq_( 1569 len([ 1570 c for c in t2.constraints 1571 if isinstance(c, CheckConstraint)]), 1572 1 1573 ) 1574 1575 def test_enum_constraint_type_doesnt_double(self): 1576 m1 = MetaData() 1577 1578 t1 = Table('x', m1, Column("flag", Enum('a', 'b', 'c'))) 1579 eq_( 1580 len([ 1581 c for c in t1.constraints 1582 if isinstance(c, CheckConstraint)]), 1583 1 1584 ) 1585 m2 = MetaData() 1586 t2 = t1.tometadata(m2) 1587 1588 eq_( 1589 len([ 1590 c for c in t2.constraints 1591 if isinstance(c, CheckConstraint)]), 1592 1 1593 ) 1594 1595 1596class SchemaTest(fixtures.TestBase, AssertsCompiledSQL): 1597 1598 def test_default_schema_metadata_fk(self): 1599 m = MetaData(schema="foo") 1600 t1 = Table('t1', m, Column('x', Integer)) 1601 t2 = Table('t2', m, Column('x', Integer, ForeignKey('t1.x'))) 1602 assert t2.c.x.references(t1.c.x) 1603 1604 def test_ad_hoc_schema_equiv_fk(self): 1605 m = MetaData() 1606 t1 = Table('t1', m, Column('x', Integer), schema="foo") 1607 t2 = Table( 1608 't2', 1609 m, 1610 Column( 1611 'x', 1612 Integer, 
1613 ForeignKey('t1.x')), 1614 schema="foo") 1615 assert_raises( 1616 exc.NoReferencedTableError, 1617 lambda: t2.c.x.references(t1.c.x) 1618 ) 1619 1620 def test_default_schema_metadata_fk_alt_remote(self): 1621 m = MetaData(schema="foo") 1622 t1 = Table('t1', m, Column('x', Integer)) 1623 t2 = Table('t2', m, Column('x', Integer, ForeignKey('t1.x')), 1624 schema="bar") 1625 assert t2.c.x.references(t1.c.x) 1626 1627 def test_default_schema_metadata_fk_alt_local_raises(self): 1628 m = MetaData(schema="foo") 1629 t1 = Table('t1', m, Column('x', Integer), schema="bar") 1630 t2 = Table('t2', m, Column('x', Integer, ForeignKey('t1.x'))) 1631 assert_raises( 1632 exc.NoReferencedTableError, 1633 lambda: t2.c.x.references(t1.c.x) 1634 ) 1635 1636 def test_default_schema_metadata_fk_alt_local(self): 1637 m = MetaData(schema="foo") 1638 t1 = Table('t1', m, Column('x', Integer), schema="bar") 1639 t2 = Table('t2', m, Column('x', Integer, ForeignKey('bar.t1.x'))) 1640 assert t2.c.x.references(t1.c.x) 1641 1642 def test_create_drop_schema(self): 1643 1644 self.assert_compile( 1645 schema.CreateSchema("sa_schema"), 1646 "CREATE SCHEMA sa_schema" 1647 ) 1648 self.assert_compile( 1649 schema.DropSchema("sa_schema"), 1650 "DROP SCHEMA sa_schema" 1651 ) 1652 self.assert_compile( 1653 schema.DropSchema("sa_schema", cascade=True), 1654 "DROP SCHEMA sa_schema CASCADE" 1655 ) 1656 1657 def test_iteration(self): 1658 metadata = MetaData() 1659 table1 = Table( 1660 'table1', 1661 metadata, 1662 Column( 1663 'col1', 1664 Integer, 1665 primary_key=True), 1666 schema='someschema') 1667 table2 = Table( 1668 'table2', 1669 metadata, 1670 Column( 1671 'col1', 1672 Integer, 1673 primary_key=True), 1674 Column( 1675 'col2', 1676 Integer, 1677 ForeignKey('someschema.table1.col1')), 1678 schema='someschema') 1679 1680 t1 = str(schema.CreateTable(table1).compile(bind=testing.db)) 1681 t2 = str(schema.CreateTable(table2).compile(bind=testing.db)) 1682 if 
testing.db.dialect.preparer(testing.db.dialect).omit_schema: 1683 assert t1.index("CREATE TABLE table1") > -1 1684 assert t2.index("CREATE TABLE table2") > -1 1685 else: 1686 assert t1.index("CREATE TABLE someschema.table1") > -1 1687 assert t2.index("CREATE TABLE someschema.table2") > -1 1688 1689 1690class UseExistingTest(fixtures.TablesTest): 1691 1692 @classmethod 1693 def define_tables(cls, metadata): 1694 Table('users', metadata, 1695 Column('id', Integer, primary_key=True), 1696 Column('name', String(30))) 1697 1698 def _useexisting_fixture(self): 1699 meta2 = MetaData(testing.db) 1700 Table('users', meta2, autoload=True) 1701 return meta2 1702 1703 def _notexisting_fixture(self): 1704 return MetaData(testing.db) 1705 1706 def test_exception_no_flags(self): 1707 meta2 = self._useexisting_fixture() 1708 1709 def go(): 1710 Table('users', meta2, Column('name', 1711 Unicode), autoload=True) 1712 assert_raises_message( 1713 exc.InvalidRequestError, 1714 "Table 'users' is already defined for this " 1715 "MetaData instance.", 1716 go 1717 ) 1718 1719 def test_keep_plus_existing_raises(self): 1720 meta2 = self._useexisting_fixture() 1721 assert_raises( 1722 exc.ArgumentError, 1723 Table, 'users', meta2, keep_existing=True, 1724 extend_existing=True 1725 ) 1726 1727 @testing.uses_deprecated() 1728 def test_existing_plus_useexisting_raises(self): 1729 meta2 = self._useexisting_fixture() 1730 assert_raises( 1731 exc.ArgumentError, 1732 Table, 'users', meta2, useexisting=True, 1733 extend_existing=True 1734 ) 1735 1736 def test_keep_existing_no_dupe_constraints(self): 1737 meta2 = self._notexisting_fixture() 1738 users = Table('users', meta2, 1739 Column('id', Integer), 1740 Column('name', Unicode), 1741 UniqueConstraint('name'), 1742 keep_existing=True 1743 ) 1744 assert 'name' in users.c 1745 assert 'id' in users.c 1746 eq_(len(users.constraints), 2) 1747 1748 u2 = Table('users', meta2, 1749 Column('id', Integer), 1750 Column('name', Unicode), 1751 
UniqueConstraint('name'), 1752 keep_existing=True 1753 ) 1754 eq_(len(u2.constraints), 2) 1755 1756 def test_extend_existing_dupes_constraints(self): 1757 meta2 = self._notexisting_fixture() 1758 users = Table('users', meta2, 1759 Column('id', Integer), 1760 Column('name', Unicode), 1761 UniqueConstraint('name'), 1762 extend_existing=True 1763 ) 1764 assert 'name' in users.c 1765 assert 'id' in users.c 1766 eq_(len(users.constraints), 2) 1767 1768 u2 = Table('users', meta2, 1769 Column('id', Integer), 1770 Column('name', Unicode), 1771 UniqueConstraint('name'), 1772 extend_existing=True 1773 ) 1774 # constraint got duped 1775 eq_(len(u2.constraints), 3) 1776 1777 def test_keep_existing_coltype(self): 1778 meta2 = self._useexisting_fixture() 1779 users = Table('users', meta2, Column('name', Unicode), 1780 autoload=True, keep_existing=True) 1781 assert not isinstance(users.c.name.type, Unicode) 1782 1783 def test_keep_existing_quote(self): 1784 meta2 = self._useexisting_fixture() 1785 users = Table('users', meta2, quote=True, autoload=True, 1786 keep_existing=True) 1787 assert not users.name.quote 1788 1789 def test_keep_existing_add_column(self): 1790 meta2 = self._useexisting_fixture() 1791 users = Table('users', meta2, 1792 Column('foo', Integer), 1793 autoload=True, 1794 keep_existing=True) 1795 assert "foo" not in users.c 1796 1797 def test_keep_existing_coltype_no_orig(self): 1798 meta2 = self._notexisting_fixture() 1799 users = Table('users', meta2, Column('name', Unicode), 1800 autoload=True, keep_existing=True) 1801 assert isinstance(users.c.name.type, Unicode) 1802 1803 @testing.skip_if( 1804 lambda: testing.db.dialect.requires_name_normalize, 1805 "test depends on lowercase as case insensitive") 1806 def test_keep_existing_quote_no_orig(self): 1807 meta2 = self._notexisting_fixture() 1808 users = Table('users', meta2, quote=True, 1809 autoload=True, 1810 keep_existing=True) 1811 assert users.name.quote 1812 1813 def 
test_keep_existing_add_column_no_orig(self): 1814 meta2 = self._notexisting_fixture() 1815 users = Table('users', meta2, 1816 Column('foo', Integer), 1817 autoload=True, 1818 keep_existing=True) 1819 assert "foo" in users.c 1820 1821 def test_keep_existing_coltype_no_reflection(self): 1822 meta2 = self._useexisting_fixture() 1823 users = Table('users', meta2, Column('name', Unicode), 1824 keep_existing=True) 1825 assert not isinstance(users.c.name.type, Unicode) 1826 1827 def test_keep_existing_quote_no_reflection(self): 1828 meta2 = self._useexisting_fixture() 1829 users = Table('users', meta2, quote=True, 1830 keep_existing=True) 1831 assert not users.name.quote 1832 1833 def test_keep_existing_add_column_no_reflection(self): 1834 meta2 = self._useexisting_fixture() 1835 users = Table('users', meta2, 1836 Column('foo', Integer), 1837 keep_existing=True) 1838 assert "foo" not in users.c 1839 1840 def test_extend_existing_coltype(self): 1841 meta2 = self._useexisting_fixture() 1842 users = Table('users', meta2, Column('name', Unicode), 1843 autoload=True, extend_existing=True) 1844 assert isinstance(users.c.name.type, Unicode) 1845 1846 def test_extend_existing_quote(self): 1847 meta2 = self._useexisting_fixture() 1848 assert_raises_message( 1849 tsa.exc.ArgumentError, 1850 "Can't redefine 'quote' or 'quote_schema' arguments", 1851 Table, 'users', meta2, quote=True, autoload=True, 1852 extend_existing=True 1853 ) 1854 1855 def test_extend_existing_add_column(self): 1856 meta2 = self._useexisting_fixture() 1857 users = Table('users', meta2, 1858 Column('foo', Integer), 1859 autoload=True, 1860 extend_existing=True) 1861 assert "foo" in users.c 1862 1863 def test_extend_existing_coltype_no_orig(self): 1864 meta2 = self._notexisting_fixture() 1865 users = Table('users', meta2, Column('name', Unicode), 1866 autoload=True, extend_existing=True) 1867 assert isinstance(users.c.name.type, Unicode) 1868 1869 @testing.skip_if( 1870 lambda: 
testing.db.dialect.requires_name_normalize, 1871 "test depends on lowercase as case insensitive") 1872 def test_extend_existing_quote_no_orig(self): 1873 meta2 = self._notexisting_fixture() 1874 users = Table('users', meta2, quote=True, 1875 autoload=True, 1876 extend_existing=True) 1877 assert users.name.quote 1878 1879 def test_extend_existing_add_column_no_orig(self): 1880 meta2 = self._notexisting_fixture() 1881 users = Table('users', meta2, 1882 Column('foo', Integer), 1883 autoload=True, 1884 extend_existing=True) 1885 assert "foo" in users.c 1886 1887 def test_extend_existing_coltype_no_reflection(self): 1888 meta2 = self._useexisting_fixture() 1889 users = Table('users', meta2, Column('name', Unicode), 1890 extend_existing=True) 1891 assert isinstance(users.c.name.type, Unicode) 1892 1893 def test_extend_existing_quote_no_reflection(self): 1894 meta2 = self._useexisting_fixture() 1895 assert_raises_message( 1896 tsa.exc.ArgumentError, 1897 "Can't redefine 'quote' or 'quote_schema' arguments", 1898 Table, 'users', meta2, quote=True, 1899 extend_existing=True 1900 ) 1901 1902 def test_extend_existing_add_column_no_reflection(self): 1903 meta2 = self._useexisting_fixture() 1904 users = Table('users', meta2, 1905 Column('foo', Integer), 1906 extend_existing=True) 1907 assert "foo" in users.c 1908 1909 1910class IndexTest(fixtures.TestBase): 1911 1912 def _assert(self, t, i, columns=True): 1913 eq_(t.indexes, set([i])) 1914 if columns: 1915 eq_(list(i.columns), [t.c.x]) 1916 else: 1917 eq_(list(i.columns), []) 1918 assert i.table is t 1919 1920 def test_separate_decl_columns(self): 1921 m = MetaData() 1922 t = Table('t', m, Column('x', Integer)) 1923 i = Index('i', t.c.x) 1924 self._assert(t, i) 1925 1926 def test_separate_decl_columns_functional(self): 1927 m = MetaData() 1928 t = Table('t', m, Column('x', Integer)) 1929 i = Index('i', func.foo(t.c.x)) 1930 self._assert(t, i) 1931 1932 def test_inline_decl_columns(self): 1933 m = MetaData() 1934 c = Column('x', 
Integer) 1935 i = Index('i', c) 1936 t = Table('t', m, c, i) 1937 self._assert(t, i) 1938 1939 def test_inline_decl_columns_functional(self): 1940 m = MetaData() 1941 c = Column('x', Integer) 1942 i = Index('i', func.foo(c)) 1943 t = Table('t', m, c, i) 1944 self._assert(t, i) 1945 1946 def test_inline_decl_string(self): 1947 m = MetaData() 1948 i = Index('i', "x") 1949 t = Table('t', m, Column('x', Integer), i) 1950 self._assert(t, i) 1951 1952 def test_inline_decl_textonly(self): 1953 m = MetaData() 1954 i = Index('i', text("foobar(x)")) 1955 t = Table('t', m, Column('x', Integer), i) 1956 self._assert(t, i, columns=False) 1957 1958 def test_separate_decl_textonly(self): 1959 m = MetaData() 1960 i = Index('i', text("foobar(x)")) 1961 t = Table('t', m, Column('x', Integer)) 1962 t.append_constraint(i) 1963 self._assert(t, i, columns=False) 1964 1965 def test_unnamed_column_exception(self): 1966 # this can occur in some declarative situations 1967 c = Column(Integer) 1968 idx = Index('q', c) 1969 m = MetaData() 1970 t = Table('t', m, Column('q')) 1971 assert_raises_message( 1972 exc.ArgumentError, 1973 "Can't add unnamed column to column collection", 1974 t.append_constraint, idx 1975 ) 1976 1977 1978class ConstraintTest(fixtures.TestBase): 1979 1980 def _single_fixture(self): 1981 m = MetaData() 1982 1983 t1 = Table('t1', m, 1984 Column('a', Integer), 1985 Column('b', Integer) 1986 ) 1987 1988 t2 = Table('t2', m, 1989 Column('a', Integer, ForeignKey('t1.a')) 1990 ) 1991 1992 t3 = Table('t3', m, 1993 Column('a', Integer) 1994 ) 1995 return t1, t2, t3 1996 1997 def test_table_references(self): 1998 t1, t2, t3 = self._single_fixture() 1999 assert list(t2.c.a.foreign_keys)[0].references(t1) 2000 assert not list(t2.c.a.foreign_keys)[0].references(t3) 2001 2002 def test_column_references(self): 2003 t1, t2, t3 = self._single_fixture() 2004 assert t2.c.a.references(t1.c.a) 2005 assert not t2.c.a.references(t3.c.a) 2006 assert not t2.c.a.references(t1.c.b) 2007 2008 def 
test_column_references_derived(self): 2009 t1, t2, t3 = self._single_fixture() 2010 s1 = tsa.select([tsa.select([t1]).alias()]) 2011 assert t2.c.a.references(s1.c.a) 2012 assert not t2.c.a.references(s1.c.b) 2013 2014 def test_copy_doesnt_reference(self): 2015 t1, t2, t3 = self._single_fixture() 2016 a2 = t2.c.a.copy() 2017 assert not a2.references(t1.c.a) 2018 assert not a2.references(t1.c.b) 2019 2020 def test_derived_column_references(self): 2021 t1, t2, t3 = self._single_fixture() 2022 s1 = tsa.select([tsa.select([t2]).alias()]) 2023 assert s1.c.a.references(t1.c.a) 2024 assert not s1.c.a.references(t1.c.b) 2025 2026 def test_referred_table_accessor(self): 2027 t1, t2, t3 = self._single_fixture() 2028 fkc = list(t2.foreign_key_constraints)[0] 2029 is_(fkc.referred_table, t1) 2030 2031 def test_referred_table_accessor_not_available(self): 2032 t1 = Table('t', MetaData(), Column('x', ForeignKey('q.id'))) 2033 fkc = list(t1.foreign_key_constraints)[0] 2034 assert_raises_message( 2035 exc.InvalidRequestError, 2036 "Foreign key associated with column 't.x' could not find " 2037 "table 'q' with which to generate a foreign key to target " 2038 "column 'id'", 2039 getattr, fkc, "referred_table" 2040 ) 2041 2042 def test_related_column_not_present_atfirst_ok(self): 2043 m = MetaData() 2044 base_table = Table("base", m, 2045 Column("id", Integer, primary_key=True) 2046 ) 2047 fk = ForeignKey('base.q') 2048 derived_table = Table("derived", m, 2049 Column("id", None, fk, 2050 primary_key=True), 2051 ) 2052 2053 base_table.append_column(Column('q', Integer)) 2054 assert fk.column is base_table.c.q 2055 assert isinstance(derived_table.c.id.type, Integer) 2056 2057 def test_related_column_not_present_atfirst_ok_onname(self): 2058 m = MetaData() 2059 base_table = Table("base", m, 2060 Column("id", Integer, primary_key=True) 2061 ) 2062 fk = ForeignKey('base.q', link_to_name=True) 2063 derived_table = Table("derived", m, 2064 Column("id", None, fk, 2065 primary_key=True), 2066 
) 2067 2068 base_table.append_column(Column('q', Integer, key='zz')) 2069 assert fk.column is base_table.c.zz 2070 assert isinstance(derived_table.c.id.type, Integer) 2071 2072 def test_related_column_not_present_atfirst_ok_linktoname_conflict(self): 2073 m = MetaData() 2074 base_table = Table("base", m, 2075 Column("id", Integer, primary_key=True) 2076 ) 2077 fk = ForeignKey('base.q', link_to_name=True) 2078 derived_table = Table("derived", m, 2079 Column("id", None, fk, 2080 primary_key=True), 2081 ) 2082 2083 base_table.append_column(Column('zz', Integer, key='q')) 2084 base_table.append_column(Column('q', Integer, key='zz')) 2085 assert fk.column is base_table.c.zz 2086 assert isinstance(derived_table.c.id.type, Integer) 2087 2088 def test_invalid_composite_fk_check_strings(self): 2089 m = MetaData() 2090 2091 assert_raises_message( 2092 exc.ArgumentError, 2093 r"ForeignKeyConstraint on t1\(x, y\) refers to " 2094 "multiple remote tables: t2 and t3", 2095 Table, 2096 't1', m, Column('x', Integer), Column('y', Integer), 2097 ForeignKeyConstraint(['x', 'y'], ['t2.x', 't3.y']) 2098 ) 2099 2100 def test_invalid_composite_fk_check_columns(self): 2101 m = MetaData() 2102 2103 t2 = Table('t2', m, Column('x', Integer)) 2104 t3 = Table('t3', m, Column('y', Integer)) 2105 2106 assert_raises_message( 2107 exc.ArgumentError, 2108 r"ForeignKeyConstraint on t1\(x, y\) refers to " 2109 "multiple remote tables: t2 and t3", 2110 Table, 2111 't1', m, Column('x', Integer), Column('y', Integer), 2112 ForeignKeyConstraint(['x', 'y'], [t2.c.x, t3.c.y]) 2113 ) 2114 2115 def test_invalid_composite_fk_check_columns_notattached(self): 2116 m = MetaData() 2117 x = Column('x', Integer) 2118 y = Column('y', Integer) 2119 2120 # no error is raised for this one right now. 2121 # which is a minor bug. 
2122 Table('t1', m, Column('x', Integer), Column('y', Integer), 2123 ForeignKeyConstraint(['x', 'y'], [x, y]) 2124 ) 2125 2126 Table('t2', m, x) 2127 Table('t3', m, y) 2128 2129 def test_constraint_copied_to_proxy_ok(self): 2130 m = MetaData() 2131 Table('t1', m, Column('id', Integer, primary_key=True)) 2132 t2 = Table('t2', m, Column('id', Integer, ForeignKey('t1.id'), 2133 primary_key=True)) 2134 2135 s = tsa.select([t2]) 2136 t2fk = list(t2.c.id.foreign_keys)[0] 2137 sfk = list(s.c.id.foreign_keys)[0] 2138 2139 # the two FKs share the ForeignKeyConstraint 2140 is_( 2141 t2fk.constraint, 2142 sfk.constraint 2143 ) 2144 2145 # but the ForeignKeyConstraint isn't 2146 # aware of the select's FK 2147 eq_( 2148 t2fk.constraint.elements, 2149 [t2fk] 2150 ) 2151 2152 def test_type_propagate_composite_fk_string(self): 2153 metadata = MetaData() 2154 Table( 2155 'a', metadata, 2156 Column('key1', Integer, primary_key=True), 2157 Column('key2', String(40), primary_key=True)) 2158 2159 b = Table('b', metadata, 2160 Column('a_key1', None), 2161 Column('a_key2', None), 2162 Column('id', Integer, primary_key=True), 2163 ForeignKeyConstraint(['a_key1', 'a_key2'], 2164 ['a.key1', 'a.key2']) 2165 ) 2166 2167 assert isinstance(b.c.a_key1.type, Integer) 2168 assert isinstance(b.c.a_key2.type, String) 2169 2170 def test_type_propagate_composite_fk_col(self): 2171 metadata = MetaData() 2172 a = Table('a', metadata, 2173 Column('key1', Integer, primary_key=True), 2174 Column('key2', String(40), primary_key=True)) 2175 2176 b = Table('b', metadata, 2177 Column('a_key1', None), 2178 Column('a_key2', None), 2179 Column('id', Integer, primary_key=True), 2180 ForeignKeyConstraint(['a_key1', 'a_key2'], 2181 [a.c.key1, a.c.key2]) 2182 ) 2183 2184 assert isinstance(b.c.a_key1.type, Integer) 2185 assert isinstance(b.c.a_key2.type, String) 2186 2187 def test_type_propagate_standalone_fk_string(self): 2188 metadata = MetaData() 2189 Table( 2190 'a', metadata, 2191 Column('key1', Integer, 
primary_key=True)) 2192 2193 b = Table('b', metadata, 2194 Column('a_key1', None, ForeignKey("a.key1")), 2195 ) 2196 2197 assert isinstance(b.c.a_key1.type, Integer) 2198 2199 def test_type_propagate_standalone_fk_col(self): 2200 metadata = MetaData() 2201 a = Table('a', metadata, 2202 Column('key1', Integer, primary_key=True)) 2203 2204 b = Table('b', metadata, 2205 Column('a_key1', None, ForeignKey(a.c.key1)), 2206 ) 2207 2208 assert isinstance(b.c.a_key1.type, Integer) 2209 2210 def test_type_propagate_chained_string_source_first(self): 2211 metadata = MetaData() 2212 Table( 2213 'a', metadata, 2214 Column('key1', Integer, primary_key=True) 2215 ) 2216 2217 b = Table('b', metadata, 2218 Column('a_key1', None, ForeignKey("a.key1")), 2219 ) 2220 2221 c = Table('c', metadata, 2222 Column('b_key1', None, ForeignKey("b.a_key1")), 2223 ) 2224 2225 assert isinstance(b.c.a_key1.type, Integer) 2226 assert isinstance(c.c.b_key1.type, Integer) 2227 2228 def test_type_propagate_chained_string_source_last(self): 2229 metadata = MetaData() 2230 2231 b = Table('b', metadata, 2232 Column('a_key1', None, ForeignKey("a.key1")), 2233 ) 2234 2235 c = Table('c', metadata, 2236 Column('b_key1', None, ForeignKey("b.a_key1")), 2237 ) 2238 2239 Table( 2240 'a', metadata, 2241 Column('key1', Integer, primary_key=True)) 2242 2243 assert isinstance(b.c.a_key1.type, Integer) 2244 assert isinstance(c.c.b_key1.type, Integer) 2245 2246 def test_type_propagate_chained_string_source_last_onname(self): 2247 metadata = MetaData() 2248 2249 b = Table('b', metadata, 2250 Column( 2251 'a_key1', None, 2252 ForeignKey("a.key1", link_to_name=True), key="ak1"), 2253 ) 2254 2255 c = Table('c', metadata, 2256 Column( 2257 'b_key1', None, 2258 ForeignKey("b.a_key1", link_to_name=True), key="bk1"), 2259 ) 2260 2261 Table( 2262 'a', metadata, 2263 Column('key1', Integer, primary_key=True, key='ak1')) 2264 2265 assert isinstance(b.c.ak1.type, Integer) 2266 assert isinstance(c.c.bk1.type, Integer) 2267 2268 def 
test_type_propagate_chained_string_source_last_onname_conflict(self): 2269 metadata = MetaData() 2270 2271 b = Table('b', metadata, 2272 # b.c.key1 -> a.c.key1 -> String 2273 Column( 2274 'ak1', None, 2275 ForeignKey("a.key1", link_to_name=False), key="key1"), 2276 # b.c.ak1 -> a.c.ak1 -> Integer 2277 Column( 2278 'a_key1', None, 2279 ForeignKey("a.key1", link_to_name=True), key="ak1"), 2280 ) 2281 2282 c = Table('c', metadata, 2283 # c.c.b_key1 -> b.c.ak1 -> Integer 2284 Column( 2285 'b_key1', None, 2286 ForeignKey("b.ak1", link_to_name=False)), 2287 # c.c.b_ak1 -> b.c.ak1 2288 Column( 2289 'b_ak1', None, 2290 ForeignKey("b.ak1", link_to_name=True)), 2291 ) 2292 2293 Table( 2294 'a', metadata, 2295 # a.c.key1 2296 Column('ak1', String, key="key1"), 2297 # a.c.ak1 2298 Column('key1', Integer, primary_key=True, key='ak1'), 2299 ) 2300 2301 assert isinstance(b.c.key1.type, String) 2302 assert isinstance(b.c.ak1.type, Integer) 2303 2304 assert isinstance(c.c.b_ak1.type, String) 2305 assert isinstance(c.c.b_key1.type, Integer) 2306 2307 def test_type_propagate_chained_col_orig_first(self): 2308 metadata = MetaData() 2309 a = Table('a', metadata, 2310 Column('key1', Integer, primary_key=True)) 2311 2312 b = Table('b', metadata, 2313 Column('a_key1', None, ForeignKey(a.c.key1)), 2314 ) 2315 2316 c = Table('c', metadata, 2317 Column('b_key1', None, ForeignKey(b.c.a_key1)), 2318 ) 2319 2320 assert isinstance(b.c.a_key1.type, Integer) 2321 assert isinstance(c.c.b_key1.type, Integer) 2322 2323 def test_column_accessor_col(self): 2324 c1 = Column('x', Integer) 2325 fk = ForeignKey(c1) 2326 is_(fk.column, c1) 2327 2328 def test_column_accessor_clause_element(self): 2329 c1 = Column('x', Integer) 2330 2331 class CThing(object): 2332 2333 def __init__(self, c): 2334 self.c = c 2335 2336 def __clause_element__(self): 2337 return self.c 2338 2339 fk = ForeignKey(CThing(c1)) 2340 is_(fk.column, c1) 2341 2342 def test_column_accessor_string_no_parent(self): 2343 fk = 
ForeignKey("sometable.somecol") 2344 assert_raises_message( 2345 exc.InvalidRequestError, 2346 "this ForeignKey object does not yet have a parent " 2347 "Column associated with it.", 2348 getattr, fk, "column" 2349 ) 2350 2351 def test_column_accessor_string_no_parent_table(self): 2352 fk = ForeignKey("sometable.somecol") 2353 Column('x', fk) 2354 assert_raises_message( 2355 exc.InvalidRequestError, 2356 "this ForeignKey's parent column is not yet " 2357 "associated with a Table.", 2358 getattr, fk, "column" 2359 ) 2360 2361 def test_column_accessor_string_no_target_table(self): 2362 fk = ForeignKey("sometable.somecol") 2363 c1 = Column('x', fk) 2364 Table('t', MetaData(), c1) 2365 assert_raises_message( 2366 exc.NoReferencedTableError, 2367 "Foreign key associated with column 't.x' could not find " 2368 "table 'sometable' with which to generate a " 2369 "foreign key to target column 'somecol'", 2370 getattr, fk, "column" 2371 ) 2372 2373 def test_column_accessor_string_no_target_column(self): 2374 fk = ForeignKey("sometable.somecol") 2375 c1 = Column('x', fk) 2376 m = MetaData() 2377 Table('t', m, c1) 2378 Table("sometable", m, Column('notsomecol', Integer)) 2379 assert_raises_message( 2380 exc.NoReferencedColumnError, 2381 "Could not initialize target column for ForeignKey " 2382 "'sometable.somecol' on table 't': " 2383 "table 'sometable' has no column named 'somecol'", 2384 getattr, fk, "column" 2385 ) 2386 2387 def test_remove_table_fk_bookkeeping(self): 2388 metadata = MetaData() 2389 fk = ForeignKey('t1.x') 2390 t2 = Table('t2', metadata, Column('y', Integer, fk)) 2391 t3 = Table('t3', metadata, Column('y', Integer, ForeignKey('t1.x'))) 2392 2393 assert t2.key in metadata.tables 2394 assert ("t1", "x") in metadata._fk_memos 2395 2396 metadata.remove(t2) 2397 2398 # key is removed 2399 assert t2.key not in metadata.tables 2400 2401 # the memo for the FK is still there 2402 assert ("t1", "x") in metadata._fk_memos 2403 2404 # fk is not in the collection 2405 
assert fk not in metadata._fk_memos[("t1", "x")] 2406 2407 # make the referenced table 2408 t1 = Table('t1', metadata, Column('x', Integer)) 2409 2410 # t2 tells us exactly what's wrong 2411 assert_raises_message( 2412 exc.InvalidRequestError, 2413 "Table t2 is no longer associated with its parent MetaData", 2414 getattr, fk, "column" 2415 ) 2416 2417 # t3 is unaffected 2418 assert t3.c.y.references(t1.c.x) 2419 2420 # remove twice OK 2421 metadata.remove(t2) 2422 2423 2424class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase): 2425 2426 """Test Column() construction.""" 2427 2428 __dialect__ = 'default' 2429 2430 def columns(self): 2431 return [Column(Integer), 2432 Column('b', Integer), 2433 Column(Integer), 2434 Column('d', Integer), 2435 Column(Integer, name='e'), 2436 Column(type_=Integer), 2437 Column(Integer()), 2438 Column('h', Integer()), 2439 Column(type_=Integer())] 2440 2441 def test_basic(self): 2442 c = self.columns() 2443 2444 for i, v in ((0, 'a'), (2, 'c'), (5, 'f'), (6, 'g'), (8, 'i')): 2445 c[i].name = v 2446 c[i].key = v 2447 del i, v 2448 2449 tbl = Table('table', MetaData(), *c) 2450 2451 for i, col in enumerate(tbl.c): 2452 assert col.name == c[i].name 2453 2454 def test_name_none(self): 2455 2456 c = Column(Integer) 2457 assert_raises_message( 2458 exc.ArgumentError, 2459 "Column must be constructed with a non-blank name or assign a " 2460 "non-blank .name ", 2461 Table, 't', MetaData(), c) 2462 2463 def test_name_blank(self): 2464 2465 c = Column('', Integer) 2466 assert_raises_message( 2467 exc.ArgumentError, 2468 "Column must be constructed with a non-blank name or assign a " 2469 "non-blank .name ", 2470 Table, 't', MetaData(), c) 2471 2472 def test_dupe_column(self): 2473 c = Column('x', Integer) 2474 Table('t', MetaData(), c) 2475 2476 assert_raises_message( 2477 exc.ArgumentError, 2478 "Column object 'x' already assigned to Table 't'", 2479 Table, 'q', MetaData(), c) 2480 2481 def test_incomplete_key(self): 2482 c = 
Column(Integer) 2483 assert c.name is None 2484 assert c.key is None 2485 2486 c.name = 'named' 2487 Table('t', MetaData(), c) 2488 2489 assert c.name == 'named' 2490 assert c.name == c.key 2491 2492 def test_unique_index_flags_default_to_none(self): 2493 c = Column(Integer) 2494 eq_(c.unique, None) 2495 eq_(c.index, None) 2496 2497 c = Column('c', Integer, index=True) 2498 eq_(c.unique, None) 2499 eq_(c.index, True) 2500 2501 t = Table('t', MetaData(), c) 2502 eq_(list(t.indexes)[0].unique, False) 2503 2504 c = Column(Integer, unique=True) 2505 eq_(c.unique, True) 2506 eq_(c.index, None) 2507 2508 c = Column('c', Integer, index=True, unique=True) 2509 eq_(c.unique, True) 2510 eq_(c.index, True) 2511 2512 t = Table('t', MetaData(), c) 2513 eq_(list(t.indexes)[0].unique, True) 2514 2515 def test_bogus(self): 2516 assert_raises(exc.ArgumentError, Column, 'foo', name='bar') 2517 assert_raises(exc.ArgumentError, Column, 'foo', Integer, 2518 type_=Integer()) 2519 2520 def test_custom_subclass_proxy(self): 2521 """test proxy generation of a Column subclass, can be compiled.""" 2522 2523 from sqlalchemy.schema import Column 2524 from sqlalchemy.ext.compiler import compiles 2525 from sqlalchemy.sql import select 2526 2527 class MyColumn(Column): 2528 2529 def _constructor(self, name, type, **kw): 2530 kw['name'] = name 2531 return MyColumn(type, **kw) 2532 2533 def __init__(self, type, **kw): 2534 Column.__init__(self, type, **kw) 2535 2536 def my_goofy_thing(self): 2537 return "hi" 2538 2539 @compiles(MyColumn) 2540 def goofy(element, compiler, **kw): 2541 s = compiler.visit_column(element, **kw) 2542 return s + "-" 2543 2544 id = MyColumn(Integer, primary_key=True) 2545 id.name = 'id' 2546 name = MyColumn(String) 2547 name.name = 'name' 2548 t1 = Table('foo', MetaData(), 2549 id, 2550 name 2551 ) 2552 2553 # goofy thing 2554 eq_(t1.c.name.my_goofy_thing(), "hi") 2555 2556 # create proxy 2557 s = select([t1.select().alias()]) 2558 2559 # proxy has goofy thing 2560 
eq_(s.c.name.my_goofy_thing(), "hi") 2561 2562 # compile works 2563 self.assert_compile( 2564 select([t1.select().alias()]), 2565 "SELECT anon_1.id-, anon_1.name- FROM " 2566 "(SELECT foo.id- AS id, foo.name- AS name " 2567 "FROM foo) AS anon_1", 2568 ) 2569 2570 def test_custom_subclass_proxy_typeerror(self): 2571 from sqlalchemy.schema import Column 2572 from sqlalchemy.sql import select 2573 2574 class MyColumn(Column): 2575 2576 def __init__(self, type, **kw): 2577 Column.__init__(self, type, **kw) 2578 2579 id = MyColumn(Integer, primary_key=True) 2580 id.name = 'id' 2581 name = MyColumn(String) 2582 name.name = 'name' 2583 t1 = Table('foo', MetaData(), 2584 id, 2585 name 2586 ) 2587 assert_raises_message( 2588 TypeError, 2589 "Could not create a copy of this <class " 2590 "'test.sql.test_metadata..*MyColumn'> " 2591 "object. Ensure the class includes a _constructor()", 2592 getattr, select([t1.select().alias()]), 'c' 2593 ) 2594 2595 def test_custom_create(self): 2596 from sqlalchemy.ext.compiler import compiles, deregister 2597 2598 @compiles(schema.CreateColumn) 2599 def compile(element, compiler, **kw): 2600 column = element.element 2601 2602 if "special" not in column.info: 2603 return compiler.visit_create_column(element, **kw) 2604 2605 text = "%s SPECIAL DIRECTIVE %s" % ( 2606 column.name, 2607 compiler.type_compiler.process(column.type) 2608 ) 2609 default = compiler.get_column_default_string(column) 2610 if default is not None: 2611 text += " DEFAULT " + default 2612 2613 if not column.nullable: 2614 text += " NOT NULL" 2615 2616 if column.constraints: 2617 text += " ".join( 2618 compiler.process(const) 2619 for const in column.constraints) 2620 return text 2621 2622 t = Table( 2623 'mytable', MetaData(), 2624 Column('x', Integer, info={ 2625 "special": True}, primary_key=True), 2626 Column('y', String(50)), 2627 Column('z', String(20), info={ 2628 "special": True})) 2629 2630 self.assert_compile( 2631 schema.CreateTable(t), 2632 "CREATE TABLE 
mytable (x SPECIAL DIRECTIVE INTEGER " 2633 "NOT NULL, y VARCHAR(50), " 2634 "z SPECIAL DIRECTIVE VARCHAR(20), PRIMARY KEY (x))" 2635 ) 2636 2637 deregister(schema.CreateColumn) 2638 2639 2640class ColumnDefaultsTest(fixtures.TestBase): 2641 2642 """test assignment of default fixures to columns""" 2643 2644 def _fixture(self, *arg, **kw): 2645 return Column('x', Integer, *arg, **kw) 2646 2647 def test_server_default_positional(self): 2648 target = schema.DefaultClause('y') 2649 c = self._fixture(target) 2650 assert c.server_default is target 2651 assert target.column is c 2652 2653 def test_onupdate_default_not_server_default_one(self): 2654 target1 = schema.DefaultClause('y') 2655 target2 = schema.DefaultClause('z') 2656 2657 c = self._fixture(server_default=target1, server_onupdate=target2) 2658 eq_(c.server_default.arg, 'y') 2659 eq_(c.server_onupdate.arg, 'z') 2660 2661 def test_onupdate_default_not_server_default_two(self): 2662 target1 = schema.DefaultClause('y', for_update=True) 2663 target2 = schema.DefaultClause('z', for_update=True) 2664 2665 c = self._fixture(server_default=target1, server_onupdate=target2) 2666 eq_(c.server_default.arg, 'y') 2667 eq_(c.server_onupdate.arg, 'z') 2668 2669 def test_onupdate_default_not_server_default_three(self): 2670 target1 = schema.DefaultClause('y', for_update=False) 2671 target2 = schema.DefaultClause('z', for_update=True) 2672 2673 c = self._fixture(target1, target2) 2674 eq_(c.server_default.arg, 'y') 2675 eq_(c.server_onupdate.arg, 'z') 2676 2677 def test_onupdate_default_not_server_default_four(self): 2678 target1 = schema.DefaultClause('y', for_update=False) 2679 2680 c = self._fixture(server_onupdate=target1) 2681 is_(c.server_default, None) 2682 eq_(c.server_onupdate.arg, 'y') 2683 2684 def test_server_default_keyword_as_schemaitem(self): 2685 target = schema.DefaultClause('y') 2686 c = self._fixture(server_default=target) 2687 assert c.server_default is target 2688 assert target.column is c 2689 2690 def 
test_server_default_keyword_as_clause(self): 2691 target = 'y' 2692 c = self._fixture(server_default=target) 2693 assert c.server_default.arg == target 2694 assert c.server_default.column is c 2695 2696 def test_server_default_onupdate_positional(self): 2697 target = schema.DefaultClause('y', for_update=True) 2698 c = self._fixture(target) 2699 assert c.server_onupdate is target 2700 assert target.column is c 2701 2702 def test_server_default_onupdate_keyword_as_schemaitem(self): 2703 target = schema.DefaultClause('y', for_update=True) 2704 c = self._fixture(server_onupdate=target) 2705 assert c.server_onupdate is target 2706 assert target.column is c 2707 2708 def test_server_default_onupdate_keyword_as_clause(self): 2709 target = 'y' 2710 c = self._fixture(server_onupdate=target) 2711 assert c.server_onupdate.arg == target 2712 assert c.server_onupdate.column is c 2713 2714 def test_column_default_positional(self): 2715 target = schema.ColumnDefault('y') 2716 c = self._fixture(target) 2717 assert c.default is target 2718 assert target.column is c 2719 2720 def test_column_default_keyword_as_schemaitem(self): 2721 target = schema.ColumnDefault('y') 2722 c = self._fixture(default=target) 2723 assert c.default is target 2724 assert target.column is c 2725 2726 def test_column_default_keyword_as_clause(self): 2727 target = 'y' 2728 c = self._fixture(default=target) 2729 assert c.default.arg == target 2730 assert c.default.column is c 2731 2732 def test_column_default_onupdate_positional(self): 2733 target = schema.ColumnDefault('y', for_update=True) 2734 c = self._fixture(target) 2735 assert c.onupdate is target 2736 assert target.column is c 2737 2738 def test_column_default_onupdate_keyword_as_schemaitem(self): 2739 target = schema.ColumnDefault('y', for_update=True) 2740 c = self._fixture(onupdate=target) 2741 assert c.onupdate is target 2742 assert target.column is c 2743 2744 def test_column_default_onupdate_keyword_as_clause(self): 2745 target = 'y' 2746 c = 
self._fixture(onupdate=target) 2747 assert c.onupdate.arg == target 2748 assert c.onupdate.column is c 2749 2750 2751class ColumnOptionsTest(fixtures.TestBase): 2752 2753 def test_default_generators(self): 2754 g1, g2 = Sequence('foo_id_seq'), ColumnDefault('f5') 2755 assert Column(String, default=g1).default is g1 2756 assert Column(String, onupdate=g1).onupdate is g1 2757 assert Column(String, default=g2).default is g2 2758 assert Column(String, onupdate=g2).onupdate is g2 2759 2760 def _null_type_error(self, col): 2761 t = Table('t', MetaData(), col) 2762 assert_raises_message( 2763 exc.CompileError, 2764 r"\(in table 't', column 'foo'\): Can't generate DDL for NullType", 2765 schema.CreateTable(t).compile 2766 ) 2767 2768 def _no_name_error(self, col): 2769 assert_raises_message( 2770 exc.ArgumentError, 2771 "Column must be constructed with a non-blank name or " 2772 "assign a non-blank .name", 2773 Table, 't', MetaData(), col 2774 ) 2775 2776 def _no_error(self, col): 2777 m = MetaData() 2778 b = Table('bar', m, Column('id', Integer)) 2779 t = Table('t', m, col) 2780 schema.CreateTable(t).compile() 2781 2782 def test_argument_signatures(self): 2783 self._no_name_error(Column()) 2784 self._null_type_error(Column("foo")) 2785 self._no_name_error(Column(default="foo")) 2786 2787 self._no_name_error(Column(Sequence("a"))) 2788 self._null_type_error(Column("foo", default="foo")) 2789 2790 self._null_type_error(Column("foo", Sequence("a"))) 2791 2792 self._no_name_error(Column(ForeignKey('bar.id'))) 2793 2794 self._no_error(Column("foo", ForeignKey('bar.id'))) 2795 2796 self._no_name_error(Column(ForeignKey('bar.id'), default="foo")) 2797 2798 self._no_name_error(Column(ForeignKey('bar.id'), Sequence("a"))) 2799 self._no_error(Column("foo", ForeignKey('bar.id'), default="foo")) 2800 self._no_error(Column("foo", ForeignKey('bar.id'), Sequence("a"))) 2801 2802 def test_column_info(self): 2803 2804 c1 = Column('foo', String, info={'x': 'y'}) 2805 c2 = Column('bar', 
String, info={}) 2806 c3 = Column('bat', String) 2807 assert c1.info == {'x': 'y'} 2808 assert c2.info == {} 2809 assert c3.info == {} 2810 2811 for c in (c1, c2, c3): 2812 c.info['bar'] = 'zip' 2813 assert c.info['bar'] == 'zip' 2814 2815 2816class CatchAllEventsTest(fixtures.RemovesEvents, fixtures.TestBase): 2817 2818 def test_all_events(self): 2819 canary = [] 2820 2821 def before_attach(obj, parent): 2822 canary.append("%s->%s" % (obj.__class__.__name__, 2823 parent.__class__.__name__)) 2824 2825 def after_attach(obj, parent): 2826 canary.append("%s->%s" % (obj.__class__.__name__, parent)) 2827 2828 self.event_listen( 2829 schema.SchemaItem, 2830 "before_parent_attach", 2831 before_attach) 2832 self.event_listen( 2833 schema.SchemaItem, 2834 "after_parent_attach", 2835 after_attach) 2836 2837 m = MetaData() 2838 Table('t1', m, 2839 Column('id', Integer, Sequence('foo_id'), primary_key=True), 2840 Column('bar', String, ForeignKey('t2.id')) 2841 ) 2842 Table('t2', m, 2843 Column('id', Integer, primary_key=True), 2844 ) 2845 2846 eq_( 2847 canary, 2848 ['Sequence->Column', 'Sequence->id', 'ForeignKey->Column', 2849 'ForeignKey->bar', 'Table->MetaData', 2850 'PrimaryKeyConstraint->Table', 'PrimaryKeyConstraint->t1', 2851 'Column->Table', 'Column->t1', 'Column->Table', 2852 'Column->t1', 'ForeignKeyConstraint->Table', 2853 'ForeignKeyConstraint->t1', 'Table->MetaData(bind=None)', 2854 'Table->MetaData', 'PrimaryKeyConstraint->Table', 2855 'PrimaryKeyConstraint->t2', 'Column->Table', 'Column->t2', 2856 'Table->MetaData(bind=None)'] 2857 ) 2858 2859 def test_events_per_constraint(self): 2860 canary = [] 2861 2862 def evt(target): 2863 def before_attach(obj, parent): 2864 canary.append("%s->%s" % (target.__name__, 2865 parent.__class__.__name__)) 2866 2867 def after_attach(obj, parent): 2868 assert hasattr(obj, 'name') # so we can change it 2869 canary.append("%s->%s" % (target.__name__, parent)) 2870 self.event_listen(target, "before_parent_attach", before_attach) 
2871 self.event_listen(target, "after_parent_attach", after_attach) 2872 2873 for target in [ 2874 schema.ForeignKeyConstraint, schema.PrimaryKeyConstraint, 2875 schema.UniqueConstraint, 2876 schema.CheckConstraint, 2877 schema.Index 2878 ]: 2879 evt(target) 2880 2881 m = MetaData() 2882 Table('t1', m, 2883 Column('id', Integer, Sequence('foo_id'), primary_key=True), 2884 Column('bar', String, ForeignKey('t2.id'), index=True), 2885 Column('bat', Integer, unique=True), 2886 ) 2887 Table('t2', m, 2888 Column('id', Integer, primary_key=True), 2889 Column('bar', Integer), 2890 Column('bat', Integer), 2891 CheckConstraint("bar>5"), 2892 UniqueConstraint('bar', 'bat'), 2893 Index(None, 'bar', 'bat') 2894 ) 2895 eq_( 2896 canary, 2897 [ 2898 'PrimaryKeyConstraint->Table', 'PrimaryKeyConstraint->t1', 2899 'Index->Table', 'Index->t1', 2900 'ForeignKeyConstraint->Table', 'ForeignKeyConstraint->t1', 2901 'UniqueConstraint->Table', 'UniqueConstraint->t1', 2902 'PrimaryKeyConstraint->Table', 'PrimaryKeyConstraint->t2', 2903 'CheckConstraint->Table', 'CheckConstraint->t2', 2904 'UniqueConstraint->Table', 'UniqueConstraint->t2', 2905 'Index->Table', 'Index->t2' 2906 ] 2907 ) 2908 2909 2910class DialectKWArgTest(fixtures.TestBase): 2911 2912 @contextmanager 2913 def _fixture(self): 2914 from sqlalchemy.engine.default import DefaultDialect 2915 2916 class ParticipatingDialect(DefaultDialect): 2917 construct_arguments = [ 2918 (schema.Index, { 2919 "x": 5, 2920 "y": False, 2921 "z_one": None 2922 }), 2923 (schema.ForeignKeyConstraint, { 2924 "foobar": False 2925 }) 2926 ] 2927 2928 class ParticipatingDialect2(DefaultDialect): 2929 construct_arguments = [ 2930 (schema.Index, { 2931 "x": 9, 2932 "y": True, 2933 "pp": "default" 2934 }), 2935 (schema.Table, { 2936 "*": None 2937 }) 2938 ] 2939 2940 class NonParticipatingDialect(DefaultDialect): 2941 construct_arguments = None 2942 2943 def load(dialect_name): 2944 if dialect_name == "participating": 2945 return ParticipatingDialect 2946 
elif dialect_name == "participating2": 2947 return ParticipatingDialect2 2948 elif dialect_name == "nonparticipating": 2949 return NonParticipatingDialect 2950 else: 2951 raise exc.NoSuchModuleError("no dialect %r" % dialect_name) 2952 with mock.patch("sqlalchemy.dialects.registry.load", load): 2953 yield 2954 2955 def teardown(self): 2956 Index._kw_registry.clear() 2957 2958 def test_participating(self): 2959 with self._fixture(): 2960 idx = Index('a', 'b', 'c', participating_y=True) 2961 eq_( 2962 idx.dialect_options, 2963 {"participating": {"x": 5, "y": True, "z_one": None}} 2964 ) 2965 eq_( 2966 idx.dialect_kwargs, 2967 { 2968 'participating_y': True, 2969 } 2970 ) 2971 2972 def test_nonparticipating(self): 2973 with self._fixture(): 2974 idx = Index( 2975 'a', 2976 'b', 2977 'c', 2978 nonparticipating_y=True, 2979 nonparticipating_q=5) 2980 eq_( 2981 idx.dialect_kwargs, 2982 { 2983 'nonparticipating_y': True, 2984 'nonparticipating_q': 5 2985 } 2986 ) 2987 2988 def test_bad_kwarg_raise(self): 2989 with self._fixture(): 2990 assert_raises_message( 2991 TypeError, 2992 "Additional arguments should be named " 2993 "<dialectname>_<argument>, got 'foobar'", 2994 Index, 'a', 'b', 'c', foobar=True 2995 ) 2996 2997 def test_unknown_dialect_warning(self): 2998 with self._fixture(): 2999 assert_raises_message( 3000 exc.SAWarning, 3001 "Can't validate argument 'unknown_y'; can't locate " 3002 "any SQLAlchemy dialect named 'unknown'", 3003 Index, 'a', 'b', 'c', unknown_y=True 3004 ) 3005 3006 def test_participating_bad_kw(self): 3007 with self._fixture(): 3008 assert_raises_message( 3009 exc.ArgumentError, 3010 "Argument 'participating_q_p_x' is not accepted by dialect " 3011 "'participating' on behalf of " 3012 "<class 'sqlalchemy.sql.schema.Index'>", 3013 Index, 'a', 'b', 'c', participating_q_p_x=8 3014 ) 3015 3016 def test_participating_unknown_schema_item(self): 3017 with self._fixture(): 3018 # the dialect doesn't include UniqueConstraint in 3019 # its registry at 
all. 3020 assert_raises_message( 3021 exc.ArgumentError, 3022 "Argument 'participating_q_p_x' is not accepted by dialect " 3023 "'participating' on behalf of " 3024 "<class 'sqlalchemy.sql.schema.UniqueConstraint'>", 3025 UniqueConstraint, 'a', 'b', participating_q_p_x=8 3026 ) 3027 3028 @testing.emits_warning("Can't validate") 3029 def test_unknown_dialect_warning_still_populates(self): 3030 with self._fixture(): 3031 idx = Index('a', 'b', 'c', unknown_y=True) 3032 eq_(idx.dialect_kwargs, {"unknown_y": True}) # still populates 3033 3034 @testing.emits_warning("Can't validate") 3035 def test_unknown_dialect_warning_still_populates_multiple(self): 3036 with self._fixture(): 3037 idx = Index('a', 'b', 'c', unknown_y=True, unknown_z=5, 3038 otherunknown_foo='bar', participating_y=8) 3039 eq_( 3040 idx.dialect_options, 3041 { 3042 "unknown": {'y': True, 'z': 5, '*': None}, 3043 "otherunknown": {'foo': 'bar', '*': None}, 3044 "participating": {'x': 5, 'y': 8, 'z_one': None} 3045 } 3046 ) 3047 eq_(idx.dialect_kwargs, 3048 {'unknown_z': 5, 'participating_y': 8, 3049 'unknown_y': True, 3050 'otherunknown_foo': 'bar'} 3051 ) # still populates 3052 3053 def test_runs_safekwarg(self): 3054 3055 with mock.patch("sqlalchemy.util.safe_kwarg", 3056 lambda arg: "goofy_%s" % arg): 3057 with self._fixture(): 3058 idx = Index('a', 'b') 3059 idx.kwargs[util.u('participating_x')] = 7 3060 3061 eq_( 3062 list(idx.dialect_kwargs), 3063 ['goofy_participating_x'] 3064 ) 3065 3066 def test_combined(self): 3067 with self._fixture(): 3068 idx = Index('a', 'b', 'c', participating_x=7, 3069 nonparticipating_y=True) 3070 3071 eq_( 3072 idx.dialect_options, 3073 { 3074 'participating': {'y': False, 'x': 7, 'z_one': None}, 3075 'nonparticipating': {'y': True, '*': None} 3076 } 3077 ) 3078 eq_( 3079 idx.dialect_kwargs, 3080 { 3081 'participating_x': 7, 3082 'nonparticipating_y': True, 3083 } 3084 ) 3085 3086 def test_multiple_participating(self): 3087 with self._fixture(): 3088 idx = Index('a', 
'b', 'c', 3089 participating_x=7, 3090 participating2_x=15, 3091 participating2_y="lazy" 3092 ) 3093 eq_( 3094 idx.dialect_options, 3095 { 3096 "participating": {'x': 7, 'y': False, 'z_one': None}, 3097 "participating2": {'x': 15, 'y': 'lazy', 'pp': 'default'}, 3098 } 3099 ) 3100 eq_( 3101 idx.dialect_kwargs, 3102 { 3103 'participating_x': 7, 3104 'participating2_x': 15, 3105 'participating2_y': 'lazy' 3106 } 3107 ) 3108 3109 def test_foreign_key_propagate(self): 3110 with self._fixture(): 3111 m = MetaData() 3112 fk = ForeignKey('t2.id', participating_foobar=True) 3113 t = Table('t', m, Column('id', Integer, fk)) 3114 fkc = [ 3115 c for c in t.constraints if isinstance( 3116 c, 3117 ForeignKeyConstraint)][0] 3118 eq_( 3119 fkc.dialect_kwargs, 3120 {'participating_foobar': True} 3121 ) 3122 3123 def test_foreign_key_propagate_exceptions_delayed(self): 3124 with self._fixture(): 3125 m = MetaData() 3126 fk = ForeignKey('t2.id', participating_fake=True) 3127 c1 = Column('id', Integer, fk) 3128 assert_raises_message( 3129 exc.ArgumentError, 3130 "Argument 'participating_fake' is not accepted by " 3131 "dialect 'participating' on behalf of " 3132 "<class 'sqlalchemy.sql.schema.ForeignKeyConstraint'>", 3133 Table, 't', m, c1 3134 ) 3135 3136 def test_wildcard(self): 3137 with self._fixture(): 3138 m = MetaData() 3139 t = Table('x', m, Column('x', Integer), 3140 participating2_xyz='foo', 3141 participating2_engine='InnoDB', 3142 ) 3143 eq_( 3144 t.dialect_kwargs, 3145 { 3146 'participating2_xyz': 'foo', 3147 'participating2_engine': 'InnoDB' 3148 } 3149 ) 3150 3151 def test_uninit_wildcard(self): 3152 with self._fixture(): 3153 m = MetaData() 3154 t = Table('x', m, Column('x', Integer)) 3155 eq_( 3156 t.dialect_options['participating2'], {'*': None} 3157 ) 3158 eq_( 3159 t.dialect_kwargs, {} 3160 ) 3161 3162 def test_not_contains_wildcard(self): 3163 with self._fixture(): 3164 m = MetaData() 3165 t = Table('x', m, Column('x', Integer)) 3166 assert 'foobar' not in 
t.dialect_options['participating2'] 3167 3168 def test_contains_wildcard(self): 3169 with self._fixture(): 3170 m = MetaData() 3171 t = Table('x', m, Column('x', Integer), participating2_foobar=5) 3172 assert 'foobar' in t.dialect_options['participating2'] 3173 3174 def test_update(self): 3175 with self._fixture(): 3176 idx = Index('a', 'b', 'c', participating_x=20) 3177 eq_(idx.dialect_kwargs, { 3178 "participating_x": 20, 3179 }) 3180 idx._validate_dialect_kwargs({ 3181 "participating_x": 25, 3182 "participating_z_one": "default"}) 3183 eq_(idx.dialect_options, { 3184 "participating": {"x": 25, "y": False, "z_one": "default"} 3185 }) 3186 eq_(idx.dialect_kwargs, { 3187 "participating_x": 25, 3188 'participating_z_one': "default" 3189 }) 3190 3191 idx._validate_dialect_kwargs({ 3192 "participating_x": 25, 3193 "participating_z_one": "default"}) 3194 3195 eq_(idx.dialect_options, { 3196 "participating": {"x": 25, "y": False, "z_one": "default"} 3197 }) 3198 eq_(idx.dialect_kwargs, { 3199 "participating_x": 25, 3200 'participating_z_one': "default" 3201 }) 3202 3203 idx._validate_dialect_kwargs({ 3204 "participating_y": True, 3205 'participating2_y': "p2y"}) 3206 eq_(idx.dialect_options, { 3207 "participating": {"x": 25, "y": True, "z_one": "default"}, 3208 "participating2": {"y": "p2y", "pp": "default", "x": 9} 3209 }) 3210 eq_(idx.dialect_kwargs, { 3211 "participating_x": 25, 3212 "participating_y": True, 3213 'participating2_y': "p2y", 3214 "participating_z_one": "default"}) 3215 3216 def test_key_error_kwargs_no_dialect(self): 3217 with self._fixture(): 3218 idx = Index('a', 'b', 'c') 3219 assert_raises( 3220 KeyError, 3221 idx.kwargs.__getitem__, 'foo_bar' 3222 ) 3223 3224 def test_key_error_kwargs_no_underscore(self): 3225 with self._fixture(): 3226 idx = Index('a', 'b', 'c') 3227 assert_raises( 3228 KeyError, 3229 idx.kwargs.__getitem__, 'foobar' 3230 ) 3231 3232 def test_key_error_kwargs_no_argument(self): 3233 with self._fixture(): 3234 idx = Index('a', 
'b', 'c') 3235 assert_raises( 3236 KeyError, 3237 idx.kwargs.__getitem__, 'participating_asdmfq34098' 3238 ) 3239 3240 assert_raises( 3241 KeyError, 3242 idx.kwargs.__getitem__, 'nonparticipating_asdmfq34098' 3243 ) 3244 3245 def test_key_error_dialect_options(self): 3246 with self._fixture(): 3247 idx = Index('a', 'b', 'c') 3248 assert_raises( 3249 KeyError, 3250 idx.dialect_options['participating'].__getitem__, 'asdfaso890' 3251 ) 3252 3253 assert_raises( 3254 KeyError, 3255 idx.dialect_options['nonparticipating'].__getitem__, 3256 'asdfaso890') 3257 3258 def test_ad_hoc_participating_via_opt(self): 3259 with self._fixture(): 3260 idx = Index('a', 'b', 'c') 3261 idx.dialect_options['participating']['foobar'] = 5 3262 3263 eq_(idx.dialect_options['participating']['foobar'], 5) 3264 eq_(idx.kwargs['participating_foobar'], 5) 3265 3266 def test_ad_hoc_nonparticipating_via_opt(self): 3267 with self._fixture(): 3268 idx = Index('a', 'b', 'c') 3269 idx.dialect_options['nonparticipating']['foobar'] = 5 3270 3271 eq_(idx.dialect_options['nonparticipating']['foobar'], 5) 3272 eq_(idx.kwargs['nonparticipating_foobar'], 5) 3273 3274 def test_ad_hoc_participating_via_kwargs(self): 3275 with self._fixture(): 3276 idx = Index('a', 'b', 'c') 3277 idx.kwargs['participating_foobar'] = 5 3278 3279 eq_(idx.dialect_options['participating']['foobar'], 5) 3280 eq_(idx.kwargs['participating_foobar'], 5) 3281 3282 def test_ad_hoc_nonparticipating_via_kwargs(self): 3283 with self._fixture(): 3284 idx = Index('a', 'b', 'c') 3285 idx.kwargs['nonparticipating_foobar'] = 5 3286 3287 eq_(idx.dialect_options['nonparticipating']['foobar'], 5) 3288 eq_(idx.kwargs['nonparticipating_foobar'], 5) 3289 3290 def test_ad_hoc_via_kwargs_invalid_key(self): 3291 with self._fixture(): 3292 idx = Index('a', 'b', 'c') 3293 assert_raises_message( 3294 exc.ArgumentError, 3295 "Keys must be of the form <dialectname>_<argname>", 3296 idx.kwargs.__setitem__, "foobar", 5 3297 ) 3298 3299 def 
test_ad_hoc_via_kwargs_invalid_dialect(self): 3300 with self._fixture(): 3301 idx = Index('a', 'b', 'c') 3302 assert_raises_message( 3303 exc.ArgumentError, 3304 "no dialect 'nonexistent'", 3305 idx.kwargs.__setitem__, "nonexistent_foobar", 5 3306 ) 3307 3308 def test_add_new_arguments_participating(self): 3309 with self._fixture(): 3310 Index.argument_for("participating", "xyzqpr", False) 3311 3312 idx = Index('a', 'b', 'c', participating_xyzqpr=True) 3313 3314 eq_(idx.kwargs['participating_xyzqpr'], True) 3315 3316 idx = Index('a', 'b', 'c') 3317 eq_(idx.dialect_options['participating']['xyzqpr'], False) 3318 3319 def test_add_new_arguments_participating_no_existing(self): 3320 with self._fixture(): 3321 PrimaryKeyConstraint.argument_for("participating", "xyzqpr", False) 3322 3323 pk = PrimaryKeyConstraint('a', 'b', 'c', participating_xyzqpr=True) 3324 3325 eq_(pk.kwargs['participating_xyzqpr'], True) 3326 3327 pk = PrimaryKeyConstraint('a', 'b', 'c') 3328 eq_(pk.dialect_options['participating']['xyzqpr'], False) 3329 3330 def test_add_new_arguments_nonparticipating(self): 3331 with self._fixture(): 3332 assert_raises_message( 3333 exc.ArgumentError, 3334 "Dialect 'nonparticipating' does have keyword-argument " 3335 "validation and defaults enabled configured", 3336 Index.argument_for, "nonparticipating", "xyzqpr", False 3337 ) 3338 3339 def test_add_new_arguments_invalid_dialect(self): 3340 with self._fixture(): 3341 assert_raises_message( 3342 exc.ArgumentError, 3343 "no dialect 'nonexistent'", 3344 Index.argument_for, "nonexistent", "foobar", 5 3345 ) 3346 3347 3348class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL): 3349 __dialect__ = 'default' 3350 3351 def _fixture(self, naming_convention, table_schema=None): 3352 m1 = MetaData(naming_convention=naming_convention) 3353 3354 u1 = Table('user', m1, 3355 Column('id', Integer, primary_key=True), 3356 Column('version', Integer, primary_key=True), 3357 Column('data', String(30)), 3358 
schema=table_schema 3359 ) 3360 3361 return u1 3362 3363 def test_uq_name(self): 3364 u1 = self._fixture(naming_convention={ 3365 "uq": "uq_%(table_name)s_%(column_0_name)s" 3366 }) 3367 uq = UniqueConstraint(u1.c.data) 3368 eq_(uq.name, "uq_user_data") 3369 3370 def test_ck_name_required(self): 3371 u1 = self._fixture(naming_convention={ 3372 "ck": "ck_%(table_name)s_%(constraint_name)s" 3373 }) 3374 ck = CheckConstraint(u1.c.data == 'x', name='mycheck') 3375 eq_(ck.name, "ck_user_mycheck") 3376 3377 assert_raises_message( 3378 exc.InvalidRequestError, 3379 r"Naming convention including %\(constraint_name\)s token " 3380 "requires that constraint is explicitly named.", 3381 CheckConstraint, u1.c.data == 'x' 3382 ) 3383 3384 def test_ck_name_deferred_required(self): 3385 u1 = self._fixture(naming_convention={ 3386 "ck": "ck_%(table_name)s_%(constraint_name)s" 3387 }) 3388 ck = CheckConstraint(u1.c.data == 'x', name=elements._defer_name(None)) 3389 3390 assert_raises_message( 3391 exc.InvalidRequestError, 3392 r"Naming convention including %\(constraint_name\)s token " 3393 "requires that constraint is explicitly named.", 3394 schema.AddConstraint(ck).compile 3395 ) 3396 3397 def test_column_attached_ck_name(self): 3398 m = MetaData(naming_convention={ 3399 "ck": "ck_%(table_name)s_%(constraint_name)s" 3400 }) 3401 ck = CheckConstraint('x > 5', name='x1') 3402 Table('t', m, Column('x', ck)) 3403 eq_(ck.name, "ck_t_x1") 3404 3405 def test_table_attached_ck_name(self): 3406 m = MetaData(naming_convention={ 3407 "ck": "ck_%(table_name)s_%(constraint_name)s" 3408 }) 3409 ck = CheckConstraint('x > 5', name='x1') 3410 Table('t', m, Column('x', Integer), ck) 3411 eq_(ck.name, "ck_t_x1") 3412 3413 def test_uq_name_already_conv(self): 3414 m = MetaData(naming_convention={ 3415 "uq": "uq_%(constraint_name)s_%(column_0_name)s" 3416 }) 3417 3418 t = Table('mytable', m) 3419 uq = UniqueConstraint(name=naming.conv('my_special_key')) 3420 3421 t.append_constraint(uq) 3422 
eq_(uq.name, "my_special_key") 3423 3424 def test_fk_name_schema(self): 3425 u1 = self._fixture(naming_convention={ 3426 "fk": "fk_%(table_name)s_%(column_0_name)s_" 3427 "%(referred_table_name)s_%(referred_column_0_name)s" 3428 }, table_schema="foo") 3429 m1 = u1.metadata 3430 a1 = Table('address', m1, 3431 Column('id', Integer, primary_key=True), 3432 Column('user_id', Integer), 3433 Column('user_version_id', Integer) 3434 ) 3435 fk = ForeignKeyConstraint(['user_id', 'user_version_id'], 3436 ['foo.user.id', 'foo.user.version']) 3437 a1.append_constraint(fk) 3438 eq_(fk.name, "fk_address_user_id_user_id") 3439 3440 def test_fk_attrs(self): 3441 u1 = self._fixture(naming_convention={ 3442 "fk": "fk_%(table_name)s_%(column_0_name)s_" 3443 "%(referred_table_name)s_%(referred_column_0_name)s" 3444 }) 3445 m1 = u1.metadata 3446 a1 = Table('address', m1, 3447 Column('id', Integer, primary_key=True), 3448 Column('user_id', Integer), 3449 Column('user_version_id', Integer) 3450 ) 3451 fk = ForeignKeyConstraint(['user_id', 'user_version_id'], 3452 ['user.id', 'user.version']) 3453 a1.append_constraint(fk) 3454 eq_(fk.name, "fk_address_user_id_user_id") 3455 3456 def test_custom(self): 3457 def key_hash(const, table): 3458 return "HASH_%s" % table.name 3459 3460 u1 = self._fixture(naming_convention={ 3461 "fk": "fk_%(table_name)s_%(key_hash)s", 3462 "key_hash": key_hash 3463 }) 3464 m1 = u1.metadata 3465 a1 = Table('address', m1, 3466 Column('id', Integer, primary_key=True), 3467 Column('user_id', Integer), 3468 Column('user_version_id', Integer) 3469 ) 3470 fk = ForeignKeyConstraint(['user_id', 'user_version_id'], 3471 ['user.id', 'user.version']) 3472 a1.append_constraint(fk) 3473 eq_(fk.name, "fk_address_HASH_address") 3474 3475 def test_schematype_ck_name_boolean(self): 3476 m1 = MetaData(naming_convention={ 3477 "ck": "ck_%(table_name)s_%(constraint_name)s"}) 3478 3479 u1 = Table('user', m1, 3480 Column('x', Boolean(name='foo')) 3481 ) 3482 # constraint is not hit 3483 
eq_( 3484 [c for c in u1.constraints 3485 if isinstance(c, CheckConstraint)][0].name, "foo" 3486 ) 3487 # but is hit at compile time 3488 self.assert_compile( 3489 schema.CreateTable(u1), 3490 'CREATE TABLE "user" (' 3491 "x BOOLEAN, " 3492 "CONSTRAINT ck_user_foo CHECK (x IN (0, 1))" 3493 ")" 3494 ) 3495 3496 def test_schematype_ck_name_boolean_not_on_name(self): 3497 m1 = MetaData(naming_convention={ 3498 "ck": "ck_%(table_name)s_%(column_0_name)s"}) 3499 3500 u1 = Table('user', m1, 3501 Column('x', Boolean()) 3502 ) 3503 # constraint is not hit 3504 eq_( 3505 [c for c in u1.constraints 3506 if isinstance(c, CheckConstraint)][0].name, "_unnamed_" 3507 ) 3508 # but is hit at compile time 3509 self.assert_compile( 3510 schema.CreateTable(u1), 3511 'CREATE TABLE "user" (' 3512 "x BOOLEAN, " 3513 "CONSTRAINT ck_user_x CHECK (x IN (0, 1))" 3514 ")" 3515 ) 3516 3517 def test_schematype_ck_name_enum(self): 3518 m1 = MetaData(naming_convention={ 3519 "ck": "ck_%(table_name)s_%(constraint_name)s"}) 3520 3521 u1 = Table('user', m1, 3522 Column('x', Enum('a', 'b', name='foo')) 3523 ) 3524 eq_( 3525 [c for c in u1.constraints 3526 if isinstance(c, CheckConstraint)][0].name, "foo" 3527 ) 3528 # but is hit at compile time 3529 self.assert_compile( 3530 schema.CreateTable(u1), 3531 'CREATE TABLE "user" (' 3532 "x VARCHAR(1), " 3533 "CONSTRAINT ck_user_foo CHECK (x IN ('a', 'b'))" 3534 ")" 3535 ) 3536 3537 def test_schematype_ck_name_propagate_conv(self): 3538 m1 = MetaData(naming_convention={ 3539 "ck": "ck_%(table_name)s_%(constraint_name)s"}) 3540 3541 u1 = Table('user', m1, 3542 Column('x', Enum('a', 'b', name=naming.conv('foo'))) 3543 ) 3544 eq_( 3545 [c for c in u1.constraints 3546 if isinstance(c, CheckConstraint)][0].name, "foo" 3547 ) 3548 # but is hit at compile time 3549 self.assert_compile( 3550 schema.CreateTable(u1), 3551 'CREATE TABLE "user" (' 3552 "x VARCHAR(1), " 3553 "CONSTRAINT foo CHECK (x IN ('a', 'b'))" 3554 ")" 3555 ) 3556 3557 def 
test_schematype_ck_name_boolean_no_name(self): 3558 m1 = MetaData(naming_convention={ 3559 "ck": "ck_%(table_name)s_%(constraint_name)s" 3560 }) 3561 3562 u1 = Table( 3563 'user', m1, 3564 Column('x', Boolean()) 3565 ) 3566 # constraint gets special _defer_none_name 3567 eq_( 3568 [c for c in u1.constraints 3569 if isinstance(c, CheckConstraint)][0].name, "_unnamed_" 3570 ) 3571 # no issue with native boolean 3572 self.assert_compile( 3573 schema.CreateTable(u1), 3574 'CREATE TABLE "user" (' 3575 "x BOOLEAN" 3576 ")", 3577 dialect='postgresql' 3578 ) 3579 3580 assert_raises_message( 3581 exc.InvalidRequestError, 3582 "Naming convention including \%\(constraint_name\)s token " 3583 "requires that constraint is explicitly named.", 3584 schema.CreateTable(u1).compile 3585 ) 3586 3587 def test_schematype_no_ck_name_boolean_no_name(self): 3588 m1 = MetaData() # no naming convention 3589 3590 u1 = Table( 3591 'user', m1, 3592 Column('x', Boolean()) 3593 ) 3594 # constraint gets special _defer_none_name 3595 eq_( 3596 [c for c in u1.constraints 3597 if isinstance(c, CheckConstraint)][0].name, "_unnamed_" 3598 ) 3599 3600 self.assert_compile( 3601 schema.CreateTable(u1), 3602 'CREATE TABLE "user" (x BOOLEAN, CHECK (x IN (0, 1)))' 3603 ) 3604 3605 def test_ck_constraint_redundant_event(self): 3606 u1 = self._fixture(naming_convention={ 3607 "ck": "ck_%(table_name)s_%(constraint_name)s"}) 3608 3609 ck1 = CheckConstraint(u1.c.version > 3, name='foo') 3610 u1.append_constraint(ck1) 3611 u1.append_constraint(ck1) 3612 u1.append_constraint(ck1) 3613 3614 eq_(ck1.name, "ck_user_foo") 3615 3616 def test_pickle_metadata(self): 3617 m = MetaData(naming_convention={"pk": "%(table_name)s_pk"}) 3618 3619 m2 = pickle.loads(pickle.dumps(m)) 3620 3621 eq_(m2.naming_convention, {"pk": "%(table_name)s_pk"}) 3622 3623 t2a = Table('t2', m, Column('id', Integer, primary_key=True)) 3624 t2b = Table('t2', m2, Column('id', Integer, primary_key=True)) 3625 3626 eq_(t2a.primary_key.name, 
t2b.primary_key.name) 3627 eq_(t2b.primary_key.name, "t2_pk") 3628