1from sqlalchemy import Integer, ForeignKey, String, func
2from sqlalchemy.types import PickleType, TypeDecorator, VARCHAR
3from sqlalchemy.orm import mapper, Session, composite, column_property
4from sqlalchemy.orm.mapper import Mapper
5from sqlalchemy.orm.instrumentation import ClassManager
6from sqlalchemy.testing.schema import Table, Column
7from sqlalchemy.testing import eq_, assert_raises_message, assert_raises
8from sqlalchemy.testing.util import picklers
9from sqlalchemy.testing import fixtures
10from sqlalchemy.ext.mutable import MutableComposite
11from sqlalchemy.ext.mutable import MutableDict, MutableList, MutableSet
12
13
class Foo(fixtures.BasicEntity):
    """Entity class that the tests in this module map to the ``foo`` table."""
16
17
class SubFoo(Foo):
    """Subclass of ``Foo`` used by the inheritance tests in this module."""
20
21
class FooWithEq(object):
    """Plain object whose hash and equality are both derived from ``id``."""

    def __init__(self, **kw):
        # every keyword argument becomes an instance attribute
        for name, value in kw.items():
            setattr(self, name, value)

    def __hash__(self):
        # requires that ``id`` has already been set on the instance
        return hash(self.id)

    def __eq__(self, other):
        # compares ``id`` only; both sides must have the attribute
        return self.id == other.id
33
34
class Point(MutableComposite):
    """Two-value (``x``, ``y``) composite that calls ``changed()`` on
    every attribute assignment.

    ``__getstate__`` / ``__setstate__`` pickle the object as the plain
    ``(x, y)`` tuple.
    """

    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __setattr__(self, key, value):
        # store the value first, then notify via changed()
        object.__setattr__(self, key, value)
        self.changed()

    def __composite_values__(self):
        return self.x, self.y

    def __getstate__(self):
        return self.x, self.y

    def __setstate__(self, state):
        self.x, self.y = state

    def __eq__(self, other):
        return isinstance(other, Point) and \
            other.x == self.x and \
            other.y == self.y

    def __ne__(self, other):
        # Python 2 does not derive ``!=`` from ``__eq__``; spell it out so
        # that the two operators always agree.
        return not self.__eq__(other)
58
59
class MyPoint(Point):
    """``Point`` subclass whose ``coerce()`` also accepts plain tuples."""

    @classmethod
    def coerce(cls, key, value):
        """Turn a tuple into an instance of ``cls``.

        Any value that is not a tuple is returned unchanged.
        """
        if isinstance(value, tuple):
            # build the class coerce() was invoked on, rather than the
            # hard-coded base ``Point``, so the subclass type is retained
            return cls(*value)
        return value
67
68
class _MutableDictTestFixture(object):
    """Mixin naming ``MutableDict`` as the type under test; its teardown
    clears the ``Mapper`` and ``ClassManager`` event dispatchers.
    """

    @classmethod
    def _type_fixture(cls):
        return MutableDict

    def teardown(self):
        # clear out mapper events
        for evented in (Mapper, ClassManager):
            evented.dispatch._clear()
        super(_MutableDictTestFixture, self).teardown()
