1"""Unit tests illustrating usage of the ``history_meta.py``
2module functions."""
3
4from unittest import TestCase
5from sqlalchemy.ext.declarative import declarative_base
6from .history_meta import Versioned, versioned_session
7from sqlalchemy import create_engine, Column, Integer, String, \
8    ForeignKey, Boolean, select
9from sqlalchemy.orm import clear_mappers, Session, deferred, relationship, \
10    column_property
11from sqlalchemy.testing import AssertsCompiledSQL, eq_, assert_raises, ne_
12from sqlalchemy.testing.entities import ComparableEntity
13from sqlalchemy.orm import exc as orm_exc
14import warnings
15
16warnings.simplefilter("error")
17
18engine = None
19
20
21def setup_module():
22    global engine
23    engine = create_engine('sqlite://', echo=True)
24
25
26class TestVersioning(TestCase, AssertsCompiledSQL):
27    __dialect__ = 'default'
28
29    def setUp(self):
30        self.session = Session(engine)
31        self.Base = declarative_base()
32        versioned_session(self.session)
33
34    def tearDown(self):
35        self.session.close()
36        clear_mappers()
37        self.Base.metadata.drop_all(engine)
38
39    def create_tables(self):
40        self.Base.metadata.create_all(engine)
41
42    def test_plain(self):
43        class SomeClass(Versioned, self.Base, ComparableEntity):
44            __tablename__ = 'sometable'
45
46            id = Column(Integer, primary_key=True)
47            name = Column(String(50))
48
49        self.create_tables()
50        sess = self.session
51        sc = SomeClass(name='sc1')
52        sess.add(sc)
53        sess.commit()
54
55        sc.name = 'sc1modified'
56        sess.commit()
57
58        assert sc.version == 2
59
60        SomeClassHistory = SomeClass.__history_mapper__.class_
61
62        eq_(
63            sess.query(SomeClassHistory).filter(
64                SomeClassHistory.version == 1).all(),
65            [SomeClassHistory(version=1, name='sc1')]
66        )
67
68        sc.name = 'sc1modified2'
69
70        eq_(
71            sess.query(SomeClassHistory).order_by(
72                SomeClassHistory.version).all(),
73            [
74                SomeClassHistory(version=1, name='sc1'),
75                SomeClassHistory(version=2, name='sc1modified')
76            ]
77        )
78
79        assert sc.version == 3
80
81        sess.commit()
82
83        sc.name = 'temp'
84        sc.name = 'sc1modified2'
85
86        sess.commit()
87
88        eq_(
89            sess.query(SomeClassHistory).order_by(
90                SomeClassHistory.version).all(),
91            [
92                SomeClassHistory(version=1, name='sc1'),
93                SomeClassHistory(version=2, name='sc1modified')
94            ]
95        )
96
97        sess.delete(sc)
98        sess.commit()
99
100        eq_(
101            sess.query(SomeClassHistory).order_by(
102                SomeClassHistory.version).all(),
103            [
104                SomeClassHistory(version=1, name='sc1'),
105                SomeClassHistory(version=2, name='sc1modified'),
106                SomeClassHistory(version=3, name='sc1modified2')
107            ]
108        )
109
110    def test_w_mapper_versioning(self):
111        class SomeClass(Versioned, self.Base, ComparableEntity):
112            __tablename__ = 'sometable'
113
114            id = Column(Integer, primary_key=True)
115            name = Column(String(50))
116
117        SomeClass.__mapper__.version_id_col = SomeClass.__table__.c.version
118
119        self.create_tables()
120        sess = self.session
121        sc = SomeClass(name='sc1')
122        sess.add(sc)
123        sess.commit()
124
125        s2 = Session(sess.bind)
126        sc2 = s2.query(SomeClass).first()
127        sc2.name = 'sc1modified'
128
129        sc.name = 'sc1modified_again'
130        sess.commit()
131
132        eq_(sc.version, 2)
133
134        assert_raises(
135            orm_exc.StaleDataError,
136            s2.flush
137        )
138
139    def test_from_null(self):
140        class SomeClass(Versioned, self.Base, ComparableEntity):
141            __tablename__ = 'sometable'
142
143            id = Column(Integer, primary_key=True)
144            name = Column(String(50))
145
146        self.create_tables()
147        sess = self.session
148        sc = SomeClass()
149        sess.add(sc)
150        sess.commit()
151
152        sc.name = 'sc1'
153        sess.commit()
154
155        assert sc.version == 2
156
157    def test_insert_null(self):
158        class SomeClass(Versioned, self.Base, ComparableEntity):
159            __tablename__ = 'sometable'
160
161            id = Column(Integer, primary_key=True)
162            boole = Column(Boolean, default=False)
163
164        self.create_tables()
165        sess = self.session
166        sc = SomeClass(boole=True)
167        sess.add(sc)
168        sess.commit()
169
170        sc.boole = None
171        sess.commit()
172
173        sc.boole = False
174        sess.commit()
175
176        SomeClassHistory = SomeClass.__history_mapper__.class_
177
178        eq_(
179            sess.query(SomeClassHistory.boole).order_by(
180                SomeClassHistory.id).all(),
181            [(True, ), (None, )]
182        )
183
184        eq_(sc.version, 3)
185
186    def test_deferred(self):
187        """test versioning of unloaded, deferred columns."""
188
189        class SomeClass(Versioned, self.Base, ComparableEntity):
190            __tablename__ = 'sometable'
191
192            id = Column(Integer, primary_key=True)
193            name = Column(String(50))
194            data = deferred(Column(String(25)))
195
196        self.create_tables()
197        sess = self.session
198        sc = SomeClass(name='sc1', data='somedata')
199        sess.add(sc)
200        sess.commit()
201        sess.close()
202
203        sc = sess.query(SomeClass).first()
204        assert 'data' not in sc.__dict__
205
206        sc.name = 'sc1modified'
207        sess.commit()
208
209        assert sc.version == 2
210
211        SomeClassHistory = SomeClass.__history_mapper__.class_
212
213        eq_(
214            sess.query(SomeClassHistory).filter(
215                SomeClassHistory.version == 1).all(),
216            [SomeClassHistory(version=1, name='sc1', data='somedata')]
217        )
218
219    def test_joined_inheritance(self):
220        class BaseClass(Versioned, self.Base, ComparableEntity):
221            __tablename__ = 'basetable'
222
223            id = Column(Integer, primary_key=True)
224            name = Column(String(50))
225            type = Column(String(20))
226
227            __mapper_args__ = {
228                'polymorphic_on': type,
229                'polymorphic_identity': 'base'}
230
231        class SubClassSeparatePk(BaseClass):
232            __tablename__ = 'subtable1'
233
234            id = column_property(
235                Column(Integer, primary_key=True),
236                BaseClass.id
237            )
238            base_id = Column(Integer, ForeignKey('basetable.id'))
239            subdata1 = Column(String(50))
240
241            __mapper_args__ = {'polymorphic_identity': 'sep'}
242
243        class SubClassSamePk(BaseClass):
244            __tablename__ = 'subtable2'
245
246            id = Column(
247                Integer, ForeignKey('basetable.id'), primary_key=True)
248            subdata2 = Column(String(50))
249
250            __mapper_args__ = {'polymorphic_identity': 'same'}
251
252        self.create_tables()
253        sess = self.session
254
255        sep1 = SubClassSeparatePk(name='sep1', subdata1='sep1subdata')
256        base1 = BaseClass(name='base1')
257        same1 = SubClassSamePk(name='same1', subdata2='same1subdata')
258        sess.add_all([sep1, base1, same1])
259        sess.commit()
260
261        base1.name = 'base1mod'
262        same1.subdata2 = 'same1subdatamod'
263        sep1.name = 'sep1mod'
264        sess.commit()
265
266        BaseClassHistory = BaseClass.__history_mapper__.class_
267        SubClassSeparatePkHistory = \
268            SubClassSeparatePk.__history_mapper__.class_
269        SubClassSamePkHistory = SubClassSamePk.__history_mapper__.class_
270        eq_(
271            sess.query(BaseClassHistory).order_by(BaseClassHistory.id).all(),
272            [
273                SubClassSeparatePkHistory(
274                    id=1, name='sep1', type='sep', version=1),
275                BaseClassHistory(id=2, name='base1', type='base', version=1),
276                SubClassSamePkHistory(
277                    id=3, name='same1', type='same', version=1)
278            ]
279        )
280
281        same1.subdata2 = 'same1subdatamod2'
282
283        eq_(
284            sess.query(BaseClassHistory).order_by(
285                BaseClassHistory.id, BaseClassHistory.version).all(),
286            [
287                SubClassSeparatePkHistory(
288                    id=1, name='sep1', type='sep', version=1),
289                BaseClassHistory(id=2, name='base1', type='base', version=1),
290                SubClassSamePkHistory(
291                    id=3, name='same1', type='same', version=1),
292                SubClassSamePkHistory(
293                    id=3, name='same1', type='same', version=2)
294            ]
295        )
296
297        base1.name = 'base1mod2'
298        eq_(
299            sess.query(BaseClassHistory).order_by(
300                BaseClassHistory.id, BaseClassHistory.version).all(),
301            [
302                SubClassSeparatePkHistory(
303                    id=1, name='sep1', type='sep', version=1),
304                BaseClassHistory(id=2, name='base1', type='base', version=1),
305                BaseClassHistory(
306                    id=2, name='base1mod', type='base', version=2),
307                SubClassSamePkHistory(
308                    id=3, name='same1', type='same', version=1),
309                SubClassSamePkHistory(
310                    id=3, name='same1', type='same', version=2)
311            ]
312        )
313
314    def test_joined_inheritance_multilevel(self):
315        class BaseClass(Versioned, self.Base, ComparableEntity):
316            __tablename__ = 'basetable'
317
318            id = Column(Integer, primary_key=True)
319            name = Column(String(50))
320            type = Column(String(20))
321
322            __mapper_args__ = {
323                'polymorphic_on': type,
324                'polymorphic_identity': 'base'}
325
326        class SubClass(BaseClass):
327            __tablename__ = 'subtable'
328
329            id = column_property(
330                Column(Integer, primary_key=True),
331                BaseClass.id
332            )
333            base_id = Column(Integer, ForeignKey('basetable.id'))
334            subdata1 = Column(String(50))
335
336            __mapper_args__ = {'polymorphic_identity': 'sub'}
337
338        class SubSubClass(SubClass):
339            __tablename__ = 'subsubtable'
340
341            id = Column(Integer, ForeignKey('subtable.id'), primary_key=True)
342            subdata2 = Column(String(50))
343
344            __mapper_args__ = {'polymorphic_identity': 'subsub'}
345
346        self.create_tables()
347
348        SubSubHistory = SubSubClass.__history_mapper__.class_
349        sess = self.session
350        q = sess.query(SubSubHistory)
351        self.assert_compile(
352            q,
353
354
355            "SELECT "
356
357            "subsubtable_history.id AS subsubtable_history_id, "
358            "subtable_history.id AS subtable_history_id, "
359            "basetable_history.id AS basetable_history_id, "
360
361            "subsubtable_history.changed AS subsubtable_history_changed, "
362            "subtable_history.changed AS subtable_history_changed, "
363            "basetable_history.changed AS basetable_history_changed, "
364
365            "basetable_history.name AS basetable_history_name, "
366
367            "basetable_history.type AS basetable_history_type, "
368
369            "subsubtable_history.version AS subsubtable_history_version, "
370            "subtable_history.version AS subtable_history_version, "
371            "basetable_history.version AS basetable_history_version, "
372
373
374            "subtable_history.base_id AS subtable_history_base_id, "
375            "subtable_history.subdata1 AS subtable_history_subdata1, "
376            "subsubtable_history.subdata2 AS subsubtable_history_subdata2 "
377            "FROM basetable_history "
378            "JOIN subtable_history "
379            "ON basetable_history.id = subtable_history.base_id "
380            "AND basetable_history.version = subtable_history.version "
381            "JOIN subsubtable_history ON subtable_history.id = "
382            "subsubtable_history.id AND subtable_history.version = "
383            "subsubtable_history.version"
384        )
385
386        ssc = SubSubClass(name='ss1', subdata1='sd1', subdata2='sd2')
387        sess.add(ssc)
388        sess.commit()
389        eq_(
390            sess.query(SubSubHistory).all(),
391            []
392        )
393        ssc.subdata1 = 'sd11'
394        ssc.subdata2 = 'sd22'
395        sess.commit()
396        eq_(
397            sess.query(SubSubHistory).all(),
398            [SubSubHistory(name='ss1', subdata1='sd1',
399                                subdata2='sd2', type='subsub', version=1)]
400        )
401        eq_(ssc, SubSubClass(
402            name='ss1', subdata1='sd11',
403            subdata2='sd22', version=2))
404
405    def test_joined_inheritance_changed(self):
406        class BaseClass(Versioned, self.Base, ComparableEntity):
407            __tablename__ = 'basetable'
408
409            id = Column(Integer, primary_key=True)
410            name = Column(String(50))
411            type = Column(String(20))
412
413            __mapper_args__ = {
414                'polymorphic_on': type,
415                'polymorphic_identity': 'base'
416            }
417
418        class SubClass(BaseClass):
419            __tablename__ = 'subtable'
420
421            id = Column(Integer, ForeignKey('basetable.id'), primary_key=True)
422
423            __mapper_args__ = {'polymorphic_identity': 'sep'}
424
425        self.create_tables()
426
427        BaseClassHistory = BaseClass.__history_mapper__.class_
428        SubClassHistory = SubClass.__history_mapper__.class_
429        sess = self.session
430        s1 = SubClass(name='s1')
431        sess.add(s1)
432        sess.commit()
433
434        s1.name = 's2'
435        sess.commit()
436
437        actual_changed_base = sess.scalar(
438            select([BaseClass.__history_mapper__.local_table.c.changed]))
439        actual_changed_sub = sess.scalar(
440            select([SubClass.__history_mapper__.local_table.c.changed]))
441        h1 = sess.query(BaseClassHistory).first()
442        eq_(h1.changed, actual_changed_base)
443        eq_(h1.changed, actual_changed_sub)
444
445        h1 = sess.query(SubClassHistory).first()
446        eq_(h1.changed, actual_changed_base)
447        eq_(h1.changed, actual_changed_sub)
448
449    def test_single_inheritance(self):
450        class BaseClass(Versioned, self.Base, ComparableEntity):
451            __tablename__ = 'basetable'
452
453            id = Column(Integer, primary_key=True)
454            name = Column(String(50))
455            type = Column(String(50))
456            __mapper_args__ = {
457                'polymorphic_on': type,
458                'polymorphic_identity': 'base'}
459
460        class SubClass(BaseClass):
461
462            subname = Column(String(50), unique=True)
463            __mapper_args__ = {'polymorphic_identity': 'sub'}
464
465        self.create_tables()
466        sess = self.session
467
468        b1 = BaseClass(name='b1')
469        sc = SubClass(name='s1', subname='sc1')
470
471        sess.add_all([b1, sc])
472
473        sess.commit()
474
475        b1.name = 'b1modified'
476
477        BaseClassHistory = BaseClass.__history_mapper__.class_
478        SubClassHistory = SubClass.__history_mapper__.class_
479
480        eq_(
481            sess.query(BaseClassHistory).order_by(
482                BaseClassHistory.id, BaseClassHistory.version).all(),
483            [BaseClassHistory(id=1, name='b1', type='base', version=1)]
484        )
485
486        sc.name = 's1modified'
487        b1.name = 'b1modified2'
488
489        eq_(
490            sess.query(BaseClassHistory).order_by(
491                BaseClassHistory.id, BaseClassHistory.version).all(),
492            [
493                BaseClassHistory(id=1, name='b1', type='base', version=1),
494                BaseClassHistory(
495                    id=1, name='b1modified', type='base', version=2),
496                SubClassHistory(id=2, name='s1', type='sub', version=1)
497            ]
498        )
499
500        # test the unique constraint on the subclass
501        # column
502        sc.name = "modifyagain"
503        sess.flush()
504
505    def test_unique(self):
506        class SomeClass(Versioned, self.Base, ComparableEntity):
507            __tablename__ = 'sometable'
508
509            id = Column(Integer, primary_key=True)
510            name = Column(String(50), unique=True)
511            data = Column(String(50))
512
513        self.create_tables()
514        sess = self.session
515        sc = SomeClass(name='sc1', data='sc1')
516        sess.add(sc)
517        sess.commit()
518
519        sc.data = 'sc1modified'
520        sess.commit()
521
522        assert sc.version == 2
523
524        sc.data = 'sc1modified2'
525        sess.commit()
526
527        assert sc.version == 3
528
529    def test_relationship(self):
530
531        class SomeRelated(self.Base, ComparableEntity):
532            __tablename__ = 'somerelated'
533
534            id = Column(Integer, primary_key=True)
535
536        class SomeClass(Versioned, self.Base, ComparableEntity):
537            __tablename__ = 'sometable'
538
539            id = Column(Integer, primary_key=True)
540            name = Column(String(50))
541            related_id = Column(Integer, ForeignKey('somerelated.id'))
542            related = relationship("SomeRelated", backref='classes')
543
544        SomeClassHistory = SomeClass.__history_mapper__.class_
545
546        self.create_tables()
547        sess = self.session
548        sc = SomeClass(name='sc1')
549        sess.add(sc)
550        sess.commit()
551
552        assert sc.version == 1
553
554        sr1 = SomeRelated()
555        sc.related = sr1
556        sess.commit()
557
558        assert sc.version == 2
559
560        eq_(
561            sess.query(SomeClassHistory).filter(
562                SomeClassHistory.version == 1).all(),
563            [SomeClassHistory(version=1, name='sc1', related_id=None)]
564        )
565
566        sc.related = None
567
568        eq_(
569            sess.query(SomeClassHistory).order_by(
570                SomeClassHistory.version).all(),
571            [
572                SomeClassHistory(version=1, name='sc1', related_id=None),
573                SomeClassHistory(version=2, name='sc1', related_id=sr1.id)
574            ]
575        )
576
577        assert sc.version == 3
578
579    def test_backref_relationship(self):
580
581        class SomeRelated(self.Base, ComparableEntity):
582            __tablename__ = 'somerelated'
583
584            id = Column(Integer, primary_key=True)
585            name = Column(String(50))
586            related_id = Column(Integer, ForeignKey('sometable.id'))
587            related = relationship("SomeClass", backref='related')
588
589        class SomeClass(Versioned, self.Base, ComparableEntity):
590            __tablename__ = 'sometable'
591
592            id = Column(Integer, primary_key=True)
593
594        self.create_tables()
595        sess = self.session
596        sc = SomeClass()
597        sess.add(sc)
598        sess.commit()
599
600        assert sc.version == 1
601
602        sr = SomeRelated(name='sr', related=sc)
603        sess.add(sr)
604        sess.commit()
605
606        assert sc.version == 1
607
608        sr.name = 'sr2'
609        sess.commit()
610
611        assert sc.version == 1
612
613        sess.delete(sr)
614        sess.commit()
615
616        assert sc.version == 1
617
618    def test_create_double_flush(self):
619
620        class SomeClass(Versioned, self.Base, ComparableEntity):
621            __tablename__ = 'sometable'
622
623            id = Column(Integer, primary_key=True)
624            name = Column(String(30))
625            other = Column(String(30))
626
627        self.create_tables()
628
629        sc = SomeClass()
630        self.session.add(sc)
631        self.session.flush()
632        sc.name = 'Foo'
633        self.session.flush()
634
635        assert sc.version == 2
636
637    def test_mutate_plain_column(self):
638        class Document(self.Base, Versioned):
639            __tablename__ = 'document'
640            id = Column(Integer, primary_key=True, autoincrement=True)
641            name = Column(String, nullable=True)
642            description_ = Column('description', String, nullable=True)
643
644        self.create_tables()
645
646        document = Document()
647        self.session.add(document)
648        document.name = 'Foo'
649        self.session.commit()
650        document.name = 'Bar'
651        self.session.commit()
652
653        DocumentHistory = Document.__history_mapper__.class_
654        v2 = self.session.query(Document).one()
655        v1 = self.session.query(DocumentHistory).one()
656        self.assertEqual(v1.id, v2.id)
657        self.assertEqual(v2.name, 'Bar')
658        self.assertEqual(v1.name, 'Foo')
659
660    def test_mutate_named_column(self):
661        class Document(self.Base, Versioned):
662            __tablename__ = 'document'
663            id = Column(Integer, primary_key=True, autoincrement=True)
664            name = Column(String, nullable=True)
665            description_ = Column('description', String, nullable=True)
666
667        self.create_tables()
668
669        document = Document()
670        self.session.add(document)
671        document.description_ = 'Foo'
672        self.session.commit()
673        document.description_ = 'Bar'
674        self.session.commit()
675
676        DocumentHistory = Document.__history_mapper__.class_
677        v2 = self.session.query(Document).one()
678        v1 = self.session.query(DocumentHistory).one()
679        self.assertEqual(v1.id, v2.id)
680        self.assertEqual(v2.description_, 'Bar')
681        self.assertEqual(v1.description_, 'Foo')
682
683    def test_unique_identifiers_across_deletes(self):
684        """Ensure unique integer values are used for the primary table.
685
686        Checks whether the database assigns the same identifier twice
687        within the span of a table.  SQLite will do this if
688        sqlite_autoincrement is not set (e.g. SQLite's AUTOINCREMENT flag).
689
690        """
691
692        class SomeClass(Versioned, self.Base, ComparableEntity):
693            __tablename__ = 'sometable'
694
695            id = Column(Integer, primary_key=True)
696            name = Column(String(50))
697
698        self.create_tables()
699        sess = self.session
700        sc = SomeClass(name='sc1')
701        sess.add(sc)
702        sess.commit()
703
704        sess.delete(sc)
705        sess.commit()
706
707        sc2 = SomeClass(name='sc2')
708        sess.add(sc2)
709        sess.commit()
710
711        SomeClassHistory = SomeClass.__history_mapper__.class_
712
713        # only one entry should exist in the history table; one()
714        # ensures that
715        scdeleted = sess.query(SomeClassHistory).one()
716
717        # If sc2 has the same id that deleted sc1 had,
718        # it will fail when modified or deleted
719        # because of the violation of the uniqueness of the primary key on
720        # sometable_history
721        ne_(sc2.id, scdeleted.id)
722
723        # If previous assertion fails, this will also fail:
724        sc2.name = 'sc2 modified'
725        sess.commit()
726