1import operator 2import re 3 4import sqlalchemy as sa 5from .. import assert_raises_message 6from .. import config 7from .. import engines 8from .. import eq_ 9from .. import expect_warnings 10from .. import fixtures 11from .. import is_ 12from ..schema import Column 13from ..schema import Table 14from ... import event 15from ... import exc as sa_exc 16from ... import ForeignKey 17from ... import inspect 18from ... import Integer 19from ... import MetaData 20from ... import String 21from ... import testing 22from ... import types as sql_types 23from ...engine.reflection import Inspector 24from ...schema import DDL 25from ...schema import Index 26from ...sql.elements import quoted_name 27 28 29metadata, users = None, None 30 31 32class HasTableTest(fixtures.TablesTest): 33 __backend__ = True 34 35 @classmethod 36 def define_tables(cls, metadata): 37 Table( 38 "test_table", 39 metadata, 40 Column("id", Integer, primary_key=True), 41 Column("data", String(50)), 42 ) 43 44 def test_has_table(self): 45 with config.db.begin() as conn: 46 assert config.db.dialect.has_table(conn, "test_table") 47 assert not config.db.dialect.has_table(conn, "nonexistent_table") 48 49 50class ComponentReflectionTest(fixtures.TablesTest): 51 run_inserts = run_deletes = None 52 53 __backend__ = True 54 55 @classmethod 56 def setup_bind(cls): 57 if config.requirements.independent_connections.enabled: 58 from sqlalchemy import pool 59 60 return engines.testing_engine( 61 options=dict(poolclass=pool.StaticPool) 62 ) 63 else: 64 return config.db 65 66 @classmethod 67 def define_tables(cls, metadata): 68 cls.define_reflected_tables(metadata, None) 69 if testing.requires.schemas.enabled: 70 cls.define_reflected_tables(metadata, testing.config.test_schema) 71 72 @classmethod 73 def define_reflected_tables(cls, metadata, schema): 74 if schema: 75 schema_prefix = schema + "." 76 else: 77 schema_prefix = "" 78 79 if testing.requires.self_referential_foreign_keys.enabled: 80 users = Table( 81 "users", 82 metadata, 83 Column("user_id", sa.INT, primary_key=True), 84 Column("test1", sa.CHAR(5), nullable=False), 85 Column("test2", sa.Float(5), nullable=False), 86 Column( 87 "parent_user_id", 88 sa.Integer, 89 sa.ForeignKey( 90 "%susers.user_id" % schema_prefix, name="user_id_fk" 91 ), 92 ), 93 schema=schema, 94 test_needs_fk=True, 95 ) 96 else: 97 users = Table( 98 "users", 99 metadata, 100 Column("user_id", sa.INT, primary_key=True), 101 Column("test1", sa.CHAR(5), nullable=False), 102 Column("test2", sa.Float(5), nullable=False), 103 schema=schema, 104 test_needs_fk=True, 105 ) 106 107 Table( 108 "dingalings", 109 metadata, 110 Column("dingaling_id", sa.Integer, primary_key=True), 111 Column( 112 "address_id", 113 sa.Integer, 114 sa.ForeignKey("%semail_addresses.address_id" % schema_prefix), 115 ), 116 Column("data", sa.String(30)), 117 schema=schema, 118 test_needs_fk=True, 119 ) 120 Table( 121 "email_addresses", 122 metadata, 123 Column("address_id", sa.Integer), 124 Column( 125 "remote_user_id", sa.Integer, sa.ForeignKey(users.c.user_id) 126 ), 127 Column("email_address", sa.String(20)), 128 sa.PrimaryKeyConstraint("address_id", name="email_ad_pk"), 129 schema=schema, 130 test_needs_fk=True, 131 ) 132 Table( 133 "comment_test", 134 metadata, 135 Column("id", sa.Integer, primary_key=True, comment="id comment"), 136 Column("data", sa.String(20), comment="data % comment"), 137 Column( 138 "d2", 139 sa.String(20), 140 comment=r"""Comment types type speedily ' " \ '' Fun!""", 141 ), 142 schema=schema, 143 comment=r"""the test % ' " \ table comment""", 144 ) 145 146 if testing.requires.cross_schema_fk_reflection.enabled: 147 if schema is None: 148 Table( 149 "local_table", 150 metadata, 151 Column("id", sa.Integer, primary_key=True), 152 Column("data", sa.String(20)), 153 Column( 154 "remote_id", 155 ForeignKey( 156 "%s.remote_table_2.id" % testing.config.test_schema 157 ), 158 ), 159 test_needs_fk=True, 160 schema=config.db.dialect.default_schema_name, 161 ) 162 else: 163 Table( 164 "remote_table", 165 metadata, 166 Column("id", sa.Integer, primary_key=True), 167 Column( 168 "local_id", 169 ForeignKey( 170 "%s.local_table.id" 171 % config.db.dialect.default_schema_name 172 ), 173 ), 174 Column("data", sa.String(20)), 175 schema=schema, 176 test_needs_fk=True, 177 ) 178 Table( 179 "remote_table_2", 180 metadata, 181 Column("id", sa.Integer, primary_key=True), 182 Column("data", sa.String(20)), 183 schema=schema, 184 test_needs_fk=True, 185 ) 186 187 if testing.requires.index_reflection.enabled: 188 cls.define_index(metadata, users) 189 190 if not schema: 191 # test_needs_fk is at the moment to force MySQL InnoDB 192 noncol_idx_test_nopk = Table( 193 "noncol_idx_test_nopk", 194 metadata, 195 Column("q", sa.String(5)), 196 test_needs_fk=True, 197 ) 198 199 noncol_idx_test_pk = Table( 200 "noncol_idx_test_pk", 201 metadata, 202 Column("id", sa.Integer, primary_key=True), 203 Column("q", sa.String(5)), 204 test_needs_fk=True, 205 ) 206 Index("noncol_idx_nopk", noncol_idx_test_nopk.c.q.desc()) 207 Index("noncol_idx_pk", noncol_idx_test_pk.c.q.desc()) 208 209 if testing.requires.view_column_reflection.enabled: 210 cls.define_views(metadata, schema) 211 if not schema and testing.requires.temp_table_reflection.enabled: 212 cls.define_temp_tables(metadata) 213 214 @classmethod 215 def define_temp_tables(cls, metadata): 216 # cheat a bit, we should fix this with some dialect-level 217 # temp table fixture 218 if testing.against("oracle"): 219 kw = { 220 "prefixes": ["GLOBAL TEMPORARY"], 221 "oracle_on_commit": "PRESERVE ROWS", 222 } 223 else: 224 kw = {"prefixes": ["TEMPORARY"]} 225 226 user_tmp = Table( 227 "user_tmp", 228 metadata, 229 Column("id", sa.INT, primary_key=True), 230 Column("name", sa.VARCHAR(50)), 231 Column("foo", sa.INT), 232 sa.UniqueConstraint("name", name="user_tmp_uq"), 233 sa.Index("user_tmp_ix", "foo"), 234 **kw 235 ) 236 if ( 237 testing.requires.view_reflection.enabled 238 and testing.requires.temporary_views.enabled 239 ): 240 event.listen( 241 user_tmp, 242 "after_create", 243 DDL( 244 "create temporary view user_tmp_v as " 245 "select * from user_tmp" 246 ), 247 ) 248 event.listen(user_tmp, "before_drop", DDL("drop view user_tmp_v")) 249 250 @classmethod 251 def define_index(cls, metadata, users): 252 Index("users_t_idx", users.c.test1, users.c.test2) 253 Index("users_all_idx", users.c.user_id, users.c.test2, users.c.test1) 254 255 @classmethod 256 def define_views(cls, metadata, schema): 257 for table_name in ("users", "email_addresses"): 258 fullname = table_name 259 if schema: 260 fullname = "%s.%s" % (schema, table_name) 261 view_name = fullname + "_v" 262 query = "CREATE VIEW %s AS SELECT * FROM %s" % ( 263 view_name, 264 fullname, 265 ) 266 267 event.listen(metadata, "after_create", DDL(query)) 268 event.listen( 269 metadata, "before_drop", DDL("DROP VIEW %s" % view_name) 270 ) 271 272 @testing.requires.schema_reflection 273 def test_get_schema_names(self): 274 insp = inspect(testing.db) 275 276 self.assert_(testing.config.test_schema in insp.get_schema_names()) 277 278 @testing.requires.schema_reflection 279 def test_dialect_initialize(self): 280 engine = engines.testing_engine() 281 assert not hasattr(engine.dialect, "default_schema_name") 282 inspect(engine) 283 assert hasattr(engine.dialect, "default_schema_name") 284 285 @testing.requires.schema_reflection 286 def test_get_default_schema_name(self): 287 insp = inspect(testing.db) 288 eq_(insp.default_schema_name, testing.db.dialect.default_schema_name) 289 290 @testing.provide_metadata 291 def _test_get_table_names( 292 self, schema=None, table_type="table", order_by=None 293 ): 294 _ignore_tables = [ 295 "comment_test", 296 "noncol_idx_test_pk", 297 "noncol_idx_test_nopk", 298 "local_table", 299 "remote_table", 300 "remote_table_2", 301 ] 302 meta = self.metadata 303 users, addresses, dingalings = ( 304 self.tables.users, 305 self.tables.email_addresses, 306 self.tables.dingalings, 307 ) 308 insp = inspect(meta.bind) 309 310 if table_type == "view": 311 table_names = insp.get_view_names(schema) 312 table_names.sort() 313 answer = ["email_addresses_v", "users_v"] 314 eq_(sorted(table_names), answer) 315 else: 316 table_names = [ 317 t 318 for t in insp.get_table_names(schema, order_by=order_by) 319 if t not in _ignore_tables 320 ] 321 322 if order_by == "foreign_key": 323 answer = ["users", "email_addresses", "dingalings"] 324 eq_(table_names, answer) 325 else: 326 answer = ["dingalings", "email_addresses", "users"] 327 eq_(sorted(table_names), answer) 328 329 @testing.requires.temp_table_names 330 def test_get_temp_table_names(self): 331 insp = inspect(self.bind) 332 temp_table_names = insp.get_temp_table_names() 333 eq_(sorted(temp_table_names), ["user_tmp"]) 334 335 @testing.requires.view_reflection 336 @testing.requires.temp_table_names 337 @testing.requires.temporary_views 338 def test_get_temp_view_names(self): 339 insp = inspect(self.bind) 340 temp_table_names = insp.get_temp_view_names() 341 eq_(sorted(temp_table_names), ["user_tmp_v"]) 342 343 @testing.requires.table_reflection 344 def test_get_table_names(self): 345 self._test_get_table_names() 346 347 @testing.requires.table_reflection 348 @testing.requires.foreign_key_constraint_reflection 349 def test_get_table_names_fks(self): 350 self._test_get_table_names(order_by="foreign_key") 351 352 @testing.requires.comment_reflection 353 def test_get_comments(self): 354 self._test_get_comments() 355 356 @testing.requires.comment_reflection 357 @testing.requires.schemas 358 def test_get_comments_with_schema(self): 359 self._test_get_comments(testing.config.test_schema) 360 361 def _test_get_comments(self, schema=None): 362 insp = inspect(testing.db) 363 364 eq_( 365 insp.get_table_comment("comment_test", schema=schema), 366 {"text": r"""the test % ' " \ table comment"""}, 367 ) 368 369 eq_(insp.get_table_comment("users", schema=schema), {"text": None}) 370 371 eq_( 372 [ 373 {"name": rec["name"], "comment": rec["comment"]} 374 for rec in insp.get_columns("comment_test", schema=schema) 375 ], 376 [ 377 {"comment": "id comment", "name": "id"}, 378 {"comment": "data % comment", "name": "data"}, 379 { 380 "comment": ( 381 r"""Comment types type speedily ' " \ '' Fun!""" 382 ), 383 "name": "d2", 384 }, 385 ], 386 ) 387 388 @testing.requires.table_reflection 389 @testing.requires.schemas 390 def test_get_table_names_with_schema(self): 391 self._test_get_table_names(testing.config.test_schema) 392 393 @testing.requires.view_column_reflection 394 def test_get_view_names(self): 395 self._test_get_table_names(table_type="view") 396 397 @testing.requires.view_column_reflection 398 @testing.requires.schemas 399 def test_get_view_names_with_schema(self): 400 self._test_get_table_names( 401 testing.config.test_schema, table_type="view" 402 ) 403 404 @testing.requires.table_reflection 405 @testing.requires.view_column_reflection 406 def test_get_tables_and_views(self): 407 self._test_get_table_names() 408 self._test_get_table_names(table_type="view") 409 410 def _test_get_columns(self, schema=None, table_type="table"): 411 meta = MetaData(testing.db) 412 users, addresses, dingalings = ( 413 self.tables.users, 414 self.tables.email_addresses, 415 self.tables.dingalings, 416 ) 417 table_names = ["users", "email_addresses"] 418 if table_type == "view": 419 table_names = ["users_v", "email_addresses_v"] 420 insp = inspect(meta.bind) 421 for table_name, table in zip(table_names, (users, addresses)): 422 schema_name = schema 423 cols = insp.get_columns(table_name, schema=schema_name) 424 self.assert_(len(cols) > 0, len(cols)) 425 426 # should be in order 427 428 for i, col in enumerate(table.columns): 429 eq_(col.name, cols[i]["name"]) 430 ctype = cols[i]["type"].__class__ 431 ctype_def = col.type 432 if isinstance(ctype_def, sa.types.TypeEngine): 433 ctype_def = ctype_def.__class__ 434 435 # Oracle returns Date for DateTime. 436 437 if testing.against("oracle") and ctype_def in ( 438 sql_types.Date, 439 sql_types.DateTime, 440 ): 441 ctype_def = sql_types.Date 442 443 # assert that the desired type and return type share 444 # a base within one of the generic types. 445 446 self.assert_( 447 len( 448 set(ctype.__mro__) 449 .intersection(ctype_def.__mro__) 450 .intersection( 451 [ 452 sql_types.Integer, 453 sql_types.Numeric, 454 sql_types.DateTime, 455 sql_types.Date, 456 sql_types.Time, 457 sql_types.String, 458 sql_types._Binary, 459 ] 460 ) 461 ) 462 > 0, 463 "%s(%s), %s(%s)" 464 % (col.name, col.type, cols[i]["name"], ctype), 465 ) 466 467 if not col.primary_key: 468 assert cols[i]["default"] is None 469 470 @testing.requires.table_reflection 471 def test_get_columns(self): 472 self._test_get_columns() 473 474 @testing.provide_metadata 475 def _type_round_trip(self, *types): 476 t = Table( 477 "t", 478 self.metadata, 479 *[Column("t%d" % i, type_) for i, type_ in enumerate(types)] 480 ) 481 t.create() 482 483 return [ 484 c["type"] for c in inspect(self.metadata.bind).get_columns("t") 485 ] 486 487 @testing.requires.table_reflection 488 def test_numeric_reflection(self): 489 for typ in self._type_round_trip(sql_types.Numeric(18, 5)): 490 assert isinstance(typ, sql_types.Numeric) 491 eq_(typ.precision, 18) 492 eq_(typ.scale, 5) 493 494 @testing.requires.table_reflection 495 def test_varchar_reflection(self): 496 typ = self._type_round_trip(sql_types.String(52))[0] 497 assert isinstance(typ, sql_types.String) 498 eq_(typ.length, 52) 499 500 @testing.requires.table_reflection 501 @testing.provide_metadata 502 def test_nullable_reflection(self): 503 t = Table( 504 "t", 505 self.metadata, 506 Column("a", Integer, nullable=True), 507 Column("b", Integer, nullable=False), 508 ) 509 t.create() 510 eq_( 511 dict( 512 (col["name"], col["nullable"]) 513 for col in inspect(self.metadata.bind).get_columns("t") 514 ), 515 {"a": True, "b": False}, 516 ) 517 518 @testing.requires.table_reflection 519 @testing.requires.schemas 520 def test_get_columns_with_schema(self): 521 self._test_get_columns(schema=testing.config.test_schema) 522 523 @testing.requires.temp_table_reflection 524 def test_get_temp_table_columns(self): 525 meta = MetaData(self.bind) 526 user_tmp = self.tables.user_tmp 527 insp = inspect(meta.bind) 528 cols = insp.get_columns("user_tmp") 529 self.assert_(len(cols) > 0, len(cols)) 530 531 for i, col in enumerate(user_tmp.columns): 532 eq_(col.name, cols[i]["name"]) 533 534 @testing.requires.temp_table_reflection 535 @testing.requires.view_column_reflection 536 @testing.requires.temporary_views 537 def test_get_temp_view_columns(self): 538 insp = inspect(self.bind) 539 cols = insp.get_columns("user_tmp_v") 540 eq_([col["name"] for col in cols], ["id", "name", "foo"]) 541 542 @testing.requires.view_column_reflection 543 def test_get_view_columns(self): 544 self._test_get_columns(table_type="view") 545 546 @testing.requires.view_column_reflection 547 @testing.requires.schemas 548 def test_get_view_columns_with_schema(self): 549 self._test_get_columns( 550 schema=testing.config.test_schema, table_type="view" 551 ) 552 553 @testing.provide_metadata 554 def _test_get_pk_constraint(self, schema=None): 555 meta = self.metadata 556 users, addresses = self.tables.users, self.tables.email_addresses 557 insp = inspect(meta.bind) 558 559 users_cons = insp.get_pk_constraint(users.name, schema=schema) 560 users_pkeys = users_cons["constrained_columns"] 561 eq_(users_pkeys, ["user_id"]) 562 563 addr_cons = insp.get_pk_constraint(addresses.name, schema=schema) 564 addr_pkeys = addr_cons["constrained_columns"] 565 eq_(addr_pkeys, ["address_id"]) 566 567 with testing.requires.reflects_pk_names.fail_if(): 568 eq_(addr_cons["name"], "email_ad_pk") 569 570 @testing.requires.primary_key_constraint_reflection 571 def test_get_pk_constraint(self): 572 self._test_get_pk_constraint() 573 574 @testing.requires.table_reflection 575 @testing.requires.primary_key_constraint_reflection 576 @testing.requires.schemas 577 def test_get_pk_constraint_with_schema(self): 578 self._test_get_pk_constraint(schema=testing.config.test_schema) 579 580 @testing.requires.table_reflection 581 @testing.provide_metadata 582 def test_deprecated_get_primary_keys(self): 583 meta = self.metadata 584 users = self.tables.users 585 insp = Inspector(meta.bind) 586 assert_raises_message( 587 sa_exc.SADeprecationWarning, 588 r".*get_primary_keys\(\) method is deprecated", 589 insp.get_primary_keys, 590 users.name, 591 ) 592 593 @testing.provide_metadata 594 def _test_get_foreign_keys(self, schema=None): 595 meta = self.metadata 596 users, addresses, dingalings = ( 597 self.tables.users, 598 self.tables.email_addresses, 599 self.tables.dingalings, 600 ) 601 insp = inspect(meta.bind) 602 expected_schema = schema 603 # users 604 605 if testing.requires.self_referential_foreign_keys.enabled: 606 users_fkeys = insp.get_foreign_keys(users.name, schema=schema) 607 fkey1 = users_fkeys[0] 608 609 with testing.requires.named_constraints.fail_if(): 610 eq_(fkey1["name"], "user_id_fk") 611 612 eq_(fkey1["referred_schema"], expected_schema) 613 eq_(fkey1["referred_table"], users.name) 614 eq_(fkey1["referred_columns"], ["user_id"]) 615 if testing.requires.self_referential_foreign_keys.enabled: 616 eq_(fkey1["constrained_columns"], ["parent_user_id"]) 617 618 # addresses 619 addr_fkeys = insp.get_foreign_keys(addresses.name, schema=schema) 620 fkey1 = addr_fkeys[0] 621 622 with testing.requires.implicitly_named_constraints.fail_if(): 623 self.assert_(fkey1["name"] is not None) 624 625 eq_(fkey1["referred_schema"], expected_schema) 626 eq_(fkey1["referred_table"], users.name) 627 eq_(fkey1["referred_columns"], ["user_id"]) 628 eq_(fkey1["constrained_columns"], ["remote_user_id"]) 629 630 @testing.requires.foreign_key_constraint_reflection 631 def test_get_foreign_keys(self): 632 self._test_get_foreign_keys() 633 634 @testing.requires.foreign_key_constraint_reflection 635 @testing.requires.schemas 636 def test_get_foreign_keys_with_schema(self): 637 self._test_get_foreign_keys(schema=testing.config.test_schema) 638 639 @testing.requires.cross_schema_fk_reflection 640 @testing.requires.schemas 641 def test_get_inter_schema_foreign_keys(self): 642 local_table, remote_table, remote_table_2 = self.tables( 643 "%s.local_table" % testing.db.dialect.default_schema_name, 644 "%s.remote_table" % testing.config.test_schema, 645 "%s.remote_table_2" % testing.config.test_schema, 646 ) 647 648 insp = inspect(config.db) 649 650 local_fkeys = insp.get_foreign_keys(local_table.name) 651 eq_(len(local_fkeys), 1) 652 653 fkey1 = local_fkeys[0] 654 eq_(fkey1["referred_schema"], testing.config.test_schema) 655 eq_(fkey1["referred_table"], remote_table_2.name) 656 eq_(fkey1["referred_columns"], ["id"]) 657 eq_(fkey1["constrained_columns"], ["remote_id"]) 658 659 remote_fkeys = insp.get_foreign_keys( 660 remote_table.name, schema=testing.config.test_schema 661 ) 662 eq_(len(remote_fkeys), 1) 663 664 fkey2 = remote_fkeys[0] 665 666 assert fkey2["referred_schema"] in ( 667 None, 668 testing.db.dialect.default_schema_name, 669 ) 670 eq_(fkey2["referred_table"], local_table.name) 671 eq_(fkey2["referred_columns"], ["id"]) 672 eq_(fkey2["constrained_columns"], ["local_id"]) 673 674 @testing.requires.foreign_key_constraint_option_reflection_ondelete 675 def test_get_foreign_key_options_ondelete(self): 676 self._test_get_foreign_key_options(ondelete="CASCADE") 677 678 @testing.requires.foreign_key_constraint_option_reflection_onupdate 679 def test_get_foreign_key_options_onupdate(self): 680 self._test_get_foreign_key_options(onupdate="SET NULL") 681 682 @testing.provide_metadata 683 def _test_get_foreign_key_options(self, **options): 684 meta = self.metadata 685 686 Table( 687 "x", 688 meta, 689 Column("id", Integer, primary_key=True), 690 test_needs_fk=True, 691 ) 692 693 Table( 694 "table", 695 meta, 696 Column("id", Integer, primary_key=True), 697 Column("x_id", Integer, sa.ForeignKey("x.id", name="xid")), 698 Column("test", String(10)), 699 test_needs_fk=True, 700 ) 701 702 Table( 703 "user", 704 meta, 705 Column("id", Integer, primary_key=True), 706 Column("name", String(50), nullable=False), 707 Column("tid", Integer), 708 sa.ForeignKeyConstraint( 709 ["tid"], ["table.id"], name="myfk", **options 710 ), 711 test_needs_fk=True, 712 ) 713 714 meta.create_all() 715 716 insp = inspect(meta.bind) 717 718 # test 'options' is always present for a backend 719 # that can reflect these, since alembic looks for this 720 opts = insp.get_foreign_keys("table")[0]["options"] 721 722 eq_(dict((k, opts[k]) for k in opts if opts[k]), {}) 723 724 opts = insp.get_foreign_keys("user")[0]["options"] 725 eq_(dict((k, opts[k]) for k in opts if opts[k]), options) 726 727 def _assert_insp_indexes(self, indexes, expected_indexes): 728 index_names = [d["name"] for d in indexes] 729 for e_index in expected_indexes: 730 assert e_index["name"] in index_names 731 index = indexes[index_names.index(e_index["name"])] 732 for key in e_index: 733 eq_(e_index[key], index[key]) 734 735 @testing.provide_metadata 736 def _test_get_indexes(self, schema=None): 737 meta = self.metadata 738 users, addresses, dingalings = ( 739 self.tables.users, 740 self.tables.email_addresses, 741 self.tables.dingalings, 742 ) 743 # The database may decide to create indexes for foreign keys, etc. 744 # so there may be more indexes than expected. 745 insp = inspect(meta.bind) 746 indexes = insp.get_indexes("users", schema=schema) 747 expected_indexes = [ 748 { 749 "unique": False, 750 "column_names": ["test1", "test2"], 751 "name": "users_t_idx", 752 }, 753 { 754 "unique": False, 755 "column_names": ["user_id", "test2", "test1"], 756 "name": "users_all_idx", 757 }, 758 ] 759 self._assert_insp_indexes(indexes, expected_indexes) 760 761 @testing.requires.index_reflection 762 def test_get_indexes(self): 763 self._test_get_indexes() 764 765 @testing.requires.index_reflection 766 @testing.requires.schemas 767 def test_get_indexes_with_schema(self): 768 self._test_get_indexes(schema=testing.config.test_schema) 769 770 @testing.provide_metadata 771 def _test_get_noncol_index(self, tname, ixname): 772 meta = self.metadata 773 insp = inspect(meta.bind) 774 indexes = insp.get_indexes(tname) 775 776 # reflecting an index that has "x DESC" in it as the column. 777 # the DB may or may not give us "x", but make sure we get the index 778 # back, it has a name, it's connected to the table. 779 expected_indexes = [{"unique": False, "name": ixname}] 780 self._assert_insp_indexes(indexes, expected_indexes) 781 782 t = Table(tname, meta, autoload_with=meta.bind) 783 eq_(len(t.indexes), 1) 784 is_(list(t.indexes)[0].table, t) 785 eq_(list(t.indexes)[0].name, ixname) 786 787 @testing.requires.index_reflection 788 def test_get_noncol_index_no_pk(self): 789 self._test_get_noncol_index("noncol_idx_test_nopk", "noncol_idx_nopk") 790 791 @testing.requires.index_reflection 792 def test_get_noncol_index_pk(self): 793 self._test_get_noncol_index("noncol_idx_test_pk", "noncol_idx_pk") 794 795 @testing.requires.indexes_with_expressions 796 @testing.provide_metadata 797 def test_reflect_expression_based_indexes(self): 798 Table( 799 "t", 800 self.metadata, 801 Column("x", String(30)), 802 Column("y", String(30)), 803 ) 804 event.listen( 805 self.metadata, 806 "after_create", 807 DDL("CREATE INDEX t_idx ON t(lower(x), lower(y))"), 808 ) 809 event.listen( 810 self.metadata, "after_create", DDL("CREATE INDEX t_idx_2 ON t(x)") 811 ) 812 self.metadata.create_all() 813 814 insp = inspect(self.metadata.bind) 815 816 with expect_warnings( 817 "Skipped unsupported reflection of expression-based index t_idx" 818 ): 819 eq_( 820 insp.get_indexes("t"), 821 [{"name": "t_idx_2", "column_names": ["x"], "unique": 0}], 822 ) 823 824 @testing.requires.unique_constraint_reflection 825 def test_get_unique_constraints(self): 826 self._test_get_unique_constraints() 827 828 @testing.requires.temp_table_reflection 829 @testing.requires.unique_constraint_reflection 830 def test_get_temp_table_unique_constraints(self): 831 insp = inspect(self.bind) 832 reflected = insp.get_unique_constraints("user_tmp") 833 for refl in reflected: 834 # Different dialects handle duplicate index and constraints 835 # differently, so ignore this flag 836 refl.pop("duplicates_index", None) 837 eq_(reflected, [{"column_names": ["name"], "name": "user_tmp_uq"}]) 838 839 @testing.requires.temp_table_reflection 840 def test_get_temp_table_indexes(self): 841 insp = inspect(self.bind) 842 indexes = insp.get_indexes("user_tmp") 843 for ind in indexes: 844 ind.pop("dialect_options", None) 845 eq_( 846 # TODO: we need to add better filtering for indexes/uq constraints 847 # that are doubled up 848 [idx for idx in indexes if idx["name"] == "user_tmp_ix"], 849 [ 850 { 851 "unique": False, 852 "column_names": ["foo"], 853 "name": "user_tmp_ix", 854 } 855 ], 856 ) 857 858 @testing.requires.unique_constraint_reflection 859 @testing.requires.schemas 860 def test_get_unique_constraints_with_schema(self): 861 self._test_get_unique_constraints(schema=testing.config.test_schema) 862 863 @testing.provide_metadata 864 def _test_get_unique_constraints(self, schema=None): 865 # SQLite dialect needs to parse the names of the constraints 866 # separately from what it gets from PRAGMA index_list(), and 867 # then matches them up. so same set of column_names in two 868 # constraints will confuse it. Perhaps we should no longer 869 # bother with index_list() here since we have the whole 870 # CREATE TABLE? 871 uniques = sorted( 872 [ 873 {"name": "unique_a", "column_names": ["a"]}, 874 {"name": "unique_a_b_c", "column_names": ["a", "b", "c"]}, 875 {"name": "unique_c_a_b", "column_names": ["c", "a", "b"]}, 876 {"name": "unique_asc_key", "column_names": ["asc", "key"]}, 877 {"name": "i.have.dots", "column_names": ["b"]}, 878 {"name": "i have spaces", "column_names": ["c"]}, 879 ], 880 key=operator.itemgetter("name"), 881 ) 882 orig_meta = self.metadata 883 table = Table( 884 "testtbl", 885 orig_meta, 886 Column("a", sa.String(20)), 887 Column("b", sa.String(30)), 888 Column("c", sa.Integer), 889 # reserved identifiers 890 Column("asc", sa.String(30)), 891 Column("key", sa.String(30)), 892 schema=schema, 893 ) 894 for uc in uniques: 895 table.append_constraint( 896 sa.UniqueConstraint(*uc["column_names"], name=uc["name"]) 897 ) 898 orig_meta.create_all() 899 900 inspector = inspect(orig_meta.bind) 901 reflected = sorted( 902 inspector.get_unique_constraints("testtbl", schema=schema), 903 key=operator.itemgetter("name"), 904 ) 905 906 names_that_duplicate_index = set() 907 908 for orig, refl in zip(uniques, reflected): 909 # Different dialects handle duplicate index and constraints 910 # differently, so ignore this flag 911 dupe = refl.pop("duplicates_index", None) 912 if dupe: 913 names_that_duplicate_index.add(dupe) 914 eq_(orig, refl) 915 916 reflected_metadata = MetaData() 917 reflected = Table( 918 "testtbl", 919 reflected_metadata, 920 autoload_with=orig_meta.bind, 921 schema=schema, 922 ) 923 924 # test "deduplicates for index" logic. MySQL and Oracle 925 # "unique constraints" are actually unique indexes (with possible 926 # exception of a unique that is a dupe of another one in the case 927 # of Oracle). make sure # they aren't duplicated. 928 idx_names = set([idx.name for idx in reflected.indexes]) 929 uq_names = set( 930 [ 931 uq.name 932 for uq in reflected.constraints 933 if isinstance(uq, sa.UniqueConstraint) 934 ] 935 ).difference(["unique_c_a_b"]) 936 937 assert not idx_names.intersection(uq_names) 938 if names_that_duplicate_index: 939 eq_(names_that_duplicate_index, idx_names) 940 eq_(uq_names, set()) 941 942 @testing.requires.check_constraint_reflection 943 def test_get_check_constraints(self): 944 self._test_get_check_constraints() 945 946 @testing.requires.check_constraint_reflection 947 @testing.requires.schemas 948 def test_get_check_constraints_schema(self): 949 self._test_get_check_constraints(schema=testing.config.test_schema) 950 951 @testing.provide_metadata 952 def _test_get_check_constraints(self, schema=None): 953 orig_meta = self.metadata 954 Table( 955 "sa_cc", 956 orig_meta, 957 Column("a", Integer()), 958 sa.CheckConstraint("a > 1 AND a < 5", name="cc1"), 959 sa.CheckConstraint("a = 1 OR (a > 2 AND a < 5)", name="cc2"), 960 schema=schema, 961 ) 962 963 orig_meta.create_all() 964 965 inspector = inspect(orig_meta.bind) 966 reflected = sorted( 967 inspector.get_check_constraints("sa_cc", schema=schema), 968 key=operator.itemgetter("name"), 969 ) 970 971 # trying to minimize effect of quoting, parenthesis, etc. 972 # may need to add more to this as new dialects get CHECK 973 # constraint reflection support 974 def normalize(sqltext): 975 return " ".join( 976 re.findall(r"and|\d|=|a|or|<|>", sqltext.lower(), re.I) 977 ) 978 979 reflected = [ 980 {"name": item["name"], "sqltext": normalize(item["sqltext"])} 981 for item in reflected 982 ] 983 eq_( 984 reflected, 985 [ 986 {"name": "cc1", "sqltext": "a > 1 and a < 5"}, 987 {"name": "cc2", "sqltext": "a = 1 or a > 2 and a < 5"}, 988 ], 989 ) 990 991 @testing.provide_metadata 992 def _test_get_view_definition(self, schema=None): 993 meta = self.metadata 994 users, addresses, dingalings = ( 995 self.tables.users, 996 self.tables.email_addresses, 997 self.tables.dingalings, 998 ) 999 view_name1 = "users_v" 1000 view_name2 = "email_addresses_v" 1001 insp = inspect(meta.bind) 1002 v1 = insp.get_view_definition(view_name1, schema=schema) 1003 self.assert_(v1) 1004 v2 = insp.get_view_definition(view_name2, schema=schema) 1005 self.assert_(v2) 1006 1007 @testing.requires.view_reflection 1008 def test_get_view_definition(self): 1009 self._test_get_view_definition() 1010 1011 @testing.requires.view_reflection 1012 @testing.requires.schemas 1013 def test_get_view_definition_with_schema(self): 1014 self._test_get_view_definition(schema=testing.config.test_schema) 1015 1016 @testing.only_on("postgresql", "PG specific feature") 1017 @testing.provide_metadata 1018 def _test_get_table_oid(self, table_name, schema=None): 1019 meta = self.metadata 1020 users, addresses, dingalings = ( 1021 self.tables.users, 1022 self.tables.email_addresses, 1023 self.tables.dingalings, 1024 ) 1025 insp = inspect(meta.bind) 1026 oid = insp.get_table_oid(table_name, schema) 1027 self.assert_(isinstance(oid, int)) 1028 1029 def test_get_table_oid(self): 1030 self._test_get_table_oid("users") 1031 1032 @testing.requires.schemas 1033 def test_get_table_oid_with_schema(self): 1034 self._test_get_table_oid("users", schema=testing.config.test_schema) 1035 1036 @testing.requires.table_reflection 1037 @testing.provide_metadata 1038 def test_autoincrement_col(self): 1039 """test that 'autoincrement' is reflected according to sqla's policy. 1040 1041 Don't mark this test as unsupported for any backend ! 1042 1043 (technically it fails with MySQL InnoDB since "id" comes before "id2") 1044 1045 A backend is better off not returning "autoincrement" at all, 1046 instead of potentially returning "False" for an auto-incrementing 1047 primary key column. 1048 1049 """ 1050 1051 meta = self.metadata 1052 insp = inspect(meta.bind) 1053 1054 for tname, cname in [ 1055 ("users", "user_id"), 1056 ("email_addresses", "address_id"), 1057 ("dingalings", "dingaling_id"), 1058 ]: 1059 cols = insp.get_columns(tname) 1060 id_ = {c["name"]: c for c in cols}[cname] 1061 assert id_.get("autoincrement", True) 1062 1063 1064class NormalizedNameTest(fixtures.TablesTest): 1065 __requires__ = ("denormalized_names",) 1066 __backend__ = True 1067 1068 @classmethod 1069 def define_tables(cls, metadata): 1070 Table( 1071 quoted_name("t1", quote=True), 1072 metadata, 1073 Column("id", Integer, primary_key=True), 1074 ) 1075 Table( 1076 quoted_name("t2", quote=True), 1077 metadata, 1078 Column("id", Integer, primary_key=True), 1079 Column("t1id", ForeignKey("t1.id")), 1080 ) 1081 1082 def test_reflect_lowercase_forced_tables(self): 1083 1084 m2 = MetaData(testing.db) 1085 t2_ref = Table(quoted_name("t2", quote=True), m2, autoload=True) 1086 t1_ref = m2.tables["t1"] 1087 assert t2_ref.c.t1id.references(t1_ref.c.id) 1088 1089 m3 = MetaData(testing.db) 1090 m3.reflect(only=lambda name, m: name.lower() in ("t1", "t2")) 1091 assert m3.tables["t2"].c.t1id.references(m3.tables["t1"].c.id) 1092 1093 def test_get_table_names(self): 1094 tablenames = [ 1095 t 1096 for t in inspect(testing.db).get_table_names() 1097 if t.lower() in ("t1", "t2") 1098 ] 1099 1100 eq_(tablenames[0].upper(), tablenames[0].lower()) 1101 eq_(tablenames[1].upper(), tablenames[1].lower()) 1102 1103 1104__all__ = ("ComponentReflectionTest", "HasTableTest", "NormalizedNameTest") 1105