79
80
class _MutableDictTestBase(_MutableDictTestFixture):
    """Shared tests for a dict-valued mutable type on ``Foo.data``.

    Concrete subclasses supply ``define_tables()`` creating the ``foo``
    table.  Each test changes ``Foo.data`` and compares the attribute
    after a flush or commit.
    """

    run_define_tables = 'each'

    @classmethod
    def setup_mappers(cls):
        """Map ``Foo`` to the ``foo`` table."""
        # declared as a classmethod, like the other setup_mappers() hooks
        # in this module which receive the test class as ``cls``
        mapper(Foo, cls.tables.foo)

    def _committed_foo(self, **kw):
        """Create ``Foo(**kw)``, add it to a new ``Session`` and commit.

        Returns the ``(session, foo)`` pair.
        """
        sess = Session()
        f1 = Foo(**kw)
        sess.add(f1)
        sess.commit()
        return sess, f1

    def test_coerce_none(self):
        """``data=None`` is still ``None`` after a commit."""
        _, f1 = self._committed_foo(data=None)
        eq_(f1.data, None)

    def test_coerce_raise(self):
        """Constructing ``Foo`` with a set for ``data`` raises ValueError."""
        assert_raises_message(
            ValueError,
            "Attribute 'data' does not accept objects of type",
            Foo, data=set([1, 2, 3])
        )

    def test_in_place_mutation(self):
        """Item assignment followed by commit."""
        sess, f1 = self._committed_foo(data={'a': 'b'})

        f1.data['a'] = 'c'
        sess.commit()

        eq_(f1.data, {'a': 'c'})

    def test_clear(self):
        """``clear()`` followed by commit."""
        sess, f1 = self._committed_foo(data={'a': 'b'})

        f1.data.clear()
        sess.commit()

        eq_(f1.data, {})

    def test_update(self):
        """``update()`` followed by commit."""
        sess, f1 = self._committed_foo(data={'a': 'b'})

        f1.data.update({'a': 'z'})
        sess.commit()

        eq_(f1.data, {'a': 'z'})

    def test_pop(self):
        """``pop()`` of a present key, then of a missing key."""
        sess, f1 = self._committed_foo(data={'a': 'b', 'c': 'd'})

        eq_(f1.data.pop('a'), 'b')
        sess.commit()

        assert_raises(KeyError, f1.data.pop, 'g')

        eq_(f1.data, {'c': 'd'})

    def test_pop_default(self):
        """``pop()`` with a default, for a present and then absent key."""
        sess, f1 = self._committed_foo(data={'a': 'b', 'c': 'd'})

        eq_(f1.data.pop('a', 'q'), 'b')
        eq_(f1.data.pop('a', 'q'), 'q')
        sess.commit()

        eq_(f1.data, {'c': 'd'})

    def test_popitem(self):
        """``popitem()`` followed by commit."""
        orig = {'a': 'b', 'c': 'd'}

        # the orig dict remains unchanged when we assign,
        # but just making this future-proof
        sess, f1 = self._committed_foo(data=dict(orig))

        k, v = f1.data.popitem()
        assert k in ('a', 'c')
        # mirror the removal on the reference dict
        orig.pop(k)

        sess.commit()

        eq_(f1.data, orig)

    def test_setdefault(self):
        """``setdefault()`` for a new key and then for an existing key."""
        sess, f1 = self._committed_foo(data={'a': 'b'})

        eq_(f1.data.setdefault('c', 'd'), 'd')
        sess.commit()

        eq_(f1.data, {'a': 'b', 'c': 'd'})

        eq_(f1.data.setdefault('c', 'q'), 'd')
        sess.commit()

        eq_(f1.data, {'a': 'b', 'c': 'd'})

    def test_replace(self):
        """Assigning a whole new dict after a flush."""
        sess = Session()
        f1 = Foo(data={'a': 'b'})
        sess.add(f1)
        sess.flush()

        f1.data = {'b': 'c'}
        sess.commit()
        eq_(f1.data, {'b': 'c'})

    def test_replace_itself_still_ok(self):
        """Re-assigning ``data`` to itself, then mutating it in place."""
        sess = Session()
        f1 = Foo(data={'a': 'b'})
        sess.add(f1)
        sess.flush()

        f1.data = f1.data
        f1.data['b'] = 'c'
        sess.commit()
        eq_(f1.data, {'a': 'b', 'b': 'c'})

    def test_pickle_parent(self):
        """A pickled and restored ``Foo`` is marked dirty when mutated."""
        sess, f1 = self._committed_foo(data={'a': 'b'})
        # access the attribute once more before closing the session
        f1.data
        sess.close()

        for loads, dumps in picklers():
            sess = Session()
            f2 = loads(dumps(f1))
            sess.add(f2)
            f2.data['a'] = 'c'
            assert f2 in sess.dirty

    def test_unrelated_flush(self):
        """Mutation after a flush that only touched another column."""
        sess = Session()
        f1 = Foo(data={"a": "b"}, unrelated_data="unrelated")
        sess.add(f1)
        sess.flush()
        f1.unrelated_data = "unrelated 2"
        sess.flush()
        f1.data["a"] = "c"
        sess.commit()
        eq_(f1.data["a"], "c")

    def _test_non_mutable(self):
        """In-place change to ``non_mutable_data`` is not seen after
        commit; invoked by subclasses whose table has that column.
        """
        sess, f1 = self._committed_foo(non_mutable_data={'a': 'b'})

        f1.non_mutable_data['a'] = 'c'
        sess.commit()

        eq_(f1.non_mutable_data, {'a': 'b'})
262
263
class _MutableListTestFixture(object):
    """Mixin naming ``MutableList`` as the type under test; its teardown
    clears the ``Mapper`` and ``ClassManager`` event dispatchers.
    """

    @classmethod
    def _type_fixture(cls):
        return MutableList

    def teardown(self):
        # clear out mapper events
        for evented in (Mapper, ClassManager):
            evented.dispatch._clear()
        super(_MutableListTestFixture, self).teardown()
