1
2from sqlalchemy.testing import assert_raises, assert_raises_message
3import sqlalchemy as sa
4from sqlalchemy import MetaData, Integer, ForeignKey, util, event
5from sqlalchemy.orm import mapper, relationship, create_session, \
6    attributes, class_mapper, clear_mappers, instrumentation, events
7from sqlalchemy.testing.schema import Table
8from sqlalchemy.testing.schema import Column
9from sqlalchemy.testing import eq_, ne_
10from sqlalchemy.testing import fixtures
11from sqlalchemy import testing
12
13
14class InitTest(fixtures.ORMTest):
15    def fixture(self):
16        return Table('t', MetaData(),
17                     Column('id', Integer, primary_key=True),
18                     Column('type', Integer),
19                     Column('x', Integer),
20                     Column('y', Integer))
21
22    def register(self, cls, canary):
23        original_init = cls.__init__
24        instrumentation.register_class(cls)
25        ne_(cls.__init__, original_init)
26        manager = instrumentation.manager_of_class(cls)
27        def init(state, args, kwargs):
28            canary.append((cls, 'init', state.class_))
29        event.listen(manager, 'init', init, raw=True)
30
31    def test_ai(self):
32        inits = []
33
34        class A(object):
35            def __init__(self):
36                inits.append((A, '__init__'))
37
38        obj = A()
39        eq_(inits, [(A, '__init__')])
40
41    def test_A(self):
42        inits = []
43
44        class A(object):
45            pass
46        self.register(A, inits)
47
48        obj = A()
49        eq_(inits, [(A, 'init', A)])
50
51    def test_Ai(self):
52        inits = []
53
54        class A(object):
55            def __init__(self):
56                inits.append((A, '__init__'))
57        self.register(A, inits)
58
59        obj = A()
60        eq_(inits, [(A, 'init', A), (A, '__init__')])
61
62    def test_ai_B(self):
63        inits = []
64
65        class A(object):
66            def __init__(self):
67                inits.append((A, '__init__'))
68
69        class B(A):
70            pass
71        self.register(B, inits)
72
73        obj = A()
74        eq_(inits, [(A, '__init__')])
75
76        del inits[:]
77
78        obj = B()
79        eq_(inits, [(B, 'init', B), (A, '__init__')])
80
81    def test_ai_Bi(self):
82        inits = []
83
84        class A(object):
85            def __init__(self):
86                inits.append((A, '__init__'))
87
88        class B(A):
89            def __init__(self):
90                inits.append((B, '__init__'))
91                super(B, self).__init__()
92        self.register(B, inits)
93
94        obj = A()
95        eq_(inits, [(A, '__init__')])
96
97        del inits[:]
98
99        obj = B()
100        eq_(inits, [(B, 'init', B), (B, '__init__'), (A, '__init__')])
101
102    def test_Ai_bi(self):
103        inits = []
104
105        class A(object):
106            def __init__(self):
107                inits.append((A, '__init__'))
108        self.register(A, inits)
109
110        class B(A):
111            def __init__(self):
112                inits.append((B, '__init__'))
113                super(B, self).__init__()
114
115        obj = A()
116        eq_(inits, [(A, 'init', A), (A, '__init__')])
117
118        del inits[:]
119
120        obj = B()
121        eq_(inits, [(B, '__init__'), (A, 'init', B), (A, '__init__')])
122
123    def test_Ai_Bi(self):
124        inits = []
125
126        class A(object):
127            def __init__(self):
128                inits.append((A, '__init__'))
129        self.register(A, inits)
130
131        class B(A):
132            def __init__(self):
133                inits.append((B, '__init__'))
134                super(B, self).__init__()
135        self.register(B, inits)
136
137        obj = A()
138        eq_(inits, [(A, 'init', A), (A, '__init__')])
139
140        del inits[:]
141
142        obj = B()
143        eq_(inits, [(B, 'init', B), (B, '__init__'), (A, '__init__')])
144
145    def test_Ai_B(self):
146        inits = []
147
148        class A(object):
149            def __init__(self):
150                inits.append((A, '__init__'))
151        self.register(A, inits)
152
153        class B(A):
154            pass
155        self.register(B, inits)
156
157        obj = A()
158        eq_(inits, [(A, 'init', A), (A, '__init__')])
159
160        del inits[:]
161
162        obj = B()
163        eq_(inits, [(B, 'init', B), (A, '__init__')])
164
165    def test_Ai_Bi_Ci(self):
166        inits = []
167
168        class A(object):
169            def __init__(self):
170                inits.append((A, '__init__'))
171        self.register(A, inits)
172
173        class B(A):
174            def __init__(self):
175                inits.append((B, '__init__'))
176                super(B, self).__init__()
177        self.register(B, inits)
178
179        class C(B):
180            def __init__(self):
181                inits.append((C, '__init__'))
182                super(C, self).__init__()
183        self.register(C, inits)
184
185        obj = A()
186        eq_(inits, [(A, 'init', A), (A, '__init__')])
187
188        del inits[:]
189
190        obj = B()
191        eq_(inits, [(B, 'init', B), (B, '__init__'), (A, '__init__')])
192
193        del inits[:]
194        obj = C()
195        eq_(inits, [(C, 'init', C), (C, '__init__'), (B, '__init__'),
196                   (A, '__init__')])
197
198    def test_Ai_bi_Ci(self):
199        inits = []
200
201        class A(object):
202            def __init__(self):
203                inits.append((A, '__init__'))
204        self.register(A, inits)
205
206        class B(A):
207            def __init__(self):
208                inits.append((B, '__init__'))
209                super(B, self).__init__()
210
211        class C(B):
212            def __init__(self):
213                inits.append((C, '__init__'))
214                super(C, self).__init__()
215        self.register(C, inits)
216
217        obj = A()
218        eq_(inits, [(A, 'init', A), (A, '__init__')])
219
220        del inits[:]
221
222        obj = B()
223        eq_(inits, [(B, '__init__'), (A, 'init', B), (A, '__init__')])
224
225        del inits[:]
226        obj = C()
227        eq_(inits, [(C, 'init', C), (C, '__init__'),  (B, '__init__'),
228                   (A, '__init__')])
229
230    def test_Ai_b_Ci(self):
231        inits = []
232
233        class A(object):
234            def __init__(self):
235                inits.append((A, '__init__'))
236        self.register(A, inits)
237
238        class B(A):
239            pass
240
241        class C(B):
242            def __init__(self):
243                inits.append((C, '__init__'))
244                super(C, self).__init__()
245        self.register(C, inits)
246
247        obj = A()
248        eq_(inits, [(A, 'init', A), (A, '__init__')])
249
250        del inits[:]
251
252        obj = B()
253        eq_(inits, [(A, 'init', B), (A, '__init__')])
254
255        del inits[:]
256        obj = C()
257        eq_(inits, [(C, 'init', C), (C, '__init__'), (A, '__init__')])
258
259    def test_Ai_B_Ci(self):
260        inits = []
261
262        class A(object):
263            def __init__(self):
264                inits.append((A, '__init__'))
265        self.register(A, inits)
266
267        class B(A):
268            pass
269        self.register(B, inits)
270
271        class C(B):
272            def __init__(self):
273                inits.append((C, '__init__'))
274                super(C, self).__init__()
275        self.register(C, inits)
276
277        obj = A()
278        eq_(inits, [(A, 'init', A), (A, '__init__')])
279
280        del inits[:]
281
282        obj = B()
283        eq_(inits, [(B, 'init', B), (A, '__init__')])
284
285        del inits[:]
286        obj = C()
287        eq_(inits, [(C, 'init', C), (C, '__init__'), (A, '__init__')])
288
289    def test_Ai_B_C(self):
290        inits = []
291
292        class A(object):
293            def __init__(self):
294                inits.append((A, '__init__'))
295        self.register(A, inits)
296
297        class B(A):
298            pass
299        self.register(B, inits)
300
301        class C(B):
302            pass
303        self.register(C, inits)
304
305        obj = A()
306        eq_(inits, [(A, 'init', A), (A, '__init__')])
307
308        del inits[:]
309
310        obj = B()
311        eq_(inits, [(B, 'init', B), (A, '__init__')])
312
313        del inits[:]
314        obj = C()
315        eq_(inits, [(C, 'init', C), (A, '__init__')])
316
317    def test_A_Bi_C(self):
318        inits = []
319
320        class A(object):
321            pass
322        self.register(A, inits)
323
324        class B(A):
325            def __init__(self):
326                inits.append((B, '__init__'))
327        self.register(B, inits)
328
329        class C(B):
330            pass
331        self.register(C, inits)
332
333        obj = A()
334        eq_(inits, [(A, 'init', A)])
335
336        del inits[:]
337
338        obj = B()
339        eq_(inits, [(B, 'init', B), (B, '__init__')])
340
341        del inits[:]
342        obj = C()
343        eq_(inits, [(C, 'init', C), (B, '__init__')])
344
345    def test_A_B_Ci(self):
346        inits = []
347
348        class A(object):
349            pass
350        self.register(A, inits)
351
352        class B(A):
353            pass
354        self.register(B, inits)
355
356        class C(B):
357            def __init__(self):
358                inits.append((C, '__init__'))
359        self.register(C, inits)
360
361        obj = A()
362        eq_(inits, [(A, 'init', A)])
363
364        del inits[:]
365
366        obj = B()
367        eq_(inits, [(B, 'init', B)])
368
369        del inits[:]
370        obj = C()
371        eq_(inits, [(C, 'init', C), (C, '__init__')])
372
373    def test_A_B_C(self):
374        inits = []
375
376        class A(object):
377            pass
378        self.register(A, inits)
379
380        class B(A):
381            pass
382        self.register(B, inits)
383
384        class C(B):
385            pass
386        self.register(C, inits)
387
388        obj = A()
389        eq_(inits, [(A, 'init', A)])
390
391        del inits[:]
392
393        obj = B()
394        eq_(inits, [(B, 'init', B)])
395
396        del inits[:]
397        obj = C()
398        eq_(inits, [(C, 'init', C)])
399
400    def test_defaulted_init(self):
401        class X(object):
402            def __init__(self_, a, b=123, c='abc'):
403                self_.a = a
404                self_.b = b
405                self_.c = c
406        instrumentation.register_class(X)
407
408        o = X('foo')
409        eq_(o.a, 'foo')
410        eq_(o.b, 123)
411        eq_(o.c, 'abc')
412
413        class Y(object):
414            unique = object()
415
416            class OutOfScopeForEval(object):
417                def __repr__(self_):
418                    # misleading repr
419                    return '123'
420
421            outofscope = OutOfScopeForEval()
422
423            def __init__(self_, u=unique, o=outofscope):
424                self_.u = u
425                self_.o = o
426
427        instrumentation.register_class(Y)
428
429        o = Y()
430        assert o.u is Y.unique
431        assert o.o is Y.outofscope
432
433
434class MapperInitTest(fixtures.ORMTest):
435
436    def fixture(self):
437        return Table('t', MetaData(),
438                     Column('id', Integer, primary_key=True),
439                     Column('type', Integer),
440                     Column('x', Integer),
441                     Column('y', Integer))
442
443    def test_partially_mapped_inheritance(self):
444        class A(object):
445            pass
446
447        class B(A):
448            pass
449
450        class C(B):
451            def __init__(self, x):
452                pass
453
454        m = mapper(A, self.fixture())
455
456        # B is not mapped in the current implementation
457        assert_raises(sa.orm.exc.UnmappedClassError, class_mapper, B)
458
459        # C is not mapped in the current implementation
460        assert_raises(sa.orm.exc.UnmappedClassError, class_mapper, C)
461
462    def test_del_warning(self):
463        class A(object):
464            def __del__(self):
465                pass
466
467        assert_raises_message(
468            sa.exc.SAWarning,
469            r"__del__\(\) method on class "
470            "<class '.*\.A'> will cause "
471            "unreachable cycles and memory leaks, as SQLAlchemy "
472            "instrumentation often creates reference cycles.  "
473            "Please remove this method.",
474            mapper, A, self.fixture()
475        )
476
477class OnLoadTest(fixtures.ORMTest):
478    """Check that Events.load is not hit in regular attributes operations."""
479
480    def test_basic(self):
481        import pickle
482
483        global A
484        class A(object):
485            pass
486
487        def canary(instance):
488            assert False
489
490        try:
491            instrumentation.register_class(A)
492            manager = instrumentation.manager_of_class(A)
493            event.listen(manager, 'load', canary)
494
495            a = A()
496            p_a = pickle.dumps(a)
497            re_a = pickle.loads(p_a)
498        finally:
499            del A
500
501
502class NativeInstrumentationTest(fixtures.ORMTest):
503    def test_register_reserved_attribute(self):
504        class T(object):
505            pass
506
507        instrumentation.register_class(T)
508        manager = instrumentation.manager_of_class(T)
509
510        sa = instrumentation.ClassManager.STATE_ATTR
511        ma = instrumentation.ClassManager.MANAGER_ATTR
512
513        fails = lambda method, attr: assert_raises(
514            KeyError, getattr(manager, method), attr, property())
515
516        fails('install_member', sa)
517        fails('install_member', ma)
518        fails('install_descriptor', sa)
519        fails('install_descriptor', ma)
520
521    def test_mapped_stateattr(self):
522        t = Table('t', MetaData(),
523                  Column('id', Integer, primary_key=True),
524                  Column(instrumentation.ClassManager.STATE_ATTR, Integer))
525
526        class T(object):
527            pass
528
529        assert_raises(KeyError, mapper, T, t)
530
531    def test_mapped_managerattr(self):
532        t = Table('t', MetaData(),
533                  Column('id', Integer, primary_key=True),
534                  Column(instrumentation.ClassManager.MANAGER_ATTR, Integer))
535
536        class T(object):
537            pass
538        assert_raises(KeyError, mapper, T, t)
539
540class Py3KFunctionInstTest(fixtures.ORMTest):
541    __requires__ = ("python3", )
542
543
544    def _instrument(self, cls):
545        manager = instrumentation.register_class(cls)
546        canary = []
547        def check(target, args, kwargs):
548            canary.append((args, kwargs))
549        event.listen(manager, "init", check)
550        return cls, canary
551
552    def test_kw_only_args(self):
553        cls, canary = self._kw_only_fixture()
554
555        a = cls("a", b="b", c="c")
556        eq_(canary, [(('a', ), {'b': 'b', 'c': 'c'})])
557
558    def test_kw_plus_posn_args(self):
559        cls, canary = self._kw_plus_posn_fixture()
560
561        a = cls("a", 1, 2, 3, b="b", c="c")
562        eq_(canary, [(('a', 1, 2, 3), {'b': 'b', 'c': 'c'})])
563
564    def test_kw_only_args_plus_opt(self):
565        cls, canary = self._kw_opt_fixture()
566
567        a = cls("a", b="b")
568        eq_(canary, [(('a', ), {'b': 'b', 'c': 'c'})])
569
570        canary[:] = []
571        a = cls("a", b="b", c="d")
572        eq_(canary, [(('a', ), {'b': 'b', 'c': 'd'})])
573
574    def test_kw_only_sig(self):
575        cls, canary = self._kw_only_fixture()
576        assert_raises(
577            TypeError,
578            cls, "a", "b", "c"
579        )
580
581    def test_kw_plus_opt_sig(self):
582        cls, canary = self._kw_only_fixture()
583        assert_raises(
584            TypeError,
585            cls, "a", "b", "c"
586        )
587
588        assert_raises(
589            TypeError,
590            cls, "a", "b", c="c"
591        )
592
593if util.py3k:
594    _locals = {}
595    exec("""
596def _kw_only_fixture(self):
597    class A(object):
598        def __init__(self, a, *, b, c):
599            self.a = a
600            self.b = b
601            self.c = c
602    return self._instrument(A)
603
604def _kw_plus_posn_fixture(self):
605    class A(object):
606        def __init__(self, a, *args, b, c):
607            self.a = a
608            self.b = b
609            self.c = c
610    return self._instrument(A)
611
612def _kw_opt_fixture(self):
613    class A(object):
614        def __init__(self, a, *, b, c="c"):
615            self.a = a
616            self.b = b
617            self.c = c
618    return self._instrument(A)
619""", _locals)
620    for k in _locals:
621        setattr(Py3KFunctionInstTest, k, _locals[k])
622
623class MiscTest(fixtures.ORMTest):
624    """Seems basic, but not directly covered elsewhere!"""
625
626    def test_compileonattr(self):
627        t = Table('t', MetaData(),
628                  Column('id', Integer, primary_key=True),
629                  Column('x', Integer))
630        class A(object):
631            pass
632        mapper(A, t)
633
634        a = A()
635        assert a.id is None
636
637    def test_compileonattr_rel(self):
638        m = MetaData()
639        t1 = Table('t1', m,
640                   Column('id', Integer, primary_key=True),
641                   Column('x', Integer))
642        t2 = Table('t2', m,
643                   Column('id', Integer, primary_key=True),
644                   Column('t1_id', Integer, ForeignKey('t1.id')))
645        class A(object):
646            pass
647        class B(object):
648            pass
649        mapper(A, t1, properties=dict(bs=relationship(B)))
650        mapper(B, t2)
651
652        a = A()
653        assert not a.bs
654
655    def test_uninstrument(self):
656        class A(object):
657            pass
658
659        manager = instrumentation.register_class(A)
660        attributes.register_attribute(A, 'x', uselist=False, useobject=False)
661
662        assert instrumentation.manager_of_class(A) is manager
663        instrumentation.unregister_class(A)
664        assert instrumentation.manager_of_class(A) is None
665        assert not hasattr(A, 'x')
666
667        # I prefer 'is' here but on pypy
668        # it seems only == works
669        assert A.__init__ == object.__init__
670
671    def test_compileonattr_rel_backref_a(self):
672        m = MetaData()
673        t1 = Table('t1', m,
674                   Column('id', Integer, primary_key=True),
675                   Column('x', Integer))
676        t2 = Table('t2', m,
677                   Column('id', Integer, primary_key=True),
678                   Column('t1_id', Integer, ForeignKey('t1.id')))
679
680        class Base(object):
681            def __init__(self, *args, **kwargs):
682                pass
683
684        for base in object, Base:
685            class A(base):
686                pass
687            class B(base):
688                pass
689            mapper(A, t1, properties=dict(bs=relationship(B, backref='a')))
690            mapper(B, t2)
691
692            b = B()
693            assert b.a is None
694            a = A()
695            b.a = a
696
697            session = create_session()
698            session.add(b)
699            assert a in session, "base is %s" % base
700
701    def test_compileonattr_rel_backref_b(self):
702        m = MetaData()
703        t1 = Table('t1', m,
704                   Column('id', Integer, primary_key=True),
705                   Column('x', Integer))
706        t2 = Table('t2', m,
707                   Column('id', Integer, primary_key=True),
708                   Column('t1_id', Integer, ForeignKey('t1.id')))
709
710        class Base(object):
711            def __init__(self):
712                pass
713        class Base_AKW(object):
714            def __init__(self, *args, **kwargs):
715                pass
716
717        for base in object, Base, Base_AKW:
718            class A(base):
719                pass
720            class B(base):
721                pass
722            mapper(A, t1)
723            mapper(B, t2, properties=dict(a=relationship(A, backref='bs')))
724
725            a = A()
726            b = B()
727            b.a = a
728
729            session = create_session()
730            session.add(a)
731            assert b in session, 'base: %s' % base
732
733
734