1from sqlalchemy import Integer, ForeignKey, String, func 2from sqlalchemy.types import PickleType, TypeDecorator, VARCHAR 3from sqlalchemy.orm import mapper, Session, composite, column_property 4from sqlalchemy.orm.mapper import Mapper 5from sqlalchemy.orm.instrumentation import ClassManager 6from sqlalchemy.testing.schema import Table, Column 7from sqlalchemy.testing import eq_, assert_raises_message, assert_raises 8from sqlalchemy.testing.util import picklers 9from sqlalchemy.testing import fixtures 10from sqlalchemy.ext.mutable import MutableComposite 11from sqlalchemy.ext.mutable import MutableDict, MutableList, MutableSet 12 13 14class Foo(fixtures.BasicEntity): 15 pass 16 17 18class SubFoo(Foo): 19 pass 20 21 22class FooWithEq(object): 23 24 def __init__(self, **kw): 25 for k in kw: 26 setattr(self, k, kw[k]) 27 28 def __hash__(self): 29 return hash(self.id) 30 31 def __eq__(self, other): 32 return self.id == other.id 33 34 35class Point(MutableComposite): 36 37 def __init__(self, x, y): 38 self.x = x 39 self.y = y 40 41 def __setattr__(self, key, value): 42 object.__setattr__(self, key, value) 43 self.changed() 44 45 def __composite_values__(self): 46 return self.x, self.y 47 48 def __getstate__(self): 49 return self.x, self.y 50 51 def __setstate__(self, state): 52 self.x, self.y = state 53 54 def __eq__(self, other): 55 return isinstance(other, Point) and \ 56 other.x == self.x and \ 57 other.y == self.y 58 59 60class MyPoint(Point): 61 62 @classmethod 63 def coerce(cls, key, value): 64 if isinstance(value, tuple): 65 value = Point(*value) 66 return value 67 68 69class _MutableDictTestFixture(object): 70 @classmethod 71 def _type_fixture(cls): 72 return MutableDict 73 74 def teardown(self): 75 # clear out mapper events 76 Mapper.dispatch._clear() 77 ClassManager.dispatch._clear() 78 super(_MutableDictTestFixture, self).teardown() 79 80 81class _MutableDictTestBase(_MutableDictTestFixture): 82 run_define_tables = 'each' 83 84 def setup_mappers(cls): 85 foo = cls.tables.foo 86 87 mapper(Foo, foo) 88 89 def test_coerce_none(self): 90 sess = Session() 91 f1 = Foo(data=None) 92 sess.add(f1) 93 sess.commit() 94 eq_(f1.data, None) 95 96 def test_coerce_raise(self): 97 assert_raises_message( 98 ValueError, 99 "Attribute 'data' does not accept objects of type", 100 Foo, data=set([1, 2, 3]) 101 ) 102 103 def test_in_place_mutation(self): 104 sess = Session() 105 106 f1 = Foo(data={'a': 'b'}) 107 sess.add(f1) 108 sess.commit() 109 110 f1.data['a'] = 'c' 111 sess.commit() 112 113 eq_(f1.data, {'a': 'c'}) 114 115 def test_clear(self): 116 sess = Session() 117 118 f1 = Foo(data={'a': 'b'}) 119 sess.add(f1) 120 sess.commit() 121 122 f1.data.clear() 123 sess.commit() 124 125 eq_(f1.data, {}) 126 127 def test_update(self): 128 sess = Session() 129 130 f1 = Foo(data={'a': 'b'}) 131 sess.add(f1) 132 sess.commit() 133 134 f1.data.update({'a': 'z'}) 135 sess.commit() 136 137 eq_(f1.data, {'a': 'z'}) 138 139 def test_pop(self): 140 sess = Session() 141 142 f1 = Foo(data={'a': 'b', 'c': 'd'}) 143 sess.add(f1) 144 sess.commit() 145 146 eq_(f1.data.pop('a'), 'b') 147 sess.commit() 148 149 assert_raises(KeyError, f1.data.pop, 'g') 150 151 eq_(f1.data, {'c': 'd'}) 152 153 def test_pop_default(self): 154 sess = Session() 155 156 f1 = Foo(data={'a': 'b', 'c': 'd'}) 157 sess.add(f1) 158 sess.commit() 159 160 eq_(f1.data.pop('a', 'q'), 'b') 161 eq_(f1.data.pop('a', 'q'), 'q') 162 sess.commit() 163 164 eq_(f1.data, {'c': 'd'}) 165 166 def test_popitem(self): 167 sess = Session() 168 169 orig = {'a': 'b', 'c': 'd'} 170 171 # the orig dict remains unchanged when we assign, 172 # but just making this future-proof 173 data = dict(orig) 174 f1 = Foo(data=data) 175 sess.add(f1) 176 sess.commit() 177 178 k, v = f1.data.popitem() 179 assert k in ('a', 'c') 180 orig.pop(k) 181 182 sess.commit() 183 184 eq_(f1.data, orig) 185 186 def test_setdefault(self): 187 sess = Session() 188 189 f1 = Foo(data={'a': 'b'}) 190 sess.add(f1) 191 sess.commit() 192 193 eq_(f1.data.setdefault('c', 'd'), 'd') 194 sess.commit() 195 196 eq_(f1.data, {'a': 'b', 'c': 'd'}) 197 198 eq_(f1.data.setdefault('c', 'q'), 'd') 199 sess.commit() 200 201 eq_(f1.data, {'a': 'b', 'c': 'd'}) 202 203 def test_replace(self): 204 sess = Session() 205 f1 = Foo(data={'a': 'b'}) 206 sess.add(f1) 207 sess.flush() 208 209 f1.data = {'b': 'c'} 210 sess.commit() 211 eq_(f1.data, {'b': 'c'}) 212 213 def test_replace_itself_still_ok(self): 214 sess = Session() 215 f1 = Foo(data={'a': 'b'}) 216 sess.add(f1) 217 sess.flush() 218 219 f1.data = f1.data 220 f1.data['b'] = 'c' 221 sess.commit() 222 eq_(f1.data, {'a': 'b', 'b': 'c'}) 223 224 def test_pickle_parent(self): 225 sess = Session() 226 227 f1 = Foo(data={'a': 'b'}) 228 sess.add(f1) 229 sess.commit() 230 f1.data 231 sess.close() 232 233 for loads, dumps in picklers(): 234 sess = Session() 235 f2 = loads(dumps(f1)) 236 sess.add(f2) 237 f2.data['a'] = 'c' 238 assert f2 in sess.dirty 239 240 def test_unrelated_flush(self): 241 sess = Session() 242 f1 = Foo(data={"a": "b"}, unrelated_data="unrelated") 243 sess.add(f1) 244 sess.flush() 245 f1.unrelated_data = "unrelated 2" 246 sess.flush() 247 f1.data["a"] = "c" 248 sess.commit() 249 eq_(f1.data["a"], "c") 250 251 def _test_non_mutable(self): 252 sess = Session() 253 254 f1 = Foo(non_mutable_data={'a': 'b'}) 255 sess.add(f1) 256 sess.commit() 257 258 f1.non_mutable_data['a'] = 'c' 259 sess.commit() 260 261 eq_(f1.non_mutable_data, {'a': 'b'}) 262 263 264class _MutableListTestFixture(object): 265 @classmethod 266 def _type_fixture(cls): 267 return MutableList 268 269 def teardown(self): 270 # clear out mapper events 271 Mapper.dispatch._clear() 272 ClassManager.dispatch._clear() 273 super(_MutableListTestFixture, self).teardown() 274 275 276class _MutableListTestBase(_MutableListTestFixture): 277 run_define_tables = 'each' 278 279 def setup_mappers(cls): 280 foo = cls.tables.foo 281 282 mapper(Foo, foo) 283 284 def test_coerce_none(self): 285 sess = Session() 286 f1 = Foo(data=None) 287 sess.add(f1) 288 sess.commit() 289 eq_(f1.data, None) 290 291 def test_coerce_raise(self): 292 assert_raises_message( 293 ValueError, 294 "Attribute 'data' does not accept objects of type", 295 Foo, data=set([1, 2, 3]) 296 ) 297 298 def test_in_place_mutation(self): 299 sess = Session() 300 301 f1 = Foo(data=[1, 2]) 302 sess.add(f1) 303 sess.commit() 304 305 f1.data[0] = 3 306 sess.commit() 307 308 eq_(f1.data, [3, 2]) 309 310 def test_in_place_slice_mutation(self): 311 sess = Session() 312 313 f1 = Foo(data=[1, 2, 3, 4]) 314 sess.add(f1) 315 sess.commit() 316 317 f1.data[1:3] = 5, 6 318 sess.commit() 319 320 eq_(f1.data, [1, 5, 6, 4]) 321 322 def test_del_slice(self): 323 sess = Session() 324 325 f1 = Foo(data=[1, 2, 3, 4]) 326 sess.add(f1) 327 sess.commit() 328 329 del f1.data[1:3] 330 sess.commit() 331 332 eq_(f1.data, [1, 4]) 333 334 def test_clear(self): 335 if not hasattr(list, 'clear'): 336 # py2 list doesn't have 'clear' 337 return 338 sess = Session() 339 340 f1 = Foo(data=[1, 2]) 341 sess.add(f1) 342 sess.commit() 343 344 f1.data.clear() 345 sess.commit() 346 347 eq_(f1.data, []) 348 349 def test_pop(self): 350 sess = Session() 351 352 f1 = Foo(data=[1, 2, 3]) 353 sess.add(f1) 354 sess.commit() 355 356 eq_(f1.data.pop(), 3) 357 eq_(f1.data.pop(0), 1) 358 sess.commit() 359 360 assert_raises(IndexError, f1.data.pop, 5) 361 362 eq_(f1.data, [2]) 363 364 def test_append(self): 365 sess = Session() 366 367 f1 = Foo(data=[1, 2]) 368 sess.add(f1) 369 sess.commit() 370 371 f1.data.append(5) 372 sess.commit() 373 374 eq_(f1.data, [1, 2, 5]) 375 376 def test_extend(self): 377 sess = Session() 378 379 f1 = Foo(data=[1, 2]) 380 sess.add(f1) 381 sess.commit() 382 383 f1.data.extend([5]) 384 sess.commit() 385 386 eq_(f1.data, [1, 2, 5]) 387 388 def test_insert(self): 389 sess = Session() 390 391 f1 = Foo(data=[1, 2]) 392 sess.add(f1) 393 sess.commit() 394 395 f1.data.insert(1, 5) 396 sess.commit() 397 398 eq_(f1.data, [1, 5, 2]) 399 400 def test_remove(self): 401 sess = Session() 402 403 f1 = Foo(data=[1, 2, 3]) 404 sess.add(f1) 405 sess.commit() 406 407 f1.data.remove(2) 408 sess.commit() 409 410 eq_(f1.data, [1, 3]) 411 412 def test_sort(self): 413 sess = Session() 414 415 f1 = Foo(data=[1, 3, 2]) 416 sess.add(f1) 417 sess.commit() 418 419 f1.data.sort() 420 sess.commit() 421 422 eq_(f1.data, [1, 2, 3]) 423 424 def test_reverse(self): 425 sess = Session() 426 427 f1 = Foo(data=[1, 3, 2]) 428 sess.add(f1) 429 sess.commit() 430 431 f1.data.reverse() 432 sess.commit() 433 434 eq_(f1.data, [2, 3, 1]) 435 436 def test_pickle_parent(self): 437 sess = Session() 438 439 f1 = Foo(data=[1, 2]) 440 sess.add(f1) 441 sess.commit() 442 f1.data 443 sess.close() 444 445 for loads, dumps in picklers(): 446 sess = Session() 447 f2 = loads(dumps(f1)) 448 sess.add(f2) 449 f2.data[0] = 3 450 assert f2 in sess.dirty 451 452 def test_unrelated_flush(self): 453 sess = Session() 454 f1 = Foo(data=[1, 2], unrelated_data="unrelated") 455 sess.add(f1) 456 sess.flush() 457 f1.unrelated_data = "unrelated 2" 458 sess.flush() 459 f1.data[0] = 3 460 sess.commit() 461 eq_(f1.data[0], 3) 462 463 464class _MutableSetTestFixture(object): 465 @classmethod 466 def _type_fixture(cls): 467 return MutableSet 468 469 def teardown(self): 470 # clear out mapper events 471 Mapper.dispatch._clear() 472 ClassManager.dispatch._clear() 473 super(_MutableSetTestFixture, self).teardown() 474 475 476class _MutableSetTestBase(_MutableSetTestFixture): 477 run_define_tables = 'each' 478 479 def setup_mappers(cls): 480 foo = cls.tables.foo 481 482 mapper(Foo, foo) 483 484 def test_coerce_none(self): 485 sess = Session() 486 f1 = Foo(data=None) 487 sess.add(f1) 488 sess.commit() 489 eq_(f1.data, None) 490 491 def test_coerce_raise(self): 492 assert_raises_message( 493 ValueError, 494 "Attribute 'data' does not accept objects of type", 495 Foo, data=[1, 2, 3] 496 ) 497 498 def test_clear(self): 499 sess = Session() 500 501 f1 = Foo(data=set([1, 2])) 502 sess.add(f1) 503 sess.commit() 504 505 f1.data.clear() 506 sess.commit() 507 508 eq_(f1.data, set()) 509 510 def test_pop(self): 511 sess = Session() 512 513 f1 = Foo(data=set([1])) 514 sess.add(f1) 515 sess.commit() 516 517 eq_(f1.data.pop(), 1) 518 sess.commit() 519 520 assert_raises(KeyError, f1.data.pop) 521 522 eq_(f1.data, set()) 523 524 def test_add(self): 525 sess = Session() 526 527 f1 = Foo(data=set([1, 2])) 528 sess.add(f1) 529 sess.commit() 530 531 f1.data.add(5) 532 sess.commit() 533 534 eq_(f1.data, set([1, 2, 5])) 535 536 def test_update(self): 537 sess = Session() 538 539 f1 = Foo(data=set([1, 2])) 540 sess.add(f1) 541 sess.commit() 542 543 f1.data.update(set([2, 5])) 544 sess.commit() 545 546 eq_(f1.data, set([1, 2, 5])) 547 548 def test_intersection_update(self): 549 sess = Session() 550 551 f1 = Foo(data=set([1, 2])) 552 sess.add(f1) 553 sess.commit() 554 555 f1.data.intersection_update(set([2, 5])) 556 sess.commit() 557 558 eq_(f1.data, set([2])) 559 560 def test_difference_update(self): 561 sess = Session() 562 563 f1 = Foo(data=set([1, 2])) 564 sess.add(f1) 565 sess.commit() 566 567 f1.data.difference_update(set([2, 5])) 568 sess.commit() 569 570 eq_(f1.data, set([1])) 571 572 def test_symmetric_difference_update(self): 573 sess = Session() 574 575 f1 = Foo(data=set([1, 2])) 576 sess.add(f1) 577 sess.commit() 578 579 f1.data.symmetric_difference_update(set([2, 5])) 580 sess.commit() 581 582 eq_(f1.data, set([1, 5])) 583 584 def test_remove(self): 585 sess = Session() 586 587 f1 = Foo(data=set([1, 2, 3])) 588 sess.add(f1) 589 sess.commit() 590 591 f1.data.remove(2) 592 sess.commit() 593 594 eq_(f1.data, set([1, 3])) 595 596 def test_discard(self): 597 sess = Session() 598 599 f1 = Foo(data=set([1, 2, 3])) 600 sess.add(f1) 601 sess.commit() 602 603 f1.data.discard(2) 604 sess.commit() 605 606 eq_(f1.data, set([1, 3])) 607 608 f1.data.discard(2) 609 sess.commit() 610 611 eq_(f1.data, set([1, 3])) 612 613 def test_pickle_parent(self): 614 sess = Session() 615 616 f1 = Foo(data=set([1, 2])) 617 sess.add(f1) 618 sess.commit() 619 f1.data 620 sess.close() 621 622 for loads, dumps in picklers(): 623 sess = Session() 624 f2 = loads(dumps(f1)) 625 sess.add(f2) 626 f2.data.add(3) 627 assert f2 in sess.dirty 628 629 def test_unrelated_flush(self): 630 sess = Session() 631 f1 = Foo(data=set([1, 2]), unrelated_data="unrelated") 632 sess.add(f1) 633 sess.flush() 634 f1.unrelated_data = "unrelated 2" 635 sess.flush() 636 f1.data.add(3) 637 sess.commit() 638 eq_(f1.data, set([1, 2, 3])) 639 640 641class MutableColumnDefaultTest(_MutableDictTestFixture, fixtures.MappedTest): 642 @classmethod 643 def define_tables(cls, metadata): 644 MutableDict = cls._type_fixture() 645 646 mutable_pickle = MutableDict.as_mutable(PickleType) 647 Table( 648 'foo', metadata, 649 Column( 650 'id', Integer, primary_key=True, 651 test_needs_autoincrement=True), 652 Column('data', mutable_pickle, default={}), 653 ) 654 655 def setup_mappers(cls): 656 foo = cls.tables.foo 657 658 mapper(Foo, foo) 659 660 def test_evt_on_flush_refresh(self): 661 # test for #3427 662 663 sess = Session() 664 665 f1 = Foo() 666 sess.add(f1) 667 sess.flush() 668 assert isinstance(f1.data, self._type_fixture()) 669 assert f1 not in sess.dirty 670 f1.data['foo'] = 'bar' 671 assert f1 in sess.dirty 672 673 674class MutableWithScalarPickleTest(_MutableDictTestBase, fixtures.MappedTest): 675 676 @classmethod 677 def define_tables(cls, metadata): 678 MutableDict = cls._type_fixture() 679 680 mutable_pickle = MutableDict.as_mutable(PickleType) 681 Table('foo', metadata, 682 Column('id', Integer, primary_key=True, 683 test_needs_autoincrement=True), 684 Column('skip', mutable_pickle), 685 Column('data', mutable_pickle), 686 Column('non_mutable_data', PickleType), 687 Column('unrelated_data', String(50)) 688 ) 689 690 def test_non_mutable(self): 691 self._test_non_mutable() 692 693 694class MutableWithScalarJSONTest(_MutableDictTestBase, fixtures.MappedTest): 695 696 @classmethod 697 def define_tables(cls, metadata): 698 import json 699 700 class JSONEncodedDict(TypeDecorator): 701 impl = VARCHAR(50) 702 703 def process_bind_param(self, value, dialect): 704 if value is not None: 705 value = json.dumps(value) 706 707 return value 708 709 def process_result_value(self, value, dialect): 710 if value is not None: 711 value = json.loads(value) 712 return value 713 714 MutableDict = cls._type_fixture() 715 716 Table('foo', metadata, 717 Column('id', Integer, primary_key=True, 718 test_needs_autoincrement=True), 719 Column('data', MutableDict.as_mutable(JSONEncodedDict)), 720 Column('non_mutable_data', JSONEncodedDict), 721 Column('unrelated_data', String(50)) 722 ) 723 724 def test_non_mutable(self): 725 self._test_non_mutable() 726 727 728class MutableColumnCopyJSONTest(_MutableDictTestBase, fixtures.MappedTest): 729 730 @classmethod 731 def define_tables(cls, metadata): 732 import json 733 from sqlalchemy.ext.declarative import declarative_base 734 735 class JSONEncodedDict(TypeDecorator): 736 impl = VARCHAR(50) 737 738 def process_bind_param(self, value, dialect): 739 if value is not None: 740 value = json.dumps(value) 741 742 return value 743 744 def process_result_value(self, value, dialect): 745 if value is not None: 746 value = json.loads(value) 747 return value 748 749 MutableDict = cls._type_fixture() 750 751 Base = declarative_base(metadata=metadata) 752 753 class AbstractFoo(Base): 754 __abstract__ = True 755 756 id = Column(Integer, primary_key=True, 757 test_needs_autoincrement=True) 758 data = Column(MutableDict.as_mutable(JSONEncodedDict)) 759 non_mutable_data = Column(JSONEncodedDict) 760 unrelated_data = Column(String(50)) 761 762 class Foo(AbstractFoo): 763 __tablename__ = "foo" 764 column_prop = column_property( 765 func.lower(AbstractFoo.unrelated_data)) 766 767 assert Foo.data.property.columns[0].type is not AbstractFoo.data.type 768 769 def test_non_mutable(self): 770 self._test_non_mutable() 771 772 773class MutableColumnCopyArrayTest(_MutableListTestBase, fixtures.MappedTest): 774 __requires__ = 'array_type', 775 776 @classmethod 777 def define_tables(cls, metadata): 778 from sqlalchemy.ext.declarative import declarative_base 779 from sqlalchemy.sql.sqltypes import ARRAY 780 781 MutableList = cls._type_fixture() 782 783 Base = declarative_base(metadata=metadata) 784 785 class Mixin(object): 786 data = Column(MutableList.as_mutable(ARRAY(Integer))) 787 788 class Foo(Mixin, Base): 789 __tablename__ = 'foo' 790 id = Column(Integer, primary_key=True) 791 792 793class MutableListWithScalarPickleTest(_MutableListTestBase, 794 fixtures.MappedTest): 795 796 @classmethod 797 def define_tables(cls, metadata): 798 MutableList = cls._type_fixture() 799 800 mutable_pickle = MutableList.as_mutable(PickleType) 801 Table('foo', metadata, 802 Column('id', Integer, primary_key=True, 803 test_needs_autoincrement=True), 804 Column('skip', mutable_pickle), 805 Column('data', mutable_pickle), 806 Column('non_mutable_data', PickleType), 807 Column('unrelated_data', String(50)) 808 ) 809 810 811class MutableSetWithScalarPickleTest(_MutableSetTestBase, fixtures.MappedTest): 812 813 @classmethod 814 def define_tables(cls, metadata): 815 MutableSet = cls._type_fixture() 816 817 mutable_pickle = MutableSet.as_mutable(PickleType) 818 Table('foo', metadata, 819 Column('id', Integer, primary_key=True, 820 test_needs_autoincrement=True), 821 Column('skip', mutable_pickle), 822 Column('data', mutable_pickle), 823 Column('non_mutable_data', PickleType), 824 Column('unrelated_data', String(50)) 825 ) 826 827 828class MutableAssocWithAttrInheritTest(_MutableDictTestBase, 829 fixtures.MappedTest): 830 831 @classmethod 832 def define_tables(cls, metadata): 833 834 Table('foo', metadata, 835 Column('id', Integer, primary_key=True, 836 test_needs_autoincrement=True), 837 Column('data', PickleType), 838 Column('non_mutable_data', PickleType), 839 Column('unrelated_data', String(50)) 840 ) 841 842 Table('subfoo', metadata, 843 Column('id', Integer, ForeignKey('foo.id'), primary_key=True), 844 ) 845 846 def setup_mappers(cls): 847 foo = cls.tables.foo 848 subfoo = cls.tables.subfoo 849 850 mapper(Foo, foo) 851 mapper(SubFoo, subfoo, inherits=Foo) 852 MutableDict.associate_with_attribute(Foo.data) 853 854 def test_in_place_mutation(self): 855 sess = Session() 856 857 f1 = SubFoo(data={'a': 'b'}) 858 sess.add(f1) 859 sess.commit() 860 861 f1.data['a'] = 'c' 862 sess.commit() 863 864 eq_(f1.data, {'a': 'c'}) 865 866 def test_replace(self): 867 sess = Session() 868 f1 = SubFoo(data={'a': 'b'}) 869 sess.add(f1) 870 sess.flush() 871 872 f1.data = {'b': 'c'} 873 sess.commit() 874 eq_(f1.data, {'b': 'c'}) 875 876 877class MutableAssociationScalarPickleTest(_MutableDictTestBase, 878 fixtures.MappedTest): 879 880 @classmethod 881 def define_tables(cls, metadata): 882 MutableDict = cls._type_fixture() 883 MutableDict.associate_with(PickleType) 884 885 Table('foo', metadata, 886 Column('id', Integer, primary_key=True, 887 test_needs_autoincrement=True), 888 Column('skip', PickleType), 889 Column('data', PickleType), 890 Column('unrelated_data', String(50)) 891 ) 892 893 894class MutableAssociationScalarJSONTest(_MutableDictTestBase, 895 fixtures.MappedTest): 896 897 @classmethod 898 def define_tables(cls, metadata): 899 import json 900 901 class JSONEncodedDict(TypeDecorator): 902 impl = VARCHAR(50) 903 904 def process_bind_param(self, value, dialect): 905 if value is not None: 906 value = json.dumps(value) 907 908 return value 909 910 def process_result_value(self, value, dialect): 911 if value is not None: 912 value = json.loads(value) 913 return value 914 915 MutableDict = cls._type_fixture() 916 MutableDict.associate_with(JSONEncodedDict) 917 918 Table('foo', metadata, 919 Column('id', Integer, primary_key=True, 920 test_needs_autoincrement=True), 921 Column('data', JSONEncodedDict), 922 Column('unrelated_data', String(50)) 923 ) 924 925 926class CustomMutableAssociationScalarJSONTest(_MutableDictTestBase, 927 fixtures.MappedTest): 928 929 CustomMutableDict = None 930 931 @classmethod 932 def _type_fixture(cls): 933 if not(getattr(cls, 'CustomMutableDict')): 934 MutableDict = super( 935 CustomMutableAssociationScalarJSONTest, cls)._type_fixture() 936 937 class CustomMutableDict(MutableDict): 938 pass 939 cls.CustomMutableDict = CustomMutableDict 940 return cls.CustomMutableDict 941 942 @classmethod 943 def define_tables(cls, metadata): 944 import json 945 946 class JSONEncodedDict(TypeDecorator): 947 impl = VARCHAR(50) 948 949 def process_bind_param(self, value, dialect): 950 if value is not None: 951 value = json.dumps(value) 952 953 return value 954 955 def process_result_value(self, value, dialect): 956 if value is not None: 957 value = json.loads(value) 958 return value 959 960 CustomMutableDict = cls._type_fixture() 961 CustomMutableDict.associate_with(JSONEncodedDict) 962 963 Table('foo', metadata, 964 Column('id', Integer, primary_key=True, 965 test_needs_autoincrement=True), 966 Column('data', JSONEncodedDict), 967 Column('unrelated_data', String(50)) 968 ) 969 970 def test_pickle_parent(self): 971 # Picklers don't know how to pickle CustomMutableDict, 972 # but we aren't testing that here 973 pass 974 975 def test_coerce(self): 976 sess = Session() 977 f1 = Foo(data={'a': 'b'}) 978 sess.add(f1) 979 sess.flush() 980 eq_(type(f1.data), self._type_fixture()) 981 982 983class _CompositeTestBase(object): 984 985 @classmethod 986 def define_tables(cls, metadata): 987 Table('foo', metadata, 988 Column('id', Integer, primary_key=True, 989 test_needs_autoincrement=True), 990 Column('x', Integer), 991 Column('y', Integer), 992 Column('unrelated_data', String(50)) 993 ) 994 995 def setup(self): 996 from sqlalchemy.ext import mutable 997 mutable._setup_composite_listener() 998 super(_CompositeTestBase, self).setup() 999 1000 def teardown(self): 1001 # clear out mapper events 1002 Mapper.dispatch._clear() 1003 ClassManager.dispatch._clear() 1004 super(_CompositeTestBase, self).teardown() 1005 1006 @classmethod 1007 def _type_fixture(cls): 1008 1009 return Point 1010 1011 1012class MutableCompositeColumnDefaultTest(_CompositeTestBase, 1013 fixtures.MappedTest): 1014 @classmethod 1015 def define_tables(cls, metadata): 1016 Table( 1017 'foo', metadata, 1018 Column('id', Integer, primary_key=True, 1019 test_needs_autoincrement=True), 1020 Column('x', Integer, default=5), 1021 Column('y', Integer, default=9), 1022 Column('unrelated_data', String(50)) 1023 ) 1024 1025 @classmethod 1026 def setup_mappers(cls): 1027 foo = cls.tables.foo 1028 1029 cls.Point = cls._type_fixture() 1030 1031 mapper(Foo, foo, properties={ 1032 'data': composite(cls.Point, foo.c.x, foo.c.y) 1033 }) 1034 1035 def test_evt_on_flush_refresh(self): 1036 # this still worked prior to #3427 being fixed in any case 1037 1038 sess = Session() 1039 1040 f1 = Foo(data=self.Point(None, None)) 1041 sess.add(f1) 1042 sess.flush() 1043 eq_(f1.data, self.Point(5, 9)) 1044 assert f1 not in sess.dirty 1045 f1.data.x = 10 1046 assert f1 in sess.dirty 1047 1048 1049class MutableCompositesUnpickleTest(_CompositeTestBase, fixtures.MappedTest): 1050 1051 @classmethod 1052 def setup_mappers(cls): 1053 foo = cls.tables.foo 1054 1055 cls.Point = cls._type_fixture() 1056 1057 mapper(FooWithEq, foo, properties={ 1058 'data': composite(cls.Point, foo.c.x, foo.c.y) 1059 }) 1060 1061 def test_unpickle_modified_eq(self): 1062 u1 = FooWithEq(data=self.Point(3, 5)) 1063 for loads, dumps in picklers(): 1064 loads(dumps(u1)) 1065 1066 1067class MutableCompositesTest(_CompositeTestBase, fixtures.MappedTest): 1068 1069 @classmethod 1070 def setup_mappers(cls): 1071 foo = cls.tables.foo 1072 1073 Point = cls._type_fixture() 1074 1075 mapper(Foo, foo, properties={ 1076 'data': composite(Point, foo.c.x, foo.c.y) 1077 }) 1078 1079 def test_in_place_mutation(self): 1080 sess = Session() 1081 d = Point(3, 4) 1082 f1 = Foo(data=d) 1083 sess.add(f1) 1084 sess.commit() 1085 1086 f1.data.y = 5 1087 sess.commit() 1088 1089 eq_(f1.data, Point(3, 5)) 1090 1091 def test_pickle_of_parent(self): 1092 sess = Session() 1093 d = Point(3, 4) 1094 f1 = Foo(data=d) 1095 sess.add(f1) 1096 sess.commit() 1097 1098 f1.data 1099 assert 'data' in f1.__dict__ 1100 sess.close() 1101 1102 for loads, dumps in picklers(): 1103 sess = Session() 1104 f2 = loads(dumps(f1)) 1105 sess.add(f2) 1106 f2.data.y = 12 1107 assert f2 in sess.dirty 1108 1109 def test_set_none(self): 1110 sess = Session() 1111 f1 = Foo(data=None) 1112 sess.add(f1) 1113 sess.commit() 1114 eq_(f1.data, Point(None, None)) 1115 1116 f1.data.y = 5 1117 sess.commit() 1118 eq_(f1.data, Point(None, 5)) 1119 1120 def test_set_illegal(self): 1121 f1 = Foo() 1122 assert_raises_message( 1123 ValueError, 1124 "Attribute 'data' does not accept objects", 1125 setattr, f1, 'data', 'foo' 1126 ) 1127 1128 def test_unrelated_flush(self): 1129 sess = Session() 1130 f1 = Foo(data=Point(3, 4), unrelated_data="unrelated") 1131 sess.add(f1) 1132 sess.flush() 1133 f1.unrelated_data = "unrelated 2" 1134 sess.flush() 1135 f1.data.x = 5 1136 sess.commit() 1137 1138 eq_(f1.data.x, 5) 1139 1140 1141class MutableCompositeCallableTest(_CompositeTestBase, fixtures.MappedTest): 1142 1143 @classmethod 1144 def setup_mappers(cls): 1145 foo = cls.tables.foo 1146 1147 Point = cls._type_fixture() 1148 1149 # in this case, this is not actually a MutableComposite. 1150 # so we don't expect it to track changes 1151 mapper(Foo, foo, properties={ 1152 'data': composite(lambda x, y: Point(x, y), foo.c.x, foo.c.y) 1153 }) 1154 1155 def test_basic(self): 1156 sess = Session() 1157 f1 = Foo(data=Point(3, 4)) 1158 sess.add(f1) 1159 sess.flush() 1160 f1.data.x = 5 1161 sess.commit() 1162 1163 # we didn't get the change. 1164 eq_(f1.data.x, 3) 1165 1166 1167class MutableCompositeCustomCoerceTest(_CompositeTestBase, 1168 fixtures.MappedTest): 1169 1170 @classmethod 1171 def _type_fixture(cls): 1172 1173 return MyPoint 1174 1175 @classmethod 1176 def setup_mappers(cls): 1177 foo = cls.tables.foo 1178 1179 Point = cls._type_fixture() 1180 1181 mapper(Foo, foo, properties={ 1182 'data': composite(Point, foo.c.x, foo.c.y) 1183 }) 1184 1185 def test_custom_coerce(self): 1186 f = Foo() 1187 f.data = (3, 4) 1188 eq_(f.data, Point(3, 4)) 1189 1190 def test_round_trip_ok(self): 1191 sess = Session() 1192 f = Foo() 1193 f.data = (3, 4) 1194 1195 sess.add(f) 1196 sess.commit() 1197 1198 eq_(f.data, Point(3, 4)) 1199 1200 1201class MutableInheritedCompositesTest(_CompositeTestBase, fixtures.MappedTest): 1202 1203 @classmethod 1204 def define_tables(cls, metadata): 1205 Table('foo', metadata, 1206 Column('id', Integer, primary_key=True, 1207 test_needs_autoincrement=True), 1208 Column('x', Integer), 1209 Column('y', Integer) 1210 ) 1211 Table('subfoo', metadata, 1212 Column('id', Integer, ForeignKey('foo.id'), primary_key=True), 1213 ) 1214 1215 @classmethod 1216 def setup_mappers(cls): 1217 foo = cls.tables.foo 1218 subfoo = cls.tables.subfoo 1219 1220 Point = cls._type_fixture() 1221 1222 mapper(Foo, foo, properties={ 1223 'data': composite(Point, foo.c.x, foo.c.y) 1224 }) 1225 mapper(SubFoo, subfoo, inherits=Foo) 1226 1227 def test_in_place_mutation_subclass(self): 1228 sess = Session() 1229 d = Point(3, 4) 1230 f1 = SubFoo(data=d) 1231 sess.add(f1) 1232 sess.commit() 1233 1234 f1.data.y = 5 1235 sess.commit() 1236 1237 eq_(f1.data, Point(3, 5)) 1238 1239 def test_pickle_of_parent_subclass(self): 1240 sess = Session() 1241 d = Point(3, 4) 1242 f1 = SubFoo(data=d) 1243 sess.add(f1) 1244 sess.commit() 1245 1246 f1.data 1247 assert 'data' in f1.__dict__ 1248 sess.close() 1249 1250 for loads, dumps in picklers(): 1251 sess = Session() 1252 f2 = loads(dumps(f1)) 1253 sess.add(f2) 1254 f2.data.y = 12 1255 assert f2 in sess.dirty 1256