274
275
class _MutableListTestBase(_MutableListTestFixture):
    """Shared tests for a list-valued mutable type on ``Foo.data``.

    Concrete subclasses supply ``define_tables()`` creating the ``foo``
    table.  Each test changes ``Foo.data`` and compares the attribute
    after a flush or commit.
    """

    run_define_tables = 'each'

    @classmethod
    def setup_mappers(cls):
        """Map ``Foo`` to the ``foo`` table."""
        # declared as a classmethod, like the other setup_mappers() hooks
        # in this module which receive the test class as ``cls``
        mapper(Foo, cls.tables.foo)

    def _committed_foo(self, **kw):
        """Create ``Foo(**kw)``, add it to a new ``Session`` and commit.

        Returns the ``(session, foo)`` pair.
        """
        sess = Session()
        f1 = Foo(**kw)
        sess.add(f1)
        sess.commit()
        return sess, f1

    def test_coerce_none(self):
        """``data=None`` is still ``None`` after a commit."""
        _, f1 = self._committed_foo(data=None)
        eq_(f1.data, None)

    def test_coerce_raise(self):
        """Constructing ``Foo`` with a set for ``data`` raises ValueError."""
        assert_raises_message(
            ValueError,
            "Attribute 'data' does not accept objects of type",
            Foo, data=set([1, 2, 3])
        )

    def test_in_place_mutation(self):
        """Index assignment followed by commit."""
        sess, f1 = self._committed_foo(data=[1, 2])

        f1.data[0] = 3
        sess.commit()

        eq_(f1.data, [3, 2])

    def test_in_place_slice_mutation(self):
        """Slice assignment followed by commit."""
        sess, f1 = self._committed_foo(data=[1, 2, 3, 4])

        f1.data[1:3] = 5, 6
        sess.commit()

        eq_(f1.data, [1, 5, 6, 4])

    def test_del_slice(self):
        """Slice deletion followed by commit."""
        sess, f1 = self._committed_foo(data=[1, 2, 3, 4])

        del f1.data[1:3]
        sess.commit()

        eq_(f1.data, [1, 4])

    def test_clear(self):
        """``clear()`` followed by commit, where ``list`` provides it."""
        if not hasattr(list, 'clear'):
            # py2 list doesn't have 'clear'
            return
        sess, f1 = self._committed_foo(data=[1, 2])

        f1.data.clear()
        sess.commit()

        eq_(f1.data, [])

    def test_pop(self):
        """``pop()`` with and without an index, then out of range."""
        sess, f1 = self._committed_foo(data=[1, 2, 3])

        eq_(f1.data.pop(), 3)
        eq_(f1.data.pop(0), 1)
        sess.commit()

        assert_raises(IndexError, f1.data.pop, 5)

        eq_(f1.data, [2])

    def test_append(self):
        """``append()`` followed by commit."""
        sess, f1 = self._committed_foo(data=[1, 2])

        f1.data.append(5)
        sess.commit()

        eq_(f1.data, [1, 2, 5])

    def test_extend(self):
        """``extend()`` followed by commit."""
        sess, f1 = self._committed_foo(data=[1, 2])

        f1.data.extend([5])
        sess.commit()

        eq_(f1.data, [1, 2, 5])

    def test_insert(self):
        """``insert()`` followed by commit."""
        sess, f1 = self._committed_foo(data=[1, 2])

        f1.data.insert(1, 5)
        sess.commit()

        eq_(f1.data, [1, 5, 2])

    def test_remove(self):
        """``remove()`` followed by commit."""
        sess, f1 = self._committed_foo(data=[1, 2, 3])

        f1.data.remove(2)
        sess.commit()

        eq_(f1.data, [1, 3])

    def test_sort(self):
        """``sort()`` followed by commit."""
        sess, f1 = self._committed_foo(data=[1, 3, 2])

        f1.data.sort()
        sess.commit()

        eq_(f1.data, [1, 2, 3])

    def test_reverse(self):
        """``reverse()`` followed by commit."""
        sess, f1 = self._committed_foo(data=[1, 3, 2])

        f1.data.reverse()
        sess.commit()

        eq_(f1.data, [2, 3, 1])

    def test_pickle_parent(self):
        """A pickled and restored ``Foo`` is marked dirty when mutated."""
        sess, f1 = self._committed_foo(data=[1, 2])
        # access the attribute once more before closing the session
        f1.data
        sess.close()

        for loads, dumps in picklers():
            sess = Session()
            f2 = loads(dumps(f1))
            sess.add(f2)
            f2.data[0] = 3
            assert f2 in sess.dirty

    def test_unrelated_flush(self):
        """Mutation after a flush that only touched another column."""
        sess = Session()
        f1 = Foo(data=[1, 2], unrelated_data="unrelated")
        sess.add(f1)
        sess.flush()
        f1.unrelated_data = "unrelated 2"
        sess.flush()
        f1.data[0] = 3
        sess.commit()
        eq_(f1.data[0], 3)
462
463
class _MutableSetTestFixture(object):
    """Mixin naming ``MutableSet`` as the type under test; its teardown
    clears the ``Mapper`` and ``ClassManager`` event dispatchers.
    """

    @classmethod
    def _type_fixture(cls):
        return MutableSet

    def teardown(self):
        # clear out mapper events
        for evented in (Mapper, ClassManager):
            evented.dispatch._clear()
        super(_MutableSetTestFixture, self).teardown()
