1import operator 2import re 3 4import sqlalchemy as sa 5from .. import assert_raises_message 6from .. import config 7from .. import engines 8from .. import eq_ 9from .. import expect_warnings 10from .. import fixtures 11from .. import is_ 12from ..provision import temp_table_keyword_args 13from ..schema import Column 14from ..schema import Table 15from ... import event 16from ... import exc as sa_exc 17from ... import ForeignKey 18from ... import inspect 19from ... import Integer 20from ... import MetaData 21from ... import String 22from ... import testing 23from ... import types as sql_types 24from ...engine.reflection import Inspector 25from ...schema import DDL 26from ...schema import Index 27from ...sql.elements import quoted_name 28from ...testing import is_false 29from ...testing import is_true 30 31 32metadata, users = None, None 33 34 35class HasTableTest(fixtures.TablesTest): 36 __backend__ = True 37 38 @classmethod 39 def define_tables(cls, metadata): 40 Table( 41 "test_table", 42 metadata, 43 Column("id", Integer, primary_key=True), 44 Column("data", String(50)), 45 ) 46 if testing.requires.schemas.enabled: 47 Table( 48 "test_table_s", 49 metadata, 50 Column("id", Integer, primary_key=True), 51 Column("data", String(50)), 52 schema=config.test_schema, 53 ) 54 55 def test_has_table(self): 56 with config.db.begin() as conn: 57 is_true(config.db.dialect.has_table(conn, "test_table")) 58 is_false(config.db.dialect.has_table(conn, "test_table_s")) 59 is_false(config.db.dialect.has_table(conn, "nonexistent_table")) 60 61 @testing.requires.schemas 62 def test_has_table_schema(self): 63 with config.db.begin() as conn: 64 is_false( 65 config.db.dialect.has_table( 66 conn, "test_table", schema=config.test_schema 67 ) 68 ) 69 is_true( 70 config.db.dialect.has_table( 71 conn, "test_table_s", schema=config.test_schema 72 ) 73 ) 74 is_false( 75 config.db.dialect.has_table( 76 conn, "nonexistent_table", schema=config.test_schema 77 ) 78 ) 79 80 81class QuotedNameArgumentTest(fixtures.TablesTest): 82 run_create_tables = "once" 83 __backend__ = True 84 85 @classmethod 86 def define_tables(cls, metadata): 87 Table( 88 "quote ' one", 89 metadata, 90 Column("id", Integer), 91 Column("name", String(50)), 92 Column("data", String(50)), 93 Column("related_id", Integer), 94 sa.PrimaryKeyConstraint("id", name="pk quote ' one"), 95 sa.Index("ix quote ' one", "name"), 96 sa.UniqueConstraint( 97 "data", 98 name="uq quote' one", 99 ), 100 sa.ForeignKeyConstraint( 101 ["id"], ["related.id"], name="fk quote ' one" 102 ), 103 sa.CheckConstraint("name != 'foo'", name="ck quote ' one"), 104 comment=r"""quote ' one comment""", 105 test_needs_fk=True, 106 ) 107 108 if testing.requires.symbol_names_w_double_quote.enabled: 109 Table( 110 'quote " two', 111 metadata, 112 Column("id", Integer), 113 Column("name", String(50)), 114 Column("data", String(50)), 115 Column("related_id", Integer), 116 sa.PrimaryKeyConstraint("id", name='pk quote " two'), 117 sa.Index('ix quote " two', "name"), 118 sa.UniqueConstraint( 119 "data", 120 name='uq quote" two', 121 ), 122 sa.ForeignKeyConstraint( 123 ["id"], ["related.id"], name='fk quote " two' 124 ), 125 sa.CheckConstraint("name != 'foo'", name='ck quote " two '), 126 comment=r"""quote " two comment""", 127 test_needs_fk=True, 128 ) 129 130 Table( 131 "related", 132 metadata, 133 Column("id", Integer, primary_key=True), 134 Column("related", Integer), 135 test_needs_fk=True, 136 ) 137 138 if testing.requires.view_column_reflection.enabled: 139 140 if testing.requires.symbol_names_w_double_quote.enabled: 141 names = [ 142 "quote ' one", 143 'quote " two', 144 ] 145 else: 146 names = [ 147 "quote ' one", 148 ] 149 for name in names: 150 query = "CREATE VIEW %s AS SELECT * FROM %s" % ( 151 testing.db.dialect.identifier_preparer.quote( 152 "view %s" % name 153 ), 154 testing.db.dialect.identifier_preparer.quote(name), 155 ) 156 157 event.listen(metadata, "after_create", DDL(query)) 158 event.listen( 159 metadata, 160 "before_drop", 161 DDL( 162 "DROP VIEW %s" 163 % testing.db.dialect.identifier_preparer.quote( 164 "view %s" % name 165 ) 166 ), 167 ) 168 169 def quote_fixtures(fn): 170 return testing.combinations( 171 ("quote ' one",), 172 ('quote " two', testing.requires.symbol_names_w_double_quote), 173 )(fn) 174 175 @quote_fixtures 176 def test_get_table_options(self, name): 177 insp = inspect(testing.db) 178 179 insp.get_table_options(name) 180 181 @quote_fixtures 182 @testing.requires.view_column_reflection 183 def test_get_view_definition(self, name): 184 insp = inspect(testing.db) 185 assert insp.get_view_definition("view %s" % name) 186 187 @quote_fixtures 188 def test_get_columns(self, name): 189 insp = inspect(testing.db) 190 assert insp.get_columns(name) 191 192 @quote_fixtures 193 def test_get_pk_constraint(self, name): 194 insp = inspect(testing.db) 195 assert insp.get_pk_constraint(name) 196 197 @quote_fixtures 198 def test_get_foreign_keys(self, name): 199 insp = inspect(testing.db) 200 assert insp.get_foreign_keys(name) 201 202 @quote_fixtures 203 def test_get_indexes(self, name): 204 insp = inspect(testing.db) 205 assert insp.get_indexes(name) 206 207 @quote_fixtures 208 @testing.requires.unique_constraint_reflection 209 def test_get_unique_constraints(self, name): 210 insp = inspect(testing.db) 211 assert insp.get_unique_constraints(name) 212 213 @quote_fixtures 214 @testing.requires.comment_reflection 215 def test_get_table_comment(self, name): 216 insp = inspect(testing.db) 217 assert insp.get_table_comment(name) 218 219 @quote_fixtures 220 @testing.requires.check_constraint_reflection 221 def test_get_check_constraints(self, name): 222 insp = inspect(testing.db) 223 assert insp.get_check_constraints(name) 224 225 226class ComponentReflectionTest(fixtures.TablesTest): 227 run_inserts = run_deletes = None 228 229 __backend__ = True 230 231 @classmethod 232 def setup_bind(cls): 233 if config.requirements.independent_connections.enabled: 234 from sqlalchemy import pool 235 236 return engines.testing_engine( 237 options=dict(poolclass=pool.StaticPool) 238 ) 239 else: 240 return config.db 241 242 @classmethod 243 def define_tables(cls, metadata): 244 cls.define_reflected_tables(metadata, None) 245 if testing.requires.schemas.enabled: 246 cls.define_reflected_tables(metadata, testing.config.test_schema) 247 248 @classmethod 249 def define_reflected_tables(cls, metadata, schema): 250 if schema: 251 schema_prefix = schema + "." 252 else: 253 schema_prefix = "" 254 255 if testing.requires.self_referential_foreign_keys.enabled: 256 users = Table( 257 "users", 258 metadata, 259 Column("user_id", sa.INT, primary_key=True), 260 Column("test1", sa.CHAR(5), nullable=False), 261 Column("test2", sa.Float(5), nullable=False), 262 Column( 263 "parent_user_id", 264 sa.Integer, 265 sa.ForeignKey( 266 "%susers.user_id" % schema_prefix, name="user_id_fk" 267 ), 268 ), 269 schema=schema, 270 test_needs_fk=True, 271 ) 272 else: 273 users = Table( 274 "users", 275 metadata, 276 Column("user_id", sa.INT, primary_key=True), 277 Column("test1", sa.CHAR(5), nullable=False), 278 Column("test2", sa.Float(5), nullable=False), 279 schema=schema, 280 test_needs_fk=True, 281 ) 282 283 Table( 284 "dingalings", 285 metadata, 286 Column("dingaling_id", sa.Integer, primary_key=True), 287 Column( 288 "address_id", 289 sa.Integer, 290 sa.ForeignKey("%semail_addresses.address_id" % schema_prefix), 291 ), 292 Column("data", sa.String(30)), 293 schema=schema, 294 test_needs_fk=True, 295 ) 296 Table( 297 "email_addresses", 298 metadata, 299 Column("address_id", sa.Integer), 300 Column( 301 "remote_user_id", sa.Integer, sa.ForeignKey(users.c.user_id) 302 ), 303 Column("email_address", sa.String(20)), 304 sa.PrimaryKeyConstraint("address_id", name="email_ad_pk"), 305 schema=schema, 306 test_needs_fk=True, 307 ) 308 Table( 309 "comment_test", 310 metadata, 311 Column("id", sa.Integer, primary_key=True, comment="id comment"), 312 Column("data", sa.String(20), comment="data % comment"), 313 Column( 314 "d2", 315 sa.String(20), 316 comment=r"""Comment types type speedily ' " \ '' Fun!""", 317 ), 318 schema=schema, 319 comment=r"""the test % ' " \ table comment""", 320 ) 321 322 if testing.requires.cross_schema_fk_reflection.enabled: 323 if schema is None: 324 Table( 325 "local_table", 326 metadata, 327 Column("id", sa.Integer, primary_key=True), 328 Column("data", sa.String(20)), 329 Column( 330 "remote_id", 331 ForeignKey( 332 "%s.remote_table_2.id" % testing.config.test_schema 333 ), 334 ), 335 test_needs_fk=True, 336 schema=config.db.dialect.default_schema_name, 337 ) 338 else: 339 Table( 340 "remote_table", 341 metadata, 342 Column("id", sa.Integer, primary_key=True), 343 Column( 344 "local_id", 345 ForeignKey( 346 "%s.local_table.id" 347 % config.db.dialect.default_schema_name 348 ), 349 ), 350 Column("data", sa.String(20)), 351 schema=schema, 352 test_needs_fk=True, 353 ) 354 Table( 355 "remote_table_2", 356 metadata, 357 Column("id", sa.Integer, primary_key=True), 358 Column("data", sa.String(20)), 359 schema=schema, 360 test_needs_fk=True, 361 ) 362 363 if testing.requires.index_reflection.enabled: 364 cls.define_index(metadata, users) 365 366 if not schema: 367 # test_needs_fk is at the moment to force MySQL InnoDB 368 noncol_idx_test_nopk = Table( 369 "noncol_idx_test_nopk", 370 metadata, 371 Column("q", sa.String(5)), 372 test_needs_fk=True, 373 ) 374 375 noncol_idx_test_pk = Table( 376 "noncol_idx_test_pk", 377 metadata, 378 Column("id", sa.Integer, primary_key=True), 379 Column("q", sa.String(5)), 380 test_needs_fk=True, 381 ) 382 383 if testing.requires.indexes_with_ascdesc.enabled: 384 Index("noncol_idx_nopk", noncol_idx_test_nopk.c.q.desc()) 385 Index("noncol_idx_pk", noncol_idx_test_pk.c.q.desc()) 386 387 if testing.requires.view_column_reflection.enabled: 388 cls.define_views(metadata, schema) 389 if not schema and testing.requires.temp_table_reflection.enabled: 390 cls.define_temp_tables(metadata) 391 392 @classmethod 393 def define_temp_tables(cls, metadata): 394 kw = temp_table_keyword_args(config, config.db) 395 user_tmp = Table( 396 "user_tmp", 397 metadata, 398 Column("id", sa.INT, primary_key=True), 399 Column("name", sa.VARCHAR(50)), 400 Column("foo", sa.INT), 401 sa.UniqueConstraint("name", name="user_tmp_uq"), 402 sa.Index("user_tmp_ix", "foo"), 403 **kw 404 ) 405 if ( 406 testing.requires.view_reflection.enabled 407 and testing.requires.temporary_views.enabled 408 ): 409 event.listen( 410 user_tmp, 411 "after_create", 412 DDL( 413 "create temporary view user_tmp_v as " 414 "select * from user_tmp" 415 ), 416 ) 417 event.listen(user_tmp, "before_drop", DDL("drop view user_tmp_v")) 418 419 @classmethod 420 def define_index(cls, metadata, users): 421 Index("users_t_idx", users.c.test1, users.c.test2) 422 Index("users_all_idx", users.c.user_id, users.c.test2, users.c.test1) 423 424 @classmethod 425 def define_views(cls, metadata, schema): 426 for table_name in ("users", "email_addresses"): 427 fullname = table_name 428 if schema: 429 fullname = "%s.%s" % (schema, table_name) 430 view_name = fullname + "_v" 431 query = "CREATE VIEW %s AS SELECT * FROM %s" % ( 432 view_name, 433 fullname, 434 ) 435 436 event.listen(metadata, "after_create", DDL(query)) 437 event.listen( 438 metadata, "before_drop", DDL("DROP VIEW %s" % view_name) 439 ) 440 441 @testing.requires.schema_reflection 442 def test_get_schema_names(self): 443 insp = inspect(testing.db) 444 445 self.assert_(testing.config.test_schema in insp.get_schema_names()) 446 447 @testing.requires.schema_reflection 448 def test_dialect_initialize(self): 449 engine = engines.testing_engine() 450 assert not hasattr(engine.dialect, "default_schema_name") 451 inspect(engine) 452 assert hasattr(engine.dialect, "default_schema_name") 453 454 @testing.requires.schema_reflection 455 def test_get_default_schema_name(self): 456 insp = inspect(testing.db) 457 eq_(insp.default_schema_name, testing.db.dialect.default_schema_name) 458 459 @testing.provide_metadata 460 def _test_get_table_names( 461 self, schema=None, table_type="table", order_by=None 462 ): 463 _ignore_tables = [ 464 "comment_test", 465 "noncol_idx_test_pk", 466 "noncol_idx_test_nopk", 467 "local_table", 468 "remote_table", 469 "remote_table_2", 470 ] 471 meta = self.metadata 472 473 insp = inspect(meta.bind) 474 475 if table_type == "view": 476 table_names = insp.get_view_names(schema) 477 table_names.sort() 478 answer = ["email_addresses_v", "users_v"] 479 eq_(sorted(table_names), answer) 480 else: 481 if order_by: 482 tables = [ 483 rec[0] 484 for rec in insp.get_sorted_table_and_fkc_names(schema) 485 if rec[0] 486 ] 487 else: 488 tables = insp.get_table_names(schema) 489 table_names = [t for t in tables if t not in _ignore_tables] 490 491 if order_by == "foreign_key": 492 answer = ["users", "email_addresses", "dingalings"] 493 eq_(table_names, answer) 494 else: 495 answer = ["dingalings", "email_addresses", "users"] 496 eq_(sorted(table_names), answer) 497 498 @testing.requires.temp_table_names 499 def test_get_temp_table_names(self): 500 insp = inspect(self.bind) 501 temp_table_names = insp.get_temp_table_names() 502 eq_(sorted(temp_table_names), ["user_tmp"]) 503 504 @testing.requires.view_reflection 505 @testing.requires.temp_table_names 506 @testing.requires.temporary_views 507 def test_get_temp_view_names(self): 508 insp = inspect(self.bind) 509 temp_table_names = insp.get_temp_view_names() 510 eq_(sorted(temp_table_names), ["user_tmp_v"]) 511 512 @testing.requires.table_reflection 513 def test_get_table_names(self): 514 self._test_get_table_names() 515 516 @testing.requires.table_reflection 517 @testing.requires.foreign_key_constraint_reflection 518 def test_get_table_names_fks(self): 519 self._test_get_table_names(order_by="foreign_key") 520 521 @testing.requires.comment_reflection 522 def test_get_comments(self): 523 self._test_get_comments() 524 525 @testing.requires.comment_reflection 526 @testing.requires.schemas 527 def test_get_comments_with_schema(self): 528 self._test_get_comments(testing.config.test_schema) 529 530 def _test_get_comments(self, schema=None): 531 insp = inspect(testing.db) 532 533 eq_( 534 insp.get_table_comment("comment_test", schema=schema), 535 {"text": r"""the test % ' " \ table comment"""}, 536 ) 537 538 eq_(insp.get_table_comment("users", schema=schema), {"text": None}) 539 540 eq_( 541 [ 542 {"name": rec["name"], "comment": rec["comment"]} 543 for rec in insp.get_columns("comment_test", schema=schema) 544 ], 545 [ 546 {"comment": "id comment", "name": "id"}, 547 {"comment": "data % comment", "name": "data"}, 548 { 549 "comment": ( 550 r"""Comment types type speedily ' " \ '' Fun!""" 551 ), 552 "name": "d2", 553 }, 554 ], 555 ) 556 557 @testing.requires.table_reflection 558 @testing.requires.schemas 559 def test_get_table_names_with_schema(self): 560 self._test_get_table_names(testing.config.test_schema) 561 562 @testing.requires.view_column_reflection 563 def test_get_view_names(self): 564 self._test_get_table_names(table_type="view") 565 566 @testing.requires.view_column_reflection 567 @testing.requires.schemas 568 def test_get_view_names_with_schema(self): 569 self._test_get_table_names( 570 testing.config.test_schema, table_type="view" 571 ) 572 573 @testing.requires.table_reflection 574 @testing.requires.view_column_reflection 575 def test_get_tables_and_views(self): 576 self._test_get_table_names() 577 self._test_get_table_names(table_type="view") 578 579 def _test_get_columns(self, schema=None, table_type="table"): 580 meta = MetaData(testing.db) 581 users, addresses = (self.tables.users, self.tables.email_addresses) 582 table_names = ["users", "email_addresses"] 583 if table_type == "view": 584 table_names = ["users_v", "email_addresses_v"] 585 insp = inspect(meta.bind) 586 for table_name, table in zip(table_names, (users, addresses)): 587 schema_name = schema 588 cols = insp.get_columns(table_name, schema=schema_name) 589 self.assert_(len(cols) > 0, len(cols)) 590 591 # should be in order 592 593 for i, col in enumerate(table.columns): 594 eq_(col.name, cols[i]["name"]) 595 ctype = cols[i]["type"].__class__ 596 ctype_def = col.type 597 if isinstance(ctype_def, sa.types.TypeEngine): 598 ctype_def = ctype_def.__class__ 599 600 # Oracle returns Date for DateTime. 601 602 if testing.against("oracle") and ctype_def in ( 603 sql_types.Date, 604 sql_types.DateTime, 605 ): 606 ctype_def = sql_types.Date 607 608 # assert that the desired type and return type share 609 # a base within one of the generic types. 610 611 self.assert_( 612 len( 613 set(ctype.__mro__) 614 .intersection(ctype_def.__mro__) 615 .intersection( 616 [ 617 sql_types.Integer, 618 sql_types.Numeric, 619 sql_types.DateTime, 620 sql_types.Date, 621 sql_types.Time, 622 sql_types.String, 623 sql_types._Binary, 624 ] 625 ) 626 ) 627 > 0, 628 "%s(%s), %s(%s)" 629 % (col.name, col.type, cols[i]["name"], ctype), 630 ) 631 632 if not col.primary_key: 633 assert cols[i]["default"] is None 634 635 @testing.requires.table_reflection 636 def test_get_columns(self): 637 self._test_get_columns() 638 639 @testing.provide_metadata 640 def _type_round_trip(self, *types): 641 t = Table( 642 "t", 643 self.metadata, 644 *[Column("t%d" % i, type_) for i, type_ in enumerate(types)] 645 ) 646 t.create() 647 648 return [ 649 c["type"] for c in inspect(self.metadata.bind).get_columns("t") 650 ] 651 652 @testing.requires.table_reflection 653 def test_numeric_reflection(self): 654 for typ in self._type_round_trip(sql_types.Numeric(18, 5)): 655 assert isinstance(typ, sql_types.Numeric) 656 eq_(typ.precision, 18) 657 eq_(typ.scale, 5) 658 659 @testing.requires.table_reflection 660 def test_varchar_reflection(self): 661 typ = self._type_round_trip(sql_types.String(52))[0] 662 assert isinstance(typ, sql_types.String) 663 eq_(typ.length, 52) 664 665 @testing.requires.table_reflection 666 @testing.provide_metadata 667 def test_nullable_reflection(self): 668 t = Table( 669 "t", 670 self.metadata, 671 Column("a", Integer, nullable=True), 672 Column("b", Integer, nullable=False), 673 ) 674 t.create() 675 eq_( 676 dict( 677 (col["name"], col["nullable"]) 678 for col in inspect(self.metadata.bind).get_columns("t") 679 ), 680 {"a": True, "b": False}, 681 ) 682 683 @testing.requires.table_reflection 684 @testing.requires.schemas 685 def test_get_columns_with_schema(self): 686 self._test_get_columns(schema=testing.config.test_schema) 687 688 @testing.requires.temp_table_reflection 689 def test_get_temp_table_columns(self): 690 meta = MetaData(self.bind) 691 user_tmp = self.tables.user_tmp 692 insp = inspect(meta.bind) 693 cols = insp.get_columns("user_tmp") 694 self.assert_(len(cols) > 0, len(cols)) 695 696 for i, col in enumerate(user_tmp.columns): 697 eq_(col.name, cols[i]["name"]) 698 699 @testing.requires.temp_table_reflection 700 @testing.requires.view_column_reflection 701 @testing.requires.temporary_views 702 def test_get_temp_view_columns(self): 703 insp = inspect(self.bind) 704 cols = insp.get_columns("user_tmp_v") 705 eq_([col["name"] for col in cols], ["id", "name", "foo"]) 706 707 @testing.requires.view_column_reflection 708 def test_get_view_columns(self): 709 self._test_get_columns(table_type="view") 710 711 @testing.requires.view_column_reflection 712 @testing.requires.schemas 713 def test_get_view_columns_with_schema(self): 714 self._test_get_columns( 715 schema=testing.config.test_schema, table_type="view" 716 ) 717 718 @testing.provide_metadata 719 def _test_get_pk_constraint(self, schema=None): 720 meta = self.metadata 721 users, addresses = self.tables.users, self.tables.email_addresses 722 insp = inspect(meta.bind) 723 724 users_cons = insp.get_pk_constraint(users.name, schema=schema) 725 users_pkeys = users_cons["constrained_columns"] 726 eq_(users_pkeys, ["user_id"]) 727 728 addr_cons = insp.get_pk_constraint(addresses.name, schema=schema) 729 addr_pkeys = addr_cons["constrained_columns"] 730 eq_(addr_pkeys, ["address_id"]) 731 732 with testing.requires.reflects_pk_names.fail_if(): 733 eq_(addr_cons["name"], "email_ad_pk") 734 735 @testing.requires.primary_key_constraint_reflection 736 def test_get_pk_constraint(self): 737 self._test_get_pk_constraint() 738 739 @testing.requires.table_reflection 740 @testing.requires.primary_key_constraint_reflection 741 @testing.requires.schemas 742 def test_get_pk_constraint_with_schema(self): 743 self._test_get_pk_constraint(schema=testing.config.test_schema) 744 745 @testing.requires.table_reflection 746 @testing.provide_metadata 747 def test_deprecated_get_primary_keys(self): 748 meta = self.metadata 749 users = self.tables.users 750 insp = Inspector(meta.bind) 751 assert_raises_message( 752 sa_exc.SADeprecationWarning, 753 r".*get_primary_keys\(\) method is deprecated", 754 insp.get_primary_keys, 755 users.name, 756 ) 757 758 @testing.provide_metadata 759 def _test_get_foreign_keys(self, schema=None): 760 meta = self.metadata 761 users, addresses = (self.tables.users, self.tables.email_addresses) 762 insp = inspect(meta.bind) 763 expected_schema = schema 764 # users 765 766 if testing.requires.self_referential_foreign_keys.enabled: 767 users_fkeys = insp.get_foreign_keys(users.name, schema=schema) 768 fkey1 = users_fkeys[0] 769 770 with testing.requires.named_constraints.fail_if(): 771 eq_(fkey1["name"], "user_id_fk") 772 773 eq_(fkey1["referred_schema"], expected_schema) 774 eq_(fkey1["referred_table"], users.name) 775 eq_(fkey1["referred_columns"], ["user_id"]) 776 if testing.requires.self_referential_foreign_keys.enabled: 777 eq_(fkey1["constrained_columns"], ["parent_user_id"]) 778 779 # addresses 780 addr_fkeys = insp.get_foreign_keys(addresses.name, schema=schema) 781 fkey1 = addr_fkeys[0] 782 783 with testing.requires.implicitly_named_constraints.fail_if(): 784 self.assert_(fkey1["name"] is not None) 785 786 eq_(fkey1["referred_schema"], expected_schema) 787 eq_(fkey1["referred_table"], users.name) 788 eq_(fkey1["referred_columns"], ["user_id"]) 789 eq_(fkey1["constrained_columns"], ["remote_user_id"]) 790 791 @testing.requires.foreign_key_constraint_reflection 792 def test_get_foreign_keys(self): 793 self._test_get_foreign_keys() 794 795 @testing.requires.foreign_key_constraint_reflection 796 @testing.requires.schemas 797 def test_get_foreign_keys_with_schema(self): 798 self._test_get_foreign_keys(schema=testing.config.test_schema) 799 800 @testing.requires.cross_schema_fk_reflection 801 @testing.requires.schemas 802 def test_get_inter_schema_foreign_keys(self): 803 local_table, remote_table, remote_table_2 = self.tables( 804 "%s.local_table" % testing.db.dialect.default_schema_name, 805 "%s.remote_table" % testing.config.test_schema, 806 "%s.remote_table_2" % testing.config.test_schema, 807 ) 808 809 insp = inspect(config.db) 810 811 local_fkeys = insp.get_foreign_keys(local_table.name) 812 eq_(len(local_fkeys), 1) 813 814 fkey1 = local_fkeys[0] 815 eq_(fkey1["referred_schema"], testing.config.test_schema) 816 eq_(fkey1["referred_table"], remote_table_2.name) 817 eq_(fkey1["referred_columns"], ["id"]) 818 eq_(fkey1["constrained_columns"], ["remote_id"]) 819 820 remote_fkeys = insp.get_foreign_keys( 821 remote_table.name, schema=testing.config.test_schema 822 ) 823 eq_(len(remote_fkeys), 1) 824 825 fkey2 = remote_fkeys[0] 826 827 assert fkey2["referred_schema"] in ( 828 None, 829 testing.db.dialect.default_schema_name, 830 ) 831 eq_(fkey2["referred_table"], local_table.name) 832 eq_(fkey2["referred_columns"], ["id"]) 833 eq_(fkey2["constrained_columns"], ["local_id"]) 834 835 @testing.requires.foreign_key_constraint_option_reflection_ondelete 836 def test_get_foreign_key_options_ondelete(self): 837 self._test_get_foreign_key_options(ondelete="CASCADE") 838 839 @testing.requires.foreign_key_constraint_option_reflection_onupdate 840 def test_get_foreign_key_options_onupdate(self): 841 self._test_get_foreign_key_options(onupdate="SET NULL") 842 843 @testing.provide_metadata 844 def _test_get_foreign_key_options(self, **options): 845 meta = self.metadata 846 847 Table( 848 "x", 849 meta, 850 Column("id", Integer, primary_key=True), 851 test_needs_fk=True, 852 ) 853 854 Table( 855 "table", 856 meta, 857 Column("id", Integer, primary_key=True), 858 Column("x_id", Integer, sa.ForeignKey("x.id", name="xid")), 859 Column("test", String(10)), 860 test_needs_fk=True, 861 ) 862 863 Table( 864 "user", 865 meta, 866 Column("id", Integer, primary_key=True), 867 Column("name", String(50), nullable=False), 868 Column("tid", Integer), 869 sa.ForeignKeyConstraint( 870 ["tid"], ["table.id"], name="myfk", **options 871 ), 872 test_needs_fk=True, 873 ) 874 875 meta.create_all() 876 877 insp = inspect(meta.bind) 878 879 # test 'options' is always present for a backend 880 # that can reflect these, since alembic looks for this 881 opts = insp.get_foreign_keys("table")[0]["options"] 882 883 eq_(dict((k, opts[k]) for k in opts if opts[k]), {}) 884 885 opts = insp.get_foreign_keys("user")[0]["options"] 886 eq_(dict((k, opts[k]) for k in opts if opts[k]), options) 887 888 def _assert_insp_indexes(self, indexes, expected_indexes): 889 index_names = [d["name"] for d in indexes] 890 for e_index in expected_indexes: 891 assert e_index["name"] in index_names 892 index = indexes[index_names.index(e_index["name"])] 893 for key in e_index: 894 eq_(e_index[key], index[key]) 895 896 @testing.provide_metadata 897 def _test_get_indexes(self, schema=None): 898 meta = self.metadata 899 900 # The database may decide to create indexes for foreign keys, etc. 901 # so there may be more indexes than expected. 902 insp = inspect(meta.bind) 903 indexes = insp.get_indexes("users", schema=schema) 904 expected_indexes = [ 905 { 906 "unique": False, 907 "column_names": ["test1", "test2"], 908 "name": "users_t_idx", 909 }, 910 { 911 "unique": False, 912 "column_names": ["user_id", "test2", "test1"], 913 "name": "users_all_idx", 914 }, 915 ] 916 self._assert_insp_indexes(indexes, expected_indexes) 917 918 @testing.requires.index_reflection 919 def test_get_indexes(self): 920 self._test_get_indexes() 921 922 @testing.requires.index_reflection 923 @testing.requires.schemas 924 def test_get_indexes_with_schema(self): 925 self._test_get_indexes(schema=testing.config.test_schema) 926 927 @testing.provide_metadata 928 def _test_get_noncol_index(self, tname, ixname): 929 meta = self.metadata 930 insp = inspect(meta.bind) 931 indexes = insp.get_indexes(tname) 932 933 # reflecting an index that has "x DESC" in it as the column. 934 # the DB may or may not give us "x", but make sure we get the index 935 # back, it has a name, it's connected to the table. 936 expected_indexes = [{"unique": False, "name": ixname}] 937 self._assert_insp_indexes(indexes, expected_indexes) 938 939 t = Table(tname, meta, autoload_with=meta.bind) 940 eq_(len(t.indexes), 1) 941 is_(list(t.indexes)[0].table, t) 942 eq_(list(t.indexes)[0].name, ixname) 943 944 @testing.requires.index_reflection 945 @testing.requires.indexes_with_ascdesc 946 def test_get_noncol_index_no_pk(self): 947 self._test_get_noncol_index("noncol_idx_test_nopk", "noncol_idx_nopk") 948 949 @testing.requires.index_reflection 950 @testing.requires.indexes_with_ascdesc 951 def test_get_noncol_index_pk(self): 952 self._test_get_noncol_index("noncol_idx_test_pk", "noncol_idx_pk") 953 954 @testing.requires.indexes_with_expressions 955 @testing.provide_metadata 956 def test_reflect_expression_based_indexes(self): 957 Table( 958 "t", 959 self.metadata, 960 Column("x", String(30)), 961 Column("y", String(30)), 962 ) 963 event.listen( 964 self.metadata, 965 "after_create", 966 DDL("CREATE INDEX t_idx ON t(lower(x), lower(y))"), 967 ) 968 event.listen( 969 self.metadata, "after_create", DDL("CREATE INDEX t_idx_2 ON t(x)") 970 ) 971 self.metadata.create_all() 972 973 insp = inspect(self.metadata.bind) 974 975 with expect_warnings( 976 "Skipped unsupported reflection of expression-based index t_idx" 977 ): 978 eq_( 979 insp.get_indexes("t"), 980 [{"name": "t_idx_2", "column_names": ["x"], "unique": 0}], 981 ) 982 983 @testing.requires.unique_constraint_reflection 984 def test_get_unique_constraints(self): 985 self._test_get_unique_constraints() 986 987 @testing.requires.temp_table_reflection 988 @testing.requires.unique_constraint_reflection 989 def test_get_temp_table_unique_constraints(self): 990 insp = inspect(self.bind) 991 reflected = insp.get_unique_constraints("user_tmp") 992 for refl in reflected: 993 # Different dialects handle duplicate index and constraints 994 # differently, so ignore this flag 995 refl.pop("duplicates_index", None) 996 eq_(reflected, [{"column_names": ["name"], "name": "user_tmp_uq"}]) 997 998 @testing.requires.temp_table_reflection 999 def test_get_temp_table_indexes(self): 1000 insp = inspect(self.bind) 1001 indexes = insp.get_indexes("user_tmp") 1002 for ind in indexes: 1003 ind.pop("dialect_options", None) 1004 eq_( 1005 # TODO: we need to add better filtering for indexes/uq constraints 1006 # that are doubled up 1007 [idx for idx in indexes if idx["name"] == "user_tmp_ix"], 1008 [ 1009 { 1010 "unique": False, 1011 "column_names": ["foo"], 1012 "name": "user_tmp_ix", 1013 } 1014 ], 1015 ) 1016 1017 @testing.requires.unique_constraint_reflection 1018 @testing.requires.schemas 1019 def test_get_unique_constraints_with_schema(self): 1020 self._test_get_unique_constraints(schema=testing.config.test_schema) 1021 1022 @testing.provide_metadata 1023 def _test_get_unique_constraints(self, schema=None): 1024 # SQLite dialect needs to parse the names of the constraints 1025 # separately from what it gets from PRAGMA index_list(), and 1026 # then matches them up. so same set of column_names in two 1027 # constraints will confuse it. Perhaps we should no longer 1028 # bother with index_list() here since we have the whole 1029 # CREATE TABLE? 1030 uniques = sorted( 1031 [ 1032 {"name": "unique_a", "column_names": ["a"]}, 1033 {"name": "unique_a_b_c", "column_names": ["a", "b", "c"]}, 1034 {"name": "unique_c_a_b", "column_names": ["c", "a", "b"]}, 1035 {"name": "unique_asc_key", "column_names": ["asc", "key"]}, 1036 {"name": "i.have.dots", "column_names": ["b"]}, 1037 {"name": "i have spaces", "column_names": ["c"]}, 1038 ], 1039 key=operator.itemgetter("name"), 1040 ) 1041 orig_meta = self.metadata 1042 table = Table( 1043 "testtbl", 1044 orig_meta, 1045 Column("a", sa.String(20)), 1046 Column("b", sa.String(30)), 1047 Column("c", sa.Integer), 1048 # reserved identifiers 1049 Column("asc", sa.String(30)), 1050 Column("key", sa.String(30)), 1051 schema=schema, 1052 ) 1053 for uc in uniques: 1054 table.append_constraint( 1055 sa.UniqueConstraint(*uc["column_names"], name=uc["name"]) 1056 ) 1057 orig_meta.create_all() 1058 1059 inspector = inspect(orig_meta.bind) 1060 reflected = sorted( 1061 inspector.get_unique_constraints("testtbl", schema=schema), 1062 key=operator.itemgetter("name"), 1063 ) 1064 1065 names_that_duplicate_index = set() 1066 1067 for orig, refl in zip(uniques, reflected): 1068 # Different dialects handle duplicate index and constraints 1069 # differently, so ignore this flag 1070 dupe = refl.pop("duplicates_index", None) 1071 if dupe: 1072 names_that_duplicate_index.add(dupe) 1073 eq_(orig, refl) 1074 1075 reflected_metadata = MetaData() 1076 reflected = Table( 1077 "testtbl", 1078 reflected_metadata, 1079 autoload_with=orig_meta.bind, 1080 schema=schema, 1081 ) 1082 1083 # test "deduplicates for index" logic. MySQL and Oracle 1084 # "unique constraints" are actually unique indexes (with possible 1085 # exception of a unique that is a dupe of another one in the case 1086 # of Oracle). make sure # they aren't duplicated. 1087 idx_names = set([idx.name for idx in reflected.indexes]) 1088 uq_names = set( 1089 [ 1090 uq.name 1091 for uq in reflected.constraints 1092 if isinstance(uq, sa.UniqueConstraint) 1093 ] 1094 ).difference(["unique_c_a_b"]) 1095 1096 assert not idx_names.intersection(uq_names) 1097 if names_that_duplicate_index: 1098 eq_(names_that_duplicate_index, idx_names) 1099 eq_(uq_names, set()) 1100 1101 @testing.requires.check_constraint_reflection 1102 def test_get_check_constraints(self): 1103 self._test_get_check_constraints() 1104 1105 @testing.requires.check_constraint_reflection 1106 @testing.requires.schemas 1107 def test_get_check_constraints_schema(self): 1108 self._test_get_check_constraints(schema=testing.config.test_schema) 1109 1110 @testing.provide_metadata 1111 def _test_get_check_constraints(self, schema=None): 1112 orig_meta = self.metadata 1113 Table( 1114 "sa_cc", 1115 orig_meta, 1116 Column("a", Integer()), 1117 sa.CheckConstraint("a > 1 AND a < 5", name="cc1"), 1118 sa.CheckConstraint("a = 1 OR (a > 2 AND a < 5)", name="cc2"), 1119 schema=schema, 1120 ) 1121 1122 orig_meta.create_all() 1123 1124 inspector = inspect(orig_meta.bind) 1125 reflected = sorted( 1126 inspector.get_check_constraints("sa_cc", schema=schema), 1127 key=operator.itemgetter("name"), 1128 ) 1129 1130 # trying to minimize effect of quoting, parenthesis, etc. 1131 # may need to add more to this as new dialects get CHECK 1132 # constraint reflection support 1133 def normalize(sqltext): 1134 return " ".join( 1135 re.findall(r"and|\d|=|a|or|<|>", sqltext.lower(), re.I) 1136 ) 1137 1138 reflected = [ 1139 {"name": item["name"], "sqltext": normalize(item["sqltext"])} 1140 for item in reflected 1141 ] 1142 eq_( 1143 reflected, 1144 [ 1145 {"name": "cc1", "sqltext": "a > 1 and a < 5"}, 1146 {"name": "cc2", "sqltext": "a = 1 or a > 2 and a < 5"}, 1147 ], 1148 ) 1149 1150 @testing.provide_metadata 1151 def _test_get_view_definition(self, schema=None): 1152 meta = self.metadata 1153 view_name1 = "users_v" 1154 view_name2 = "email_addresses_v" 1155 insp = inspect(meta.bind) 1156 v1 = insp.get_view_definition(view_name1, schema=schema) 1157 self.assert_(v1) 1158 v2 = insp.get_view_definition(view_name2, schema=schema) 1159 self.assert_(v2) 1160 1161 @testing.requires.view_reflection 1162 def test_get_view_definition(self): 1163 self._test_get_view_definition() 1164 1165 @testing.requires.view_reflection 1166 @testing.requires.schemas 1167 def test_get_view_definition_with_schema(self): 1168 self._test_get_view_definition(schema=testing.config.test_schema) 1169 1170 @testing.only_on("postgresql", "PG specific feature") 1171 @testing.provide_metadata 1172 def _test_get_table_oid(self, table_name, schema=None): 1173 meta = self.metadata 1174 insp = inspect(meta.bind) 1175 oid = insp.get_table_oid(table_name, schema) 1176 self.assert_(isinstance(oid, int)) 1177 1178 def test_get_table_oid(self): 1179 self._test_get_table_oid("users") 1180 1181 @testing.requires.schemas 1182 def test_get_table_oid_with_schema(self): 1183 self._test_get_table_oid("users", schema=testing.config.test_schema) 1184 1185 @testing.requires.table_reflection 1186 @testing.provide_metadata 1187 def test_autoincrement_col(self): 1188 """test that 'autoincrement' is reflected according to sqla's policy. 1189 1190 Don't mark this test as unsupported for any backend ! 1191 1192 (technically it fails with MySQL InnoDB since "id" comes before "id2") 1193 1194 A backend is better off not returning "autoincrement" at all, 1195 instead of potentially returning "False" for an auto-incrementing 1196 primary key column. 1197 1198 """ 1199 1200 meta = self.metadata 1201 insp = inspect(meta.bind) 1202 1203 for tname, cname in [ 1204 ("users", "user_id"), 1205 ("email_addresses", "address_id"), 1206 ("dingalings", "dingaling_id"), 1207 ]: 1208 cols = insp.get_columns(tname) 1209 id_ = {c["name"]: c for c in cols}[cname] 1210 assert id_.get("autoincrement", True) 1211 1212 1213class NormalizedNameTest(fixtures.TablesTest): 1214 __requires__ = ("denormalized_names",) 1215 __backend__ = True 1216 1217 @classmethod 1218 def define_tables(cls, metadata): 1219 Table( 1220 quoted_name("t1", quote=True), 1221 metadata, 1222 Column("id", Integer, primary_key=True), 1223 ) 1224 Table( 1225 quoted_name("t2", quote=True), 1226 metadata, 1227 Column("id", Integer, primary_key=True), 1228 Column("t1id", ForeignKey("t1.id")), 1229 ) 1230 1231 def test_reflect_lowercase_forced_tables(self): 1232 1233 m2 = MetaData(testing.db) 1234 t2_ref = Table(quoted_name("t2", quote=True), m2, autoload=True) 1235 t1_ref = m2.tables["t1"] 1236 assert t2_ref.c.t1id.references(t1_ref.c.id) 1237 1238 m3 = MetaData(testing.db) 1239 m3.reflect(only=lambda name, m: name.lower() in ("t1", "t2")) 1240 assert m3.tables["t2"].c.t1id.references(m3.tables["t1"].c.id) 1241 1242 def test_get_table_names(self): 1243 tablenames = [ 1244 t 1245 for t in inspect(testing.db).get_table_names() 1246 if t.lower() in ("t1", "t2") 1247 ] 1248 1249 eq_(tablenames[0].upper(), tablenames[0].lower()) 1250 eq_(tablenames[1].upper(), tablenames[1].lower()) 1251 1252 1253class ComputedReflectionTest(fixtures.ComputedReflectionFixtureTest): 1254 def test_computed_col_default_not_set(self): 1255 insp = inspect(config.db) 1256 1257 cols = insp.get_columns("computed_column_table") 1258 for col in cols: 1259 if col["name"] == "with_default": 1260 is_true("42" in col["default"]) 1261 elif not col["autoincrement"]: 1262 is_(col["default"], None) 1263 1264 def test_get_column_returns_computed(self): 1265 insp = inspect(config.db) 1266 1267 cols = insp.get_columns("computed_default_table") 1268 data = {c["name"]: c for c in cols} 1269 for key in ("id", "normal", "with_default"): 1270 is_true("computed" not in data[key]) 1271 compData = data["computed_col"] 1272 is_true("computed" in compData) 1273 is_true("sqltext" in compData["computed"]) 1274 eq_(self.normalize(compData["computed"]["sqltext"]), "normal+42") 1275 eq_( 1276 "persisted" in compData["computed"], 1277 testing.requires.computed_columns_reflect_persisted.enabled, 1278 ) 1279 if testing.requires.computed_columns_reflect_persisted.enabled: 1280 eq_( 1281 compData["computed"]["persisted"], 1282 testing.requires.computed_columns_default_persisted.enabled, 1283 ) 1284 1285 def check_column(self, data, column, sqltext, persisted): 1286 is_true("computed" in data[column]) 1287 compData = data[column]["computed"] 1288 eq_(self.normalize(compData["sqltext"]), sqltext) 1289 if testing.requires.computed_columns_reflect_persisted.enabled: 1290 is_true("persisted" in compData) 1291 is_(compData["persisted"], persisted) 1292 1293 def test_get_column_returns_persisted(self): 1294 insp = inspect(config.db) 1295 1296 cols = insp.get_columns("computed_column_table") 1297 data = {c["name"]: c for c in cols} 1298 1299 self.check_column( 1300 data, 1301 "computed_no_flag", 1302 "normal+42", 1303 testing.requires.computed_columns_default_persisted.enabled, 1304 ) 1305 if testing.requires.computed_columns_virtual.enabled: 1306 self.check_column( 1307 data, 1308 "computed_virtual", 1309 "normal+2", 1310 False, 1311 ) 1312 if testing.requires.computed_columns_stored.enabled: 1313 self.check_column( 1314 data, 1315 "computed_stored", 1316 "normal-42", 1317 True, 1318 ) 1319 1320 @testing.requires.schemas 1321 def test_get_column_returns_persisted_with_schema(self): 1322 insp = inspect(config.db) 1323 1324 cols = insp.get_columns( 1325 "computed_column_table", schema=config.test_schema 1326 ) 1327 data = {c["name"]: c for c in cols} 1328 1329 self.check_column( 1330 data, 1331 "computed_no_flag", 1332 "normal/42", 1333 testing.requires.computed_columns_default_persisted.enabled, 1334 ) 1335 if testing.requires.computed_columns_virtual.enabled: 1336 self.check_column( 1337 data, 1338 "computed_virtual", 1339 "normal/2", 1340 False, 1341 ) 1342 if testing.requires.computed_columns_stored.enabled: 1343 self.check_column( 1344 data, 1345 "computed_stored", 1346 "normal*42", 1347 True, 1348 ) 1349 1350 1351class CompositeKeyReflectionTest(fixtures.TablesTest): 1352 __backend__ = True 1353 1354 @classmethod 1355 def define_tables(cls, metadata): 1356 tb1 = Table( 1357 "tb1", 1358 metadata, 1359 Column("id", Integer), 1360 Column("attr", Integer), 1361 Column("name", sql_types.VARCHAR(20)), 1362 sa.PrimaryKeyConstraint("name", "id", "attr", name="pk_tb1"), 1363 schema=None, 1364 test_needs_fk=True, 1365 ) 1366 Table( 1367 "tb2", 1368 metadata, 1369 Column("id", Integer, primary_key=True), 1370 Column("pid", Integer), 1371 Column("pattr", Integer), 1372 Column("pname", sql_types.VARCHAR(20)), 1373 sa.ForeignKeyConstraint( 1374 ["pname", "pid", "pattr"], 1375 [tb1.c.name, tb1.c.id, tb1.c.attr], 1376 name="fk_tb1_name_id_attr", 1377 ), 1378 schema=None, 1379 test_needs_fk=True, 1380 ) 1381 1382 @testing.requires.primary_key_constraint_reflection 1383 @testing.provide_metadata 1384 def test_pk_column_order(self): 1385 # test for issue #5661 1386 meta = self.metadata 1387 insp = inspect(meta.bind) 1388 primary_key = insp.get_pk_constraint(self.tables.tb1.name) 1389 eq_(primary_key.get("constrained_columns"), ["name", "id", "attr"]) 1390 1391 @testing.requires.foreign_key_constraint_reflection 1392 @testing.provide_metadata 1393 def test_fk_column_order(self): 1394 # test for issue #5661 1395 meta = self.metadata 1396 insp = inspect(meta.bind) 1397 foreign_keys = insp.get_foreign_keys(self.tables.tb2.name) 1398 eq_(len(foreign_keys), 1) 1399 fkey1 = foreign_keys[0] 1400 eq_(fkey1.get("referred_columns"), ["name", "id", "attr"]) 1401 eq_(fkey1.get("constrained_columns"), ["pname", "pid", "pattr"]) 1402 1403 1404__all__ = ( 1405 "ComponentReflectionTest", 1406 "QuotedNameArgumentTest", 1407 "HasTableTest", 1408 "NormalizedNameTest", 1409 "ComputedReflectionTest", 1410 "CompositeKeyReflectionTest", 1411) 1412