474
475
class _MutableSetTestBase(_MutableSetTestFixture):
    """Shared tests for a set-valued mutable type on ``Foo.data``.

    Concrete subclasses supply ``define_tables()`` creating the ``foo``
    table.  Each test changes ``Foo.data`` and compares the attribute
    after a flush or commit.
    """

    run_define_tables = 'each'

    @classmethod
    def setup_mappers(cls):
        """Map ``Foo`` to the ``foo`` table."""
        # declared as a classmethod, like the other setup_mappers() hooks
        # in this module which receive the test class as ``cls``
        mapper(Foo, cls.tables.foo)

    def _committed_foo(self, **kw):
        """Create ``Foo(**kw)``, add it to a new ``Session`` and commit.

        Returns the ``(session, foo)`` pair.
        """
        sess = Session()
        f1 = Foo(**kw)
        sess.add(f1)
        sess.commit()
        return sess, f1

    def test_coerce_none(self):
        """``data=None`` is still ``None`` after a commit."""
        _, f1 = self._committed_foo(data=None)
        eq_(f1.data, None)

    def test_coerce_raise(self):
        """Constructing ``Foo`` with a list for ``data`` raises ValueError."""
        assert_raises_message(
            ValueError,
            "Attribute 'data' does not accept objects of type",
            Foo, data=[1, 2, 3]
        )

    def test_clear(self):
        """``clear()`` followed by commit."""
        sess, f1 = self._committed_foo(data=set([1, 2]))

        f1.data.clear()
        sess.commit()

        eq_(f1.data, set())

    def test_pop(self):
        """``pop()`` of the only element, then on the empty set."""
        sess, f1 = self._committed_foo(data=set([1]))

        eq_(f1.data.pop(), 1)
        sess.commit()

        assert_raises(KeyError, f1.data.pop)

        eq_(f1.data, set())

    def test_add(self):
        """``add()`` followed by commit."""
        sess, f1 = self._committed_foo(data=set([1, 2]))

        f1.data.add(5)
        sess.commit()

        eq_(f1.data, set([1, 2, 5]))

    def test_update(self):
        """``update()`` followed by commit."""
        sess, f1 = self._committed_foo(data=set([1, 2]))

        f1.data.update(set([2, 5]))
        sess.commit()

        eq_(f1.data, set([1, 2, 5]))

    def test_intersection_update(self):
        """``intersection_update()`` followed by commit."""
        sess, f1 = self._committed_foo(data=set([1, 2]))

        f1.data.intersection_update(set([2, 5]))
        sess.commit()

        eq_(f1.data, set([2]))

    def test_difference_update(self):
        """``difference_update()`` followed by commit."""
        sess, f1 = self._committed_foo(data=set([1, 2]))

        f1.data.difference_update(set([2, 5]))
        sess.commit()

        eq_(f1.data, set([1]))

    def test_symmetric_difference_update(self):
        """``symmetric_difference_update()`` followed by commit."""
        sess, f1 = self._committed_foo(data=set([1, 2]))

        f1.data.symmetric_difference_update(set([2, 5]))
        sess.commit()

        eq_(f1.data, set([1, 5]))

    def test_remove(self):
        """``remove()`` followed by commit."""
        sess, f1 = self._committed_foo(data=set([1, 2, 3]))

        f1.data.remove(2)
        sess.commit()

        eq_(f1.data, set([1, 3]))

    def test_discard(self):
        """``discard()`` of a present element, then of an absent one."""
        sess, f1 = self._committed_foo(data=set([1, 2, 3]))

        f1.data.discard(2)
        sess.commit()

        eq_(f1.data, set([1, 3]))

        # second discard targets an element that is no longer present
        f1.data.discard(2)
        sess.commit()

        eq_(f1.data, set([1, 3]))

    def test_pickle_parent(self):
        """A pickled and restored ``Foo`` is marked dirty when mutated."""
        sess, f1 = self._committed_foo(data=set([1, 2]))
        # access the attribute once more before closing the session
        f1.data
        sess.close()

        for loads, dumps in picklers():
            sess = Session()
            f2 = loads(dumps(f1))
            sess.add(f2)
            f2.data.add(3)
            assert f2 in sess.dirty

    def test_unrelated_flush(self):
        """Mutation after a flush that only touched another column."""
        sess = Session()
        f1 = Foo(data=set([1, 2]), unrelated_data="unrelated")
        sess.add(f1)
        sess.flush()
        f1.unrelated_data = "unrelated 2"
        sess.flush()
        f1.data.add(3)
        sess.commit()
        eq_(f1.data, set([1, 2, 3]))
639
640
class MutableColumnDefaultTest(_MutableDictTestFixture, fixtures.MappedTest):
    """Mutation tracking on a value that came from a column default."""

    @classmethod
    def define_tables(cls, metadata):
        """Create ``foo`` with a mutable pickle column defaulting to ``{}``."""
        MutableDict = cls._type_fixture()

        mutable_pickle = MutableDict.as_mutable(PickleType)
        Table(
            'foo', metadata,
            Column(
                'id', Integer, primary_key=True,
                test_needs_autoincrement=True),
            Column('data', mutable_pickle, default={}),
        )

    @classmethod
    def setup_mappers(cls):
        """Map ``Foo`` to the ``foo`` table."""
        # declared as a classmethod, like the other setup_mappers() hooks
        # in this module which receive the test class as ``cls``
        foo = cls.tables.foo

        mapper(Foo, foo)

    def test_evt_on_flush_refresh(self):
        # test for #3427

        sess = Session()

        # no explicit value for ``data``; the column default applies
        f1 = Foo()
        sess.add(f1)
        sess.flush()
        assert isinstance(f1.data, self._type_fixture())
        assert f1 not in sess.dirty
        f1.data['foo'] = 'bar'
        assert f1 in sess.dirty
672
673
class MutableWithScalarPickleTest(_MutableDictTestBase, fixtures.MappedTest):
    """Dict tests against ``as_mutable(PickleType)`` columns."""

    @classmethod
    def define_tables(cls, metadata):
        mutable_type = cls._type_fixture()

        # one type object, shared by the 'skip' and 'data' columns
        pickled_mutable = mutable_type.as_mutable(PickleType)
        Table(
            'foo', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('skip', pickled_mutable),
            Column('data', pickled_mutable),
            Column('non_mutable_data', PickleType),
            Column('unrelated_data', String(50)),
        )

    def test_non_mutable(self):
        """Run the shared ``_test_non_mutable()`` check."""
        self._test_non_mutable()
692
693
class MutableWithScalarJSONTest(_MutableDictTestBase, fixtures.MappedTest):
    """Dict tests against an ``as_mutable()`` JSON-encoding VARCHAR type."""

    @classmethod
    def define_tables(cls, metadata):
        import json

        class JSONEncodedDict(TypeDecorator):
            # non-None values go through json.dumps / json.loads
            impl = VARCHAR(50)

            def process_bind_param(self, value, dialect):
                if value is None:
                    return value
                return json.dumps(value)

            def process_result_value(self, value, dialect):
                if value is None:
                    return value
                return json.loads(value)

        mutable_type = cls._type_fixture()

        Table(
            'foo', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('data', mutable_type.as_mutable(JSONEncodedDict)),
            Column('non_mutable_data', JSONEncodedDict),
            Column('unrelated_data', String(50)),
        )

    def test_non_mutable(self):
        """Run the shared ``_test_non_mutable()`` check."""
        self._test_non_mutable()
726
727
class MutableColumnCopyJSONTest(_MutableDictTestBase, fixtures.MappedTest):
    """Dict tests where the mutable column is declared on an abstract
    declarative base and inherited by the ``foo`` table's class.
    """

    @classmethod
    def define_tables(cls, metadata):
        import json
        from sqlalchemy.ext.declarative import declarative_base

        class JSONEncodedDict(TypeDecorator):
            # non-None values go through json.dumps / json.loads
            impl = VARCHAR(50)

            def process_bind_param(self, value, dialect):
                if value is None:
                    return value
                return json.dumps(value)

            def process_result_value(self, value, dialect):
                if value is None:
                    return value
                return json.loads(value)

        mutable_type = cls._type_fixture()

        Base = declarative_base(metadata=metadata)

        class AbstractFoo(Base):
            __abstract__ = True

            id = Column(Integer, primary_key=True,
                        test_needs_autoincrement=True)
            data = Column(mutable_type.as_mutable(JSONEncodedDict))
            non_mutable_data = Column(JSONEncodedDict)
            unrelated_data = Column(String(50))

        class Foo(AbstractFoo):
            __tablename__ = "foo"
            column_prop = column_property(
                func.lower(AbstractFoo.unrelated_data))

        # the concrete class' column type must be a different object
        # from the one declared on the abstract base
        concrete_type = Foo.data.property.columns[0].type
        assert concrete_type is not AbstractFoo.data.type

    def test_non_mutable(self):
        """Run the shared ``_test_non_mutable()`` check."""
        self._test_non_mutable()
771
772
class MutableColumnCopyArrayTest(_MutableListTestBase, fixtures.MappedTest):
    """List tests where a mutable ARRAY column is declared on a mixin."""

    __requires__ = ('array_type',)

    @classmethod
    def define_tables(cls, metadata):
        from sqlalchemy.ext.declarative import declarative_base
        from sqlalchemy.sql.sqltypes import ARRAY

        mutable_type = cls._type_fixture()

        Base = declarative_base(metadata=metadata)

        class Mixin(object):
            # the 'data' column comes from the mixin
            data = Column(mutable_type.as_mutable(ARRAY(Integer)))

        class Foo(Mixin, Base):
            __tablename__ = 'foo'
            id = Column(Integer, primary_key=True)
791
792
class MutableListWithScalarPickleTest(_MutableListTestBase,
                                      fixtures.MappedTest):
    """List tests against ``as_mutable(PickleType)`` columns."""

    @classmethod
    def define_tables(cls, metadata):
        mutable_type = cls._type_fixture()

        # one type object, shared by the 'skip' and 'data' columns
        pickled_mutable = mutable_type.as_mutable(PickleType)
        Table(
            'foo', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('skip', pickled_mutable),
            Column('data', pickled_mutable),
            Column('non_mutable_data', PickleType),
            Column('unrelated_data', String(50)),
        )
809
810
class MutableSetWithScalarPickleTest(_MutableSetTestBase, fixtures.MappedTest):
    """Set tests against ``as_mutable(PickleType)`` columns."""

    @classmethod
    def define_tables(cls, metadata):
        mutable_type = cls._type_fixture()

        # one type object, shared by the 'skip' and 'data' columns
        pickled_mutable = mutable_type.as_mutable(PickleType)
        Table(
            'foo', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('skip', pickled_mutable),
            Column('data', pickled_mutable),
            Column('non_mutable_data', PickleType),
            Column('unrelated_data', String(50)),
        )
826
827
class MutableAssocWithAttrInheritTest(_MutableDictTestBase,
                                      fixtures.MappedTest):
    """Dict tests with ``MutableDict`` associated to the ``Foo.data``
    attribute, including tests that use the ``SubFoo`` subclass.
    """

    @classmethod
    def define_tables(cls, metadata):
        """Create ``foo`` with plain pickle columns, plus ``subfoo``."""

        Table('foo', metadata,
              Column('id', Integer, primary_key=True,
                     test_needs_autoincrement=True),
              Column('data', PickleType),
              Column('non_mutable_data', PickleType),
              Column('unrelated_data', String(50))
              )

        Table('subfoo', metadata,
              Column('id', Integer, ForeignKey('foo.id'), primary_key=True),
              )

    @classmethod
    def setup_mappers(cls):
        """Map ``Foo`` and ``SubFoo``, then associate ``MutableDict``
        with ``Foo.data``.
        """
        # declared as a classmethod, like the other setup_mappers() hooks
        # in this module which receive the test class as ``cls``
        foo = cls.tables.foo
        subfoo = cls.tables.subfoo

        mapper(Foo, foo)
        mapper(SubFoo, subfoo, inherits=Foo)
        MutableDict.associate_with_attribute(Foo.data)

    def test_in_place_mutation(self):
        """Item assignment on a ``SubFoo`` followed by commit."""
        sess = Session()

        f1 = SubFoo(data={'a': 'b'})
        sess.add(f1)
        sess.commit()

        f1.data['a'] = 'c'
        sess.commit()

        eq_(f1.data, {'a': 'c'})

    def test_replace(self):
        """Assigning a whole new dict on a ``SubFoo`` after a flush."""
        sess = Session()
        f1 = SubFoo(data={'a': 'b'})
        sess.add(f1)
        sess.flush()

        f1.data = {'b': 'c'}
        sess.commit()
        eq_(f1.data, {'b': 'c'})
875
876
class MutableAssociationScalarPickleTest(_MutableDictTestBase,
                                         fixtures.MappedTest):
    """Dict tests with the mutable type associated to ``PickleType``."""

    @classmethod
    def define_tables(cls, metadata):
        # associate first; the columns below then use plain PickleType
        cls._type_fixture().associate_with(PickleType)

        Table(
            'foo', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('skip', PickleType),
            Column('data', PickleType),
            Column('unrelated_data', String(50)),
        )
892
893
class MutableAssociationScalarJSONTest(_MutableDictTestBase,
                                       fixtures.MappedTest):
    """Dict tests with the mutable type associated to a JSON-encoding
    VARCHAR type.
    """

    @classmethod
    def define_tables(cls, metadata):
        import json

        class JSONEncodedDict(TypeDecorator):
            # non-None values go through json.dumps / json.loads
            impl = VARCHAR(50)

            def process_bind_param(self, value, dialect):
                if value is None:
                    return value
                return json.dumps(value)

            def process_result_value(self, value, dialect):
                if value is None:
                    return value
                return json.loads(value)

        # associate first; the 'data' column then uses the plain type
        cls._type_fixture().associate_with(JSONEncodedDict)

        Table(
            'foo', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('data', JSONEncodedDict),
            Column('unrelated_data', String(50)),
        )
924
925
class CustomMutableAssociationScalarJSONTest(_MutableDictTestBase,
                                             fixtures.MappedTest):
    """Dict tests using a subclass of the inherited mutable type,
    associated to a JSON-encoding VARCHAR type.
    """

    # populated on first use by _type_fixture()
    CustomMutableDict = None

    @classmethod
    def _type_fixture(cls):
        """Return the custom subclass, creating and caching it on first
        call.
        """
        cached = getattr(cls, 'CustomMutableDict')
        if cached:
            return cached

        parent_type = super(
            CustomMutableAssociationScalarJSONTest, cls)._type_fixture()

        class CustomMutableDict(parent_type):
            pass

        cls.CustomMutableDict = CustomMutableDict
        return cls.CustomMutableDict

    @classmethod
    def define_tables(cls, metadata):
        import json

        class JSONEncodedDict(TypeDecorator):
            # non-None values go through json.dumps / json.loads
            impl = VARCHAR(50)

            def process_bind_param(self, value, dialect):
                if value is None:
                    return value
                return json.dumps(value)

            def process_result_value(self, value, dialect):
                if value is None:
                    return value
                return json.loads(value)

        # associate first; the 'data' column then uses the plain type
        cls._type_fixture().associate_with(JSONEncodedDict)

        Table(
            'foo', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('data', JSONEncodedDict),
            Column('unrelated_data', String(50)),
        )

    def test_pickle_parent(self):
        # Picklers don't know how to pickle CustomMutableDict,
        # but we aren't testing that here
        pass

    def test_coerce(self):
        """After a flush, ``data`` is exactly the custom subclass."""
        session = Session()
        obj = Foo(data={'a': 'b'})
        session.add(obj)
        session.flush()
        eq_(type(obj.data), self._type_fixture())
981
982
class _CompositeTestBase(object):
    """Common table definition and setup/teardown for the composite
    tests; ``Point`` is the default composite type.
    """

    @classmethod
    def define_tables(cls, metadata):
        Table(
            'foo', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('x', Integer),
            Column('y', Integer),
            Column('unrelated_data', String(50)),
        )

    def setup(self):
        # set up the composite listener before the regular setup runs
        from sqlalchemy.ext.mutable import _setup_composite_listener
        _setup_composite_listener()
        super(_CompositeTestBase, self).setup()

    def teardown(self):
        # clear out mapper events
        for evented in (Mapper, ClassManager):
            evented.dispatch._clear()
        super(_CompositeTestBase, self).teardown()

    @classmethod
    def _type_fixture(cls):
        return Point
1010
1011
class MutableCompositeColumnDefaultTest(_CompositeTestBase,
                                        fixtures.MappedTest):
    """Composite whose ``x`` / ``y`` columns have defaults (5 and 9)."""

    @classmethod
    def define_tables(cls, metadata):
        Table('foo', metadata,
              Column('id', Integer, primary_key=True,
                     test_needs_autoincrement=True),
              Column('x', Integer, default=5),
              Column('y', Integer, default=9),
              Column('unrelated_data', String(50)))

    @classmethod
    def setup_mappers(cls):
        foo_table = cls.tables.foo

        # kept on the class so that the test can build instances
        cls.Point = cls._type_fixture()

        composite_prop = composite(cls.Point, foo_table.c.x, foo_table.c.y)
        mapper(Foo, foo_table, properties={'data': composite_prop})

    def test_evt_on_flush_refresh(self):
        # this still worked prior to #3427 being fixed in any case

        session = Session()

        obj = Foo(data=self.Point(None, None))
        session.add(obj)
        session.flush()
        # after the flush the composite carries the column defaults
        eq_(obj.data, self.Point(5, 9))
        assert obj not in session.dirty
        obj.data.x = 10
        assert obj in session.dirty
1047
1048
class MutableCompositesUnpickleTest(_CompositeTestBase, fixtures.MappedTest):
    """Pickle round trip of a composite-holding ``FooWithEq``."""

    @classmethod
    def setup_mappers(cls):
        foo_table = cls.tables.foo

        # kept on the class so that the test can build instances
        cls.Point = cls._type_fixture()

        composite_prop = composite(cls.Point, foo_table.c.x, foo_table.c.y)
        mapper(FooWithEq, foo_table, properties={'data': composite_prop})

    def test_unpickle_modified_eq(self):
        """Dump and load the object with every available pickler."""
        target = FooWithEq(data=self.Point(3, 5))
        for loads, dumps in picklers():
            pickled = dumps(target)
            loads(pickled)
1065
1066
class MutableCompositesTest(_CompositeTestBase, fixtures.MappedTest):
    """Mutation tracking tests for ``Foo.data`` mapped as a composite."""

    @classmethod
    def setup_mappers(cls):
        foo_table = cls.tables.foo

        point_type = cls._type_fixture()

        composite_prop = composite(point_type, foo_table.c.x, foo_table.c.y)
        mapper(Foo, foo_table, properties={'data': composite_prop})

    def test_in_place_mutation(self):
        """Attribute assignment on the composite followed by commit."""
        session = Session()
        obj = Foo(data=Point(3, 4))
        session.add(obj)
        session.commit()

        obj.data.y = 5
        session.commit()

        eq_(obj.data, Point(3, 5))

    def test_pickle_of_parent(self):
        """A pickled and restored ``Foo`` is marked dirty when mutated."""
        session = Session()
        obj = Foo(data=Point(3, 4))
        session.add(obj)
        session.commit()

        # access the attribute so that it is present in __dict__
        obj.data
        assert 'data' in obj.__dict__
        session.close()

        for loads, dumps in picklers():
            session = Session()
            restored = loads(dumps(obj))
            session.add(restored)
            restored.data.y = 12
            assert restored in session.dirty

    def test_set_none(self):
        """``data=None`` compares equal to ``Point(None, None)`` after
        commit, and can then be mutated.
        """
        session = Session()
        obj = Foo(data=None)
        session.add(obj)
        session.commit()
        eq_(obj.data, Point(None, None))

        obj.data.y = 5
        session.commit()
        eq_(obj.data, Point(None, 5))

    def test_set_illegal(self):
        """Assigning a string to ``data`` raises ValueError."""
        obj = Foo()
        assert_raises_message(
            ValueError,
            "Attribute 'data' does not accept objects",
            setattr, obj, 'data', 'foo'
        )

    def test_unrelated_flush(self):
        """Mutation after a flush that only touched another column."""
        session = Session()
        obj = Foo(data=Point(3, 4), unrelated_data="unrelated")
        session.add(obj)
        session.flush()
        obj.unrelated_data = "unrelated 2"
        session.flush()
        obj.data.x = 5
        session.commit()

        eq_(obj.data.x, 5)
1139
1140
class MutableCompositeCallableTest(_CompositeTestBase, fixtures.MappedTest):
    """Composite configured with a callable instead of the class."""

    @classmethod
    def setup_mappers(cls):
        foo_table = cls.tables.foo

        point_type = cls._type_fixture()

        # in this case, this is not actually a MutableComposite.
        # so we don't expect it to track changes
        composite_prop = composite(
            lambda x, y: point_type(x, y), foo_table.c.x, foo_table.c.y)
        mapper(Foo, foo_table, properties={'data': composite_prop})

    def test_basic(self):
        """A change made after the flush is absent after commit."""
        session = Session()
        obj = Foo(data=Point(3, 4))
        session.add(obj)
        session.flush()
        obj.data.x = 5
        session.commit()

        # we didn't get the change.
        eq_(obj.data.x, 3)
1165
1166
class MutableCompositeCustomCoerceTest(_CompositeTestBase,
                                       fixtures.MappedTest):
    """Composite tests using ``MyPoint`` and its custom ``coerce()``."""

    @classmethod
    def _type_fixture(cls):
        return MyPoint

    @classmethod
    def setup_mappers(cls):
        foo_table = cls.tables.foo

        point_type = cls._type_fixture()

        composite_prop = composite(point_type, foo_table.c.x, foo_table.c.y)
        mapper(Foo, foo_table, properties={'data': composite_prop})

    def test_custom_coerce(self):
        """A tuple assigned to ``data`` compares equal to a ``Point``."""
        obj = Foo()
        obj.data = (3, 4)
        eq_(obj.data, Point(3, 4))

    def test_round_trip_ok(self):
        """Same as above, compared after add and commit."""
        session = Session()
        obj = Foo()
        obj.data = (3, 4)

        session.add(obj)
        session.commit()

        eq_(obj.data, Point(3, 4))
1199
1200
class MutableInheritedCompositesTest(_CompositeTestBase, fixtures.MappedTest):
    """Composite mutation tests run against the ``SubFoo`` subclass."""

    @classmethod
    def define_tables(cls, metadata):
        Table(
            'foo', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('x', Integer),
            Column('y', Integer),
        )
        Table(
            'subfoo', metadata,
            Column('id', Integer, ForeignKey('foo.id'), primary_key=True),
        )

    @classmethod
    def setup_mappers(cls):
        foo_table = cls.tables.foo
        subfoo_table = cls.tables.subfoo

        point_type = cls._type_fixture()

        composite_prop = composite(point_type, foo_table.c.x, foo_table.c.y)
        mapper(Foo, foo_table, properties={'data': composite_prop})
        mapper(SubFoo, subfoo_table, inherits=Foo)

    def test_in_place_mutation_subclass(self):
        """Attribute assignment on the composite followed by commit."""
        session = Session()
        obj = SubFoo(data=Point(3, 4))
        session.add(obj)
        session.commit()

        obj.data.y = 5
        session.commit()

        eq_(obj.data, Point(3, 5))

    def test_pickle_of_parent_subclass(self):
        """A pickled and restored ``SubFoo`` is marked dirty when mutated."""
        session = Session()
        obj = SubFoo(data=Point(3, 4))
        session.add(obj)
        session.commit()

        # access the attribute so that it is present in __dict__
        obj.data
        assert 'data' in obj.__dict__
        session.close()

        for loads, dumps in picklers():
            session = Session()
            restored = loads(dumps(obj))
            session.add(restored)
            restored.data.y = 12
            assert restored in session.dirty
1256